add streaming window function

This commit is contained in:
judy0131 2019-03-05 15:59:56 +08:00
parent cd615ad58e
commit ca8abccf0c
3 changed files with 124 additions and 1 deletions

View File

@ -0,0 +1,53 @@
{
"flow":{
"name":"socketStreaming",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SocketTextStreamByWindow",
"bundle":"cn.piflow.bundle.streaming.SocketTextStreamByWindow",
"properties":{
"hostname":"10.0.86.98",
"port":"9999",
"batchDuration":"1",
"windowDuration":"5",
"slideDuration":"2"
}
},
{
"uuid":"2222",
"name":"ConvertSchema",
"bundle":"cn.piflow.bundle.common.ConvertSchema",
"properties":{
"schema":"value->line"
}
},
{
"uuid":"3333",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
"header":"true",
"delimiter":","
}
}
],
"paths":[
{
"from":"SocketTextStreamByWindow",
"outport":"",
"inport":"",
"to":"ConvertSchema"
},
{
"from":"ConvertSchema",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}

View File

@ -0,0 +1,70 @@
package cn.piflow.bundle.streaming
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
class SocketTextStreamByWindow extends ConfigurableStreamingStop {
override val authorEmail: String = "xjzhu@cnic.cn"
override val description: String = "Receive text data from a data server listening on a TCP socket by window."
override val inportList: List[String] = List(PortEnum.NonePort)
override val outportList: List[String] = List(PortEnum.DefaultPort)
override var batchDuration: Int = _
var hostname:String =_
var port:String=_
var windowDuration:Int = _
var slideDuration:Int = _
override def setProperties(map: Map[String, Any]): Unit = {
hostname=MapUtil.get(map,key="hostname").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
windowDuration=MapUtil.get(map,key="windowDuration").asInstanceOf[String].toInt
slideDuration=MapUtil.get(map,key="slideDuration").asInstanceOf[String].toInt
val timing = MapUtil.get(map,key="batchDuration")
batchDuration=if(timing == None) new Integer(1) else timing.asInstanceOf[String].toInt
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val hostname = new PropertyDescriptor().name("hostname").displayName("hostname").description("socket hostname ").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").description("socket port").defaultValue("").required(true)
val batchDuration = new PropertyDescriptor().name("batchDuration").displayName("batchDuration").description("the streaming batch duration").defaultValue("1").required(true)
val windowDuration = new PropertyDescriptor().name("windowDuration").displayName("windowDuration").description("the window duration, the unit is seconds").defaultValue("").required(true)
val slideDuration = new PropertyDescriptor().name("slideDuration").displayName("slideDuration").description("the slide duration, the unit is seconds").defaultValue("").required(true)
descriptor = hostname :: descriptor
descriptor = port :: descriptor
descriptor = batchDuration :: descriptor
descriptor = windowDuration :: descriptor
descriptor = slideDuration :: descriptor
descriptor
}
//TODO: change icon
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/streaming/socketTextStream.png")
}
override def getGroup(): List[String] = {
List(StopGroup.StreamingGroup)
}
override def initialize(ctx: ProcessContext): Unit = {
}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
}
override def getDStream(ssc: StreamingContext): DStream[String] = {
val dstream = ssc.socketTextStream(hostname,Integer.parseInt(port))
dstream.window(Seconds(windowDuration),Seconds(slideDuration))
//dstream.reduceByWindow(_ + _,Seconds(windowDuration),Seconds(slideDuration))
}
}

View File

@ -15,7 +15,7 @@ class StreamingTest {
def testSockStreaming(): Unit ={ def testSockStreaming(): Unit ={
//parse flow json //parse flow json
val file = "src/main/resources/flow/flow_SocketTextStreaming.json" val file = "src/main/resources/flow/flow_SocketTextStreamingByWindow.json"
val flowJsonStr = FileUtil.fileReader(file) val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map) println(map)