init
This commit is contained in:
parent
1228948ddb
commit
5fa5470fb2
|
@ -0,0 +1,25 @@
|
|||
BSD 2-Clause License
|
||||
|
||||
Copyright (c) 2018, Zhihong SHEN
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
@ -0,0 +1,6 @@
|
|||
# sparkflow
|
||||
|
||||
Spark workflow:
|
||||
* no job code to compile
|
||||
* XML defines regular jobs
|
||||
* SQL
|
|
@ -0,0 +1,37 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>bluejoe2008</groupId>
    <artifactId>piflow</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jetty.version>9.0.0.M0</jetty.version>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- Spark core and SQL, versions pinned by the spark.version property -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

</project>
|
|
@ -0,0 +1,151 @@
|
|||
package cn.piflow
|
||||
|
||||
import java.util.Date
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import cn.piflow.util.FormatUtils
|
||||
|
||||
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
|
||||
|
||||
/**
|
||||
* Created by bluejoe on 2018/5/2.
|
||||
*/
|
||||
|
||||
/**
 * A job chain: a set of named processes plus scheduling constraints
 * describing when each process is allowed to run.
 */
trait Chain {
  /**
   * Registers a process under the given name.
   *
   * @param name    human-readable process name
   * @param process the process to execute
   * @param comment optional description (may be null)
   * @return the string identifier assigned to the registered process
   */
  def addProcess(name: String, process: Process, comment: String = null): String;

  /**
   * Schedules the chain to be triggered at the given wall-clock time.
   * FIX: explicit `: Unit` result type — abstract procedure syntax is deprecated.
   */
  def scheduleAt(time: Date): Unit;

  /** Declares that `processId` runs only after all `predecessors` have completed. */
  def scheduleAfter(processId: String, predecessors: String*): Unit;
}
|
||||
|
||||
/**
 * A running (or runnable) instance of a chain.
 * FIX: explicit `: Unit` result type — abstract procedure syntax is deprecated.
 */
trait Execution {
  /** Blocks until every scheduled process of the chain has completed. */
  def awaitComplete(): Unit;
}
|
||||
|
||||
/** Entry point for executing a [[Chain]]. */
trait Runner {
  /** Runs `chain` starting from the given start-node ids. */
  def run(chain: Chain, starts: String*): Execution;

  /** Runs `chain` with no explicit start nodes. */
  def run(chain: Chain): Execution;
}
|
||||
|
||||
/**
 * Context handed to a [[Process]] when it runs.
 * Currently empty — a placeholder for future runtime services.
 */
trait ProcessContext {
}
|
||||
|
||||
/**
 * A single unit of work executed as one node of a chain.
 * FIX: explicit `: Unit` result type — abstract procedure syntax is deprecated.
 */
trait Process {
  /**
   * Runs this process.
   *
   * @param pc runtime context (NOTE(review): callers in this file pass null — confirm)
   */
  def run(pc: ProcessContext): Unit;
}
|
||||
|
||||
|
||||
/**
 * Default [[Chain]] implementation backed by a [[FlowGraph]].
 * Process ids exposed to callers are the string form of the graph's int node ids.
 */
class ChainImpl extends Chain {
  val graph = new FlowGraph[ProcessInfo, String]();

  /** Metadata stored on each graph node. */
  case class ProcessInfo(name: String, process: Process, comment: String) {
  }

  /**
   * Looks up a registered process's metadata by its string id.
   * FIX: graph is typed FlowGraph[ProcessInfo, _], so the original
   * `asInstanceOf[ProcessInfo]` cast was redundant and is removed.
   */
  def getProcessInfo(id: String): ProcessInfo = graph.getNodeValue(id.toInt);

  def addProcess(name: String, process: Process, comment: String = null) = {
    // case classes do not need `new`
    val id = graph.createNode(ProcessInfo(name, process, comment));
    "" + id;
  }

  /**
   * FIX: Chain declares scheduleAt(time) but the original class did not
   * implement it, leaving the class abstract and the file uncompilable.
   * Timer-based triggering is not implemented yet; this is a placeholder.
   */
  def scheduleAt(time: Date): Unit = {
    // TODO: wire up a timer trigger
  }

  /** Records a "run after" edge from each predecessor to `processId`. */
  def scheduleAfter(processId: String, predecessors: String*) = {
    predecessors.foreach { predecessor =>
      graph.link(predecessor.toInt, processId.toInt, "run after");
    }
  }

  /** Ids of the processes scheduled directly after the given node. */
  def getSuccessorNodes(nodeId: String): Seq[String] = {
    graph.getSuccessorEdges(nodeId.toInt).map("" + _.to);
  }

  /** Ids of the processes the given node must wait for. */
  def getPredecessorNodes(nodeId: String): Seq[String] = {
    graph.getPredecessorEdges(nodeId.toInt).map("" + _.from);
  }
}
|
||||
|
||||
/** Simple [[Runner]] that builds a synchronous [[ExecutionImpl]]. */
class RunnerImpl extends Runner {
  /** Runs the chain beginning at the given start-node ids. */
  def run(chain: Chain, starts: String*): Execution =
    new ExecutionImpl(chain.asInstanceOf[ChainImpl], starts)

  /** Runs the chain with no explicit start nodes. */
  def run(chain: Chain): Execution =
    new ExecutionImpl(chain.asInstanceOf[ChainImpl], Seq.empty)
}
|
||||
|
||||
/**
 * Executes the processes of a chain in dependency order: a process runs
 * only after all of its predecessors have completed.
 *
 * @param chain  the chain whose processes are executed
 * @param starts ids of the nodes to start from; empty means timer-driven only
 */
class ExecutionImpl(chain: ChainImpl, starts: Seq[String]) extends Execution {
  def awaitComplete() = {
    if (!starts.isEmpty) {
      val todo = ArrayBuffer[String]();
      val completed = ArrayBuffer[String]();
      todo ++= starts;
      // counts consecutive not-ready requeues so an unsatisfiable
      // dependency fails fast instead of spinning forever
      var stalled = 0;
      while (!todo.isEmpty) {
        val one = todo.remove(0);
        // FIX: a node is enqueued once per completed predecessor; skip
        // duplicates so no process is executed twice.
        if (!completed.contains(one)) {
          //are all predecessor processes done?
          val readyToRun = chain.getPredecessorNodes(one).forall(completed.contains(_));
          if (readyToRun) {
            val pi = chain.getProcessInfo(one);
            // NOTE(review): ProcessContext is not populated yet — null is passed
            pi.process.run(null);

            completed += one;
            todo ++= chain.getSuccessorNodes(one);
            stalled = 0;
          } else {
            // FIX: the original left a not-ready node at the head of `todo`
            // without removing it, looping forever; requeue it at the back
            // so other ready nodes can make progress first.
            todo += one;
            stalled += 1;
            if (stalled > todo.size)
              throw new IllegalStateException(
                "chain deadlocked: unsatisfied predecessors for nodes " + todo);
          }
        }
      }
    }
    //TODO: timer triggers
  }
}
|
||||
|
||||
/**
 * A small in-memory directed multigraph with int node ids and labeled edges.
 *
 * @tparam NodeValue value type stored on nodes
 * @tparam EdgeValue label type stored on edges
 */
class FlowGraph[NodeValue, EdgeValue] {
  private val nodeMap = MMap[Int, NodeValue]();
  private val edges = ArrayBuffer[GraphEdge]();
  // monotonically increasing id source; the first assigned id is 1
  private val nodeIdSerial = new AtomicInteger(0);

  /** A directed edge from node `from` to node `to` carrying `label`. */
  class GraphEdge(val from: Int, val to: Int, val label: EdgeValue) {
    /** Value stored on the source node. */
    def valueFrom() = nodeMap(from);

    /** Value stored on the target node. */
    def valueTo() = nodeMap(to);
  }

  /** Adds a node holding `value` and returns its freshly assigned id. */
  def createNode(value: NodeValue): Int = {
    val nid = nodeIdSerial.incrementAndGet();
    nodeMap(nid) = value;
    nid;
  }

  /** Value of the node with the given id; throws NoSuchElementException if absent. */
  def getNodeValue(nodeId: Int) = nodeMap(nodeId);

  /** All outgoing edges of the given node. */
  def getSuccessorEdges(nodeId: Int): Seq[GraphEdge] = {
    edges.filter(_.from == nodeId);
  }

  /** All incoming edges of the given node. */
  def getPredecessorEdges(nodeId: Int): Seq[GraphEdge] = {
    edges.filter(_.to == nodeId);
  }

  /** Adds a labeled edge and returns this graph to allow call chaining. */
  def link(from: Int, to: Int,
           label: EdgeValue): FlowGraph[NodeValue, EdgeValue] = {
    edges += new GraphEdge(from, to, label);
    this;
  }

  /**
   * Prints the edge table (from, to, label), sorted by (from, to).
   * FIX: `def show() { }` procedure syntax and the Unicode `⇒` arrow are
   * deprecated — replaced with `: Unit =` and `=>`; redundant local
   * aliases of edge.from/edge.to removed.
   */
  def show(): Unit = {
    val data = edges
      .map { edge =>
        Seq[Any](edge.from -> edge.to,
          s"${edge.from}->${edge.to}",
          edge.valueFrom(),
          edge.valueTo(),
          edge.label)
      }.sortBy(_.apply(0).asInstanceOf[(Int, Int)]).map(_.drop(1));

    FormatUtils.printTable(Seq("", "from", "to", "label"), data);
  }
}
|
|
@ -13,8 +13,9 @@ class ChainTest {
|
|||
val id2 = chain.addProcess("CountWords", new CountWords());
|
||||
val id3 = chain.addProcess("PrintCount", new PrintCount());
|
||||
|
||||
chain.link(id1, id2);
|
||||
chain.link(id2, id3);
|
||||
chain.scheduleAfter( id2,id1);
|
||||
chain.scheduleAfter(id3,id2);
|
||||
chain.scheduleAt(1000);
|
||||
|
||||
val runner = new RunnerImpl();
|
||||
val exe = runner.run(chain, id1);
|
||||
|
|
Loading…
Reference in New Issue