forked from opensci/piflow
return applicationID when start the flow
This commit is contained in:
parent
60e4f3310e
commit
674de714c6
|
@ -14,7 +14,7 @@ import scala.util.parsing.json.JSON
|
|||
|
||||
object API {
|
||||
|
||||
def startFlow(flowJson : String):Process = {
|
||||
def startFlow(flowJson : String):(String,Process) = {
|
||||
|
||||
val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
|
||||
println(map)
|
||||
|
@ -40,10 +40,10 @@ object API {
|
|||
.bind(classOf[SparkSession].getName, spark)
|
||||
.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.start(flow);
|
||||
|
||||
val applicationId = spark.sparkContext.applicationId
|
||||
process.awaitTermination();
|
||||
spark.close();
|
||||
process
|
||||
(applicationId,process)
|
||||
}
|
||||
|
||||
def stopFlow(process : Process): String = {
|
||||
|
|
|
@ -59,9 +59,9 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
entity match {
|
||||
case HttpEntity.Strict(_, data) =>{
|
||||
val flowJson = data.utf8String
|
||||
val process = API.startFlow(flowJson)
|
||||
val (appId,process) = API.startFlow(flowJson)
|
||||
processMap += (process.pid() -> process)
|
||||
Future.successful(HttpResponse(entity = process.pid()))
|
||||
Future.successful(HttpResponse(entity = appId))
|
||||
}
|
||||
|
||||
case _ => Future.failed(new Exception("Can not start flow!"))
|
||||
|
|
Loading…
Reference in New Issue