From 7deb8946c7edad18ee52ad145d95106e4e6d6534 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Sun, 5 May 2019 15:34:35 +0800 Subject: [PATCH] support run project API --- config.properties | 2 +- .../scala/cn/piflow/conf/bean/FlowBean.scala | 4 +- .../cn/piflow/conf/bean/FlowGroupBean.scala | 29 +-- .../cn/piflow/conf/bean/ProjectBean.scala | 123 +++++++++++ .../piflow/conf/bean/ProjectEntryBean.scala | 19 ++ .../src/main/scala/cn/piflow/project.scala | 83 ++++++- .../src/main/scala/cn/piflow/api/API.scala | 20 +- .../piflow/api/HTTPClientStartProject.scala | 208 ++++++++++++++++++ .../scala/cn/piflow/api/HTTPService.scala | 27 ++- 9 files changed, 471 insertions(+), 44 deletions(-) create mode 100644 piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectEntryBean.scala create mode 100644 piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartProject.scala diff --git a/config.properties b/config.properties index 851dc4f..ca2e660 100644 --- a/config.properties +++ b/config.properties @@ -1,4 +1,4 @@ -server.ip=10.0.86.124 +server.ip=10.0.86.98 server.port=8001 #spark.master=spark://10.0.86.89:7077 diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala index 6ca7f5c..ed8cbca 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala @@ -1,7 +1,7 @@ package cn.piflow.conf.bean import cn.piflow.conf.util.{JsonUtil, MapUtil} -import cn.piflow.{FlowImpl, Path} +import cn.piflow.{FlowImpl, Path, ProjectEntry} import net.liftweb.json.JsonDSL._ import net.liftweb.json._ @@ -9,7 +9,7 @@ import scala.util.parsing.json.JSONObject -class FlowBean { +class FlowBean extends ProjectEntryBean{ /*@BeanProperty*/ var uuid : String = _ var name : String = _ diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowGroupBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowGroupBean.scala index 78ca7ae..ea3fa32 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowGroupBean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowGroupBean.scala @@ -1,17 +1,16 @@ package cn.piflow.conf.bean -import cn.piflow.{Condition, FlowGroupExecution, FlowGroupImpl} +import cn.piflow._ import cn.piflow.conf.util.MapUtil /** * Created by xjzhu@cnic.cn on 4/25/19 */ -class FlowGroupBean { +class FlowGroupBean extends ProjectEntryBean { var uuid : String = _ var name : String = _ var flows : List[FlowBean] = List() - //var conditions : [ConditionBean] = List() var conditions = scala.collection.mutable.Map[String, ConditionBean]() def init(map : Map[String, Any]) = { @@ -69,30 +68,6 @@ class FlowGroupBean { flowGroup } - - /*def toJson():String = { - val json = - ("flow" -> - ("uuid" -> this.uuid) ~ - ("name" -> this.name) ~ - ("stops" -> - stops.map { stop =>( - ("uuid" -> stop.uuid) ~ - ("name" -> stop.name)~ - ("bundle" -> stop.bundle) )}) ~ - ("paths" -> - paths.map { path => ( - ("from" -> path.from) ~ - ("outport" -> path.outport) ~ - ("inport" -> path.inport) ~ - ("to" -> path.to) - )})) - val jsonString = compactRender(json) - //println(jsonString) - jsonString - }*/ - - } object FlowGroupBean{ diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectBean.scala index 8c4e2c1..391fd1c 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectBean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectBean.scala @@ -1,8 +1,131 @@ package cn.piflow.conf.bean +import cn.piflow.{Condition, ProjectImpl} +import cn.piflow.conf.util.MapUtil + /** * Created by xjzhu@cnic.cn on 4/25/19 */ class ProjectBean { + var uuid : String = _ + var name : String = _ + var projectEntries : List[ProjectEntryBean] = List() + var conditions = scala.collection.mutable.Map[String, ConditionBean]() + + def init(map : Map[String, Any]) = { + + val projectMap = MapUtil.get(map, "project").asInstanceOf[Map[String, Any]] + + this.uuid = MapUtil.get(projectMap,"uuid").asInstanceOf[String] + this.name = MapUtil.get(projectMap,"name").asInstanceOf[String] + + //construct FlowGroupBean List + val groupList = MapUtil.get(projectMap,"groups").asInstanceOf[List[Map[String, Any]]] + groupList.foreach( flowGroupMap => { + val flowGroup = FlowGroupBean(flowGroupMap.asInstanceOf[Map[String, Any]]) + this.projectEntries = flowGroup +: this.projectEntries + }) + + //construct FlowBean List + val flowList = MapUtil.get(projectMap,"flows").asInstanceOf[List[Map[String, Any]]] + flowList.foreach( flowMap => { + val flow = FlowBean(flowMap.asInstanceOf[Map[String, Any]]) + this.projectEntries = flow +: this.projectEntries + }) + + //construct ConditionBean List + val conditionList = MapUtil.get(projectMap,"conditions").asInstanceOf[List[Map[String, Any]]] + conditionList.foreach( conditionMap => { + val conditionBean = ConditionBean(conditionMap.asInstanceOf[Map[String, Any]]) + conditions(conditionBean.entry) = conditionBean + }) + + } + + def constructProject() = { + val project = new ProjectImpl(); + project.setProjectName(name) + + this.projectEntries.foreach( projectEntryBean => { + if( !conditions.contains(projectEntryBean.name) ){ + if (projectEntryBean.isInstanceOf[FlowBean]){ + val bean = projectEntryBean.asInstanceOf[FlowBean] + project.addProjectEntry(projectEntryBean.name,bean.constructFlow()) + }else{ + val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean] + project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup()) + } + + } + else{ + val conditionBean = conditions(projectEntryBean.name) + + if(conditionBean.after.size == 0){ + + println(projectEntryBean.name + " do not have after flow " + "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") + + if (projectEntryBean.isInstanceOf[FlowBean]){ + val bean = projectEntryBean.asInstanceOf[FlowBean] + project.addProjectEntry(projectEntryBean.name,bean.constructFlow()) + }else{ + val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean] + project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup()) + } + + } + else if(conditionBean.after.size == 1){ + + println(projectEntryBean.name + " after " + conditionBean.after(0) + "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") + + if (projectEntryBean.isInstanceOf[FlowBean]){ + val bean = projectEntryBean.asInstanceOf[FlowBean] + project.addProjectEntry(projectEntryBean.name,bean.constructFlow(),Condition.after(conditionBean.after(0))) + }else{ + val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean] + project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup(), Condition.after(conditionBean.after(0))) + } + + } + else { + println(projectEntryBean.name + " after " + conditionBean.after.toSeq + "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") + + var other = new Array[String](conditionBean.after.size-1) + conditionBean.after.copyToArray(other,1) + + if (projectEntryBean.isInstanceOf[FlowBean]){ + val bean = projectEntryBean.asInstanceOf[FlowBean] + project.addProjectEntry(projectEntryBean.name,bean.constructFlow(),Condition.after(conditionBean.after(0),other: _*)) + }else{ + val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean] + project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup(), Condition.after(conditionBean.after(0),other: _*)) + } + + } + } + + }) + + project + } + + /*private def addProjectEntry(project:ProjectImpl, projectEntryBean:ProjectEntryBean, conditionBean: ConditionBean): Unit ={ + if (projectEntryBean.isInstanceOf[FlowBean]){ + val bean = projectEntryBean.asInstanceOf[FlowBean] + project.addProjectEntry(projectEntryBean.name,bean.constructFlow(),Condition.after(conditionBean.after(0),other: _*)) + }else{ + val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean] + project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup(), Condition.after(conditionBean.after(0),other: _*)) + } + }*/ + +} + +object ProjectBean { + def apply(map : Map[String, Any]): ProjectBean = { + + val projectBean = new ProjectBean() + projectBean.init(map) + projectBean + } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectEntryBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectEntryBean.scala new file mode 100644 index 0000000..3431dd8 --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/ProjectEntryBean.scala @@ -0,0 +1,19 @@ +package cn.piflow.conf.bean + +import cn.piflow.conf.util.MapUtil +import cn.piflow.{Condition, FlowGroupImpl, ProjectEntry} + +/** + * Created by xjzhu@cnic.cn on 4/25/19 + */ +trait ProjectEntryBean { + + var uuid : String + var name : String + + + def init(map : Map[String, Any]) + +} + + diff --git a/piflow-core/src/main/scala/cn/piflow/project.scala b/piflow-core/src/main/scala/cn/piflow/project.scala index 21ae763..5a05a37 100644 --- a/piflow-core/src/main/scala/cn/piflow/project.scala +++ b/piflow-core/src/main/scala/cn/piflow/project.scala @@ -4,6 +4,10 @@ import java.sql.Date import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CountDownLatch, TimeUnit} +import cn.piflow.util.PropertyUtil +import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} +import org.apache.spark.launcher.SparkAppHandle.State + import scala.collection.mutable.{ArrayBuffer, Map => MMap} @@ -15,10 +19,17 @@ trait Project { def addProjectEntry(name: String, flowOrGroup: ProjectEntry, con: Condition[ProjectExecution] = Condition.AlwaysTrue[ProjectExecution]); def mapFlowWithConditions(): Map[String, (ProjectEntry, Condition[ProjectExecution])]; + + def getProjectName(): String; + + def setProjectName(projectName : String): Unit; } class ProjectImpl extends Project { + var name = "" + var uuid = "" + val _mapFlowWithConditions = MMap[String, (ProjectEntry, Condition[ProjectExecution])](); def addProjectEntry(name: String, flowOrGroup: ProjectEntry, con: Condition[ProjectExecution] = Condition.AlwaysTrue[ProjectExecution]) = { @@ -26,6 +37,14 @@ class ProjectImpl extends Project { } def mapFlowWithConditions(): Map[String, (ProjectEntry, Condition[ProjectExecution])] = _mapFlowWithConditions.toMap; + + override def getProjectName(): String = { + this.name + } + + override def setProjectName(projectName: String): Unit = { + this.name = projectName + } } trait ProjectExecution extends Execution{ @@ -43,7 +62,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run completedProcesses ++= mapFlowWithConditions.map(x => (x._1, false)); val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size); - val startedProcesses = MMap[String, Process](); + val startedProcesses = MMap[String, SparkAppHandle](); val startedFlowGroup = MMap[String, FlowGroupExecution]() val execution = this; @@ -60,10 +79,6 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run } override def onProcessCompleted(ctx: ProcessContext): Unit = { - startedProcesses.filter(_._2 == ctx.getProcess()).foreach { x => - completedProcesses(x._1) = true; - numWaitingProcesses.decrementAndGet(); - } } override def onJobStarted(ctx: JobContext): Unit = {} @@ -100,8 +115,62 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run } private def startProcess(name: String, flow: Flow): Unit = { - val process = runner.start(flow); - startedProcesses(name) = process; + + println(flow.getFlowJson()) + + var flowJson = flow.getFlowJson() + flowJson = flowJson.replaceAll("}","}\n") + //TODO + var appId : String = "" + val countDownLatch = new CountDownLatch(1) + val launcher = new SparkLauncher + val handle =launcher + .setAppName(flow.getFlowName()) + .setMaster(PropertyUtil.getPropertyValue("spark.master")) + .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode")) + .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle")) + .setVerbose(true) + .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) + .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")) + .setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode")) + .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir")) + .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars")) + .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle")) + .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) + .setConf("spark.driver.memory", flow.getDriverMemory()) + .setConf("spark.num.executors", flow.getExecutorNum()) + .setConf("spark.executor.memory", flow.getExecutorMem()) + .setConf("spark.executor.cores",flow.getExecutorCores()) + .addFile(PropertyUtil.getConfigureFile()) + .setMainClass("cn.piflow.api.StartFlowMain") + .addAppArgs(flowJson) + .startApplication( new SparkAppHandle.Listener { + override def stateChanged(handle: SparkAppHandle): Unit = { + appId = handle.getAppId + val sparkAppState = handle.getState + if(appId != null){ + println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState) + }else{ + println("Spark job's state changed to: " + sparkAppState) + } + + //TODO: get the process status + if (handle.getState.equals(State.FINISHED)){ + completedProcesses(flow.getFlowName()) = true; + numWaitingProcesses.decrementAndGet(); + } + if (handle.getState().isFinal){ + countDownLatch.countDown() + println("Task is finished!") + } + } + override def infoChanged(handle: SparkAppHandle): Unit = { + + } + } + ) + + startedProcesses(name) = handle; } private def startFlowGroup(name: String, flowGroup: FlowGroup): Unit = { diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 9fb5ac2..b130a98 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.SparkSession import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil} import cn.piflow.{Process, Runner} import cn.piflow.api.util.{HdfsUtil, PropertyUtil} -import cn.piflow.conf.bean.FlowGroupBean +import cn.piflow.conf.bean.{FlowGroupBean, ProjectBean} import cn.piflow.util.{FlowState, H2Util, HadoopFileUtil} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -24,6 +24,24 @@ import scala.util.parsing.json.JSON object API { + def startProject(projectJson : String) = { + + println("StartProject API get json: \n" + projectJson ) + + var appId:String = null + val map = OptionUtil.getAny(JSON.parseFull(projectJson)).asInstanceOf[Map[String, Any]] + val projectMap = MapUtil.get(map, "project").asInstanceOf[Map[String, Any]] + + //create flowGroup + val projectBean = ProjectBean(map) + val project = projectBean.constructProject() + + val process = Runner.create() + .bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path")) + .bind("debug.path",PropertyUtil.getPropertyValue("debug.path")) + .start(project); + } + def startFlowGroup(flowGroupJson : String) = { println("StartFlowGroup API get json: \n" + flowGroupJson ) diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartProject.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartProject.scala new file mode 100644 index 0000000..fa0e021 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartProject.scala @@ -0,0 +1,208 @@ +package cn.piflow.api + +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost} +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.HttpClientBuilder +import org.apache.http.util.EntityUtils + +object HTTPClientStartProject { + + def main(args: Array[String]): Unit = { + + val json = + """ + |{ + | "project": { + | "name": "TestFlowGroup", + | "uuid": "1111111111111", + | "groups": [{ + | "group": { + | "name": "TestFlowGroup", + | "uuid": "1111111111111", + | "flows": [{ + | "flow": { + | "name": "one", + | "uuid": "1234", + | "executorNumber": "2", + | "executorMemory": "1g", + | "executorCores": "1", + | "stops": [{ + | "uuid": "1111", + | "name": "XmlParser", + | "bundle": "cn.piflow.bundle.xml.XmlParser", + | "properties": { + | "xmlpath": "hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", + | "rowTag": "phdthesis" + | } + | }, + | { + | "uuid": "2222", + | "name": "SelectField", + | "bundle": "cn.piflow.bundle.common.SelectField", + | "properties": { + | "schema": "title,author,pages" + | } + | + | }, + | { + | "uuid": "3333", + | "name": "PutHiveStreaming", + | "bundle": "cn.piflow.bundle.hive.PutHiveStreaming", + | "properties": { + | "database": "sparktest", + | "table": "dblp_phdthesis" + | } + | } + | ], + | "paths": [{ + | "from": "XmlParser", + | "outport": "", + | "inport": "", + | "to": "SelectField" + | }, + | { + | "from": "SelectField", + | "outport": "", + | "inport": "", + | "to": "PutHiveStreaming" + | } + | ] + | } + | }, + | { + | "flow": { + | "name": "two", + | "uuid": "5678", + | "stops": [{ + | "uuid": "1111", + | "name": "XmlParser", + | "bundle": "cn.piflow.bundle.xml.XmlParser", + | "properties": { + | "xmlpath": "hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", + | "rowTag": "phdthesis" + | } + | }, + | { + | "uuid": "2222", + | "name": "SelectField", + | "bundle": "cn.piflow.bundle.common.SelectField", + | "properties": { + | "schema": "title,author,pages" + | } + | + | }, + | { + | "uuid": "3333", + | "name": "PutHiveStreaming", + | "bundle": "cn.piflow.bundle.hive.PutHiveStreaming", + | "properties": { + | "database": "sparktest", + | "table": "dblp_phdthesis" + | } + | } + | ], + | "paths": [{ + | "from": "XmlParser", + | "outport": "", + | "inport": "", + | "to": "SelectField" + | }, + | { + | "from": "SelectField", + | "outport": "", + | "inport": "", + | "to": "PutHiveStreaming" + | } + | ] + | } + | + | } + | ], + | + | "conditions": [{ + | "entry": "two", + | "after": "one" + | }] + | } + | }], + | "flows": [{ + | "flow": { + | "name": "three", + | "uuid": "91011", + | "executorNumber": "2", + | "executorMemory": "1g", + | "executorCores": "1", + | "stops": [{ + | "uuid": "1111", + | "name": "XmlParser", + | "bundle": "cn.piflow.bundle.xml.XmlParser", + | "properties": { + | "xmlpath": "hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", + | "rowTag": "phdthesis" + | } + | }, + | { + | "uuid": "2222", + | "name": "SelectField", + | "bundle": "cn.piflow.bundle.common.SelectField", + | "properties": { + | "schema": "title,author,pages" + | } + | + | }, + | { + | "uuid": "3333", + | "name": "PutHiveStreaming", + | "bundle": "cn.piflow.bundle.hive.PutHiveStreaming", + | "properties": { + | "database": "sparktest", + | "table": "dblp_phdthesis" + | } + | } + | ], + | "paths": [{ + | "from": "XmlParser", + | "outport": "", + | "inport": "", + | "to": "SelectField" + | }, + | { + | "from": "SelectField", + | "outport": "", + | "inport": "", + | "to": "PutHiveStreaming" + | } + | ] + | } + | }], + | "conditions": [{ + | "entry": "three", + | "after": "TestFlowGroup" + | }] + | } + |} + """.stripMargin + + val url = "http://10.0.86.98:8001/project/start" + val timeout = 1800 + val requestConfig = RequestConfig.custom() + .setConnectTimeout(timeout*1000) + .setConnectionRequestTimeout(timeout*1000) + .setSocketTimeout(timeout*1000).build() + + val client = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build() + + val post:HttpPost = new HttpPost(url) + post.addHeader("Content-Type", "application/json") + post.setEntity(new StringEntity(json)) + + + val response:CloseableHttpResponse = client.execute(post) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println("Code is " + str) + + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index d58516a..b107937 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -225,17 +225,32 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpEntity.Strict(_, data) =>{ var flowGroupJson = data.utf8String flowGroupJson = flowGroupJson.replaceAll("}","}\n") - //flowJson = JsonFormatTool.formatJson(flowJson) - //val (appId,pid,process) = API.startFlowGroup(flowGroupJson) API.startFlowGroup(flowGroupJson) - //processMap += (appId -> process) - //val result = "{\"flow\":{\"id\":\"" + appId + "\",\"pid\":\"" + pid + "\"}}" - Future.successful(HttpResponse(entity = "okokok!!!")) + Future.successful(HttpResponse(entity = "start flow group ok!!!")) } case ex => { println(ex) - Future.successful(HttpResponse(entity = "Can not start flow!")) + Future.successful(HttpResponse(entity = "Can not start flow group!")) + //Future.failed(/*new Exception("Can not start flow!")*/HttpResponse(entity = "Can not start flow!")) + } + } + + } + + case HttpRequest(POST, Uri.Path("/project/start"), headers, entity, protocol) =>{ + + entity match { + case HttpEntity.Strict(_, data) =>{ + var projectJson = data.utf8String + projectJson = projectJson.replaceAll("}","}\n") + API.startProject(projectJson) + Future.successful(HttpResponse(entity = "start project ok!!!")) + } + + case ex => { + println(ex) + Future.successful(HttpResponse(entity = "Can not start project!")) //Future.failed(/*new Exception("Can not start flow!")*/HttpResponse(entity = "Can not start flow!")) } }