diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePSNDistinct.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePSNDistinct.scala
index 6a8c732..93be862 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePSNDistinct.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePSNDistinct.scala
@@ -170,20 +170,22 @@ class HivePSNDistinct extends ConfigurableStop {
     val primaryIndex = inDF.schema.fieldIndex(primaryKey)
     var pairRDD = inDF.rdd.map(row => (row.getString(primaryIndex), {
       row
-    }))
-    var processKeyArray = distinctFields.split(",")
+    })) // (id, row)
+    var processKeyArray = distinctRule.split(",")
     processKeyArray +:= idKey
-    processKeyArray.foreach(key => { // for each of these keys
-      pairRDD = pairRDD.map(row => (cn.piflow.bundle.util.NSFCUtil.mkRowKey(inSchema, row._2, key), row)) // build the key pair; generate a UUID when the key is absent
+    processKeyArray.foreach(key => { // for each of these keys: (hash, row)
+      // name & : row
+      pairRDD = pairRDD.map((row: (String, Row)) => (cn.piflow.bundle.util.NSFCUtil.mkRowKey(inSchema, row._2, key), row)) // build the key pair; generate a UUID when the key is absent
         .groupByKey
         .map(i => (i._1 , {
-          cn.piflow.bundle.util.RedisUtil.recordMKDup(i._2, tableName ,jedisCluster.getJedisCluster) // mark the keys that need merging
+          cn.piflow.bundle.util.RedisUtil.recordMKDup(i._2, tableName ,jedisCluster.getJedisCluster) // mark the keys that need merging: id2 -> id1, id3 -> id2
        })) // opt: R/W deliver this part, record the MK dup
        .values
        .filter(r => { r._2 != null }) // reduce can combine two duplicate values
     })
+    // (id, psn_code, row)
     var keykeyRDD = pairRDD.map(r => (r._1, cn.piflow.bundle.util.RedisUtil.checkRedis(r, inSchema, tableName,distinctTableType, distinctRule.split(",") , jedisCluster.getJedisCluster),r._2))
     var backToHiveRdd = keykeyRDD.map(r => (r._1, {
@@ -191,7 +193,7 @@ class HivePSNDistinct extends ConfigurableStop {
       var psn = r._2
       var id = r._1
       var rowArray = row.toSeq
-      jedisCluster.getJedisCluster.hset(tableName + "@" + id, distinctTableType + "IdPSN" , psn)
+      jedisCluster.getJedisCluster.hset(tableName + "@" + id, distinctTableType + "IdPSN" , psn) // id -> psn_code
       Row.fromSeq(rowArray.updated(primaryIndex,psn))
     })).filter(r => {
       !jedisCluster.getJedisCluster.hexists(tableName + "@" + r._1, distinctTableType + "PSNExist")
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala
index 2551ca2..b52856b 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/RedisUtil.scala
@@ -107,12 +107,12 @@ object RedisUtil extends Serializable {
   }
 
   def recordMKDup(rows: Iterable[(String,Row)], tableName : String, jedisCluster: JedisCluster) :(String,Row) = {
-    var f = rows.head // rows length >= 1
-    if (rows.size < 2) return f // only one element
+    var head_row = rows.head // rows length >= 1
+    if (rows.size < 2) return head_row // only one element
     for (row <- rows) {
-      jedisCluster.hset(tableName + "@" + row._1 ,tableName + ":MK - S", f._1) // set son =:MK - S=> father (1 -> 1)
+      jedisCluster.hset(tableName + "@" + row._1 ,tableName + ":MK - S", head_row._1) // set son =:MK - S=> father (1 -> 1)
     }
-    f
+    head_row
   }
 
   def getMKFather(key:String, tableName : String,psnType : String, jedisCluster: JedisCluster): String = {
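
The comment on the mkRowKey call (translated above as "generate a UUID when the key is absent") is the property the groupByKey step relies on: a row that lacks a dedup field gets a unique random key, so it forms a singleton group and can never be falsely merged. A minimal sketch of that behavior, with a hypothetical mkRowKeySketch standing in for NSFCUtil.mkRowKey (the real method takes a Spark schema and Row rather than a Map; this stand-in exists only to keep the example self-contained):

import java.util.UUID

object MkRowKeySketch {
  // Hypothetical stand-in for NSFCUtil.mkRowKey: deterministic key when the
  // field is present, random UUID when it is missing or empty.
  def mkRowKeySketch(row: Map[String, String], key: String): String =
    row.get(key) match {
      case Some(v) if v.nonEmpty => s"$key:$v"                 // deterministic grouping key
      case _                     => UUID.randomUUID().toString // unique key: never grouped, never falsely merged
    }

  def main(args: Array[String]): Unit = {
    println(mkRowKeySketch(Map("name" -> "Alice"), "name")) // name:Alice
    println(mkRowKeySketch(Map.empty, "name"))              // random UUID
  }
}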
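
For reviewers, here is a self-contained sketch of the son => father marking that recordMKDup performs. A mutable Map stands in for the Redis hash written via jedisCluster.hset, and String stands in for Row; the ":MK - S" field name mirrors RedisUtil, while fakeRedis and markDuplicates are hypothetical names introduced only for this illustration:

import scala.collection.mutable

object RecordMKDupSketch {
  // key -> (field -> value), mimicking Redis hashes
  val fakeRedis = mutable.Map.empty[String, mutable.Map[String, String]]

  def hset(key: String, field: String, value: String): Unit =
    fakeRedis.getOrElseUpdate(key, mutable.Map.empty)(field) = value

  // The head of a duplicate group becomes the "father"; every id in the group
  // (the head included) records a pointer to it, so a later pass can resolve
  // any id to its canonical representative.
  def markDuplicates(rows: Iterable[(String, String)], tableName: String): (String, String) = {
    val head = rows.head           // caller guarantees rows is non-empty
    if (rows.size < 2) return head // singleton group: nothing to mark
    for ((id, _) <- rows)
      hset(s"$tableName@$id", s"$tableName:MK - S", head._1)
    head
  }

  def main(args: Array[String]): Unit = {
    markDuplicates(Seq("id1" -> "rowA", "id2" -> "rowB", "id3" -> "rowC"), "person")
    println(fakeRedis) // person@id1, person@id2, person@id3 all map ":MK - S" to id1
  }
}

Because every member points directly at the group head ("1 -> 1", per the comment in RedisUtil), resolving an id to its father is a single hash lookup, which is presumably what getMKFather does with this field.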