diff --git a/piflow-core/src/main/scala/cn/piflow/project.scala b/piflow-core/src/main/scala/cn/piflow/project.scala index 01e16a5..569d2db 100644 --- a/piflow-core/src/main/scala/cn/piflow/project.scala +++ b/piflow-core/src/main/scala/cn/piflow/project.scala @@ -57,10 +57,10 @@ trait ProjectExecution extends Execution{ } class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Runner) extends ProjectExecution { - val mapFlowWithConditions: Map[String, (ProjectEntry, Condition[ProjectExecution])] = project.mapFlowWithConditions(); - val completedProcesses = MMap[String, Boolean](); - completedProcesses ++= mapFlowWithConditions.map(x => (x._1, false)); - val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size); + val mapProjectEntryWithConditions: Map[String, (ProjectEntry, Condition[ProjectExecution])] = project.mapFlowWithConditions(); + val completedProjectEntry = MMap[String, Boolean](); + completedProjectEntry ++= mapProjectEntryWithConditions.map(x => (x._1, false)); + val numWaitingProjectEntry = new AtomicInteger(mapProjectEntryWithConditions.size); val startedProcesses = MMap[String, SparkAppHandle](); val startedFlowGroup = MMap[String, FlowGroupExecution]() @@ -99,8 +99,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run override def onFlowGroupCompleted(ctx: FlowGroupContext): Unit = { startedFlowGroup.filter(_._2 == ctx.getFlowGroupExecution()).foreach { x => - completedProcesses(x._1) = true; - numWaitingProcesses.decrementAndGet(); + completedProjectEntry(x._1) = true; + numWaitingProjectEntry.decrementAndGet(); } } @@ -111,7 +111,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run def isEntryCompleted(name: String): Boolean = { - completedProcesses(name) + completedProjectEntry(name) } private def startProcess(name: String, flow: Flow): Unit = { @@ -135,8 +135,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run //TODO: get the process status if (handle.getState.equals(State.FINISHED)){ - completedProcesses(flow.getFlowName()) = true; - numWaitingProcesses.decrementAndGet(); + completedProjectEntry(flow.getFlowName()) = true; + numWaitingProjectEntry.decrementAndGet(); } if (handle.getState().isFinal){ countDownLatch.countDown() @@ -160,10 +160,10 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run val pollingThread = new Thread(new Runnable() { override def run(): Unit = { - while (numWaitingProcesses.get() > 0) { + while (numWaitingProjectEntry.get() > 0) { val todosFlow = ArrayBuffer[(String, Flow)](); val todosFlowGroup = ArrayBuffer[(String, FlowGroup)](); - mapFlowWithConditions.foreach { en => + mapProjectEntryWithConditions.foreach { en => if(en._2._1.isInstanceOf[Flow]){ if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {