From e815bed7e2765f251010bf86dc29e3a0aa62ca3a Mon Sep 17 00:00:00 2001 From: or <1506355667@qq.com> Date: Thu, 2 Apr 2020 17:11:23 +0800 Subject: [PATCH] UPDATE STOP --- .../resources/flow/redis/ReadFromRedis.json | 12 +++---- .../resources/flow/redis/WriteToRedis.json | 7 ++-- .../cn/piflow/bundle/redis/WriteToRedis.scala | 5 +-- .../cn/piflow/bundle/util/RedisUtil.scala | 16 ++++----- .../bundle/redis/ReadFromRedisTest.scala | 33 ++++++++++++++++++- .../bundle/redis/WriteToRedisTest.scala | 2 +- 6 files changed, 52 insertions(+), 23 deletions(-) diff --git a/piflow-bundle/src/main/resources/flow/redis/ReadFromRedis.json b/piflow-bundle/src/main/resources/flow/redis/ReadFromRedis.json index 8484bb6..1b74903 100644 --- a/piflow-bundle/src/main/resources/flow/redis/ReadFromRedis.json +++ b/piflow-bundle/src/main/resources/flow/redis/ReadFromRedis.json @@ -8,19 +8,19 @@ "name":"SelectHiveQL", "bundle":"cn.piflow.bundle.hive.SelectHiveQL", "properties":{ - "hiveQL":"select * from test.user1" + "hiveQL":"select *,name as author from test.user1" } }, { "uuid":"1111", - "name":"WriteToRedis", - "bundle":"cn.piflow.bundle.redis.WriteToRedis", + "name":"ReadFromRedis", + "bundle":"cn.piflow.bundle.redis.ReadFromRedis", "properties":{ - "redis_host":"10.0.88.9", + "redis_host":"192.168.3.121", "port":"7000", "password":"bigdata", - "column_name":"title", - "schema":"author,pages" + "column_name":"author", + "schema":"author" } } diff --git a/piflow-bundle/src/main/resources/flow/redis/WriteToRedis.json b/piflow-bundle/src/main/resources/flow/redis/WriteToRedis.json index 403db5c..fef8c9c 100644 --- a/piflow-bundle/src/main/resources/flow/redis/WriteToRedis.json +++ b/piflow-bundle/src/main/resources/flow/redis/WriteToRedis.json @@ -8,7 +8,7 @@ "name":"SelectHiveQL", "bundle":"cn.piflow.bundle.hive.SelectHiveQL", "properties":{ - "hiveQL":"select * from test.user1" + "hiveQL":"select id,name from test.user1" } }, { @@ -19,8 +19,7 @@ "redis_host":"192.168.3.138", "port":"7000", "password":"bigdata", - "column_name":"title", - "schema":"author,pages" + "column_name":"test" } } ], @@ -29,7 +28,7 @@ "from":"SelectHiveQL", "outport":"", "inport":"", - "to":"ReadFromRedis" + "to":"WriteToRedis" } ] } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala index 53bdb62..67a7bf2 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/redis/WriteToRedis.scala @@ -25,14 +25,15 @@ class WriteToRedis extends ConfigurableStop{ val spark = pec.get[SparkSession]() val df = in.read() var col_name:String=column_name + df.printSchema() - println(df.schema) //connect to redis var jedisCluster=new JedisClusterImplSer(new HostAndPort(redis_host,port),password) + df.collect.foreach(row=> { RedisUtil.manipulateRow(row,col_name,jedisCluster) }) - val v=jedisCluster.getJedisCluster.hkeys("Python") + val v=jedisCluster.getJedisCluster.hkeys("doi") println(v) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala index 2551ca2..2d0fbb2 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala @@ -7,19 +7,17 @@ import redis.clients.jedis.JedisCluster import scala.util.control.Breaks.{break, breakable} object RedisUtil extends Serializable { + def manipulateRow(row:Row,column_name:String,jedisClusterImplSer: JedisClusterImplSer):Unit={ var hm:util.HashMap[String,String]=new util.HashMap() - val key=row.getAs(column_name).asInstanceOf[String] - //row.schema.fields.foreach(f=>(if(!f.name.equals(column_name)&&row.getAs(f.name)!=null)hm.put(f.name,row.getAs(f.name).asInstanceOf[String]))) + row.schema.fields.foreach(f=>{ - if(!f.name.equals(column_name)){ - if(row.getAs(f.name)==null)hm.put(f.name,"None") - else{ - hm.put(f.name,row.getAs(f.name).asInstanceOf[String]) - } - } + println(f.name+"---------"+row.getAs(f.name)) + hm.put(f.name,row.getAs(f.name).asInstanceOf[String]) }) - jedisClusterImplSer.getJedisCluster.hmset(key,hm) + + + jedisClusterImplSer.getJedisCluster.hmset(column_name,hm) } /** diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/ReadFromRedisTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/ReadFromRedisTest.scala index 9dfae22..b387127 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/ReadFromRedisTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/ReadFromRedisTest.scala @@ -6,7 +6,7 @@ import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} import cn.piflow.util.{PropertyUtil, ServerIpUtil} -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{SaveMode, SparkSession} import org.h2.tools.Server import org.junit.Test @@ -55,4 +55,35 @@ class ReadFromRedisTest { spark.close(); } + @Test + def testFlow1(): Unit ={ + + val spark = SparkSession.builder() + .master("local[*]") + .appName("SparkReadRedis") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", "2") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) + + .config("spark.redis.host","192.168.3.138") + .config("spark.redis.port", "7000") + .config("spark.redis.auth","bigdata") //指定redis密码 + .config("spark.redis.db","0") //指定redis库 + .enableHiveSupport() + .getOrCreate() + + + val df = spark.sql("select * from test.user1") + + df.write + .format("org.apache.spark.sql.redis") + .option("table", "person") + .option("key.column", "name") + .mode(SaveMode.Overwrite) + .save() + + + } + } diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/WriteToRedisTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/WriteToRedisTest.scala index b57bf1a..a608977 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/WriteToRedisTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/redis/WriteToRedisTest.scala @@ -18,7 +18,7 @@ class WriteToRedisTest { def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow/redis/WritToRedis.json" + val file = "src/main/resources/flow/redis/WriteToRedis.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map)