add redis group (WriteToRedis and ReadFromRedis)

xiaoxiao 2018-09-27 12:16:24 +08:00
parent bb2ddc60d3
commit 7ae7e6a888
3 changed files with 207 additions and 0 deletions


@@ -0,0 +1,39 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "stops":[
      {
        "uuid":"0000",
        "name":"SelectHiveQL",
        "bundle":"cn.piflow.bundle.hive.SelectHiveQL",
        "properties":{
          "hiveQL":"select * from sparktest.dblp_phdthesis"
        }
      },
      {
        "uuid":"1111",
        "name":"ReadFromRedis",
        "bundle":"cn.piflow.bundle.redis.ReadFromRedis",
        "properties":{
          "redis_host":"10.0.88.9",
          "port":"7000",
          "password":"bigdata",
          "column_name":"title",
          "schema":"author,pages"
        }
      }
    ],
    "paths":[
      {
        "from":"SelectHiveQL",
        "outport":"",
        "inport":"",
        "to":"ReadFromRedis"
      }
    ]
  }
}
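For orientation, a minimal sketch of how the "properties" block of the ReadFromRedis stop above presumably reaches the stop at runtime: judging from the setProperties(map: Map[String, Any]) method in the ReadFromRedis class below, the flow engine is expected to hand the block over as a plain map. The snippet and the FlowPropertiesExample name are illustrative only, not part of this commit.

package cn.piflow.bundle.redis

// Illustrative sketch (not part of this commit): feed the "properties" block
// from the flow JSON above into the stop exactly as setProperties expects it.
object FlowPropertiesExample {
  def main(args: Array[String]): Unit = {
    val stop = new ReadFromRedis()
    stop.setProperties(Map(
      "redis_host"  -> "10.0.88.9",    // Redis cluster node host
      "port"        -> "7000",         // parsed to Int inside setProperties
      "password"    -> "bigdata",
      "column_name" -> "title",        // input column whose values are used as Redis keys
      "schema"      -> "author,pages"  // comma-separated hash fields to read per key
    ))
  }
}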


@@ -0,0 +1,98 @@
package cn.piflow.bundle.redis

import cn.piflow.bundle.util.JedisClusterImplSer
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, RedisGroup, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import redis.clients.jedis.HostAndPort

import scala.collection.mutable.ArrayBuffer

class ReadFromRedis extends ConfigurableStop {

  val inportCount: Int = 1
  val outportCount: Int = 0

  var redis_host: String = _
  var port: Int = _
  var password: String = _
  var column_name: String = _
  var schema: String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val dfIn = in.read()
    val colName = column_name

    // connect to the Redis cluster
    val jedisCluster = new JedisClusterImplSer(new HostAndPort(redis_host, port), password)

    // output schema: the key column followed by the hash fields listed in `schema`
    val fields: Array[String] = schema.split(",")
    val newSchema: Array[String] = (column_name + "," + schema).split(",")
    val dfSchema = StructType(newSchema.map(f => StructField(f, StringType, nullable = true)))

    // for every input row, use the key column's value as the Redis key and
    // fetch each requested field from the hash stored under that key
    val newRDD = dfIn.rdd.map(line => {
      val row = new ArrayBuffer[String]
      val key = line.getAs[String](colName)
      row += key
      for (j <- 0 until fields.length) {
        row += jedisCluster.getJedisCluster.hget(key, fields(j))
      }
      Row.fromSeq(row)
    })

    // emit the enriched rows downstream
    val df = spark.createDataFrame(newRDD, dfSchema)
    out.write(df)
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    redis_host = MapUtil.get(map, key = "redis_host").asInstanceOf[String]
    port = Integer.parseInt(MapUtil.get(map, key = "port").toString)
    password = MapUtil.get(map, key = "password").asInstanceOf[String]
    schema = MapUtil.get(map, key = "schema").asInstanceOf[String]
    column_name = MapUtil.get(map, key = "column_name").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val redis_host = new PropertyDescriptor().name("redis_host").displayName("REDIS_HOST").defaultValue("").required(true)
    val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
    val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("").required(true)
    val schema = new PropertyDescriptor().name("schema").displayName("SCHEMA").defaultValue("").required(true)
    val column_name = new PropertyDescriptor().name("column_name").displayName("COLUMN_NAME").defaultValue("").required(true)
    descriptor = redis_host :: descriptor
    descriptor = port :: descriptor
    descriptor = password :: descriptor
    descriptor = schema :: descriptor
    descriptor = column_name :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = ???

  override def getGroup(): StopGroup = {
    RedisGroup
  }
}
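A usage note, stated as an assumption rather than something this commit documents: ReadFromRedis fetches every field listed in `schema` with hget(key, field), so each value of the `column_name` column is expected to name a Redis hash whose fields match `schema`. Below is a minimal sketch of seeding one such hash for the flow above; the object name SeedRedisForReadExample and the sample values are hypothetical.

package cn.piflow.bundle.redis

import cn.piflow.bundle.util.JedisClusterImplSer
import redis.clients.jedis.HostAndPort

// Hypothetical seeding sketch (not part of this commit): one hash per key,
// carrying the fields that ReadFromRedis reads back via hget(key, field).
object SeedRedisForReadExample {
  def main(args: Array[String]): Unit = {
    val cluster = new JedisClusterImplSer(new HostAndPort("10.0.88.9", 7000), "bigdata")
    // key = a value of the "title" column; fields = the entries of "schema" ("author,pages")
    cluster.getJedisCluster.hset("Some PhD Thesis Title", "author", "Some Author")
    cluster.getJedisCluster.hset("Some PhD Thesis Title", "pages", "123")
  }
}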


@@ -0,0 +1,70 @@
package cn.piflow.bundle.redis

import cn.piflow.bundle.util.{JedisClusterImplSer, RedisUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, RedisGroup, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
import redis.clients.jedis.HostAndPort

class WriteToRedis extends ConfigurableStop {

  val inportCount: Int = 1
  val outportCount: Int = 0

  var redis_host: String = _
  var port: Int = _
  var password: String = _
  var column_name: String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val df = in.read()
    val colName: String = column_name
    println(df.schema)

    // connect to the Redis cluster
    val jedisCluster = new JedisClusterImplSer(new HostAndPort(redis_host, port), password)

    // write every row into Redis, keyed by the value of the `column_name` column
    df.collect.foreach(row => {
      RedisUtil.manipulateRow(row, colName, jedisCluster)
    })

    // debug check: list the hash fields stored under a sample key
    val v = jedisCluster.getJedisCluster.hkeys("Python")
    println(v)
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    redis_host = MapUtil.get(map, key = "redis_host").asInstanceOf[String]
    port = Integer.parseInt(MapUtil.get(map, key = "port").toString)
    password = MapUtil.get(map, key = "password").asInstanceOf[String]
    column_name = MapUtil.get(map, key = "column_name").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val redis_host = new PropertyDescriptor().name("redis_host").displayName("REDIS_HOST").defaultValue("").required(true)
    val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
    val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("").required(true)
    val column_name = new PropertyDescriptor().name("column_name").displayName("COLUMN_NAME").defaultValue("").required(true)
    descriptor = redis_host :: descriptor
    descriptor = port :: descriptor
    descriptor = password :: descriptor
    descriptor = column_name :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = ???

  override def getGroup(): StopGroup = {
    RedisGroup
  }
}
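RedisUtil.manipulateRow is imported and called above but its implementation is not included in this commit. Below is a hypothetical sketch of what such a helper could look like, under the assumption that it mirrors ReadFromRedis: every non-key column of a row is stored as a hash field under the key column's value. The name RedisUtilSketch is made up to avoid implying this is the real RedisUtil.

package cn.piflow.bundle.util

import org.apache.spark.sql.Row

// Hypothetical counterpart to ReadFromRedis (assumption, not part of this commit):
// store each non-key column of the row as a field of the hash keyed by the key column.
object RedisUtilSketch {
  def manipulateRow(row: Row, keyColumn: String, jedisCluster: JedisClusterImplSer): Unit = {
    val key = row.getAs[String](keyColumn)
    row.schema.fieldNames.filter(_ != keyColumn).foreach { field =>
      val value = row.getAs[Any](field)
      if (value != null) {
        // hset(key, field, value): one Redis hash per key, one field per column
        jedisCluster.getJedisCluster.hset(key, field, value.toString)
      }
    }
  }
}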