judy0131 2019-09-18 18:40:44 +08:00
parent d1dd9871c6
commit d631ea7a2b
2 changed files with 12 additions and 10 deletions


@@ -170,20 +170,22 @@ class HivePSNDistinct extends ConfigurableStop {
     val primaryIndex = inDF.schema.fieldIndex(primaryKey)
     var pairRDD = inDF.rdd.map(row => (row.getString(primaryIndex), {
       row
-    }))
-    var processKeyArray = distinctFields.split(",")
+    }))// id - row
+    var processKeyArray = distinctRule.split(",")
     processKeyArray +:= idKey
-    processKeyArray.foreach(key => { //for each group of keys here
-      pairRDD = pairRDD.map(row => (cn.piflow.bundle.util.NSFCUtil.mkRowKey(inSchema, row._2, key), row)) //generate the key pair; generate a UUID if the key does not exist
+    processKeyArray.foreach(key => { //for each group of keys here: hash - row
+      // name & : row
+      pairRDD = pairRDD.map((row: (String, Row)) => (cn.piflow.bundle.util.NSFCUtil.mkRowKey(inSchema, row._2, key), row)) //generate the key pair; generate a UUID if the key does not exist
         .groupByKey
         .map(i => (i._1 , {
-          cn.piflow.bundle.util.RedisUtil.recordMKDup(i._2, tableName ,jedisCluster.getJedisCluster) //Mark the keys that need to be merged
+          cn.piflow.bundle.util.RedisUtil.recordMKDup(i._2, tableName ,jedisCluster.getJedisCluster) //Mark the keys that need to be merged: id2 ->id1, id3->id2
         })) //opt:R/W deliver this part record the mk Dup
         .values
         .filter(r => {
           r._2 != null
         }) //reduce can combine two duplicate value)
     })
+    //id -> psn_code -> row
     var keykeyRDD = pairRDD.map(r => (r._1, cn.piflow.bundle.util.RedisUtil.checkRedis(r, inSchema, tableName,distinctTableType, distinctRule.split(",") , jedisCluster.getJedisCluster),r._2))
     var backToHiveRdd = keykeyRDD.map(r => (r._1, {
@@ -191,7 +193,7 @@ class HivePSNDistinct extends ConfigurableStop {
       var psn = r._2
       var id = r._1
       var rowArray = row.toSeq
-      jedisCluster.getJedisCluster.hset(tableName + "@" + id, distinctTableType + "IdPSN" , psn)
+      jedisCluster.getJedisCluster.hset(tableName + "@" + id, distinctTableType + "IdPSN" , psn)//id:psn_code
       Row.fromSeq(rowArray.updated(primaryIndex,psn))
     })).filter(r => {
       !jedisCluster.getJedisCluster.hexists(tableName + "@" + r._1, distinctTableType + "PSNExist")
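For reference, the loop changed above re-keys the (id, row) pairs by each configured distinct field, groups the rows that produce the same key, and keeps one representative per group while recordMKDup notes the duplicates in Redis. A minimal standalone sketch of that grouping pattern, using plain Spark only, hypothetical field names, and no Redis side effects:

import org.apache.spark.sql.SparkSession

object DistinctByKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("distinct-by-key-sketch").getOrCreate()

    // (id, name) pairs; "name" stands in for one of the configured distinct fields
    val pairs = spark.sparkContext.parallelize(Seq(
      ("id1", "Alice"), ("id2", "Alice"), ("id3", "Bob")
    ))

    // re-key by the distinct field, group duplicates, keep the head of each group
    // (in the real stop, recordMKDup would record the remaining ids of the group as merged)
    val deduped = pairs
      .map { case (id, name) => (name, (id, name)) }
      .groupByKey()
      .map { case (key, group) => (key, group.head) }
      .values

    deduped.collect().foreach(println)   // one representative per name, e.g. (id1,Alice) and (id3,Bob)
    spark.stop()
  }
}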


@@ -107,12 +107,12 @@ object RedisUtil extends Serializable {
   }
   def recordMKDup(rows: Iterable[(String,Row)], tableName : String, jedisCluster: JedisCluster) :(String,Row) = {
-    var f = rows.head //rows length >= 1
-    if (rows.size < 2) return f //only one elem
+    var head_row = rows.head //rows length >= 1
+    if (rows.size < 2) return head_row //only one elem
     for (row <- rows) {
-      jedisCluster.hset(tableName + "@" + row._1 ,tableName + ":MK - S", f._1) //set son =:MK - S=> father (1 -> 1)
+      jedisCluster.hset(tableName + "@" + row._1 ,tableName + ":MK - S", head_row._1) //set son =:MK - S=> father (1 -> 1)
     }
-    f
+    head_row
   }
   def getMKFather(key:String, tableName : String,psnType : String, jedisCluster: JedisCluster): String = {
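The tableName + ":MK - S" hash field written by recordMKDup links each id in a duplicate group to the id of the chosen head row, so resolving any id to its final representative means following that chain until no further mapping (or a self-mapping) is found, which is presumably what getMKFather does. A small sketch of such a lookup with the Jedis client, assuming a reachable cluster and the same hash-key layout; the endpoint and table name are placeholders, and resolveFather is an illustrative helper, not part of RedisUtil:

import redis.clients.jedis.{HostAndPort, JedisCluster}
import scala.collection.JavaConverters._

object MKFatherSketch {
  // follow the son -> father links until an id with no further mapping (or a self-mapping) is reached
  def resolveFather(id: String, tableName: String, jedis: JedisCluster): String = {
    var current = id
    var next = jedis.hget(tableName + "@" + current, tableName + ":MK - S")
    while (next != null && next != current) {
      current = next
      next = jedis.hget(tableName + "@" + current, tableName + ":MK - S")
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val nodes = Set(new HostAndPort("127.0.0.1", 7000)).asJava   // placeholder cluster node
    val jedis = new JedisCluster(nodes)
    println(resolveFather("id3", "demo_table", jedis))
    jedis.close()
  }
}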