stages of process

bluejoe2008@gmail.com 2018-05-13 12:45:42 +08:00
parent ab3a86c6c3
commit f3391ddcf1
12 changed files with 597 additions and 261 deletions

View File

@@ -0,0 +1,58 @@
package cn.piflow
import java.io.File
import scala.collection.mutable.{Map => MMap}
object FlowElement {
def fromFile(file: File): FlowElement = {
null;
}
def saveFile(flowElement: FlowElement, file: File): Unit = {
}
}
class FlowElement {
def build(): Flow = {
null;
}
}
class SparkProcessElement {
def build(): SparkProcess = {
null;
}
}
trait FlowElementManager {
def list(): Seq[(String, FlowElement)];
def get(name: String): Option[FlowElement];
def add(name: String, flowJson: FlowElement): Unit;
def delete(name: String): Unit;
}
class InMemoryFlowElementManager extends FlowElementManager {
val items = MMap[String, FlowElement]();
override def list(): Seq[(String, FlowElement)] = items.toSeq;
override def get(name: String): Option[FlowElement] = items.get(name);
override def delete(name: String): Unit = { items.remove(name); }
override def add(name: String, flowJson: FlowElement) {
items(name) = flowJson;
}
}
class SqlFlowElementManager /* extends FlowJsonManager */ {
}
class FileSystemFlowElementManager {
}
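
Note (illustrative, not part of this commit): a REPL-style sketch of how the new FlowElementManager API is meant to be used, mirroring FlowElementManagerTest later in this commit; the flow name "wordcount" is made up.

import cn.piflow.{FlowElement, FlowElementManager, InMemoryFlowElementManager}

val manager: FlowElementManager = new InMemoryFlowElementManager();
manager.add("wordcount", new FlowElement());                   //register a flow definition under a name
println(manager.get("wordcount").isDefined);                   //get() returns an Option
manager.list().foreach { case (name, _) => println(name) };    //list() yields (name, FlowElement) pairs
manager.delete("wordcount");                                   //remove it again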

View File

@@ -1,6 +1,7 @@
package cn.piflow
import cn.piflow.util.Logging
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
trait Event {
@@ -24,7 +25,7 @@ trait EventHandler {
}
trait EventEmiter {
- def fire(event: Event, args: Any): Unit;
+ def fire(event: Event, args: Any = None): Unit;
def on(event: Event, handler: EventHandler): Unit;
}
@@ -40,7 +41,7 @@ class EventEmiterImpl extends EventEmiter with Logging {
logger.debug(s"listening on $event, listener: $handler");
}
- def fire(event: Event, args: Any = None): Unit = {
+ def fire(event: Event, args: Any): Unit = {
logger.debug(s"fired event: $event, args: $args");
if (listeners.contains(event)) {
for (listener <- listeners(event)) {

View File

@@ -1,8 +1,6 @@
package cn.piflow
- import java.util.concurrent.atomic.AtomicInteger
- import cn.piflow.util.Logging
+ import cn.piflow.util.{IdGenerator, Logging}
import org.quartz.Trigger.CompletedExecutionInstruction
import org.quartz.impl.StdSchedulerFactory
import org.quartz.{Trigger => QuartzTrigger, _}
@@ -13,8 +11,21 @@ import scala.collection.mutable.{ArrayBuffer, Map => MMap}
/**
* Created by bluejoe on 2018/5/2.
*/
- class Flow {
- val triggers = ArrayBuffer[Trigger]();
+ trait Flow {
+ def getProcessNames(): Seq[String];
+ def addProcess(name: String, process: Process): Flow;
+ def addTrigger(processName: String, trigger: Trigger): Flow;
+ def getProcess(name: String): Process;
+ def getTriggers(processName: String): Seq[Trigger];
+ }
+ class FlowImpl extends Flow {
+ val triggers = MMap[String, ArrayBuffer[Trigger]]();
val processes = MMap[String, Process]();
def addProcess(name: String, process: Process) = {
@@ -22,21 +33,22 @@ class Flow {
this;
};
- def addTrigger(trigger: Trigger) = {
- triggers += trigger;
+ def addTrigger(processName: String, trigger: Trigger) = {
+ val processTriggers = triggers.getOrElseUpdate(processName, ArrayBuffer[Trigger]());
+ processTriggers += trigger;
this;
}
def getProcess(name: String) = processes(name);
- def getTriggers() = triggers.toSeq;
+ def getTriggers(processName: String) = triggers.getOrElse(processName, ArrayBuffer[Trigger]()).toSeq;
+ override def getProcessNames(): Seq[String] = processes.map(_._1).toSeq;
}
object Runner {
- val idgen = new AtomicInteger();
def run(flow: Flow, args: Map[String, Any] = Map()): FlowExecution = {
- new FlowExecutionImpl("" + idgen.incrementAndGet(), flow, args);
+ new FlowExecutionImpl(flow, args);
}
}
@@ -47,8 +59,6 @@ trait FlowExecution {
def start(starts: String*);
- def getContext(): ExecutionContext;
def getFlow(): Flow;
def stop();
@@ -56,10 +66,6 @@ trait FlowExecution {
def getRunningProcesses(): Seq[(String, String)];
}
- trait Process {
- def run(pc: ProcessContext);
- }
trait Context {
def get(key: String): Any;
@@ -73,32 +79,23 @@ trait Context {
put(m.runtimeClass.getName, value);
}
- trait ExecutionContext extends Context with EventEmiter {
+ trait FlowExecutionContext extends Context with EventEmiter {
def getFlow(): Flow;
+ def runProcess(processName: String): ProcessExecution;
+ def getFlowExecution(): FlowExecution;
def scheduleProcessRepeatly(cronExpr: String, processName: String): Unit;
}
- class ProcessContext(executionContext: ExecutionContext) extends Context {
- val context = MMap[String, Any]();
- override def get(key: String): Any = {
- if (context.contains(key))
- context(key);
- else
- executionContext.get(key);
- };
- override def put(key: String, value: Any): this.type = {
- context(key) = value;
- this;
- };
- def getExecutionContext(): ExecutionContext = executionContext;
- }
- class FlowExecutionImpl(id: String, flow: Flow, args: Map[String, Any])
+ class FlowExecutionImpl(flow: Flow, args: Map[String, Any])
extends FlowExecution with Logging {
+ val id = "flow_excution_" + IdGenerator.getNextId[FlowExecution];
+ val execution = this;
+ val executionContext = createContext();
def start(starts: String*): Unit = {
//set context
args.foreach { (en) =>
@@ -106,7 +103,11 @@ class FlowExecutionImpl(id: String, flow: Flow, args: Map[String, Any])
};
//activates all triggers
- triggers.foreach(_.activate(executionContext));
+ flow.getProcessNames().foreach { name =>
+ flow.getTriggers(name).foreach { trigger =>
+ trigger.activate(name, executionContext);
+ }
+ }
quartzScheduler.start();
@@ -128,53 +129,60 @@ class FlowExecutionImpl(id: String, flow: Flow, args: Map[String, Any])
};
}
- val executionContext = new EventEmiterImpl() with ExecutionContext {
+ private def createContext(): FlowExecutionContext = {
+ new EventEmiterImpl() with FlowExecutionContext {
//listens on LaunchProcess
this.on(LaunchProcess(), new EventHandler() {
override def handle(event: Event, args: Any): Unit = {
scheduleProcess(args.asInstanceOf[String]);
}
});
val context = MMap[String, Any]();
def get(key: String): Any = context(key);
+ def runProcess(processName: String): ProcessExecution = {
+ new ProcessExecutionImpl(processName, flow.getProcess(processName), executionContext);
+ }
def put(key: String, value: Any) = {
context(key) = value;
this;
}
private def _scheduleProcess(processName: String, scheduleBuilder: Option[ScheduleBuilder[_]] = None): Unit = {
val quartzTriggerBuilder = TriggerBuilder.newTrigger().startNow();
if (scheduleBuilder.isDefined) {
quartzTriggerBuilder.withSchedule(scheduleBuilder.get)
};
val quartzTrigger = quartzTriggerBuilder.build();
val quartzJob = JobBuilder.newJob(classOf[ProcessAsQuartzJob])
.usingJobData("processName", processName)
.build();
logger.debug(s"scheduled process: $processName");
quartzScheduler.scheduleJob(quartzJob, quartzTrigger);
}
def scheduleProcess(processName: String): Unit = {
_scheduleProcess(processName);
}
override def scheduleProcessRepeatly(processName: String, cronExpr: String): Unit = {
_scheduleProcess(processName, Some(CronScheduleBuilder.cronSchedule(cronExpr)));
}
override def getFlow(): Flow = flow;
+ override def getFlowExecution(): FlowExecution = execution;
};
+ }
val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
quartzScheduler.getContext.put("executionContext", executionContext);
- val triggers = flow.getTriggers();
val listeners = ArrayBuffer[FlowExecutionListener]();
quartzScheduler.getListenerManager.addTriggerListener(new TriggerListener {
@@ -208,8 +216,6 @@ class FlowExecutionImpl(id: String, flow: Flow, args: Map[String, Any])
}
});
- override def getContext() = executionContext;
override def getId(): String = id;
override def addListener(listener: FlowExecutionListener): Unit =
@@ -217,21 +223,3 @@ class FlowExecutionImpl(id: String, flow: Flow, args: Map[String, Any])
override def getFlow(): Flow = flow;
}
- class ProcessAsQuartzJob extends Job with Logging {
- override def execute(context: JobExecutionContext): Unit = {
- val map = context.getJobDetail.getJobDataMap;
- val processName = map.get("processName").asInstanceOf[String];
- val executionContext = context.getScheduler.getContext.get("executionContext").asInstanceOf[ExecutionContext];
- try {
- executionContext.getFlow().getProcess(processName)
- .run(new ProcessContext(executionContext));
- context.setResult(true);
- }
- catch {
- case e: Throwable =>
- e.printStackTrace();
- throw new JobExecutionException(s"failed to execute process: $processName", e);
- }
- }
- }
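
Note (illustrative, not part of this commit): wiring a flow with the reworked API. Triggers are now attached to the name of the process they start, and the execution id is generated internally via IdGenerator. The demo processes come from FlowTest later in this commit; the argument passed to start() is an assumption.

import cn.piflow._

val flow: Flow = new FlowImpl();
flow.addProcess("CleanHouse", new CleanHouse());
flow.addProcess("CopyTextFile", new CopyTextFile());
flow.addProcess("PrintMessage", new PrintMessage());
//run CopyTextFile once CleanHouse has completed
flow.addTrigger("CopyTextFile", new DependencyTrigger("CleanHouse"));
//print a heartbeat message every 5 seconds
flow.addTrigger("PrintMessage", new TimerTrigger("0/5 * * * * ? "));

val execution = Runner.run(flow);
execution.start("CleanHouse");  //assumed entry process
//execution.stop();             //shuts the quartz scheduler down when done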

View File

@@ -1,20 +1,20 @@
package cn.piflow.io
- import cn.piflow.{DataSink, ProcessContext, _}
+ import cn.piflow.{DataSink, ProcessExecutionContext, _}
import org.apache.spark.sql._
case class Console(nlimit: Int = 20) extends DataSink {
- override def save(data: DataFrame, ctx: ProcessContext): Unit = {
+ override def save(data: DataFrame, ctx: ProcessExecutionContext): Unit = {
data.show(nlimit);
}
}
case class TextFile(path: String, format: String = FileFormat.TEXT) extends DataSource with DataSink {
- override def load(ctx: ProcessContext): DataFrame = {
+ override def load(ctx: ProcessExecutionContext): DataFrame = {
ctx.get[SparkSession].read.format(format).load(path).asInstanceOf[DataFrame];
}
- override def save(data: DataFrame, ctx: ProcessContext): Unit = {
+ override def save(data: DataFrame, ctx: ProcessExecutionContext): Unit = {
data.write.format(format).save(path);
}
}

View File

@@ -0,0 +1,219 @@
package cn.piflow
import cn.piflow.util.{IdGenerator, Logging}
import org.quartz._
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
trait Process {
def onPrepare(pec: ProcessExecutionContext): Unit;
def onCommit(pec: ProcessExecutionContext): Unit;
def onRollback(pec: ProcessExecutionContext): Unit;
def onFail(errorStage: ProcessStage, cause: Throwable, pec: ProcessExecutionContext): Unit;
}
abstract class LazyProcess extends Process with Logging {
def onPrepare(pec: ProcessExecutionContext): Unit = {
logger.warn(s"onPrepare={}, process: $this");
}
def onCommit(pec: ProcessExecutionContext): Unit;
def onRollback(pec: ProcessExecutionContext): Unit = {
logger.warn(s"onRollback={}, process: $this");
}
def onFail(errorStage: ProcessStage, cause: Throwable, pec: ProcessExecutionContext): Unit = {}
}
//TODO: one ProcessExecution with multiple RUNs
trait ProcessExecution {
def getId(): String;
def start();
def getProcessName(): String;
def getProcess(): Process;
def getStage(): ProcessStage;
def handleError(jee: JobExecutionException): Unit;
}
trait ProcessExecutionContext extends Context {
def getProcessExecution(): ProcessExecution;
def setStage(stage: ProcessStage): Unit;
def getStage(): ProcessStage;
def setErrorHandler(handler: ErrorHandler): Unit;
}
class ProcessExecutionContextImpl(processExecution: ProcessExecution, executionContext: FlowExecutionContext)
extends ProcessExecutionContext {
val stages = ArrayBuffer[ProcessStage]();
var errorHandler: ErrorHandler = Noop();
def setStage(stage: ProcessStage) = stages += stage;
val context = MMap[String, Any]();
def getProcessExecution() = processExecution;
def getStage(): ProcessStage = stages.last;
override def get(key: String): Any = {
if (context.contains(key))
context(key);
else
executionContext.get(key);
};
override def put(key: String, value: Any): this.type = {
context(key) = value;
this;
};
override def setErrorHandler(handler: ErrorHandler): Unit = errorHandler = handler;
}
class ProcessAsQuartzJob extends Job with Logging {
override def execute(context: JobExecutionContext): Unit = {
val map = context.getJobDetail.getJobDataMap;
val processName = map.get("processName").asInstanceOf[String];
val executionContext = context.getScheduler.getContext.get("executionContext").asInstanceOf[FlowExecutionContext];
val pe = executionContext.runProcess(processName);
try {
pe.start();
context.setResult(true);
}
catch {
case e => {
val jee = new JobExecutionException(s"failed to execute process: $processName", e);
logger.error {
val stage = pe.getStage();
s"failed to execute process: $processName, stage: $stage, cause: $e"
};
pe.handleError(jee);
throw jee;
};
}
}
}
class ProcessExecutionImpl(processName: String, process: Process, executionContext: FlowExecutionContext)
extends ProcessExecution with Logging {
val id = "process_excution_" + IdGenerator.getNextId[ProcessExecution];
val processExecutionContext = createContext();
override def getId(): String = id;
override def start(): Unit = {
try {
processExecutionContext.setStage(PrepareStart());
process.onPrepare(processExecutionContext);
processExecutionContext.setStage(PrepareComplete());
}
catch {
case e =>
try {
logger.warn(s"onPrepare() failed: $e");
processExecutionContext.setStage(RollbackStart());
process.onRollback(processExecutionContext);
processExecutionContext.setStage(RollbackComplete());
throw e;
}
catch {
case e =>
logger.warn(s"onRollback() failed: $e");
process.onFail(RollbackStart(), e, processExecutionContext);
throw e;
}
}
try {
processExecutionContext.setStage(CommitStart());
process.onCommit(processExecutionContext);
processExecutionContext.setStage(CommitComplete());
}
catch {
case e =>
logger.warn(s"onCommit() failed: $e");
process.onFail(CommitStart(), e, processExecutionContext);
throw e;
}
}
private def createContext() =
new ProcessExecutionContextImpl(this, executionContext);
override def getProcessName(): String = processName;
override def getProcess(): Process = process;
override def handleError(jee: JobExecutionException): Unit = processExecutionContext.errorHandler.handle(jee);
override def getStage(): ProcessStage = processExecutionContext.getStage();
}
trait ErrorHandler {
def handle(jee: JobExecutionException);
}
case class Noop() extends ErrorHandler {
def handle(jee: JobExecutionException): Unit = {
}
}
case class Retry() extends ErrorHandler {
def handle(jee: JobExecutionException): Unit = {
jee.setRefireImmediately(true);
}
}
case class Abort() extends ErrorHandler {
def handle(jee: JobExecutionException): Unit = {
jee.setUnscheduleFiringTrigger(true);
}
}
case class Fail() extends ErrorHandler {
def handle(jee: JobExecutionException): Unit = {
jee.setUnscheduleAllTriggers(true);
}
}
trait ProcessStage {
def getName(): String;
}
case class PrepareStart() extends ProcessStage {
def getName(): String = this.getClass.getSimpleName;
}
case class PrepareComplete() extends ProcessStage {
def getName(): String = this.getClass.getSimpleName;
}
case class CommitStart() extends ProcessStage {
def getName(): String = this.getClass.getSimpleName;
}
case class CommitComplete() extends ProcessStage {
def getName(): String = this.getClass.getSimpleName;
}
case class RollbackStart() extends ProcessStage {
def getName(): String = this.getClass.getSimpleName;
}
case class RollbackComplete() extends ProcessStage {
def getName(): String = this.getClass.getSimpleName;
}
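
Note (illustrative, not part of this commit): a hypothetical process written against the new prepare/commit/rollback contract, modeled on CopyTextFile in FlowTest below — work goes to a temp file in onPrepare and only becomes visible in onCommit. The class and file names are made up.

import java.io.File
import cn.piflow.{Process, ProcessExecutionContext, ProcessStage}

class ExportReport extends Process {
  override def onPrepare(pec: ProcessExecutionContext): Unit = {
    //do the real work against a temporary file first
    val tmp = File.createTempFile("report", ".csv");
    //... write the report into tmp ...
    pec.put("tmpfile", tmp);
  }

  override def onCommit(pec: ProcessExecutionContext): Unit =
    //publish the result only after the prepare stage succeeded
    pec.get("tmpfile").asInstanceOf[File].renameTo(new File("./out/report.csv"));

  override def onRollback(pec: ProcessExecutionContext): Unit =
    //discard the partial result
    pec.get("tmpfile").asInstanceOf[File].delete();

  override def onFail(errorStage: ProcessStage, cause: Throwable, pec: ProcessExecutionContext): Unit = {}
}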

View File

@@ -3,9 +3,7 @@
*/
package cn.piflow
- import java.util.concurrent.atomic.AtomicInteger
- import cn.piflow.util.Logging
+ import cn.piflow.util.{IdGenerator, Logging}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
@@ -13,28 +11,39 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConversions
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
- class SparkETLProcess extends Process with Logging {
- val ends = ArrayBuffer[(ProcessContext) => Unit]();
- val idgen = new AtomicInteger();
+ class SparkProcess extends Process with Logging {
+ val ends = ArrayBuffer[(ProcessExecutionContext) => Unit]();
- override def run(pc: ProcessContext): Unit = {
- ends.foreach(_.apply(pc));
+ def onPrepare(pec: ProcessExecutionContext) = {
+ ends.foreach(_.apply(pec));
+ }
+ override def onCommit(pec: ProcessExecutionContext): Unit = {
+ }
+ override def onRollback(pec: ProcessExecutionContext): Unit = {
+ }
+ override def onFail(errorStage: ProcessStage, cause: Throwable, pec: ProcessExecutionContext): Unit = {
}
abstract class CachedStream extends Stream {
- val id = idgen.incrementAndGet();
+ val id = "" + IdGenerator.getNextId[Stream];
val context = MMap[String, Any]();
var cache: Option[DataFrame] = None;
- override def getId(): Int = id;
+ override def getId(): String = id;
def put(key: String, value: Any) = context(key) = value;
override def get(key: String): Any = context.get(key);
- def produce(ctx: ProcessContext): DataFrame;
+ def produce(ctx: ProcessExecutionContext): DataFrame;
- override def feed(ctx: ProcessContext): DataFrame = {
+ override def feed(ctx: ProcessExecutionContext): DataFrame = {
if (!cache.isDefined) {
cache = Some(produce(ctx));
}
@@ -44,7 +53,7 @@ class SparkETLProcess extends Process with Logging {
def loadStream(streamSource: DataSource): Stream = {
return new CachedStream() {
- override def produce(ctx: ProcessContext): DataFrame = {
+ override def produce(ctx: ProcessExecutionContext): DataFrame = {
logger.debug {
val oid = this.getId();
s"loading stream[_->$oid], source: $streamSource";
@@ -57,7 +66,7 @@ class SparkETLProcess extends Process with Logging {
def writeStream(streamSink: DataSink, stream: Stream): Unit = {
ends += {
- (ctx: ProcessContext) => {
+ (ctx: ProcessExecutionContext) => {
val input = stream.feed(ctx);
logger.debug {
val schema = input.schema;
@@ -76,7 +85,7 @@ class SparkETLProcess extends Process with Logging {
def transform(transformer: DataTransformer, streams: Map[String, Stream]): Stream = {
return new CachedStream() {
- override def produce(ctx: ProcessContext): DataFrame = {
+ override def produce(ctx: ProcessExecutionContext): DataFrame = {
val inputs = streams.map(x => (x._1, x._2.feed(ctx)));
logger.debug {
val schemas = inputs.map(_._2.schema);
@@ -92,33 +101,33 @@ class SparkETLProcess extends Process with Logging {
}
trait Stream {
- def getId(): Int;
+ def getId(): String;
- def feed(ctx: ProcessContext): DataFrame;
+ def feed(ctx: ProcessExecutionContext): DataFrame;
def get(key: String): Any;
}
trait DataSource {
- def load(ctx: ProcessContext): DataFrame;
+ def load(ctx: ProcessExecutionContext): DataFrame;
}
trait DataTransformer {
- def transform(data: Map[String, DataFrame], ctx: ProcessContext): DataFrame;
+ def transform(data: Map[String, DataFrame], ctx: ProcessExecutionContext): DataFrame;
}
trait DataTransformer1N1 extends DataTransformer {
- def transform(data: DataFrame, ctx: ProcessContext): DataFrame;
+ def transform(data: DataFrame, ctx: ProcessExecutionContext): DataFrame;
- def transform(dataset: Map[String, DataFrame], ctx: ProcessContext): DataFrame = {
+ def transform(dataset: Map[String, DataFrame], ctx: ProcessExecutionContext): DataFrame = {
val first = dataset.head;
transform(first._2, ctx);
}
}
trait DataSink {
- def save(data: DataFrame, ctx: ProcessContext): Unit;
+ def save(data: DataFrame, ctx: ProcessExecutionContext): Unit;
}
trait FunctionLogic {
@@ -126,7 +135,7 @@ trait FunctionLogic {
}
case class DoMap(func: FunctionLogic, targetSchema: StructType = null) extends DataTransformer1N1 {
- def transform(data: DataFrame, ctx: ProcessContext): DataFrame = {
+ def transform(data: DataFrame, ctx: ProcessExecutionContext): DataFrame = {
val encoder = RowEncoder {
if (targetSchema == null) {
data.schema;
@@ -141,7 +150,7 @@ case class DoMap(func: FunctionLogic, targetSchema: StructType = null) extends D
}
case class DoFlatMap(func: FunctionLogic, targetSchema: StructType = null) extends DataTransformer1N1 {
- def transform(data: DataFrame, ctx: ProcessContext): DataFrame = {
+ def transform(data: DataFrame, ctx: ProcessExecutionContext): DataFrame = {
val encoder = RowEncoder {
if (targetSchema == null) {
data.schema;
@@ -157,7 +166,7 @@ case class DoFlatMap(func: FunctionLogic, targetSchema: StructType = null) exten
}
case class ExecuteSQL(sql: String) extends DataTransformer with Logging {
- def transform(dataset: Map[String, DataFrame], ctx: ProcessContext): DataFrame = {
+ def transform(dataset: Map[String, DataFrame], ctx: ProcessExecutionContext): DataFrame = {
dataset.foreach { x =>
val tableName = "table_" + x._1;
@@ -171,11 +180,11 @@ case class ExecuteSQL(sql: String) extends DataTransformer with Logging {
}
catch {
case e: Throwable =>
- throw new SqlExecutionErrorException(e, sql);
+ throw new SqlExecutionErrorException(sql, e);
}
}
}
- class SqlExecutionErrorException(cause: Throwable, sql: String)
+ class SqlExecutionErrorException(sql: String, cause: Throwable)
extends RuntimeException(s"sql execution error, sql: $sql", cause) {
}

View File

@@ -3,72 +3,64 @@ package cn.piflow
import scala.collection.mutable.{Map => MMap}
trait Trigger {
- def activate(context: ExecutionContext): Unit;
- def getTriggeredProcesses(): Seq[String];
+ /**
+ * start current trigger on given process
+ *
+ * @param processName
+ * @param context
+ */
+ def activate(processName: String, context: FlowExecutionContext): Unit;
}
/**
* start process while dependent processes completed
*/
- object DependencyTrigger {
- def declareDependency(processName: String, dependentProcesses: String*): Trigger = new Trigger() {
- override def activate(executionContext: ExecutionContext): Unit = {
+ class DependencyTrigger(dependentProcesses: String*) extends Trigger {
+ override def activate(processName: String, executionContext: FlowExecutionContext): Unit = {
val listener = new EventHandler {
val completed = MMap[String, Boolean]();
dependentProcesses.foreach { processName =>
completed(processName) = false;
};
def handle(event: Event, args: Any) {
completed(event.asInstanceOf[ProcessCompleted].processName) = true;
if (completed.values.filter(!_).isEmpty) {
completed.clear();
executionContext.fire(LaunchProcess(), processName);
}
}
};
dependentProcesses.foreach { dependency =>
executionContext.on(ProcessCompleted(dependency), listener);
}
}
- override def getTriggeredProcesses(): Seq[String] = Seq(processName);
- }
}
/**
* start processes repeatedly
*/
- object TimerTrigger {
- def cron(cronExpr: String, processNames: String*): Trigger = new Trigger() {
- override def activate(executionContext: ExecutionContext): Unit = {
+ class TimerTrigger(cronExpr: String, processNames: String*) extends Trigger {
+ override def activate(processName: String, executionContext: FlowExecutionContext): Unit = {
processNames.foreach { processName =>
executionContext.scheduleProcessRepeatly(processName, cronExpr);
}
}
- override def getTriggeredProcesses(): Seq[String] = processNames;
- }
}
/**
* start processes while Events happen
*/
- object EventTrigger {
- def listen(event: Event, processNames: String*): Trigger = new Trigger() {
- override def activate(executionContext: ExecutionContext): Unit = {
+ class EventTrigger(event: Event, processNames: String*) extends Trigger {
+ override def activate(processName: String, executionContext: FlowExecutionContext): Unit = {
processNames.foreach { processName =>
executionContext.on(event, new EventHandler() {
override def handle(event: Event, args: Any): Unit = {
executionContext.fire(LaunchProcess(), processName);
}
});
}
}
- override def getTriggeredProcesses(): Seq[String] = processNames;
- }
}
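
Note (illustrative, not part of this commit): with the new activate(processName, context) signature, a trigger is bound to the process it launches when it is added to the flow; `flow` is assumed to be built as in FlowTest below.

//launch "PrintCount" whenever the CountWords process completes, expressed through the
//generic event mechanism rather than a DependencyTrigger
flow.addTrigger("PrintCount", new EventTrigger(ProcessCompleted("CountWords"), "PrintCount"));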

View File

@@ -0,0 +1,20 @@
package cn.piflow.util
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{Map => MMap}
trait IdGenerator {
def generateId(): Int;
}
object IdGenerator {
val map = MMap[String, IdGenerator]();
def getNextId[T](implicit manifest: Manifest[T]): Int = map.getOrElseUpdate(manifest.runtimeClass.getName,
new IdGenerator() {
val ai = new AtomicInteger();
def generateId(): Int = ai.incrementAndGet();
}).generateId();
}
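
Note (illustrative, not part of this commit): IdGenerator keeps one counter per class, which is what FlowExecutionImpl and CachedStream rely on for their ids.

import cn.piflow.util.IdGenerator
import cn.piflow.{FlowExecution, Stream}

val e1 = IdGenerator.getNextId[FlowExecution]; //1
val e2 = IdGenerator.getNextId[FlowExecution]; //2
val s1 = IdGenerator.getNextId[Stream];        //1 again: each class gets its own counter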

View File

@@ -0,0 +1,34 @@
import cn.piflow._
import org.junit.{Assert, Test}
class FlowElementManagerTest {
@Test
def testRAM(): Unit = {
_testMgr(new InMemoryFlowElementManager().asInstanceOf[FlowElementManager]);
}
@Test
def testSql(): Unit = {
_testMgr(new SqlFlowElementManager().asInstanceOf[FlowElementManager]);
}
@Test
def testDir(): Unit = {
_testMgr(new FileSystemFlowElementManager().asInstanceOf[FlowElementManager]);
}
def _testMgr(man: FlowElementManager): Unit = {
//clear all first
man.list().foreach(x => man.delete(x._1));
Assert.assertEquals(0, man.list().size);
man.add("test", new FlowElement());
Assert.assertEquals(1, man.list().size);
Assert.assertEquals("test", man.list().head._1);
Assert.assertNotNull(man.get("test"));
Assert.assertNull(man.get("test2"));
man.delete("test");
Assert.assertEquals(0, man.list().size);
}
}

View File

@@ -0,0 +1,15 @@
import java.io.File
import cn.piflow.FlowElement
import org.junit.Test
class FlowElementTest {
@Test
def test1(): Unit = {
val f = new File("./test1.json");
val flowJson = new FlowElement();
FlowElement.saveFile(flowJson, f);
val flowJson2 = FlowElement.fromFile(f);
}
}

View File

@@ -2,21 +2,22 @@ import java.io.{File, FileInputStream, FileOutputStream}
import java.util.Date
import cn.piflow._
+ import cn.piflow.io.{Console, FileFormat, TextFile}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.spark.sql.SparkSession
import org.junit.Test
class FlowTest {
private def runFlow(processes: Map[String, Process]) {
- val flow = new Flow();
+ val flow: Flow = new FlowImpl();
processes.foreach(en => flow.addProcess(en._1, en._2));
flow.addProcess("PrintMessage", new PrintMessage());
- flow.addTrigger(DependencyTrigger.declareDependency("CopyTextFile", "CleanHouse"));
- flow.addTrigger(DependencyTrigger.declareDependency("CountWords", "CopyTextFile"));
- flow.addTrigger(DependencyTrigger.declareDependency("PrintCount", "CountWords"));
- flow.addTrigger(TimerTrigger.cron("0/5 * * * * ? ", "PrintMessage"));
+ flow.addTrigger("CopyTextFile", new DependencyTrigger("CleanHouse"));
+ flow.addTrigger("CountWords", new DependencyTrigger("CopyTextFile"));
+ flow.addTrigger("PrintCount", new DependencyTrigger("CountWords"));
+ flow.addTrigger("PrintMessage", new TimerTrigger("0/5 * * * * ? "));
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
@@ -41,26 +42,73 @@ class FlowTest {
runFlow(Map(
"CleanHouse" -> new CleanHouse(),
"CopyTextFile" -> new Process() {
- def run(pc: ProcessContext): Unit = {
+ override def onPrepare(pec: ProcessExecutionContext): Unit =
throw new RuntimeException("this is a bad process!");
- }
+ override def onRollback(pec: ProcessExecutionContext): Unit = ???
+ override def onFail(errorStage: ProcessStage, cause: Throwable, pec: ProcessExecutionContext): Unit = ???
+ override def onCommit(pec: ProcessExecutionContext): Unit = ???
},
"CountWords" -> new CountWords(),
"PrintCount" -> new PrintCount()));
}
@Test
- def test2() {
+ def testSparkProcess() {
runFlow(Map(
"CleanHouse" -> new CleanHouse(),
"CopyTextFile" -> new CopyTextFile(),
- "CountWords" -> SparkETLTest.createProcessCountWords(),
- "PrintCount" -> SparkETLTest.createProcessPrintCount()));
+ "CountWords" -> createProcessCountWords(),
+ "PrintCount" -> createProcessPrintCount()));
}
+ val SCRIPT_1 =
+ """
+ function (row) {
+ return $.Row(row.get(0).replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""));
+ }
+ """;
+ val SCRIPT_2 =
+ """
+ function (row) {
+ var arr = $.Array();
+ var str = row.get(0);
+ var len = str.length;
+ for (var i = 0; i < len - 1; i++) {
+ arr.add($.Row(str.substring(i, i + 2)));
+ }
+ return arr;
+ }
+ """;
+ def createProcessCountWords() = {
+ val processCountWords = new SparkProcess();
+ val s1 = processCountWords.loadStream(TextFile("./out/honglou.txt", FileFormat.TEXT));
+ val s2 = processCountWords.transform(DoMap(ScriptEngine.logic(SCRIPT_1)), s1);
+ val s3 = processCountWords.transform(DoFlatMap(ScriptEngine.logic(SCRIPT_2)), s2);
+ val s4 = processCountWords.transform(ExecuteSQL(
+ "select value, count(*) count from table_0 group by value order by count desc"), s3);
+ processCountWords.writeStream(TextFile("./out/wordcount", FileFormat.JSON), s4);
+ processCountWords;
+ }
+ def createProcessPrintCount() = {
+ val processPrintCount = new SparkProcess();
+ val s1 = processPrintCount.loadStream(TextFile("./out/wordcount", FileFormat.JSON));
+ val s2 = processPrintCount.transform(ExecuteSQL(
+ "select value from table_0 order by count desc"), s1);
+ processPrintCount.writeStream(Console(40), s2);
+ processPrintCount;
+ }
}
- class CountWords extends Process {
- def run(pc: ProcessContext): Unit = {
+ class CountWords extends LazyProcess {
+ override def onPrepare(pec: ProcessExecutionContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
@@ -69,34 +117,55 @@ class CountWords extends Process {
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
- count.write.json("./out/wordcount");
+ val tmpfile = File.createTempFile(this.getClass.getSimpleName, "");
+ pec.put("tmpfile", tmpfile);
+ count.write.json(tmpfile.getAbsolutePath);
spark.close();
}
+ override def onCommit(pec: ProcessExecutionContext): Unit = {
+ pec.get("tmpfile").asInstanceOf[File].renameTo(new File("./out/wordcount"));
+ }
+ override def onRollback(pec: ProcessExecutionContext): Unit = {
+ pec.get("tmpfile").asInstanceOf[File].delete();
+ }
}
- class PrintMessage extends Process {
- def run(pc: ProcessContext): Unit = {
+ class PrintMessage extends LazyProcess {
+ def onCommit(pc: ProcessExecutionContext): Unit = {
println("*****hello******" + new Date());
}
}
- class CleanHouse extends Process {
- def run(pc: ProcessContext): Unit = {
+ class CleanHouse extends LazyProcess {
+ def onCommit(pc: ProcessExecutionContext): Unit = {
FileUtils.deleteDirectory(new File("./out/wordcount"));
FileUtils.deleteQuietly(new File("./out/honglou.txt"));
}
}
- class CopyTextFile extends Process {
- def run(pc: ProcessContext): Unit = {
+ class CopyTextFile extends LazyProcess {
+ override def onPrepare(pec: ProcessExecutionContext): Unit = {
val is = new FileInputStream(new File("/Users/bluejoe/testdata/honglou.txt"));
- val os = new FileOutputStream(new File("./out/honglou.txt"));
+ val tmpfile = File.createTempFile(this.getClass.getSimpleName, "");
+ pec.put("tmpfile", tmpfile);
+ val os = new FileOutputStream(tmpfile);
IOUtils.copy(is, os);
}
+ override def onCommit(pec: ProcessExecutionContext): Unit = {
+ pec.get("tmpfile").asInstanceOf[File].renameTo(new File("./out/honglou.txt"));
+ }
+ override def onRollback(pec: ProcessExecutionContext): Unit = {
+ pec.get("tmpfile").asInstanceOf[File].delete();
+ }
}
- class PrintCount extends Process {
- def run(pc: ProcessContext): Unit = {
+ class PrintCount extends LazyProcess {
+ def onCommit(pc: ProcessExecutionContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._

View File

@@ -1,69 +0,0 @@
/**
* Created by bluejoe on 2018/5/6.
*/
import java.io.File
import cn.piflow._
import cn.piflow.io.{Console, FileFormat, TextFile}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.junit.Test
object SparkETLTest {
val SCRIPT_1 =
"""
function (row) {
return $.Row(row.get(0).replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""));
}
""";
val SCRIPT_2 =
"""
function (row) {
var arr = $.Array();
var str = row.get(0);
var len = str.length;
for (var i = 0; i < len - 1; i++) {
arr.add($.Row(str.substring(i, i + 2)));
}
return arr;
}
""";
def createProcessCountWords() = {
val processCountWords = new SparkETLProcess();
val s1 = processCountWords.loadStream(TextFile("./out/honglou.txt", FileFormat.TEXT));
val s2 = processCountWords.transform(DoMap(ScriptEngine.logic(SCRIPT_1)), s1);
val s3 = processCountWords.transform(DoFlatMap(ScriptEngine.logic(SCRIPT_2)), s2);
val s4 = processCountWords.transform(ExecuteSQL(
"select value, count(*) count from table_0 group by value order by count desc"), s3);
processCountWords.writeStream(TextFile("./out/wordcount", FileFormat.JSON), s4);
processCountWords;
}
def createProcessPrintCount() = {
val processPrintCount = new SparkETLProcess();
val s1 = processPrintCount.loadStream(TextFile("./out/wordcount", FileFormat.JSON));
val s2 = processPrintCount.transform(ExecuteSQL(
"select value from table_0 order by count desc"), s1);
processPrintCount.writeStream(Console(40), s2);
processPrintCount;
}
}
class SparkETLTest {
@Test
def test1(): Unit = {
FileUtils.deleteDirectory(new File("./out/wordcount"));
val ctx = new ProcessContext(null);
ctx.put[SparkSession](SparkSession.builder.master("local[4]")
.getOrCreate());
SparkETLTest.createProcessCountWords().run(ctx);
SparkETLTest.createProcessPrintCount().run(ctx);
}
}