add listeners

This commit is contained in:
bluejoe2008@gmail.com 2018-05-10 23:02:21 +08:00
parent 9cf23d78d6
commit e2e3779e63
5 changed files with 96 additions and 63 deletions

View File

@ -0,0 +1,41 @@
package cn.piflow
import cn.piflow.util.Logging
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
trait Event {
}
trait EventHandler {
def handle(event: Event, args: Any): Unit;
}
trait EventEmiter {
def fire(event: Event, args: Any): Unit;
def on(event: Event, handler: EventHandler): Unit;
}
class EventEmiterImpl extends EventEmiter with Logging {
val listeners = MMap[Event, ArrayBuffer[EventHandler]]();
def on(event: Event, handler: EventHandler): Unit = {
if (!listeners.contains(event))
listeners += event -> ArrayBuffer[EventHandler]();
listeners(event) += handler;
logger.debug(s"listening on $event, listener: $handler");
}
def fire(event: Event, args: Any = None): Unit = {
logger.debug(s"fired event: $event, args: $args");
if (listeners.contains(event)) {
for (listener <- listeners(event)) {
logger.debug(s"handling event: $event, args: $args, listener: $listener");
listener.handle(event, args);
logger.debug(s"handled event: $event, args: $args, listener: $listener");
}
}
}
}

View File

@ -1,9 +1,10 @@
package cn.piflow package cn.piflow
import java.util.concurrent.atomic.AtomicInteger
import cn.piflow.util.Logging import cn.piflow.util.Logging
import org.quartz.Trigger.CompletedExecutionInstruction import org.quartz.Trigger.CompletedExecutionInstruction
import org.quartz.impl.StdSchedulerFactory import org.quartz.impl.StdSchedulerFactory
import org.quartz.impl.matchers.GroupMatcher
import org.quartz.{Trigger => QuartzTrigger, _} import org.quartz.{Trigger => QuartzTrigger, _}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
@ -31,39 +32,34 @@ class Flow {
def getTriggers() = triggers.toSeq; def getTriggers() = triggers.toSeq;
} }
class Runner { object Runner {
def run(flow: Flow, starts: String*): FlowExecution = { val idgen = new AtomicInteger();
new FlowExecutionImpl(flow, starts);
def run(flow: Flow, args: Map[String, Any] = Map()): FlowExecution = {
new FlowExecutionImpl("" + idgen.incrementAndGet(), flow, args);
} }
} }
trait FlowExecution { trait FlowExecution {
def start(args: Map[String, Any] = Map()); def addListener(listener: FlowExecutionListener);
def getId(): String;
def start(starts: String*);
def getContext(): ExecutionContext; def getContext(): ExecutionContext;
def getFlow(): Flow;
def stop(); def stop();
def getRunningProcesses(): Seq[(String, String)]; def getRunningProcesses(): Seq[(String, String)];
def getScheduledProcesses(): Seq[String];
} }
trait Process { trait Process {
def run(pc: ProcessContext); def run(pc: ProcessContext);
} }
trait EventListener {
def handle(event: Event, args: Any): Unit;
}
trait EventEmiter {
def fire(event: Event, args: Any): Unit;
def on(event: Event, listener: EventListener): Unit;
}
trait Context { trait Context {
def get(key: String): Any; def get(key: String): Any;
@ -101,31 +97,9 @@ class ProcessContext(executionContext: ExecutionContext) extends Context {
def getExecutionContext(): ExecutionContext = executionContext; def getExecutionContext(): ExecutionContext = executionContext;
} }
class EventEmiterImpl extends EventEmiter with Logging { class FlowExecutionImpl(id: String, flow: Flow, args: Map[String, Any])
val listeners = MMap[Event, ArrayBuffer[EventListener]](); extends FlowExecution with Logging {
def start(starts: String*): Unit = {
def on(event: Event, listener: EventListener): Unit = {
if (!listeners.contains(event))
listeners += event -> ArrayBuffer[EventListener]();
listeners(event) += listener;
logger.debug(s"listening on $event, listener: $listener");
}
def fire(event: Event, args: Any = None): Unit = {
logger.debug(s"fired event: $event, args: $args");
if (listeners.contains(event)) {
for (listener <- listeners(event)) {
logger.debug(s"handling event: $event, args: $args, listener: $listener");
listener.handle(event, args);
logger.debug(s"handled event: $event, args: $args, listener: $listener");
}
}
}
}
class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution with Logging {
def start(args: Map[String, Any]): Unit = {
//set context //set context
args.foreach { (en) => args.foreach { (en) =>
executionContext.put(en._1, en._2); executionContext.put(en._1, en._2);
@ -146,7 +120,7 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w
quartzScheduler.shutdown(); quartzScheduler.shutdown();
} }
def getRunningProcesses(): Seq[(String, String)] = { override def getRunningProcesses(): Seq[(String, String)] = {
quartzScheduler.getCurrentlyExecutingJobs() quartzScheduler.getCurrentlyExecutingJobs()
.map { jec: JobExecutionContext => .map { jec: JobExecutionContext =>
(jec.getFireInstanceId, (jec.getFireInstanceId,
@ -154,16 +128,9 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w
}; };
} }
def getScheduledProcesses(): Seq[String] = {
quartzScheduler.getJobKeys(GroupMatcher.anyGroup())
.map(quartzScheduler.getJobDetail(_))
.map(_.getJobDataMap.get("processName").asInstanceOf[String])
.toSeq;
}
val executionContext = new EventEmiterImpl() with ExecutionContext { val executionContext = new EventEmiterImpl() with ExecutionContext {
//listens on LaunchProcess //listens on LaunchProcess
this.on(LaunchProcess(), new EventListener() { this.on(LaunchProcess(), new EventHandler() {
override def handle(event: Event, args: Any): Unit = { override def handle(event: Event, args: Any): Unit = {
scheduleProcess(args.asInstanceOf[String]); scheduleProcess(args.asInstanceOf[String]);
} }
@ -208,6 +175,7 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w
val quartzScheduler = StdSchedulerFactory.getDefaultScheduler(); val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
quartzScheduler.getContext.put("executionContext", executionContext); quartzScheduler.getContext.put("executionContext", executionContext);
val triggers = flow.getTriggers(); val triggers = flow.getTriggers();
val listeners = ArrayBuffer[FlowExecutionListener]();
quartzScheduler.getListenerManager.addTriggerListener(new TriggerListener { quartzScheduler.getListenerManager.addTriggerListener(new TriggerListener {
override def vetoJobExecution(trigger: QuartzTrigger, context: JobExecutionContext): Boolean = false; override def vetoJobExecution(trigger: QuartzTrigger, context: JobExecutionContext): Boolean = false;
@ -233,10 +201,20 @@ class FlowExecutionImpl(flow: Flow, starts: Seq[String]) extends FlowExecution w
logger.debug(s"process completed: $processName"); logger.debug(s"process completed: $processName");
executionContext.fire(ProcessCompleted(processName)); executionContext.fire(ProcessCompleted(processName));
} }
else {
logger.debug(s"process failed: $processName");
executionContext.fire(ProcessFailed(processName));
}
} }
}); });
override def getContext() = executionContext; override def getContext() = executionContext;
override def getId(): String = id;
override def addListener(listener: FlowExecutionListener): Unit = listeners += listener;
override def getFlow(): Flow = flow;
} }
case class LaunchProcess() extends Event { case class LaunchProcess() extends Event {
@ -245,6 +223,9 @@ case class LaunchProcess() extends Event {
case class ProcessStarted(processName: String) extends Event { case class ProcessStarted(processName: String) extends Event {
} }
case class ProcessFailed(processName: String) extends Event {
}
case class ProcessCompleted(processName: String) extends Event { case class ProcessCompleted(processName: String) extends Event {
} }
@ -260,7 +241,6 @@ class ProcessAsQuartzJob extends Job with Logging {
} }
catch { catch {
case e: Throwable => case e: Throwable =>
logger.warn(s"failed to execute process: $processName, cause: $e");
e.printStackTrace(); e.printStackTrace();
throw new JobExecutionException(s"failed to execute process: $processName", e); throw new JobExecutionException(s"failed to execute process: $processName", e);
} }

View File

@ -0,0 +1,18 @@
/**
* Created by bluejoe on 2018/5/10.
*/
package cn.piflow
trait FlowExecutionListener {
def onFlowStarted(execution: FlowExecution);
def onFlowShutdown(execution: FlowExecution);
def onEventFired(event: Event, args: Any, execution: FlowExecution);
def onProcessStarted(processName: String, process: Process, execution: FlowExecution);
def onProcessCompleted(processName: String, process: Process, execution: FlowExecution);
def onProcessFailed(processName: String, process: Process, cause: Exception, execution: FlowExecution);
}

View File

@ -8,17 +8,13 @@ trait Trigger {
def getTriggeredProcesses(): Seq[String]; def getTriggeredProcesses(): Seq[String];
} }
trait Event {
}
/** /**
* start process while dependent processes completed * start process while dependent processes completed
*/ */
object DependencyTrigger { object DependencyTrigger {
def declareDependency(processName: String, dependentProcesses: String*): Trigger = new Trigger() { def declareDependency(processName: String, dependentProcesses: String*): Trigger = new Trigger() {
override def activate(executionContext: ExecutionContext): Unit = { override def activate(executionContext: ExecutionContext): Unit = {
val listener = new EventListener { val listener = new EventHandler {
val completed = MMap[String, Boolean](); val completed = MMap[String, Boolean]();
dependentProcesses.foreach { processName => dependentProcesses.foreach { processName =>
completed(processName) = false; completed(processName) = false;
@ -65,7 +61,7 @@ object EventTrigger {
def listen(event: Event, processNames: String*): Trigger = new Trigger() { def listen(event: Event, processNames: String*): Trigger = new Trigger() {
override def activate(executionContext: ExecutionContext): Unit = { override def activate(executionContext: ExecutionContext): Unit = {
processNames.foreach { processName => processNames.foreach { processName =>
executionContext.on(event, new EventListener() { executionContext.on(event, new EventHandler() {
override def handle(event: Event, args: Any): Unit = { override def handle(event: Event, args: Any): Unit = {
executionContext.fire(LaunchProcess(), processName); executionContext.fire(LaunchProcess(), processName);
} }

View File

@ -18,13 +18,11 @@ class FlowTest {
flow.addTrigger(DependencyTrigger.declareDependency("PrintCount", "CountWords")); flow.addTrigger(DependencyTrigger.declareDependency("PrintCount", "CountWords"));
flow.addTrigger(TimerTrigger.cron("0/5 * * * * ? ", "PrintMessage")); flow.addTrigger(TimerTrigger.cron("0/5 * * * * ? ", "PrintMessage"));
val runner = new Runner();
val exe = runner.run(flow, "CleanHouse");
val spark = SparkSession.builder.master("local[4]") val spark = SparkSession.builder.master("local[4]")
.getOrCreate(); .getOrCreate();
exe.start(Map(classOf[SparkSession].getName -> spark)); val exe = Runner.run(flow, Map(classOf[SparkSession].getName -> spark));
exe.start("CleanHouse");
Thread.sleep(20000); Thread.sleep(20000);
exe.stop(); exe.stop();
} }