Merge remote-tracking branch 'origin/master'

bao319 2020-03-26 21:03:50 +08:00
commit 03b2e6d439
16 changed files with 555 additions and 37 deletions

View File

@@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"XmlParser",
"bundle":"cn.piflow.bundle.xml.XmlParser",
"properties":{
"xmlpath": "hdfs://192.168.3.138:8020/work/test/xml.xml",
"rowTag": "name"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"XmlParser"
}
]
}
}

View File

@@ -0,0 +1,45 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.xmltest"
}
}, {
"uuid":"1111",
"name":"XmlParserColumns",
"bundle":"cn.piflow.bundle.xml.XmlParserColumns",
"properties":{
"xmlColumns": "product_xml"
}
},{
"uuid":"2222",
"name":"ExecuteSQLStop",
"bundle":"cn.piflow.bundle.common.ExecuteSQLStop",
"properties":{
"sql":"select id,rnum,product_xml.product.pub_basic.en_title as en_title,product_xml.product.pub_basic.pub_type_id as pub_type_id from temp ",
"ViewName": "temp"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"XmlParserColumns"
}, {
"from":"XmlParserColumns",
"outport":"",
"inport":"",
"to":"ExecuteSQLStop"
}
]
}
}

View File

@@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"XmlParserFolder",
"bundle":"cn.piflow.bundle.xml.XmlParserFolder",
"properties":{
"xmlpath": "hdfs://192.168.3.138:8020/work/test/xml/",
"rowTag": "name,url"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"XmlParserFolder"
}
]
}
}

View File

@@ -0,0 +1,33 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"XmlSave",
"bundle":"cn.piflow.bundle.xml.XmlSave",
"properties":{
"xmlSavePath": "hdfs://192.168.3.138:8020/work/test/test.xml"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"XmlSave"
}
]
}
}

View File

@@ -0,0 +1,29 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"XmlStringParser",
"bundle":"cn.piflow.bundle.xml.XmlStringParser",
"properties":{
"XmlString":"<sites>\n <site>\n <name>菜鸟教程</name>\n <url>www.runoob.com</url>\n </site>\n <site>\n <name>Google</name>\n <url>www.google.com</url>\n </site>\n <site>\n <name>淘宝</name>\n <url>www.taobao.com</url>\n </site>\n</sites>",
"label":"sites,site",
"schema":"name,url"
}
}
],
"paths":[
{
"from":"",
"outport":"",
"inport":"",
"to":""
}
]
}
}

View File

@@ -13,8 +13,8 @@ class XmlParser extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Parse xml file"
val inportList: List[String] = List(Port.AnyPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var xmlpath:String = _
var rowTag:String = _
@@ -30,10 +30,6 @@ class XmlParser extends ConfigurableStop {
/*.schema(schema)*/
.load(xmlpath)
/*xmlDF.select("ee").rdd.collect().foreach( row =>
println(row.toSeq)
)*/
//xmlDF.show(30)
out.write(xmlDF)
}
@@ -49,8 +45,22 @@ class XmlParser extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val xmlpath = new PropertyDescriptor().name("xmlpath").displayName("xmlpath").description("the path of xml file").defaultValue("").required(true)
val rowTag = new PropertyDescriptor().name("rowTag").displayName("rowTag").description("the tag you want to parse in xml file").defaultValue("").required(true)
val xmlpath = new PropertyDescriptor()
.name("xmlpath")
.displayName("xmlpath")
.description("the path of xml file")
.defaultValue("")
.required(true)
.example("hdfs://192.168.3.138:8020/work/test/testxml.xml")
val rowTag = new PropertyDescriptor()
.name("rowTag")
.displayName("rowTag")
.description("the tag you want to parse in xml file")
.defaultValue("")
.required(true)
.example("name")
descriptor = xmlpath :: descriptor
descriptor = rowTag :: descriptor
descriptor
@@ -61,7 +71,7 @@ class XmlParser extends ConfigurableStop {
}
override def getGroup(): List[String] = {
List(StopGroup.XmlGroup.toString)
List(StopGroup.XmlGroup)
}
}

View File

@@ -9,12 +9,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
class XmlParserWithJson extends ConfigurableStop {
class XmlParserColumns extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Parse xml fields "
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Parse xml data in columns in upstream data"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var xmlColumns:String = _
@@ -64,7 +64,13 @@ class XmlParserWithJson extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val xmlColumns = new PropertyDescriptor().name("xmlColumns").displayName("xmlColumns").description("you want to parse contains XML fields ,Multiple are separated by commas").defaultValue("").required(true)
val xmlColumns = new PropertyDescriptor()
.name("xmlColumns")
.displayName("xmlColumns")
.description("Parsed column names containing xml,Multiple columns separated by commas")
.defaultValue("")
.required(true)
.example("product_xml")
descriptor = xmlColumns :: descriptor
descriptor
}
@@ -74,7 +80,7 @@ class XmlParserWithJson extends ConfigurableStop {
}
override def getGroup(): List[String] = {
List(StopGroup.XmlGroup.toString)
List(StopGroup.XmlGroup)
}

View File

@@ -16,11 +16,11 @@ import scala.util.control.Breaks._
/**
* Created by admin on 2018/8/27.
*/
class FolderXmlParser extends ConfigurableStop{
class XmlParserFolder extends ConfigurableStop{
val authorEmail: String = "lijie"
val description: String = "Parse xml folder"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var rowTag:String = _
var xmlpath:String = _
@@ -39,8 +39,21 @@ class FolderXmlParser extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val folederXmlStop = new PropertyDescriptor().name("folederXmlStop").displayName("FolederXmlStop").defaultValue("").required(true)
val rowTag = new PropertyDescriptor().name("rowTag").displayName("rowTag").description("the tag you want to parse in xml file").defaultValue("").required(true)
val folederXmlStop = new PropertyDescriptor()
.name("folederXmlStop")
.displayName("FolederXmlStop")
.defaultValue("")
.required(true)
.example("hdfs://192.168.3.138:8020/work/test/xml/")
val rowTag = new PropertyDescriptor()
.name("rowTag")
.displayName("rowTag")
.description("the tag you want to parse in xml file")
.defaultValue("")
.required(true)
.example("name,url")
descriptor = folederXmlStop :: descriptor
descriptor = rowTag :: descriptor
descriptor
@@ -77,7 +90,7 @@ class FolderXmlParser extends ConfigurableStop{
//Get the DataFrame of each xml file
def getDf(path:String,sparkSession: SparkSession):DataFrame={
val df = sparkSession.read.format("com.databricks.spark.xml")
.option("rowTag", "phdthesis")
.option("rowTag", rowTag)
.option("treatEmptyValuesAsNulls", true)
.load(path)
df
@@ -94,13 +107,13 @@ class FolderXmlParser extends ConfigurableStop{
}
}
var df = spark.read.format("com.databricks.spark.xml")
.option("rowTag", "phdthesis")
.option("rowTag", rowTag)
.option("treatEmptyValuesAsNulls", true)
.load(pathArr(index))
for(d <- index+1 until(pathArr.length)){
if(getDf(pathArr(d),spark).count()!=0){
val df1 = spark.read.format("com.databricks.spark.xml")
.option("rowTag", "phdthesis")
.option("rowTag", rowTag)
.option("treatEmptyValuesAsNulls", true)
.load(pathArr(d))
df = df.union(df1)

View File

@@ -12,14 +12,13 @@ class XmlSave extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Save data to xml file"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.NonePort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var xmlSavePath:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val xmlDF = in.read()
//xmlDF.show()
xmlDF.write.format("xml").save(xmlSavePath)
}
@@ -34,7 +33,13 @@ class XmlSave extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val xmlSavePath = new PropertyDescriptor().name("xmlSavePath").displayName("xmlSavePath").description("the sva path of xml file").defaultValue("").required(true)
val xmlSavePath = new PropertyDescriptor()
.name("xmlSavePath")
.displayName("XmlSavePath")
.description("xml file save path")
.defaultValue("")
.required(true)
.example("hdfs://192.168.3.138:8020/work/test/test.xml")
descriptor = xmlSavePath :: descriptor
descriptor
}
@@ -44,7 +49,7 @@ class XmlSave extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.XmlGroup.toString)
List(StopGroup.XmlGroup)
}
}

View File

@@ -16,8 +16,8 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class XmlStringParser extends ConfigurableStop {
override val authorEmail: String = "yangqidong@cnic.cn"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
override val description: String = "Parse xml string"
var XmlString:String=_
@@ -65,7 +65,6 @@ class XmlStringParser extends ConfigurableStop {
}
val listRows: List[Row] = list.toList.map(line => {
val seq: Seq[String] = line.split(",").toSeq
val row = Row.fromSeq(seq)
@@ -100,11 +99,31 @@ class XmlStringParser extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val XmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true)
val XmlString = new PropertyDescriptor()
.name("XmlString")
.displayName("XmlString")
.description("the xml String")
.defaultValue("")
.required(true)
.example("<sites>\n <site>\n <name>菜鸟教程</name>\n <url>www.runoob.com</url>\n </site>\n <site>\n <name>Google</name>\n <url>www.google.com</url>\n </site>\n <site>\n <name>淘宝</name>\n <url>www.taobao.com</url>\n </site>\n</sites>")
descriptor = XmlString :: descriptor
val label = new PropertyDescriptor().name("label").displayName("label").description("label path for hope,the delimiter is ,").defaultValue("").required(true)
val label = new PropertyDescriptor()
.name("label")
.displayName("label")
.description("Parsed label path")
.defaultValue("")
.required(true)
.example("sites,site")
descriptor = label :: descriptor
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true)
val schema = new PropertyDescriptor()
.name("schema")
.displayName("schema")
.description("Parsed tag name")
.defaultValue("")
.required(true)
.example("name,url")
descriptor = schema :: descriptor
descriptor
}
@@ -114,7 +133,7 @@ class XmlStringParser extends ConfigurableStop {
}
override def getGroup(): List[String] = {
List(StopGroup.XmlGroup.toString)
List(StopGroup.XmlGroup)
}
override def initialize(ctx: ProcessContext): Unit = {

View File

@@ -3,7 +3,7 @@ package cn.piflow.bundle.test
import cn.piflow._
import cn.piflow.bundle.common.SelectField
import cn.piflow.bundle.hive.PutHiveStreaming
import cn.piflow.bundle.xml.{FolderXmlParser, XmlParser}
import cn.piflow.bundle.xml.{XmlParserFolder, XmlParser}
import org.apache.spark.sql.SparkSession
import org.junit.Test
@@ -74,7 +74,7 @@ class XmlTest {
val putHiveStreamingParameters = Map("database" -> "sparktest", "table" -> "xmldblp_phdthesis")
val folderXmlParserStop = new FolderXmlParser
val folderXmlParserStop = new XmlParserFolder
folderXmlParserStop.setProperties(folderXmlParserParameters)
val selectFieldStop = new SelectField

View File

@@ -0,0 +1,58 @@
package cn.piflow.bundle.xml
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class XmlParserColumnsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/xml/xmlParserColumns.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@@ -0,0 +1,58 @@
package cn.piflow.bundle.xml
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class XmlParserFolderTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/xml/xmlParserFolder.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@@ -0,0 +1,58 @@
package cn.piflow.bundle.xml
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class XmlParserTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/xml/xmlParser.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@@ -0,0 +1,58 @@
package cn.piflow.bundle.xml
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class XmlSaveTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/xml/xmlSave.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@@ -0,0 +1,58 @@
package cn.piflow.bundle.xml
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class XmlStringTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/xml/xmlStringParser.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}