diff --git a/src/main/scala/etl.scala b/src/main/scala/etl.scala
index 965ebf1..8b853a3 100644
--- a/src/main/scala/etl.scala
+++ b/src/main/scala/etl.scala
@@ -3,8 +3,82 @@
  */
 package cn.piflow
 
-class SparkETLProcess extends Process{
-  override def run(pc: ProcessContext): Unit = {
-
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.mutable.ArrayBuffer
+
+class SparkETLProcess extends Process {
+  // sink actions registered by writeStream(); run() executes them in order
+  val ends = ArrayBuffer[() => Unit]();
+  val idgen = new AtomicInteger();
+
+  override def run(pc: ProcessContext): Unit = {
+    ends.foreach(_.apply());
   }
+
+  abstract class AbstractStream extends Stream {
+    val id = idgen.incrementAndGet();
+
+    override def getId(): Int = id;
+  }
+
+  def loadStream(streamSource: StreamSource): Stream = {
+    new AbstractStream() {
+      override def getDataFrame(): DataFrame = {
+        // stub: a real implementation would read streamSource into a DataFrame
+        println("load");
+        null;
+      }
+    }
+  }
+
+  def writeStream(stream: Stream, streamSink: StreamSink): Unit = {
+    // defer the write; the DataFrame is only materialized when run() fires
+    ends += { () =>
+      println("write");
+      stream.getDataFrame().show();
+    };
+  }
+
+  def transform(stream: Stream, transformer: Transformer): Stream = {
+    new AbstractStream() {
+      override def getDataFrame(): DataFrame = {
+        // stub: a real implementation would apply transformer to the frame
+        println("transform");
+        stream.getDataFrame();
+      }
+    }
+  }
+}
+
+trait Stream {
+  def getId(): Int;
+
+  def getDataFrame(): DataFrame;
+}
+
+trait StreamSource {
+
+}
+
+trait Transformer {
+
+}
+
+trait StreamSink {
+
+}
+
+case class TextFile(path: String, format: String) extends StreamSource with StreamSink {
+
+}
+
+case class DoMap(mapFunc: String) extends Transformer {
+
+}
+
+case class DoFlatMap(mapFunc: String) extends Transformer {
+
 }
\ No newline at end of file
diff --git a/src/test/scala/FlowTest.scala b/src/test/scala/FlowTest.scala
index c64bacc..87943d5 100644
--- a/src/test/scala/FlowTest.scala
+++ b/src/test/scala/FlowTest.scala
@@ -50,21 +50,20 @@ class FlowTest {
   @Test
   def test2() {
     val fg = new SparkETLProcess();
-    val node1 = fg.createNode(DoLoad(TextFile("./out/honglou.txt")));
-    val node2 = fg.createNode(DoMap(
+    val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
+    val s2 = fg.transform(s1, DoMap(
       """
        function(s){
           return s.replaceAll("[\\x00-\\xff]|,|。|:|.|“|”|?|!| ", "");
       }"""));
 
-    val node3 = fg.createNode(DoFlatMap(
+    val s3 = fg.transform(s2, DoFlatMap(
       """
        function(s){
           return s.zip(s.drop(1)).map(t => "" + t._1 + t._2);
       }"""));
 
-    val node4 = fg.createNode(DoWrite(JsonFile("./out/wordcount")));
-    fg.pipe(node1, node2, node3, node4);
+    fg.writeStream(s3, TextFile("./out/wordcount", "json"));
 
     _testFlow1(fg);
   }
diff --git a/src/test/scala/SparkETLTest.scala b/src/test/scala/SparkETLTest.scala
new file mode 100644
index 0000000..1e1b3e5
--- /dev/null
+++ b/src/test/scala/SparkETLTest.scala
@@ -0,0 +1,29 @@
+/**
+ * Created by bluejoe on 2018/5/6.
+ */
+
+import cn.piflow._
+import org.junit.Test
+
+class SparkETLTest {
+  @Test
+  def test1(): Unit = {
+    val fg = new SparkETLProcess();
+    val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
+    val s2 = fg.transform(s1, DoMap(
+      """
+       function(s){
+          return s.replaceAll("[\\x00-\\xff]|,|。|:|.|“|”|?|!| ", "");
+      }"""));
+
+    val s3 = fg.transform(s2, DoFlatMap(
+      """
+       function(s){
+          return s.zip(s.drop(1)).map(t => "" + t._1 + t._2);
+      }"""));
+
+    fg.writeStream(s3, TextFile("./out/wordcount", "json"));
+
+    fg.run(null);
+  }
+}
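
Note on the stubs: loadStream() currently returns a null DataFrame and transform() just forwards its input; the println calls only trace evaluation order. As a rough sketch of where the bodies could go, assuming a SparkSession named `spark` is in scope inside SparkETLProcess and that TextFile.format ("text", "json", ...) names a built-in Spark data source (none of this is in the commit itself):

    import org.apache.spark.sql.DataFrame

    // Hypothetical completions of the two stubs; `spark` is an assumed SparkSession.
    def loadStream(streamSource: StreamSource): Stream = new AbstractStream() {
      override def getDataFrame(): DataFrame = streamSource match {
        // the "text" source yields a single string column named "value"
        case TextFile(path, format) => spark.read.format(format).load(path)
      }
    }

    def writeStream(stream: Stream, streamSink: StreamSink): Unit =
      ends += { () =>
        streamSink match {
          case TextFile(path, format) =>
            stream.getDataFrame().write.format(format).save(path)
        }
      }

This keeps the deferred-execution design intact: nothing touches Spark until run() drains the `ends` buffer.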
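
transform() would likewise have to interpret the JavaScript strings carried by DoMap/DoFlatMap. One plausible route on JDK 8 is the built-in Nashorn engine, instantiated once per partition because script engines are not serializable. Again a sketch under those assumptions, not code from this commit; applyDoMap is a hypothetical helper:

    import javax.script.{Invocable, ScriptEngineManager}
    import org.apache.spark.sql.{DataFrame, Encoders}

    // Apply DoMap's script to each row of a single-string-column DataFrame.
    def applyDoMap(df: DataFrame, mapFunc: String): DataFrame =
      df.mapPartitions { rows =>
        // create the engine on the executor; ScriptEngine is not serializable
        val engine = new ScriptEngineManager().getEngineByName("nashorn")
        engine.eval("var f = " + mapFunc)
        val invocable = engine.asInstanceOf[Invocable]
        rows.map(row => invocable.invokeFunction("f", row.getString(0)).toString)
      }(Encoders.STRING).toDF()

Note that the sample scripts in the tests call Scala-style methods (s.zip, s.drop, t._1), so they would need rewriting before a real JS engine could evaluate them.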