diff --git a/piflow-bundle/pom.xml b/piflow-bundle/pom.xml
index 2332e6e..20be5cb 100644
--- a/piflow-bundle/pom.xml
+++ b/piflow-bundle/pom.xml
@@ -94,19 +94,19 @@
<dependency>
<groupId>com.sksamuel.scrimage</groupId>
<artifactId>scrimage-core_2.11</artifactId>
- <version>2.1.0</version>
+ <version>2.1.7</version>
</dependency>
<dependency>
<groupId>com.sksamuel.scrimage</groupId>
<artifactId>scrimage-io-extra_2.11</artifactId>
- <version>2.1.0</version>
+ <version>2.1.7</version>
</dependency>
<dependency>
<groupId>com.sksamuel.scrimage</groupId>
<artifactId>scrimage-filters_2.11</artifactId>
- <version>2.1.0</version>
+ <version>2.1.7</version>
</dependency>
diff --git a/piflow-bundle/src/main/resources/flow_route.json b/piflow-bundle/src/main/resources/flow_route.json
new file mode 100644
index 0000000..411493c
--- /dev/null
+++ b/piflow-bundle/src/main/resources/flow_route.json
@@ -0,0 +1,99 @@
+{
+ "flow":{
+ "name":"test",
+ "uuid":"1234",
+ "checkpoint":"Merge",
+ "stops":[
+ {
+ "uuid":"1111",
+ "name":"XmlParser",
+ "bundle":"cn.piflow.bundle.xml.XmlParser",
+ "properties":{
+ "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
+ "rowTag":"phdthesis"
+ }
+
+ },
+ {
+ "uuid":"2222",
+ "name":"SelectField",
+ "bundle":"cn.piflow.bundle.common.SelectField",
+ "properties":{
+ "schema":"title,author,pages"
+ }
+
+ },
+ {
+ "uuid":"3333",
+ "name":"PutHiveStreaming",
+ "bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
+ "properties":{
+ "database":"sparktest",
+ "table":"dblp_phdthesis"
+ }
+ },
+ {
+ "uuid":"666",
+ "name":"Route",
+ "bundle":"cn.piflow.bundle.common.Route",
+ "properties":{
+
+ },
+ "customizedProperties":{
+ "port1":"author = \"Carmen Heine\"",
+ "port2":"author = \"Gerd Hoff\""
+ }
+ },
+ {
+ "uuid":"777",
+ "name":"JsonSave",
+ "bundle":"cn.piflow.bundle.json.JsonSave",
+ "properties":{
+ "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
+ }
+ },
+ {
+ "uuid":"888",
+ "name":"CsvSave",
+ "bundle":"cn.piflow.bundle.csv.CsvSave",
+ "properties":{
+ "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
+ "header":"true",
+ "delimiter":","
+ }
+ }
+ ],
+ "paths":[
+ {
+ "from":"XmlParser",
+ "outport":"",
+ "inport":"",
+ "to":"SelectField"
+ },
+ {
+ "from":"SelectField",
+ "outport":"",
+ "inport":"",
+ "to":"Route"
+ },
+ {
+ "from":"Route",
+ "outport":"",
+ "inport":"",
+ "to":"PutHiveStreaming"
+ },
+ {
+ "from":"Route",
+ "outport":"port1",
+ "inport":"",
+ "to":"JsonSave"
+ },
+ {
+ "from":"Route",
+ "outport":"port2",
+ "inport":"",
+ "to":"CsvSave"
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Route.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Route.scala
new file mode 100644
index 0000000..77cb7e8
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Route.scala
@@ -0,0 +1,55 @@
+package cn.piflow.bundle.common
+
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+
+class Route extends ConfigurableStop{
+
+ val authorEmail: String = "xjzhu@cnic.cn"
+ val description: String = "Route data by customized properties: each key is an outport name and each value is a filter condition"
+ val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+ val outportList: List[String] = List(PortEnum.AnyPort.toString)
+
+ //var outports : List[String] = _
+
+ override def setProperties(map: Map[String, Any]): Unit = {
+ //val outportStr = MapUtil.get(map,"outports").asInstanceOf[String]
+ //outports = outportStr.split(",").toList
+ }
+
+ override def initialize(ctx: ProcessContext): Unit = {
+
+ }
+
+ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+ val df = in.read().cache()
+
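+ //each customized property maps an outport name to a filter condition;
+ //rows matching a condition are routed to that port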
+ if(this.customizedProperties != null && this.customizedProperties.nonEmpty){
+ val it = this.customizedProperties.keySet.iterator
+ while (it.hasNext){
+ val port = it.next()
+ val filterCondition = MapUtil.get(this.customizedProperties,port).asInstanceOf[String]
+ val filterDf = df.filter(filterCondition)
+ out.write(port,filterDf)
+ }
+ }
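+ //the full, unfiltered DataFrame is always written to the default port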
+ out.write(df);
+ }
+
+ override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+ var descriptor : List[PropertyDescriptor] = List()
+ //val outports = new PropertyDescriptor().name("outports").displayName("outports").description("outports string, separated by ','.").defaultValue("").required(true)
+ //descriptor = outports :: descriptor
+ descriptor
+ }
+
+ override def getIcon(): Array[Byte] = {
+ ImageUtil.getImage("icon/common/Fork.png")
+ }
+
+ override def getGroup(): List[String] = {
+ List(StopGroup.CommonGroup.toString)
+ }
+}
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala
index 39b030d..e78fd9a 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala
@@ -13,6 +13,7 @@ abstract class ConfigurableStop extends Stop{
val inportList : List[String] //= List(PortEnum.DefaultPort.toString)
val outportList : List[String] //= List(PortEnum.DefaultPort.toString)
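+ //optional key/value pairs from the flow json (e.g. Route: outport -> filter condition), injected via setCustomizedProperties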
+ var customizedProperties : Map[String, String] = null
def setProperties(map: Map[String, Any])
@@ -22,4 +23,8 @@ abstract class ConfigurableStop extends Stop{
def getGroup() : List[String]
+ def setCustomizedProperties( customizedProperties : Map[String, String]) = {
+ this.customizedProperties = customizedProperties
+ }
+
}
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala
index 3ae4333..1a82826 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala
@@ -13,6 +13,7 @@ class StopBean {
var name : String = _
var bundle : String = _
var properties : Map[String, String] = _
+ var customizedProperties : Map[String, String] = _
def init(flowName : String, map:Map[String,Any]) = {
this.flowName = flowName
@@ -20,6 +21,12 @@ class StopBean {
this.name = MapUtil.get(map,"name").asInstanceOf[String]
this.bundle = MapUtil.get(map,"bundle").asInstanceOf[String]
this.properties = MapUtil.get(map, "properties").asInstanceOf[Map[String, String]]
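+ //"customizedProperties" is optional in the flow json; default to an empty map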
+ if(map.contains("customizedProperties")){
+ this.customizedProperties = MapUtil.get(map, "customizedProperties").asInstanceOf[Map[String, String]]
+ }else{
+ this.customizedProperties = Map[String, String]()
+ }
+
}
def constructStop() : ConfigurableStop = {
@@ -51,8 +58,12 @@ class StopBean {
}
stop.setProperties(newProperties.toMap)
- }else
+ }else {
stop.setProperties(this.properties)
+ }
+
+ stop.setCustomizedProperties(this.customizedProperties)
+
stop
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala
index 580fe92..e3f293c 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala
@@ -15,7 +15,7 @@ class FlowTest {
def testFlow(): Unit ={
//parse flow json
- val file = "src/main/resources/flow.json"
+ val file = "src/main/resources/flow_route.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
@@ -28,13 +28,13 @@ class FlowTest {
//execute flow
val spark = SparkSession.builder()
- .master("spark://10.0.86.89:7077")
+ .master("local")
.appName("piflow-hive-bundle-xjzhu")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris","thrift://10.0.86.191:9083")
- .config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
+ //.config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
.enableHiveSupport()
.getOrCreate()
// val spark = SparkSession.builder()
@@ -66,7 +66,7 @@ class FlowTest {
spark.close();
}
- @Test
+ /*@Test
def testFlow2json() = {
//parse flow json
@@ -78,6 +78,6 @@ class FlowTest {
val flowBean = FlowBean(map)
val flowJson = flowBean.toJson()
println(flowJson)
- }
+ }*/
}
diff --git a/piflow-core/src/main/scala/cn/piflow/main.scala b/piflow-core/src/main/scala/cn/piflow/main.scala
index cc71390..29d537b 100644
--- a/piflow-core/src/main/scala/cn/piflow/main.scala
+++ b/piflow-core/src/main/scala/cn/piflow/main.scala
@@ -22,6 +22,10 @@ trait JobInputStream {
def ports(): Seq[String];
def read(inport: String): DataFrame;
+
+ def readProperties() : MMap[String, String];
+
+ def readProperties(inport : String) : MMap[String, String];
}
trait JobOutputStream {
@@ -33,6 +37,10 @@ trait JobOutputStream {
def write(bundle: String, data: DataFrame);
+ def writeProperties(properties : MMap[String, String]);
+
+ def writeProperties(bundle: String, properties : MMap[String, String]);
+
def sendError();
def getDataCount() : MMap[String, Long];
@@ -391,14 +399,19 @@ trait ProjectContext extends Context {
class JobInputStreamImpl() extends JobInputStream {
//only returns DataFrame on calling read()
val inputs = MMap[String, () => DataFrame]();
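+ //lazily evaluated property maps attached to each inport, mirroring inputs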
+ val inputsProperties = MMap[String, () => MMap[String, String]]()
override def isEmpty(): Boolean = inputs.isEmpty;
def attach(inputs: Map[Edge, JobOutputStreamImpl]) = {
this.inputs ++= inputs.filter(x => x._2.contains(x._1.outport))
.map(x => (x._1.inport, x._2.getDataFrame(x._1.outport)));
+
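+ //wire up the upstream properties the same way as the DataFrames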
+ this.inputsProperties ++= inputs.filter(x => x._2.contains(x._1.outport))
+ .map(x => (x._1.inport, x._2.getDataFrameProperties(x._1.outport)));
};
+
override def ports(): Seq[String] = {
inputs.keySet.toSeq;
}
@@ -413,6 +426,16 @@ class JobInputStreamImpl() extends JobInputStream {
override def read(inport: String): DataFrame = {
inputs(inport)();
}
+
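+ //the no-arg variant reads the properties of the default inport ("")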
+ override def readProperties(): MMap[String, String] = {
+
+ readProperties("")
+ }
+
+ override def readProperties(inport: String): MMap[String, String] = {
+
+ inputsProperties(inport)()
+ }
}
class JobOutputStreamImpl() extends JobOutputStream with Logging {
@@ -484,6 +507,8 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
val mapDataFrame = MMap[String, () => DataFrame]();
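+ //properties written alongside each outport's DataFrame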
+ val mapDataFrameProperties = MMap[String, () => MMap[String, String]]();
+
override def write(data: DataFrame): Unit = write("", data);
override def sendError(): Unit = ???
@@ -582,6 +607,26 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
})
incrementalValue
}
+
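+ //the no-port variant writes to the default outport ("")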
+ override def writeProperties(properties: MMap[String, String]): Unit = {
+
+ writeProperties("",properties)
+
+ }
+
+ override def writeProperties(outport: String, properties: MMap[String, String]): Unit = {
+
+ mapDataFrameProperties(outport) = () => properties
+
+ }
+
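+ //returns the properties thunk for a port, defaulting to an empty map if none was written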
+ def getDataFrameProperties(port : String) = {
+ if(!mapDataFrameProperties.contains(port)){
+ mapDataFrameProperties(port) = () => MMap[String, String]()
+
+ }
+ mapDataFrameProperties(port)
+ }
}
class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProcess: Option[Process] = None)