From ca8abccf0c9b0037ed0f717b4441ef07dfd728db Mon Sep 17 00:00:00 2001 From: judy0131 Date: Tue, 5 Mar 2019 15:59:56 +0800 Subject: [PATCH] add streaming window function --- .../flow_SocketTextStreamingByWindow.json | 53 ++++++++++++++ .../streaming/SocketTextStreamByWindow.scala | 70 +++++++++++++++++++ .../cn/piflow/bundle/StreamingTest.scala | 2 +- 3 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 piflow-bundle/src/main/resources/flow/flow_SocketTextStreamingByWindow.json create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/streaming/SocketTextStreamByWindow.scala diff --git a/piflow-bundle/src/main/resources/flow/flow_SocketTextStreamingByWindow.json b/piflow-bundle/src/main/resources/flow/flow_SocketTextStreamingByWindow.json new file mode 100644 index 0000000..705321a --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/flow_SocketTextStreamingByWindow.json @@ -0,0 +1,53 @@ +{ + "flow":{ + "name":"socketStreaming", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"SocketTextStreamByWindow", + "bundle":"cn.piflow.bundle.streaming.SocketTextStreamByWindow", + "properties":{ + "hostname":"10.0.86.98", + "port":"9999", + "batchDuration":"1", + "windowDuration":"5", + "slideDuration":"2" + } + + }, + { + "uuid":"2222", + "name":"ConvertSchema", + "bundle":"cn.piflow.bundle.common.ConvertSchema", + "properties":{ + "schema":"value->line" + } + }, + { + "uuid":"3333", + "name":"CsvSave", + "bundle":"cn.piflow.bundle.csv.CsvSave", + "properties":{ + "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming", + "header":"true", + "delimiter":"," + } + } + ], + "paths":[ + { + "from":"SocketTextStreamByWindow", + "outport":"", + "inport":"", + "to":"ConvertSchema" + }, + { + "from":"ConvertSchema", + "outport":"", + "inport":"", + "to":"CsvSave" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/streaming/SocketTextStreamByWindow.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/streaming/SocketTextStreamByWindow.scala new file mode 100644 index 0000000..1d3f099 --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/streaming/SocketTextStreamByWindow.scala @@ -0,0 +1,70 @@ +package cn.piflow.bundle.streaming + +import cn.piflow.conf._ +import cn.piflow.conf.bean.PropertyDescriptor +import cn.piflow.conf.util.{ImageUtil, MapUtil} +import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.dstream.DStream + +class SocketTextStreamByWindow extends ConfigurableStreamingStop { + override val authorEmail: String = "xjzhu@cnic.cn" + override val description: String = "Receive text data from a data server listening on a TCP socket by window." + override val inportList: List[String] = List(PortEnum.NonePort) + override val outportList: List[String] = List(PortEnum.DefaultPort) + override var batchDuration: Int = _ + + var hostname:String =_ + var port:String=_ + var windowDuration:Int = _ + var slideDuration:Int = _ + + override def setProperties(map: Map[String, Any]): Unit = { + hostname=MapUtil.get(map,key="hostname").asInstanceOf[String] + port=MapUtil.get(map,key="port").asInstanceOf[String] + windowDuration=MapUtil.get(map,key="windowDuration").asInstanceOf[String].toInt + slideDuration=MapUtil.get(map,key="slideDuration").asInstanceOf[String].toInt + val timing = MapUtil.get(map,key="batchDuration") + batchDuration=if(timing == None) new Integer(1) else timing.asInstanceOf[String].toInt + } + + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + val hostname = new PropertyDescriptor().name("hostname").displayName("hostname").description("socket hostname ").defaultValue("").required(true) + val port = new PropertyDescriptor().name("port").displayName("port").description("socket port").defaultValue("").required(true) + val batchDuration = new PropertyDescriptor().name("batchDuration").displayName("batchDuration").description("the streaming batch duration").defaultValue("1").required(true) + val windowDuration = new PropertyDescriptor().name("windowDuration").displayName("windowDuration").description("the window duration, the unit is seconds").defaultValue("").required(true) + val slideDuration = new PropertyDescriptor().name("slideDuration").displayName("slideDuration").description("the slide duration, the unit is seconds").defaultValue("").required(true) + descriptor = hostname :: descriptor + descriptor = port :: descriptor + descriptor = batchDuration :: descriptor + descriptor = windowDuration :: descriptor + descriptor = slideDuration :: descriptor + descriptor + } + + //TODO: change icon + override def getIcon(): Array[Byte] = { + ImageUtil.getImage("icon/streaming/socketTextStream.png") + } + + override def getGroup(): List[String] = { + List(StopGroup.StreamingGroup) + } + + override def initialize(ctx: ProcessContext): Unit = { + + } + + override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + + + } + + override def getDStream(ssc: StreamingContext): DStream[String] = { + val dstream = ssc.socketTextStream(hostname,Integer.parseInt(port)) + dstream.window(Seconds(windowDuration),Seconds(slideDuration)) + //dstream.reduceByWindow(_ + _,Seconds(windowDuration),Seconds(slideDuration)) + } + +} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/StreamingTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/StreamingTest.scala index 897d4f8..95d797c 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/StreamingTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/StreamingTest.scala @@ -15,7 +15,7 @@ class StreamingTest { def testSockStreaming(): Unit ={ //parse flow json - val file = "src/main/resources/flow/flow_SocketTextStreaming.json" + val file = "src/main/resources/flow/flow_SocketTextStreamingByWindow.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map)