javascript function

bluejoe2008@gmail.com 2018-05-10 15:27:33 +08:00
parent 8aab03c2b7
commit 26cf55255c
7 changed files with 336 additions and 196 deletions

View File

@ -37,7 +37,7 @@ trait Event {
* starts a process once the processes it depends on have completed
*/
object DependencyTrigger {
def dependency(processName: String, dependentProcesses: String*): Trigger = new Trigger() {
def isDependentOn(processName: String, dependentProcesses: String*): Trigger = new Trigger() {
override def activate(executionContext: ExecutionContext): Unit = {
val listener = new EventListener {
var fired = false;
@ -100,6 +100,9 @@ object EventTrigger {
}
trait Execution {
def start(args: Map[String, Any] = Map());
def getContext(): ExecutionContext;
def stop();
@ -113,14 +116,6 @@ trait Runner {
def run(flow: Flow, starts: String*): Execution;
}
class ProcessContext {
val context = MMap[String, Any]();
def get(key: String): Any = context(key);
def put(key: String, value: Any) = context(key) = value;
}
trait Process {
def run(pc: ProcessContext);
}
@ -146,9 +141,7 @@ class FlowImpl extends Flow {
class RunnerImpl extends Runner {
def run(flow: Flow, starts: String*): Execution = {
val execution = new ExecutionImpl(flow, starts);
execution.start();
execution;
new ExecutionImpl(flow, starts);
}
}
@ -162,7 +155,20 @@ trait EventEmiter {
def on(event: Event, listener: EventListener): Unit;
}
trait ExecutionContext extends EventEmiter {
trait Context {
def get(key: String): Any;
def get[T]()(implicit m: Manifest[T]): T = {
get(m.runtimeClass.getName).asInstanceOf[T];
}
def put(key: String, value: Any): this.type;
def put[T](value: T)(implicit m: Manifest[T]): this.type =
put(m.runtimeClass.getName, value);
}
trait ExecutionContext extends Context with EventEmiter {
def getFlow(): Flow;
def scheduleProcess(name: String): Unit;
@ -170,6 +176,24 @@ trait ExecutionContext extends EventEmiter {
def scheduleProcessCronly(cronExpr: String, processName: String): Unit;
}
class ProcessContext(executionContext: ExecutionContext) extends Context {
val context = MMap[String, Any]();
override def get(key: String): Any = {
if (context.contains(key))
context(key);
else
executionContext.get(key);
};
override def put(key: String, value: Any): this.type = {
context(key) = value;
this;
};
def getExecutionContext(): ExecutionContext = executionContext;
}
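A note on the new Context API: put[T]/get[T] key the value by its runtime class name, and a ProcessContext consults its own map before falling back to the wrapping ExecutionContext. A minimal sketch, assuming these classes stay in package cn.piflow (the stored string is only an illustration):

import cn.piflow._

object ContextSketch {
  def main(args: Array[String]): Unit = {
    // a null ExecutionContext is enough for a local demo, as SparkETLTest does below
    val pc = new ProcessContext(null);
    // put[T] stores the value under classOf[T].getName; get[T] reads it back the same way
    pc.put[String]("hello piflow");
    println(pc.get[String]);
    // a key missing from the local map would be looked up in the wrapped ExecutionContext
  }
}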
class EventEmiterImpl extends EventEmiter with Logging {
val listeners = MMap[Event, ArrayBuffer[EventListener]]();
@ -194,7 +218,10 @@ class EventEmiterImpl extends EventEmiter with Logging {
}
class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logging {
def start(): Unit = {
def start(args: Map[String, Any]): Unit = {
args.foreach { (en) =>
executionContext.put(en._1, en._2);
};
triggers.foreach(_.activate(executionContext));
quartzScheduler.start();
starts.foreach { processName =>
@ -222,6 +249,15 @@ class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logg
}
val executionContext = new EventEmiterImpl() with ExecutionContext {
val context = MMap[String, Any]();
def get(key: String): Any = context(key);
def put(key: String, value: Any) = {
context(key) = value;
this;
}
//listens on LaunchProcess
this.on(LaunchProcess(), new EventListener() {
override def handle(event: Event, args: Any): Unit = {
@ -281,9 +317,13 @@ class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logg
logger.debug(s"process completed: $processName");
//notify all triggers
//TODO: ProcessCompleted is fired even when the job run has failed
//so the flow continues to the next process even though the current job failed
executionContext.fire(ProcessCompleted(processName));
}
});
override def getContext() = executionContext;
}
case class LaunchProcess() extends Event {
@ -300,6 +340,18 @@ class ProcessAsQuartzJob extends Job with Logging {
val map = context.getJobDetail.getJobDataMap;
val processName = map.get("processName").asInstanceOf[String];
val executionContext = context.getScheduler.getContext.get("executionContext").asInstanceOf[ExecutionContext];
executionContext.getFlow().getProcess(processName).run(null);
try {
executionContext.getFlow().getProcess(processName)
.run(new ProcessContext(executionContext));
}
catch {
case e: Throwable =>
throw new ProcessExecutionException(processName, e);
}
}
}
class ProcessExecutionException(processName: String, cause: Throwable)
extends RuntimeException(s"failed to execute process: $processName", cause) {
}
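Since run() no longer starts the Execution, callers now build the flow, obtain the Execution, and pass initial context entries to start(). A minimal sketch mirroring FlowTest later in this commit (the process body and the "startTime" entry are illustrative):

import cn.piflow._

object RunnerSketch {
  def main(args: Array[String]): Unit = {
    val flow = new FlowImpl();
    // an anonymous Process, wired the same way FlowTest wires its failing process
    flow.addProcess("PrintMessage", new Process() {
      def run(pc: ProcessContext): Unit = println("message at " + new java.util.Date());
    });
    flow.addTrigger(TimerTrigger.cron("0/5 * * * * ? ", "PrintMessage"));

    val runner = new RunnerImpl();
    val exe = runner.run(flow);
    // start() copies these entries into the ExecutionContext before activating triggers
    exe.start(Map("startTime" -> new java.util.Date()));
    Thread.sleep(20000);
    exe.stop();
  }
}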

View File

@ -5,21 +5,23 @@ package cn.piflow
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Map => JMap}
import javax.script.{Compilable, CompiledScript => JCompiledScript, ScriptEngineManager}
import javax.script.{Compilable, ScriptEngineManager}
import org.apache.spark.api.java.function.{FlatMapFunction, MapFunction}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import cn.piflow.util.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConversions._
import scala.collection.immutable.StringOps
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
class SparkETLProcess extends Process {
val ends = ArrayBuffer[(SparkProcessContext) => Unit]();
class SparkETLProcess extends Process with Logging {
val ends = ArrayBuffer[(ProcessContext) => Unit]();
val idgen = new AtomicInteger();
override def run(pc: ProcessContext): Unit = {
ends.foreach(_.apply(pc.asInstanceOf[SparkProcessContext]));
ends.foreach(_.apply(pc));
}
abstract class AbstractStream extends Stream {
@ -33,144 +35,137 @@ class SparkETLProcess extends Process {
override def get(key: String): Any = context.get(key);
}
def loadStream(streamSource: DatasetSource): Stream = {
def loadStream(streamSource: DataSource): Stream = {
return new AbstractStream() {
override def singleDataset(ctx: SparkProcessContext): Dataset[Any] = {
streamSource.loadDataset(ctx);
override def produce(ctx: ProcessContext): DataFrame = {
logger.debug {
val oid = this.getId();
s"loading stream[_->$oid], source: $streamSource";
};
streamSource.load(ctx);
}
}
}
def writeStream(stream: Stream, streamSink: DatasetSink): Unit = {
ends += { (ctx: SparkProcessContext) => {
streamSink.saveDataset(stream.singleDataset(ctx), ctx);
}
def writeStream(stream: Stream, streamSink: DataSink): Unit = {
ends += {
(ctx: ProcessContext) => {
val input = stream.produce(ctx);
logger.debug {
val schema = input.schema;
val iid = stream.getId();
s"saving stream[$iid->_], schema: $schema, sink: $streamSink";
};
streamSink.save(input, ctx);
}
};
}
def transform(stream: Stream, transformer: DatasetTransformer): Stream = {
def transform(stream: Stream, transformer: DataTransformer): Stream = {
return new AbstractStream() {
override def singleDataset(ctx: SparkProcessContext): Dataset[Any] = {
transformer.transform(stream.singleDataset(ctx), ctx);
override def produce(ctx: ProcessContext): DataFrame = {
val input = stream.produce(ctx);
logger.debug {
val schema = input.schema;
val iid = stream.getId();
val oid = this.getId();
s"transforming stream[$iid->$oid], schema: $schema, transformer: $transformer"
};
transformer.transform(input, ctx);
}
}
}
}
class SparkProcessContext extends ProcessContext {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
super.put(classOf[SparkSession].getName, spark);
def getSparkSession() = get(classOf[SparkSession].getName).asInstanceOf[SparkSession];
}
trait Stream {
def getId(): Int;
def singleDataset(ctx: SparkProcessContext): Dataset[Any];
def produce(ctx: ProcessContext): DataFrame;
def get(key: String): Any;
}
trait DatasetSource {
def loadDataset(ctx: SparkProcessContext): Dataset[Any];
trait DataSource {
def load(ctx: ProcessContext): DataFrame;
}
trait DatasetTransformer {
def transform(dataset: Dataset[Any], ctx: SparkProcessContext): Dataset[Any];
trait DataTransformer {
def transform(data: DataFrame, ctx: ProcessContext): DataFrame;
}
trait DatasetSink {
def saveDataset(dataset: Dataset[Any], ctx: SparkProcessContext): Unit;
trait DataSink {
def save(data: DataFrame, ctx: ProcessContext): Unit;
}
case class TextFile(path: String, format: String) extends DatasetSource with DatasetSink {
override def loadDataset(ctx: SparkProcessContext): Dataset[Any] = {
ctx.getSparkSession().read.textFile(path).asInstanceOf[Dataset[Any]];
}
override def saveDataset(dataset: Dataset[Any], ctx: SparkProcessContext): Unit = {
dataset.write.json(path);
case class Console(nlimit: Int = 20) extends DataSink {
override def save(data: DataFrame, ctx: ProcessContext): Unit = {
data.show(nlimit);
}
}
case class DoMap(mapFuncText: String, targetClass: Class[_], lang: String = ScriptEngine.JAVASCRIPT) extends DatasetTransformer {
def transform(dataset: Dataset[Any], ctx: SparkProcessContext): Dataset[Any] = {
dataset.map(new MapFunction[Any, Any]() {
val cached = ArrayBuffer[CompiledFunction]();
case class TextFile(path: String, format: String = FileFormat.TEXT) extends DataSource with DataSink {
override def load(ctx: ProcessContext): DataFrame = {
ctx.get[SparkSession].read.format(format).load(path).asInstanceOf[DataFrame];
}
override def call(value: Any): Any = {
if (cached.isEmpty) {
try {
val engine = ScriptEngine.get(lang);
cached += engine.compile(mapFuncText);
}
catch {
case e: Throwable =>
throw new ScriptExecutionErrorException(e, mapFuncText, value);
}
}
override def save(data: DataFrame, ctx: ProcessContext): Unit = {
data.write.format(format).save(path);
}
}
try {
cached(0).invoke(Map("value" -> value));
}
catch {
case e: Throwable =>
throw new ScriptExecutionErrorException(e, mapFuncText, value);
};
object FileFormat {
val TEXT = "text";
val JSON = "json";
}
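The renamed DataSource/DataSink pair works directly against a ProcessContext that carries a SparkSession. A minimal sketch, assuming ./out/honglou.txt exists as in the tests (Console(5) is an arbitrary row limit):

import cn.piflow._
import org.apache.spark.sql.SparkSession

object SourceSinkSketch {
  def main(args: Array[String]): Unit = {
    val ctx = new ProcessContext(null);
    ctx.put[SparkSession](SparkSession.builder.master("local[4]").getOrCreate());
    // TextFile.load pulls the SparkSession out of the context by class name
    val df = TextFile("./out/honglou.txt", FileFormat.TEXT).load(ctx);
    // Console.save is just DataFrame.show with a row limit
    Console(5).save(df, ctx);
  }
}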
trait FunctionLogic {
def call(value: Any): Any;
}
case class DoMap(func: FunctionLogic, targetSchema: StructType = null) extends DataTransformer {
def transform(data: DataFrame, ctx: ProcessContext): DataFrame = {
val encoder = RowEncoder {
if (targetSchema == null) {
data.schema;
}
}, EncoderManager.encoderFor(targetClass));
}
}
case class DoFlatMap(mapFuncText: String, targetClass: Class[_], lang: String = ScriptEngine.JAVASCRIPT) extends DatasetTransformer {
def transform(dataset: Dataset[Any], ctx: SparkProcessContext): Dataset[Any] = {
dataset.flatMap(new FlatMapFunction[Any, Any]() {
val cached = ArrayBuffer[CompiledFunction]();
override def call(value: Any): java.util.Iterator[Any] = {
if (cached.isEmpty) {
try {
val engine = ScriptEngine.get(lang);
cached += engine.compile(mapFuncText);
}
catch {
case e: Throwable =>
throw new ScriptCompilationErrorException(e, mapFuncText);
}
}
try {
cached(0).invoke(Map("value" -> value))
.asInstanceOf[java.util.Collection[_]]
.iterator
.asInstanceOf[java.util.Iterator[Any]];
}
catch {
case e: Throwable =>
throw new ScriptExecutionErrorException(e, mapFuncText, value);
};
else {
targetSchema;
}
}, EncoderManager.encoderFor(targetClass));
};
data.map(func.call(_).asInstanceOf[Row])(encoder);
}
}
object EncoderManager {
val stockEncoders = Map[Class[_], Encoder[_]](
classOf[java.lang.String] -> Encoders.STRING,
classOf[java.lang.Integer] -> Encoders.INT,
classOf[java.lang.Boolean] -> Encoders.BOOLEAN,
classOf[java.lang.Float] -> Encoders.FLOAT,
classOf[java.lang.Double] -> Encoders.DOUBLE,
classOf[Int] -> Encoders.scalaInt
);
case class DoFlatMap(func: FunctionLogic, targetSchema: StructType = null) extends DataTransformer {
def transform(data: DataFrame, ctx: ProcessContext): DataFrame = {
val encoder = RowEncoder {
if (targetSchema == null) {
data.schema;
}
else {
targetSchema;
}
};
def encoderFor(clazz: Class[_]): Encoder[Any] = {
if (stockEncoders.contains(clazz))
stockEncoders(clazz).asInstanceOf[Encoder[Any]];
else
throw new NoSuitableEncoderException(clazz);
data.flatMap(func.call(_).asInstanceOf[java.util.Iterator[Row]])(encoder);
}
}
case class ExecuteSQL(sql: String) extends DataTransformer {
def transform(data: DataFrame, ctx: ProcessContext): DataFrame = {
try {
data.createOrReplaceTempView("table0");
ctx.get[SparkSession].sql(sql).asInstanceOf[DataFrame];
}
catch {
case e: Throwable =>
throw new SqlExecutionErrorException(e, sql);
}
}
}
@ -178,6 +173,10 @@ class NoSuitableEncoderException(clazz: Class[_]) extends RuntimeException {
override def getMessage: String = s"no suitable encoder for $clazz";
}
class SqlExecutionErrorException(cause: Throwable, sql: String)
extends RuntimeException(s"sql execution error, sql: $sql", cause) {
}
class ScriptExecutionErrorException(cause: Throwable, sourceScript: String, args: Any)
extends RuntimeException(s"script execution error, script: $sourceScript, args: $args", cause) {
}
@ -190,12 +189,38 @@ trait ScriptEngine {
def compile(funcText: String): CompiledFunction;
}
object ScriptEngine {
val JAVASCRIPT = "javascript";
val SCALA = "scala";
val engines = Map[String, ScriptEngine](JAVASCRIPT -> new JavaScriptEngine());
def get(lang: String) = engines(lang);
def logic(script: String, lang: String = ScriptEngine.JAVASCRIPT): FunctionLogic = new FunctionLogic with Serializable {
val cached = ArrayBuffer[CompiledFunction]();
override def call(value: Any): Any = {
if (cached.isEmpty) {
try {
val engine = ScriptEngine.get(lang);
cached += engine.compile(script);
}
catch {
case e: Throwable =>
throw new ScriptExecutionErrorException(e, script, value);
}
}
try {
cached(0).invoke(Map("value" -> value));
}
catch {
case e: Throwable =>
throw new ScriptExecutionErrorException(e, script, value);
};
}
}
}
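ScriptEngine.logic can be exercised on its own, without Spark: the script is compiled lazily on first call, cached, and its single argument is bound to "value". A minimal sketch (the sample script is illustrative):

import cn.piflow._

object ScriptLogicSketch {
  def main(args: Array[String]): Unit = {
    val upper = ScriptEngine.logic("function (s) { return s.toUpperCase(); }");
    // prints PIFLOW; the JVM's built-in javascript engine evaluates the compiled function
    println(upper.call("piflow"));
  }
}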
trait CompiledFunction {
@ -204,7 +229,12 @@ trait CompiledFunction {
class JavaScriptEngine extends ScriptEngine {
val engine = new ScriptEngineManager().getEngineByName("javascript");
engine.put("Tool", Tool);
val tools = {
val map = MMap[String, AnyRef]();
map += "$" -> Predef;
map.toMap;
}
def compile(funcText: String): CompiledFunction = {
val wrapped = s"($funcText)(value)";
@ -213,6 +243,7 @@ class JavaScriptEngine extends ScriptEngine {
def invoke(args: Map[String, Any] = Map[String, Any]()): Any = {
val bindings = engine.createBindings();
bindings.asInstanceOf[JMap[String, Any]].putAll(tools);
bindings.asInstanceOf[JMap[String, Any]].putAll(args);
val value = compiled.eval(bindings);
@ -222,6 +253,18 @@ class JavaScriptEngine extends ScriptEngine {
}
}
object Tool {
def ops(s: String) = new StringOps(s);
object Predef {
def StringOps(s: String) = new StringOps(s);
def Row(value1: Any) = _row(value1);
def Row(value1: Any, value2: Any) = _row(value1, value2);
def Row(value1: Any, value2: Any, value3: Any) = _row(value1, value2, value3);
def Row(value1: Any, value2: Any, value3: Any, value4: Any) = _row(value1, value2, value3, value4);
private def _row(values: Any*) = org.apache.spark.sql.Row(values: _*);
def Array() = new java.util.ArrayList();
}
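The "$" binding injected by JavaScriptEngine exposes Predef to every script, which is how the test scripts build Rows. A minimal sketch, assuming spark-sql is on the classpath (the sample script is illustrative):

import cn.piflow._
import org.apache.spark.sql.Row

object PredefBindingSketch {
  def main(args: Array[String]): Unit = {
    // $.Row delegates to org.apache.spark.sql.Row, as in SCRIPT_1/SCRIPT_2 below
    val logic = ScriptEngine.logic(
      "function (row) { return $.Row(row.get(0).toUpperCase()); }");
    println(logic.call(Row("piflow"))); // prints [PIFLOW]
  }
}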

View File

@ -1,14 +1,22 @@
/**
* Created by bluejoe on 2018/5/8.
*/
//write JavaScript in this file to make use of the IDE formatter
var fun =
function (s) {
var arr = Array();
var len = s.length;
for (var i = 0; i < s.length - 1; i++) {
arr.push(s.substring(i, i + 2));
}
//write scripts here to make use of the JavaScript formatter
[
[
function (row) {
var arr = Array();
var str = row.get(0);
var len = str.length;
for (var i = 0; i < len - 1; i++) {
arr.push($.Row(str.substring(i, i + 2)));
}
return arr;
}
return java.util.Arrays.asList($.Row(arr));
}
,
function (row) {
return $.Row(row.get(0).replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""));
}
]
]

View File

@ -8,4 +8,5 @@ log4j.logger.org=WARN
log4j.logger.io=WARN
log4j.logger.java=WARN
log4j.logger.org.quartz=WARN
#log4j.logger.cn.piflow.ExecutionImpl=WARN
log4j.logger.cn.piflow.ExecutionImpl$$anon$1=WARN
log4j.logger.cn.piflow.ExecutionImpl=DEBUG

View File

@ -7,59 +7,72 @@ import org.apache.spark.sql.SparkSession
import org.junit.Test
class FlowTest {
private def _testFlow1(processCountWords: Process) {
private def runFlow(processes: Map[String, Process]) {
val flow = new FlowImpl();
flow.addProcess("CleanHouse", new CleanHouse());
flow.addProcess("CopyTextFile", new CopyTextFile());
flow.addProcess("CountWords", processCountWords);
flow.addProcess("PrintCount", new PrintCount());
processes.foreach(en => flow.addProcess(en._1, en._2));
flow.addProcess("PrintMessage", new PrintMessage());
flow.addTrigger(DependencyTrigger.dependency("CopyTextFile", "CleanHouse"));
flow.addTrigger(DependencyTrigger.dependency("CountWords", "CopyTextFile"));
flow.addTrigger(DependencyTrigger.dependency("PrintCount", "CountWords"));
flow.addTrigger(DependencyTrigger.isDependentOn("CopyTextFile", "CleanHouse"));
flow.addTrigger(DependencyTrigger.isDependentOn("CountWords", "CopyTextFile"));
flow.addTrigger(DependencyTrigger.isDependentOn("PrintCount", "CountWords"));
flow.addTrigger(TimerTrigger.cron("0/5 * * * * ? ", "PrintMessage"));
val runner = new RunnerImpl();
val exe = runner.run(flow, "CleanHouse");
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
exe.start(Map(classOf[SparkSession].getName -> spark));
Thread.sleep(20000);
exe.stop();
}
@Test
def test1() {
_testFlow1(new CountWords());
runFlow(Map(
"CleanHouse" -> new CleanHouse(),
"CopyTextFile" -> new CopyTextFile(),
"CountWords" -> new CountWords(),
"PrintCount" -> new PrintCount()));
}
class CountWords extends Process {
def run(pc: ProcessContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
val count = spark.read.textFile("./out/honglou.txt")
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
count.write.json("./out/wordcount");
spark.close();
}
}
@Test
def testProcessError() {
runFlow(Map(
"CleanHouse" -> new CleanHouse(),
"CopyTextFile" -> new Process() {
def run(pc: ProcessContext): Unit = {
throw new RuntimeException("this is a bad process!");
}
},
"CountWords" -> new CountWords(),
"PrintCount" -> new PrintCount()));
}
@Test
def test2() {
val fg = new SparkETLProcess();
val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
val s2 = fg.transform(s1, DoMap(
SparkETLTest.SCRIPT_1, classOf[String]));
runFlow(Map(
"CleanHouse" -> new CleanHouse(),
"CopyTextFile" -> new CopyTextFile(),
"CountWords" -> SparkETLTest.createProcessCountWords(),
"PrintCount" -> SparkETLTest.createProcessPrintCount()));
}
}
val s3 = fg.transform(s2, DoFlatMap(
SparkETLTest.SCRIPT_2, classOf[String]));
class CountWords extends Process {
def run(pc: ProcessContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
val count = spark.read.textFile("./out/honglou.txt")
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
fg.writeStream(s3, TextFile("./out/wordcount", "json"));
_testFlow1(fg);
count.write.json("./out/wordcount");
spark.close();
}
}

View File

@ -25,6 +25,8 @@ class ScriptEngineTest {
println(engine.eval("true;").getClass);
println(engine.eval("1.1;").getClass);
println(engine.eval("var x = {'a':1}; x;").getClass);
println(engine.eval("new java.lang.Object()").getClass);
println(engine.eval("java").getClass);
}
@Test

View File

@ -6,42 +6,63 @@ import java.io.File
import cn.piflow._
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.junit.Test
class SparkETLTest {
@Test
def test1(): Unit = {
val fg = new SparkETLProcess();
val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
val s2 = fg.transform(s1, DoMap(
SparkETLTest.SCRIPT_1, classOf[String]));
val s3 = fg.transform(s2, DoFlatMap(
SparkETLTest.SCRIPT_2, classOf[String]));
fg.writeStream(s3, TextFile("./out/wordcount", "json"));
FileUtils.deleteDirectory(new File("./out/wordcount"));
fg.run(new SparkProcessContext());
}
}
object SparkETLTest {
val SCRIPT_1 =
"""
function(s){
return s.replaceAll("[\\x00-\\xff]||。|||“|”||| ", "");
}""";
function (row) {
return $.Row(row.get(0).replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""));
}
""";
val SCRIPT_2 =
"""
function (s) {
var arr = Array();
var len = s.length;
for (var i = 0; i < s.length - 1; i++) {
arr.push(s.substring(i, i + 2));
}
function (row) {
var arr = $.Array();
var str = row.get(0);
var len = str.length;
for (var i = 0; i < len - 1; i++) {
arr.add($.Row(str.substring(i, i + 2)));
}
return java.util.Arrays.asList(arr);
}
return arr.iterator();
}
""";
def createProcessCountWords() = {
val processCountWords = new SparkETLProcess();
val s1 = processCountWords.loadStream(TextFile("./out/honglou.txt", FileFormat.TEXT));
val s2 = processCountWords.transform(s1, DoMap(ScriptEngine.logic(SCRIPT_1)));
val s3 = processCountWords.transform(s2, DoFlatMap(ScriptEngine.logic(SCRIPT_2)));
val s4 = processCountWords.transform(s3, ExecuteSQL(
"select value, count(*) count from table0 group by value order by count desc"));
processCountWords.writeStream(s4, TextFile("./out/wordcount", FileFormat.JSON));
processCountWords;
}
def createProcessPrintCount() = {
val processPrintCount = new SparkETLProcess();
val s1 = processPrintCount.loadStream(TextFile("./out/wordcount", FileFormat.JSON));
val s2 = processPrintCount.transform(s1, ExecuteSQL(
"select value from table0 order by count desc"));
processPrintCount.writeStream(s2, Console(40));
processPrintCount;
}
}
class SparkETLTest {
@Test
def test1(): Unit = {
FileUtils.deleteDirectory(new File("./out/wordcount"));
val ctx = new ProcessContext(null);
ctx.put[SparkSession](SparkSession.builder.master("local[4]")
.getOrCreate());
SparkETLTest.createProcessCountWords().run(ctx);
SparkETLTest.createProcessPrintCount().run(ctx);
}
}