1. verify flume streaming example

2. add debugData api
This commit is contained in:
judy0131 2019-03-07 15:59:10 +08:00
parent 375af3a1ab
commit 0936d4b495
12 changed files with 361 additions and 191 deletions

View File

@@ -8,8 +8,8 @@
"name":"SocketTextStream",
"bundle":"cn.piflow.bundle.streaming.FlumeStream",
"properties":{
"hostname":"10.0.86.191",
"port":"9998",
"hostname":"slave1",
"port":"10001",
"batchDuration":"5"
}
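Note: the new hostname/port pair follows the constraint stated in the Flume agent config and FlumeStream client below: the avro sink host must be one of the cluster worker nodes and the port must be greater than 10000.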

View File

@@ -8,7 +8,7 @@
"name":"TextFileStream",
"bundle":"cn.piflow.bundle.streaming.TextFileStream",
"properties":{
"directory":"hdfs://10.0.86.89:9000/xjzhu/streamDir",
"directory":"hdfs://10.0.86.89:9000/textfilestream1",
"batchDuration":"5"
}

View File

@@ -8,10 +8,10 @@ streamingAgent.sources.spoolSource.channels = channel
streamingAgent.sources.spoolSource.spoolDir = /opt/flume-streaming
streamingAgent.sources.spoolSource.fileHeader = true
-# Describe the sink
+# Describe the sink; the hostname must be one of the cluster worker nodes, and the port should be greater than 10000
streamingAgent.sinks.avroSink.type = avro
-streamingAgent.sinks.avroSink.hostname=10.0.86.210
-streamingAgent.sinks.avroSink.port=7777
+streamingAgent.sinks.avroSink.hostname=slave1
+streamingAgent.sinks.avroSink.port=10001
# Use a channel which buffers events in memory
streamingAgent.channels.channel.type = memory
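To feed the spooling source once the agent is running, drop a file into the spool directory configured above, e.g. cp /tmp/events.log /opt/flume-streaming/ (the source file name here is an arbitrary example).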

View File

@@ -15,7 +15,7 @@ class StreamingTest {
def testSockStreaming(): Unit ={
//parse flow json
val file = "src/main/resources/flow/flow_SocketTextStreamingByWindow.json"
val file = "src/main/resources/flow/flow_TextFileStreaming.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)

View File

@@ -0,0 +1,81 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStartFlowFlumeStreaming {
def main(args: Array[String]): Unit = {
// the Flume hostname must be one of the cluster worker nodes, and the port should be greater than 10000
val json=
"""
|{
| "flow":{
| "name":"flumeStreaming",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"FlumeStream",
| "bundle":"cn.piflow.bundle.streaming.FlumeStream",
| "properties":{
| "hostname":"slave2",
| "port":"10002",
| "batchDuration":"5"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"FlumeStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}
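Note: the hostname/port here (slave2:10002) must match the avroSink hostname/port configured in the Flume agent's streaming.conf; the sample conf in this commit sinks to slave1:10001, so one side needs adjusting when running this example.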

View File

@@ -0,0 +1,81 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStartFlowKafkaStreaming {
def main(args: Array[String]): Unit = {
val json =
"""
|{
| "flow":{
| "name":"kafkaStreaming",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"kafkaStream",
| "bundle":"cn.piflow.bundle.streaming.KafkaStream",
| "properties":{
| "brokers":"10.0.86.191:9092,10.0.86.203:9092,10.0.86.210:9092",
| "groupId":"piflow",
| "topics":"streaming",
| "batchDuration":"5"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"kafkaStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}
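To produce test input for this flow, use the console producer command from the notes at the end of this commit and type messages into the streaming topic.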

View File

@@ -0,0 +1,80 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStartFlowSocketTextStreaming {
def main(args: Array[String]): Unit = {
val json =
"""
|{
| "flow":{
| "name":"socketStreaming",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"SocketTextStream",
| "bundle":"cn.piflow.bundle.streaming.SocketTextStream",
| "properties":{
| "hostname":"10.0.86.98",
| "port":"9999",
| "batchDuration":"5"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"SocketTextStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}
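Before starting this flow, run nc -lk 9999 on 10.0.86.98 (note 7 at the end of this commit); each line typed into the nc session becomes one record of the stream.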

View File

@@ -1,184 +0,0 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStartFlowStreaming {
def main(args: Array[String]): Unit = {
/*val json =
"""
|{
| "flow":{
| "name":"kafkaStreaming",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"kafkaStream",
| "bundle":"cn.piflow.bundle.streaming.KafkaStream",
| "properties":{
| "brokers":"10.0.86.191:9092,10.0.86.203:9092,10.0.86.210:9092",
| "groupId":"piflow",
| "topics":"streaming"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"kafkaStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin*/
/*val json=
"""
|{
| "flow":{
| "name":"flumeStreaming",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"SocketTextStream",
| "bundle":"cn.piflow.bundle.streaming.FlumeStream",
| "properties":{
| "hostname":"10.0.86.210",
| "port":"7777"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"SocketTextStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin*/
val json =
"""
|{
| "flow":{
| "name":"TextFileStream",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"TextFileStream",
| "bundle":"cn.piflow.bundle.streaming.TextFileStream",
| "properties":{
| "directory":"hdfs://10.0.86.89:9000/textfilestream"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"TextFileStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}

View File

@@ -0,0 +1,80 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStartFlowTextFileStreaming {
def main(args: Array[String]): Unit = {
// maybe you should change the directory if no data is received
val json =
"""
|{
| "flow":{
| "name":"TextFileStream",
| "uuid":"1234",
| "stops":[
| {
| "uuid":"1111",
| "name":"TextFileStream",
| "bundle":"cn.piflow.bundle.streaming.TextFileStream",
| "properties":{
| "directory":"hdfs://10.0.86.89:9000/textfilestream1",
| "batchDuration":"5"
| }
|
| },
| {
| "uuid":"2222",
| "name":"ConvertSchema",
| "bundle":"cn.piflow.bundle.common.ConvertSchema",
| "properties":{
| "schema":"value->line"
| }
| },
| {
| "uuid":"3333",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/flowStreaming",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"TextFileStream",
| "outport":"",
| "inport":"",
| "to":"ConvertSchema"
| },
| {
| "from":"ConvertSchema",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}
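Spark's text file stream only picks up files created in the directory after the stream starts, so copy a fresh file into hdfs://10.0.86.89:9000/textfilestream1 (e.g. with hdfs dfs -put) to trigger a batch; this is what the hint in the comment above refers to.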

View File

@@ -105,6 +105,20 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
}
case HttpRequest(GET, Uri.Path("/flow/debugData"), headers, entity, protocol) => {
val processID = req.getUri().query().getOrElse("processID","")
val stopName = req.getUri().query().getOrElse("stopName","")
val port = req.getUri().query().getOrElse("port","default")
if(!processID.equals("") && !stopName.equals()){
val result = API.getFlowDebugData(processID, stopName, port)
Future.successful(HttpResponse(entity = result))
}else{
Future.successful(HttpResponse(entity = "processID is null or stop does not have debug data!"))
}
}
case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{
entity match {
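The new endpoint can be exercised with a plain GET request. Below is a minimal client sketch in the style of the HTTPClient* objects above; the object name and the processID/stopName query values are hypothetical placeholders, not part of this commit.

package cn.piflow.api

import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object HTTPClientGetFlowDebugData {
  def main(args: Array[String]): Unit = {
    // processID and stopName identify a running flow and one of its stops; port falls back to "default"
    val url = "http://10.0.86.98:8001/flow/debugData?processID=process_1234&stopName=ConvertSchema&port=default"
    val client = HttpClients.createDefault()
    val get: HttpGet = new HttpGet(url)
    val response: CloseableHttpResponse = client.execute(get)
    val str = EntityUtils.toString(response.getEntity, "UTF-8")
    println("Debug data is " + str)
  }
}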

View File

@@ -30,6 +30,7 @@ object StartFlowMain {
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.start(flow);
val applicationId = spark.sparkContext.applicationId
process.awaitTermination();
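Like checkpoint.path, debug.path is resolved through PropertyUtil, so the server's properties file must define a debug.path entry before debug data can be stored and fetched; an HDFS directory is the likely value, by analogy with checkpoint.path.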

View File

@@ -35,6 +35,23 @@ clean package -Dmaven.test.skip=true -U
kafka_2.11-2.1.1.jar
kafka-clients-2.1.1.jar
start kafka server: ./bin/kafka-server-start.sh -daemon config/server.properties
stop kafka server: ./bin/kafka-server-stop.sh
start kafka producer: ./bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic streaming
start kafka consumer: ./bin/kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic streaming
list topics:
./bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
create topics:
./bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic newTopic
6. flume-related jars need to be put on the Spark cluster
spark-streaming-flume_2.11-2.1.0.jar
start flume agent: bin/flume-ng agent -n streamingAgent -c conf -f conf/streaming.conf -Dflume.root.logger=INFO,console
7. socket text stream
nc -lk 9999