fix a bug for ReadFromKafka Stop
This commit is contained in:
parent
caf919bae5
commit
97e2600fb8
|
@ -32,7 +32,7 @@ class ReadFromKafka extends ConfigurableStop{
|
|||
import org.apache.kafka.clients.producer.ProducerConfig
|
||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_host)
|
||||
properties.put("acks", "all")
|
||||
properties.put("group.id","b")
|
||||
properties.put("group.id","hhhhh")
|
||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
|
||||
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
|
||||
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
|
||||
|
@ -49,7 +49,7 @@ class ReadFromKafka extends ConfigurableStop{
|
|||
var res:List[Array[String]]=List()
|
||||
|
||||
//import org.apache.spark.sql.types._
|
||||
//val dfSchema=StructType(newSchema.map(f=>StructField(f,org.apache.spark.sql.types.StringType,true)))
|
||||
val dfSchema=StructType(schema.split(",").map(f=>StructField(f,org.apache.spark.sql.types.StringType,true)))
|
||||
|
||||
val consumer = new KafkaConsumer[String,String](properties)
|
||||
consumer.subscribe(Collections.singleton(topic))
|
||||
|
@ -61,10 +61,12 @@ class ReadFromKafka extends ConfigurableStop{
|
|||
res = row::res
|
||||
}
|
||||
import spark.implicits._
|
||||
val schemaArr = schema.split(",")
|
||||
val newDF=spark.sparkContext.parallelize(res).toDF(schemaArr:_*)
|
||||
newDF.show(20)
|
||||
out.write(newDF)
|
||||
//val schemaArr = schema.split(",")
|
||||
val rdd=spark.sparkContext.parallelize(res)
|
||||
val newRdd=rdd.map(line=>Row.fromSeq(line.toSeq))
|
||||
val df=spark.sqlContext.createDataFrame(newRdd,dfSchema)
|
||||
df.show(20)
|
||||
out.write(df)
|
||||
}
|
||||
def initialize(ctx: ProcessContext): Unit = {
|
||||
|
||||
|
@ -74,14 +76,17 @@ class ReadFromKafka extends ConfigurableStop{
|
|||
def setProperties(map: Map[String, Any]): Unit = {
|
||||
kafka_host=MapUtil.get(map,key="kafka_host").asInstanceOf[String]
|
||||
topic=MapUtil.get(map,key="topic").asInstanceOf[String]
|
||||
schema=MapUtil.get(map,key="schema").asInstanceOf[String]
|
||||
}
|
||||
|
||||
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
|
||||
var descriptor : List[PropertyDescriptor] = List()
|
||||
val kafka_host = new PropertyDescriptor().name("kafka_host").displayName("KAFKA_HOST").defaultValue("").required(true)
|
||||
val topic = new PropertyDescriptor().name("topic").displayName("TOPIC").defaultValue("").required(true)
|
||||
val schema = new PropertyDescriptor().name("schema").displayName("SCHEMA").defaultValue("").required(true)
|
||||
descriptor = kafka_host :: descriptor
|
||||
descriptor = topic :: descriptor
|
||||
descriptor = schema :: descriptor
|
||||
descriptor
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue