Path.of()

This commit is contained in:
bluejoe2008@gmail.com 2018-06-26 11:11:09 +08:00
parent e6a0886aef
commit 168a23bb5a
2 changed files with 30 additions and 1 deletions

View File

@ -1,5 +1,7 @@
package cn.piflow
import java.security.InvalidParameterException
import cn.piflow.util.{IdGenerator, Logging}
import org.apache.spark.sql._
@ -104,6 +106,33 @@ object Path {
}
};
}
def of(path: (Any, String)): Path = {
val pi = new PathImpl();
def _addEdges(path: (Any, String)): Unit = {
val value1 = path._1;
//String->String
if (value1.isInstanceOf[String]) {
pi.addEdge(new Edge(value1.asInstanceOf[String], path._2, "", ""));
}
//(String->String)->String
else if (value1.isInstanceOf[(Any, String)]) {
val tuple = value1.asInstanceOf[(Any, String)];
_addEdges(tuple);
pi.addEdge(new Edge(tuple._2, path._2, "", ""));
}
else {
throw new InvalidParameterException(s"invalid parameter: $value1, String or (String, String) expected!");
}
}
_addEdges(path);
pi;
}
}
class FlowImpl extends Flow {

View File

@ -20,7 +20,7 @@ class FlowTest {
flow.addProcess("PrintCount", new PrintCount());
flow.addProcess("PrintMessage", new PrintMessage());
flow.addPath(Path.from("CleanHouse").to("CopyTextFile").to("CountWords").to("PrintCount"));
flow.addPath(Path.of("CleanHouse" -> "CopyTextFile" -> "CountWords" -> "PrintCount"));
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();