Merge remote-tracking branch 'origin/master'

This commit is contained in:
bao319 2020-04-02 18:02:52 +08:00
commit 11fb4168e3
6 changed files with 52 additions and 23 deletions

View File

@ -8,19 +8,19 @@
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
"hiveQL":"select *,name as author from test.user1"
}
},
{
"uuid":"1111",
"name":"WriteToRedis",
"bundle":"cn.piflow.bundle.redis.WriteToRedis",
"name":"ReadFromRedis",
"bundle":"cn.piflow.bundle.redis.ReadFromRedis",
"properties":{
"redis_host":"10.0.88.9",
"redis_host":"192.168.3.121",
"port":"7000",
"password":"bigdata",
"column_name":"title",
"schema":"author,pages"
"column_name":"author",
"schema":"author"
}
}

View File

@ -8,7 +8,7 @@
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
"hiveQL":"select id,name from test.user1"
}
},
{
@ -19,8 +19,7 @@
"redis_host":"192.168.3.138",
"port":"7000",
"password":"bigdata",
"column_name":"title",
"schema":"author,pages"
"column_name":"test"
}
}
],
@ -29,7 +28,7 @@
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"ReadFromRedis"
"to":"WriteToRedis"
}
]
}

View File

@ -25,14 +25,15 @@ class WriteToRedis extends ConfigurableStop{
val spark = pec.get[SparkSession]()
val df = in.read()
var col_name:String=column_name
df.printSchema()
println(df.schema)
//connect to redis
var jedisCluster=new JedisClusterImplSer(new HostAndPort(redis_host,port),password)
df.collect.foreach(row=> {
RedisUtil.manipulateRow(row,col_name,jedisCluster)
})
val v=jedisCluster.getJedisCluster.hkeys("Python")
val v=jedisCluster.getJedisCluster.hkeys("doi")
println(v)
}

View File

@ -7,19 +7,17 @@ import redis.clients.jedis.JedisCluster
import scala.util.control.Breaks.{break, breakable}
object RedisUtil extends Serializable {
def manipulateRow(row:Row,column_name:String,jedisClusterImplSer: JedisClusterImplSer):Unit={
var hm:util.HashMap[String,String]=new util.HashMap()
val key=row.getAs(column_name).asInstanceOf[String]
//row.schema.fields.foreach(f=>(if(!f.name.equals(column_name)&&row.getAs(f.name)!=null)hm.put(f.name,row.getAs(f.name).asInstanceOf[String])))
row.schema.fields.foreach(f=>{
if(!f.name.equals(column_name)){
if(row.getAs(f.name)==null)hm.put(f.name,"None")
else{
println(f.name+"---------"+row.getAs(f.name))
hm.put(f.name,row.getAs(f.name).asInstanceOf[String])
}
}
})
jedisClusterImplSer.getJedisCluster.hmset(key,hm)
jedisClusterImplSer.getJedisCluster.hmset(column_name,hm)
}
/**

View File

@ -6,7 +6,7 @@ import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.h2.tools.Server
import org.junit.Test
@ -55,4 +55,35 @@ class ReadFromRedisTest {
spark.close();
}
@Test
def testFlow1(): Unit ={
val spark = SparkSession.builder()
.master("local[*]")
.appName("SparkReadRedis")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.config("spark.redis.host","192.168.3.138")
.config("spark.redis.port", "7000")
.config("spark.redis.auth","bigdata") //指定redis密码
.config("spark.redis.db","0") //指定redis库
.enableHiveSupport()
.getOrCreate()
val df = spark.sql("select * from test.user1")
df.write
.format("org.apache.spark.sql.redis")
.option("table", "person")
.option("key.column", "name")
.mode(SaveMode.Overwrite)
.save()
}
}

View File

@ -18,7 +18,7 @@ class WriteToRedisTest {
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/redis/WritToRedis.json"
val file = "src/main/resources/flow/redis/WriteToRedis.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)