add ftp group (LoadFromFtp and UploadToFtp)

xiaoxiao 2018-09-28 13:24:32 +08:00
parent 97e2600fb8
commit 952f0cefea
4 changed files with 121 additions and 18 deletions

View File

@@ -67,6 +67,20 @@
            <version>0.4.2</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>
    <build>

View File

@@ -14,14 +14,22 @@
      },
      {
        "uuid":"1111",
        "name":"ReadFromRedis",
        "bundle":"cn.piflow.bundle.redis.ReadFromRedis",
        "name":"WriteToKafka",
        "bundle":"cn.piflow.bundle.kafka.WriteToKafka",
        "properties":{
          "redis_host":"10.0.88.9",
          "port":"7000",
          "password":"bigdata",
          "column_name":"title",
          "schema":"author,pages"
          "kafka_host":"10.0.86.93:9092,10.0.86.94:9092,10.0.86.95:9092",
          "topic":"test_topic"
        }
      },
      {
        "uuid":"2222",
        "name":"ReadFromKafka",
        "bundle":"cn.piflow.bundle.kafka.ReadFromKafka",
        "properties":{
          "kafka_host":"10.0.86.93:9092,10.0.86.94:9092,10.0.86.95:9092",
          "topic":"test_topic",
          "schema":"title,author,pages"
        }
      }
@@ -32,8 +40,15 @@
        "from":"SelectHiveQL",
        "outport":"",
        "inport":"",
        "to":"ReadFromRedis"
        "to":"WriteToKafka"
      },
      {
        "from":"WriteToKafka",
        "outport":"",
        "inport":"",
        "to":"ReadFromKafka"
      }
    ]
  }
}
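
The updated flow chains SelectHiveQL into the new WriteToKafka stop and then into ReadFromKafka, both pointed at the test_topic topic. As a rough sketch of how the "properties" block above is consumed, the snippet below builds the same map by hand and passes it to setProperties; the direct instantiation of the stop is an assumption for illustration (in piflow the framework performs this binding).

// Illustration only: binding the WriteToKafka "properties" block from the flow JSON by hand.
import cn.piflow.bundle.kafka.WriteToKafka

object FlowConfigSketch {
  def main(args: Array[String]): Unit = {
    val props: Map[String, Any] = Map(
      "kafka_host" -> "10.0.86.93:9092,10.0.86.94:9092,10.0.86.95:9092",
      "topic"      -> "test_topic"
    )
    val stop = new WriteToKafka()
    stop.setProperties(props)  // reads kafka_host and topic via MapUtil.get, as in the class shown below
  }
}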

View File

@@ -42,13 +42,10 @@ class ReadFromKafka extends ConfigurableStop{
    //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    //var res:List[Array[String]]=List()
    var res:List[Row]=List()
    //val fields:Array[String]=schema.split(",")
    //val newSchema:Array[String]=col_str.split(",")
    var res:List[Array[String]]=List()
    //import org.apache.spark.sql.types._
    val dfSchema=StructType(schema.split(",").map(f=>StructField(f,org.apache.spark.sql.types.StringType,true)))
    val consumer = new KafkaConsumer[String,String](properties)
@@ -57,14 +54,13 @@ class ReadFromKafka extends ConfigurableStop{
    val it=records.iterator()
    while(it.hasNext){
      //println(it.next().value())
      val row=it.next().value().split(",")
      val row=Row.fromSeq(it.next().value().split(",").toSeq)
      res = row::res
    }
    import spark.implicits._
    //val schemaArr = schema.split(",")
    val rdd=spark.sparkContext.parallelize(res)
    val newRdd=rdd.map(line=>Row.fromSeq(line.toSeq))
    val df=spark.sqlContext.createDataFrame(newRdd,dfSchema)
    //val newRdd=rdd.map(line=>Row.fromSeq(line.toSeq))
    val df=spark.sqlContext.createDataFrame(rdd,dfSchema)
    df.show(20)
    out.write(df)
  }
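
With this change, ReadFromKafka builds Row objects straight from each comma-separated Kafka value and a StructType derived from the configured schema string, so createDataFrame can take the RDD[Row] directly instead of mapping arrays into rows afterwards. A minimal sketch of that Row/StructType pattern, with the Kafka consumer replaced by hard-coded sample lines (the sample data and the local SparkSession are assumptions for illustration):

// Minimal sketch of the Row/StructType pattern used above, without Kafka.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object RowSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Local session only for the sketch; ReadFromKafka gets its SparkSession from the JobContext.
    val spark = SparkSession.builder().master("local[*]").appName("RowSchemaSketch").getOrCreate()
    // Same comma-separated format as the "schema" property in the flow JSON above.
    val schema = "title,author,pages"
    val dfSchema = StructType(schema.split(",").map(f => StructField(f, StringType, true)))
    // Each Kafka record value is a comma-joined line; Row.fromSeq turns it into a Row.
    val lines = Seq("book-1,author-1,100", "book-2,author-2,200")
    val rows = lines.map(l => Row.fromSeq(l.split(",").toSeq))
    val rdd = spark.sparkContext.parallelize(rows)
    val df = spark.sqlContext.createDataFrame(rdd, dfSchema)
    df.show()
    spark.stop()
  }
}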

View File

@@ -1,5 +1,83 @@
package cn.piflow.bundle.kafka

class WriteToKafka {

import java.util

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord

import scala.collection.mutable

class WriteToKafka extends ConfigurableStop{

  val inportCount: Int = 1
  val outportCount: Int = 0

  var kafka_host:String =_
  var topic:String=_

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val df = in.read()
    val properties:Properties = new Properties()
    properties.put("bootstrap.servers", kafka_host)
    properties.put("acks", "all")
    //properties.put("retries", 0)
    //properties.put("batch.size", 16384)
    //properties.put("linger.ms", 1)
    //properties.put("buffer.memory", 33554432)
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    var producer:Producer[String,String] = new KafkaProducer[String,String](properties)

    df.collect().foreach(row=>{
      //var hm:util.HashMap[String,String]=new util.HashMap()
      //row.schema.fields.foreach(f=>(if(!f.name.equals(column_name)&&row.getAs(f.name)!=null)hm.put(f.name,row.getAs(f.name).asInstanceOf[String])))
      var res:List[String]=List()
      row.schema.fields.foreach(f=>{
        if(row.getAs(f.name)==null)res="None"::res
        else{
          res=row.getAs(f.name).asInstanceOf[String]::res
        }
      })
      val s:String=res.reverse.mkString(",")
      val record=new ProducerRecord[String,String](topic,s)
      producer.send(record)
    })
    producer.close()
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    kafka_host=MapUtil.get(map,key="kafka_host").asInstanceOf[String]
    //port=Integer.parseInt(MapUtil.get(map,key="port").toString)
    topic=MapUtil.get(map,key="topic").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor : List[PropertyDescriptor] = List()
    val kafka_host = new PropertyDescriptor().name("kafka_host").displayName("KAFKA_HOST").defaultValue("").required(true)
    val topic = new PropertyDescriptor().name("topic").displayName("TOPIC").defaultValue("").required(true)
    descriptor = kafka_host :: descriptor
    descriptor = topic :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = ???

  override def getGroup(): List[String] = {
    List(StopGroupEnum.KafkaGroup.toString)
  }

  override val authorEmail: String = "xiaoxiao@cnic.cn"

}
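
WriteToKafka flattens each DataFrame row into a comma-joined string, substituting "None" for null fields, and sends it through a string-keyed, string-valued KafkaProducer with acks=all. A stripped-down sketch of that producer step outside piflow; the broker address, topic, and sample line are assumptions for illustration:

// Minimal producer sketch matching the configuration used in perform() above.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerSketch {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "10.0.86.93:9092")  // corresponds to the kafka_host property
    properties.put("acks", "all")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](properties)
    // WriteToKafka joins the row's fields with "," and substitutes "None" for nulls.
    val line = List("book-1", "author-1", "100").mkString(",")
    producer.send(new ProducerRecord[String, String]("test_topic", line))
    producer.close()
  }
}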