forked from opensci/piflow
use quartz
This commit is contained in:
parent
d6dbcda3d0
commit
5428123c92
|
@ -11,10 +11,10 @@
|
|||
<sourceFolder url="file://$MODULE_DIR$/src/test/scala" isTestSource="true" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/target" />
|
||||
</content>
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
<orderEntry type="library" name="scala-sdk-2.11.8" level="application" />
|
||||
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
|
||||
<orderEntry type="library" name="scala-sdk-2.11.8" level="application" />
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
|
||||
<orderEntry type="library" name="Maven: org.apache.spark:spark-core_2.11:2.1.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: org.apache.avro:avro-mapred:hadoop2:1.7.7" level="project" />
|
||||
|
|
|
@ -14,7 +14,7 @@ import scala.collection.mutable.{Map => MMap}
|
|||
trait Flow {
|
||||
def addProcess(name: String, process: Process): Flow;
|
||||
|
||||
def trigger(name: String, trigger: Trigger): Flow;
|
||||
def schedule(name: String, trigger: Trigger): Flow;
|
||||
|
||||
def getProcess(name: String): Process;
|
||||
|
||||
|
@ -28,44 +28,48 @@ trait Trigger {
|
|||
}
|
||||
|
||||
trait BoundTrigger {
|
||||
def start(context: ExecutionContext);
|
||||
|
||||
def getProcessName(): String;
|
||||
|
||||
def getProcess(): Process;
|
||||
|
||||
def onProcessComplete(processName: String, context: ExecutionContext): Unit;
|
||||
def onStart(context: FlowExecutionContext);
|
||||
|
||||
def stop(context: ExecutionContext);
|
||||
def onProcessComplete(processName: String, context: FlowExecutionContext): Unit;
|
||||
|
||||
def onStop(context: FlowExecutionContext);
|
||||
}
|
||||
|
||||
abstract class AbstractBoundTrigger(name: String, process: Process) extends BoundTrigger {
|
||||
def start(context: ExecutionContext) {}
|
||||
def onStart(context: FlowExecutionContext) {}
|
||||
|
||||
def getProcessName() = name;
|
||||
|
||||
def getProcess() = process;
|
||||
|
||||
def stop(context: ExecutionContext) {}
|
||||
def onStop(context: FlowExecutionContext) {}
|
||||
}
|
||||
|
||||
object SequenceTriggerBuilder {
|
||||
|
||||
class SequenceTrigger(predecessors: Seq[String]) extends Trigger {
|
||||
val dps = predecessors.distinct;
|
||||
|
||||
var fired = false;
|
||||
val completed = MMap[String, Boolean]();
|
||||
dps.foreach { processName =>
|
||||
completed(processName) = false;
|
||||
};
|
||||
|
||||
override def bind(name: String, process: Process): BoundTrigger = new AbstractBoundTrigger(name, process) {
|
||||
override def onProcessComplete(processName: String, context: ExecutionContext): Unit = {
|
||||
override def bind(boundProcessName: String, process: Process): BoundTrigger = new AbstractBoundTrigger(boundProcessName, process) {
|
||||
override def onProcessComplete(processName: String, context: FlowExecutionContext): Unit = {
|
||||
//TODO: only fire once
|
||||
if (!fired && !processName.equals(boundProcessName)) {
|
||||
if (completed.contains(processName))
|
||||
completed(processName) = true;
|
||||
|
||||
if (completed.values.filter(!_).isEmpty) {
|
||||
context.startProcess(processName);
|
||||
fired = true;
|
||||
context.startProcess(boundProcessName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -78,11 +82,11 @@ object TimerTriggerBuilder {
|
|||
|
||||
class TimerTrigger(cronExpr: String) extends Trigger {
|
||||
override def bind(name: String, process: Process): BoundTrigger = new AbstractBoundTrigger(name, process) {
|
||||
override def start(context: ExecutionContext): Unit = {
|
||||
override def onStart(context: FlowExecutionContext): Unit = {
|
||||
context.scheduleProcess(name, cronExpr);
|
||||
}
|
||||
|
||||
override def onProcessComplete(processName: String, context: ExecutionContext): Unit = {
|
||||
override def onProcessComplete(processName: String, context: FlowExecutionContext): Unit = {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -117,7 +121,7 @@ class FlowImpl extends Flow {
|
|||
this;
|
||||
};
|
||||
|
||||
def trigger(name: String, trigger: Trigger) = {
|
||||
def schedule(name: String, trigger: Trigger) = {
|
||||
triggers(name) = trigger.bind(name, processes(name));
|
||||
this;
|
||||
}
|
||||
|
@ -137,7 +141,7 @@ class RunnerImpl extends Runner {
|
|||
}
|
||||
}
|
||||
|
||||
trait ExecutionContext {
|
||||
trait FlowExecutionContext {
|
||||
def scheduleProcess(name: String, cronExpr: String);
|
||||
|
||||
def startProcess(name: String);
|
||||
|
@ -145,24 +149,24 @@ trait ExecutionContext {
|
|||
|
||||
class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logging {
|
||||
def start(): Unit = {
|
||||
triggers.foreach(_.start(ec));
|
||||
|
||||
starts.foreach { processName =>
|
||||
ec.startProcess(processName);
|
||||
};
|
||||
triggers.foreach(_.onStart(executionContext));
|
||||
|
||||
quartzScheduler.start();
|
||||
|
||||
starts.foreach { processName =>
|
||||
executionContext.startProcess(processName);
|
||||
}
|
||||
}
|
||||
|
||||
def stop() = {
|
||||
triggers.foreach(_.stop(ec));
|
||||
triggers.foreach(_.onStop(executionContext));
|
||||
quartzScheduler.shutdown();
|
||||
}
|
||||
|
||||
val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
|
||||
val triggers = flow.getTriggers();
|
||||
|
||||
val ec = new ExecutionContext() {
|
||||
val executionContext = new FlowExecutionContext() with Logging {
|
||||
|
||||
private def scheduleProcess(name: String, scheduleBuilder: ScheduleBuilder[CronTrigger] = null): Unit = {
|
||||
val builder = TriggerBuilder.newTrigger().startNow();
|
||||
|
@ -207,7 +211,7 @@ class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logg
|
|||
logger.info(s"process completed: $processName");
|
||||
|
||||
//notify all triggers
|
||||
triggers.foreach(_.onProcessComplete(processName, ec));
|
||||
triggers.foreach(_.onProcessComplete(processName, executionContext));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -9,22 +9,22 @@ import org.junit.Test
|
|||
class FlowTest {
|
||||
@Test
|
||||
def test1() {
|
||||
val chain = new FlowImpl();
|
||||
chain.addProcess("CleanHouse", new CleanHouse());
|
||||
chain.addProcess("CopyTextFile", new CopyTextFile());
|
||||
chain.addProcess("CountWords", new CountWords());
|
||||
chain.addProcess("PrintCount", new PrintCount());
|
||||
chain.addProcess("PrintMessage", new PrintMessage());
|
||||
val flow = new FlowImpl();
|
||||
flow.addProcess("CleanHouse", new CleanHouse());
|
||||
flow.addProcess("CopyTextFile", new CopyTextFile());
|
||||
flow.addProcess("CountWords", new CountWords());
|
||||
flow.addProcess("PrintCount", new PrintCount());
|
||||
flow.addProcess("PrintMessage", new PrintMessage());
|
||||
|
||||
chain.trigger("CopyTextFile", SequenceTriggerBuilder.after("CleanHouse"));
|
||||
chain.trigger("CountWords", SequenceTriggerBuilder.after("CopyTextFile"));
|
||||
chain.trigger("PrintCount", SequenceTriggerBuilder.after("CountWords"));
|
||||
chain.trigger("PrintMessage", TimerTriggerBuilder.cron("* * * * * ?"));
|
||||
flow.schedule("CopyTextFile", SequenceTriggerBuilder.after("CleanHouse"));
|
||||
flow.schedule("CountWords", SequenceTriggerBuilder.after("CopyTextFile"));
|
||||
flow.schedule("PrintCount", SequenceTriggerBuilder.after("CountWords"));
|
||||
flow.schedule("PrintMessage", TimerTriggerBuilder.cron("0/30 * * * * ? "));
|
||||
|
||||
val runner = new RunnerImpl();
|
||||
val exe = runner.run(chain, "CleanHouse");
|
||||
val exe = runner.run(flow, "CleanHouse");
|
||||
|
||||
Thread.sleep(10000);
|
||||
Thread.sleep(20000);
|
||||
exe.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue