Support Kafka streaming

This commit is contained in:
judy0131 2019-02-28 11:00:21 +08:00
parent 695e61e455
commit c28fee4c1e
11 changed files with 226 additions and 8 deletions

Binary file not shown.

View File

@@ -297,7 +297,7 @@
</execution>
<execution>
<id>install-external-3</id>
<id>install-external-4</id>
<goals>
<goal>install-file</goal>
</goals>

View File

@@ -0,0 +1,51 @@
{
"flow":{
"name":"TextFileStream",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"kafkaStream",
"bundle":"cn.piflow.bundle.streaming.KafkaStream",
"properties":{
"brokers":"10.0.86.191:9092,10.0.86.203:9092,10.0.86.210:9092",
"groupId":"piflow",
"topics":"streaming"
}
},
{
"uuid":"2222",
"name":"ConvertSchema",
"bundle":"cn.piflow.bundle.common.ConvertSchema",
"properties":{
"schema":"value->line"
}
},
{
"uuid":"3333",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
"header":"true",
"delimiter":","
}
}
],
"paths":[
{
"from":"kafkaStream",
"outport":"",
"inport":"",
"to":"ConvertSchema"
},
{
"from":"ConvertSchema",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}
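
The flow above chains kafkaStream -> ConvertSchema -> CsvSave: lines read from the "streaming" topic are presumably renamed from the default "value" column to "line" and then written to HDFS as CSV. To exercise the flow end to end there has to be data on that topic. Below is a minimal producer sketch (not part of this commit) using the kafka-clients dependency added in pom.xml; the object name and message contents are made up, while the broker list and topic name are taken from the JSON above.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object StreamingTopicProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // brokers and topic as configured in the flow definition above
    props.put("bootstrap.servers", "10.0.86.191:9092,10.0.86.203:9092,10.0.86.210:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    (1 to 10).foreach { i =>
      // KafkaStream later flattens each record into key() + "," + value()
      producer.send(new ProducerRecord[String, String]("streaming", s"key$i", s"message $i"))
    }
    producer.close()
  }
}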

View File

@@ -8,7 +8,7 @@
"name":"TextFileStream",
"bundle":"cn.piflow.bundle.streaming.TextFileStream",
"properties":{
"directory":"hdfs://10.0.86.89:9000/xjzhu/"
"directory":"hdfs://10.0.86.89:9000/xjzhu/textfileStream"
}
},

View File

@@ -0,0 +1,72 @@
package cn.piflow.bundle.streaming
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStreamingStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
class KafkaStream extends ConfigurableStreamingStop{
override val timing: Integer = 1
override val authorEmail: String = "xjzhu@cnic.cn"
override val description: String = "read data from kafka."
override val inportList: List[String] = List(PortEnum.NonePort)
override val outportList: List[String] = List(PortEnum.DefaultPort)
var brokers:String = _
var groupId:String = _
var topics:Array[String] = _
override def setProperties(map: Map[String, Any]): Unit = {
brokers=MapUtil.get(map,key="brokers").asInstanceOf[String]
groupId=MapUtil.get(map,key="groupId").asInstanceOf[String]
topics=MapUtil.get(map,key="topics").asInstanceOf[String].split(",")
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val brokers = new PropertyDescriptor().name("brokers").displayName("brokers").description("Kafka brokers, separated by ','").defaultValue("").required(true)
val groupId = new PropertyDescriptor().name("groupId").displayName("groupId").description("kafka consumer group").defaultValue("group").required(true)
val topics = new PropertyDescriptor().name("topics").displayName("topics").description("kafka topics").defaultValue("").required(true)
descriptor = brokers :: descriptor
descriptor = groupId :: descriptor
descriptor = topics :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("mllib.png")
}
override def getGroup(): List[String] = {
List(StopGroup.StreamingGroup)
}
override def getDStream(ssc: StreamingContext): DStream[String] = {
// consumer settings for the Kafka direct stream; automatic offset commits are disabled
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false:java.lang.Boolean)
)
// subscribe to the configured topics with a direct (receiver-less) stream
val stream = KafkaUtils.createDirectStream[String,String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// flatten each ConsumerRecord into a "key,value" string for downstream stops
stream.map(record => record.key() + "," + record.value())
//stream.asInstanceOf[DStream[ConsumerRecord]]
}
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {}
}
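
For orientation, here is a rough standalone sketch (not part of this commit) of how a streaming stop like KafkaStream is driven: configure it with setProperties, ask it for a DStream, and turn each micro-batch into a single-column DataFrame, mirroring the foreachRDD change in ProcessImpl later in this commit. The object name, the local[2] master, and the 5-second batch interval are assumptions for illustration, not the runner's actual code path.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("KafkaStreamSketch").getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    // configure the stop exactly as the flow JSON does
    val stop = new KafkaStream()
    stop.setProperties(Map(
      "brokers" -> "10.0.86.191:9092,10.0.86.203:9092,10.0.86.210:9092",
      "groupId" -> "piflow",
      "topics"  -> "streaming"))

    import spark.implicits._
    stop.getDStream(ssc).foreachRDD { rdd =>
      // each element is the "key,value" string built in KafkaStream.getDStream
      rdd.toDF("value").show(truncate = false)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}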

View File

@@ -12,8 +12,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
class SocketTextStream extends ConfigurableStreamingStop {
override val authorEmail: String = "xjzhu@cnic.cn"
override val description: String = "Receive text data from a data server listening on a TCP socket."
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val inportList: List[String] = List(PortEnum.NonePort)
override val outportList: List[String] = List(PortEnum.DefaultPort)
override val timing: Integer = 1
var hostname:String = _
@@ -75,7 +75,8 @@ class SocketTextStream extends ConfigurableStreamingStop {
//lines.asInstanceOf[ReceiverInputDStream[String]]
}*/
override def getDStream(ssc: StreamingContext): DStream[String] = {
ssc.socketTextStream(hostname,Integer.parseInt(port))
val dstream = ssc.socketTextStream(hostname,Integer.parseInt(port))
dstream.asInstanceOf[DStream[String]]
}
}

View File

@@ -37,7 +37,8 @@ class TextFileStream extends ConfigurableStreamingStop{
}
override def getDStream(ssc: StreamingContext): DStream[String] = {
ssc.textFileStream(directory)
val dstream = ssc.textFileStream(directory)
dstream.asInstanceOf[DStream[String]]
}
override def initialize(ctx: ProcessContext): Unit = {}

View File

@@ -15,7 +15,7 @@ class StreamingTest {
def testSockStreaming(): Unit ={
//parse flow json
val file = "src/main/resources/flow_SocketTextStreaming.json"
val file = "src/main/resources/flow_KafkaStreaming.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)

View File

@@ -7,11 +7,15 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import cn.piflow.util._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
import scala.reflect.ClassTag
@@ -452,7 +456,7 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
val lines = streamingStop.getDStream(ssc)
lines.foreachRDD {
rdd => {
println(rdd.count())
//println(rdd.count())
val spark = pec.get[SparkSession]()
import spark.implicits._
val df = rdd.toDF("value")
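
The hunk above is where every micro-batch from a streaming stop becomes a DataFrame with a single "value" column. In the new Kafka flow that DataFrame is what ConvertSchema's "value->line" property operates on before CsvSave writes to HDFS. Assuming ConvertSchema simply renames columns according to an "old->new" mapping (an assumption; its implementation is not part of this diff), the per-batch effect is roughly the following self-contained sketch:

import org.apache.spark.sql.SparkSession

object ConvertSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("ConvertSchemaSketch").getOrCreate()
    import spark.implicits._

    // stand-in for one micro-batch: a single "value" column, as built by the foreachRDD above
    val df = Seq("key1,hello", "key2,world").toDF("value")

    // "value->line" presumably maps the old column name to the new one
    val renamed = df.withColumnRenamed("value", "line")
    renamed.show(truncate = false)   // CsvSave would then write this frame to csvSavePath
  }
}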

View File

@@ -0,0 +1,79 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStartFlow {
def main(args: Array[String]): Unit = {
val json =
"""
|{
| "flow":{
| "name":"TextFileStream",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"kafkaStream",
| "bundle":"cn.piflow.bundle.streaming.KafkaStream",
| "properties":{
| "brokers":"10.0.86.191:9092,10.0.86.203:9092,10.0.86.210:9092",
| "groupId":"piflow",
| "topics":"streaming"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"kafkaStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin
// POST the flow definition to the piflow server's /flow/start endpoint
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
// read and print the response body returned by the server
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}
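
A usage note on the client above: it prints only the response body. To confirm that the server actually accepted the flow, the HTTP status code is available from CloseableHttpResponse via getStatusLine. The variant below is only a sketch under the assumption that a successful submission returns 200; the object name is hypothetical and the shortened flow string is a placeholder for the full definition shown above.

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object HTTPClientStartFlowWithStatus {
  def main(args: Array[String]): Unit = {
    val json = """{"flow":{ ... }}"""   // placeholder: use the full flow JSON from the object above
    val post = new HttpPost("http://10.0.86.98:8001/flow/start")
    post.addHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(json))

    val client = HttpClients.createDefault()
    val response = client.execute(post)
    val status = response.getStatusLine.getStatusCode               // HTTP status from the server
    val body = EntityUtils.toString(response.getEntity, "UTF-8")    // response body
    println(s"status=$status body=$body")
    if (status != 200) println("flow/start did not return 200, check the server log")
  }
}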

pom.xml
View File

@@ -64,6 +64,16 @@
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>