forked from opensci/piflow
fix the bug with data consumption from kafka
This commit is contained in:
parent
1b63b70c4d
commit
a987093467
|
@ -18,7 +18,7 @@
|
|||
"bundle":"cn.piflow.bundle.kafka.WriteToKafka",
|
||||
"properties":{
|
||||
"kafka_host":"10.0.86.93:9092,10.0.86.94:9092,10.0.86.95:9092",
|
||||
"topic":"test_topic"
|
||||
"topic":"test_topic1"
|
||||
}
|
||||
|
||||
},
|
||||
|
@ -28,7 +28,7 @@
|
|||
"bundle":"cn.piflow.bundle.kafka.ReadFromKafka",
|
||||
"properties":{
|
||||
"kafka_host":"10.0.86.93:9092,10.0.86.94:9092,10.0.86.95:9092",
|
||||
"topic":"test_topic",
|
||||
"topic":"test_topic1",
|
||||
"schema":"title,author,pages"
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,10 @@ class ReadFromKafka extends ConfigurableStop{
|
|||
import org.apache.kafka.clients.producer.ProducerConfig
|
||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_host)
|
||||
properties.put("acks", "all")
|
||||
properties.put("group.id","hhhhh")
|
||||
properties.put("group.id","mmmm")
|
||||
properties.put("enable.auto.commit","true")
|
||||
properties.put("max.poll.records","1000")
|
||||
properties.put("auto.offset.reset","earliest")
|
||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
|
||||
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
|
||||
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
|
||||
|
@ -43,15 +46,16 @@ class ReadFromKafka extends ConfigurableStop{
|
|||
|
||||
|
||||
//var res:List[Array[String]]=List()
|
||||
//val topicName=topic
|
||||
var res:List[Row]=List()
|
||||
|
||||
|
||||
val dfSchema=StructType(schema.split(",").map(f=>StructField(f,org.apache.spark.sql.types.StringType,true)))
|
||||
|
||||
val consumer = new KafkaConsumer[String,String](properties)
|
||||
consumer.subscribe(Collections.singleton(topic))
|
||||
val records:ConsumerRecords[String,String] = consumer.poll(100)
|
||||
val it=records.iterator()
|
||||
consumer.subscribe(java.util.Arrays.asList(topic,"finally"))
|
||||
val records:ConsumerRecords[String,String] = consumer.poll(1000)
|
||||
val it=records.records(topic).iterator()
|
||||
while(it.hasNext){
|
||||
//println(it.next().value())
|
||||
val row=Row.fromSeq(it.next().value().split(",").toSeq)
|
||||
|
|
Loading…
Reference in New Issue