diff --git a/piflow.iml b/piflow.iml
index 4869786..7a68369 100644
--- a/piflow.iml
+++ b/piflow.iml
@@ -11,10 +11,10 @@
-
-
+
+
diff --git a/src/main/scala/control.scala b/src/main/scala/control.scala
index 597e18e..020c831 100644
--- a/src/main/scala/control.scala
+++ b/src/main/scala/control.scala
@@ -14,7 +14,7 @@ import scala.collection.mutable.{Map => MMap}
trait Flow {
def addProcess(name: String, process: Process): Flow;
- def trigger(name: String, trigger: Trigger): Flow;
+ def schedule(name: String, trigger: Trigger): Flow;
def getProcess(name: String): Process;
@@ -28,44 +28,48 @@ trait Trigger {
}
trait BoundTrigger {
- def start(context: ExecutionContext);
-
def getProcessName(): String;
def getProcess(): Process;
- def onProcessComplete(processName: String, context: ExecutionContext): Unit;
+ def onStart(context: FlowExecutionContext);
- def stop(context: ExecutionContext);
+ def onProcessComplete(processName: String, context: FlowExecutionContext): Unit;
+
+ def onStop(context: FlowExecutionContext);
}
abstract class AbstractBoundTrigger(name: String, process: Process) extends BoundTrigger {
- def start(context: ExecutionContext) {}
+ def onStart(context: FlowExecutionContext) {}
def getProcessName() = name;
def getProcess() = process;
- def stop(context: ExecutionContext) {}
+ def onStop(context: FlowExecutionContext) {}
}
object SequenceTriggerBuilder {
class SequenceTrigger(predecessors: Seq[String]) extends Trigger {
val dps = predecessors.distinct;
-
+var fired = false;
val completed = MMap[String, Boolean]();
dps.foreach { processName =>
completed(processName) = false;
};
- override def bind(name: String, process: Process): BoundTrigger = new AbstractBoundTrigger(name, process) {
- override def onProcessComplete(processName: String, context: ExecutionContext): Unit = {
- if (completed.contains(processName))
- completed(processName) = true;
+ override def bind(boundProcessName: String, process: Process): BoundTrigger = new AbstractBoundTrigger(boundProcessName, process) {
+ override def onProcessComplete(processName: String, context: FlowExecutionContext): Unit = {
+ //TODO: only fire once
+ if (!fired && !processName.equals(boundProcessName)) {
+ if (completed.contains(processName))
+ completed(processName) = true;
- if (completed.values.filter(!_).isEmpty) {
- context.startProcess(processName);
+ if (completed.values.filter(!_).isEmpty) {
+ fired = true;
+ context.startProcess(boundProcessName);
+ }
}
}
}
@@ -78,11 +82,11 @@ object TimerTriggerBuilder {
class TimerTrigger(cronExpr: String) extends Trigger {
override def bind(name: String, process: Process): BoundTrigger = new AbstractBoundTrigger(name, process) {
- override def start(context: ExecutionContext): Unit = {
+ override def onStart(context: FlowExecutionContext): Unit = {
context.scheduleProcess(name, cronExpr);
}
- override def onProcessComplete(processName: String, context: ExecutionContext): Unit = {
+ override def onProcessComplete(processName: String, context: FlowExecutionContext): Unit = {
}
}
}
@@ -117,7 +121,7 @@ class FlowImpl extends Flow {
this;
};
- def trigger(name: String, trigger: Trigger) = {
+ def schedule(name: String, trigger: Trigger) = {
triggers(name) = trigger.bind(name, processes(name));
this;
}
@@ -137,7 +141,7 @@ class RunnerImpl extends Runner {
}
}
-trait ExecutionContext {
+trait FlowExecutionContext {
def scheduleProcess(name: String, cronExpr: String);
def startProcess(name: String);
@@ -145,24 +149,24 @@ trait ExecutionContext {
class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logging {
def start(): Unit = {
- triggers.foreach(_.start(ec));
-
- starts.foreach { processName =>
- ec.startProcess(processName);
- };
+ triggers.foreach(_.onStart(executionContext));
quartzScheduler.start();
+
+ starts.foreach { processName =>
+ executionContext.startProcess(processName);
+ }
}
def stop() = {
- triggers.foreach(_.stop(ec));
+ triggers.foreach(_.onStop(executionContext));
quartzScheduler.shutdown();
}
val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
val triggers = flow.getTriggers();
- val ec = new ExecutionContext() {
+ val executionContext = new FlowExecutionContext() with Logging {
private def scheduleProcess(name: String, scheduleBuilder: ScheduleBuilder[CronTrigger] = null): Unit = {
val builder = TriggerBuilder.newTrigger().startNow();
@@ -207,7 +211,7 @@ class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logg
logger.info(s"process completed: $processName");
//notify all triggers
- triggers.foreach(_.onProcessComplete(processName, ec));
+ triggers.foreach(_.onProcessComplete(processName, executionContext));
}
});
}
diff --git a/src/test/scala/FlowTest.scala b/src/test/scala/FlowTest.scala
index d4f454d..7127f84 100644
--- a/src/test/scala/FlowTest.scala
+++ b/src/test/scala/FlowTest.scala
@@ -9,22 +9,22 @@ import org.junit.Test
class FlowTest {
@Test
def test1() {
- val chain = new FlowImpl();
- chain.addProcess("CleanHouse", new CleanHouse());
- chain.addProcess("CopyTextFile", new CopyTextFile());
- chain.addProcess("CountWords", new CountWords());
- chain.addProcess("PrintCount", new PrintCount());
- chain.addProcess("PrintMessage", new PrintMessage());
+ val flow = new FlowImpl();
+ flow.addProcess("CleanHouse", new CleanHouse());
+ flow.addProcess("CopyTextFile", new CopyTextFile());
+ flow.addProcess("CountWords", new CountWords());
+ flow.addProcess("PrintCount", new PrintCount());
+ flow.addProcess("PrintMessage", new PrintMessage());
- chain.trigger("CopyTextFile", SequenceTriggerBuilder.after("CleanHouse"));
- chain.trigger("CountWords", SequenceTriggerBuilder.after("CopyTextFile"));
- chain.trigger("PrintCount", SequenceTriggerBuilder.after("CountWords"));
- chain.trigger("PrintMessage", TimerTriggerBuilder.cron("* * * * * ?"));
+ flow.schedule("CopyTextFile", SequenceTriggerBuilder.after("CleanHouse"));
+ flow.schedule("CountWords", SequenceTriggerBuilder.after("CopyTextFile"));
+ flow.schedule("PrintCount", SequenceTriggerBuilder.after("CountWords"));
+ flow.schedule("PrintMessage", TimerTriggerBuilder.cron("0/30 * * * * ? "));
val runner = new RunnerImpl();
- val exe = runner.run(chain, "CleanHouse");
+ val exe = runner.run(flow, "CleanHouse");
- Thread.sleep(10000);
+ Thread.sleep(20000);
exe.stop();
}
}