support flume streaming

judy0131 2019-03-01 13:34:47 +08:00
parent 87a3b002e4
commit 91561ad010
12 changed files with 195 additions and 7 deletions

View File

@@ -178,13 +178,12 @@
         </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
+        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core
         <dependency>
             <groupId>org.apache.flume</groupId>
             <artifactId>flume-ng-core</artifactId>
             <version>1.8.0</version>
-        </dependency>
+        </dependency>-->
         <!--https://mvnrepository.com/artifact/io.netty/netty-all-->
         <dependency>

View File

@@ -0,0 +1,50 @@
{
  "flow":{
    "name":"flumeStreaming",
    "uuid":"1234",
    "stops":[
      {
        "uuid":"1111",
        "name":"SocketTextStream",
        "bundle":"cn.piflow.bundle.streaming.FlumeStream",
        "properties":{
          "hostname":"10.0.86.191",
          "port":"9998"
        }
      },
      {
        "uuid":"2222",
        "name":"ConvertSchema",
        "bundle":"cn.piflow.bundle.common.ConvertSchema",
        "properties":{
          "schema":"value->line"
        }
      },
      {
        "uuid":"3333",
        "name":"CsvSave",
        "bundle":"cn.piflow.bundle.csv.CsvSave",
        "properties":{
          "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
          "header":"true",
          "delimiter":","
        }
      }
    ],
    "paths":[
      {
        "from":"SocketTextStream",
        "outport":"",
        "inport":"",
        "to":"ConvertSchema"
      },
      {
        "from":"ConvertSchema",
        "outport":"",
        "inport":"",
        "to":"CsvSave"
      }
    ]
  }
}
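The flow wires three stops in a line: the Flume stop emits raw event bodies, ConvertSchema's "value->line" property renames the stream's single output column, and CsvSave writes the result to HDFS. (Note the first stop is still named "SocketTextStream", a leftover from the socket example, but its bundle points at FlumeStream; the paths section references it by that name, so the flow still wires up.) A minimal sketch of what the rename presumably amounts to in plain Spark; the DataFrame plumbing here is illustrative, not piflow's actual ConvertSchema code:

    import org.apache.spark.sql.DataFrame

    // "value->line": rename the incoming column so CsvSave emits a "line" header
    def convertSchema(df: DataFrame): DataFrame =
      df.withColumnRenamed("value", "line")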

View File

@@ -8,7 +8,7 @@
       "name":"TextFileStream",
       "bundle":"cn.piflow.bundle.streaming.TextFileStream",
       "properties":{
-        "directory":"hdfs://10.0.86.89:9000/xjzhu/textfileStream"
+        "directory":"hdfs://10.0.86.89:9000/xjzhu/streamDir"
       }
     },

View File

@@ -0,0 +1,23 @@
streamingAgent.sources = spoolSource
streamingAgent.sinks = avroSink
streamingAgent.channels = channel
# Describe/configure the source
streamingAgent.sources.spoolSource.type = spooldir
streamingAgent.sources.spoolSource.channels = channel
streamingAgent.sources.spoolSource.spoolDir = /opt/flume-streaming
streamingAgent.sources.spoolSource.fileHeader = true
# Describe the sink
streamingAgent.sinks.avroSink.type = avro
streamingAgent.sinks.avroSink.hostname=10.0.86.210
streamingAgent.sinks.avroSink.port=7777
# Use a channel which buffers events in memory
streamingAgent.channels.channel.type = memory
streamingAgent.channels.channel.capacity = 1000
streamingAgent.channels.channel.transactionCapacity = 100
# Bind the source and sink to the channel
streamingAgent.sources.spoolSource.channels = channel
streamingAgent.sinks.avroSink.channel = channel
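This agent watches /opt/flume-streaming with a spooldir source and pushes events through a memory channel to an avro sink at 10.0.86.210:7777, the host/port the FlumeStream stop below receives on. (The spoolSource-to-channel binding is declared twice, which is redundant but harmless.) Assuming a standard Flume install and that this file is saved as streaming.conf, the agent would be launched with something like:

    flume-ng agent --conf ./conf --conf-file streaming.conf --name streamingAgent

where --name must match the streamingAgent prefix used in the properties above.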

View File

@@ -0,0 +1,52 @@
package cn.piflow.bundle.streaming

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStreamingStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.flume._

class FlumeStream extends ConfigurableStreamingStop {

  override val timing: Integer = 10
  override val authorEmail: String = "xjzhu@cnic.cn"
  override val description: String = "Get data from a Flume stream."
  override val inportList: List[String] = List(PortEnum.NonePort)
  override val outportList: List[String] = List(PortEnum.DefaultPort)

  var hostname: String = _
  var port: Int = _

  override def setProperties(map: Map[String, Any]): Unit = {
    hostname = MapUtil.get(map, key = "hostname").asInstanceOf[String]
    port = MapUtil.get(map, key = "port").asInstanceOf[String].toInt
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val hostname = new PropertyDescriptor().name("hostname").displayName("hostname").description("flume sink hostname").defaultValue("").required(true)
    val port = new PropertyDescriptor().name("port").displayName("port").description("flume sink port").defaultValue("").required(true)
    descriptor = hostname :: descriptor
    descriptor = port :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("mllib.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroup.StreamingGroup)
  }

  // Build a push-based Flume receiver and decode each event body as a UTF-8 line.
  override def getDStream(ssc: StreamingContext): DStream[String] = {
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port)
    flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8"))
  }

  // Streaming stops deliver data via getDStream, so these lifecycle hooks stay empty.
  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {}
}
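FlumeUtils.createStream is Spark's push-based receiver, so the avro sink in the agent config has to point at the machine where the receiver task runs. A hedged smoke test for the stop outside the piflow runner, assuming a local-mode StreamingContext (this harness is not part of the commit):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FlumeStreamSmokeTest {
      def main(args: Array[String]): Unit = {
        // Batch interval mirrors the stop's timing value of 10 seconds.
        val conf = new SparkConf().setMaster("local[2]").setAppName("flume-stream-smoke")
        val ssc = new StreamingContext(conf, Seconds(10))

        val stop = new FlumeStream()
        // Same host/port the avro sink in the agent config targets.
        stop.setProperties(Map("hostname" -> "10.0.86.210", "port" -> "7777"))
        stop.getDStream(ssc).print() // dump decoded event bodies each batch

        ssc.start()
        ssc.awaitTermination()
      }
    }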

View File

@@ -38,7 +38,7 @@ class TextFileStream extends ConfigurableStreamingStop{
   override def getDStream(ssc: StreamingContext): DStream[String] = {
     val dstream = ssc.textFileStream(directory)
-    dstream.asInstanceOf[DStream[String]]
+    dstream
   }

   override def initialize(ctx: ProcessContext): Unit = {}

View File

@@ -15,7 +15,7 @@ class StreamingTest {
   def testSockStreaming(): Unit ={
     //parse flow json
-    val file = "src/main/resources/flow_KafkaStreaming.json"
+    val file = "src/main/resources/flow/flow_FlumeStreaming.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)

View File

@@ -8,7 +8,7 @@ import org.apache.http.util.EntityUtils
 object HTTPClientStartFlowStreaming {

   def main(args: Array[String]): Unit = {
-    val json =
+    /*val json =
       """
       |{
       | "flow":{
@@ -61,6 +61,59 @@ object HTTPClientStartFlowStreaming {
       | ]
       | }
       |}
-      """.stripMargin
+      """.stripMargin*/
+    val json =
+      """
+        |{
+        | "flow":{
+        |   "name":"flumeStreaming",
+        |   "uuid":"1234",
+        |   "stops":[
+        |     {
+        |       "uuid":"1111",
+        |       "name":"SocketTextStream",
+        |       "bundle":"cn.piflow.bundle.streaming.FlumeStream",
+        |       "properties":{
+        |         "hostname":"10.0.86.210",
+        |         "port":"7777"
+        |       }
+        |
+        |     },
+        |     {
+        |       "uuid":"2222",
+        |       "name":"ConvertSchema",
+        |       "bundle":"cn.piflow.bundle.common.ConvertSchema",
+        |       "properties":{
+        |         "schema":"value->line"
+        |       }
+        |     },
+        |     {
+        |       "uuid":"3333",
+        |       "name":"CsvSave",
+        |       "bundle":"cn.piflow.bundle.csv.CsvSave",
+        |       "properties":{
+        |         "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
+        |         "header":"true",
+        |         "delimiter":","
+        |       }
+        |     }
+        |   ],
+        |   "paths":[
+        |     {
+        |       "from":"SocketTextStream",
+        |       "outport":"",
+        |       "inport":"",
+        |       "to":"ConvertSchema"
+        |     },
+        |     {
+        |       "from":"ConvertSchema",
+        |       "outport":"",
+        |       "inport":"",
+        |       "to":"CsvSave"
+        |     }
+        |   ]
+        | }
+        |}
+      """.stripMargin
     val url = "http://10.0.86.98:8001/flow/start"
     val client = HttpClients.createDefault()
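The hunk cuts off right after the client is created; a minimal sketch of how such a request is typically finished with Apache HttpClient 4.x (the file's remaining lines are not shown, so this is an assumption, not the committed code):

    import org.apache.http.client.methods.HttpPost
    import org.apache.http.entity.StringEntity
    import org.apache.http.util.EntityUtils

    val post = new HttpPost(url)
    post.setEntity(new StringEntity(json)) // the flow JSON assembled above
    val response = client.execute(post)
    println(EntityUtils.toString(response.getEntity)) // server's reply, e.g. a flow id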

View File

@@ -64,6 +64,11 @@
             <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-flume_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>

View File

@@ -31,4 +31,10 @@ clean package -Dmaven.test.skip=true -U
 </property>
 5. Kafka-related jars need to be put on the Spark cluster:
    spark-streaming-kafka-0-10_2.11-2.1.0.jar
+   kafka_2.11-2.1.1.jar
+   kafka-clients-2.1.1.jar
+
+6. Flume-related jars need to be put on the Spark cluster:
+   spark-streaming-flume_2.11-2.1.0.jar
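Instead of copying the jars onto every node, a hedged alternative is to hand them to spark-submit through its standard --jars flag (paths assumed):

    spark-submit --jars spark-streaming-flume_2.11-2.1.0.jar,spark-streaming-kafka-0-10_2.11-2.1.0.jar,kafka_2.11-2.1.1.jar,kafka-clients-2.1.1.jar ...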