add piflow bundle module

Author: judy0131, 2018-07-13 10:42:48 +08:00
Commit: 1440871961 (parent: d4a38b0a84)
26 changed files with 1183 additions and 0 deletions

(two binary files not shown)

piflow-bundle/pom.xml (new file)
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>piflow-project</artifactId>
<groupId>piflow</groupId>
<version>0.9</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>piflow-bundle</artifactId>
<dependencies>
<dependency>
<groupId>piflow</groupId>
<artifactId>piflow-core</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>piflow</groupId>
<artifactId>piflow-conf</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.clapper</groupId>
<artifactId>classutil_2.11</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
</project>

piflow-bundle/src/main/resources/flow.json (new file)
@@ -0,0 +1,79 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"XmlParser",
"bundle":"cn.piflow.bundle.xml.XmlParser",
"properties":{
"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
"rowTag":"phdthesis"
}
},
{
"uuid":"2222",
"name":"SelectField",
"bundle":"cn.piflow.bundle.common.SelectField",
"properties":{
"schema":"title,author,pages"
}
},
{
"uuid":"3333",
"name":"PutHiveStreaming",
"bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
"properties":{
"database":"sparktest",
"table":"dblp_phdthesis"
}
},
{
"uuid":"4444",
"name":"CSVParser",
"bundle":"cn.piflow.bundle.csv.CSVParser",
"properties":{
"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
"header":"false",
"delimiter":",",
"schema":"title,author,pages"
}
},
{
"uuid":"555",
"name":"Merge",
"bundle":"cn.piflow.bundle.common.Merge",
"properties":{}
}
],
"paths":[
{
"from":"XmlParser",
"outport":"",
"inport":"",
"to":"SelectField"
},
{
"from":"SelectField",
"outport":"",
"inport":"data1",
"to":"Merge"
},
{
"from":"CSVParser",
"outport":"",
"inport":"data2",
"to":"Merge"
},
{
"from":"Merge",
"outport":"",
"inport":"",
"to":"PutHiveStreaming"
}
]
}
}
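A minimal sketch of how a flow definition like the one above can be loaded and executed, based on the FlowBean/Runner API exercised by the FlowTest and HiveTest classes later in this commit; the Spark master URL and jar path are placeholders taken from those tests, not a prescribed deployment.

```scala
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession

import scala.util.parsing.json.JSON

object RunFlowJson {
  def main(args: Array[String]): Unit = {
    // Parse the flow definition shipped with this module.
    val flowJsonStr = FileUtil.fileReader("src/main/resources/flow.json")
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    // Build the flow from the parsed map and run it on a Spark cluster.
    val flow = FlowBean(map).constructFlow()
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077") // placeholder cluster address from the tests
      .appName("piflow-bundle-flow")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .start(flow)
    process.awaitTermination()
    spark.close()
  }
}
```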

piflow-bundle/src/main/resources/hive-site.xml (new file)
@@ -0,0 +1,90 @@
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<!-- warehouse directory previously created in HDFS -->
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://10.0.86.89:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://10.0.86.90:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
<!-- start of transaction-related settings -->
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
<!--
<property>
<name>hive.in.test</name>
<value>true</value>
</property>
-->
</configuration>

piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala (new file)
@@ -0,0 +1,25 @@
package cn.piflow.bundle.common
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
// Fork: writes the single incoming DataFrame to each configured output port.
class Fork extends ConfigurableStop{
var outports : List[String] = _
override def setProperties(map: Map[String, Any]): Unit = {
outports = MapUtil.get(map,"outports").asInstanceOf[List[String]]
}
override def initialize(ctx: ProcessContext): Unit = {
}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
outports.foreach(out.write(_, in.read()));
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}
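Fork has no dedicated test in this commit; a hedged usage sketch follows. The port names are illustrative, not from the source; the only property the stop reads is "outports", which expects a List[String] rather than a plain string.

```scala
import cn.piflow.bundle.common.Fork

// Hypothetical configuration: duplicate one upstream DataFrame to two named outports.
val fork = new Fork
fork.setProperties(Map("outports" -> List("toHive", "toJson")))
```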

piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala (new file)
@@ -0,0 +1,22 @@
package cn.piflow.bundle.common
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
// Merge: unions the DataFrames read from all input ports into a single output.
class Merge extends ConfigurableStop{
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
out.write(in.ports().map(in.read(_)).reduce((x, y) => x.union(y)));
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala (new file)
@@ -0,0 +1,42 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.{Column, DataFrame}
// SelectField: keeps only the comma-separated columns named in the "schema" property.
class SelectField extends ConfigurableStop {
var schema:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val df = in.read()
val field = schema.split(",")
val columnArray : Array[Column] = new Array[Column](field.size)
for(i <- 0 to field.size - 1){
columnArray(i) = new Column(field(i))
}
var finalFieldDF : DataFrame = df.select(columnArray:_*)
finalFieldDF.show(2)
out.write(finalFieldDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
schema = MapUtil.get(map,"schema").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CSVParser.scala (new file)
@@ -0,0 +1,66 @@
package cn.piflow.bundle.csv
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
// CSVParser: reads a CSV file, either inferring the schema from a header row or applying a user-supplied comma-separated schema.
class CSVParser extends ConfigurableStop{
var csvPath: String = _
var header: Boolean = _
var delimiter: String = _
var schema: String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
var csvDF:DataFrame = null
if (header){
csvDF = spark.read
.option("header",header)
.option("inferSchema","true")
.option("delimiter",delimiter)
/*.schema(schema)*/
.csv(csvPath)
}else{
val field = schema.split(",")
val structFieldArray : Array[StructField] = new Array[StructField](field.size)
for(i <- 0 to field.size - 1){
structFieldArray(i) = new StructField(field(i), StringType, nullable = true)
}
val schemaStructType = StructType(structFieldArray)
csvDF = spark.read
.option("header",header)
.option("inferSchema","false")
.option("delimiter",delimiter)
.schema(schemaStructType)
.csv(csvPath)
}
csvDF.show(10)
out.write(csvDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
csvPath = MapUtil.get(map,"csvPath").asInstanceOf[String]
header = MapUtil.get(map,"header").asInstanceOf[String].toBoolean
delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String]
schema = MapUtil.get(map,"schema").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala (new file)
@@ -0,0 +1,35 @@
package cn.piflow.bundle.hive
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
// PutHiveStreaming: registers the incoming DataFrame as a temp view and inserts it into the configured Hive table.
class PutHiveStreaming extends ConfigurableStop {
var database:String = _
var table:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val inDF = in.read()
inDF.show()
val dfTempTable = table + "_temp"
inDF.createOrReplaceTempView(dfTempTable)
spark.sql("insert into " + database + "." + table + " select * from " + dfTempTable)
//out.write(studentDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]) = {
database = MapUtil.get(map,"database").asInstanceOf[String]
table = MapUtil.get(map,"table").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}
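PutHiveStreaming only issues "insert into database.table select * from tempview", so the target database and table must already exist in the metastore configured by hive-site.xml above. A minimal, hypothetical setup sketch for the sparktest.dblp_phdthesis table used in flow.json; the column names come from the SelectField schema in flow.json, the STRING types are an assumption.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical one-time setup before running the flow.json pipeline.
val spark = SparkSession.builder().appName("hive-setup").enableHiveSupport().getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS sparktest")
spark.sql(
  "CREATE TABLE IF NOT EXISTS sparktest.dblp_phdthesis (title STRING, author STRING, pages STRING)")
```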

piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala (new file)
@@ -0,0 +1,41 @@
package cn.piflow.bundle.hive
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
// SelectHiveQL: runs the configured HiveQL query and writes the result downstream.
class SelectHiveQL extends ConfigurableStop {
var hiveQL:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
import spark.sql
val df = sql(hiveQL)
df.show()
out.write(df)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
hiveQL = MapUtil.get(map,"hiveQL").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val hiveQL = new PropertyDescriptor().name("hiveQL").displayName("HiveQL").defaultValue("").required(true)
descriptor = hiveQL :: descriptor
descriptor
}
}
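SelectHiveQL is the only stop in this commit that implements getPropertyDescriptor(); the other stops still return ???. A sketch of how, for example, CSVParser could fill in the same method using the builder chain shown above; the display names and default values are illustrative assumptions, and the locals are suffixed with Desc to avoid shadowing the stop's fields.

```scala
// Sketch only: belongs inside CSVParser, which already imports PropertyDescriptor.
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
  var descriptor : List[PropertyDescriptor] = List()
  val csvPathDesc = new PropertyDescriptor().name("csvPath").displayName("CsvPath").defaultValue("").required(true)
  val headerDesc = new PropertyDescriptor().name("header").displayName("Header").defaultValue("false").required(true)
  val delimiterDesc = new PropertyDescriptor().name("delimiter").displayName("Delimiter").defaultValue(",").required(true)
  val schemaDesc = new PropertyDescriptor().name("schema").displayName("Schema").defaultValue("").required(false)
  csvPathDesc :: headerDesc :: delimiterDesc :: schemaDesc :: descriptor
}
```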

piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JDBCRead.scala (new file)
@@ -0,0 +1,46 @@
package cn.piflow.bundle.jdbc
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
// JDBCRead: runs the configured SQL against a JDBC source and writes the result downstream.
class JDBCRead extends ConfigurableStop {
var driver:String = _
var url:String = _
var user:String = _
var password:String = _
var sql:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val dbtable = "( " + sql + ") AS Temp"
val jdbcDF = spark.read.format("jdbc")
.option("url", url)
//.option("driver", driver)
.option("dbtable", dbtable)
.option("user", user)
.option("password",password)
.load()
jdbcDF.show(10)
out.write(jdbcDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
driver = MapUtil.get(map,"driver").asInstanceOf[String]
url = MapUtil.get(map,"url").asInstanceOf[String]
user = MapUtil.get(map,"user").asInstanceOf[String]
password = MapUtil.get(map,"password").asInstanceOf[String]
sql = MapUtil.get(map,"sql").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JDBCWrite.scala (new file)
@@ -0,0 +1,41 @@
package cn.piflow.bundle.jdbc
import java.util.Properties
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.{SaveMode, SparkSession}
// JDBCWrite: appends the incoming DataFrame to the configured JDBC table.
class JDBCWrite extends ConfigurableStop{
var url:String = _
var user:String = _
var password:String = _
var dbtable:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val jdbcDF = in.read()
val properties = new Properties()
properties.put("user", user)
properties.put("password", password)
jdbcDF.write.mode(SaveMode.Append).jdbc(url,dbtable,properties)
jdbcDF.show(10)
out.write(jdbcDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,"url").asInstanceOf[String]
user = MapUtil.get(map,"user").asInstanceOf[String]
password = MapUtil.get(map,"password").asInstanceOf[String]
dbtable = MapUtil.get(map,"dbtable").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonPathParser.scala (JsonPathParser and JsonStringParser, new file)
@@ -0,0 +1,60 @@
package cn.piflow.bundle.json
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
// JsonPathParser: reads a (multi-line) JSON file and selects the configured tag.
class JsonPathParser extends ConfigurableStop{
var jsonPath: String = _
var tag : String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val jsonDF = spark.read.option("multiline","true").json(jsonPath)
val jsonDFNew = jsonDF.select(tag)
jsonDFNew.printSchema()
jsonDFNew.show(10)
// Write the selected tag downstream rather than the raw frame.
out.write(jsonDFNew)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
jsonPath = MapUtil.get(map,"jsonPath").asInstanceOf[String]
tag = MapUtil.get(map,"tag").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}
// JsonStringParser: parses an in-memory JSON string into a DataFrame.
class JsonStringParser extends ConfigurableStop{
var jsonString: String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val jsonRDD = spark.sparkContext.makeRDD(jsonString :: Nil)
val jsonDF = spark.read.json(jsonRDD)
jsonDF.show(10)
out.write(jsonDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
jsonString = MapUtil.get(map,"jsonString").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonSave.scala (new file)
@@ -0,0 +1,30 @@
package cn.piflow.bundle.json
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SaveMode
// JsonSave: writes the incoming DataFrame as JSON to the configured path.
class JsonSave extends ConfigurableStop{
var jsonSavePath: String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val jsonDF = in.read()
jsonDF.show()
jsonDF.write.format("json").mode(SaveMode.Overwrite).save(jsonSavePath)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
jsonSavePath = MapUtil.get(map,"jsonSavePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/util/ClassUtil.scala (new file)
@@ -0,0 +1,67 @@
package cn.piflow.bundle.util
import java.io.File
import cn.piflow.conf.ConfigurableStop
import org.clapper.classutil.ClassFinder
import scala.util.control.Breaks
// ClassUtil: scans jars under a fixed classpath for concrete ConfigurableStop implementations.
object ClassUtil {
val configurableStopClass:String = "cn.piflow.conf.ConfigurableStop"
val classpath:String = "/opt/project/piflow"
def findAllConfigurableStop() : List[String] = {
var stopList : List[String] = List()
val classpathFile = new File(classpath)
val finder = ClassFinder(getJarFile(classpathFile))
val classes = finder.getClasses
val classMap = ClassFinder.classInfoMap(classes)
val plugins = ClassFinder.concreteSubclasses(configurableStopClass,classMap)
plugins.foreach{
plugin =>
//println(plugin.name)
stopList = plugin.name :: stopList
}
stopList
}
def findConfigurableStop(bundle : String) : Option[ConfigurableStop] = {
var stop:Option[ConfigurableStop] = None
val classpathFile = new File(classpath)
val finder = ClassFinder(getJarFile(classpathFile))
val classes = finder.getClasses
val classMap = ClassFinder.classInfoMap(classes)
val plugins = ClassFinder.concreteSubclasses("cn.piflow.conf.ConfigurableStop",classMap)
plugins.foreach{
pluginString =>
//println(pluginString.name)
if(pluginString.name.equals(bundle)){
val plugin = Class.forName(pluginString.name).newInstance()
stop = Some(plugin.asInstanceOf[ConfigurableStop])
return stop
}
}
stop
}
def getJarFile(dir : File) : Seq[File] = {
val files = dir.listFiles.filter(! _.isDirectory).filter( _.toString.endsWith(".jar"))
files ++ dir.listFiles().filter(_.isDirectory).flatMap(getJarFile)
}
def main(args: Array[String]): Unit = {
//val stop = findConfigurableStop("cn.piflow.bundle.Class1")
val allConfigurableStopList = findAllConfigurableStop()
println("\n\n\n" + allConfigurableStopList)
}
}

piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlParser.scala (new file)
@@ -0,0 +1,44 @@
package cn.piflow.bundle.xml
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
// XmlParser: reads an XML file with spark-xml using the configured row tag.
class XmlParser extends ConfigurableStop {
var xmlpath:String = _
var rowTag:String = _
var schema: StructType = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val xmlDF = spark.read.format("com.databricks.spark.xml")
.option("rowTag",rowTag)
.option("treatEmptyValuesAsNulls",true)
/*.schema(schema)*/
.load(xmlpath)
/*xmlDF.select("ee").rdd.collect().foreach( row =>
println(row.toSeq)
)*/
xmlDF.show(30)
out.write(xmlDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]) = {
xmlpath = MapUtil.get(map,"xmlpath").asInstanceOf[String]
rowTag = MapUtil.get(map,"rowTag").asInstanceOf[String]
schema = null
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlSave.scala (new file)
@@ -0,0 +1,28 @@
package cn.piflow.bundle.xml
import cn.piflow._
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
// XmlSave: writes the incoming DataFrame as XML to the configured path.
class XmlSave extends ConfigurableStop{
var xmlSavePath:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val xmlDF = in.read()
xmlDF.show()
xmlDF.write.format("xml").save(xmlSavePath)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
xmlSavePath = MapUtil.get(map,"xmlSavePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
}

piflow-bundle/src/test/scala/cn/piflow/bundle/CSVTest.scala (new file)
@@ -0,0 +1,95 @@
package cn.piflow.bundle
import cn.piflow.bundle.csv.CSVParser
import cn.piflow.bundle.json.JsonSave
import cn.piflow.{FlowImpl, Path, Runner}
import org.apache.spark.sql.SparkSession
import org.junit.Test
class CSVTest {
@Test
def testCSVHeaderRead(): Unit ={
val csvParserParameters = Map(
"csvPath" -> "hdfs://10.0.86.89:9000/xjzhu/student.csv",
"header" -> "true",
"delimiter" -> ",",
"schema" -> "")
val jsonSaveParameters = Map(
"jsonSavePath" -> "hdfs://10.0.86.89:9000/xjzhu/student_csv2json")
val csvParserStop = new CSVParser
csvParserStop.setProperties(csvParserParameters)
val jsonPathStop =new JsonSave
jsonPathStop.setProperties(jsonSaveParameters)
val flow = new FlowImpl();
flow.addStop("CSVParser", csvParserStop);
flow.addStop("JsonSave", jsonPathStop);
flow.addPath(Path.from("CSVParser").to("JsonSave"));
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow-jar-bundle/out/artifacts/piflow-jar-bundle/piflow-jar-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}
@Test
def testCSVSchemaRead(): Unit ={
val csvParserParameters : Map[String, String] = Map(
"csvPath" -> "hdfs://10.0.86.89:9000/xjzhu/student_schema.csv",
"header" -> "false",
"delimiter" -> ",",
"schema" -> "id,name,gender,age"
)
val jsonSaveParameters = Map(
"jsonSavePath" -> "hdfs://10.0.86.89:9000/xjzhu/student_schema_csv2json")
val csvParserStop = new CSVParser
csvParserStop.setProperties(csvParserParameters)
val jsonSaveStop = new JsonSave
jsonSaveStop.setProperties(jsonSaveParameters)
val flow = new FlowImpl();
flow.addStop("CSVParser", csvParserStop);
flow.addStop("JsonSave", jsonSaveStop);
flow.addPath(Path.from("CSVParser").to("JsonSave"));
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow-jar-bundle/out/artifacts/piflow-jar-bundle/piflow-jar-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}
}

piflow-bundle/src/test/scala/cn/piflow/bundle/ClassFindTest.scala (new file)
@@ -0,0 +1,62 @@
package cn.piflow.bundle
import cn.piflow.bundle.util.ClassUtil.{findAllConfigurableStop, findConfigurableStop}
import org.junit.Test
class ClassFindTest {
/*@Test
def testClassFind(): Unit = {
val selectHiveQLClassName = "cn.cnic.bigdata.bundle.hive.SelectHiveQL"
val putHiveStreamingClassName = "cn.cnic.bigdata.bundle.hive.PutHiveStreaming"
//classOf[SelectHiveQL].getConstructor(classOf[Map[String, String]]).newInstance(map)
val selectHiveQLParameters : Map[String, String] = Map("hiveQL" -> "select * from sparktest.student")
val putHiveStreamingParameters : Map[String, String] = Map("database" -> "sparktest", "table" -> "studenthivestreaming")
val selectHiveQLStop = Class.forName(selectHiveQLClassName).getConstructor(classOf[Map[String, String]]).newInstance(selectHiveQLParameters)
val putHiveStreamingStop = Class.forName(putHiveStreamingClassName).getConstructor(classOf[Map[String, String]]).newInstance(putHiveStreamingParameters)
val flow = new FlowImpl();
flow.addStop("SelectHiveQL", selectHiveQLStop.asInstanceOf[Stop]);
flow.addStop("PutHiveStreaming", putHiveStreamingStop.asInstanceOf[Stop]);
flow.addPath(Path.from("SelectHiveQL").to("PutHiveStreaming"));
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow-jar-bundle/out/artifacts/piflow-jar-bundle/piflow-jar-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}*/
@Test
def testFindConfigurableStop() = {
val bundle = "cn.piflow.bundle.hive.SelectHiveQL"
val stop = findConfigurableStop(bundle)
stop match {
case Some(x) => {
val propertiesDescList = x.getPropertyDescriptor()
propertiesDescList.foreach(println(_))
}
case _ => println("Can not find : " + bundle)
}
}
@Test
def testFindAllConfigurableStop() = {
val allConfigurableStopList = findAllConfigurableStop()
println("\n\n\n" + allConfigurableStopList)
}
}

piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala (new file)
@@ -0,0 +1,59 @@
package cn.piflow.bundle
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.junit.Test
import scala.util.parsing.json.JSON
class FlowTest {
/*@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}*/
@Test
def testFlow2json() = {
//parse flow json
val file = "src/main/resources/flow.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
//create flow
val flowBean = FlowBean(map)
val flowJson = flowBean.toJson()
println(flowJson)
}
}

piflow-bundle/src/test/scala/cn/piflow/bundle/HiveTest.scala (new file)
@@ -0,0 +1,49 @@
package cn.piflow.bundle
import cn.piflow._
import cn.piflow.bundle.hive.{PutHiveStreaming, SelectHiveQL}
import org.apache.spark.sql.SparkSession
import org.junit.Test
class HiveTest {
val selectHiveQLParameters : Map[String, String] = Map("hiveQL" -> "select * from sparktest.student")
val putHiveStreamingParameters : Map[String, String] = Map("database" -> "sparktest", "table" -> "studenthivestreaming")
@Test
def testHive(): Unit = {
val selectHiveQLStop = new SelectHiveQL
selectHiveQLStop.setProperties(selectHiveQLParameters)
val putHiveStreamingStop = new PutHiveStreaming
putHiveStreamingStop.setProperties(putHiveStreamingParameters)
val flow = new FlowImpl();
flow.addStop("SelectHiveQL", selectHiveQLStop);
flow.addStop("PutHiveStreaming", putHiveStreamingStop);
flow.addPath(Path.from("SelectHiveQL").to("PutHiveStreaming"));
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow-jar-bundle/out/artifacts/piflow-jar-bundle/piflow-jar-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}
}

piflow-bundle/src/test/scala/cn/piflow/bundle/JDBCTest.scala (new file)
@@ -0,0 +1,52 @@
package cn.piflow.bundle
import cn.piflow.bundle.jdbc.{JDBCRead, JDBCWrite}
import cn.piflow.{FlowImpl, Path, Runner}
import org.apache.spark.sql.SparkSession
import org.junit.Test
class JDBCTest {
@Test
def testMysqlRead(): Unit ={
val jdbcReadParameters = Map(
"url" -> "jdbc:mysql://10.0.86.90/sparktest",
"driver"->"com.mysql.jdbc.Driver",
"sql"->"select student.id, name, gender, age, score from student, student_score where student.id = student_score.id",
"user"->"root",
"password"->"root")
// JDBCWrite reads "url", "user", "password" and "dbtable"; reuse the connection settings from the read.
val jdbcWriteParameters = Map(
"url" -> "jdbc:mysql://10.0.86.90/sparktest",
"user" -> "root",
"password" -> "root",
"dbtable" -> "student_full")
val jDBCReadStop = new JDBCRead()
jDBCReadStop.setProperties(jdbcReadParameters)
val jDBCWriteStop = new JDBCWrite()
jDBCWriteStop.setProperties(jdbcWriteParameters)
val flow = new FlowImpl();
flow.addStop("JDBCRead", jDBCReadStop);
flow.addStop("JDBCWrite", jDBCWriteStop);
flow.addPath(Path.from("JDBCRead").to("JDBCWrite"));
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow-jar-bundle/out/artifacts/piflow-jar-bundle/piflow-jar-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}
}

piflow-bundle/src/test/scala/cn/piflow/bundle/JsonTest.scala (new file)
@@ -0,0 +1,52 @@
package cn.piflow.bundle
import cn.piflow.bundle.json.{JsonPathParser, JsonSave}
import cn.piflow.{FlowImpl, Path, Runner}
import org.apache.spark.sql.SparkSession
import org.junit.Test
class JsonTest {
@Test
def testJsonPathParser(): Unit ={
val JsonPathParserParameters = Map("jsonPath"->"hdfs://10.0.86.89:9000/xjzhu/student.json", "tag"->"student")
val JsonSavePathParameters = Map("jsonSavePath" -> "hdfs://10.0.86.89:9000/xjzhu/example_json_save")
val flow = new FlowImpl();
val jsonPathParserStop = new JsonPathParser()
jsonPathParserStop.setProperties(JsonPathParserParameters)
val jsonSaveStop = new JsonSave()
jsonSaveStop.setProperties(JsonSavePathParameters)
flow.addStop("JsonPathParser", jsonPathParserStop)
flow.addStop("JsonSave", jsonSaveStop)
flow.addPath(Path.from("JsonPathParser").to("JsonSave"));
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow-jar-bundle/out/artifacts/piflow-jar-bundle/piflow-jar-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}
@Test
def testJsonStringParser(): Unit ={
}
}

piflow-bundle/src/test/scala/cn/piflow/bundle/XmlTest.scala (new file)
@@ -0,0 +1,66 @@
package cn.piflow.bundle
import cn.piflow._
import cn.piflow.bundle.common.SelectField
import cn.piflow.bundle.hive.PutHiveStreaming
import cn.piflow.bundle.xml.XmlParser
import org.apache.spark.sql.SparkSession
import org.junit.Test
class XmlTest {
@Test
def testNodeXML(): Unit = {
val flow = new FlowImpl();
/*val schema = StructType(Array(
StructField("_key", StringType, nullable = true),
StructField("_mdate", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("title", StringType, nullable = true),
StructField("year", StringType, nullable = true),
StructField("school", StringType, nullable = true),
StructField("ee", StringType, nullable = true),
StructField("note", StringType, nullable = true)
))*/
val xmlParserParameters = Map("xmlpath"->"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", "rowTag" -> "phdthesis")
val selectedFieldParameters = Map("schema"->"title,author,pages")
val putHiveStreamingParameters = Map("database" -> "sparktest", "table" -> "dblp_phdthesis")
val xmlParserStop = new XmlParser
xmlParserStop.setProperties(xmlParserParameters)
val selectFieldStop = new SelectField
selectFieldStop.setProperties(selectedFieldParameters)
val putHiveStreamingStop = new PutHiveStreaming
putHiveStreamingStop.setProperties(putHiveStreamingParameters)
flow.addStop("XmlParser", xmlParserStop);
flow.addStop("SelectField", selectFieldStop);
flow.addStop("PutHiveStreaming", putHiveStreamingStop);
flow.addPath(Path.from("XmlParser").to("SelectField"))
flow.addPath(Path.from("SelectField").to("PutHiveStreaming"))
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("DblpParserTest")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "3")
.config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}
}