1. Add properties passed along with the DataFrame in piflow

2. Implement Route by customized properties
judy0131 2019-09-06 15:15:11 +08:00
parent 049388a8e7
commit f5b3c3eee2
7 changed files with 224 additions and 9 deletions


@@ -94,19 +94,19 @@
<dependency>
<groupId>com.sksamuel.scrimage</groupId>
<artifactId>scrimage-core_2.11</artifactId>
<version>2.1.0</version>
<version>2.1.7</version>
</dependency>
<dependency>
<groupId>com.sksamuel.scrimage</groupId>
<artifactId>scrimage-io-extra_2.11</artifactId>
<version>2.1.0</version>
<version>2.1.7</version>
</dependency>
<dependency>
<groupId>com.sksamuel.scrimage</groupId>
<artifactId>scrimage-filters_2.11</artifactId>
<version>2.1.0</version>
<version>2.1.7</version>
</dependency>
<dependency>


@@ -0,0 +1,99 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"checkpoint":"Merge",
"stops":[
{
"uuid":"1111",
"name":"XmlParser",
"bundle":"cn.piflow.bundle.xml.XmlParser",
"properties":{
"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
"rowTag":"phdthesis"
}
},
{
"uuid":"2222",
"name":"SelectField",
"bundle":"cn.piflow.bundle.common.SelectField",
"properties":{
"schema":"title,author,pages"
}
},
{
"uuid":"3333",
"name":"PutHiveStreaming",
"bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
"properties":{
"database":"sparktest",
"table":"dblp_phdthesis"
}
},
{
"uuid":"666",
"name":"Route",
"bundle":"cn.piflow.bundle.common.Route",
"properties":{
},
"customizedProperties":{
"port1":"author = \"Carmen Heine\"",
"port2":"author = \"Gerd Hoff\""
}
},
{
"uuid":"777",
"name":"JsonSave",
"bundle":"cn.piflow.bundle.json.JsonSave",
"properties":{
"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
}
},
{
"uuid":"888",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
"header":"true",
"delimiter":","
}
}
],
"paths":[
{
"from":"XmlParser",
"outport":"",
"inport":"",
"to":"SelectField"
},
{
"from":"SelectField",
"outport":"",
"inport":"",
"to":"Route"
},
{
"from":"Route",
"outport":"",
"inport":"",
"to":"PutHiveStreaming"
},
{
"from":"Route",
"outport":"port1",
"inport":"",
"to":"JsonSave"
},
{
"from":"Route",
"outport":"port2",
"inport":"",
"to":"CsvSave"
}
]
}
}


@@ -0,0 +1,55 @@
package cn.piflow.bundle.common
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
class Route extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Route data by customized properties: each key is an outport name and each value is a filter condition"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.AnyPort.toString)
//var outports : List[String] = _
override def setProperties(map: Map[String, Any]): Unit = {
//val outportStr = MapUtil.get(map,"outports").asInstanceOf[String]
//outports = outportStr.split(",").toList
}
override def initialize(ctx: ProcessContext): Unit = {
}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val df = in.read().cache()
//write a filtered DataFrame to each outport named by a customized property key
if(this.customizedProperties != null && this.customizedProperties.nonEmpty){
val it = this.customizedProperties.keySet.iterator
while (it.hasNext){
val port = it.next()
val filterCondition = MapUtil.get(this.customizedProperties,port).asInstanceOf[String]
val filterDf = df.filter(filterCondition)
out.write(port,filterDf)
}
}
out.write(df);
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
//val outports = new PropertyDescriptor().name("outports").displayName("outports").description("outports string, seperated by ,.").defaultValue("").required(true)
//descriptor = outports :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/common/Fork.png")
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
}
}

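The per-port filtering that Route.perform applies can be reproduced in isolation. The following is a minimal sketch, not part of this commit, assuming a local SparkSession and a hand-built toy DataFrame; the port names and filter conditions are copied from flow_route.json above.

import org.apache.spark.sql.SparkSession

object RouteFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("route-filter-sketch").getOrCreate()
    import spark.implicits._

    // toy data standing in for the output of SelectField
    val df = Seq(
      ("Carmen Heine", "Title A", "1-10"),
      ("Gerd Hoff",    "Title B", "11-20"),
      ("Someone Else", "Title C", "21-30")
    ).toDF("author", "title", "pages").cache()

    // customizedProperties as declared in flow_route.json: outport -> filter condition
    val customizedProperties = Map(
      "port1" -> "author = \"Carmen Heine\"",
      "port2" -> "author = \"Gerd Hoff\""
    )

    // Route writes df.filter(condition) to each named outport;
    // here the filtered rows are only printed per port.
    customizedProperties.foreach { case (port, condition) =>
      println(s"$port: $condition")
      df.filter(condition).show()
    }

    spark.stop()
  }
}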

@@ -13,6 +13,7 @@ abstract class ConfigurableStop extends Stop{
val inportList : List[String] //= List(PortEnum.DefaultPort.toString)
val outportList : List[String] //= List(PortEnum.DefaultPort.toString)
var customizedProperties : Map[String, String] = null
def setProperties(map: Map[String, Any])
@@ -22,4 +23,8 @@ abstract class ConfigurableStop extends Stop{
def getGroup() : List[String]
def setCustomizedProperties( customizedProperties : Map[String, String]) = {
this.customizedProperties = customizedProperties
}
}


@@ -13,6 +13,7 @@ class StopBean {
var name : String = _
var bundle : String = _
var properties : Map[String, String] = _
var customizedProperties : Map[String, String] = _
def init(flowName : String, map:Map[String,Any]) = {
this.flowName = flowName
@@ -20,6 +21,12 @@ class StopBean {
this.name = MapUtil.get(map,"name").asInstanceOf[String]
this.bundle = MapUtil.get(map,"bundle").asInstanceOf[String]
this.properties = MapUtil.get(map, "properties").asInstanceOf[Map[String, String]]
if(map.contains("customizedProperties")){
this.customizedProperties = MapUtil.get(map, "customizedProperties").asInstanceOf[Map[String, String]]
}else{
this.customizedProperties = Map[String, String]()
}
}
def constructStop() : ConfigurableStop = {
@@ -51,8 +58,12 @@ class StopBean {
}
stop.setProperties(newProperties.toMap)
}else
}else {
stop.setProperties(this.properties)
}
stop.setCustomizedProperties(this.customizedProperties)
stop

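How the optional customizedProperties block of a stop entry reaches the stop instance can be shown without the rest of piflow. A minimal sketch, assuming a hand-built stop map in place of the parsed flow JSON and the real MapUtil/StopBean plumbing:

object CustomizedPropertiesSketch {
  def main(args: Array[String]): Unit = {
    // a parsed stop entry from the flow JSON (hand-built here for illustration)
    val stopMap: Map[String, Any] = Map(
      "name" -> "Route",
      "bundle" -> "cn.piflow.bundle.common.Route",
      "properties" -> Map[String, String](),
      "customizedProperties" -> Map("port1" -> "author = \"Carmen Heine\"",
                                    "port2" -> "author = \"Gerd Hoff\"")
    )

    // StopBean.init falls back to an empty map when the key is absent,
    // so setCustomizedProperties always receives a non-null map
    val customizedProperties =
      if (stopMap.contains("customizedProperties"))
        stopMap("customizedProperties").asInstanceOf[Map[String, String]]
      else
        Map[String, String]()

    println(customizedProperties)
  }
}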

@@ -15,7 +15,7 @@ class FlowTest {
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow.json"
val file = "src/main/resources/flow_route.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
@@ -28,13 +28,13 @@ class FlowTest {
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.master("local")
.appName("piflow-hive-bundle-xjzhu")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris","thrift://10.0.86.191:9083")
.config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
//.config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
.enableHiveSupport()
.getOrCreate()
// val spark = SparkSession.builder()
@@ -66,7 +66,7 @@ class FlowTest {
spark.close();
}
@Test
/*@Test
def testFlow2json() = {
//parse flow json
@@ -78,6 +78,6 @@
val flowBean = FlowBean(map)
val flowJson = flowBean.toJson()
println(flowJson)
}
}*/
}


@@ -22,6 +22,10 @@ trait JobInputStream {
def ports(): Seq[String];
def read(inport: String): DataFrame;
def readProperties() : MMap[String, String];
def readProperties(inport : String) : MMap[String, String]
}
trait JobOutputStream {
@@ -33,6 +37,10 @@ trait JobOutputStream {
def write(bundle: String, data: DataFrame);
def writeProperties(properties : MMap[String, String]);
def writeProperties(bundle: String, properties : MMap[String, String]);
def sendError();
def getDataCount() : MMap[String, Long];
@@ -391,14 +399,19 @@ trait ProjectContext extends Context {
class JobInputStreamImpl() extends JobInputStream {
//only returns DataFrame on calling read()
val inputs = MMap[String, () => DataFrame]();
val inputsProperties = MMap[String, () => MMap[String, String]]()
override def isEmpty(): Boolean = inputs.isEmpty;
def attach(inputs: Map[Edge, JobOutputStreamImpl]) = {
this.inputs ++= inputs.filter(x => x._2.contains(x._1.outport))
.map(x => (x._1.inport, x._2.getDataFrame(x._1.outport)));
this.inputsProperties ++= inputs.filter(x => x._2.contains(x._1.outport))
.map(x => (x._1.inport, x._2.getDataFrameProperties(x._1.outport)));
};
override def ports(): Seq[String] = {
inputs.keySet.toSeq;
}
@@ -413,6 +426,16 @@ class JobInputStreamImpl() extends JobInputStream {
override def read(inport: String): DataFrame = {
inputs(inport)();
}
override def readProperties(): MMap[String, String] = {
readProperties("")
}
override def readProperties(inport: String): MMap[String, String] = {
inputsProperties(inport)()
}
}
class JobOutputStreamImpl() extends JobOutputStream with Logging {
@@ -484,6 +507,8 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
val mapDataFrame = MMap[String, () => DataFrame]();
val mapDataFrameProperties = MMap[String, () => MMap[String, String]]();
override def write(data: DataFrame): Unit = write("", data);
override def sendError(): Unit = ???
@@ -582,6 +607,26 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
})
incrementalValue
}
override def writeProperties(properties: MMap[String, String]): Unit = {
writeProperties("",properties)
}
override def writeProperties(outport: String, properties: MMap[String, String]): Unit = {
mapDataFrameProperties(outport) = () => properties
}
def getDataFrameProperties(port : String) = {
if(!mapDataFrameProperties.contains(port)){
mapDataFrameProperties(port) = () => MMap[String, String]()
}
mapDataFrameProperties(port)
}
}
class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProcess: Option[Process] = None)
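
The readProperties/writeProperties additions above follow the same pattern as the existing DataFrame ports: each outport holds a thunk returning a mutable map, and the downstream inport evaluates it. A minimal, self-contained sketch of that mechanism, using only plain Scala collections and no piflow types:

import scala.collection.mutable.{Map => MMap}

object PropertiesStreamSketch {
  def main(args: Array[String]): Unit = {
    // mirrors JobOutputStreamImpl.mapDataFrameProperties
    val mapDataFrameProperties = MMap[String, () => MMap[String, String]]()

    // writeProperties("", props): attach properties to the default outport
    val props = MMap("source" -> "dblp.mini.xml", "rowTag" -> "phdthesis")
    mapDataFrameProperties("") = () => props

    // getDataFrameProperties(port): unknown ports fall back to an empty map
    def getDataFrameProperties(port: String): () => MMap[String, String] = {
      if (!mapDataFrameProperties.contains(port))
        mapDataFrameProperties(port) = () => MMap[String, String]()
      mapDataFrameProperties(port)
    }

    // readProperties(inport): the downstream side evaluates the attached thunk
    println(getDataFrameProperties("")())      // prints the map attached above
    println(getDataFrameProperties("port1")()) // prints an empty map
  }
}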