Merge remote-tracking branch 'origin/master'

judy0131 2020-04-15 12:02:11 +08:00
commit 037f762239
15 changed files with 2865 additions and 19 deletions

View File

@@ -11,7 +11,7 @@
"properties":{
"Str":"1,zs\n2,ls\n3,ww",
"delimiter":",",
"schema":""
"schema":"id,name"
}
},
{

View File

@@ -0,0 +1,42 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"ReadHbase",
"bundle":"cn.piflow.bundle.hbase.ReadHbase",
"properties":{
"quorum": "packone138,packone139,packone140",
"port":"2181",
"znodeParent": "/hbase-unsecure",
"table": "db0408:test02",
"rowid": "rowkey",
"family": "t1,t2",
"qualifier": "name,age,gender"
}
},
{
"uuid":"1324",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://192.168.3.138:8020/test/hbasetest",
"header": "true",
"delimiter":",",
"partition":"1",
"saveMode": "append"
}
}
],
"paths":[
{
"from":"ReadHbase",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}
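For reference, a flow JSON like the one above is loaded and turned into a runnable flow the same way the ReadHbaseTest added later in this commit does it; a minimal sketch using only calls that appear in that test:

import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import scala.util.parsing.json.JSON

// Parse the flow JSON into a Map and construct the flow, as in ReadHbaseTest below.
val flowJsonStr = FileUtil.fileReader("src/main/resources/flow/hbase/ReadHbase.json")
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
val flow = FlowBean(map).constructFlow()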

View File

@@ -1,4 +1,4 @@
{
{
"flow":{
"name":"test",
"uuid":"1234",

View File

@@ -10,7 +10,7 @@
"bundle":"cn.piflow.bundle.http.GetUrl",
"properties":{
"url":"https://api.elsevier.com/content/search/scopus?query=TITLE('title')&apiKey=555637gxd",
"types":"json",
"httpAcceptTypes":"json",
"label":"",
"schema":""
}
@@ -20,7 +20,7 @@
"bundle":"cn.piflow.bundle.http.GetUrl",
"properties":{
"url":"https://api.elsevier.com/content/search/scopus?query=TITLE('title')&apiKey=555637gxd",
"types":"xml",
"httpAcceptTypes":"xml",
"label":"service-error,status",
"schema":"statusCode,statusText"
}
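The rename from "types" to "httpAcceptTypes" suggests the value drives the HTTP Accept header. A hedged sketch of that mapping, for illustration only; GetUrl's implementation is not part of this diff and openWithAccept is a hypothetical helper:

import java.net.{HttpURLConnection, URL}

// Hypothetical helper: map an httpAcceptTypes value ("json" or "xml") to an Accept header.
def openWithAccept(url: String, httpAcceptTypes: String): HttpURLConnection = {
  val accept = httpAcceptTypes match {
    case "json" => "application/json"
    case "xml"  => "application/xml"
    case _      => "*/*"
  }
  val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestProperty("Accept", accept)
  conn
}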

View File

@@ -0,0 +1,37 @@
{
"flow":{
"name":"incremental",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"MysqlReadIncremental",
"bundle":"cn.piflow.bundle.jdbc.MysqlReadIncremental",
"properties":{
"url":"jdbc:mysql://10.0.86.191:3306/piflow_web",
"sql":"select * from flow where last_update_dttm > #~#",
"user":"root",
"password":"root",
"incrementalField":"last_update_dttm",
"incrementalStart":"2019-05-05 15:48:29"
}
},
{
"uuid":"2222",
"name":"JsonSave",
"bundle":"cn.piflow.bundle.json.JsonSave",
"properties":{
"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/flow.json"
}
}
],
"paths":[
{
"from":"MysqlReadIncremental",
"outport":"",
"inport":"",
"to":"JsonSave"
}
]
}
}
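The "#~#" token in the sql property above is a placeholder for the current incremental value. A hedged sketch of how such a substitution presumably works; the real logic lives in MysqlReadIncremental, which this diff does not show, and renderIncrementalSql is a hypothetical helper:

// Hypothetical helper: fill the "#~#" placeholder with the stored incremental value.
def renderIncrementalSql(sqlTemplate: String, incrementalValue: String): String =
  sqlTemplate.replace("#~#", s"'$incrementalValue'")

// renderIncrementalSql("select * from flow where last_update_dttm > #~#", "2019-05-05 15:48:29")
// returns: select * from flow where last_update_dttm > '2019-05-05 15:48:29'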

View File

@@ -63,7 +63,7 @@ class CsvStringParser extends ConfigurableStop{
.displayName("Str")
.defaultValue("")
.required(true)
.example("")
.example("1,zs\n2,ls\n3,ww")
descriptor = str :: descriptor
val delimiter = new PropertyDescriptor()
@@ -72,7 +72,7 @@ class CsvStringParser extends ConfigurableStop{
.description("The delimiter of CSV string")
.defaultValue("")
.required(true)
.example("")
.example(",")
descriptor = delimiter :: descriptor
val schema = new PropertyDescriptor()
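The example values above ("1,zs\n2,ls\n3,ww" with delimiter ",", matching the schema "id,name" set in the updated flow JSON earlier in this commit) describe a small inline CSV. A minimal sketch of what parsing it amounts to, for illustration only, not the CsvStringParser implementation:

// Split the example string into rows, then each row into fields by the delimiter.
val str = "1,zs\n2,ls\n3,ww"
val delimiter = ","
val schema = "id,name".split(delimiter)            // Array(id, name)
val rows = str.split("\n").map(_.split(delimiter)) // Array(Array(1, zs), ...)
rows.foreach(r => println(schema.zip(r).toMap))    // Map(id -> 1, name -> zs), ...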

View File

@@ -50,7 +50,7 @@ class GetFile extends ConfigurableStop{
.description("Server IP where the local file is located")
.defaultValue("")
.required(true)
.example("192.168.3.139")
.example("127.0.0.1")
descriptor = IP :: descriptor
val User = new PropertyDescriptor()

View File

@@ -0,0 +1,175 @@
package cn.piflow.bundle.hbase
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.ArrayBuffer
class ReadHbase extends ConfigurableStop{
override val authorEmail: String = "bf219319@cnic.com"
override val description: String = "Read data from Hbase"
override val inportList: List[String] = List(Port.DefaultPort)
override val outportList: List[String] = List(Port.DefaultPort)
var quorum :String= _
var port :String = _
var znodeParent:String= _
var table:String=_
var rowid:String=_
var family:String= _
var qualifier:String=_
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", quorum)
hbaseConf.set("hbase.zookeeper.property.clientPort", port)
hbaseConf.set("zookeeper.znode.parent",znodeParent)
hbaseConf.set(TableInputFormat.INPUT_TABLE, table)
val sc = spark.sparkContext
val hbaseRDD= sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val schema: Array[String] = qualifier.split(",")
val families=family.split(",")
val col_str=rowid+","+qualifier
val newSchema:Array[String]=col_str.split(",")
val fields: Array[StructField] = newSchema.map(d=>StructField(d,StringType,nullable = true))
val dfSchema: StructType = StructType(fields)
val kv = hbaseRDD.map(r => {
val rowkey = Bytes.toString(r._2.getRow)
val row = new ArrayBuffer[String]
row += rowkey
if(families.size==1){
schema.foreach(c => {
val fields = Bytes.toString(r._2.getValue(Bytes.toBytes(family), Bytes.toBytes(c)))
row += fields
})
}else{
families.foreach(f=>{
schema.foreach(c => {
val fields = Bytes.toString(r._2.getValue(Bytes.toBytes(f), Bytes.toBytes(c)))
if (fields==null){
row
}else{
row += fields
}
})
})
}
Row.fromSeq(row.toArray.toSeq)
})
val df=spark.createDataFrame(kv,dfSchema)
out.write(df)
}
override def setProperties(map: Map[String, Any]): Unit = {
quorum = MapUtil.get(map,key="quorum").asInstanceOf[String]
port = MapUtil.get(map,key="port").asInstanceOf[String]
znodeParent = MapUtil.get(map,key="znodeParent").asInstanceOf[String]
table = MapUtil.get(map,key="table").asInstanceOf[String]
rowid = MapUtil.get(map,key="rowid").asInstanceOf[String]
family = MapUtil.get(map,key="family").asInstanceOf[String]
qualifier = MapUtil.get(map,key="qualifier").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val quorum = new PropertyDescriptor()
.name("quorum")
.displayName("Quorum")
.defaultValue("")
.description("Zookeeper cluster address")
.required(true)
.example("10.0.0.101,10.0.0.102,10.0.0.103")
descriptor = quorum :: descriptor
val port = new PropertyDescriptor()
.name("port")
.displayName("Port")
.defaultValue("")
.description("Zookeeper connection port")
.required(true)
.example("2181")
descriptor = port :: descriptor
val znodeParent = new PropertyDescriptor()
.name("znodeParent")
.displayName("ZnodeParent")
.defaultValue("")
.description("Hbase znode location in zookeeper")
.required(true)
.example("/hbase-unsecure")
descriptor = znodeParent :: descriptor
val table = new PropertyDescriptor()
.name("table")
.displayName("Table")
.defaultValue("")
.description("Table in Hbase")
.required(true)
.example("test or dbname:test")
descriptor = table :: descriptor
val rowid = new PropertyDescriptor()
.name("rowid")
.displayName("rowid")
.defaultValue("")
.description("Rowkey of table in Hbase")
.required(true)
.example("rowkey")
descriptor = rowid :: descriptor
val family = new PropertyDescriptor()
.name("family")
.displayName("Family")
.defaultValue("")
.description("The column family of table,multiple column families are separated by commas")
.required(true)
.example("info")
descriptor = family :: descriptor
val qualifier = new PropertyDescriptor()
.name("qualifier")
.displayName("Qualifier")
.defaultValue("")
.description("Field of column family,fill in the order of column family")
.required(true)
.example("name,age")
descriptor = qualifier :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/hbase/GetHbase.png")
}
override def getGroup(): List[String] = {
List(StopGroup.HbaseGroup)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}
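A minimal usage sketch of the new stop, configured with the property keys and example values from the ReadHbase.json flow shown earlier in this commit; perform() is normally invoked by the piflow runner rather than called directly:

val readHbase = new ReadHbase()
readHbase.setProperties(Map(
  "quorum"      -> "packone138,packone139,packone140",
  "port"        -> "2181",
  "znodeParent" -> "/hbase-unsecure",
  "table"       -> "db0408:test02",
  "rowid"       -> "rowkey",
  "family"      -> "t1,t2",
  "qualifier"   -> "name,age,gender"
))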

View File

@@ -174,6 +174,7 @@ class SaveToHdfs extends ConfigurableStop {
.description("Does the csv file have a header")
.defaultValue("true")
.required(true)
.example("true")
descriptor = header :: descriptor
descriptor
}
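For context, the "header" property corresponds to the standard Spark CSV writer option of the same name; a hedged sketch only, since SaveToHdfs's write logic is not shown in this hunk and saveCsv is a hypothetical helper:

import org.apache.spark.sql.DataFrame

// Write a DataFrame as CSV with a header row, as the example value "true" implies.
def saveCsv(df: DataFrame, path: String): Unit =
  df.write.option("header", "true").option("delimiter", ",").csv(path)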

View File

@@ -51,10 +51,10 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{
val url=new PropertyDescriptor()
.name("url")
.displayName("Url")
.description("The Url, for example jdbc:mysql://127.0.0.1/dbname")
.description("The Url of mysql")
.defaultValue("")
.required(true)
.example("")
.example("jdbc:mysql://127.0.0.1/dbname")
descriptor = url :: descriptor
val user=new PropertyDescriptor()
@@ -63,7 +63,7 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{
.description("The user name of database")
.defaultValue("")
.required(true)
.example("")
.example("root")
descriptor = user :: descriptor
val password=new PropertyDescriptor()
@@ -72,7 +72,7 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{
.description("The password of database")
.defaultValue("")
.required(true)
.example("")
.example("123456")
.sensitive(true)
descriptor = password :: descriptor
@@ -82,7 +82,7 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{
.description("The sql sentence you want to execute")
.defaultValue("")
.required(true)
.example("")
.example("select * from user")
descriptor = sql :: descriptor
val incrementalField=new PropertyDescriptor()
@@ -91,7 +91,7 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{
.description("The incremental field")
.defaultValue("")
.required(true)
.example("")
.example("update_date")
descriptor = incrementalField :: descriptor
val incrementalStart=new PropertyDescriptor()
@@ -100,7 +100,7 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{
.description("The incremental start value")
.defaultValue("")
.required(true)
.example("")
.example("2020-04-07")
descriptor = incrementalStart :: descriptor
descriptor

View File

@@ -47,7 +47,7 @@ class OracleRead extends ConfigurableStop{
val url=new PropertyDescriptor()
.name("url")
.displayName("Url")
.description("The Url, for example jdbc:oracle:thin:@10.0.86.237:1521/newdb")
.description("The Url of oracle")
.defaultValue("")
.required(true)
.example("jdbc:oracle:thin:@10.0.86.237:1521/newdb")

View File

@@ -18,7 +18,6 @@ class XmlParser extends ConfigurableStop {
var xmlpath:String = _
var rowTag:String = _
var schema: StructType = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -27,7 +26,6 @@ class XmlParser extends ConfigurableStop {
val xmlDF = spark.read.format("com.databricks.spark.xml")
.option("rowTag",rowTag)
.option("treatEmptyValuesAsNulls",true)
/*.schema(schema)*/
.load(xmlpath)
out.write(xmlDF)
@@ -40,7 +38,6 @@ class XmlParser extends ConfigurableStop {
def setProperties(map : Map[String, Any]) = {
xmlpath = MapUtil.get(map,"xmlpath").asInstanceOf[String]
rowTag = MapUtil.get(map,"rowTag").asInstanceOf[String]
schema = null
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {

View File

@@ -16,7 +16,7 @@ class MysqlReadIncrementalTest {
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/jdbc/JdbcReadFromOracle.json"
val file = "src/main/resources/flow/jdbc/MysqlReadIncremental.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)

View File

@@ -0,0 +1,57 @@
package cn.piflow.bundle.hbase
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class ReadHbaseTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hbase/ReadHbase.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("MysqlReadTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

piflow_V0.7_componets.md (new file, 2537 additions)

File diff suppressed because it is too large