forked from opensci/piflow
optimize code
This commit is contained in:
parent
da63af8562
commit
f36dd9cf64
|
@ -57,10 +57,10 @@ trait ProjectExecution extends Execution{
|
|||
}
|
||||
|
||||
class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Runner) extends ProjectExecution {
|
||||
val mapFlowWithConditions: Map[String, (ProjectEntry, Condition[ProjectExecution])] = project.mapFlowWithConditions();
|
||||
val completedProcesses = MMap[String, Boolean]();
|
||||
completedProcesses ++= mapFlowWithConditions.map(x => (x._1, false));
|
||||
val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size);
|
||||
val mapProjectEntryWithConditions: Map[String, (ProjectEntry, Condition[ProjectExecution])] = project.mapFlowWithConditions();
|
||||
val completedProjectEntry = MMap[String, Boolean]();
|
||||
completedProjectEntry ++= mapProjectEntryWithConditions.map(x => (x._1, false));
|
||||
val numWaitingProjectEntry = new AtomicInteger(mapProjectEntryWithConditions.size);
|
||||
|
||||
val startedProcesses = MMap[String, SparkAppHandle]();
|
||||
val startedFlowGroup = MMap[String, FlowGroupExecution]()
|
||||
|
@ -99,8 +99,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
|
||||
override def onFlowGroupCompleted(ctx: FlowGroupContext): Unit = {
|
||||
startedFlowGroup.filter(_._2 == ctx.getFlowGroupExecution()).foreach { x =>
|
||||
completedProcesses(x._1) = true;
|
||||
numWaitingProcesses.decrementAndGet();
|
||||
completedProjectEntry(x._1) = true;
|
||||
numWaitingProjectEntry.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -111,7 +111,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
|
||||
|
||||
def isEntryCompleted(name: String): Boolean = {
|
||||
completedProcesses(name)
|
||||
completedProjectEntry(name)
|
||||
}
|
||||
|
||||
private def startProcess(name: String, flow: Flow): Unit = {
|
||||
|
@ -135,8 +135,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
|
||||
//TODO: get the process status
|
||||
if (handle.getState.equals(State.FINISHED)){
|
||||
completedProcesses(flow.getFlowName()) = true;
|
||||
numWaitingProcesses.decrementAndGet();
|
||||
completedProjectEntry(flow.getFlowName()) = true;
|
||||
numWaitingProjectEntry.decrementAndGet();
|
||||
}
|
||||
if (handle.getState().isFinal){
|
||||
countDownLatch.countDown()
|
||||
|
@ -160,10 +160,10 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
|
||||
val pollingThread = new Thread(new Runnable() {
|
||||
override def run(): Unit = {
|
||||
while (numWaitingProcesses.get() > 0) {
|
||||
while (numWaitingProjectEntry.get() > 0) {
|
||||
val todosFlow = ArrayBuffer[(String, Flow)]();
|
||||
val todosFlowGroup = ArrayBuffer[(String, FlowGroup)]();
|
||||
mapFlowWithConditions.foreach { en =>
|
||||
mapProjectEntryWithConditions.foreach { en =>
|
||||
|
||||
if(en._2._1.isInstanceOf[Flow]){
|
||||
if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {
|
||||
|
|
Loading…
Reference in New Issue