simplify process

bluejoe2008@gmail.com 2018-06-22 16:45:16 +08:00
parent e45dc768ae
commit 0762fb7eb2
2 changed files with 248 additions and 65 deletions
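For orientation, the fluent Path/Edge API introduced by this commit can be exercised roughly as in the sketch below. This is a minimal, hedged illustration drawn from the updated FlowTest cases in the second file; it assumes each named process has already been registered on the flow via addProcess().

val flow = new FlowImpl();
// linear chain: each to() appends an Edge starting at the previous process
flow.addPath(Path.from("CleanHouse").to("CopyTextFile").to("CountWords").to("PrintCount"));
// merge: two upstream processes feed "zip" through distinct input bundles
flow.addPath(Path.from("flow1").to("zip", "", "data1").to("print"));
flow.addPath(Path.from("flow2").to("zip", "", "data2"));
// fork: "fork" writes named output bundles consumed by separate downstream processes
flow.addPath(Path.from("flow").to("fork").to("print1", "data1", ""));
flow.addPath(Path.from("fork").to("print2", "data2", ""));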


@@ -2,7 +2,6 @@ package cn.piflow
import cn.piflow.util.{IdGenerator, Logging}
import org.apache.spark.sql._
import org.quartz.{Trigger => QuartzTrigger}
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
@@ -53,42 +52,54 @@ trait Flow {
}
trait Path {
def toArrowSeq(): Seq[Arrow];
def toEdges(): Seq[Edge];
def to(processTo: String): Path;
def addEdge(edge: Edge): Path;
def to(processTo: String, bundleOut: String = "", bundleIn: String = ""): Path;
}
class PathImpl extends Path {
val arrows = ArrayBuffer[Arrow]();
class PathImpl() extends Path {
val edges = ArrayBuffer[Edge]();
override def toArrowSeq(): Seq[Arrow] = arrows.toSeq;
override def toEdges(): Seq[Edge] = edges.toSeq;
override def to(processTo: String): Path = {
arrows.last.processTo = processTo;
override def addEdge(edge: Edge): Path = {
edges += edge;
this;
}
def add(arrow: Arrow) = {
arrows += arrow;
override def to(processTo: String, bundleOut: String, bundleIn: String): Path = {
edges += new Edge(edges.last.processTo, processTo, bundleOut, bundleIn);
this;
}
}
class Arrow(val processFrom: String, var processTo: String, var bundleOut: String, var bundleIn: String) {
class Edge(val processFrom: String, var processTo: String, var bundleOut: String, var bundleIn: String) {
override def toString() = {
s"[$processFrom]-($bundleOut)-($bundleIn)-[$processTo]";
}
}
object Path {
def from(processFrom: String): Path = {
val path = new PathImpl();
path.add(new Arrow(processFrom, null, "", ""));
path;
trait PathHead {
def to(processTo: String, bundleOut: String = "", bundleIn: String = ""): Path;
}
def from(processFrom: String): PathHead = {
new PathHead() {
override def to(processTo: String, bundleOut: String, bundleIn: String): Path = {
val path = new PathImpl();
path.addEdge(new Edge(processFrom, processTo, bundleOut, bundleIn));
path;
}
};
}
}
class FlowImpl extends Flow {
val arrows = ArrayBuffer[Arrow]();
val edges = ArrayBuffer[Edge]();
val processes = MMap[String, Process]();
def addProcess(name: String, process: Process) = {
@@ -97,7 +108,7 @@ class FlowImpl extends Flow {
};
def print(): Unit = {
arrows.foreach { arrow =>
edges.foreach { arrow =>
println(arrow.toString());
}
}
@@ -107,33 +118,34 @@ class FlowImpl extends Flow {
override def getProcessNames(): Seq[String] = processes.map(_._1).toSeq;
def addPath(path: Path): Flow = {
arrows ++= path.toArrowSeq();
edges ++= path.toEdges();
this;
}
override def analyze(): AnalyzedFlowGraph =
new AnalyzedFlowGraph() {
val previousProcesses = MMap[String, ArrayBuffer[Arrow]]();
val nextProcesses = MMap[String, ArrayBuffer[Arrow]]();
val incomingEdges = MMap[String, ArrayBuffer[Edge]]();
val outgoingEdges = MMap[String, ArrayBuffer[Edge]]();
arrows.foreach { arrow =>
previousProcesses.getOrElseUpdate(arrow.processTo, ArrayBuffer[Arrow]()) += arrow;
nextProcesses.getOrElseUpdate(arrow.processFrom, ArrayBuffer[Arrow]()) += arrow;
edges.foreach { edge =>
incomingEdges.getOrElseUpdate(edge.processTo, ArrayBuffer[Edge]()) += edge;
outgoingEdges.getOrElseUpdate(edge.processFrom, ArrayBuffer[Edge]()) += edge;
}
private def _visitProcess[T](processName: String, op: (String, Map[String, T]) => T, visited: MMap[String, T]): T = {
private def _visitProcess[T](processName: String, op: (String, Map[Edge, T]) => T, visited: MMap[String, T]): T = {
if (!visited.contains(processName)) {
//executes dependent processes
val inputs =
if (previousProcesses.contains(processName)) {
val arrows = previousProcesses(processName);
arrows.map { arrow =>
arrow.bundleIn ->
_visitProcess(arrow.processFrom, op, visited);
if (incomingEdges.contains(processName)) {
//all incoming edges
val edges = incomingEdges(processName);
edges.map { edge =>
edge ->
_visitProcess(edge.processFrom, op, visited);
}.toMap
}
else {
Map[String, T]();
Map[Edge, T]();
}
val ret = op(processName, inputs);
@@ -145,8 +157,8 @@ class FlowImpl extends Flow {
}
}
override def visit[T](op: (String, Map[String, T]) => T): Unit = {
val ends = processes.keys.filterNot(nextProcesses.contains(_));
override def visit[T](op: (String, Map[Edge, T]) => T): Unit = {
val ends = processes.keys.filterNot(outgoingEdges.contains(_));
val visited = MMap[String, T]();
ends.foreach {
_visitProcess(_, op, visited);
@ -156,7 +168,7 @@ class FlowImpl extends Flow {
}
trait AnalyzedFlowGraph {
def visit[T](op: (String, Map[String, T]) => T): Unit;
def visit[T](op: (String, Map[Edge, T]) => T): Unit;
}
object Runner {
@@ -193,11 +205,17 @@ class ProcessInputStreamImpl() extends ProcessInputStream {
override def isEmpty(): Boolean = inputs.isEmpty;
def attach(inputs: Map[String, ProcessOutputStreamImpl]) = {
this.inputs ++ inputs;
def attach(inputs: Map[Edge, ProcessOutputStreamImpl]) = {
this.inputs ++= inputs.filter(x => x._2.contains(x._1.bundleOut))
.map(x => (x._1.bundleIn, x._2.getDataFrame(x._1.bundleOut)));
};
override def read(): DataFrame = read(inputs.keysIterator.next());
override def read(): DataFrame = {
if (inputs.isEmpty)
throw new NoInputAvailableException();
read(inputs.head._1);
};
override def read(bundle: String): DataFrame = {
inputs(bundle);
@@ -207,7 +225,7 @@ class ProcessInputStreamImpl() extends ProcessInputStream {
class ProcessOutputStreamImpl() extends ProcessOutputStream {
val mapDataFrame = MMap[String, DataFrame]();
override def write(data: DataFrame): Unit = write("default", data);
override def write(data: DataFrame): Unit = write("", data);
override def sendError(): Unit = ???
@@ -215,6 +233,8 @@ class ProcessOutputStreamImpl() extends ProcessOutputStream {
mapDataFrame(bundle) = data;
}
def contains(bundle: String) = mapDataFrame.contains(bundle);
def getDataFrame(bundle: String) = mapDataFrame(bundle);
}
@@ -248,8 +268,8 @@ class FlowExecutionImpl(flow: Flow, runnerContext: Context)
val analyzed = flow.analyze();
//runs start processes
analyzed.visit[ProcessOutputStreamImpl]((processName: String, inputs: Map[String, ProcessOutputStreamImpl]) => {
//runs processes
analyzed.visit[ProcessOutputStreamImpl]((processName: String, inputs: Map[Edge, ProcessOutputStreamImpl]) => {
val pe = executions(processName);
var outputs: ProcessOutputStreamImpl = null;
try {
@@ -306,7 +326,7 @@ class ProcessExecutionImpl(processName: String, process: Process, flowExecutionC
def getContext() = pec;
def perform(inputs: Map[String, ProcessOutputStreamImpl]): ProcessOutputStreamImpl = {
def perform(inputs: Map[Edge, ProcessOutputStreamImpl]): ProcessOutputStreamImpl = {
pec.getInputStream().asInstanceOf[ProcessInputStreamImpl].attach(inputs);
process.perform(pec.getInputStream(), pec.getOutputStream(), pec);
pec.getOutputStream().asInstanceOf[ProcessOutputStreamImpl];
@@ -395,4 +415,12 @@ class FlowExecutionLogger extends FlowExecutionListener with Logging {
val processName = ctx.getProcessExecution().getProcessName();
logger.debug(s"process completed: $processName");
};
}
class FlowException extends RuntimeException {
}
class NoInputAvailableException extends FlowException {
}


@@ -19,9 +19,75 @@ class FlowTest {
flow.addProcess("PrintCount", new PrintCount());
flow.addProcess("PrintMessage", new PrintMessage());
flow.addPath(Path.from("CleanHouse").to("CopyTextFile"));
flow.addPath(Path.from("CopyTextFile").to("CountWords"));
flow.addPath(Path.from("CountWords").to("PrintCount"));
flow.addPath(Path.from("CleanHouse").to("CopyTextFile").to("CountWords").to("PrintCount"));
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
val exe = Runner.bind("localBackupDir", "/tmp/")
.bind(classOf[SparkSession].getName, spark)
.run(flow);
flow.print();
exe.start();
}
@Test
def testPipedProcess() {
val flow = new FlowImpl();
flow.addProcess("CleanHouse", new CleanHouse());
flow.addProcess("PipedReadTextFile", new PipedReadTextFile());
flow.addProcess("PipedCountWords", new PipedCountWords());
flow.addProcess("PipedPrintCount", new PipedPrintCount());
flow.addPath(Path.from("CleanHouse").to("PipedReadTextFile").to("PipedCountWords").to("PipedPrintCount"));
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
val exe = Runner.bind("localBackupDir", "/tmp/")
.bind(classOf[SparkSession].getName, spark)
.run(flow);
flow.print();
exe.start();
}
@Test
def testMergeProcess() {
val flow = new FlowImpl();
flow.addProcess("flow1", new TestDataGeneratorProcess(Seq("a", "b", "c")));
flow.addProcess("flow2", new TestDataGeneratorProcess(Seq("1", "2", "3")));
flow.addProcess("zip", new ZipProcess());
flow.addProcess("print", new PrintDataFrameProcess());
flow.addPath(Path.from("flow1").to("zip", "", "data1").to("print"));
flow.addPath(Path.from("flow2").to("zip", "", "data2"));
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
val exe = Runner.bind("localBackupDir", "/tmp/")
.bind(classOf[SparkSession].getName, spark)
.run(flow);
flow.print();
exe.start();
}
@Test
def testForkProcess() {
val flow = new FlowImpl();
flow.addProcess("flow", new TestDataGeneratorProcess(Seq("a", "b", "c", "d")));
flow.addProcess("fork", new ForkProcess());
flow.addProcess("print1", new PrintDataFrameProcess());
flow.addProcess("print2", new PrintDataFrameProcess());
flow.addPath(Path.from("flow").to("fork").to("print1", "data1", ""));
flow.addPath(Path.from("fork").to("print2", "data2", ""));
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
@@ -145,28 +211,6 @@ class FlowTest {
}
}
class CountWords extends Process {
def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
val count = spark.read.textFile("./out/honglou.txt")
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
val tmpfile = File.createTempFile(this.getClass.getName + "-", "");
tmpfile.delete();
count.write.json(tmpfile.getAbsolutePath);
spark.close();
tmpfile.renameTo(new File("./out/wordcount"));
}
def initialize(ctx: FlowExecutionContext): Unit = {
}
}
class PrintMessage extends Process {
def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
println("*****hello******" + new Date());
@@ -215,4 +259,115 @@ class PrintCount extends Process {
def initialize(ctx: FlowExecutionContext): Unit = {
}
}
class CountWords extends Process {
def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val spark = pec.get[SparkSession]();
import spark.implicits._
val count = spark.read.textFile("./out/honglou.txt")
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
val tmpfile = File.createTempFile(this.getClass.getName + "-", "");
tmpfile.delete();
count.write.json(tmpfile.getAbsolutePath);
spark.close();
tmpfile.renameTo(new File("./out/wordcount"));
}
def initialize(ctx: FlowExecutionContext): Unit = {
}
}
class PipedReadTextFile extends Process {
def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val spark = pec.get[SparkSession]();
val df = spark.read.json("./testdata/honglou.txt");
out.write(df);
}
def initialize(ctx: FlowExecutionContext): Unit = {
}
}
class PipedCountWords extends Process {
def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val spark = pec.get[SparkSession]();
import spark.implicits._
val df = in.read();
val count = df.as[String]
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
out.write(count);
}
def initialize(ctx: FlowExecutionContext): Unit = {
}
}
class PipedPrintCount extends Process {
def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val spark = pec.get[SparkSession]();
import spark.implicits._
val df = in.read();
val count = df.sort($"count".desc);
count.show(40);
out.write(df);
}
def initialize(ctx: FlowExecutionContext): Unit = {
}
}
class PrintDataFrameProcess extends Process {
def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val spark = pec.get[SparkSession]();
val df = in.read();
df.show(40);
}
def initialize(ctx: FlowExecutionContext): Unit = {
}
}
class TestDataGeneratorProcess(seq: Seq[String]) extends Process {
override def initialize(ctx: FlowExecutionContext): Unit = {}
override def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val spark = pec.get[SparkSession]();
import spark.implicits._
out.write(seq.toDF());
}
}
class ZipProcess extends Process {
override def initialize(ctx: FlowExecutionContext): Unit = {}
override def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
out.write(in.read("data1").union(in.read("data2")));
}
}
class ForkProcess extends Process {
override def initialize(ctx: FlowExecutionContext): Unit = {}
override def perform(in: ProcessInputStream, out: ProcessOutputStream, pec: ProcessExecutionContext): Unit = {
val ds = in.read();
val spark = pec.get[SparkSession]();
import spark.implicits._
out.write("data1", ds.as[String].filter(_.head % 2 == 0).toDF());
out.write("data2", ds.as[String].filter(_.head % 2 == 1).toDF());
}
}
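As a hedged aside (not part of the commit), the way an Edge's bundle names route data mirrors ProcessInputStreamImpl.attach() in the first file: for each incoming Edge whose bundleOut exists among the bundles written by the upstream process, the corresponding DataFrame is exposed to the downstream process under the Edge's bundleIn, with an empty name denoting the default bundle. A minimal illustration using the wiring from testForkProcess, with plain strings standing in for DataFrames:

// what the upstream "fork" process wrote, keyed by output bundle
val forkOutputs = Map("data1" -> "df1", "data2" -> "df2");
// the edge produced by Path.from("fork").to("print1", "data1", "")
val edge = new Edge("fork", "print1", "data1", "");
// attach(): keep the edge only if its bundleOut exists upstream,
// then expose the data downstream under the edge's bundleIn
val inputs =
  if (forkOutputs.contains(edge.bundleOut)) Map(edge.bundleIn -> forkOutputs(edge.bundleOut))
  else Map[String, String]();
// inputs == Map("" -> "df1"); "print1" then consumes it via in.read()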