From f2510a428507805a700f3799531a84285b7c1a1b Mon Sep 17 00:00:00 2001 From: judy0131 Date: Mon, 8 Oct 2018 13:46:12 +0800 Subject: [PATCH] set appname --- piflow-server/src/main/scala/cn/piflow/api/API.scala | 10 ++++++++-- .../main/scala/cn/piflow/api/HTTPClientStartFlow.scala | 2 +- .../scala/cn/piflow/api/HTTPClientStartFlow1.scala | 2 +- .../src/main/scala/cn/piflow/api/StartFlowMain.scala | 1 - 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 35809bd..e55cb8e 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -5,7 +5,7 @@ import java.util.concurrent.CountDownLatch import cn.piflow.Runner import cn.piflow.conf.bean.{FlowBean, PropertyDescriptor} import org.apache.spark.sql.SparkSession -import cn.piflow.conf.util.{ClassUtil, OptionUtil} +import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil} import cn.piflow.Process import cn.piflow.api.util.PropertyUtil import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost} @@ -57,10 +57,16 @@ object API { }*/ def startFlow(flowJson : String):(String,SparkAppHandle) = { + var appId:String = null + val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]] + val flowMap = MapUtil.get(map, "flow").asInstanceOf[Map[String, Any]] + val appName = MapUtil.get(flowMap,"name").asInstanceOf[String] + val countDownLatch = new CountDownLatch(1) val launcher = new SparkLauncher - val handle =launcher//.setMaster(PropertyUtil.getPropertyValue("spark.master")) + val handle =launcher + .setAppName(appName) .setMaster(PropertyUtil.getPropertyValue("spark.master")) .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode")) .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle")) diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala index ca6ca23..9b74529 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala @@ -13,7 +13,7 @@ object HTTPClientStartFlow { val json =""" |{ | "flow":{ - | "name":"test", + | "name":"xml,csv-merge-fork-hive,json,csv", | "uuid":"1234", | "checkpoint":"Merge", | "stops":[ diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala index be92730..a714f1f 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala @@ -12,7 +12,7 @@ object HTTPClientStartFlow1 { """ |{ | "flow":{ - | "name":"test", + | "name":"xml2csv", | "uuid":"1234", | "checkpoint":"Merge", | "stops":[ diff --git a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala index eeeb4ce..435cd35 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala @@ -13,7 +13,6 @@ object StartFlowMain { def main(args: Array[String]): Unit = { val flowJson = args(0) println(flowJson) - val t = JSON.parseFull(flowJson) val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]] println(map)