From e2e3779e6311b494ec3b4ecf86fa6b35a3920323 Mon Sep 17 00:00:00 2001 From: "bluejoe2008@gmail.com" Date: Thu, 10 May 2018 23:02:21 +0800 Subject: [PATCH] add listeners --- src/main/scala/event.scala | 41 +++++++++++++++++ src/main/scala/flow.scala | 86 ++++++++++++++--------------------- src/main/scala/listener.scala | 18 ++++++++ src/main/scala/trigger.scala | 8 +--- src/test/scala/FlowTest.scala | 6 +-- 5 files changed, 96 insertions(+), 63 deletions(-) create mode 100644 src/main/scala/event.scala create mode 100644 src/main/scala/listener.scala diff --git a/src/main/scala/event.scala b/src/main/scala/event.scala new file mode 100644 index 0000000..6284c67 --- /dev/null +++ b/src/main/scala/event.scala @@ -0,0 +1,41 @@ +package cn.piflow + +import cn.piflow.util.Logging +import scala.collection.mutable.{ArrayBuffer, Map => MMap} + +trait Event { + +} + +trait EventHandler { + def handle(event: Event, args: Any): Unit; +} + +trait EventEmiter { + def fire(event: Event, args: Any): Unit; + + def on(event: Event, handler: EventHandler): Unit; +} + +class EventEmiterImpl extends EventEmiter with Logging { + val listeners = MMap[Event, ArrayBuffer[EventHandler]](); + + def on(event: Event, handler: EventHandler): Unit = { + if (!listeners.contains(event)) + listeners += event -> ArrayBuffer[EventHandler](); + + listeners(event) += handler; + logger.debug(s"listening on $event, listener: $handler"); + } + + def fire(event: Event, args: Any = None): Unit = { + logger.debug(s"fired event: $event, args: $args"); + if (listeners.contains(event)) { + for (listener <- listeners(event)) { + logger.debug(s"handling event: $event, args: $args, listener: $listener"); + listener.handle(event, args); + logger.debug(s"handled event: $event, args: $args, listener: $listener"); + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/flow.scala b/src/main/scala/flow.scala index bb1f3aa..03f9511 100644 --- a/src/main/scala/flow.scala +++ b/src/main/scala/flow.scala @@ -1,9 +1,10 @@ package cn.piflow +import java.util.concurrent.atomic.AtomicInteger + import cn.piflow.util.Logging import org.quartz.Trigger.CompletedExecutionInstruction import org.quartz.impl.StdSchedulerFactory -import org.quartz.impl.matchers.GroupMatcher import org.quartz.{Trigger => QuartzTrigger, _} import scala.collection.JavaConversions._ @@ -31,39 +32,34 @@ class Flow { def getTriggers() = triggers.toSeq; } -class Runner { - def run(flow: Flow, starts: String*): FlowExecution = { - new FlowExecutionImpl(flow, starts); +object Runner { + val idgen = new AtomicInteger(); + + def run(flow: Flow, args: Map[String, Any] = Map()): FlowExecution = { + new FlowExecutionImpl("" + idgen.incrementAndGet(), flow, args); } } trait FlowExecution { - def start(args: Map[String, Any] = Map()); + def addListener(listener: FlowExecutionListener); + + def getId(): String; + + def start(starts: String*); def getContext(): ExecutionContext; + def getFlow(): Flow; + def stop(); def getRunningProcesses(): Seq[(String, String)]; - - def getScheduledProcesses(): Seq[String]; - } trait Process { def run(pc: ProcessContext); } -trait EventListener { - def handle(event: Event, args: Any): Unit; -} - -trait EventEmiter { - def fire(event: Event, args: Any): Unit; - - def on(event: Event, listener: EventListener): Unit; -} - trait Context { def get(key: String): Any; @@ -101,31 +97,9 @@ class ProcessContext(executionContext: ExecutionContext) extends Context { def getExecutionContext(): ExecutionContext = executionContext; } -class EventEmiterImpl extends EventEmiter with Logging { - val listeners = MMap[Event, ArrayBuffer[EventListener]](); - - def on(event: Event, listener: EventListener): Unit = { - if (!listeners.contains(event)) - listeners += event -> ArrayBuffer[EventListener](); - - listeners(event) += listener; - logger.debug(s"listening on $event, listener: $listener"); - } - - def fire(event: Event, args: Any = None): Unit = { - logger.debug(s"fired event: $event, args: $args"); - if (listeners.contains(event)) { - for (listener <- listeners(event)) { - logger.debug(s"handling event: $event, args: $args, listener: $listener"); - listener.handle(event, args); - logger.debug(s"handled event: $event, args: $args, listener: $listener"); - } - } - } -} - -class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution with Logging { - def start(args: Map[String, Any]): Unit = { +class FlowExecutionImpl(id: String, flow: Flow, args: Map[String, Any]) + extends FlowExecution with Logging { + def start(starts: String*): Unit = { //set context args.foreach { (en) => executionContext.put(en._1, en._2); @@ -146,7 +120,7 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w quartzScheduler.shutdown(); } - def getRunningProcesses(): Seq[(String, String)] = { + override def getRunningProcesses(): Seq[(String, String)] = { quartzScheduler.getCurrentlyExecutingJobs() .map { jec: JobExecutionContext => (jec.getFireInstanceId, @@ -154,16 +128,9 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w }; } - def getScheduledProcesses(): Seq[String] = { - quartzScheduler.getJobKeys(GroupMatcher.anyGroup()) - .map(quartzScheduler.getJobDetail(_)) - .map(_.getJobDataMap.get("processName").asInstanceOf[String]) - .toSeq; - } - val executionContext = new EventEmiterImpl() with ExecutionContext { //listens on LaunchProcess - this.on(LaunchProcess(), new EventListener() { + this.on(LaunchProcess(), new EventHandler() { override def handle(event: Event, args: Any): Unit = { scheduleProcess(args.asInstanceOf[String]); } @@ -208,6 +175,7 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w val quartzScheduler = StdSchedulerFactory.getDefaultScheduler(); quartzScheduler.getContext.put("executionContext", executionContext); val triggers = flow.getTriggers(); + val listeners = ArrayBuffer[FlowExecutionListener](); quartzScheduler.getListenerManager.addTriggerListener(new TriggerListener { override def vetoJobExecution(trigger: QuartzTrigger, context: JobExecutionContext): Boolean = false; @@ -233,10 +201,20 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w logger.debug(s"process completed: $processName"); executionContext.fire(ProcessCompleted(processName)); } + else { + logger.debug(s"process failed: $processName"); + executionContext.fire(ProcessFailed(processName)); + } } }); override def getContext() = executionContext; + + override def getId(): String = id; + + override def addListener(listener: FlowExecutionListener): Unit = listeners += listener; + + override def getFlow(): Flow = flow; } case class LaunchProcess() extends Event { @@ -245,6 +223,9 @@ case class LaunchProcess() extends Event { case class ProcessStarted(processName: String) extends Event { } +case class ProcessFailed(processName: String) extends Event { +} + case class ProcessCompleted(processName: String) extends Event { } @@ -260,7 +241,6 @@ class ProcessAsQuartzJob extends Job with Logging { } catch { case e: Throwable => - logger.warn(s"failed to execute process: $processName, cause: $e"); e.printStackTrace(); throw new JobExecutionException(s"failed to execute process: $processName", e); } diff --git a/src/main/scala/listener.scala b/src/main/scala/listener.scala new file mode 100644 index 0000000..a7a5cc6 --- /dev/null +++ b/src/main/scala/listener.scala @@ -0,0 +1,18 @@ +/** + * Created by bluejoe on 2018/5/10. + */ +package cn.piflow + +trait FlowExecutionListener { + def onFlowStarted(execution: FlowExecution); + + def onFlowShutdown(execution: FlowExecution); + + def onEventFired(event: Event, args: Any, execution: FlowExecution); + + def onProcessStarted(processName: String, process: Process, execution: FlowExecution); + + def onProcessCompleted(processName: String, process: Process, execution: FlowExecution); + + def onProcessFailed(processName: String, process: Process, cause: Exception, execution: FlowExecution); +} diff --git a/src/main/scala/trigger.scala b/src/main/scala/trigger.scala index c5d9eb2..dd2e891 100644 --- a/src/main/scala/trigger.scala +++ b/src/main/scala/trigger.scala @@ -8,17 +8,13 @@ trait Trigger { def getTriggeredProcesses(): Seq[String]; } -trait Event { - -} - /** * start process while dependent processes completed */ object DependencyTrigger { def declareDependency(processName: String, dependentProcesses: String*): Trigger = new Trigger() { override def activate(executionContext: ExecutionContext): Unit = { - val listener = new EventListener { + val listener = new EventHandler { val completed = MMap[String, Boolean](); dependentProcesses.foreach { processName => completed(processName) = false; @@ -65,7 +61,7 @@ object EventTrigger { def listen(event: Event, processNames: String*): Trigger = new Trigger() { override def activate(executionContext: ExecutionContext): Unit = { processNames.foreach { processName => - executionContext.on(event, new EventListener() { + executionContext.on(event, new EventHandler() { override def handle(event: Event, args: Any): Unit = { executionContext.fire(LaunchProcess(), processName); } diff --git a/src/test/scala/FlowTest.scala b/src/test/scala/FlowTest.scala index f5719b5..b8f00a2 100644 --- a/src/test/scala/FlowTest.scala +++ b/src/test/scala/FlowTest.scala @@ -18,13 +18,11 @@ class FlowTest { flow.addTrigger(DependencyTrigger.declareDependency("PrintCount", "CountWords")); flow.addTrigger(TimerTrigger.cron("0/5 * * * * ? ", "PrintMessage")); - val runner = new Runner(); - val exe = runner.run(flow, "CleanHouse"); - val spark = SparkSession.builder.master("local[4]") .getOrCreate(); - exe.start(Map(classOf[SparkSession].getName -> spark)); + val exe = Runner.run(flow, Map(classOf[SparkSession].getName -> spark)); + exe.start("CleanHouse"); Thread.sleep(20000); exe.stop(); }