From 1b63b70c4d13591fe3076091fa2c8ddab5f8f336 Mon Sep 17 00:00:00 2001 From: xiaoxiao Date: Fri, 28 Sep 2018 13:31:27 +0800 Subject: [PATCH] alter getGroup method and add authorEmail for stops --- .../piflow/bundle/redis/ReadFromRedis.scala | 8 +-- .../cn/piflow/bundle/redis/WriteToRedis.scala | 10 ++-- .../bundle/util/JedisClusterImplSer.java | 54 ++++++++++++++++++- .../cn/piflow/bundle/util/RedisUtil.scala | 19 ++++++- .../main/scala/cn/piflow/conf/StopGroup.scala | 3 ++ .../scala/cn/piflow/conf/StopGroupEnum.scala | 2 + 6 files changed, 85 insertions(+), 11 deletions(-) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/ReadFromRedis.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/ReadFromRedis.scala index b87d118..8befb5c 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/ReadFromRedis.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/ReadFromRedis.scala @@ -5,7 +5,7 @@ import java.util import cn.piflow.bundle.util.{JedisClusterImplSer, RedisUtil} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, FileGroup, RedisGroup, StopGroup} +import cn.piflow.conf._ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.avro.generic.GenericData.StringType @@ -90,9 +90,9 @@ class ReadFromRedis extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - RedisGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.RedisGroup.toString) } - + override val authorEmail: String = "xiaoxiao@cnic.cn" } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala index 64c018c..589f75c 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala @@ -3,11 +3,11 @@ package cn.piflow.bundle.redis import cn.piflow.bundle.util.{JedisClusterImplSer, RedisUtil} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, FileGroup, RedisGroup, StopGroup} +import cn.piflow.conf._ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.{DataFrame, SparkSession} -import redis.clients.jedis.{HostAndPort} +import redis.clients.jedis.HostAndPort class WriteToRedis extends ConfigurableStop{ @@ -62,9 +62,9 @@ class WriteToRedis extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - RedisGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.RedisGroup.toString) } - + override val authorEmail: String = "xiaoxiao@cnic.cn" } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/JedisClusterImplSer.java b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/JedisClusterImplSer.java index ee2eb97..8c9206b 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/JedisClusterImplSer.java +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/JedisClusterImplSer.java @@ -1,4 +1,56 @@ package cn.piflow.bundle.util; -public class JedisClusterImplSer { +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPoolConfig; + +import java.io.IOException; +import java.io.ObjectStreamException; +import java.io.Serializable; + +public class JedisClusterImplSer implements Serializable { + + private static final long serialVersionUID = -51L; + + private static final int DEFAULT_TIMEOUT = 2000; + private static final int DEFAULT_REDIRECTIONS = 5; + private static final JedisPoolConfig DEFAULT_CONFIG = new JedisPoolConfig(); + + private HostAndPort hostAndPort; + private String password; + transient private JedisCluster jedisCluster; + + /*public JedisClusterImplSer(HostAndPort hostAndPort) { + this.hostAndPort = hostAndPort; + this.jedisCluster = new JedisCluster(hostAndPort, 3000000); + }*/ + + public JedisClusterImplSer(HostAndPort hostAndPort, String password){ + this.hostAndPort = hostAndPort; + this.password = password; + this.jedisCluster = new JedisCluster(hostAndPort,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT,DEFAULT_REDIRECTIONS,this.password,DEFAULT_CONFIG); + } + + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException{ + in.defaultReadObject(); + //setJedisCluster(new JedisCluster(hostAndPort)); + setJedisCluster(new JedisCluster(hostAndPort,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT,DEFAULT_REDIRECTIONS,this.password,DEFAULT_CONFIG)); + } + + private void readObjectNoData() throws ObjectStreamException { + + } + + public JedisCluster getJedisCluster() { + if (jedisCluster == null) this.jedisCluster = new JedisCluster(hostAndPort,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT,DEFAULT_REDIRECTIONS,this.password,DEFAULT_CONFIG); + return jedisCluster; + } + + private void setJedisCluster(JedisCluster jedisCluster) { + this.jedisCluster = jedisCluster; + } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala index 9c75be0..e0035e4 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala @@ -1,5 +1,22 @@ package cn.piflow.bundle.util +import java.util -class RedisUtil { +import org.apache.spark.sql.{Dataset, Row} +object RedisUtil extends Serializable { + def manipulateRow(row:Row,column_name:String,jedisClusterImplSer: JedisClusterImplSer):Unit={ + var hm:util.HashMap[String,String]=new util.HashMap() + val key=row.getAs(column_name).asInstanceOf[String] + //row.schema.fields.foreach(f=>(if(!f.name.equals(column_name)&&row.getAs(f.name)!=null)hm.put(f.name,row.getAs(f.name).asInstanceOf[String]))) + row.schema.fields.foreach(f=>{ + if(!f.name.equals(column_name)){ + if(row.getAs(f.name)==null)hm.put(f.name,"None") + else{ + hm.put(f.name,row.getAs(f.name).asInstanceOf[String]) + } + } + }) + jedisClusterImplSer.getJedisCluster.hmset(key,hm) + + } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala index 6cf879b..034ea84 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala @@ -13,6 +13,9 @@ case object FtpGroup extends StopGroup case object ScriptGroup extends StopGroup case object FileGroup extends StopGroup case object CleanGroup extends StopGroup +case object RedisGroup extends StopGroup +case object KafkaGroup extends StopGroup + object StopGroup{ def findAllGroup(): List[String] ={ diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala index 8761aad..1bf168f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala @@ -14,5 +14,7 @@ object StopGroupEnum extends Enumeration { val ScriptGroup = Value("ScriptGroup") val FileGroup = Value("FileGroup") val CleanGroup = Value("CleanGroup") + val KafkaGroup = Value("kafkaGroup") + val RedisGroup = Value("RedisGroup") }