diff --git a/piflow-core/src/main/scala/cn/piflow/project.scala b/piflow-core/src/main/scala/cn/piflow/project.scala index 696d1a0..32252de 100644 --- a/piflow-core/src/main/scala/cn/piflow/project.scala +++ b/piflow-core/src/main/scala/cn/piflow/project.scala @@ -4,7 +4,7 @@ import java.sql.Date import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CountDownLatch, TimeUnit} -import cn.piflow.util.{FlowLauncher, PropertyUtil} +import cn.piflow.util.{FlowLauncher, FlowState, H2Util, PropertyUtil} import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} import org.apache.spark.launcher.SparkAppHandle.State @@ -65,6 +65,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run val startedProcesses = MMap[String, SparkAppHandle](); val startedFlowGroup = MMap[String, FlowGroupExecution]() + val startedProcessesAppID = MMap[String, String]() + val execution = this; val POLLING_INTERVAL = 1000; val latch = new CountDownLatch(1); @@ -120,6 +122,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run var flowJson = flow.getFlowJson() flowJson = flowJson.replaceAll("}","}\n") + var appId : String = "" val countDownLatch = new CountDownLatch(1) @@ -133,11 +136,11 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run println("Spark job's state changed to: " + sparkAppState) } - //TODO: get the process status - if (handle.getState.equals(State.FINISHED)){ + if(H2Util.getFlowState(appId).equals(FlowState.COMPLETED)){ completedProjectEntry(flow.getFlowName()) = true; numWaitingProjectEntry.decrementAndGet(); } + if (handle.getState().isFinal){ countDownLatch.countDown() println("Task is finished!") @@ -150,7 +153,13 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run } ) + + while (appId == null){ + appId = handle.getAppId + Thread.sleep(100) + } startedProcesses(name) = handle; + startedProcessesAppID(name) = appId } private def startFlowGroup(name: String, flowGroup: FlowGroup): Unit = { @@ -211,9 +220,21 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run private def finalizeExecution(completed: Boolean): Unit = { if (running) { if (!completed) { - pollingThread.interrupt(); - startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + //pollingThread.interrupt(); + //startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => { + + x._2.stop() + val appID: String = startedProcessesAppID.getOrElse(x._1,"") + if(!appID.equals("")){ + println("Stop Flow " + appID + " by FlowLauncher!") + FlowLauncher.stop(appID) + } + + }); startedFlowGroup.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + pollingThread.interrupt(); + } runner.removeListener(listener); diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index a51777f..baceb9a 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -36,10 +36,13 @@ object API { val projectBean = ProjectBean(map) val project = projectBean.constructProject() - val process = Runner.create() + val projectExecution = Runner.create() .bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path")) .bind("debug.path",PropertyUtil.getPropertyValue("debug.path")) .start(project); + + (projectBean.name,projectExecution) + } def startFlowGroup(flowGroupJson : String) = { diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopProject.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopProject.scala new file mode 100644 index 0000000..2f70cf4 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopProject.scala @@ -0,0 +1,24 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost} +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HTTPClientStopProject { + def main(args: Array[String]): Unit = { + val json = """{"projectName":"TestProject"}""" + val url = "http://10.0.86.98:8001/project/stop" + val client = HttpClients.createDefault() + val post:HttpPost = new HttpPost(url) + + post.addHeader("Content-Type", "application/json") + post.setEntity(new StringEntity(json)) + + val response:CloseableHttpResponse = client.execute(post) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println(str) + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index bc57f98..95a5412 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -7,7 +7,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.server.Directives import akka.stream.ActorMaterializer -import cn.piflow.FlowGroupExecution +import cn.piflow.{FlowGroupExecution, ProjectExecution} import cn.piflow.api.util.PropertyUtil import cn.piflow.conf.util.{MapUtil, OptionUtil} import com.typesafe.config.ConfigFactory @@ -26,6 +26,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup implicit val executionContext = system.dispatcher var processMap = Map[String, SparkAppHandle]() var flowGroupMap = Map[String, FlowGroupExecution]() + var projectMap = Map[String, ProjectExecution]() def toJson(entity: RequestEntity): Map[String, Any] = { entity match { @@ -269,7 +270,8 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpEntity.Strict(_, data) =>{ var projectJson = data.utf8String projectJson = projectJson.replaceAll("}","}\n") - API.startProject(projectJson) + val (projectName, projectExecution) = API.startProject(projectJson) + projectMap += (projectName -> projectExecution) Future.successful(HttpResponse(entity = "start project ok!!!")) } @@ -282,27 +284,27 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup } - /*case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{ + case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{ val data = toJson(entity) val projectName = data.get("projectName").getOrElse("").asInstanceOf[String] - if(projectName.equals("") || !flowGroupMap.contains(projectName)){ - Future.failed(new Exception("Can not found flowGroup Error!")) + if(projectName.equals("") || !projectMap.contains(projectName)){ + Future.failed(new Exception("Can not found project Error!")) }else{ - flowGroupMap.get(groupName) match { - case Some(flowGroupExecution) => - val result = flowGroupExecution.stop() - flowGroupMap.-(groupName) - Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!")) + projectMap.get(projectName) match { + case Some(projectExecution) => + val result = projectExecution.stop() + projectMap.-(projectName) + Future.successful(HttpResponse(entity = "Stop project Ok!!!")) case ex =>{ println(ex) - Future.successful(HttpResponse(entity = "Can not found FlowGroup Error!")) + Future.successful(HttpResponse(entity = "Can not found project Error!")) } } } - }*/ + } case _: HttpRequest => Future.successful(HttpResponse(404, entity = "Unknown resource!"))