diff --git a/piflow-core/src/main/scala/cn/piflow/group.scala b/piflow-core/src/main/scala/cn/piflow/group.scala index 6efed31..8aae081 100644 --- a/piflow-core/src/main/scala/cn/piflow/group.scala +++ b/piflow-core/src/main/scala/cn/piflow/group.scala @@ -1,12 +1,13 @@ package cn.piflow import java.sql.Date +import java.util.Date import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.collection.mutable.{ArrayBuffer, Map => MMap} import cn.piflow.Execution -import cn.piflow.util.{FlowLauncher, PropertyUtil} +import cn.piflow.util.{FlowLauncher, FlowState, H2Util, PropertyUtil} import org.apache.spark.launcher.SparkAppHandle.State import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} import org.apache.spark.sql.SparkSession @@ -72,6 +73,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size); val startedProcesses = MMap[String, SparkAppHandle](); + val startedProcessesAppID = MMap[String, String]() val execution = this; val POLLING_INTERVAL = 1000; @@ -111,11 +113,11 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn println("Spark job's state changed to: " + sparkAppState) } - //TODO: get the process status - if (handle.getState.equals(State.FINISHED)){ + if(H2Util.getFlowState(appId).equals(FlowState.COMPLETED)){ completedProcesses(flow.getFlowName()) = true; numWaitingProcesses.decrementAndGet(); } + if (handle.getState().isFinal){ countDownLatch.countDown() println("Task is finished!") @@ -128,7 +130,13 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn } ) + + while (appId == null){ + appId = handle.getAppId + Thread.sleep(100) + } startedProcesses(name) = handle; + startedProcessesAppID(name) = appId } val pollingThread = new Thread(new Runnable() { @@ -136,27 +144,36 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn runnerListener.onFlowGroupStarted(flowGroupContext) - while (numWaitingProcesses.get() > 0) { - val todos = ArrayBuffer[(String, Flow)](); - mapFlowWithConditions.foreach { en => - if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) { - todos += (en._1 -> en._2._1); + try{ + + while (numWaitingProcesses.get() > 0) { + val todos = ArrayBuffer[(String, Flow)](); + mapFlowWithConditions.foreach { en => + if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) { + todos += (en._1 -> en._2._1); + } } + + startedProcesses.synchronized { + todos.foreach(en => startProcess(en._1, en._2)); + } + + Thread.sleep(POLLING_INTERVAL); } - startedProcesses.synchronized { - todos.foreach(en => startProcess(en._1, en._2)); - } - Thread.sleep(POLLING_INTERVAL); + runnerListener.onFlowGroupCompleted(flowGroupContext) + + }catch { + case e: Throwable => + runnerListener.onFlowGroupFailed(flowGroupContext); + throw e; + } + finally { + latch.countDown(); + finalizeExecution(true); } - latch.countDown(); - finalizeExecution(true); - - runnerListener.onFlowGroupCompleted(flowGroupContext) - //TODO: how to define FlowGroup Failed - //runnerListener.onFlowGroupFailed(ctx) } }); @@ -179,8 +196,20 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn private def finalizeExecution(completed: Boolean): Unit = { if (running) { if (!completed) { + + //startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => { + + x._2.stop() + val appID: String = startedProcessesAppID.getOrElse(x._1,"") + if(!appID.equals("")){ + println("Stop Flow " + appID + " by FlowLauncher!") + FlowLauncher.stop(appID) + } + + }); pollingThread.interrupt(); - startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + } running = false; diff --git a/piflow-core/src/main/scala/cn/piflow/project.scala b/piflow-core/src/main/scala/cn/piflow/project.scala index 569d2db..696d1a0 100644 --- a/piflow-core/src/main/scala/cn/piflow/project.scala +++ b/piflow-core/src/main/scala/cn/piflow/project.scala @@ -212,8 +212,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run if (running) { if (!completed) { pollingThread.interrupt(); - startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); - startedFlowGroup.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + startedFlowGroup.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); } runner.removeListener(listener); diff --git a/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala b/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala index e1776f3..7366385 100644 --- a/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala +++ b/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala @@ -1,8 +1,14 @@ package cn.piflow.util + +import java.util.Date import java.util.concurrent.CountDownLatch import cn.piflow.Flow +import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut} +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils import org.apache.spark.launcher.SparkAppHandle.State import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} @@ -49,4 +55,27 @@ object FlowLauncher { sparkLauncher } + def stop(appID: String) = { + + println("Stop Flow !!!!!!!!!!!!!!!!!!!!!!!!!!") + //yarn application kill appId + val url = PropertyUtil.getPropertyValue("yarn.url") + appID + "/state" + val client = HttpClients.createDefault() + val put:HttpPut = new HttpPut(url) + val body ="{\"state\":\"KILLED\"}" + put.addHeader("Content-Type", "application/json") + put.setEntity(new StringEntity(body)) + val response:CloseableHttpResponse = client.execute(put) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + + //update db + println("Update flow state after Stop Flow !!!!!!!!!!!!!!!!!!!!!!!!!!") + H2Util.updateFlowState(appID, FlowState.KILLED) + H2Util.updateFlowFinishedTime(appID, new Date().toString) + + + "ok" + } + } diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index b130a98..a51777f 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -54,86 +54,14 @@ object API { val flowGroupBean = FlowGroupBean(map) val flowGroup = flowGroupBean.constructFlowGroup() - val process = Runner.create() + val flowGroupExecution = Runner.create() .bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path")) .bind("debug.path",PropertyUtil.getPropertyValue("debug.path")) .start(flowGroup); + + (flowGroupBean.name, flowGroupExecution) } - /*def startFlowGroup(flowGroupJson : String):(String,String,SparkAppHandle) = { - - var appId:String = null - val map = OptionUtil.getAny(JSON.parseFull(flowGroupJson)).asInstanceOf[Map[String, Any]] - val flowGroupMap = MapUtil.get(map, "group").asInstanceOf[Map[String, Any]] - /*val uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String] - val appName = MapUtil.get(flowMap,"name").asInstanceOf[String]*/ - - val dirverMem = flowGroupMap.getOrElse("driverMemory","1g").asInstanceOf[String] - val executorNum = flowGroupMap.getOrElse("executorNumber","1").asInstanceOf[String] - val executorMem= flowGroupMap.getOrElse("executorMemory","1g").asInstanceOf[String] - val executorCores = flowGroupMap.getOrElse("executorCores","1").asInstanceOf[String] - - //val (stdout, stderr) = getLogFile(uuid, appName) - - println("StartFlowGroup API get json: \n" + flowGroupJson ) - - val countDownLatch = new CountDownLatch(1) - val launcher = new SparkLauncher - val handle =launcher - .setAppName("TestFlowGroup") - .setMaster(PropertyUtil.getPropertyValue("spark.master")) - .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode")) - .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle")) - .setVerbose(true) - .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) - .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")) - .setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode")) - .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir")) - .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars")) - .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle")) - .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) - .setConf("spark.driver.memory", dirverMem) - .setConf("spark.num.executors",executorNum) - .setConf("spark.executor.memory", executorMem) - .setConf("spark.executor.cores",executorCores) - //.setConf("spark.cores.max", "4") - //.setConf("spark.checkpoint", PropertyUtil.getPropertyValue("checkpoint.path")) - .addFile(PropertyUtil.getConfigureFile()) - .setMainClass("cn.piflow.api.StartFlowGroupMain") - .addAppArgs(flowGroupJson.stripMargin) - //.redirectOutput(stdout) - //.redirectError(stderr) - .startApplication( new SparkAppHandle.Listener { - override def stateChanged(handle: SparkAppHandle): Unit = { - appId = handle.getAppId - val sparkAppState = handle.getState - if(appId != null){ - println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState) - }else{ - println("Spark job's state changed to: " + sparkAppState) - } - if (handle.getState().isFinal){ - countDownLatch.countDown() - println("Task is finished!") - } - } - override def infoChanged(handle: SparkAppHandle): Unit = { - //println("Info:" + handle.getState().toString) - } - } - ) - while (appId == null){ - Thread.sleep(1000) - } - var processId = "" - while(processId.equals("")){ - Thread.sleep(1000) - processId = H2Util.getFlowProcessId(appId) - } - (appId, processId, handle) - - }*/ - def startFlow(flowJson : String):(String,String,SparkAppHandle) = { var appId:String = null diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlowGroup.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlowGroup.scala new file mode 100644 index 0000000..6fcf195 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlowGroup.scala @@ -0,0 +1,24 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost} +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HTTPClientStopFlowGroup { + def main(args: Array[String]): Unit = { + val json = """{"groupName":"FlowGroup"}""" + val url = "http://10.0.86.98:8001/flowGroup/stop" + val client = HttpClients.createDefault() + val post:HttpPost = new HttpPost(url) + + post.addHeader("Content-Type", "application/json") + post.setEntity(new StringEntity(json)) + + val response:CloseableHttpResponse = client.execute(post) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println(str) + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index b107937..bc57f98 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -7,6 +7,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.server.Directives import akka.stream.ActorMaterializer +import cn.piflow.FlowGroupExecution import cn.piflow.api.util.PropertyUtil import cn.piflow.conf.util.{MapUtil, OptionUtil} import com.typesafe.config.ConfigFactory @@ -24,6 +25,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher var processMap = Map[String, SparkAppHandle]() + var flowGroupMap = Map[String, FlowGroupExecution]() def toJson(entity: RequestEntity): Map[String, Any] = { entity match { @@ -225,7 +227,8 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpEntity.Strict(_, data) =>{ var flowGroupJson = data.utf8String flowGroupJson = flowGroupJson.replaceAll("}","}\n") - API.startFlowGroup(flowGroupJson) + val (flowName, flowGroupExecution) = API.startFlowGroup(flowGroupJson) + flowGroupMap += (flowName -> flowGroupExecution) Future.successful(HttpResponse(entity = "start flow group ok!!!")) } @@ -238,6 +241,28 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup } + case HttpRequest(POST, Uri.Path("/flowGroup/stop"), headers, entity, protocol) =>{ + val data = toJson(entity) + val groupName = data.get("groupName").getOrElse("").asInstanceOf[String] + if(groupName.equals("") || !flowGroupMap.contains(groupName)){ + Future.failed(new Exception("Can not found flowGroup Error!")) + }else{ + + flowGroupMap.get(groupName) match { + case Some(flowGroupExecution) => + val result = flowGroupExecution.stop() + flowGroupMap.-(groupName) + Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!")) + case ex =>{ + println(ex) + Future.successful(HttpResponse(entity = "Can not found FlowGroup Error!")) + } + + } + + } + } + case HttpRequest(POST, Uri.Path("/project/start"), headers, entity, protocol) =>{ entity match { @@ -257,6 +282,28 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup } + /*case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{ + val data = toJson(entity) + val projectName = data.get("projectName").getOrElse("").asInstanceOf[String] + if(projectName.equals("") || !flowGroupMap.contains(projectName)){ + Future.failed(new Exception("Can not found flowGroup Error!")) + }else{ + + flowGroupMap.get(groupName) match { + case Some(flowGroupExecution) => + val result = flowGroupExecution.stop() + flowGroupMap.-(groupName) + Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!")) + case ex =>{ + println(ex) + Future.successful(HttpResponse(entity = "Can not found FlowGroup Error!")) + } + + } + + } + }*/ + case _: HttpRequest => Future.successful(HttpResponse(404, entity = "Unknown resource!")) }