From c64f492db67e82252caa43fcaa18348cad4e80ec Mon Sep 17 00:00:00 2001 From: judy0131 Date: Thu, 16 May 2019 10:03:42 +0800 Subject: [PATCH] update state when stop flowGroup or project --- .../src/main/scala/cn/piflow/group.scala | 24 ++++++------ .../src/main/scala/cn/piflow/project.scala | 32 +++++++++------ .../src/main/scala/cn/piflow/runner.scala | 39 +++++++++++++++++++ .../src/main/scala/cn/piflow/api/API.scala | 4 +- .../scala/cn/piflow/api/HTTPService.scala | 30 +++++++------- 5 files changed, 91 insertions(+), 38 deletions(-) diff --git a/piflow-core/src/main/scala/cn/piflow/group.scala b/piflow-core/src/main/scala/cn/piflow/group.scala index dcb3e89..8b6e569 100644 --- a/piflow-core/src/main/scala/cn/piflow/group.scala +++ b/piflow-core/src/main/scala/cn/piflow/group.scala @@ -1,6 +1,5 @@ package cn.piflow -import java.sql.Date import java.util.Date import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -69,7 +68,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn val flowGroupContext = createContext(runnerContext); val flowGroupExecution = this; - val id : String = "group_" + IdGenerator.uuid() + "_" + IdGenerator.nextId[FlowGroupExecution]; + val id : String = "group_" + IdGenerator.uuid() ; val mapFlowWithConditions: Map[String, (Flow, Condition[FlowGroupExecution])] = fg.mapFlowWithConditions(); val completedProcesses = MMap[String, Boolean](); @@ -199,6 +198,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn override def stop(): Unit = { finalizeExecution(false); + //runnerListener.onFlowGroupStoped(flowGroupContext) } override def awaitTermination(timeout: Long, unit: TimeUnit): Unit = { @@ -211,17 +211,19 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn if (!completed) { //startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); - startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => { + startedProcesses.synchronized{ + startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => { - x._2.stop() - val appID: String = startedProcessesAppID.getOrElse(x._1,"") - if(!appID.equals("")){ - println("Stop Flow " + appID + " by FlowLauncher!") - FlowLauncher.stop(appID) - } + x._2.stop() + val appID: String = startedProcessesAppID.getOrElse(x._1,"") + if(!appID.equals("")){ + println("Stop Flow " + appID + " by FlowLauncher!") + FlowLauncher.stop(appID) + } - }); - pollingThread.interrupt(); + }); + pollingThread.interrupt(); + } } diff --git a/piflow-core/src/main/scala/cn/piflow/project.scala b/piflow-core/src/main/scala/cn/piflow/project.scala index 76d8456..e8b02e5 100644 --- a/piflow-core/src/main/scala/cn/piflow/project.scala +++ b/piflow-core/src/main/scala/cn/piflow/project.scala @@ -1,6 +1,6 @@ package cn.piflow -import java.sql.Date +import java.util.Date import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -63,7 +63,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run val projectContext = createContext(runnerContext); val projectExecution = this; - val id : String = "project_" + IdGenerator.uuid() + "_" + IdGenerator.nextId[ProjectExecution]; + val id : String = "project_" + IdGenerator.uuid(); val mapProjectEntryWithConditions: Map[String, (ProjectEntry, Condition[ProjectExecution])] = project.mapFlowWithConditions(); val completedProjectEntry = MMap[String, Boolean](); @@ -121,6 +121,10 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run override def onProjectCompleted(ctx: ProjectContext): Unit = {} override def onProjectFailed(ctx: ProjectContext): Unit = {} + + override def onFlowGroupStoped(ctx: FlowGroupContext): Unit = {} + + override def onProjectStoped(ctx: ProjectContext): Unit = {} }; runner.addListener(listener); @@ -259,6 +263,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run override def stop(): Unit = { finalizeExecution(false); + //runnerListener.onProjectStoped(projectContext) } override def awaitTermination(timeout: Long, unit: TimeUnit): Unit = { @@ -271,17 +276,22 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run if (!completed) { //pollingThread.interrupt(); //startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); - startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => { + startedProcesses.synchronized{ + startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => { - x._2.stop() - val appID: String = startedProcessesAppID.getOrElse(x._1,"") - if(!appID.equals("")){ - println("Stop Flow " + appID + " by FlowLauncher!") - FlowLauncher.stop(appID) - } + x._2.stop() + val appID: String = startedProcessesAppID.getOrElse(x._1,"") + if(!appID.equals("")){ + println("Stop Flow " + appID + " by FlowLauncher!") + FlowLauncher.stop(appID) + } + + }); + } + startedFlowGroup.synchronized{ + startedFlowGroup.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); + } - }); - startedFlowGroup.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop()); pollingThread.interrupt(); } diff --git a/piflow-core/src/main/scala/cn/piflow/runner.scala b/piflow-core/src/main/scala/cn/piflow/runner.scala index e42d8ee..b3ac99f 100644 --- a/piflow-core/src/main/scala/cn/piflow/runner.scala +++ b/piflow-core/src/main/scala/cn/piflow/runner.scala @@ -92,6 +92,14 @@ object Runner { override def onProjectFailed(ctx: ProjectContext): Unit = { listeners.foreach(_.onProjectFailed(ctx)); } + + override def onFlowGroupStoped(ctx: FlowGroupContext): Unit = { + listeners.foreach(_.onFlowGroupStoped(ctx)) + } + + override def onProjectStoped(ctx: ProjectContext): Unit = { + listeners.foreach(_.onProjectStoped(ctx)) + } } override def addListener(listener: RunnerListener) = { @@ -153,11 +161,15 @@ trait RunnerListener { def onFlowGroupFailed(ctx: FlowGroupContext); + def onFlowGroupStoped(ctx: FlowGroupContext); + def onProjectStarted(ctx: ProjectContext); def onProjectCompleted(ctx: ProjectContext); def onProjectFailed(ctx: ProjectContext); + + def onProjectStoped(ctx: ProjectContext); } @@ -305,6 +317,19 @@ class RunnerLogger extends RunnerListener with Logging { H2Util.updateFlowGroupFinishedTime(groupId,time) } + override def onFlowGroupStoped(ctx: FlowGroupContext): Unit = { + //TODO: write monitor data into db + val groupId = ctx.getFlowGroupExecution().groupId() + val flowGroupName = ctx.getFlowGroup().getFlowGroupName() + val time = new Date().toString + logger.debug(s"Flow Group stoped: $groupId, time: $time"); + println(s"Flow Group stoped: $groupId, time: $time") + //update flow group state to COMPLETED + H2Util.updateFlowGroupState(groupId,FlowGroupState.KILLED) + H2Util.updateFlowGroupFinishedTime(groupId,time) + + } + override def onFlowGroupFailed(ctx: FlowGroupContext): Unit = { //TODO: write monitor data into db val groupId = ctx.getFlowGroupExecution().groupId() @@ -353,4 +378,18 @@ class RunnerLogger extends RunnerListener with Logging { H2Util.updateProjectState(projectId,ProjectState.FAILED) H2Util.updateProjectFinishedTime(projectId,time) } + + + + override def onProjectStoped(ctx: ProjectContext): Unit = { + val projectId = ctx.getProjectExecution().projectId() + val projectName = ctx.getProject().getProjectName() + val time = new Date().toString + logger.debug(s"Project failed: $projectId, time: $time"); + println(s"Project failed: $projectId, time: $time") + //update project state to FAILED + H2Util.updateProjectState(projectId,ProjectState.KILLED) + H2Util.updateProjectFinishedTime(projectId,time) + + } } diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index baceb9a..965f852 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -41,7 +41,7 @@ object API { .bind("debug.path",PropertyUtil.getPropertyValue("debug.path")) .start(project); - (projectBean.name,projectExecution) + projectExecution } @@ -62,7 +62,7 @@ object API { .bind("debug.path",PropertyUtil.getPropertyValue("debug.path")) .start(flowGroup); - (flowGroupBean.name, flowGroupExecution) + flowGroupExecution } def startFlow(flowJson : String):(String,String,SparkAppHandle) = { diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index 95a5412..c1a735b 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -228,9 +228,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpEntity.Strict(_, data) =>{ var flowGroupJson = data.utf8String flowGroupJson = flowGroupJson.replaceAll("}","}\n") - val (flowName, flowGroupExecution) = API.startFlowGroup(flowGroupJson) - flowGroupMap += (flowName -> flowGroupExecution) - Future.successful(HttpResponse(entity = "start flow group ok!!!")) + val flowGroupExecution = API.startFlowGroup(flowGroupJson) + flowGroupMap += (flowGroupExecution.groupId() -> flowGroupExecution) + val result = "{\"flowGroup\":{\"id\":\"" + flowGroupExecution.groupId() + "\"}}" + Future.successful(HttpResponse(entity = result)) } case ex => { @@ -244,15 +245,15 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpRequest(POST, Uri.Path("/flowGroup/stop"), headers, entity, protocol) =>{ val data = toJson(entity) - val groupName = data.get("groupName").getOrElse("").asInstanceOf[String] - if(groupName.equals("") || !flowGroupMap.contains(groupName)){ + val groupId = data.get("groupId").getOrElse("").asInstanceOf[String] + if(groupId.equals("") || !flowGroupMap.contains(groupId)){ Future.failed(new Exception("Can not found flowGroup Error!")) }else{ - flowGroupMap.get(groupName) match { + flowGroupMap.get(groupId) match { case Some(flowGroupExecution) => val result = flowGroupExecution.stop() - flowGroupMap.-(groupName) + flowGroupMap.-(groupId) Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!")) case ex =>{ println(ex) @@ -270,9 +271,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpEntity.Strict(_, data) =>{ var projectJson = data.utf8String projectJson = projectJson.replaceAll("}","}\n") - val (projectName, projectExecution) = API.startProject(projectJson) - projectMap += (projectName -> projectExecution) - Future.successful(HttpResponse(entity = "start project ok!!!")) + val projectExecution = API.startProject(projectJson) + projectMap += (projectExecution.projectId() -> projectExecution) + val result = "{\"project\":{\"id\":\"" + projectExecution.projectId()+ "\"}}" + Future.successful(HttpResponse(entity = result)) } case ex => { @@ -286,15 +288,15 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{ val data = toJson(entity) - val projectName = data.get("projectName").getOrElse("").asInstanceOf[String] - if(projectName.equals("") || !projectMap.contains(projectName)){ + val projectId = data.get("projectId").getOrElse("").asInstanceOf[String] + if(projectId.equals("") || !projectMap.contains(projectId)){ Future.failed(new Exception("Can not found project Error!")) }else{ - projectMap.get(projectName) match { + projectMap.get(projectId) match { case Some(projectExecution) => val result = projectExecution.stop() - projectMap.-(projectName) + projectMap.-(projectId) Future.successful(HttpResponse(entity = "Stop project Ok!!!")) case ex =>{ println(ex)