This commit is contained in:
bluejoe2008@gmail.com 2018-05-06 23:16:44 +08:00
parent 215d68820f
commit 8b985eae0f
3 changed files with 107 additions and 7 deletions

View File

@ -3,8 +3,80 @@
*/ */
package cn.piflow package cn.piflow
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.{FileFormat => SparkFileFormat}
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
class SparkETLProcess extends Process { class SparkETLProcess extends Process {
val ends = ArrayBuffer[() => Unit]();
val idgen = new AtomicInteger();
override def run(pc: ProcessContext): Unit = { override def run(pc: ProcessContext): Unit = {
ends.foreach(_.apply());
}
abstract class AbstractStream extends Stream {
val id = idgen.incrementAndGet();
override def getId(): Int = id;
}
def loadStream(streamSource: StreamSource): Stream = {
return new AbstractStream() {
override def getDataFrame(): DataFrame = {
println("load");
null;
}
}
}
def writeStream(stream: Stream, streamSink: StreamSink): Unit = {
ends += { () => {
println("write");
stream.getDataFrame().show();
}
};
}
def transform(stream: Stream, transformer: Transformer): Stream = {
return new AbstractStream() {
override def getDataFrame(): DataFrame = {
println("transform");
stream.getDataFrame();
}
}
}
}
trait Stream {
def getId(): Int;
def getDataFrame(): DataFrame;
}
trait StreamSource {
} }
trait Transformer {
}
trait StreamSink {
}
case class TextFile(path: String, format: String) extends StreamSource with StreamSink {
}
case class DoMap(mapFunc: String) extends Transformer {
}
case class DoFlatMap(mapFunc: String) extends Transformer {
} }

View File

@ -50,21 +50,20 @@ class FlowTest {
@Test @Test
def test2() { def test2() {
val fg = new SparkETLProcess(); val fg = new SparkETLProcess();
val node1 = fg.createNode(DoLoad(TextFile("./out/honglou.txt"))); val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
val node2 = fg.createNode(DoMap( val s2 = fg.transform(s1, DoMap(
""" """
function(s){ function(s){
return s.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""); return s.replaceAll("[\\x00-\\xff]||。|||“|”||| ", "");
}""")); }"""));
val node3 = fg.createNode(DoFlatMap( val s3 = fg.transform(s2, DoFlatMap(
""" """
function(s){ function(s){
return s.zip(s.drop(1)).map(t => "" + t._1 + t._2); return s.zip(s.drop(1)).map(t => "" + t._1 + t._2);
}""")); }"""));
val node4 = fg.createNode(DoWrite(JsonFile("./out/wordcount"))); fg.writeStream(s3, TextFile("./out/wordcount", "json"));
fg.pipe(node1, node2, node3, node4);
_testFlow1(fg); _testFlow1(fg);
} }

View File

@ -0,0 +1,29 @@
/**
* Created by bluejoe on 2018/5/6.
*/
import cn.piflow._
import org.junit.Test
class SparkETLTest {
@Test
def test1(): Unit = {
val fg = new SparkETLProcess();
val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
val s2 = fg.transform(s1, DoMap(
"""
function(s){
return s.replaceAll("[\\x00-\\xff]||。|||“|”||| ", "");
}"""));
val s3 = fg.transform(s2, DoFlatMap(
"""
function(s){
return s.zip(s.drop(1)).map(t => "" + t._1 + t._2);
}"""));
fg.writeStream(s3, TextFile("./out/wordcount", "json"));
fg.run(null);
}
}