update state when stop flowGroup or project

This commit is contained in:
judy0131 2019-05-16 10:03:42 +08:00
parent c8c01811b4
commit c64f492db6
5 changed files with 91 additions and 38 deletions

View File

@ -1,6 +1,5 @@
package cn.piflow
import java.sql.Date
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
@ -69,7 +68,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
val flowGroupContext = createContext(runnerContext);
val flowGroupExecution = this;
val id : String = "group_" + IdGenerator.uuid() + "_" + IdGenerator.nextId[FlowGroupExecution];
val id : String = "group_" + IdGenerator.uuid() ;
val mapFlowWithConditions: Map[String, (Flow, Condition[FlowGroupExecution])] = fg.mapFlowWithConditions();
val completedProcesses = MMap[String, Boolean]();
@ -199,6 +198,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
override def stop(): Unit = {
finalizeExecution(false);
//runnerListener.onFlowGroupStoped(flowGroupContext)
}
override def awaitTermination(timeout: Long, unit: TimeUnit): Unit = {
@ -211,6 +211,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
if (!completed) {
//startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
startedProcesses.synchronized{
startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => {
x._2.stop()
@ -222,6 +223,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
});
pollingThread.interrupt();
}
}

View File

@ -1,6 +1,6 @@
package cn.piflow
import java.sql.Date
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
@ -63,7 +63,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
val projectContext = createContext(runnerContext);
val projectExecution = this;
val id : String = "project_" + IdGenerator.uuid() + "_" + IdGenerator.nextId[ProjectExecution];
val id : String = "project_" + IdGenerator.uuid();
val mapProjectEntryWithConditions: Map[String, (ProjectEntry, Condition[ProjectExecution])] = project.mapFlowWithConditions();
val completedProjectEntry = MMap[String, Boolean]();
@ -121,6 +121,10 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
override def onProjectCompleted(ctx: ProjectContext): Unit = {}
override def onProjectFailed(ctx: ProjectContext): Unit = {}
override def onFlowGroupStoped(ctx: FlowGroupContext): Unit = {}
override def onProjectStoped(ctx: ProjectContext): Unit = {}
};
runner.addListener(listener);
@ -259,6 +263,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
override def stop(): Unit = {
finalizeExecution(false);
//runnerListener.onProjectStoped(projectContext)
}
override def awaitTermination(timeout: Long, unit: TimeUnit): Unit = {
@ -271,6 +276,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
if (!completed) {
//pollingThread.interrupt();
//startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
startedProcesses.synchronized{
startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => {
x._2.stop()
@ -281,7 +287,11 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
}
});
}
startedFlowGroup.synchronized{
startedFlowGroup.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
}
pollingThread.interrupt();
}

View File

@ -92,6 +92,14 @@ object Runner {
override def onProjectFailed(ctx: ProjectContext): Unit = {
listeners.foreach(_.onProjectFailed(ctx));
}
override def onFlowGroupStoped(ctx: FlowGroupContext): Unit = {
listeners.foreach(_.onFlowGroupStoped(ctx))
}
override def onProjectStoped(ctx: ProjectContext): Unit = {
listeners.foreach(_.onProjectStoped(ctx))
}
}
override def addListener(listener: RunnerListener) = {
@ -153,11 +161,15 @@ trait RunnerListener {
def onFlowGroupFailed(ctx: FlowGroupContext);
def onFlowGroupStoped(ctx: FlowGroupContext);
def onProjectStarted(ctx: ProjectContext);
def onProjectCompleted(ctx: ProjectContext);
def onProjectFailed(ctx: ProjectContext);
def onProjectStoped(ctx: ProjectContext);
}
@ -305,6 +317,19 @@ class RunnerLogger extends RunnerListener with Logging {
H2Util.updateFlowGroupFinishedTime(groupId,time)
}
override def onFlowGroupStoped(ctx: FlowGroupContext): Unit = {
//TODO: write monitor data into db
val groupId = ctx.getFlowGroupExecution().groupId()
val flowGroupName = ctx.getFlowGroup().getFlowGroupName()
val time = new Date().toString
logger.debug(s"Flow Group stoped: $groupId, time: $time");
println(s"Flow Group stoped: $groupId, time: $time")
//update flow group state to COMPLETED
H2Util.updateFlowGroupState(groupId,FlowGroupState.KILLED)
H2Util.updateFlowGroupFinishedTime(groupId,time)
}
override def onFlowGroupFailed(ctx: FlowGroupContext): Unit = {
//TODO: write monitor data into db
val groupId = ctx.getFlowGroupExecution().groupId()
@ -353,4 +378,18 @@ class RunnerLogger extends RunnerListener with Logging {
H2Util.updateProjectState(projectId,ProjectState.FAILED)
H2Util.updateProjectFinishedTime(projectId,time)
}
override def onProjectStoped(ctx: ProjectContext): Unit = {
val projectId = ctx.getProjectExecution().projectId()
val projectName = ctx.getProject().getProjectName()
val time = new Date().toString
logger.debug(s"Project failed: $projectId, time: $time");
println(s"Project failed: $projectId, time: $time")
//update project state to FAILED
H2Util.updateProjectState(projectId,ProjectState.KILLED)
H2Util.updateProjectFinishedTime(projectId,time)
}
}

View File

@ -41,7 +41,7 @@ object API {
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.start(project);
(projectBean.name,projectExecution)
projectExecution
}
@ -62,7 +62,7 @@ object API {
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.start(flowGroup);
(flowGroupBean.name, flowGroupExecution)
flowGroupExecution
}
def startFlow(flowJson : String):(String,String,SparkAppHandle) = {

View File

@ -228,9 +228,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpEntity.Strict(_, data) =>{
var flowGroupJson = data.utf8String
flowGroupJson = flowGroupJson.replaceAll("}","}\n")
val (flowName, flowGroupExecution) = API.startFlowGroup(flowGroupJson)
flowGroupMap += (flowName -> flowGroupExecution)
Future.successful(HttpResponse(entity = "start flow group ok!!!"))
val flowGroupExecution = API.startFlowGroup(flowGroupJson)
flowGroupMap += (flowGroupExecution.groupId() -> flowGroupExecution)
val result = "{\"flowGroup\":{\"id\":\"" + flowGroupExecution.groupId() + "\"}}"
Future.successful(HttpResponse(entity = result))
}
case ex => {
@ -244,15 +245,15 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpRequest(POST, Uri.Path("/flowGroup/stop"), headers, entity, protocol) =>{
val data = toJson(entity)
val groupName = data.get("groupName").getOrElse("").asInstanceOf[String]
if(groupName.equals("") || !flowGroupMap.contains(groupName)){
val groupId = data.get("groupId").getOrElse("").asInstanceOf[String]
if(groupId.equals("") || !flowGroupMap.contains(groupId)){
Future.failed(new Exception("Can not found flowGroup Error!"))
}else{
flowGroupMap.get(groupName) match {
flowGroupMap.get(groupId) match {
case Some(flowGroupExecution) =>
val result = flowGroupExecution.stop()
flowGroupMap.-(groupName)
flowGroupMap.-(groupId)
Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!"))
case ex =>{
println(ex)
@ -270,9 +271,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpEntity.Strict(_, data) =>{
var projectJson = data.utf8String
projectJson = projectJson.replaceAll("}","}\n")
val (projectName, projectExecution) = API.startProject(projectJson)
projectMap += (projectName -> projectExecution)
Future.successful(HttpResponse(entity = "start project ok!!!"))
val projectExecution = API.startProject(projectJson)
projectMap += (projectExecution.projectId() -> projectExecution)
val result = "{\"project\":{\"id\":\"" + projectExecution.projectId()+ "\"}}"
Future.successful(HttpResponse(entity = result))
}
case ex => {
@ -286,15 +288,15 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{
val data = toJson(entity)
val projectName = data.get("projectName").getOrElse("").asInstanceOf[String]
if(projectName.equals("") || !projectMap.contains(projectName)){
val projectId = data.get("projectId").getOrElse("").asInstanceOf[String]
if(projectId.equals("") || !projectMap.contains(projectId)){
Future.failed(new Exception("Can not found project Error!"))
}else{
projectMap.get(projectName) match {
projectMap.get(projectId) match {
case Some(projectExecution) =>
val result = projectExecution.stop()
projectMap.-(projectName)
projectMap.-(projectId)
Future.successful(HttpResponse(entity = "Stop project Ok!!!"))
case ex =>{
println(ex)