forked from opensci/piflow

alter getGroup method and add authorEmail for stops

This commit is contained in:
  parent 952f0cefea
  commit 1b63b70c4d
ReadFromRedis.scala
@@ -5,7 +5,7 @@ import java.util
 import cn.piflow.bundle.util.{JedisClusterImplSer, RedisUtil}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import cn.piflow.conf.{ConfigurableStop, FileGroup, RedisGroup, StopGroup}
+import cn.piflow.conf._
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.MapUtil
 import org.apache.avro.generic.GenericData.StringType
@@ -90,9 +90,9 @@ class ReadFromRedis extends ConfigurableStop{

   override def getIcon(): Array[Byte] = ???

-  override def getGroup(): StopGroup = {
-    RedisGroup
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.RedisGroup.toString)
   }

-
+  override val authorEmail: String = "xiaoxiao@cnic.cn"
 }
WriteToRedis.scala
@@ -3,11 +3,11 @@ package cn.piflow.bundle.redis

 import cn.piflow.bundle.util.{JedisClusterImplSer, RedisUtil}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import cn.piflow.conf.{ConfigurableStop, FileGroup, RedisGroup, StopGroup}
+import cn.piflow.conf._
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.MapUtil
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import redis.clients.jedis.{HostAndPort}
+import redis.clients.jedis.HostAndPort


 class WriteToRedis extends ConfigurableStop{
@@ -62,9 +62,9 @@ class WriteToRedis extends ConfigurableStop{

   override def getIcon(): Array[Byte] = ???

-  override def getGroup(): StopGroup = {
-    RedisGroup
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.RedisGroup.toString)
   }

-
+  override val authorEmail: String = "xiaoxiao@cnic.cn"
 }
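Both ReadFromRedis and WriteToRedis now report their group as a plain list of group names taken from StopGroupEnum instead of returning a StopGroup case object. A minimal, self-contained sketch of what the new getGroup() value reduces to (assuming StopGroupEnum lives in cn.piflow.conf, as the stops' wildcard import suggests; the object name below is hypothetical):

    import cn.piflow.conf.StopGroupEnum

    // Sketch only: the new getGroup() contract is just a list of
    // enumeration names rendered as strings.
    object GetGroupValueSketch {
      def main(args: Array[String]): Unit = {
        val group: List[String] = List(StopGroupEnum.RedisGroup.toString)
        println(group) // List(RedisGroup)
      }
    }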
JedisClusterImplSer.java
@@ -1,4 +1,56 @@
 package cn.piflow.bundle.util;

-public class JedisClusterImplSer {
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPoolConfig;
+
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public class JedisClusterImplSer implements Serializable {
+
+    private static final long serialVersionUID = -51L;
+
+    private static final int DEFAULT_TIMEOUT = 2000;
+    private static final int DEFAULT_REDIRECTIONS = 5;
+    private static final JedisPoolConfig DEFAULT_CONFIG = new JedisPoolConfig();
+
+    private HostAndPort hostAndPort;
+    private String password;
+    transient private JedisCluster jedisCluster;
+
+    /*public JedisClusterImplSer(HostAndPort hostAndPort) {
+        this.hostAndPort = hostAndPort;
+        this.jedisCluster = new JedisCluster(hostAndPort, 3000000);
+    }*/
+
+    public JedisClusterImplSer(HostAndPort hostAndPort, String password){
+        this.hostAndPort = hostAndPort;
+        this.password = password;
+        this.jedisCluster = new JedisCluster(hostAndPort,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT,DEFAULT_REDIRECTIONS,this.password,DEFAULT_CONFIG);
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException{
+        in.defaultReadObject();
+        //setJedisCluster(new JedisCluster(hostAndPort));
+        setJedisCluster(new JedisCluster(hostAndPort,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT,DEFAULT_REDIRECTIONS,this.password,DEFAULT_CONFIG));
+    }
+
+    private void readObjectNoData() throws ObjectStreamException {
+    }
+
+    public JedisCluster getJedisCluster() {
+        if (jedisCluster == null) this.jedisCluster = new JedisCluster(hostAndPort,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT,DEFAULT_REDIRECTIONS,this.password,DEFAULT_CONFIG);
+        return jedisCluster;
+    }
+
+    private void setJedisCluster(JedisCluster jedisCluster) {
+        this.jedisCluster = jedisCluster;
+    }
 }
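The wrapper keeps only hostAndPort and password in its serialized form and rebuilds the transient JedisCluster when it is read back. A minimal sketch of that round trip (the object name, address, and password below are placeholders, and a reachable Redis cluster is assumed, since the JedisCluster constructor connects eagerly):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    import cn.piflow.bundle.util.JedisClusterImplSer
    import redis.clients.jedis.HostAndPort

    object SerializationRoundTripSketch {
      def main(args: Array[String]): Unit = {
        // Placeholder cluster address and password.
        val original = new JedisClusterImplSer(new HostAndPort("127.0.0.1", 7000), "password")

        // Serialize: only hostAndPort and password are written (jedisCluster is transient).
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(original)
        out.close()

        // Deserialize: readObject() rebuilds a fresh JedisCluster from the stored fields.
        val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
        val copy = in.readObject().asInstanceOf[JedisClusterImplSer]
        println(copy.getJedisCluster != null) // true
      }
    }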
RedisUtil.scala
@@ -1,5 +1,22 @@
 package cn.piflow.bundle.util

-class RedisUtil {
+import java.util
+
+import org.apache.spark.sql.{Dataset, Row}
+
+object RedisUtil extends Serializable {
+
+  def manipulateRow(row:Row,column_name:String,jedisClusterImplSer: JedisClusterImplSer):Unit={
+    var hm:util.HashMap[String,String]=new util.HashMap()
+    val key=row.getAs(column_name).asInstanceOf[String]
+    //row.schema.fields.foreach(f=>(if(!f.name.equals(column_name)&&row.getAs(f.name)!=null)hm.put(f.name,row.getAs(f.name).asInstanceOf[String])))
+    row.schema.fields.foreach(f=>{
+      if(!f.name.equals(column_name)){
+        if(row.getAs(f.name)==null)hm.put(f.name,"None")
+        else{
+          hm.put(f.name,row.getAs(f.name).asInstanceOf[String])
+        }
+      }
+    })
+    jedisClusterImplSer.getJedisCluster.hmset(key,hm)
+  }
+
 }
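RedisUtil.manipulateRow flattens one Row into a Redis hash keyed by the chosen column, storing nulls as the string "None". A minimal usage sketch of how a stop such as WriteToRedis might drive it from a DataFrame; the input path, key column, and cluster address are placeholders, string-typed columns are assumed (the util casts every field to String), and this is one plausible pattern rather than the exact code in the stop:

    import cn.piflow.bundle.util.{JedisClusterImplSer, RedisUtil}
    import org.apache.spark.sql.SparkSession
    import redis.clients.jedis.HostAndPort

    object WriteRowsToRedisSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("redis-sketch").getOrCreate()
        val df = spark.read.json("/tmp/people.json") // hypothetical input

        // The serializable wrapper can be captured by the closure and shipped to executors;
        // each deserialized copy rebuilds its own JedisCluster.
        val jedis = new JedisClusterImplSer(new HostAndPort("127.0.0.1", 7000), "password")
        df.rdd.foreach(row => RedisUtil.manipulateRow(row, "id", jedis))

        spark.stop()
      }
    }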
StopGroup.scala
@@ -13,6 +13,9 @@ case object FtpGroup extends StopGroup
 case object ScriptGroup extends StopGroup
 case object FileGroup extends StopGroup
 case object CleanGroup extends StopGroup
+case object RedisGroup extends StopGroup
+case object KafkaGroup extends StopGroup
+

 object StopGroup{
   def findAllGroup(): List[String] ={
StopGroupEnum.scala
@@ -14,5 +14,7 @@ object StopGroupEnum extends Enumeration {
   val ScriptGroup = Value("ScriptGroup")
   val FileGroup = Value("FileGroup")
   val CleanGroup = Value("CleanGroup")
+  val KafkaGroup = Value("kafkaGroup")
+  val RedisGroup = Value("RedisGroup")

 }
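The new entries round-trip between values and names with the standard Scala Enumeration API; note that the Kafka entry was registered as "kafkaGroup" (lowercase k), so lookups by name are case-sensitive. A small sketch, again assuming the enum sits in cn.piflow.conf:

    import cn.piflow.conf.StopGroupEnum

    object StopGroupEnumSketch {
      def main(args: Array[String]): Unit = {
        println(StopGroupEnum.RedisGroup.toString)    // RedisGroup
        println(StopGroupEnum.withName("RedisGroup")) // RedisGroup
        println(StopGroupEnum.withName("kafkaGroup")) // kafkaGroup
        // StopGroupEnum.withName("KafkaGroup") would throw NoSuchElementException.
      }
    }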