UPDATE STOP
This commit is contained in:
parent ecbe42dc00
commit e815bed7e2
@@ -8,19 +8,19 @@
       "name":"SelectHiveQL",
       "bundle":"cn.piflow.bundle.hive.SelectHiveQL",
       "properties":{
-        "hiveQL":"select * from test.user1"
+        "hiveQL":"select *,name as author from test.user1"
       }
     },
     {
       "uuid":"1111",
-      "name":"WriteToRedis",
-      "bundle":"cn.piflow.bundle.redis.WriteToRedis",
+      "name":"ReadFromRedis",
+      "bundle":"cn.piflow.bundle.redis.ReadFromRedis",
       "properties":{
-        "redis_host":"10.0.88.9",
+        "redis_host":"192.168.3.121",
         "port":"7000",
         "password":"bigdata",
-        "column_name":"title",
-        "schema":"author,pages"
+        "column_name":"author",
+        "schema":"author"
       }
     }
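Note: this hunk retargets the flow's second stop from WriteToRedis to ReadFromRedis and keys the read on the author column. A minimal sketch of how such a flow JSON is consumed, following the JSON.parseFull pattern used in WriteToRedisTest further down; the "flow"/"stops"/"paths" envelope keys and the "0000" uuid are assumptions, while the stop entries are taken from the diff:

    // Sketch only: parse a flow definition the same way the tests do.
    import scala.util.parsing.json.JSON

    val flowJsonStr =
      """{"flow":{
        |  "stops":[
        |    {"uuid":"0000","name":"SelectHiveQL",
        |     "bundle":"cn.piflow.bundle.hive.SelectHiveQL",
        |     "properties":{"hiveQL":"select *,name as author from test.user1"}},
        |    {"uuid":"1111","name":"ReadFromRedis",
        |     "bundle":"cn.piflow.bundle.redis.ReadFromRedis",
        |     "properties":{"redis_host":"192.168.3.121","port":"7000",
        |                   "password":"bigdata","column_name":"author","schema":"author"}}],
        |  "paths":[{"from":"SelectHiveQL","outport":"","inport":"","to":"ReadFromRedis"}]
        |}}""".stripMargin

    val map = JSON.parseFull(flowJsonStr).get.asInstanceOf[Map[String, Any]]
    println(map) // Map(flow -> Map(stops -> List(...), paths -> List(...)))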
@@ -8,7 +8,7 @@
       "name":"SelectHiveQL",
       "bundle":"cn.piflow.bundle.hive.SelectHiveQL",
       "properties":{
-        "hiveQL":"select * from test.user1"
+        "hiveQL":"select id,name from test.user1"
       }
     },
     {
@@ -19,8 +19,7 @@
       "redis_host":"192.168.3.138",
       "port":"7000",
       "password":"bigdata",
-      "column_name":"title",
-      "schema":"author,pages"
+      "column_name":"test"
       }
     }
   ],
@@ -29,7 +28,7 @@
       "from":"SelectHiveQL",
       "outport":"",
       "inport":"",
-      "to":"ReadFromRedis"
+      "to":"WriteToRedis"
     }
   ]
 }
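Note: the three hunks above belong to the write-side flow: the Hive stop now projects id,name, the Redis stop keys on test, and the single path now ends at WriteToRedis. A hedged sketch of pulling those edges out of the parsed map (same Map[String, Any] cast as the tests; the "flow" and "paths" key names are assumptions, "from"/"to" appear verbatim in the diff):

    // Sketch: list the from -> to edges of a parsed flow map.
    def edges(map: Map[String, Any]): List[(String, String)] = {
      val flow  = map("flow").asInstanceOf[Map[String, Any]]
      val paths = flow("paths").asInstanceOf[List[Map[String, Any]]]
      paths.map(p => (p("from").toString, p("to").toString))
    }
    // edges(map) == List(("SelectHiveQL", "WriteToRedis"))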
@@ -25,14 +25,15 @@ class WriteToRedis extends ConfigurableStop{
     val spark = pec.get[SparkSession]()
     val df = in.read()
     var col_name:String=column_name
-    df.printSchema()
+
+    println(df.schema)
     //connect to redis
     var jedisCluster=new JedisClusterImplSer(new HostAndPort(redis_host,port),password)

     df.collect.foreach(row=> {
       RedisUtil.manipulateRow(row,col_name,jedisCluster)
     })
-    val v=jedisCluster.getJedisCluster.hkeys("Python")
+    val v=jedisCluster.getJedisCluster.hkeys("doi")
     println(v)
   }
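Note: the smoke check now probes the hash stored under "doi" instead of "Python". The same spot check, standalone and hedged (host/port/password mirror the flow JSON in this commit; JedisClusterImplSer is the repo's serializable JedisCluster wrapper):

    // Sketch: inspect what actually landed in the cluster after a write.
    import redis.clients.jedis.HostAndPort

    val jedisCluster = new JedisClusterImplSer(new HostAndPort("192.168.3.121", 7000), "bigdata")
    val fields = jedisCluster.getJedisCluster.hkeys("doi")   // field names of the "doi" hash
    val values = jedisCluster.getJedisCluster.hgetAll("doi") // full field -> value map
    println(fields)
    println(values)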
@@ -7,19 +7,17 @@ import redis.clients.jedis.JedisCluster

 import scala.util.control.Breaks.{break, breakable}
 object RedisUtil extends Serializable {

   def manipulateRow(row:Row,column_name:String,jedisClusterImplSer: JedisClusterImplSer):Unit={
     var hm:util.HashMap[String,String]=new util.HashMap()
     val key=row.getAs(column_name).asInstanceOf[String]
     //row.schema.fields.foreach(f=>(if(!f.name.equals(column_name)&&row.getAs(f.name)!=null)hm.put(f.name,row.getAs(f.name).asInstanceOf[String])))

     row.schema.fields.foreach(f=>{
-      if(!f.name.equals(column_name)){
-        if(row.getAs(f.name)==null)hm.put(f.name,"None")
-        else{
-          hm.put(f.name,row.getAs(f.name).asInstanceOf[String])
-        }
-      }
+      println(f.name+"---------"+row.getAs(f.name))
+      hm.put(f.name,row.getAs(f.name).asInstanceOf[String])
     })
-    jedisClusterImplSer.getJedisCluster.hmset(key,hm)
+
+    jedisClusterImplSer.getJedisCluster.hmset(column_name,hm)

   }
   /**
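Note on the keying change: the old code called hmset(key, hm) with key taken from the row's own key-column value, producing one Redis hash per row; the new code calls hmset(column_name, hm), so every row writes into the single hash named after the key column, and later rows overwrite earlier fields. An illustration of that difference, not repo code:

    // Fixed-key vs per-row-key writes; "last write wins" inside one hash.
    import java.util
    import redis.clients.jedis.JedisCluster

    def writeRow(jc: JedisCluster, key: String, fields: Map[String, String]): Unit = {
      val hm = new util.HashMap[String, String]()
      fields.foreach { case (k, v) => hm.put(k, v) }
      jc.hmset(key, hm)
    }
    // Old behavior: writeRow(jc, "paper-1", ...); writeRow(jc, "paper-2", ...) -> two hashes
    // New behavior: writeRow(jc, "author", ...) for every row -> one hash, fields overwritten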
@@ -6,7 +6,7 @@ import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
 import cn.piflow.util.{PropertyUtil, ServerIpUtil}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.h2.tools.Server
 import org.junit.Test
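Note: the widened import exists only to supply SaveMode for the new test below. For reference, the standard Spark save modes as they would apply to the "person" table written there:

    // SaveMode.Overwrite     - replace the existing table contents (used below)
    // SaveMode.Append        - add rows alongside existing ones
    // SaveMode.ErrorIfExists - fail if the target already exists (the default)
    // SaveMode.Ignore        - silently skip the write if the target exists
    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .option("key.column", "name")
      .mode(SaveMode.Append) // vs. Overwrite below: accumulate instead of replace
      .save()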
@@ -55,4 +55,35 @@ class ReadFromRedisTest {
     spark.close();
   }

+  @Test
+  def testFlow1(): Unit ={
+
+    val spark = SparkSession.builder()
+      .master("local[*]")
+      .appName("SparkReadRedis")
+      .config("spark.driver.memory", "1g")
+      .config("spark.executor.memory", "2g")
+      .config("spark.cores.max", "2")
+      .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
+
+      .config("spark.redis.host","192.168.3.138")
+      .config("spark.redis.port", "7000")
+      .config("spark.redis.auth","bigdata") // Redis password
+      .config("spark.redis.db","0") // Redis database index
+      .enableHiveSupport()
+      .getOrCreate()
+
+    val df = spark.sql("select * from test.user1")
+
+    df.write
+      .format("org.apache.spark.sql.redis")
+      .option("table", "person")
+      .option("key.column", "name")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+  }
+
 }
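Note: testFlow1 writes test.user1 into Redis through the spark-redis connector; the matching read-back, using the connector's corresponding read options, would look like this (a sketch, assuming the same spark session):

    // Sketch: read the "person" table back out of Redis.
    val readBack = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .option("key.column", "name") // rebuild the name column from the hash keys
      .load()
    readBack.show()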
@@ -18,7 +18,7 @@ class WriteToRedisTest {
   def testFlow(): Unit ={

     //parse flow json
-    val file = "src/main/resources/flow/redis/WritToRedis.json"
+    val file = "src/main/resources/flow/redis/WriteToRedis.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)
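Note: the only change here is the path typo fix (WritToRedis.json to WriteToRedis.json). A cheap guard turns this class of typo into an immediate, readable failure instead of a downstream parse error (sketch):

    // Sketch: fail fast if the flow JSON path is wrong.
    import java.nio.file.{Files, Paths}

    val file = "src/main/resources/flow/redis/WriteToRedis.json"
    require(Files.exists(Paths.get(file)), s"flow json not found: $file")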