Merge remote-tracking branch 'origin/master'

bao319 committed on 2020-03-25 21:11:03 +08:00
commit 1b57b79022
83 changed files with 2680 additions and 732 deletions

View File

@ -74,7 +74,8 @@
       "properties":{
         "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
         "header":"true",
-        "delimiter":","
+        "delimiter":",",
+        "partition": "1"
       }
     }
   ],

View File

@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"ConvertSchema",
"bundle":"cn.piflow.bundle.common.ConvertSchema",
"properties":{
"schema":"id->uuid"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"ConvertSchema"
}
]
}
}

View File

@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"Distinct",
"bundle":"cn.piflow.bundle.common.Distinct",
"properties":{
"fields":"id"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"Distinct"
}
]
}
}

View File

@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"ConvertSchema",
"bundle":"cn.piflow.bundle.common.DropField",
"properties":{
"fields":"id"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"ConvertSchema"
}
]
}
}

View File

@ -0,0 +1,32 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"ExecuteSQLStop",
"bundle":"cn.piflow.bundle.common.ExecuteSQLStop",
"properties":{
"sql":"select * from temp where id = 0001",
"tempViewName": "temp"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"ExecuteSQLStop"
}
]
}
}

View File

@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"Filter",
"bundle":"cn.piflow.bundle.common.Filter",
"properties":{
"condition":"name=='zhangsan'"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"Filter"
}
]
}
}

View File

@ -0,0 +1,59 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
},
{
"uuid":"1111",
"name":"Fork",
"bundle":"cn.piflow.bundle.common.Fork",
"properties":{
"outports":"out1,out2"
}
},{
"uuid":"2222",
"name":"ExecuteSQLStop",
"bundle":"cn.piflow.bundle.common.ExecuteSQLStop",
"properties":{
"sql":"select * from temp where id = 0001",
"tempViewName": "temp"
}
},{
"uuid":"3333",
"name":"SelectField",
"bundle":"cn.piflow.bundle.common.SelectField",
"properties":{
"fields":"id"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"Fork"
},{
"from":"Fork",
"outport":"out1",
"inport":"",
"to":"ExecuteSQLStop"
},{
"from":"Fork",
"outport":"out2",
"inport":"",
"to":"SelectField"
}
]
}
}

View File

@ -0,0 +1,48 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
},
{
"uuid":"0000",
"name":"SelectHiveQL1",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1 limit 10 "
}
},{
"uuid":"1111",
"name":"Join",
"bundle":"cn.piflow.bundle.common.Join",
"properties":{
"correlationField": "id",
"joinMode": "left"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"Left",
"to":"Join"
},{
"from":"SelectHiveQL1",
"outport":"",
"inport":"Right",
"to":"Join"
}
]
}
}

View File

@ -0,0 +1,59 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
},{
"uuid":"1111",
"name":"SelectHiveQL1",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user2"
}
},
{
"uuid":"2222",
"name":"Merge",
"bundle":"cn.piflow.bundle.common.Merge",
"properties":{
"inports":"in1,in2"
}
},{
"uuid":"3333",
"name":"ExecuteSQLStop",
"bundle":"cn.piflow.bundle.common.ExecuteSQLStop",
"properties":{
"sql":"select * from temp where id = 0001",
"tempViewName": "temp"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"in1",
"to":"Merge"
},{
"from":"SelectHiveQL1",
"outport":"",
"inport":"in2",
"to":"Merge"
},{
"from":"Merge",
"outport":"",
"inport":"",
"to":"ExecuteSQLStop"
}
]
}
}

View File

@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"Route",
"bundle":"cn.piflow.bundle.common.Route",
"properties":{
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"Route"
}
]
}
}

View File

@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"SelectField",
"bundle":"cn.piflow.bundle.common.SelectField",
"properties":{
"fields":"id"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"SelectField"
}
]
}
}

View File

@ -0,0 +1,46 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
},
{
"uuid":"0000",
"name":"SelectHiveQL1",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1 limit 10 "
}
},{
"uuid":"1111",
"name":"Subtract",
"bundle":"cn.piflow.bundle.common.Subtract",
"properties":{
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"Left",
"to":"Subtract"
},{
"from":"SelectHiveQL1",
"outport":"",
"inport":"Right",
"to":"Subtract"
}
]
}
}

View File

@ -0,0 +1,34 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"AddUUIDStop",
"bundle":"cn.piflow.bundle.common.AddUUIDStop",
"properties":{
"column":"uuid"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"AddUUIDStop"
}
]
}
}

View File

@ -0,0 +1,36 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"RegexTextProcess",
"bundle":"cn.piflow.bundle.file.RegexTextProcess",
"properties":{
"regex": "0001",
"columnName": "id",
"replaceStr": "1111"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"RegexTextProcess"
}
]
}
}

View File

@ -0,0 +1,42 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"LoadGraph",
"bundle":"cn.piflow.bundle.graphx.LoadGraph",
"properties":{
"dataPath":"hdfs://192.168.3.138:8020/work/test/test.csv"
}
},
{
"uuid":"1111",
"name":"LabelPropagation",
"bundle":"cn.piflow.bundle.graphx.LabelPropagation",
"properties":{
"maxIter":"20"
}
}
],
"paths":[
{
"from":"LoadGraph",
"outport":"edges",
"inport":"edgesIn",
"to":"LabelPropagation"
},
{
"from":"LoadGraph",
"outport":"vertex",
"inport":"vertexIn",
"to":"LabelPropagation"
}
]
}
}

View File

@ -0,0 +1,36 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"DeleteHdfs",
"bundle":"cn.piflow.bundle.hdfs.DeleteHdfs",
"properties":{
"hdfsPath": "/work/test1/",
"hdfsUrl": "hdfs://192.168.3.138:8020",
"isCustomize": "true"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"DeleteHdfs"
}
]
}
}

View File

@ -0,0 +1,36 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"FileDownHdfs",
"bundle":"cn.piflow.bundle.hdfs.FileDownHdfs",
"properties":{
"hdfsPath": "/work/dblp/dblp.xml.gz",
"hdfsUrl": "hdfs://192.168.3.138:8020",
"url_str": "https://dblp.dagstuhl.de/xml/dblp.xml.gz"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"FileDownHdfs"
}
]
}
}

View File

@ -0,0 +1,36 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"GetHdfs",
"bundle":"cn.piflow.bundle.hdfs.GetHdfs",
"properties":{
"hdfsPath": "/work/test1/part-00000-a445068e-ab6b-4bc0-b3df-e1ccbb9a738e.csv",
"hdfsUrl": "hdfs://192.168.3.138:8020",
"types": "csv"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"GetHdfs"
}
]
}
}

View File

@ -0,0 +1,35 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"ListHdfs",
"bundle":"cn.piflow.bundle.hdfs.ListHdfs",
"properties":{
"hdfsPath": "/work/",
"hdfsUrl": "hdfs://192.168.3.138:8020"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"ListHdfs"
}
]
}
}

View File

@ -0,0 +1,37 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"PutHdfs",
"bundle":"cn.piflow.bundle.hdfs.PutHdfs",
"properties":{
"hdfsPath": "/work/test1/",
"hdfsUrl": "hdfs://192.168.3.138:8020",
"types": "csv",
"partition": "1"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"PutHdfs"
}
]
}
}

View File

@ -0,0 +1,39 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"SaveToHdfs",
"bundle":"cn.piflow.bundle.hdfs.SaveToHdfs",
"properties":{
"hdfsDirPath": "/work/test/",
"hdfsUrl": "hdfs://192.168.3.138:8020",
"fileName": "test.csv",
"delimiter": ",",
"types": "csv",
"header": "true"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"SaveToHdfs"
}
]
}
}

View File

@ -0,0 +1,36 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{
"hdfsPath": "/work/",
"hdfsUrl": "hdfs://192.168.3.138:8020",
"selectionConditions": ".*.csv"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"SelectFilesByName"
}
]
}
}

View File

@ -0,0 +1,37 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{
"filePath": "/work/test/ideaIU-2018.3.5.tar.gz",
"hdfsUrl": "hdfs://192.168.3.138:8020",
"savePath": "/work/test/",
"isCustomize": "true"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS"
}
]
}
}

View File

@ -59,7 +59,8 @@
       "properties":{
         "csvSavePath":"hdfs://10.0.86.191:9000/xjzhu/phdthesis_result.csv",
         "header":"true",
-        "delimiter":","
+        "delimiter":",",
+        "partition": "1"
       }
     }
   ],

Binary file not shown (new image, 6.8 KiB)

View File

@ -10,7 +10,10 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 class AddUUIDStop extends ConfigurableStop{
   override val authorEmail: String = "ygang@cnic.cn"
-  override val description: String = "Increase the column with uuid"
+  override val description: String = "Add UUID column"
   override val inportList: List[String] =List(Port.DefaultPort.toString)
   override val outportList: List[String] = List(Port.DefaultPort.toString)
@ -22,15 +25,11 @@ class AddUUIDStop extends ConfigurableStop{
     var df = in.read()
-    val sqlContext = spark.sqlContext
-    val name = df.schema(0).name
-    sqlContext.udf.register("code",(str:String)=>UUID.randomUUID().toString.replace("-",""))
-    val columns = column.split(",")
-    columns.foreach(t=>{
+    spark.udf.register("generaterUUID",()=>UUID.randomUUID().toString.replace("-",""))
     df.createOrReplaceTempView("temp")
-    df = sqlContext.sql("select code("+name+") as "+t +",* from temp")
-    })
+    df = spark.sql(s"select generaterUUID() as ${column},* from temp")
     out.write(df)
   }
@ -43,9 +42,16 @@ class AddUUIDStop extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val inports = new PropertyDescriptor().name("column").displayName("column")
-      .description("The column is you want to add uuid column's name,Multiple are separated by commas").defaultValue("uuid").required(true)
-    descriptor = inports :: descriptor
+    val column = new PropertyDescriptor().name("column")
+      .displayName("Column")
+      .description("The column is you want to add uuid column's name,")
+      .defaultValue("uuid")
+      .required(true)
+      .example("uuid")
+    descriptor = column :: descriptor
     descriptor
   }
@ -54,7 +60,7 @@ class AddUUIDStop extends ConfigurableStop{
   }
   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
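For readers skimming the diff, here is a minimal, self-contained sketch of the pattern the rewritten AddUUIDStop relies on: register a UUID-generating UDF once, then select it in through a temporary view. Only the UDF name and the temp-view trick come from the commit; the local SparkSession, sample rows, and object name are illustrative assumptions.

import java.util.UUID
import org.apache.spark.sql.SparkSession

object AddUuidSketch extends App {
  // Illustrative stand-in for the SparkSession PiFlow hands the stop via pec.get[SparkSession]().
  val spark = SparkSession.builder().master("local[*]").appName("AddUuidSketch").getOrCreate()
  import spark.implicits._

  val df = Seq(("0001", "zhangsan"), ("0002", "lisi")).toDF("id", "name")

  // Register the UUID UDF once, as the new stop code does.
  spark.udf.register("generaterUUID", () => UUID.randomUUID().toString.replace("-", ""))

  // Expose the DataFrame as a temp view and prepend the generated column.
  df.createOrReplaceTempView("temp")
  val withUuid = spark.sql("select generaterUUID() as uuid, * from temp")
  withUuid.show(false)

  spark.stop()
}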

View File

@ -9,12 +9,11 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
 class ConvertSchema extends ConfigurableStop {
   val authorEmail: String = "yangqidong@cnic.cn"
-  val description: String = "Transform field name"
+  val description: String = "Change field name"
   val inportList: List[String] = List(Port.DefaultPort.toString)
   val outportList: List[String] = List(Port.DefaultPort.toString)
   var schema:String = _
   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     var df = in.read()
@ -39,8 +38,12 @@ class ConvertSchema extends ConfigurableStop {
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val inports = new PropertyDescriptor().name("schema").displayName("schema").description("To change the field name of the name and the name you want, " +
-      "you can write oldField1 - > newField1, if there are more than one, you can use, partition, such as oldField1->newField1, oldField2->newField2").defaultValue("").required(true)
+    val inports = new PropertyDescriptor().name("schema")
+      .displayName("Schema")
+      .description("Change field name, you can write oldField1 -> newField1, Multiple separated by commas, Such as 'oldField1->newField1, oldField2->newField2' ")
+      .defaultValue("")
+      .required(true)
+      .example("id->uuid")
     descriptor = inports :: descriptor
     descriptor
   }

View File

@ -1,115 +0,0 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer
class ConvertSchemaType extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Transform the schema dataType"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var stringType:String = _
var integerType:String = _
var all:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
var df = in.read()
var colName = ArrayBuffer[String]()
var outDf :DataFrame= df
if (all.equals("true")){
colName.clear()
df.schema.foreach(x=>{
colName += x.name
})
import org.apache.spark.sql.functions._
colName.foreach(name => {
outDf = outDf.withColumn(name, col(name).cast(StringType))
})
} else {
if (stringType.nonEmpty){
colName.clear()
stringType.split(",").foreach(x=>{
colName += x
})
import org.apache.spark.sql.functions._
colName.foreach(name => {
outDf = outDf.withColumn(name, col(name).cast(StringType))
})
}
if (integerType.nonEmpty){
colName.clear()
integerType.split(",").foreach(x=>{
colName += x
})
import org.apache.spark.sql.functions._
colName.foreach(name => {
outDf = outDf.withColumn(name, col(name).cast(IntegerType))
})
}
}
out.write(outDf)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
stringType = MapUtil.get(map,"stringType").asInstanceOf[String]
integerType = MapUtil.get(map,"integerType").asInstanceOf[String]
all = MapUtil.get(map,"all").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val all = new PropertyDescriptor().name("all").displayName("all").description("if true ,the schema all types are converted to stringType").allowableValues(Set("true","false")).defaultValue("true").required(true)
val stringType = new PropertyDescriptor().name("stringType").displayName("stringType").description("the specified field types are converted to stringType, Multiple are separated by commas").defaultValue("").required(false)
val integerType = new PropertyDescriptor().name("integerType").displayName("integerType").description("the specified types are converted to integerType, Multiple are separated by commas").defaultValue("").required(false)
descriptor = stringType :: descriptor
descriptor = integerType :: descriptor
descriptor = all :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/common/ConvertSchema.png")
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
}
}

View File

@ -8,22 +8,26 @@ import org.apache.spark.sql.DataFrame
 class Distinct extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  override val description: String = "Reduplicate data according to all fields or fields you specify"
+  override val description: String = "De duplicate data according to all fields or specified fields "
   override val inportList: List[String] =List(Port.DefaultPort.toString)
   override val outportList: List[String] = List(Port.DefaultPort.toString)
-  var files:String=_
+  var fields:String=_
   override def setProperties(map: Map[String, Any]): Unit = {
-    files = MapUtil.get(map,"files").asInstanceOf[String]
+    fields = MapUtil.get(map,"fields").asInstanceOf[String]
   }
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val files = new PropertyDescriptor().name("files").displayName("files").description("To de-duplicate the field, you can fill in the field name, " +
-      "if there are more than one, please use, separate. You can also not fill in, we will be based on all fields as a condition to weight.").defaultValue("").required(false)
-    descriptor = files :: descriptor
+    val fields = new PropertyDescriptor().name("fields")
+      .displayName("Fields")
+      .description("De duplicate data according to all fields or specified fields,Multiple separated by commas ; If not, all fields will be de duplicated")
+      .defaultValue("")
+      .required(false)
+      .example("id")
+    descriptor = fields :: descriptor
     descriptor
   }
@ -44,8 +48,8 @@ class Distinct extends ConfigurableStop{
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val inDf: DataFrame = in.read()
     var outDf: DataFrame = null
-    if(files.length > 0){
-      val fileArr: Array[String] = files.split(",")
+    if(fields.length > 0){
+      val fileArr: Array[String] = fields.split(",")
       outDf = inDf.dropDuplicates(fileArr)
     }else{
       outDf = inDf.distinct()

View File

@ -1,56 +0,0 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.lib._
import cn.piflow.util.ScriptEngine
class DoFlatMapStop extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Do flat map"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var SCRIPT: String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
in.read().show()
val doMap = new DoFlatMap(ScriptEngine.logic(SCRIPT))
doMap.perform(in,out,pec)
}
override def setProperties(map: Map[String, Any]): Unit = {
SCRIPT = MapUtil.get(map,"SCRIPT_2").asInstanceOf[String]
}
override def initialize(ctx: ProcessContext): Unit = {
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val SCRIPT = new PropertyDescriptor().name("SCRIPT").displayName("SCRIPT").description("").defaultValue("").required(true)
descriptor = SCRIPT :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/common/DoFlatMapStop.png")
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
}
}

View File

@ -1,69 +0,0 @@
package cn.piflow.bundle.common
import cn.piflow.conf._
import cn.piflow.lib._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.util.ScriptEngine
import cn.piflow._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
class DoMapStop extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Do map"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var schema: String = _
var SCRIPT: String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val fieldsArray: Array[String] = schema.split(",")
val fields = fieldsArray.map(x => {
StructField(x, StringType, nullable = true)
})
val targetSchema = StructType(fields)
val doMap = new DoMap(ScriptEngine.logic(SCRIPT),targetSchema)
doMap.perform(in,out,pec)
}
override def setProperties(map: Map[String, Any]): Unit = {
SCRIPT = MapUtil.get(map,"SCRIPT").asInstanceOf[String]
schema = MapUtil.get(map,"schema").asInstanceOf[String]
}
override def initialize(ctx: ProcessContext): Unit = {
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val SCRIPT = new PropertyDescriptor().name("SCRIPT").displayName("SCRIPT").description("").defaultValue("").required(true)
descriptor = SCRIPT :: descriptor
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("").defaultValue("").required(true)
descriptor = schema :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/common/DoMapStop.png")
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
}
}

View File

@ -9,16 +9,16 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
 class DropField extends ConfigurableStop {
   val authorEmail: String = "ygang@cnic.cn"
-  val description: String = "drop data field"
+  val description: String = "Delete fields in schema"
   val inportList: List[String] = List(Port.DefaultPort.toString)
   val outportList: List[String] = List(Port.DefaultPort.toString)
-  var schema:String = _
+  var fields:String = _
   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     var df = in.read()
-    val field = schema.split(",")
+    val field = fields.split(",")
     for( x <- 0 until field.size){
       df = df.drop(field(x))
     }
@ -31,18 +31,22 @@ class DropField extends ConfigurableStop {
   }
   def setProperties(map : Map[String, Any]): Unit = {
-    schema = MapUtil.get(map,"schema").asInstanceOf[String]
+    fields = MapUtil.get(map,"fields").asInstanceOf[String]
   }
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val inports = new PropertyDescriptor().name("schema").displayName("schema").description("The Schema you want to drop,Multiple are separated by commas").defaultValue("").required(true)
+    val inports = new PropertyDescriptor().name("fields").displayName("Fields")
+      .description("Delete fields in schema,multiple are separated by commas")
+      .defaultValue("")
+      .required(true)
+      .example("id")
     descriptor = inports :: descriptor
     descriptor
   }
   override def getIcon(): Array[Byte] = {
-    ImageUtil.getImage("icon/common/Subtract.png")
+    ImageUtil.getImage("icon/common/DropField.png")
   }
   override def getGroup(): List[String] = {

View File

@ -15,68 +15,28 @@ import org.elasticsearch.common.collect.Tuple
 class ExecuteSQLStop extends ConfigurableStop{
   val authorEmail: String = "ygang@cnic.cn"
-  val description: String = "Execute sql"
+  val description: String = "Create temporary view table to execute sql"
   val inportList: List[String] = List(Port.DefaultPort.toString)
   val outportList: List[String] = List(Port.DefaultPort.toString)
   var sql: String = _
-  var tableName: String = _
+  var tempViewName: String = _
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
     val inDF = in.read()
-    inDF.createOrReplaceTempView(tableName)
+    inDF.createOrReplaceTempView(tempViewName)
     val frame: DataFrame = spark.sql(sql)
     out.write(frame)
-    // val tableNames = bundle2TableNames.split(",")
-    // for (i <- 0 until tableNames.length){
-    //
-    //   // 00->table1 ,.....
-    //   if (i== 0){
-    //     val imports = tableNames(i).split("->")(0)
-    //     val tableName = tableNames(i).split("->")(1)
-    //     val bundle2 = imports -> tableName
-    //
-    //     val doMap = new ExecuteSQL(sql,bundle2);
-    //     doMap.perform(in,out,pec)
-    //
-    //   } else {
-    //     val imports = tableNames(i).split("->")(0)
-    //     val tableName = tableNames(i).split("->")(1)
-    //     val bundle2:(String,String) = imports -> tableName
-    //
-    //     val doMap = new ExecuteSQL(sql,bundle2);
-    //     doMap.perform(in,out,pec)
-    //   }
-    // }
   }
-  // def createCountWords() = {
-  //
-  //   val processCountWords = new FlowImpl();
-  //   //SparkProcess = loadStream + transform... + writeStream
-  //   processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/yg/2", FileFormat.TEXT)));
-  //   processCountWords.addStop("DoMap", new ExecuteSQLStop);
-  //
-  //   processCountWords.addPath(Path.from("LoadStream").to("DoMap"));
-  //
-  //   new FlowAsStop(processCountWords);
-  // }
   override def setProperties(map: Map[String, Any]): Unit = {
     sql = MapUtil.get(map,"sql").asInstanceOf[String]
-    tableName = MapUtil.get(map,"tableName").asInstanceOf[String]
+    tempViewName = MapUtil.get(map,"tempViewName").asInstanceOf[String]
   }
   override def initialize(ctx: ProcessContext): Unit = {
@ -84,10 +44,23 @@ class ExecuteSQLStop extends ConfigurableStop{
   }
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val sql = new PropertyDescriptor().name("sql").displayName("sql").description("sql").defaultValue("").required(true)
-    val bundle2TableNames = new PropertyDescriptor().name("tableName").displayName("tableName").description(" tableName").defaultValue("temp").required(true)
+    val sql = new PropertyDescriptor().name("sql")
+      .displayName("Sql")
+      .description("Sql string")
+      .defaultValue("")
+      .required(true)
+      .example("select * from temp")
     descriptor = sql :: descriptor
-    descriptor = bundle2TableNames :: descriptor
+    val tableName = new PropertyDescriptor()
+      .name("tempViewName")
+      .displayName("TempViewName")
+      .description(" Temporary view table")
+      .defaultValue("temp")
+      .required(true)
+      .example("temp")
+    descriptor = tableName :: descriptor
     descriptor
   }

View File

@ -20,9 +20,15 @@ class Filter extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val condition = new PropertyDescriptor().name("condition").displayName("condition").description("The condition you want to filter").defaultValue("").required(true)
+    val condition = new PropertyDescriptor().name("condition").
+      displayName("condition")
+      .description("The condition you want to filter")
+      .defaultValue("name=='zhangsan'")
+      .required(true)
+      .example("name=='zhangsan'")
     descriptor = condition :: descriptor
     descriptor
   }
   override def getIcon(): Array[Byte] = {

View File

@ -5,7 +5,6 @@ import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import scala.beans.BeanProperty
 class Fork extends ConfigurableStop{
@ -32,7 +31,11 @@ class Fork extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val outports = new PropertyDescriptor().name("outports").displayName("outports").description("outports string, seperated by ,.").defaultValue("").required(true)
+    val outports = new PropertyDescriptor().name("outports")
+      .displayName("outports")
+      .description("outports string, seperated by ,.")
+      .defaultValue("")
+      .required(true)
     descriptor = outports :: descriptor
     descriptor
   }

View File

@ -15,8 +15,6 @@ class Join extends ConfigurableStop{
   var joinMode:String=_
   var correlationField:String=_
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val leftDF = in.read(Port.LeftPort)
@ -35,11 +33,8 @@ class Join extends ConfigurableStop{
       case "full_outer" => df = leftDF.join(rightDF,seq,"outer")
     }
     out.write(df)
   }
   override def setProperties(map: Map[String, Any]): Unit = {
     joinMode = MapUtil.get(map,"joinMode").asInstanceOf[String]
     correlationField = MapUtil.get(map,"correlationField").asInstanceOf[String]
@ -48,12 +43,22 @@ class Join extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val joinMode = new PropertyDescriptor().name("joinMode").displayName("joinMode").description("For table association, you can choose INNER, LEFT, RIGHT, FULL")
-      .allowableValues(Set("inner","left","right","full_outer")).defaultValue("inner").required(true)
-    val correlationField = new PropertyDescriptor().name("correlationField").displayName("correlationField").description("Fields associated with tables,If there are more than one, please use , separate").defaultValue("").required(true)
-    descriptor = correlationField :: descriptor
+    val joinMode = new PropertyDescriptor().name("joinMode")
+      .displayName("joinMode")
+      .description("For table association, you can choose INNER, LEFT, RIGHT, FULL")
+      .allowableValues(Set("inner","left","right","full_outer"))
+      .defaultValue("inner")
+      .required(true)
     descriptor = joinMode :: descriptor
+    val correlationField = new PropertyDescriptor()
+      .name("correlationField")
+      .displayName("correlationField")
+      .description("Fields associated with tables,If there are more than one, please use , separate")
+      .defaultValue("")
+      .required(true)
+    descriptor = correlationField :: descriptor
     descriptor
   }

View File

@ -31,7 +31,12 @@ class Merge extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val inports = new PropertyDescriptor().name("inports").displayName("inports").description("inports string, seperated by ,.").defaultValue("").required(true)
+    val inports = new PropertyDescriptor()
+      .name("inports")
+      .displayName("inports")
+      .description("inports string, seperated by ,.")
+      .defaultValue("")
+      .required(true)
     descriptor = inports :: descriptor
     descriptor
   }

View File

@ -16,23 +16,18 @@ class SelectField extends ConfigurableStop {
   val inportList: List[String] = List(Port.DefaultPort.toString)
   val outportList: List[String] = List(Port.DefaultPort.toString)
-  var schema:String = _
+  var fields:String = _
   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val df = in.read()
-    val field = schema.split(",")
+    val field = fields.split(",")
     val columnArray : Array[Column] = new Array[Column](field.size)
     for(i <- 0 to field.size - 1){
       columnArray(i) = new Column(field(i))
     }
     var finalFieldDF : DataFrame = df.select(columnArray:_*)
-    //finalFieldDF.printSchema()
-    //finalFieldDF.show(2)
     out.write(finalFieldDF)
   }
@ -41,12 +36,17 @@ class SelectField extends ConfigurableStop {
   }
   def setProperties(map : Map[String, Any]): Unit = {
-    schema = MapUtil.get(map,"schema").asInstanceOf[String]
+    fields = MapUtil.get(map,"fields").asInstanceOf[String]
   }
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val inports = new PropertyDescriptor().name("schema").displayName("schema").description("The Schema you want to select").defaultValue("").required(true)
+    val inports = new PropertyDescriptor()
+      .name("fields")
+      .displayName("Fields")
+      .description("The fields you want to select")
+      .defaultValue("")
+      .required(true)
     descriptor = inports :: descriptor
     descriptor
   }

View File

@ -10,8 +10,8 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 class Subtract extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  override val description: String = "Delete duplicates of the first and second tables from the first table"
+  override val description: String = "Delete the data existing in the right table from the left table"
-  override val inportList: List[String] =List(Port.AnyPort.toString)
+  override val inportList: List[String] =List(Port.LeftPort.toString,Port.RightPort.toString)
   override val outportList: List[String] = List(Port.DefaultPort.toString)
   override def setProperties(map: Map[String, Any]): Unit = {
@ -31,7 +31,6 @@ class Subtract extends ConfigurableStop{
     List(StopGroup.CommonGroup.toString)
   }
   override def initialize(ctx: ProcessContext): Unit = {
   }
@ -39,14 +38,10 @@ class Subtract extends ConfigurableStop{
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
-    val dfs: Seq[DataFrame] = in.ports().map(in.read(_))
-    var df1: DataFrame = dfs(0)
-    var df2: DataFrame = dfs(1)
-    val rdd: JavaRDD[Row] = df1.toJavaRDD.subtract(df2.toJavaRDD)
-    val schema: StructType = df1.schema
-    val outDF: DataFrame = spark.createDataFrame(rdd,schema)
+    val leftDF = in.read(Port.LeftPort)
+    val rightDF = in.read(Port.RightPort)
+    val outDF = leftDF.except(rightDF)
     out.write(outDF)
   }
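For context, a minimal sketch of what the rewritten Subtract now does with its two input ports: rows present in the right table are removed from the left table via DataFrame.except. The local SparkSession, sample rows, and object name below are illustrative assumptions, not part of the commit.

import org.apache.spark.sql.SparkSession

object SubtractSketch extends App {
  // Illustrative stand-in for the SparkSession PiFlow injects via pec.get[SparkSession]().
  val spark = SparkSession.builder().master("local[*]").appName("SubtractSketch").getOrCreate()
  import spark.implicits._

  val leftDF  = Seq(("0001", "zhangsan"), ("0002", "lisi"), ("0003", "wangwu")).toDF("id", "name")
  val rightDF = Seq(("0002", "lisi")).toDF("id", "name")

  // Replaces the old JavaRDD subtract: keep only left rows that do not appear in the right table.
  val outDF = leftDF.except(rightDF)
  outDF.show(false)

  spark.stop()
}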

View File

@ -1,121 +0,0 @@
package cn.piflow.bundle.excel
import java.io.{BufferedInputStream, ByteArrayInputStream}
import cn.piflow._
import cn.piflow.bundle.util.ExcelToJson
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import net.sf.json.JSONArray
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
class ExcelParser extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Parse excel file to json"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var cachePath: String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf = in.read()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
var hdfsPathJsonCache:String = ""
hdfsPathJsonCache = hdfsUrl+cachePath+"/excelCache/excelCache.json"
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
var count = 0 ;
inDf.collect().foreach(row=>{
val pathStr = row.get(0).asInstanceOf[String]
if (pathStr.endsWith(".xls") || pathStr.endsWith("xlsx")){
val array: JSONArray = ExcelToJson.readExcel(pathStr,hdfsUrl)
println(array.size())
for (i <- 0 until array.size()){
jsonStr = array.get(i).toString
bisIn = new BufferedInputStream(new ByteArrayInputStream((jsonStr+"\n").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
}
}
})
fdosOut.close()
val df: DataFrame = spark.read.json(hdfsPathJsonCache)
out.write(df)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
cachePath = MapUtil.get(map,"cachePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val jsonSavePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("save path of json").defaultValue("").required(true)
descriptor = jsonSavePath :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/excel/excelParse.png")
}
override def getGroup(): List[String] = {
List(StopGroup.ExcelGroup.toString)
}
}

View File

@ -1,66 +0,0 @@
package cn.piflow.bundle.file
import java.net.URI
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
class FetchFile extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Fetch file from hdfs to local"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var hdfs_path:String =_
var local_path:String=_
var fs:FileSystem=null
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
initFs(hdfs_path)
downloadFiletoLocal(hdfs_path,local_path)
}
def initFs(uri:String):Unit={
val conf:Configuration=new Configuration()
fs= FileSystem.get(new URI(uri),conf)
}
def downloadFiletoLocal(hdfs_path:String,local_path:String):Unit={
val src:Path=new Path(hdfs_path)
val dst:Path=new Path(local_path)
fs.copyToLocalFile(src,dst)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map: Map[String, Any]): Unit = {
hdfs_path=MapUtil.get(map,key="hdfs_path").asInstanceOf[String]
local_path=MapUtil.get(map,key="local_path").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val hdfs_path = new PropertyDescriptor().name("hdfs_path").displayName("HDFS_PATH").defaultValue("").required(true)
val local_path = new PropertyDescriptor().name("local_path").displayName("LOCAL_PATH").defaultValue("").required(true)
descriptor = hdfs_path :: descriptor
descriptor = local_path :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/file/FetchFile.png")
}
override def getGroup(): List[String] = {
List(StopGroup.FileGroup.toString)
}
}

View File

@ -1,68 +0,0 @@
package cn.piflow.bundle.file
import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{HttpURLConnection, URI, URL}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
class PutFile extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Put local file to hdfs"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var hdfs_path:String =_
var local_path:String=_
var fs:FileSystem=null
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
initFs(hdfs_path)
addFiletoHDFS(hdfs_path,local_path)
}
def initFs(uri:String):Unit={
val conf:Configuration=new Configuration()
fs= FileSystem.get(new URI(uri),conf)
}
def addFiletoHDFS(hdfs_path:String,local_path:String):Unit={
val dst:Path=new Path(hdfs_path)
val src:Path=new Path(local_path)
fs.copyFromLocalFile(src,dst)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map: Map[String, Any]): Unit = {
hdfs_path=MapUtil.get(map,key="hdfs_path").asInstanceOf[String]
local_path=MapUtil.get(map,key="local_path").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val hdfs_path = new PropertyDescriptor().name("hdfs_path").displayName("HDFS_PATH").defaultValue("").required(true)
val local_path = new PropertyDescriptor().name("local_path").displayName("LOCAL_PATH").defaultValue("").required(true)
descriptor = hdfs_path :: descriptor
descriptor = local_path :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/file/PutFile.png")
}
override def getGroup(): List[String] = {
List(StopGroup.FileGroup.toString)
}
}

View File

@ -13,6 +13,7 @@ class RegexTextProcess extends ConfigurableStop{
   val description: String = "Use regex to replace text"
   val inportList: List[String] = List(Port.DefaultPort.toString)
   val outportList: List[String] = List(Port.DefaultPort.toString)
   var regex:String =_
   var columnName:String=_
   var replaceStr:String=_
@ -46,9 +47,29 @@ class RegexTextProcess extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val regex = new PropertyDescriptor().name("regex").displayName("REGEX").defaultValue("").required(true)
-    val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").defaultValue("").required(true)
-    val replaceStr = new PropertyDescriptor().name("replaceStr").displayName("REPLACE_STR").defaultValue("").required(true)
+    val regex = new PropertyDescriptor().name("regex")
+      .displayName("Regex")
+      .description("regex")
+      .defaultValue("")
+      .required(true)
+      .example("")
+    val columnName = new PropertyDescriptor()
+      .name("columnName")
+      .displayName("ColumnName")
+      .description("Field name of schema")
+      .defaultValue("")
+      .required(true)
+      .example("")
+    val replaceStr = new PropertyDescriptor()
+      .name("replaceStr")
+      .displayName("ReplaceStr")
+      .description("Replaced string")
+      .defaultValue("")
+      .required(true)
+      .example("")
     descriptor = regex :: descriptor
     descriptor = columnName :: descriptor
     descriptor = replaceStr :: descriptor

View File

@ -9,7 +9,7 @@ import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
 class LoadGraph extends ConfigurableStop {
   val authorEmail: String = "06whuxx@163.com"
-  val description: String = "Load data and construct a graph"
+  val description: String = "Load data and construct a graphx"
   val inportList: List[String] = List(Port.NonePort.toString)
@ -27,7 +27,7 @@ class LoadGraph extends ConfigurableStop {
     import spark.sqlContext.implicits._
     var graph=GraphLoader.edgeListFile(sc,dataPath,true).partitionBy(PartitionStrategy.RandomVertexCut)
-    //val df=Seq((graph.edges.to,graph.vertices)).toDF()
+    //val df=Seq((graphx.edges.to,graphx.vertices)).toDF()
     //TODO:can not transfer EdgeRdd to Dataset
     out.write(edgePort,graph.edges.toDF())
     out.write(vertexPort,graph.vertices.toDF())

View File

@ -19,7 +19,7 @@ class DeleteHdfs extends ConfigurableStop{
   override val description: String = "Delete file or directory on hdfs"
   var hdfsUrl :String= _
-  var deletePath :String = _
+  var hdfsPath :String = _
   var isCustomize:String=_
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@ -46,7 +46,7 @@ class DeleteHdfs extends ConfigurableStop{
       })
     } else {
-      val array = deletePath.split(",")
+      val array = hdfsPath.split(",")
       for (i<- 0 until array.length){
         val hdfsPath = hdfsUrl+"/"+array(i)
@ -62,21 +62,44 @@ class DeleteHdfs extends ConfigurableStop{
   }
   override def setProperties(map: Map[String, Any]): Unit = {
     hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
-    deletePath = MapUtil.get(map,key="deletePath").asInstanceOf[String]
+    hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
     isCustomize=MapUtil.get(map,key="isCustomize").asInstanceOf[String]
   }
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true)
-    val deletePath = new PropertyDescriptor().name("deletePath").displayName("deletePath").defaultValue("").required(true)
-    val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " +
+    val hdfsPath = new PropertyDescriptor()
+      .name("hdfsPath")
+      .displayName("HdfsPath")
+      .defaultValue("")
+      .description("File path of HDFS")
+      .required(true)
+      .example("/work/test1/")
+    descriptor = hdfsPath :: descriptor
+    val hdfsUrl = new PropertyDescriptor()
+      .name("hdfsUrl")
+      .displayName("HdfsUrl")
+      .defaultValue("")
+      .description("URL address of HDFS")
+      .required(true)
+      .example("hdfs://192.168.3.138:8020")
+    descriptor = hdfsUrl :: descriptor
+    val isCustomize = new PropertyDescriptor()
+      .name("isCustomize")
+      .displayName("isCustomize")
+      .description("Whether to customize the compressed file path, if true, " +
       "you must specify the path where the compressed file is located . " +
       "If it is false, it will automatically find the file path data from the upstream port ")
-      .defaultValue("false").allowableValues(Set("true","false")).required(true)
+      .defaultValue("true")
+      .allowableValues(Set("true","false"))
+      .required(true)
+      .example("true")
     descriptor = isCustomize :: descriptor
-    descriptor = hdfsUrl :: descriptor
-    descriptor = deletePath :: descriptor
     descriptor
   }

View File

@ -1,4 +1,4 @@
package cn.piflow.bundle.http package cn.piflow.bundle.hdfs
import java.io.InputStream import java.io.InputStream
import java.net.{HttpURLConnection, URL} import java.net.{HttpURLConnection, URL}
@ -13,14 +13,16 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class FileDownHDFS extends ConfigurableStop{ class FileDownHdfs extends ConfigurableStop{
val authorEmail: String = "yangqidong@cnic.cn" val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "Download url to hdfs" val description: String = "Download the data from the url to HDFS"
val inportList: List[String] = List(Port.NonePort.toString) val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString) val outportList: List[String] = List(Port.DefaultPort.toString)
var hdfsUrl:String =_
var hdfsPath:String =_
var url_str:String=_ var url_str:String=_
var savePath:String=_
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
@ -36,16 +38,11 @@ class FileDownHDFS extends ConfigurableStop{
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
val pathARR: Array[String] = savePath.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
val fs = FileSystem.get(configuration) val fs = FileSystem.get(configuration)
val fdos: FSDataOutputStream = fs.create(new Path(savePath)) val fdos: FSDataOutputStream = fs.create(new Path(hdfsUrl+hdfsPath))
while(((byteRead=inputStream.read(buffer)) != -1) && (byteRead != -1)){ while(((byteRead=inputStream.read(buffer)) != -1) && (byteRead != -1)){
@ -56,7 +53,7 @@ class FileDownHDFS extends ConfigurableStop{
inputStream.close() inputStream.close()
fdos.close() fdos.close()
var seq:Seq[String]=Seq(savePath) var seq:Seq[String]=Seq(hdfsUrl+hdfsPath)
val row: Row = Row.fromSeq(seq) val row: Row = Row.fromSeq(seq)
val list:List[Row]=List(row) val list:List[Row]=List(row)
val rdd: RDD[Row] = spark.sparkContext.makeRDD(list) val rdd: RDD[Row] = spark.sparkContext.makeRDD(list)
@ -74,16 +71,43 @@ class FileDownHDFS extends ConfigurableStop{
} }
def setProperties(map: Map[String, Any]): Unit = { def setProperties(map: Map[String, Any]): Unit = {
hdfsUrl=MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
hdfsPath=MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
url_str=MapUtil.get(map,key="url_str").asInstanceOf[String] url_str=MapUtil.get(map,key="url_str").asInstanceOf[String]
savePath=MapUtil.get(map,key="savePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val url_str = new PropertyDescriptor().name("url_str").displayName("URL").description("Network address of file").defaultValue("").required(true)
val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").description("The HDFS path and name you want to save, such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(true)
val url_str = new PropertyDescriptor()
.name("url_str")
.displayName("Url_str")
.description("Network address of file")
.defaultValue("")
.required(true)
val hdfsPath = new PropertyDescriptor()
.name("hdfsPath")
.displayName("HdfsPath")
.defaultValue("")
.description("File path of HDFS")
.required(true)
.example("/work/test.gz")
val hdfsUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.defaultValue("")
.description("URL address of HDFS")
.required(true)
.example("hdfs://192.168.3.138:8020")
descriptor = url_str :: descriptor descriptor = url_str :: descriptor
descriptor = savePath :: descriptor descriptor = hdfsUrl :: descriptor
descriptor = hdfsPath :: descriptor
descriptor descriptor
} }
@ -92,7 +116,7 @@ class FileDownHDFS extends ConfigurableStop{
} }
override def getGroup(): List[String] = { override def getGroup(): List[String] = {
List(StopGroup.HttpGroup.toString) List(StopGroup.HdfsGroup.toString)
} }
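A minimal configuration sketch for the renamed FileDownHdfs stop, assuming only the property keys read in setProperties above ("url_str", "hdfsUrl", "hdfsPath"); the URL and paths are placeholders, not values from this commit.

import cn.piflow.bundle.hdfs.FileDownHdfs

// Configure the stop through its setProperties map; the keys mirror the
// descriptors declared in getPropertyDescriptor().
val stop = new FileDownHdfs()
stop.setProperties(Map(
  "url_str"  -> "http://example.com/data/sample.gz",  // source file URL (placeholder)
  "hdfsUrl"  -> "hdfs://192.168.3.138:8020",          // HDFS namenode URL
  "hdfsPath" -> "/work/sample.gz"                     // target path; the stop writes to hdfsUrl + hdfsPath
))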

View File

@ -47,7 +47,6 @@ class GetHdfs extends ConfigurableStop{
val rdd = sc.textFile(path) val rdd = sc.textFile(path)
val outDf = rdd.toDF() val outDf = rdd.toDF()
outDf.schema.printTreeString() outDf.schema.printTreeString()
//outDf.show()
out.write(outDf) out.write(outDf)
} }
@ -60,16 +59,35 @@ class GetHdfs extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath") val hdfsPath = new PropertyDescriptor()
.defaultValue("").required(true) .name("hdfsPath")
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl") .displayName("HdfsPath")
.defaultValue("").required(true) .defaultValue("")
val types = new PropertyDescriptor().name("types").displayName("types").description("txt,parquet,csv,json") .description("File path of HDFS")
.defaultValue("txt").allowableValues(Set("txt","parquet","csv","json")).required(true) .required(true)
.example("/work/")
descriptor = hdfsPath :: descriptor
val hdfsUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.defaultValue("")
.description("URL address of HDFS")
.required(true)
.example("hdfs://192.168.3.138:8020")
descriptor = hdfsUrl :: descriptor
val types = new PropertyDescriptor()
.name("types")
.displayName("Types")
.description("The type of file you want to load")
.defaultValue("csv")
.allowableValues(Set("txt","parquet","csv","json"))
.required(true)
.example("csv")
descriptor = types :: descriptor descriptor = types :: descriptor
descriptor = hdfsPath :: descriptor
descriptor = hdfsUrl :: descriptor
descriptor descriptor
} }
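The full perform body of GetHdfs is not part of this hunk; the sketch below is only an illustration of how the types property typically selects a Spark reader, using standard SparkSession read APIs and assumed branch behaviour for csv, json and parquet.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative mapping from the "types" property to a Spark reader.
// The "txt" branch follows the textFile -> toDF code shown above; the others are assumptions.
def readByType(spark: SparkSession, types: String, path: String): DataFrame = {
  import spark.implicits._
  types match {
    case "parquet" => spark.read.parquet(path)
    case "csv"     => spark.read.option("header", "true").csv(path)
    case "json"    => spark.read.json(path)
    case _         => spark.sparkContext.textFile(path).toDF()  // "txt"
  }
}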

View File

@ -21,18 +21,16 @@ class ListHdfs extends ConfigurableStop{
override val outportList: List[String] = List(Port.DefaultPort.toString) override val outportList: List[String] = List(Port.DefaultPort.toString)
override val description: String = "Retrieve a list of files from hdfs" override val description: String = "Retrieve a list of files from hdfs"
var HDFSPath :String= _ var hdfsPath :String= _
var HDFSUrl :String= _ var hdfsUrl :String= _
var pathARR:ArrayBuffer[String]=ArrayBuffer() var pathARR:ArrayBuffer[String]=ArrayBuffer()
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val sc = spark.sparkContext val sc = spark.sparkContext
val path = new Path(HDFSPath) val path = new Path(hdfsPath)
iterationFile(path.toString) iterationFile(path.toString)
import spark.implicits._
val rows: List[Row] = pathARR.map(each => { val rows: List[Row] = pathARR.map(each => {
var arr:Array[String]=Array(each) var arr:Array[String]=Array(each)
val row: Row = Row.fromSeq(arr) val row: Row = Row.fromSeq(arr)
@ -40,8 +38,6 @@ class ListHdfs extends ConfigurableStop{
}).toList }).toList
val rowRDD: RDD[Row] = sc.makeRDD(rows) val rowRDD: RDD[Row] = sc.makeRDD(rows)
// val fields: Array[StructField] = "path".split("/").map(d=>StructField(d,StringType,nullable = true))
// val schema: StructType = StructType(fields)
val schema: StructType = StructType(Array( val schema: StructType = StructType(Array(
StructField("path",StringType) StructField("path",StringType)
@ -53,7 +49,7 @@ class ListHdfs extends ConfigurableStop{
// recursively traverse the folder // recursively traverse the folder
def iterationFile(path: String):Unit = { def iterationFile(path: String):Unit = {
val config = new Configuration() val config = new Configuration()
config.set("fs.defaultFS",HDFSUrl) config.set("fs.defaultFS",hdfsUrl)
val fs = FileSystem.get(config) val fs = FileSystem.get(config)
val listf = new Path(path) val listf = new Path(path)
@ -72,15 +68,29 @@ class ListHdfs extends ConfigurableStop{
} }
override def setProperties(map: Map[String, Any]): Unit = { override def setProperties(map: Map[String, Any]): Unit = {
HDFSUrl = MapUtil.get(map,key="HDFSUrl").asInstanceOf[String] hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
HDFSPath = MapUtil.get(map,key="HDFSPath").asInstanceOf[String] hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val hdfsPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("").required(true) val hdfsPath = new PropertyDescriptor()
val hdfsUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("").required(true) .name("hdfsPath")
.displayName("HdfsPath")
.defaultValue("")
.description("File path of HDFS")
.required(true)
.example("/work/")
descriptor = hdfsPath :: descriptor descriptor = hdfsPath :: descriptor
val hdfsUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.defaultValue("")
.description("URL address of HDFS")
.required(true)
.example("hdfs://192.168.3.138:8020")
descriptor = hdfsUrl :: descriptor descriptor = hdfsUrl :: descriptor
descriptor descriptor
} }
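The body of iterationFile is outside this hunk; the sketch below shows the kind of recursive traversal it implies, assuming the standard Hadoop FileSystem API and the renamed hdfsUrl/hdfsPath fields.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

// Collect every file path under root, recursing into sub-directories.
def listRecursively(hdfsUrl: String, root: String): ArrayBuffer[String] = {
  val config = new Configuration()
  config.set("fs.defaultFS", hdfsUrl)
  val fs = FileSystem.get(config)
  val pathARR = ArrayBuffer[String]()
  def walk(p: Path): Unit = fs.listStatus(p).foreach { status =>
    if (status.isDirectory) walk(status.getPath)
    else pathARR += status.getPath.toString
  }
  walk(new Path(root))
  pathARR
}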

View File

@ -9,7 +9,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession
class PutHdfs extends ConfigurableStop{ class PutHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cnic.com" override val authorEmail: String = "ygang@cnic.com"
override val inportList: List[String] = List(Port.DefaultPort.toString) override val inportList: List[String] = List(Port.DefaultPort.toString)
@ -19,7 +18,7 @@ class PutHdfs extends ConfigurableStop{
var hdfsPath :String= _ var hdfsPath :String= _
var hdfsUrl :String= _ var hdfsUrl :String= _
var types :String= _ var types :String= _
var partition :Int= _ var partition :String= _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
@ -30,12 +29,12 @@ class PutHdfs extends ConfigurableStop{
val fs = FileSystem.get(config) val fs = FileSystem.get(config)
if (types=="json"){ if (types=="json"){
inDF.repartition(partition).write.json(hdfsUrl+hdfsPath) inDF.repartition(partition.toInt).write.json(hdfsUrl+hdfsPath)
} else if (types=="csv"){ } else if (types=="csv"){
inDF.repartition(partition).write.csv(hdfsUrl+hdfsPath) inDF.repartition(partition.toInt).write.csv(hdfsUrl+hdfsPath)
} else { } else {
//parquet //parquet
inDF.repartition(partition).write.save(hdfsUrl+hdfsPath) inDF.repartition(partition.toInt).write.save(hdfsUrl+hdfsPath)
} }
} }
@ -43,17 +42,46 @@ class PutHdfs extends ConfigurableStop{
hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String] hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String] hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
types = MapUtil.get(map,key="types").asInstanceOf[String] types = MapUtil.get(map,key="types").asInstanceOf[String]
partition = MapUtil.get(map,key="partition").asInstanceOf[Int] partition = MapUtil.get(map,key="partition").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath").defaultValue("").required(true) val hdfsPath = new PropertyDescriptor()
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true) .name("hdfsPath")
val types = new PropertyDescriptor().name("types").displayName("json,csv,parquet").description("json,csv,parquet") .displayName("HdfsPath")
.defaultValue("csv").allowableValues(Set("json","csv","parquet")).required(true) .defaultValue("")
.description("File path of HDFS")
.required(true)
.example("/work/")
val hdfsUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.defaultValue("")
.description("URL address of HDFS")
.required(true)
.example("hdfs://192.168.3.138:8020")
val types = new PropertyDescriptor()
.name("types")
.displayName("Types")
.description("What format do you want to write : json,csv,parquet")
.defaultValue("csv")
.allowableValues(Set("json","csv","parquet"))
.required(true)
.example("csv")
val partition = new PropertyDescriptor()
.name("partition")
.displayName("Partition")
.description("Write a few partitions")
.defaultValue("1")
.required(true)
.example("1")
val partition = new PropertyDescriptor().name("partition").displayName("repartition").description("partition").defaultValue("").required(true)
descriptor = partition :: descriptor descriptor = partition :: descriptor
descriptor = types :: descriptor descriptor = types :: descriptor
descriptor = hdfsPath :: descriptor descriptor = hdfsPath :: descriptor
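Since partition is now carried as a String property, each write branch converts it with toInt, as shown above. A compact sketch of the same write logic, with the method name and signature chosen for illustration:

import org.apache.spark.sql.DataFrame

// Repartition and write a DataFrame to HDFS in the requested format.
def writeOut(inDF: DataFrame, hdfsUrl: String, hdfsPath: String,
             types: String, partition: String): Unit = {
  val df = inDF.repartition(partition.toInt)
  types match {
    case "json" => df.write.json(hdfsUrl + hdfsPath)
    case "csv"  => df.write.csv(hdfsUrl + hdfsPath)
    case _      => df.write.save(hdfsUrl + hdfsPath)  // default save format, typically parquet
  }
}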

View File

@ -125,21 +125,58 @@ class SaveToHdfs extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val hdfsDirPath = new PropertyDescriptor().name("hdfsDirPath").displayName("hdfsDirPath").defaultValue("/").required(true)
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true)
val fileName = new PropertyDescriptor().name("fileName").displayName("fileName").defaultValue("").required(true) val hdfsDirPath = new PropertyDescriptor()
.name("hdfsDirPath")
.displayName("HdfsDirPath")
.defaultValue("")
.description("File dir path of HDFS")
.required(true)
.example("/work/")
val types = new PropertyDescriptor().name("types").displayName("json,csv,text").description("json,csv,text")
.defaultValue("csv").allowableValues(Set("json","csv","text")).required(true)
val delimiter = new PropertyDescriptor().name("delimiter").displayName("delimiter").description("type is csv ,please set it ").defaultValue(",").required(true) val hdfsUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.defaultValue("")
.description("URL address of HDFS")
.required(true)
.example("hdfs://192.168.3.138:8020")
val fileName = new PropertyDescriptor()
.name("fileName")
.displayName("fileName")
.description("File name")
.defaultValue("")
.required(true)
.example("test.csv")
val types = new PropertyDescriptor()
.name("types")
.displayName("json,csv,text")
.description("What format do you want to write : json,csv,parquet")
.defaultValue("csv")
.allowableValues(Set("json","csv","text"))
.required(true)
.example("csv")
val delimiter = new PropertyDescriptor()
.name("delimiter")
.displayName("delimiter")
.description("The delimiter of csv file,types is csv ,please set it ")
.defaultValue(",")
.required(true)
//header //header
val header = new PropertyDescriptor().name("header").displayName("header").description("Whether the csv file have header or not").defaultValue("").required(true) val header = new PropertyDescriptor()
.name("header")
.displayName("header")
.description("Whether the csv file have header or not")
.defaultValue("true")
.required(true)
descriptor = header :: descriptor descriptor = header :: descriptor
descriptor = fileName :: descriptor descriptor = fileName :: descriptor
descriptor = delimiter :: descriptor descriptor = delimiter :: descriptor
descriptor = hdfsDirPath :: descriptor descriptor = hdfsDirPath :: descriptor
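The perform body of SaveToHdfs is not shown in this hunk; as a hedged sketch, the header and delimiter properties would typically be applied through standard DataFrameWriter options like this (the option names come from Spark, not from this diff):

import org.apache.spark.sql.DataFrame

// Write one csv part file using the header/delimiter properties declared above.
def saveCsv(df: DataFrame, hdfsUrl: String, hdfsDirPath: String,
            header: String, delimiter: String): Unit = {
  df.repartition(1)                  // a single part file
    .write
    .option("header", header)        // "true" / "false"
    .option("delimiter", delimiter)  // e.g. ","
    .csv(hdfsUrl + hdfsDirPath)      // Spark writes a directory of part files;
                                     // renaming the part to fileName would be a separate step (not shown)
}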

View File

@ -20,8 +20,8 @@ class SelectFilesByName extends ConfigurableStop{
override val inportList: List[String] = List(Port.DefaultPort.toString) override val inportList: List[String] = List(Port.DefaultPort.toString)
override val outportList: List[String] = List(Port.DefaultPort.toString) override val outportList: List[String] = List(Port.DefaultPort.toString)
var HDFSUrl:String=_ var hdfsUrl:String=_
var HDFSPath:String=_ var hdfsPath:String=_
var selectionConditions:String =_ var selectionConditions:String =_
var fs: FileSystem=null var fs: FileSystem=null
@ -53,10 +53,10 @@ class SelectFilesByName extends ConfigurableStop{
val session: SparkSession = pec.get[SparkSession]() val session: SparkSession = pec.get[SparkSession]()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
configuration.set("fs.defaultFS", HDFSUrl) configuration.set("fs.defaultFS", hdfsUrl)
fs = FileSystem.get(configuration) fs = FileSystem.get(configuration)
selectFile(HDFSPath) selectFile(hdfsPath)
val rows: List[Row] = pathARR.map(each => { val rows: List[Row] = pathARR.map(each => {
var arr:Array[String]=Array(each) var arr:Array[String]=Array(each)
@ -67,33 +67,47 @@ class SelectFilesByName extends ConfigurableStop{
val fields: Array[StructField] = "path".split("/").map(d=>StructField(d,StringType,nullable = true)) val fields: Array[StructField] = "path".split("/").map(d=>StructField(d,StringType,nullable = true))
val schema: StructType = StructType(fields) val schema: StructType = StructType(fields)
val df: DataFrame = session.createDataFrame(rowRDD,schema) val df: DataFrame = session.createDataFrame(rowRDD,schema)
df.collect().foreach(println)
println("#################################################")
df.show(20)
println(df.count+"#################################################")
out.write(df) out.write(df)
} }
override def setProperties(map: Map[String, Any]): Unit = { override def setProperties(map: Map[String, Any]): Unit = {
HDFSUrl=MapUtil.get(map,key="HDFSUrl").asInstanceOf[String] hdfsUrl=MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
HDFSPath=MapUtil.get(map,key="HDFSPath").asInstanceOf[String] hdfsPath=MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
selectionConditions=MapUtil.get(map,key="selectionConditions").asInstanceOf[String] selectionConditions=MapUtil.get(map,key="selectionConditions").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").description("The URL of the HDFS file system, such as hdfs://ip:port") val hdfsPath = new PropertyDescriptor()
.defaultValue("hdfs://").required(true) .name("hdfsPath")
val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").description("The save path of the HDFS file system, such as /test/Ab") .displayName("HdfsPath")
.defaultValue("").required(true) .defaultValue("")
val selectionConditions = new PropertyDescriptor().name("selectionConditions").displayName("selectionConditions").description("To select conditions, you need to fill in regular expressions in java, such as. * abc. *") .description("File path of HDFS")
.defaultValue("").required(true) .required(true)
descriptor = HDFSUrl :: descriptor .example("/work/")
descriptor = HDFSPath :: descriptor
val hdfsUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.defaultValue("")
.description("URL address of HDFS")
.required(true)
.example("hdfs://192.168.3.138:8020")
val selectionConditions = new PropertyDescriptor()
.name("selectionConditions")
.displayName("SelectionConditions")
.description("To select conditions, you need to fill in regular expressions in java, such as '.*.csv'")
.defaultValue("")
.required(true)
.example("")
descriptor = hdfsUrl :: descriptor
descriptor = hdfsPath :: descriptor
descriptor = selectionConditions :: descriptor descriptor = selectionConditions :: descriptor
descriptor descriptor
} }
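selectFile itself is outside this hunk; a sketch of the regex-based selection it implies, assuming the Hadoop FileSystem API and the selectionConditions property shown above.

import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

// Keep the paths of files whose name matches the Java regex in selectionConditions.
def selectByName(fs: FileSystem, dir: String, selectionConditions: String): ArrayBuffer[String] = {
  val matched = ArrayBuffer[String]()
  fs.listStatus(new Path(dir)).foreach { status =>
    if (status.isDirectory)
      matched ++= selectByName(fs, status.getPath.toString, selectionConditions)
    else if (status.getPath.getName.matches(selectionConditions))
      matched += status.getPath.toString
  }
  matched
}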

View File

@ -179,13 +179,43 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("file path,such as /a/a.gz").defaultValue("").required(false) val filePath = new PropertyDescriptor()
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").description("the url of HDFS,such as hdfs://10.0.86.89:9000").defaultValue("").required(false) .name("filePath")
val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").description("This parameter can specify the location of the decompressed file, you can choose not to fill in, " + .displayName("FilePath")
"the program will default to save the decompressed file in the folder where the source file is located. If you fill in, you can specify a folder, such as /A/AB/").defaultValue("").required(false) .defaultValue("")
val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " + .description("File path of HDFS,such as '/work/a.gz'")
"you must specify the path where the compressed file is located . " + .required(false)
.example("/work/a.gz ")
val hdfsUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.defaultValue("")
.description("URL address of HDFS")
.required(true)
.example("hdfs://192.168.3.138:8020")
val savePath = new PropertyDescriptor()
.name("savePath")
.displayName("savePath")
.description("This parameter can specify the location of the decompressed file, you can choose not to fill in, " +
"the program will default to save the decompressed file in the folder where the source file is located. If you fill in, you can specify a folder, such as /A/AB/")
.defaultValue("")
.required(false)
.example("/work/aa/")
val isCustomize = new PropertyDescriptor()
.name("isCustomize")
.displayName("isCustomize")
.description("Whether to customize the compressed file path, if true, \n" +
"you must specify the path where the compressed file is located . \n" +
"If it is false, it will automatically find the file path data from the upstream port ") "If it is false, it will automatically find the file path data from the upstream port ")
.defaultValue("false").allowableValues(Set("true","false")).required(true) .defaultValue("false").allowableValues(Set("true","false")).required(true)
descriptor = isCustomize :: descriptor descriptor = isCustomize :: descriptor
descriptor = filePath :: descriptor descriptor = filePath :: descriptor

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.File
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class RegexTextProcessTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/file/regexTextProcess.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}
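The test classes added below repeat the same parse/construct/run boilerplate, with only the flow json path changing. If desired, it could be factored into a shared helper; the sketch below is a suggestion (FlowTestHelper is a hypothetical name, not part of this commit) built only from calls that appear in these tests.

import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import scala.util.parsing.json.JSON

// Run a flow definition end to end so each test only supplies its json path.
object FlowTestHelper {
  def runFlow(flowFile: String): Unit = {
    val flowJsonStr = FileUtil.fileReader(flowFile)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    val flow = FlowBean(map).constructFlow()

    val ip = InetAddress.getLocalHost.getHostAddress
    cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
    Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()

    val spark = SparkSession.builder()
      .master("local[12]")
      .appName("piflow-test")
      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "")
      .bind("debug.path", "")
      .start(flow)
    process.awaitTermination()
    spark.close()
  }
}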

View File

@ -0,0 +1,59 @@
package cn.piflow.bundle
import java.net.InetAddress
import java.util.ArrayList
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class AddUUIDTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/uuid.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class ConvertSchemaTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/convertSchema.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class DistinctTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/distinct.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class DropFieldTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/dropField.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class ExecuteSQLTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/executeSql.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class FilterTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/filter.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class ForkTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/fork.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class JoinTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/join.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class MergeTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/merge.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class RouteTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/route.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class SelectFieldTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/selectField.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class SubtractTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/subtract.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.graphx
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class Graphx {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/graphx/labelpropagation.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class DeleteHdfsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/deleteHdfs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class FileDownhdfsHdfsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/fileDownHdfs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class GetHdfsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/getHdfs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class ListHdfsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/listHdfs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class PutHdfsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/putHdfs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class SaveToHdfsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/saveToHdfs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class SelectFilesByNameTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/selectFileByName.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.hdfs
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class UnzipFilesonHdfsTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/hdfs/unzipFilesOnHdfs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -97,8 +97,22 @@ object ConfigureUtil {
piflowBundleJar piflowBundleJar
} }
def getYarnResourceManagerAPI() : String = {
var yarnURL = PropertyUtil.getPropertyValue("yarn.url")
if(yarnURL == null){
var port = "8088"
val yarnHostName = PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")
if(PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port") != null){
port = PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port")
}
yarnURL = "http://" + yarnHostName + ":" + port
}
val yarnAPI = yarnURL + "/ws/v1/cluster/"
yarnAPI
}
def getYarnResourceManagerWebAppAddress() : String = { def getYarnResourceManagerWebAppAddress() : String = {
var yarnResourceManagerWebAppAddress = PropertyUtil.getPropertyValue("yarn.url") /*var yarnResourceManagerWebAppAddress = PropertyUtil.getPropertyValue("yarn.url")
if(yarnResourceManagerWebAppAddress == null){ if(yarnResourceManagerWebAppAddress == null){
var port = "8088" var port = "8088"
val yarnHostName = PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname") val yarnHostName = PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")
@ -106,8 +120,25 @@ object ConfigureUtil {
port = PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port") port = PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port")
} }
yarnResourceManagerWebAppAddress = "http://" + yarnHostName + ":" + port + "/ws/v1/cluster/apps/" yarnResourceManagerWebAppAddress = "http://" + yarnHostName + ":" + port + "/ws/v1/cluster/apps/"
}*/
val yarnAPI = getYarnResourceManagerAPI()
val webAppAddress = yarnAPI + "apps" + "/"
webAppAddress
} }
yarnResourceManagerWebAppAddress
def getYarnResourceMatrics(): String = {
/*var yarnResourceManagerWebAppAddress = PropertyUtil.getPropertyValue("yarn.url")
if(yarnResourceManagerWebAppAddress == null){
var port = "8088"
val yarnHostName = PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")
if(PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port") != null){
port = PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port")
}
yarnResourceManagerWebAppAddress = "http://" + yarnHostName + ":" + port + "/ws/v1/cluster/matrics/"
}*/
val yarnAPI = getYarnResourceManagerAPI()
val matrics = yarnAPI + "metrics"
matrics
} }
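For concreteness, the URLs these helpers produce when yarn.url is unset, assuming yarn.resourcemanager.hostname is 192.168.3.138 and the default port 8088 (placeholder values):

// Example results only; actual values depend on the configured properties.
val api     = ConfigureUtil.getYarnResourceManagerAPI()           // http://192.168.3.138:8088/ws/v1/cluster/
val apps    = ConfigureUtil.getYarnResourceManagerWebAppAddress() // http://192.168.3.138:8088/ws/v1/cluster/apps/
val metrics = ConfigureUtil.getYarnResourceMatrics()              // http://192.168.3.138:8088/ws/v1/cluster/metrics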
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {

View File

@ -12,6 +12,11 @@ object FileUtil {
def writeFile(text: String, path: String) = { def writeFile(text: String, path: String) = {
val file = new File(path)
if(!file.exists()){
file.createNewFile()
}
val writer = new PrintWriter(new File(path))
writer.write(text)
writer.close()

View File

@ -29,8 +29,6 @@ object FlowLauncher {
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
.setAppResource(ConfigureUtil.getPiFlowBundlePath())
.setVerbose(true)
//.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
//.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setConf("spark.driver.memory", flow.getDriverMemory()) .setConf("spark.driver.memory", flow.getDriverMemory())
.setConf("spark.executor.instances", flow.getExecutorNum()) .setConf("spark.executor.instances", flow.getExecutorNum())
.setConf("spark.executor.memory", flow.getExecutorMem()) .setConf("spark.executor.memory", flow.getExecutorMem())
@ -40,24 +38,9 @@ object FlowLauncher {
.setMainClass("cn.piflow.api.StartFlowMain")
.addAppArgs(flowJson)
/*if (PropertyUtil.getPropertyValue("hive.metastore.uris") != null){
sparkLauncher.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
}*/
if(PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname") != null) if(PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname") != null)
sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
/*if(PropertyUtil.getPropertyValue("yarn.resourcemanager.address") != null)
sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
if(PropertyUtil.getPropertyValue("spark.yarn.access.namenode") != null)
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("spark.yarn.access.namenode"))
else
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("fs.defaultFS"))
if(PropertyUtil.getPropertyValue("yarn.stagingDir") != null)
sparkLauncher.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
if(PropertyUtil.getPropertyValue("yarn.jars") != null)
sparkLauncher.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))*/
//add other jars for application
val classPath = PropertyUtil.getClassPath()

View File

@ -214,4 +214,16 @@ object HdfsUtil {
result
}
def getCapacity() : Map[String, Any] = {
val hdfsURL = PropertyUtil.getPropertyValue("fs.defaultFS")
val conf = new Configuration()
val fileSystem = FileSystem.get(new URI(hdfsURL),conf)
val fsStatus = fileSystem.getStatus
val capacity = fsStatus.getCapacity
val remaining = fsStatus.getRemaining
val used = fsStatus.getUsed
val map = Map("capacity" -> capacity, "remaining" -> remaining, "used" -> used)
map
}
}
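
For context, the new getCapacity() reads cluster totals from Hadoop's FsStatus (all values in bytes). A self-contained sketch of the same call path; the hdfs://localhost:9000 URL is an assumption here, whereas PiFlow takes it from the fs.defaultFS property:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object HdfsCapacitySketch {
  def main(args: Array[String]): Unit = {
    val hdfsURL = "hdfs://localhost:9000" // assumed; PiFlow reads fs.defaultFS from its properties
    val fs = FileSystem.get(new URI(hdfsURL), new Configuration())
    val status = fs.getStatus // FsStatus: capacity, remaining, used in bytes
    println(Map(
      "capacity" -> status.getCapacity,
      "remaining" -> status.getRemaining,
      "used" -> status.getUsed))
  }
}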

View File

@ -1,6 +1,6 @@
package cn.piflow.util
import java.io.{File, FileInputStream, InputStream}
import java.net.InetAddress
import java.util.Properties
@ -13,6 +13,10 @@ object ServerIpUtil {
val userDir = System.getProperty("user.dir")
path = userDir + "/server.ip"
val file = new File(path)
if(!file.exists()){
file.createNewFile()
}
prop.load(new FileInputStream(path))
} catch{
case ex: Exception => ex.printStackTrace()

View File

@ -22,6 +22,42 @@ import scala.util.parsing.json.JSON
object API {
def getResourceInfo() : String = {
try{
val matricsURL = ConfigureUtil.getYarnResourceMatrics()
val client = HttpClients.createDefault()
val get:HttpGet = new HttpGet(matricsURL)
val response:CloseableHttpResponse = client.execute(get)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
val yarnInfo = OptionUtil.getAny(JSON.parseFull(str)).asInstanceOf[Map[String, Any]]
val matricInfo = MapUtil.get(yarnInfo, "clusterMetrics").asInstanceOf[Map[String, Any]]
val cpuInfo = Map(
"totalVirtualCores" -> matricInfo.getOrElse("totalVirtualCores",""),
"allocatedVirtualCores" -> matricInfo.getOrElse("allocatedVirtualCores",""),
"reservedVirtualCores" -> matricInfo.getOrElse("reservedVirtualCores","")
)
val memoryInfo = Map(
"totalMB" -> matricInfo.getOrElse("totalMB",""),
"allocatedMB" -> matricInfo.getOrElse("allocatedMB",""),
"reservedMB" -> matricInfo.getOrElse("reservedMB","")
)
val hdfsInfo = HdfsUtil.getCapacity()
val map = Map("cpu" -> cpuInfo, "memory" -> memoryInfo, "hdfs" -> hdfsInfo)
val resultMap = Map("resource" -> map)
JsonUtil.format(JsonUtil.toJson(resultMap))
}catch{
case ex:Exception => ""
}
}
def getScheduleInfo(scheduleId : String) : String = {
val scheduleInfo = H2Util.getScheduleInfo(scheduleId)

View File

@ -0,0 +1,22 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientGetResourceInfo {
def main(args: Array[String]): Unit = {
val url = "http://10.0.85.83:8001/resource/info"
val client = HttpClients.createDefault()
val getFlowDebugData:HttpGet = new HttpGet(url)
val response:CloseableHttpResponse = client.execute(getFlowDebugData)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}
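
The client above only prints the raw body. Assuming the response keeps the shape assembled in API.getResourceInfo() above (a top-level "resource" object holding "cpu", "memory" and "hdfs" maps), a hedged sketch of extracting a single field with the same scala.util.parsing.json.JSON parser the project already uses; the payload below is illustrative only:

import scala.util.parsing.json.JSON

object ResourceInfoParseSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative payload; real values come from GET /resource/info.
    val body = """{"resource":{"memory":{"totalMB":8192.0,"allocatedMB":1024.0,"reservedMB":0.0}}}"""
    val parsed = JSON.parseFull(body).getOrElse(Map.empty).asInstanceOf[Map[String, Any]]
    val resource = parsed.getOrElse("resource", Map.empty).asInstanceOf[Map[String, Any]]
    val memory = resource.getOrElse("memory", Map.empty).asInstanceOf[Map[String, Any]]
    println("totalMB = " + memory.getOrElse("totalMB", ""))
  }
}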

View File

@ -388,6 +388,17 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
}
}
case HttpRequest(GET, Uri.Path("/resource/info"), headers, entity, protocol) =>{
val resourceInfo = API.getResourceInfo()
if (resourceInfo != ""){
Future.successful(HttpResponse(SUCCESS_CODE, entity = resourceInfo))
}else{
Future.successful(HttpResponse(FAIL_CODE, entity = "get resource info error!"))
}
}
case _: HttpRequest =>
Future.successful(HttpResponse(UNKNOWN_CODE, entity = "Unknown resource!"))
}

View File

@ -51,6 +51,5 @@ clean package -Dmaven.test.skip=true -U
start flume agent: bin/flume-ng agent -n streamingAgent -c conf -f conf/streaming.conf -Dflume.root.logger=INFO,console
7.socket text stream
nc -lk 9999
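
For reference, the `nc -lk 9999` line feeds a Spark socket text stream on the other end; a minimal sketch of such a consumer is shown below (host, port and the local master are assumptions for a quick test, not values taken from the PiFlow configs):

import org.apache.spark.sql.SparkSession

object SocketStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")               // assumed local run
      .appName("socket-text-stream")
      .getOrCreate()

    // Read lines typed into `nc -lk 9999` on the same machine.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Echo each micro-batch to the console.
    val query = lines.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}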