prepare/commit of SparkProcess

bluejoe2008@gmail.com 2018-05-13 22:31:38 +08:00
parent df3258c005
commit 1a61158ebd
16 changed files with 206 additions and 350 deletions

.gitignore
View File

@@ -5,3 +5,4 @@
/.idea/
.DS_Store
/doc/api/

View File

@@ -54,5 +54,4 @@
<version>2.3.0</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,7 +1,10 @@
package cn.piflow
package cn.piflow.element
import java.io.File
import cn.piflow._
import cn.piflow.spark._
import scala.collection.mutable.{Map => MMap}
object FlowElement {

View File

@@ -91,10 +91,12 @@ trait FlowExecutionContext extends Context with EventEmiter {
class FlowExecutionImpl(flow: Flow, args: Map[String, Any])
extends FlowExecution with Logging {
val id = "flow_excution_" + IdGenerator.getNextId[FlowExecution];
val id = "flow_excution_" + IdGenerator.nextId[FlowExecution];
val execution = this;
val executionContext = createContext();
val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
val listeners = ArrayBuffer[FlowExecutionListener]();
def start(starts: String*): Unit = {
//set context
@@ -128,6 +130,45 @@ class FlowExecutionImpl(flow: Flow, args: Map[String, Any])
jec.getJobDetail.getJobDataMap.get("processName").asInstanceOf[String])
};
}
quartzScheduler.getContext.put("executionContext", executionContext);
override def getId(): String = id;
quartzScheduler.getListenerManager.addTriggerListener(new TriggerListener {
override def vetoJobExecution(trigger: QuartzTrigger, context: JobExecutionContext): Boolean = false;
override def triggerFired(trigger: QuartzTrigger, context: JobExecutionContext): Unit = {
val map = context.getJobDetail.getJobDataMap;
val processName = map.get("processName").asInstanceOf[String];
logger.debug(s"process started: $processName");
executionContext.fire(ProcessStarted(processName));
}
override def getName: String = this.getClass.getName;
override def triggerMisfired(trigger: QuartzTrigger): Unit = {}
override def triggerComplete(trigger: QuartzTrigger, context: JobExecutionContext, triggerInstructionCode: CompletedExecutionInstruction): Unit = {
val map = context.getJobDetail.getJobDataMap;
val processName = map.get("processName").asInstanceOf[String];
val result = context.getResult;
if (true == result) {
logger.debug(s"process completed: $processName");
executionContext.fire(ProcessCompleted(processName));
}
else {
logger.debug(s"process failed: $processName");
executionContext.fire(ProcessFailed(processName));
}
}
});
override def addListener(listener: FlowExecutionListener): Unit =
listeners += listener;
override def getFlow(): Flow = flow;
private def createContext(): FlowExecutionContext = {
new EventEmiterImpl() with FlowExecutionContext {
@@ -180,46 +221,4 @@ class FlowExecutionImpl(flow: Flow, args: Map[String, Any])
override def getFlowExecution(): FlowExecution = execution;
};
}
val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
quartzScheduler.getContext.put("executionContext", executionContext);
val listeners = ArrayBuffer[FlowExecutionListener]();
quartzScheduler.getListenerManager.addTriggerListener(new TriggerListener {
override def vetoJobExecution(trigger: QuartzTrigger, context: JobExecutionContext): Boolean = false;
override def triggerFired(trigger: QuartzTrigger, context: JobExecutionContext): Unit = {
val map = context.getJobDetail.getJobDataMap;
val processName = map.get("processName").asInstanceOf[String];
logger.debug(s"process started: $processName");
executionContext.fire(ProcessStarted(processName));
}
override def getName: String = this.getClass.getName;
override def triggerMisfired(trigger: QuartzTrigger): Unit = {}
override def triggerComplete(trigger: QuartzTrigger, context: JobExecutionContext, triggerInstructionCode: CompletedExecutionInstruction): Unit = {
val map = context.getJobDetail.getJobDataMap;
val processName = map.get("processName").asInstanceOf[String];
val result = context.getResult;
if (true == result) {
logger.debug(s"process completed: $processName");
executionContext.fire(ProcessCompleted(processName));
}
else {
logger.debug(s"process failed: $processName");
executionContext.fire(ProcessFailed(processName));
}
}
});
override def getId(): String = id;
override def addListener(listener: FlowExecutionListener): Unit =
listeners += listener;
override def getFlow(): Flow = flow;
}
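
The TriggerListener registered above decides between ProcessCompleted and ProcessFailed by comparing context.getResult with true. A minimal sketch of the job side of that contract (the ProcessAsQuartzJob body is not part of this hunk, so the class name DemoProcessJob and its body are illustrative only):

    import org.quartz.{Job, JobExecutionContext}

    class DemoProcessJob extends Job {
      override def execute(context: JobExecutionContext): Unit = {
        // run the wrapped process here ...
        // then report success; triggerComplete() above reads it via context.getResult
        context.setResult(true)
      }
    }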

View File

@@ -1,26 +0,0 @@
package cn.piflow.io
import cn.piflow.{ProcessExecutionContext, Sink, _}
import org.apache.spark.sql._
case class Console(nlimit: Int = 20) extends Sink {
override def save(data: DataFrame, ctx: ProcessExecutionContext): Unit = {
data.show(nlimit);
}
}
case class TextFile(path: String, format: String = FileFormat.TEXT) extends Source with Sink {
override def load(ctx: ProcessExecutionContext): DataFrame = {
ctx.get[SparkSession].read.format(format).load(path).asInstanceOf[DataFrame];
}
override def save(data: DataFrame, ctx: ProcessExecutionContext): Unit = {
data.write.format(format).save(path);
}
}
object FileFormat {
val TEXT = "text";
val JSON = "json";
val PARQUET = "parquet";
}

View File

@@ -49,20 +49,19 @@ trait ProcessExecutionContext extends Context {
class ProcessExecutionContextImpl(processExecution: ProcessExecution, executionContext: FlowExecutionContext)
extends ProcessExecutionContext with Logging {
val stages = ArrayBuffer[ProcessStage]();
val context = MMap[String, Any]();
var errorHandler: ErrorHandler = Noop();
def getProcessExecution() = processExecution;
def getStage(): ProcessStage = stages.last;
def setStage(stage: ProcessStage) = {
val processName = processExecution.getProcessName();
logger.debug(s"stage changed: $stage, process: $processName");
stages += stage
};
val context = MMap[String, Any]();
def getProcessExecution() = processExecution;
def getStage(): ProcessStage = stages.last;
def sendError(stage: ProcessStage, cause: Throwable) {
val processName = processExecution.getProcessName();
val jee = new JobExecutionException(s"failed to execute process: $processName", cause);
@@ -101,7 +100,7 @@ class ProcessAsQuartzJob extends Job with Logging {
class ProcessExecutionImpl(processName: String, process: Process, executionContext: FlowExecutionContext)
extends ProcessExecution with Logging {
val id = "process_excution_" + IdGenerator.getNextId[ProcessExecution];
val id = "process_excution_" + IdGenerator.nextId[ProcessExecution];
val processExecutionContext = createContext();
override def getId(): String = id;
@@ -114,16 +113,18 @@ class ProcessExecutionImpl(processName: String, process: Process, executionConte
processExecutionContext.setStage(PrepareComplete());
}
catch {
case e =>
case e: Throwable =>
try {
//rollback()
logger.warn(s"onPrepare() failed: $e");
e.printStackTrace();
processExecutionContext.setStage(RollbackStart());
process.onRollback(processExecutionContext);
processExecutionContext.setStage(RollbackComplete());
}
catch {
case e =>
case e: Throwable =>
logger.warn(s"onRollback() failed: $e");
processExecutionContext.sendError(RollbackStart(), e);
e.printStackTrace();
@@ -138,7 +139,7 @@ class ProcessExecutionImpl(processName: String, process: Process, executionConte
processExecutionContext.setStage(CommitComplete());
}
catch {
case e =>
case e: Throwable =>
logger.warn(s"onCommit() failed: $e");
processExecutionContext.sendError(CommitStart(), e);
e.printStackTrace();
@@ -146,12 +147,12 @@ class ProcessExecutionImpl(processName: String, process: Process, executionConte
}
}
private def createContext() =
new ProcessExecutionContextImpl(this, executionContext);
override def getProcessName(): String = processName;
override def getProcess(): Process = process;
private def createContext() =
new ProcessExecutionContextImpl(this, executionContext);
}
trait ErrorHandler {

View File

@@ -1,100 +0,0 @@
package cn.piflow
import java.util.{Map => JMap}
import javax.script.{Compilable, ScriptEngineManager}
import scala.collection.JavaConversions._
import scala.collection.immutable.StringOps
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
object ScriptEngine {
val JAVASCRIPT = "javascript";
val SCALA = "scala";
val engines = Map[String, ScriptEngine](JAVASCRIPT -> new JavaScriptEngine());
def get(lang: String) = engines(lang);
def logic(script: String, lang: String = ScriptEngine.JAVASCRIPT): FunctionLogic = new FunctionLogic with Serializable {
val cached = ArrayBuffer[CompiledFunction]();
override def call(value: Any): Any = {
if (cached.isEmpty) {
try {
val engine = ScriptEngine.get(lang);
cached += engine.compile(script);
}
catch {
case e: Throwable =>
throw new ScriptExecutionException(e, script, value);
}
}
try {
cached(0).invoke(Map("value" -> value));
}
catch {
case e: Throwable =>
throw new ScriptExecutionException(e, script, value);
};
}
}
}
trait CompiledFunction {
def invoke(args: Map[String, Any] = Map[String, Any]()): Any;
}
class JavaScriptEngine extends ScriptEngine {
val engine = new ScriptEngineManager().getEngineByName("javascript");
val tools = {
val map = MMap[String, AnyRef]();
map += "$" -> Predef;
map.toMap;
}
def compile(funcText: String): CompiledFunction = {
val wrapped = s"($funcText)(value)";
new CompiledFunction() {
val compiled = engine.asInstanceOf[Compilable].compile(wrapped);
def invoke(args: Map[String, Any] = Map[String, Any]()): Any = {
val bindings = engine.createBindings();
bindings.asInstanceOf[JMap[String, Any]].putAll(tools);
bindings.asInstanceOf[JMap[String, Any]].putAll(args);
val value = compiled.eval(bindings);
value;
}
}
}
}
object Predef {
def StringOps(s: String) = new StringOps(s);
def Row(value1: Any) = _row(value1);
def Row(value1: Any, value2: Any) = _row(value1, value2);
def Row(value1: Any, value2: Any, value3: Any) = _row(value1, value2, value3);
def Row(value1: Any, value2: Any, value3: Any, value4: Any) = _row(value1, value2, value3, value4);
private def _row(values: Any*) = org.apache.spark.sql.Row(values: _*);
def Array() = new java.util.ArrayList();
}
class ScriptExecutionException(cause: Throwable, sourceScript: String, args: Any)
extends RuntimeException(s"script execution error, script: $sourceScript, args: $args", cause) {
}
class ScriptCompilationException(cause: Throwable, sourceScript: String)
extends RuntimeException(s"script compilation error, script: $sourceScript", cause) {
}
trait ScriptEngine {
def compile(funcText: String): CompiledFunction;
}
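
For reference, a minimal sketch of how the script wrapper defined in this (removed) file was used; the JavaScript snippet is illustrative, and note that the FunctionLogic method it implements is renamed from call to perform elsewhere in this commit:

    val toUpper = ScriptEngine.logic("function(value){ return value.toUpperCase(); }")
    toUpper.call("piflow")   // compiled once by the JDK JavaScript engine, then invoked -> "PIFLOW"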

View File

@@ -0,0 +1,53 @@
package cn.piflow.spark.io
import java.io.File
import cn.piflow.ProcessExecutionContext
import cn.piflow.spark._
import cn.piflow.util.Logging
import org.apache.spark.sql._
/**
* Created by bluejoe on 2018/5/13.
*/
case class TextFile(path: String, format: String = FileFormat.TEXT) extends Source with SinkWithBackup {
override def load(ctx: ProcessExecutionContext): DataFrame = {
ctx.get[SparkSession].read.format(format).load(path).asInstanceOf[DataFrame];
}
override def save(data: DataFrame, ctx: ProcessExecutionContext): Unit = {
data.write.format(format).save(path);
}
def backup(ctx: ProcessExecutionContext): Backup = new Backup() with Logging {
val backupFile = File.createTempFile(classOf[TextFile].getName.toLowerCase + "_", ".tmp",
new File(ctx.get("localBackupDir").asInstanceOf[String]));
backupFile.delete();
override def getSink(): Sink = TextFile(backupFile.getAbsolutePath, format);
override def commit(): Unit = {
logger.debug {
val src = backupFile.getAbsolutePath;
s"rename $src to $path"
};
backupFile.renameTo(new File(path));
};
override def rollback(): Unit = {
backupFile.delete();
}
}
}
object FileFormat {
val TEXT = "text";
val JSON = "json";
val PARQUET = "parquet";
}
case class Console(nlimit: Int = 20) extends Sink {
override def save(data: DataFrame, ctx: ProcessExecutionContext): Unit = {
data.show(nlimit);
}
}
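
The backup() hook above is what gives a sink prepare/commit semantics: staged writes go to a temp file under the context's "localBackupDir", commit() renames that file onto the real path, and rollback() deletes it. A minimal usage sketch, assuming pec is a ProcessExecutionContext carrying "localBackupDir" and a SparkSession, and df is some DataFrame (both assumed, not defined in this file; the path is illustrative):

    val sink = TextFile("/tmp/out/result.json", FileFormat.JSON)
    val backup = sink.backup(pec)
    backup.getSink().save(df, pec)   // staging write, as done during onPrepare()
    backup.commit()                  // onCommit(): promote the temp file to the target path
    // on failure the caller invokes backup.rollback() instead, discarding the temp file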

View File

@@ -1,11 +1,9 @@
/**
* Created by bluejoe on 2018/5/6.
*/
package cn.piflow
package cn.piflow.spark
import java.io.File
import cn.piflow.io.{Console, FileFormat, TextFile}
import cn.piflow._
import cn.piflow.util.{IdGenerator, Logging}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -16,92 +14,40 @@ import scala.collection.mutable.{ArrayBuffer, Map => MMap}
class SparkProcess extends Process with Logging {
trait Ops {
def perform(ctx: ProcessExecutionContext): Unit;
}
val ends = ArrayBuffer[Ops]();
trait Backup {
def replica(): Sink;
def restore(): Unit;
def clean(): Unit;
}
def createBackup(originalSink: Sink, ctx: ProcessExecutionContext): Backup = {
if (originalSink.isInstanceOf[Console]) {
new Backup() {
override def replica(): Sink = originalSink;
override def clean(): Unit = {}
override def restore(): Unit = {}
}
}
else {
new Backup() {
val backupFile = File.createTempFile(classOf[SparkProcess].getName, ".bak",
new File(ctx.get("localBackupDir").asInstanceOf[String]));
backupFile.delete();
def replica(): Sink = {
//TODO: hdfs
TextFile(backupFile.getAbsolutePath, FileFormat.PARQUET)
}
def restore(): Unit = {
originalSink.save(TextFile(backupFile.getAbsolutePath, FileFormat.PARQUET).load(ctx), ctx);
}
def clean(): Unit = {
backupFile.delete();
}
}
}
}
def onPrepare(pec: ProcessExecutionContext) = {
val backup = ArrayBuffer[Backup]();
val backups = ArrayBuffer[Backup]();
val ne = ends.map { x =>
val so = x.asInstanceOf[SaveOps];
val bu = createBackup(so.streamSink, pec);
backup += bu;
SaveOps(bu.replica(), so.stream);
val sink = so.streamSink;
val backup = sink match {
case swb: SinkWithBackup =>
swb.backup(pec);
case _ =>
new Backup() {
override def getSink(): Sink = sink;
override def rollback(): Unit = {}
override def commit(): Unit = {}
}
}
pec.put("backup", backup);
backups += backup;
SaveOps(backup.getSink(), so.stream);
}
pec.put("backups", backups);
ne.foreach(_.perform(pec));
}
override def onCommit(pec: ProcessExecutionContext): Unit = {
pec.get("backup").asInstanceOf[ArrayBuffer[Backup]].foreach(_.restore());
pec.get("backups").asInstanceOf[ArrayBuffer[Backup]].foreach(_.commit());
}
override def onRollback(pec: ProcessExecutionContext): Unit = {
pec.get("backup").asInstanceOf[ArrayBuffer[Backup]].foreach(_.clean());
}
abstract class CachedStream extends Stream {
val id = "" + IdGenerator.getNextId[Stream];
val context = MMap[String, Any]();
var cache: Option[DataFrame] = None;
override def getId(): String = id;
def put(key: String, value: Any) = context(key) = value;
override def get(key: String): Any = context.get(key);
def produce(ctx: ProcessExecutionContext): DataFrame;
override def feed(ctx: ProcessExecutionContext): DataFrame = {
if (!cache.isDefined) {
cache = Some(produce(ctx));
}
cache.get;
}
pec.get("backups").asInstanceOf[ArrayBuffer[Backup]].foreach(_.rollback());
}
def loadStream(streamSource: Source): Stream = {
@@ -117,21 +63,6 @@ class SparkProcess extends Process with Logging {
}
}
case class SaveOps(streamSink: Sink, stream: Stream)
extends Ops {
def perform(ctx: ProcessExecutionContext): Unit = {
val input = stream.feed(ctx);
logger.debug {
val schema = input.schema;
val iid = stream.getId();
s"saving stream[$iid->_], schema: $schema, sink: $streamSink";
};
streamSink.save(input, ctx);
}
}
def writeStream(streamSink: Sink, stream: Stream): Unit = {
ends += SaveOps(streamSink, stream);
}
@@ -155,6 +86,45 @@ class SparkProcess extends Process with Logging {
}
}
}
trait Ops {
def perform(ctx: ProcessExecutionContext): Unit;
}
abstract class CachedStream extends Stream {
val id = "" + IdGenerator.nextId[Stream];
val context = MMap[String, Any]();
var cache: Option[DataFrame] = None;
override def getId(): String = id;
def put(key: String, value: Any) = context(key) = value;
override def get(key: String): Any = context.get(key);
def produce(ctx: ProcessExecutionContext): DataFrame;
override def feed(ctx: ProcessExecutionContext): DataFrame = {
if (!cache.isDefined) {
cache = Some(produce(ctx));
}
cache.get;
}
}
case class SaveOps(streamSink: Sink, stream: Stream)
extends Ops {
def perform(ctx: ProcessExecutionContext): Unit = {
val input = stream.feed(ctx);
logger.debug {
val schema = input.schema;
val iid = stream.getId();
s"saving stream[$iid->_], schema: $schema, sink: $streamSink";
};
streamSink.save(input, ctx);
}
}
}
trait Stream {
@@ -187,8 +157,20 @@ trait Sink {
def save(data: DataFrame, ctx: ProcessExecutionContext): Unit;
}
trait SinkWithBackup extends Sink {
def backup(ctx: ProcessExecutionContext): Backup;
}
trait Backup {
def getSink(): Sink;
def commit();
def rollback();
}
trait FunctionLogic {
def call(value: Any): Any;
def perform(value: Any): Any;
}
case class DoMap(func: FunctionLogic, targetSchema: StructType = null) extends Transformer1N1 {
@@ -202,7 +184,7 @@ case class DoMap(func: FunctionLogic, targetSchema: StructType = null) extends T
}
};
data.map(func.call(_).asInstanceOf[Row])(encoder);
data.map(func.perform(_).asInstanceOf[Row])(encoder);
}
}
@@ -218,7 +200,7 @@ case class DoFlatMap(func: FunctionLogic, targetSchema: StructType = null) exten
};
data.flatMap(x =>
JavaConversions.iterableAsScalaIterable(func.call(x).asInstanceOf[java.util.ArrayList[Row]]))(encoder);
JavaConversions.iterableAsScalaIterable(func.perform(x).asInstanceOf[java.util.ArrayList[Row]]))(encoder);
}
}
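
Taken together, onPrepare() now wraps every SaveOps sink in a Backup (a real one for SinkWithBackup, a pass-through otherwise), stores the list under "backups" in the context, and onCommit()/onRollback() promote or discard the staged output. A minimal wiring sketch using only the loadStream/writeStream API shown above (paths are illustrative):

    val process = new SparkProcess()
    val users = process.loadStream(TextFile("/tmp/in/users.json", FileFormat.JSON))
    process.writeStream(TextFile("/tmp/out/users.parquet", FileFormat.PARQUET), users)
    // the surrounding execution then drives onPrepare() -> onCommit(), or onRollback() on error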

View File

@@ -1,52 +0,0 @@
package util
/**
* Created by bluejoe on 2017/9/30.
*/
object ReflectUtils {
implicit def reflected(o: AnyRef) = new ReflectedObject(o);
def singleton[T](implicit m: Manifest[T]): AnyRef = {
val field = Class.forName(m.runtimeClass.getName + "$").getDeclaredField("MODULE$");
field.setAccessible(true);
field.get();
}
def instanceOf[T](args: Any*)(implicit m: Manifest[T]): T = {
val constructor = m.runtimeClass.getDeclaredConstructor(args.map(_.getClass): _*);
constructor.setAccessible(true);
constructor.newInstance(args.map(_.asInstanceOf[Object]): _*).asInstanceOf[T];
}
def instanceOf(className: String)(args: Any*) = {
val constructor = Class.forName(className).getDeclaredConstructor(args.map(_.getClass): _*);
constructor.setAccessible(true);
constructor.newInstance(args.map(_.asInstanceOf[Object]): _*);
}
}
class ReflectedObject(o: AnyRef) {
//employee._get("company.name")
def _get(name: String): AnyRef = {
var o2 = o;
for (fn <- name.split("\\.")) {
val field = o2.getClass.getDeclaredField(fn);
field.setAccessible(true);
o2 = field.get(o2);
}
o2;
}
def _getLazy(name: String): AnyRef = {
_call(s"${name}$$lzycompute")();
}
def _call(name: String)(args: Any*): AnyRef = {
//val method = o.getClass.getDeclaredMethod(name, args.map(_.getClass): _*);
//TODO: supports overloaded methods?
val methods = o.getClass.getDeclaredMethods.filter(_.getName.equals(name));
val method = methods(0);
method.setAccessible(true);
method.invoke(o, args.map(_.asInstanceOf[Object]): _*);
}
}
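
A minimal sketch of the reflective helpers removed in this commit; the case classes are hypothetical and only illustrate the dotted field path that _get walks:

    import util.ReflectUtils._

    case class Company(name: String)
    case class Employee(company: Company)

    Employee(Company("acme"))._get("company.name")   // reads field "company", then "name" -> "acme"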

View File

@@ -4,17 +4,10 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{Map => MMap}
trait IdGenerator {
def generateId(): Int;
}
object IdGenerator {
val map = MMap[String, IdGenerator]();
val map = MMap[String, AtomicInteger]();
def getNextId[T](implicit manifest: Manifest[T]) = map.getOrElseUpdate(manifest.runtimeClass.getName,
new IdGenerator() {
val ai = new AtomicInteger();
def generateId(): Int = ai.incrementAndGet();
})
def nextId[T](implicit manifest: Manifest[T]): Int =
map.getOrElseUpdate(manifest.runtimeClass.getName,
new AtomicInteger()).incrementAndGet();
}
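
The rewrite drops the per-class IdGenerator instances in favor of AtomicInteger counters keyed by runtime class name. A minimal sketch of the resulting behavior, using type parameters that appear elsewhere in this commit:

    IdGenerator.nextId[FlowExecution]   // 1
    IdGenerator.nextId[FlowExecution]   // 2
    IdGenerator.nextId[Stream]          // 1 -- counters are independent per class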

View File

@@ -1,4 +1,6 @@
import cn.piflow._
import cn.piflow.spark._
import cn.piflow.element._
import org.junit.{Assert, Test}
class FlowElementManagerTest {

View File

@@ -1,15 +1,15 @@
import java.io.File
import cn.piflow.FlowElement
import cn.piflow.element._
import org.junit.Test
class FlowElementTest {
@Test
def test1(): Unit = {
val f = new File("./test1.json");
val flowJson = new FlowElement();
FlowElement.saveFile(flowJson, f);
val flowJson2 = FlowElement.fromFile(f);
val e = new FlowElement();
FlowElement.saveFile(e, f);
val e2 = FlowElement.fromFile(f);
}
}

View File

@@ -2,7 +2,8 @@ import java.io.{File, FileInputStream, FileOutputStream}
import java.util.Date
import cn.piflow._
import cn.piflow.io.{Console, FileFormat, TextFile}
import cn.piflow.spark._
import cn.piflow.spark.io._
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.spark.sql.SparkSession
import org.junit.Test