Merge remote-tracking branch 'origin/master'

# Conflicts:
#	piflow-bundle/piflow-bundle.iml
#	piflow-core/piflow-core.iml
judy0131 2019-07-26 08:46:40 +08:00
commit 01bf0250f2
11 changed files with 364 additions and 32 deletions

View File

@ -1,23 +0,0 @@
server.ip=10.0.86.98
server.port=8001
#spark.master=spark://10.0.86.89:7077
#spark.master=spark://10.0.86.191:7077
spark.master=yarn
spark.deploy.mode=cluster
yarn.resourcemanager.hostname=10.0.86.191
yarn.resourcemanager.address=10.0.86.191:8032
yarn.access.namenode=hdfs://10.0.86.191:9000
yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
hive.metastore.uris=thrift://10.0.86.191:9083
#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
log.path=/opt/project/piflow/logs
icon.path=/opt/project/piflow/icon
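
For orientation, the file removed above is a plain java.util.Properties list (key=value pairs). A minimal hedged sketch of how such a file can be read from Scala; the conf/config.properties path is an assumption, not something stated in this commit:

import java.io.FileInputStream
import java.util.Properties

// Hedged sketch: load key=value pairs like the ones above; the file path is assumed.
val props = new Properties()
val in = new FileInputStream("conf/config.properties") // assumed location
try props.load(in) finally in.close()

val sparkMaster = props.getProperty("spark.master")      // e.g. "yarn"
val deployMode  = props.getProperty("spark.deploy.mode") // e.g. "cluster"
println(s"spark.master=$sparkMaster, spark.deploy.mode=$deployMode")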

View File

@ -53,4 +53,12 @@ public class JedisClusterImplSer implements Serializable {
    private void setJedisCluster(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
    }

    // Public so that callers (e.g. the distinct stops below) can release the cluster connection.
    public void close() {
        try {
            this.jedisCluster.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
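
For readers unfamiliar with this wrapper, a hedged usage sketch that mirrors how the Scala stops later in this commit construct and release it; host, port and password are placeholders:

import cn.piflow.bundle.util.JedisClusterImplSer
import redis.clients.jedis.HostAndPort

// Placeholder connection details; real values come from the stop properties.
val jedis = new JedisClusterImplSer(new HostAndPort("127.0.0.1", 7000), "passwd")
jedis.getJedisCluster.hset("demo@key", "field", "value") // any JedisCluster call
jedis.close()                                            // release the cluster connection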

Binary file not shown (icon image changed: 2.9 KiB before, 36 KiB after).

View File

@ -0,0 +1,5 @@
package cn.piflow.bundle.nsfc.distinct
class HivePRDDistinct {
}

View File

@ -0,0 +1,167 @@
package cn.piflow.bundle.nsfc.distinct
import cn.piflow.bundle.util.JedisClusterImplSer
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode, SparkSession, TypedColumn}
import redis.clients.jedis.HostAndPort
class HiveDistinct extends ConfigurableStop {

  override val authorEmail: String = "xiaomeng7890@gmail.com"
  override val description: String = ""
  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val outportList: List[String] = List("Relation", "Entity")

  // val primaryKey:String = _
  // val subPrimaryKey:String = _
  // val idKeyName : String = _
  // val processKey = _
  // val timeFields = _
  // val subTimeFields = _

  var tableName: String = _ // table name after cleaning
  var noChangeSource: String = _
  var sourceField: String = _
  var timeField: String = _
  var idKeys: String = _
  var noChange: Boolean = _
  var baseOnTime: Boolean = _
  var baseOnField: Boolean = _
  var distinctRule: String = _
  var distinctFields: String = _
  var primaryKey: String = _
  var distinctTableType: String = _
  var rel_fields: String = _ // e.g. "id,prj_code,psn_code"
  var used_fields: String = _ // e.g. "id,psn_code,zh_name,id_type,id_no,birthday,gender,degree_code,nation,tel,email,org_code,zero,identity_card,military_id,passport,four,home_return_permit,mainland_travel_permit_for_taiwan_residents"
  var redis_server_ip: String = _
  var redis_server_port: Int = _
  var redis_server_passwd: String = _
  var jedisCluster: JedisClusterImplSer = _
  // val subTableName = _

  override def setProperties(map: Map[String, Any]): Unit = {
    redis_server_ip = MapUtil.get(map, "redis ip").asInstanceOf[String]
    redis_server_port = MapUtil.get(map, "redis port").asInstanceOf[Int]
    redis_server_passwd = MapUtil.get(map, "redis passwd").asInstanceOf[String]
  }
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()

    val redis_passwd = new PropertyDescriptor()
      .name("redis passwd")
      .displayName("redis passwd")
      .description("redis server password")
      .required(true)
    descriptor = redis_passwd :: descriptor

    val redis_port = new PropertyDescriptor()
      .name("redis port")
      .displayName("redis port")
      .description("redis server port")
      .required(true)
    descriptor = redis_port :: descriptor

    // Note: setProperties reads the property "redis ip", but this descriptor is registered under the name "redis server".
    val redis_server = new PropertyDescriptor()
      .name("redis server")
      .displayName("redis server")
      .description("redis server ip")
      .required(true)
    descriptor = redis_server :: descriptor

    descriptor
  }
  override def getIcon(): Array[Byte] = ImageUtil.getImage("icon/hive/SelectHiveQL.png")

  override def getGroup(): List[String] = {
    List(StopGroup.NSFC.toString, "sha0w", "distinct")
  }

  override def initialize(ctx: ProcessContext): Unit = {
    jedisCluster = new JedisClusterImplSer(new HostAndPort(redis_server_ip, redis_server_port), redis_server_passwd)
  }
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    if (noChange) {
      out.write(in.read())
      return
    }
    val inDF = in.read().select(used_fields.split(",").map(s => new Column(s)): _*)
    val spark = pec.get[SparkSession]()
    val inSchema = inDF.schema
    val primaryIndex = inDF.schema.fieldIndex(primaryKey)
    var pairRDD = inDF.rdd.map(row => (row.getString(primaryIndex), row))
    val processKeyArray = distinctFields.split(",") :+ idKeys // append the id-key rule; ":+" builds a new array
    processKeyArray.foreach(key => { // for each candidate key rule
      pairRDD = pairRDD.map(row => (cn.piflow.bundle.util.NSFCUtil.mkRowKey(inSchema, row._2, key), row)) // build the key pair; a UUID is generated when the key is absent
        .groupByKey
        .map(i => (i._1, {
          cn.piflow.bundle.util.RedisUtil.recordMKDup(i._2, tableName, jedisCluster.getJedisCluster) // mark the keys that need to be merged
        })) // opt: R/W, this pass records the merge-key duplicates
        .values
        .filter(r => {
          r._2 != null
        }) // reduce can combine two duplicate values
    })
    val keykeyRDD = pairRDD.map(r => (r._1,
      cn.piflow.bundle.util.RedisUtil.checkRedis(r, inSchema, tableName, distinctTableType, distinctRule.split(","), jedisCluster.getJedisCluster),
      r._2))
    val backToHiveRdd = keykeyRDD.map(r => (r._1, {
      val row = r._3
      val psn = r._2
      val id = r._1
      val rowArray = row.toSeq
      jedisCluster.getJedisCluster.hset(tableName + "@" + id, distinctTableType + "IdPSN", psn)
      Row.fromSeq(rowArray.updated(primaryIndex, psn))
    })).filter(r => {
      !jedisCluster.getJedisCluster.hexists(tableName + "@" + r._1, distinctTableType + "PSNExist")
    }).values
    println("=====================" + backToHiveRdd.count + "========================") // Spark action: forces evaluation
    val df_backToHive = spark.sqlContext.createDataFrame(
      backToHiveRdd, inSchema
    )
    val rel_fields_arr = rel_fields.split(",")
    if (rel_fields_arr.size == 3) {
      val df_relationship: DataFrame = inDF.select(rel_fields_arr.map(x => new Column(x)): _*)
      // pj_member should be mapped here
      val rela_schema = df_relationship.schema
      val relation_rdd = df_relationship.rdd
      val id_1 = rel_fields_arr(0) // id
      val id_2 = rel_fields_arr(1) // prj
      val id_3 = rel_fields_arr(2) // psn
      val backToHiveRelation = relation_rdd.map(row => (row.getString(rela_schema.fieldIndex(id_1)),
        row.getString(rela_schema.fieldIndex(id_2))))
        .map(r => (r._1, r._2, cn.piflow.bundle.util.RedisUtil.getMKFather(r._1, tableName, distinctTableType, jedisCluster.getJedisCluster)))
        .map(i => Row.fromSeq(Array(i._1, i._2, i._3)))
      val df_backToHiveRelation = spark.sqlContext.createDataFrame(backToHiveRelation, rela_schema)
      jedisCluster.close()
      out.write("Relation", df_backToHiveRelation)
    }
    else {
      val df_relationship: DataFrame = inDF.select(rel_fields_arr.map(x => new Column(x)): _*)
      if (rel_fields_arr.size != 4) throw new Exception("unexpected rel_fields schema size: should be 3 or 4")
      val rela_schema = StructType(rel_fields_arr.map(StructField(_, StringType, nullable = true)))
      val relation_rdd = df_relationship.rdd.map(i => Row.fromSeq(i.toSeq.+:(cn.piflow.bundle.util.NSFCUtil.generateUUID())))
      // id,prp_code,psn_code,seq_no
      val backToHiveRelation =
        relation_rdd
          .map(row => (row.getString(rela_schema.fieldIndex(rel_fields_arr(0))),
            row.getString(rela_schema.fieldIndex(rel_fields_arr(1))),
            row.getString(rela_schema.fieldIndex(rel_fields_arr(2))),
            row.getString(rela_schema.fieldIndex(rel_fields_arr(3)))))
          .map(r => (r._1, r._2, cn.piflow.bundle.util.RedisUtil.getMKFather(r._3, tableName, distinctTableType, jedisCluster.getJedisCluster), r._4))
          .map(i => Row.fromSeq(Array(i._1, i._2, i._3, i._4)))
      val df_backToHiveRelation = spark.sqlContext.createDataFrame(backToHiveRelation, rela_schema)
      jedisCluster.close()
      out.write("Relation", df_backToHiveRelation)
    }
    out.write("Entity", df_backToHive)
  }
}
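
The core of perform() above is a key-by-candidate-fields / groupByKey pass that tags duplicates before writing back to Hive and Redis. A standalone, simplified sketch of that pattern follows; the field names and data are made up, and the Redis bookkeeping and the NSFCUtil/RedisUtil helpers are deliberately omitted:

import org.apache.spark.sql.{Row, SparkSession}

object DedupSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical records: (psn_code, zh_name, email)
  val df = Seq(
    ("p1", "Zhang San", "zs@example.com"),
    ("p2", "Zhang San", "zs@example.com"), // duplicate of p1 by zh_name + email
    ("p3", "Li Si", "ls@example.com")
  ).toDF("psn_code", "zh_name", "email")

  val schema = df.schema
  val deduped = df.rdd
    .map(row => (row.getString(schema.fieldIndex("zh_name")) + row.getString(schema.fieldIndex("email")), row))
    .groupByKey()
    .mapValues(_.head) // keep one representative per key; HiveDistinct instead records merge keys in Redis
    .values

  spark.createDataFrame(deduped, schema).show()
  spark.stop()
}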

View File

@ -0,0 +1,106 @@
package cn.piflow.bundle.nsfc.distinct
import cn.piflow.bundle.util.JedisClusterImplSer
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import redis.clients.jedis.HostAndPort
class RedisDistinctCachePersist extends ConfigurableStop {

  override val authorEmail: String = "xiaomeng7890@gmail.com"
  override val description: String = "Persist the incoming DataFrame into the Redis server; the cached records are later used for deduplication."
  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var persist_needed_fields: String = _
  var persist_primary_field: String = _
  var distinct_rule: String = _
  var redis_server_ip: String = _
  var redis_server_port: Int = _
  var redis_server_passwd: String = _

  override def setProperties(map: Map[String, Any]): Unit = {
    persist_needed_fields = MapUtil.get(map, "distinct field").asInstanceOf[String]
    persist_primary_field = MapUtil.get(map, "primary field").asInstanceOf[String]
    distinct_rule = MapUtil.get(map, "distinct rule").asInstanceOf[String]
    redis_server_ip = MapUtil.get(map, "redis ip").asInstanceOf[String]
    redis_server_port = MapUtil.get(map, "redis port").asInstanceOf[Int]
    redis_server_passwd = MapUtil.get(map, "redis passwd").asInstanceOf[String]
  }
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()

    val redis_passwd = new PropertyDescriptor()
      .name("redis passwd")
      .displayName("redis passwd")
      .description("redis server password")
      .required(true)
    descriptor = redis_passwd :: descriptor

    val redis_port = new PropertyDescriptor()
      .name("redis port")
      .displayName("redis port")
      .description("redis server port")
      .required(true)
    descriptor = redis_port :: descriptor

    // Note: setProperties reads the property "redis ip", but this descriptor is registered under the name "redis server".
    val redis_server = new PropertyDescriptor()
      .name("redis server")
      .displayName("redis server")
      .description("redis server ip")
      .required(true)
    descriptor = redis_server :: descriptor

    val primary_field = new PropertyDescriptor()
      .name("primary field")
      .displayName("primary field")
      .description("the primary key field")
      .defaultValue("psn_code")
      .required(true)
    descriptor = primary_field :: descriptor

    val distinct_field = new PropertyDescriptor()
      .name("distinct field")
      .displayName("distinct field")
      .description("the fields needed by deduplication and the distinct rule")
      .defaultValue("zh_name,email,tel")
      .required(true)
    descriptor = distinct_field :: descriptor

    val distinct_rule = new PropertyDescriptor()
      .name("distinct rule")
      .displayName("distinct rule")
      .description("the rule used to deduplicate (fields joined by &, rules separated by commas)")
      .defaultValue("zh_name&email,zh_name&tel")
      .required(true)
    descriptor = distinct_rule :: descriptor

    descriptor
  }
  override def getIcon(): Array[Byte] = ImageUtil.getImage("icon/hive/SelectHiveQL.png")

  override def getGroup(): List[String] = List(StopGroup.NSFC.toString, "sha0w", "distinct")

  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val jedisCluster: JedisClusterImplSer =
      new JedisClusterImplSer(new HostAndPort(redis_server_ip, redis_server_port), redis_server_passwd)
    val df = in.read()
    val mPrimaryKeyIndex = df.schema.fieldIndex(persist_primary_field) // index of the primary key column (e.g. psn_code)
    val df_mperson_fields = persist_needed_fields.split(",").+:(persist_primary_field)
    val s1 = df_mperson_fields.map(i => (i, df.schema.fieldIndex(i))).toMap[String, Int] // field name -> column index
    df.rdd.foreach(
      row => {
        cn.piflow.bundle.util.RedisUtil.putRedis(
          row, s1, distinct_rule, mPrimaryKeyIndex, jedisCluster.getJedisCluster) // populate the Redis dedup cache
      }
    )
    out.write(df)
  }
}
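
For reference, a hedged sketch of the property map this stop expects, using only the keys read in setProperties above; all values are placeholders (the defaults declared in getPropertyDescriptor are reused where they exist):

// Placeholder values; keys mirror those read in setProperties.
val props: Map[String, Any] = Map(
  "distinct field" -> "zh_name,email,tel",
  "primary field"  -> "psn_code",
  "distinct rule"  -> "zh_name&email,zh_name&tel",
  "redis ip"       -> "127.0.0.1",
  "redis port"     -> 7000,   // must be an Int, matching the asInstanceOf[Int] cast above
  "redis passwd"   -> "secret"
)
val persistStop = new RedisDistinctCachePersist()
persistStop.setProperties(props)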

View File

@ -4,7 +4,7 @@ import java.text.SimpleDateFormat
import java.util.{Date, UUID}
import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
object NSFCUtil {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
@ -35,8 +35,10 @@ object NSFCUtil {
    for (index <- 0 until beforeSchema.length) {
      val name = beforeSchema(index).name
      name match {
-       case `idTypeField` => if (beforeRowSeq(index) == null) card_type = "null" else card_type = String.valueOf(beforeRowSeq(index))
-       case `idField` => if (beforeRowSeq(index) == null) card_code = "null" else card_code = String.valueOf(beforeRowSeq(index))
+       case `idTypeField` => if (beforeRowSeq(index) == null)
+         card_type = "null" else card_type = String.valueOf(beforeRowSeq(index))
+       case `idField` => if (beforeRowSeq(index) == null)
+         card_code = "null" else card_code = String.valueOf(beforeRowSeq(index))
        case _ => afterMap.put(beforeSchema(index).name, beforeRowSeq(index))
      }
    }
@ -59,6 +61,7 @@ object NSFCUtil {
    afterMap.put("mainland_travel_permit_for_taiwan_residents", ifNUll(six))
    afterMap.put("source", source) // add the source field
    afterMap.put("uuid", UUID.randomUUID().toString)
+   afterMap.put("id_hash", (card_code + card_type).##.toString)
    for (index <- 0 until afterSchema.length) {
      if (!afterMap.keySet.contains(afterSchema(index).name)) {
        afterMap.put(afterSchema(index).name, null)
@ -81,7 +84,8 @@ object NSFCUtil {
      afterSchema = afterSchema.add(f.name, f.dataType, nullable = true)
    })
    afterSchema = afterSchema.add("source", StringType, nullable = true)
-   afterSchema.add("uuid", StringType, nullable = true)
+   afterSchema = afterSchema.add("uuid", StringType, nullable = true)
+   afterSchema.add("id_hash", StringType, nullable = true)
  }
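
Worth noting when reading the last hunk: StructType.add returns a new StructType rather than mutating the receiver, which is why the uuid line now reassigns afterSchema and the trailing id_hash add serves as the method's return value. A minimal illustration:

import org.apache.spark.sql.types.{StringType, StructType}

var schema = new StructType().add("source", StringType, nullable = true)
schema.add("uuid", StringType, nullable = true)          // result discarded: schema still has only "source"
schema = schema.add("uuid", StringType, nullable = true) // reassigned: schema now has "source" and "uuid"
println(schema.fieldNames.mkString(","))                 // source,uuid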

View File

@ -2,6 +2,7 @@ package cn.piflow.bundle.util
import java.util
import org.apache.spark.sql.{Dataset, Row}
import redis.clients.jedis.JedisCluster
object RedisUtil extends Serializable {
def manipulateRow(row:Row,column_name:String,jedisClusterImplSer: JedisClusterImplSer):Unit={
var hm:util.HashMap[String,String]=new util.HashMap()
@ -18,5 +19,37 @@ object RedisUtil extends Serializable {
jedisClusterImplSer.getJedisCluster.hmset(key,hm)
}
  /**
   * Cache one row in Redis for later deduplication.
   *
   * @param row          the row to store in Redis
   * @param map          field name -> column index
   * @param build        the dedup rule, i.e. which fields form a key (e.g. "zh_name&tel,zh_name&email")
   * @param valueIndex   index of the primary key (psn_code) column
   * @param jedisCluster the Redis cluster client
   */
  def putRedis(row: Row, map: Map[String, Int], build: String, valueIndex: Int, jedisCluster: JedisCluster): Unit = {
    var hasNull = false // note: shared across rules, so once a rule hits a null field the remaining rules are skipped as well
    val value = row.getString(valueIndex) // the primary key
    build.split(",").foreach(idKey => { // e.g. name&tel,name&email
      val field = idKey
      var key = ""
      if (idKey.contains("&")) { // e.g. name&tel
        val sl = idKey.split("&")
        sl.foreach(s_ => {
          if (!row.isNullAt(map(s_))) {
            key += row.getString(map(s_))
          } else { hasNull = true }
        })
      }
      else {
        if (!row.isNullAt(map(idKey))) {
          key += row.getString(map(idKey))
        } else { hasNull = true }
      }
      if (!hasNull) {
        jedisCluster.hset(key, field, value) // combined key -> (rule, psn_code)
        // println(key + ":" + field + ":" + value) // test pass
      }
    })
  }
}
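
To make the cache layout concrete: for every rule in build (e.g. zh_name&tel), putRedis concatenates the row's non-null values for that rule into a Redis key and stores the primary key under a hash field named after the rule. A small illustration of just the key construction, with made-up data and no live cluster:

import org.apache.spark.sql.Row

// Hypothetical row and field-index map, mirroring the arguments of putRedis.
val fieldIndex = Map("psn_code" -> 0, "zh_name" -> 1, "tel" -> 2, "email" -> 3)
val row = Row("P001", "Zhang San", "13800000000", "zs@example.com")
val rules = "zh_name&tel,zh_name&email".split(",")

rules.foreach { rule =>
  val key = rule.split("&").map(f => row.getString(fieldIndex(f))).mkString("")
  // putRedis would then call: jedisCluster.hset(key, rule, row.getString(fieldIndex("psn_code")))
  println(s"""hset("$key", "$rule", "${row.getString(fieldIndex("psn_code"))}")""")
}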

View File

@ -0,0 +1,8 @@
<configuration>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>true</value>
    </property>
</configuration>
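
This hive-site.xml is the usual place for Hive client settings; when a Spark session is built with Hive support, the file is picked up from the classpath or the conf directory, including hive.metastore.schema.verification. A minimal hedged sketch (app name and master are placeholders):

import org.apache.spark.sql.SparkSession

// Assumes hive-site.xml is on the classpath or in $SPARK_HOME/conf.
val spark = SparkSession.builder()
  .appName("hive-conf-check") // placeholder name
  .master("local[*]")         // placeholder master
  .enableHiveSupport()
  .getOrCreate()

println(spark.conf.getOption("spark.sql.catalogImplementation")) // Some(hive) when Hive support is active
spark.stop()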

View File

@ -0,0 +1,24 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

View File

@ -1,9 +1,9 @@
1.maven error
apt-get install maven
-mvn install:install-file -Dfile=/root/Desktop/dblp/piflow-bundle/lib/spark-xml_2.11-0.4.2.jar -DgroupId=com.databricks -DartifactId=spark-xml_2.11 -Dversion=0.4.2 -Dpackaging=jar
-mvn install:install-file -Dfile=/root/Desktop/dblp/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
-mvn install:install-file -Dfile=/root/Desktop/dblp/piflow-bundle/lib/ojdbc6.jar -DgroupId=jdbc_oracle -DartifactId=ojdbc -Dversion=6.0.0 -Dpackaging=jar
-mvn install:install-file -Dfile=/root/Desktop/dblp/piflow-bundle/lib/edtftpj.jar -DgroupId=ftpClient -DartifactId=edtftp -Dversion=1.0.0 -Dpackaging=jar
+mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/spark-xml_2.11-0.4.2.jar -DgroupId=com.databricks -DartifactId=spark-xml_2.11 -Dversion=0.4.2 -Dpackaging=jar
+mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
+mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/ojdbc6.jar -DgroupId=jdbc_oracle -DartifactId=ojdbc -Dversion=6.0.0 -Dpackaging=jar
+mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/edtftpj.jar -DgroupId=ftpClient -DartifactId=edtftp -Dversion=1.0.0 -Dpackaging=jar
2.packaging