From ec1680aadc7696d7f11dfa1f4cca9a1ec87d130c Mon Sep 17 00:00:00 2001 From: coco11563 Date: Wed, 31 Jul 2019 19:45:45 +0800 Subject: [PATCH] Done distinct part --- .../bundle/hive/OptionalSelectHiveQL.scala | 62 +++++- .../nsfc/distinct/HivePRDDistinct.scala | 184 +++++++++++++++++- .../piflow/bundle/nsfc/distinct/description | 11 ++ .../bundle/nsfc/distinct/foreignKeyWash.scala | 89 ++++++++- 4 files changed, 323 insertions(+), 23 deletions(-) create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/description diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala index fe2725e..bf92a23 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala @@ -3,10 +3,13 @@ package cn.piflow.bundle.hive import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} +/** + * HIVE JDBC DRIVER DESIGN FOR HIVE 1.2.1 + */ class OptionalSelectHiveQL extends ConfigurableStop { override val authorEmail: String = "xiaomeng7890@gmail.com" override val description: String = "some hive can only achieve by jdbc, this stop is designed for this" @@ -17,11 +20,50 @@ class OptionalSelectHiveQL extends ConfigurableStop { var hiveUser : String = _ var hivePasswd : String = _ var jdbcUrl : String = _ + var sql : String = _ - override def setProperties(map: Map[String, Any]): Unit = {} + override def setProperties(map: Map[String, Any]): Unit = { + hiveUser = MapUtil.get(map,"hive user").asInstanceOf[String] + hivePasswd = MapUtil.get(map,"hive passwd").asInstanceOf[String] + jdbcUrl = MapUtil.get(map,"jdbcUrl").asInstanceOf[String] + sql = MapUtil.get(map,"query").asInstanceOf[String] + } override def getPropertyDescriptor(): List[PropertyDescriptor] = { - List() + var descriptor : List[PropertyDescriptor] = List() + val hiveUser = new PropertyDescriptor(). + name("hive user"). + displayName("hive user"). + description("hive user name"). + defaultValue("hdfs"). + required(true) + descriptor = hiveUser :: descriptor + + val hivePasswd = new PropertyDescriptor(). + name("hive passwd"). + displayName("hive passwd"). + description("hive password"). + defaultValue("123456"). + required(true) + descriptor = hivePasswd :: descriptor + + val jdbcUrl = new PropertyDescriptor(). + name("jdbcUrl"). + displayName("jdbcUrl"). + description("hive jdbc url"). + defaultValue("jdbc:hive2://packone12:2181,packone13:2181,packone11:2181/middle;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2"). + required(true) + descriptor = jdbcUrl :: descriptor + + val sql = new PropertyDescriptor(). + name("query"). + displayName("hive query"). + description("hive sql query"). + defaultValue("select * from middle.m_person"). 
+ required(true) + descriptor = sql :: descriptor + + descriptor } @@ -37,10 +79,14 @@ class OptionalSelectHiveQL extends ConfigurableStop { override def initialize(ctx: ProcessContext): Unit = {} - override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {} + override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val sc = pec.get[SparkSession]() + val df = getDF (sc.sqlContext, sc.sparkContext, sql) + out.write(df) + } def getDF(sqlContext : SQLContext, sc : SparkContext, tableName : String) : DataFrame = { - var df = sqlContext.read.table(tableName) + var df = sqlContext.sql(sql) val count = df.count() if (count == 0) { println("Cant read by normal read, using JDBC <== this will cost a lot of time") @@ -58,8 +104,8 @@ class OptionalSelectHiveQL extends ConfigurableStop { e.printStackTrace() System.exit(1) } - val conn = DriverManager.getConnection(jdbcUrl, "hive", "123456") - val ptsm = conn.prepareStatement("select * from " + tableName) + val conn = DriverManager.getConnection(jdbcUrl, hiveUser, hivePasswd) + val ptsm = conn.prepareStatement(sql) println(ptsm) val rs = ptsm.executeQuery() var rows = Seq[Row]() diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala index 0c68b5d..9ff9ce2 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala @@ -1,15 +1,16 @@ package cn.piflow.bundle.nsfc.distinct +import java.util.UUID + import cn.piflow.bundle.util.JedisClusterImplSer import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import redis.clients.jedis.HostAndPort +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Column, Row, SparkSession} +import redis.clients.jedis.{HostAndPort, JedisCluster} -/** - * HIVE JDBC DRIVER DESIGN FOR HIVE 1.2.1 - */ class HivePRDDistinct extends ConfigurableStop { override val authorEmail: String = "xiaomeng7890@gmail.com" override val description: String = "" @@ -17,9 +18,12 @@ class HivePRDDistinct extends ConfigurableStop { override val inportList: List[String] = List("productTable","productSpecTable") override val outportList: List[String] = List("Relation", "Entity") - var processuserKey : String = _ - var psnPrimary : String = _ - var productPrimary : String = _ + var processUseKey : String = "product_id,zh_title,authors,en_title" + var rule : String = "zh_title&authors,en_title&authors,product_id" + lazy val recordKey = "id" + lazy val psn = "psn_code" + lazy val pro = "product_id" + var redis_server_ip : String = _ var redis_server_port : Int = _ @@ -69,8 +73,170 @@ class HivePRDDistinct extends ConfigurableStop { jedisCluster = new JedisClusterImplSer(new HostAndPort(redis_server_ip, redis_server_port), redis_server_passwd) } - override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {} + override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val df = in.read() + val ss = pec.get[SparkSession]() + val relation_df = df.select(Seq[String](recordKey, psn, pro).map(s => new Column(s)) : _*) + val cols = df.columns + var union_schema = df.schema + //get used key + val useKey = 
cols.array + val processUseKeyArray = processUseKey.split(",") + val useCol = useKey. + filter(s => {processUseKeyArray.contains(s)}). + map(x => new Column(x)) + var require_df = df.select(useCol : _*) + var require_schema = require_df.schema + var recordIndex = require_schema.fieldIndex(pro) + //check point + var require_rdd = require_df.rdd + println("================================================") -} \ No newline at end of file + println("================================================") + //dup + val processKeyArray = rule.split(",") + processKeyArray.foreach(key => { + require_rdd = require_rdd.map(row => (this.mkRowKey(require_schema, row, key).trim, row)) + .groupByKey() + .map(i => (i._1 , { + recordMKDup(i._2, recordIndex, jedisCluster.getJedisCluster)//记录 + })) //opt:R/W deliver this part record the mk Dup + .values + .filter(r => { + r != null + }) //reduce can combine two duplicate value) + }) + println( "temp size is : ====> + : " + require_rdd.count()) + //dup up + val back_to_hive_rdd = df.rdd.map(row => { + (this.getMKFather(row.getString(union_schema.fieldIndex(pro)), jedisCluster.getJedisCluster),row) + }) .groupByKey() + .map(i => { + val rows = i._2.toArray + gatherDup(rows, union_schema.fieldIndex(recordKey), i._1) + }) + println( "back to hive size is : ====> + : " + back_to_hive_rdd.count()) + val hive_df = ss.createDataFrame(back_to_hive_rdd, union_schema) + val relaSchema = relation_df.schema + val product_index_rela = relaSchema.fieldIndex(pro) + + val relation_rdd_afwash = relation_df.rdd.map(r => { + val pro_id = r.getString(product_index_rela) + Row.fromSeq(r.toSeq.updated(product_index_rela, getMKFather(pro_id, jedisCluster.getJedisCluster))) + }) + val relation_df_afwash = ss.createDataFrame(relation_rdd_afwash, relaSchema) + out.write("Entity", hive_df) + out.write("Relation", relation_df_afwash) + } + def getMKFather(key:String,jedisCluster: JedisCluster): String = { + val s = jedisCluster.hget("product_table_dup@" + key, "product:MK - S") + if (s == null || s == key) key //not son of any return itself + // I know you will feel confuse , just relax :-) + else getMKFather(s,jedisCluster) + } + def gatherDup(rows : Array[Row], primaryKeyIndex:Int, primaryKey : String): Row = { + var f = rows.head //rows length >= 1 + if (rows.length < 2) return f //only one elem + var father = f.toSeq.toArray + for (index <- 1 until rows.length) { + var row = rows(index) + if (row != null && f != null){ //null pointer + father = gatherDup_(father, row.toSeq.toArray, primaryKeyIndex, primaryKey) + } + } + Row.fromSeq(father.toSeq) + } + def gatherDup_(a: Array[Any], b: Array[Any], Pindex:Int, PK: String): Array[Any] = { + for (indi <- a.indices) { + if (a(indi) == null) + a.update(indi, b(indi)) + } + a.update(Pindex, PK) //set father key to this + a + } + def buildNewRow (before : Seq[Any], beforeSchema:StructType, afterSchema:StructType): Row = { + var afterSeq = scala.collection.mutable.ArraySeq[Any]() + for (index <- 0 until afterSchema.length) { + afterSeq = afterSeq.:+(before(beforeSchema.fieldIndex(afterSchema(index).name))) + } + Row.fromSeq(afterSeq) + } + def resetRedis(jedisCluster: JedisCluster): Unit = { + import scala.collection.JavaConversions._ + for (pool <- jedisCluster.getClusterNodes.values) { + try { + val jedis = pool.getResource + try + jedis.flushAll + catch { + case ex: Exception => + System.out.println(ex.getMessage) + } finally if (jedis != null) jedis.close() + } + } + } + + def getPositive(i : Int): String = { + if (i < 0) "n" + (i * 
-1).toString + else i.toString + } + + def recordMKDup(row1: Row, row2:Row , primaryKeyIndex:Int, jedisCluster: JedisCluster) : Row = { + if (row2 == null) return row1 //only one elem + jedisCluster.hset("product_table_dup@" + row2.getString(primaryKeyIndex), "product:MK - S", row1.getString(primaryKeyIndex)) //set son =:MK - S=> father (1 -> 1) + row1 + } + def recordMKDup(rows: Iterable[Row],primaryKeyIndex:Int, jedisCluster: JedisCluster) :Row = { + var f = rows.head //rows length >= 1 + if (rows.size < 2) return f //only one elem + for (row <- rows) { + jedisCluster.hset("product_table_dup@" + row.getString(primaryKeyIndex), "product:MK - S", f.getString(primaryKeyIndex)) //set son =:MK - S=> father (1 -> 1) + } + f + } + def keys(pattern: String, jedisCluster: JedisCluster): Set[String] = { + var keys = Set[String]() + val clusterNodes = jedisCluster.getClusterNodes + import scala.collection.JavaConversions._ + for (k <- clusterNodes.keySet) { + val jp = clusterNodes.get(k) + try { + val connection = jp.getResource + try + keys = keys.++:(connection.keys(pattern)) + catch { + case ignored: Exception => + } finally if (connection != null) connection.close() + } + } + keys + } + def mkRowKey(schema_result:StructType, row: Row, key : String): String = { + var hasNull = false + var s = "" + if (key.contains("&")) { + val sl = key.split("&") + sl.foreach(s_ => { + val index = schema_result.fieldIndex(s_) + if (row.isNullAt(index) || (row.getAs[String](index) == "null")) { + hasNull = true + } else { + s += row.getAs[String](index) + } + }) + } else { + val index = schema_result.fieldIndex(key) + if (row.isNullAt(index) || (row.getString(index) == "null")) { + hasNull = true + } else { + s = row.getAs[String](index) + } + } + if (hasNull) { + s = UUID.randomUUID().toString + } + s + } +} diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/description b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/description new file mode 100644 index 0000000..9fe76af --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/description @@ -0,0 +1,11 @@ +Person: + person : + [HiveTableJoinOn(t_person & sub_person)] -> [PersonIdExpansion] -> [RedisDistinctCachePersist] -> [HiveWrite] + {Join on "psn_code"} + prj/prp : [SelectHiveQL] -> [PersonIdExpansion] -> [PSNDistinct] -("entity")-> [RedisDistinctCachePersist] -> [HiveWrite] + \-("relation")-> [HiveWrite] +Product: + [HiveTableJoinOn(product & sub_product)] -> [PRDDistinct] -("Entity")-> [HiveWrite] -> [HiveTableJoinOn(product & sub_product)] -> [PRDDistinct] -> ... 
+ {Join on "id"} \-("Relation")-> [HiveWrite] +ForeignKeyWash: + [HiveRead] -> [Wash] -> [HiveWrite] \ No newline at end of file diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/foreignKeyWash.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/foreignKeyWash.scala index 7607ea4..5235de0 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/foreignKeyWash.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/foreignKeyWash.scala @@ -1,30 +1,107 @@ package cn.piflow.bundle.nsfc.distinct +import cn.piflow.bundle.util.JedisClusterImplSer import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} +import org.apache.spark.sql.{Row, SparkSession} +import redis.clients.jedis.{HostAndPort, JedisCluster} class foreignKeyWash extends ConfigurableStop { override val authorEmail: String = "xiaomeng7890@gmail.com" override val description: String = "" // + var washField : String = _ + var redis_server_ip : String = _ + var redis_server_port : Int = _ + var redis_server_passwd : String = _ + var jedisCluster : JedisClusterImplSer = _ + override val inportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override def setProperties(map: Map[String, Any]): Unit = {} + override def setProperties(map: Map[String, Any]): Unit = { + redis_server_ip = MapUtil.get(map,"redis ip").asInstanceOf[String] + redis_server_port = MapUtil.get(map,"redis port").asInstanceOf[Int] + redis_server_passwd = MapUtil.get(map,"redis passwd").asInstanceOf[String] + washField = MapUtil.get(map,"wash field").asInstanceOf[String] + } override def getPropertyDescriptor(): List[PropertyDescriptor] = { - List() + var descriptor : List[PropertyDescriptor] = List() + + val redis_passwd = new PropertyDescriptor(). + name("redis passwd"). + displayName("redis passwd"). + description("redis server passwd"). + required(true) + descriptor = redis_passwd :: descriptor + + val redis_port = new PropertyDescriptor(). + name("redis port"). + displayName("redis port"). + description("redis server port"). + required(true) + descriptor = redis_port :: descriptor + + val redis_server = new PropertyDescriptor(). + name("redis server"). + displayName("redis server"). + description("redis server ip"). + required(true) + descriptor = redis_server :: descriptor + + val wash_field = new PropertyDescriptor(). + name("wash field"). + displayName("wash field"). + description("the foreign key you need wash"). 
+ required(true) + descriptor = wash_field :: descriptor + + descriptor } override def getIcon(): Array[Byte] = ImageUtil.getImage("icon/hive/SelectHiveQL.png") override def getGroup(): List[String] = { - List(StopGroup.NSFC.toString, "sha0w", "distinct", "product") + List(StopGroup.NSFC.toString, "sha0w", "distinct", "foreignKey") } - override def initialize(ctx: ProcessContext): Unit = {} + override def initialize(ctx: ProcessContext): Unit = { + jedisCluster = new JedisClusterImplSer(new HostAndPort(redis_server_ip, redis_server_port), redis_server_passwd) + } - override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {} + override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val washTable = in.read() + val ss = pec.get[SparkSession]() + val acc = ss.sparkContext.accumulator[Int](0,"changeCount") + val washTableSchema = washTable.schema + val washIndex = washTableSchema.fieldIndex(washField) + val washTableRdd = washTable.rdd + val afterWash = washTableRdd + .map(row => row.toSeq) + .map(seq => { + val key = getMKFather(seq(washIndex).asInstanceOf[String], jedisCluster.getJedisCluster) + if (key != seq(washIndex)) { + seq.updated(washIndex, key) + acc += 1 + } + seq + } + ).map(r => Row.fromSeq(r)) + + val bkDF = ss.createDataFrame(afterWash, washTableSchema) + println(bkDF.count()) + println("====================================") + println(acc.name.get+" : "+acc.value) + out.write(bkDF) + } + + def getMKFather(key:String, jedisCluster: JedisCluster): String = { + val s = jedisCluster.hget("product_table_dup@" + key, "product:MK - S") + if (s == null || s == key) key //not son of any return itself + // I know you will feel confuse , just relax :-) + else getMKFather(s, jedisCluster) + } }
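
Note for reviewers: both HivePRDDistinct and foreignKeyWash resolve a record's canonical key by walking the Redis hash written by recordMKDup ("product_table_dup@" + sonKey -> "product:MK - S" -> fatherKey) until a key maps to nothing or to itself — that is what getMKFather does in both stops, and why they must point at the same Redis cluster. The sketch below is an illustration of that chain-following idea only, with an in-memory mutable.Map standing in for the JedisCluster; it is not the shipped implementation.

    // Minimal sketch of the duplicate-chain bookkeeping used in this patch.
    // A mutable.Map stands in for the Redis hash; names here are illustrative.
    import scala.annotation.tailrec
    import scala.collection.mutable

    object DupChainSketch {
      // son key -> father key, as recordMKDup would hset it into Redis
      private val fatherOf = mutable.Map[String, String]()

      /** Record that every key in `dupKeys` is a duplicate of the first one. */
      def recordDup(dupKeys: Seq[String]): Unit = {
        val father = dupKeys.head
        dupKeys.tail.foreach(son => fatherOf(son) = father)
      }

      /** Walk the chain until a key has no father (or points to itself). */
      @tailrec
      def resolve(key: String): String = fatherOf.get(key) match {
        case None | Some(`key`) => key        // not a son of anything: already canonical
        case Some(father)       => resolve(father)
      }

      def main(args: Array[String]): Unit = {
        // Two dedup passes, as the rule "zh_title&authors,en_title&authors,product_id"
        // can produce: first B is folded into A, then C is folded into B.
        recordDup(Seq("A", "B"))
        recordDup(Seq("B", "C"))
        println(resolve("C")) // "A": C -> B -> A
        println(resolve("A")) // "A": already canonical
      }
    }

foreignKeyWash applies the same walk to the configured "wash field" of a related table, replacing each foreign key with its resolved father key, which is why it reuses the identical key prefix "product_table_dup@" rather than a prefix of its own.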