diff --git a/conf/config.properties b/conf/config.properties index 095efa6..f1e81c2 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -17,4 +17,4 @@ hive.metastore.uris=thrift://10.0.86.191:9083 piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/ -checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/" \ No newline at end of file +checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/ \ No newline at end of file diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala index 93ff00c..0309ef1 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala @@ -8,7 +8,7 @@ import org.junit.Test import scala.util.parsing.json.JSON -class FlowTest { +class FlowTest_XX { @Test def testFlow(): Unit ={ diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala index c24c32c..be92730 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala @@ -8,7 +8,61 @@ import org.apache.http.util.EntityUtils object HTTPClientStartFlow1 { def main(args: Array[String]): Unit = { - val json = """{"flow":{"name":"Flow1","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"PutHiveStreaming"}]}}""" + val json = + """ + |{ + | "flow":{ + | "name":"test", + | "uuid":"1234", + | "checkpoint":"Merge", + | "stops":[ + | { + | "uuid":"1111", + | "name":"XmlParser", + | "bundle":"cn.piflow.bundle.xml.XmlParser", + | "properties":{ + | "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", + | "rowTag":"phdthesis" + | } + | + | }, + | { + | "uuid":"2222", + | "name":"SelectField", + | "bundle":"cn.piflow.bundle.common.SelectField", + | "properties":{ + | "schema":"title,author,pages" + | } + | + | }, + | { + | "uuid":"888", + | "name":"CsvSave", + | "bundle":"cn.piflow.bundle.csv.CsvSave", + | "properties":{ + | "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv", + | "header":"true", + | "delimiter":"," + | } + | } + | ], + | "paths":[ + | { + | "from":"XmlParser", + | "outport":"", + | "inport":"", + | "to":"SelectField" + | }, + | { + | "from":"SelectField", + | "outport":"", + | "inport":"", + | "to":"CsvSave" + | } + | ] + | } + |} + """.stripMargin val url = "http://10.0.86.98:8001/flow/start" val client = HttpClients.createDefault() val post:HttpPost = new HttpPost(url) diff --git a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala index d463ba2..eeeb4ce 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala @@ -29,7 +29,8 @@ object StartFlowMain { val process = Runner.create() .bind(classOf[SparkSession].getName, spark) - .bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path")) + //.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path")) + .bind("checkpoint.path","hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/") .start(flow); val applicationId = spark.sparkContext.applicationId process.awaitTermination();