process group

This commit is contained in:
bluejoe2008@gmail.com 2018-06-27 18:40:31 +08:00
parent c557af090c
commit fccf7a9717
3 changed files with 13 additions and 6 deletions

View File

@ -1,9 +1,10 @@
package cn.piflow package cn.piflow
import java.sql.Date import java.sql.Date
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.{Map => MMap} import scala.collection.mutable.{ArrayBuffer, Map => MMap}
/** /**
* Created by bluejoe on 2018/6/27. * Created by bluejoe on 2018/6/27.
@ -91,6 +92,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
val mapFlowWithConditions: Map[String, (Flow, Condition)] = fg.mapFlowWithConditions(); val mapFlowWithConditions: Map[String, (Flow, Condition)] = fg.mapFlowWithConditions();
val completedProcesses = MMap[String, Boolean](); val completedProcesses = MMap[String, Boolean]();
completedProcesses ++= mapFlowWithConditions.map(x => (x._1, false)); completedProcesses ++= mapFlowWithConditions.map(x => (x._1, false));
val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size);
val startedProcesses = MMap[String, Process](); val startedProcesses = MMap[String, Process]();
val execution = this; val execution = this;
val POLLING_INTERVAL = 1000; val POLLING_INTERVAL = 1000;
@ -104,6 +106,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
override def onProcessCompleted(ctx: ProcessContext): Unit = { override def onProcessCompleted(ctx: ProcessContext): Unit = {
startedProcesses.filter(_._2 == ctx.getProcess()).foreach { x => startedProcesses.filter(_._2 == ctx.getProcess()).foreach { x =>
completedProcesses(x._1) = true; completedProcesses(x._1) = true;
numWaitingProcesses.decrementAndGet();
} }
} }
@ -123,7 +126,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
runner.addListener(listener); runner.addListener(listener);
def isProcessCompleted(processName: String): Boolean = { def isProcessCompleted(processName: String): Boolean = {
completedProcesses.contains(processName); completedProcesses(processName);
} }
private def startProcess(name: String, flow: Flow): Unit = { private def startProcess(name: String, flow: Flow): Unit = {
@ -133,13 +136,18 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
val pollingThread = new Thread(new Runnable() { val pollingThread = new Thread(new Runnable() {
override def run(): Unit = { override def run(): Unit = {
while (!completedProcesses.filter(_._2 == false).isEmpty) { while (numWaitingProcesses.get() > 0) {
val todos = ArrayBuffer[(String, Flow)]();
mapFlowWithConditions.foreach { en => mapFlowWithConditions.foreach { en =>
if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) { if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {
startProcess(en._1, en._2._1); todos += (en._1 -> en._2._1);
} }
} }
startedProcesses.synchronized {
todos.foreach(en => startProcess(en._1, en._2));
}
Thread.sleep(POLLING_INTERVAL); Thread.sleep(POLLING_INTERVAL);
} }

View File

@ -19,7 +19,7 @@ trait Runner {
object Runner { object Runner {
def create(): Runner = new Runner() { def create(): Runner = new Runner() {
val listeners = ArrayBuffer[RunnerListener](new RunnerLogger()); val listeners = ArrayBuffer[RunnerListener](new RunnerLogger());
listeners += new RunnerLogger();
val compositeListener = new RunnerListener() { val compositeListener = new RunnerListener() {
override def onProcessStarted(ctx: ProcessContext): Unit = { override def onProcessStarted(ctx: ProcessContext): Unit = {
listeners.foreach(_.onProcessStarted(ctx)); listeners.foreach(_.onProcessStarted(ctx));

View File

@ -26,7 +26,6 @@ class FlowGroupTest {
val spark = SparkSession.builder.master("local[4]") val spark = SparkSession.builder.master("local[4]")
.getOrCreate(); .getOrCreate();
val process = Runner.create() val process = Runner.create()
.bind(classOf[SparkSession].getName, spark) .bind(classOf[SparkSession].getName, spark)
.start(fg); .start(fg);