Merge remote-tracking branch 'origin/master'

or 2020-03-26 15:29:25 +08:00
commit dc341a2adb
31 changed files with 174 additions and 177 deletions

View File

@@ -15,7 +15,7 @@
       "name":"Distinct",
       "bundle":"cn.piflow.bundle.common.Distinct",
       "properties":{
-        "fields":"id"
+        "columnNames":"id"
       }
     }

View File

@@ -15,7 +15,7 @@
       "name":"ConvertSchema",
       "bundle":"cn.piflow.bundle.common.DropField",
       "properties":{
-        "fields":"id"
+        "columnNames":"id"
       }
     }

View File

@@ -16,7 +16,7 @@
       "bundle":"cn.piflow.bundle.common.ExecuteSQLStop",
       "properties":{
         "sql":"select * from temp where id = 0001",
-        "tempViewName": "temp"
+        "ViewName": "temp"
       }
     }
   ],

View File

@@ -23,7 +23,7 @@
       "name":"Join",
       "bundle":"cn.piflow.bundle.common.Join",
       "properties":{
-        "correlationField": "id",
+        "correlationColumn": "id",
         "joinMode": "left"
       }

View File

@@ -15,7 +15,7 @@
       "name":"SelectField",
       "bundle":"cn.piflow.bundle.common.SelectField",
       "properties":{
-        "fields":"id"
+        "columnNames":"id"
       }
     }

View File

(Image file changed: 6.8 KiB before, 6.8 KiB after.)

View File

@@ -10,9 +10,9 @@ import org.apache.spark.sql.SparkSession
 class EmailClean extends ConfigurableStop{
   val authorEmail: String = "songdongze@cnic.cn"
-  val description: String = "Clean email format data."
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Cleaning data in email format"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   var columnName:String= _
@@ -54,7 +54,6 @@ class EmailClean extends ConfigurableStop{
     out.write(dfNew1)
   }

   def initialize(ctx: ProcessContext): Unit = {
   }
@@ -69,8 +68,8 @@ class EmailClean extends ConfigurableStop{
     var descriptor : List[PropertyDescriptor] = List()
     val columnName = new PropertyDescriptor()
       .name("columnName")
-      .displayName("Column_Name")
-      .description("The columnName you want to clean,Multiple are separated by commas")
+      .displayName("Column Name")
+      .description("Column names are what you want to clean,multiple column names are separated by commas")
       .defaultValue("")
       .required(true)
       .example("email")

View File

@@ -13,9 +13,9 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 class IdentityNumberClean extends ConfigurableStop{
   val authorEmail: String = "06whuxx@163.com"
-  val description: String = "Clean Id Card data."
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Cleaning data in ID format"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   var columnName:String=_
@@ -72,7 +72,7 @@ class IdentityNumberClean extends ConfigurableStop{
     val columnName = new PropertyDescriptor()
       .name("columnName")
       .displayName("Column_Name")
-      .description("The columnName you want to clean,Multiple are separated by commas")
+      .description("Column names are what you want to clean,multiple column names are separated by commas")
       .defaultValue("")
       .required(true)
       .example("idcard")

View File

@@ -11,9 +11,9 @@ import org.apache.spark.sql.types.StructField
 class PhoneNumberClean extends ConfigurableStop{
   val authorEmail: String = "06whuxx@163.com"
-  val description: String = "Clean phone number format data."
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Cleaning data in mobile number format"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   var columnName:String=_
@@ -72,7 +72,7 @@ class PhoneNumberClean extends ConfigurableStop{
     val columnName = new PropertyDescriptor()
       .name("columnName")
       .displayName("Column_Name")
-      .description("The columnName you want to clean,Multiple are separated by commas")
+      .description("Column names are what you want to clean,multiple column names are separated by commas")
       .defaultValue("")
       .required(true)
       .example("phonenumber")

View File

@@ -9,9 +9,9 @@ import org.apache.spark.sql.SparkSession
 class ProvinceClean extends ConfigurableStop{
   val authorEmail: String = "songdongze@cnic.cn"
-  val description: String = "Clean email format data."
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Cleaning province data"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   var columnName:String=_
@@ -69,7 +69,7 @@ class ProvinceClean extends ConfigurableStop{
     val columnName = new PropertyDescriptor()
       .name("columnName")
       .displayName("Column_Name")
-      .description("The columnName you want to clean,Multiple are separated by commas")
+      .description("Column names are what you want to clean,multiple column names are separated by commas")
       .defaultValue("")
       .required(true)
       .example("province")

View File

@@ -10,9 +10,9 @@ import org.apache.spark.sql.types.StructField
 class TitleClean extends ConfigurableStop{
   val authorEmail: String = "06whuxx@163.com"
-  val description: String = "Clean title format data."
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Cleaning title data"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   var columnName:String=_
@@ -65,15 +65,16 @@ class TitleClean extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
     val columnName = new PropertyDescriptor()
       .name("columnName")
       .displayName("Column_Name")
-      .description("The columnName you want to clean,Multiple are separated by commas")
+      .description("Column names are what you want to clean,multiple column names are separated by commas")
       .defaultValue("")
       .required(true)
       .example("title")
     descriptor = columnName :: descriptor
     descriptor
   }

View File

@@ -9,14 +9,11 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import org.apache.spark.sql.{DataFrame, SparkSession}

 class AddUUIDStop extends ConfigurableStop{
   override val authorEmail: String = "ygang@cnic.cn"
   override val description: String = "Add UUID column"
-  override val inportList: List[String] =List(Port.DefaultPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
+  override val inportList: List[String] =List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
   var column:String=_
@@ -24,11 +21,10 @@ class AddUUIDStop extends ConfigurableStop{
     val spark = pec.get[SparkSession]()
     var df = in.read()
-    spark.udf.register("generaterUUID",()=>UUID.randomUUID().toString.replace("-",""))
+    spark.udf.register("generateUUID",()=>UUID.randomUUID().toString.replace("-",""))
     df.createOrReplaceTempView("temp")
-    df = spark.sql(s"select generaterUUID() as ${column},* from temp")
+    df = spark.sql(s"select generateUUID() as ${column},* from temp")
     out.write(df)
@@ -44,9 +40,10 @@ class AddUUIDStop extends ConfigurableStop{
     var descriptor : List[PropertyDescriptor] = List()
-    val column = new PropertyDescriptor().name("column")
+    val column = new PropertyDescriptor()
+      .name("column")
       .displayName("Column")
-      .description("The column is you want to add uuid column's name,")
+      .description("The column is the name of the uuid you want to add")
       .defaultValue("uuid")
       .required(true)
       .example("uuid")

View File

@@ -10,8 +10,8 @@ class ConvertSchema extends ConfigurableStop {
   val authorEmail: String = "yangqidong@cnic.cn"
   val description: String = "Change field name"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   var schema:String = _

   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -40,7 +40,7 @@ class ConvertSchema extends ConfigurableStop {
     var descriptor : List[PropertyDescriptor] = List()
     val inports = new PropertyDescriptor().name("schema")
       .displayName("Schema")
-      .description("Change field name, you can write oldField1 -> newField1, Multiple separated by commas, Such as 'oldField1->newField1, oldField2->newField2' ")
+      .description("Change column names,multiple column names are separated by commas")
       .defaultValue("")
       .required(true)
       .example("id->uuid")
@@ -53,7 +53,7 @@ class ConvertSchema extends ConfigurableStop {
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
 }

View File

@@ -8,22 +8,23 @@ import org.apache.spark.sql.DataFrame
 class Distinct extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  override val description: String = "De duplicate data according to all fields or specified fields "
-  override val inportList: List[String] =List(Port.DefaultPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
+  override val description: String = "Duplicate based on the specified column name or all colume names"
+  override val inportList: List[String] =List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)

-  var fields:String=_
+  var columnNames:String=_

   override def setProperties(map: Map[String, Any]): Unit = {
-    fields = MapUtil.get(map,"fields").asInstanceOf[String]
+    columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val fields = new PropertyDescriptor().name("fields")
-      .displayName("Fields")
-      .description("De duplicate data according to all fields or specified fields,Multiple separated by commas ; If not, all fields will be de duplicated")
+    val fields = new PropertyDescriptor()
+      .name("columnNames")
+      .displayName("ColumnNames")
+      .description("Fill in the column names you want to duplicate,multiple columns names separated by commas,if not,all the columns will be deduplicated")
       .defaultValue("")
       .required(false)
       .example("id")
@@ -37,7 +38,7 @@ class Distinct extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
@@ -48,8 +49,8 @@ class Distinct extends ConfigurableStop{
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val inDf: DataFrame = in.read()
     var outDf: DataFrame = null
-    if(fields.length > 0){
-      val fileArr: Array[String] = fields.split(",")
+    if(columnNames.length > 0){
+      val fileArr: Array[String] = columnNames.split(",")
       outDf = inDf.dropDuplicates(fileArr)
     }else{
       outDf = inDf.distinct()
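
For reference, the renamed property drives the same Spark calls as before. A minimal spark-shell sketch of the deduplication logic (the sample data is illustrative):

val inDf = Seq((1, "a"), (1, "a"), (1, "b")).toDF("id", "tag")
val columnNames = "id"   // the renamed property; an empty string means whole-row dedup

val outDf =
  if (columnNames.nonEmpty) inDf.dropDuplicates(columnNames.split(","))  // keep one row per id
  else inDf.distinct()                                                   // fall back to full-row distinct
outDf.show()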

View File

@@ -9,16 +9,16 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
 class DropField extends ConfigurableStop {
   val authorEmail: String = "ygang@cnic.cn"
-  val description: String = "Delete fields in schema"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Delete one or more columns"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)

-  var fields:String = _
+  var columnNames:String = _

   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     var df = in.read()
-    val field = fields.split(",")
+    val field = columnNames.split(",")
     for( x <- 0 until field.size){
       df = df.drop(field(x))
     }
@@ -31,13 +31,15 @@ class DropField extends ConfigurableStop {
   }

   def setProperties(map : Map[String, Any]): Unit = {
-    fields = MapUtil.get(map,"fields").asInstanceOf[String]
+    columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val inports = new PropertyDescriptor().name("fields").displayName("Fields")
-      .description("Delete fields in schema,multiple are separated by commas")
+    val inports = new PropertyDescriptor()
+      .name("columnNames")
+      .displayName("ColumnNames")
+      .description("Fill in the columns you want to delete,multiple columns names separated by commas")
       .defaultValue("")
       .required(true)
       .example("id")
@@ -46,11 +48,11 @@ class DropField extends ConfigurableStop {
   }

   override def getIcon(): Array[Byte] = {
-    ImageUtil.getImage("icon/common/DropField.png")
+    ImageUtil.getImage("icon/common/DropColumnNames.png")
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
 }

View File

@@ -16,18 +16,18 @@ class ExecuteSQLStop extends ConfigurableStop{
   val authorEmail: String = "ygang@cnic.cn"
   val description: String = "Create temporary view table to execute sql"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)

   var sql: String = _
-  var tempViewName: String = _
+  var ViewName: String = _

   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
     val inDF = in.read()
-    inDF.createOrReplaceTempView(tempViewName)
+    inDF.createOrReplaceTempView(ViewName)
     val frame: DataFrame = spark.sql(sql)
     out.write(frame)
@@ -36,7 +36,7 @@ class ExecuteSQLStop extends ConfigurableStop{
   override def setProperties(map: Map[String, Any]): Unit = {
     sql = MapUtil.get(map,"sql").asInstanceOf[String]
-    tempViewName = MapUtil.get(map,"tempViewName").asInstanceOf[String]
+    ViewName = MapUtil.get(map,"ViewName").asInstanceOf[String]
   }

   override def initialize(ctx: ProcessContext): Unit = {
@@ -52,15 +52,15 @@ class ExecuteSQLStop extends ConfigurableStop{
       .example("select * from temp")
     descriptor = sql :: descriptor

-    val tableName = new PropertyDescriptor()
-      .name("tempViewName")
-      .displayName("TempViewName")
-      .description(" Temporary view table")
+    val ViewName = new PropertyDescriptor()
+      .name("viewName")
+      .displayName("ViewName")
+      .description("Name of the temporary view table")
       .defaultValue("temp")
       .required(true)
       .example("temp")
-    descriptor = tableName :: descriptor
+    descriptor = ViewName :: descriptor
     descriptor
   }
@@ -69,7 +69,7 @@ class ExecuteSQLStop extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
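
The renamed ViewName property is simply the temp-view name the SQL statement refers to. A minimal spark-shell sketch (the view name and query are illustrative):

val inDF = Seq((1, "ok"), (2, "skip")).toDF("id", "status")
inDF.createOrReplaceTempView("temp")          // ViewName = "temp"
val frame = spark.sql("select * from temp where id = 1")
frame.show()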

View File

@@ -8,7 +8,7 @@ import org.apache.spark.sql.{Column, DataFrame}
 class Filter extends ConfigurableStop{
   override val authorEmail: String = "xjzhu@cnic.cn"
-  override val description: String = "Do filter by condition"
+  override val description: String = "Filter by condition"
   override val inportList: List[String] = List(Port.DefaultPort)
   override val outportList: List[String] = List(Port.DefaultPort)

View File

@@ -9,9 +9,9 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 class Fork extends ConfigurableStop{
   val authorEmail: String = "xjzhu@cnic.cn"
-  val description: String = "Fork data into different stops"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.AnyPort.toString)
+  val description: String = "Forking data to different stops"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.AnyPort)

   var outports : List[String] = _
@@ -33,7 +33,7 @@ class Fork extends ConfigurableStop{
     var descriptor : List[PropertyDescriptor] = List()
     val outports = new PropertyDescriptor().name("outports")
       .displayName("outports")
-      .description("outports string, seperated by ,.")
+      .description("Output ports string with comma")
       .defaultValue("")
       .required(true)
     descriptor = outports :: descriptor
@@ -45,6 +45,6 @@ class Fork extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
 }

View File

@@ -8,12 +8,12 @@ import org.apache.spark.sql.{Column, DataFrame}
 class Join extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  override val description: String = "Table connection, including full connection, left connection, right connection and inner connection"
-  override val inportList: List[String] =List(Port.LeftPort.toString,Port.RightPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
+  override val description: String = "Table joins include full join, left join, right join and inner join"
+  override val inportList: List[String] =List(Port.LeftPort,Port.RightPort)
+  override val outportList: List[String] = List(Port.DefaultPort)

   var joinMode:String=_
-  var correlationField:String=_
+  var correlationColumn:String=_

   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -21,7 +21,7 @@ class Join extends ConfigurableStop{
     val rightDF = in.read(Port.RightPort)

     var seq: Seq[String]= Seq()
-    correlationField.split(",").foreach(x=>{
+    correlationColumn.split(",").foreach(x=>{
       seq = seq .++(Seq(x.toString))
     })
@@ -37,27 +37,30 @@ class Join extends ConfigurableStop{
   override def setProperties(map: Map[String, Any]): Unit = {
     joinMode = MapUtil.get(map,"joinMode").asInstanceOf[String]
-    correlationField = MapUtil.get(map,"correlationField").asInstanceOf[String]
+    correlationColumn = MapUtil.get(map,"correlationColumn").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val joinMode = new PropertyDescriptor().name("joinMode")
-      .displayName("joinMode")
-      .description("For table association, you can choose INNER, LEFT, RIGHT, FULL")
+    val joinMode = new PropertyDescriptor()
+      .name("joinMode")
+      .displayName("JoinMode")
+      .description("For table associations, you can choose inner,left,right,full")
       .allowableValues(Set("inner","left","right","full_outer"))
       .defaultValue("inner")
       .required(true)
+      .example("left")
     descriptor = joinMode :: descriptor

-    val correlationField = new PropertyDescriptor()
-      .name("correlationField")
-      .displayName("correlationField")
-      .description("Fields associated with tables,If there are more than one, please use , separate")
+    val correlationColumn = new PropertyDescriptor()
+      .name("correlationColumn")
+      .displayName("CorrelationColumn")
+      .description("Columns associated with tables,if multiple are separated by commas")
       .defaultValue("")
       .required(true)
-    descriptor = correlationField :: descriptor
+      .example("id,name")
+    descriptor = correlationColumn :: descriptor
     descriptor
   }
@@ -67,7 +70,7 @@ class Join extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
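
The correlationColumn and joinMode properties map directly onto DataFrame.join. A minimal spark-shell sketch (the two tables are illustrative):

val leftDF  = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val rightDF = Seq((1, 90)).toDF("id", "score")

val correlationColumn = "id"    // comma-separated list of join columns
val joinMode = "left"           // one of inner, left, right, full_outer

val joined = leftDF.join(rightDF, correlationColumn.split(",").toSeq, joinMode)
joined.show()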

View File

@@ -5,14 +5,12 @@ import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import scala.beans.BeanProperty

 class Merge extends ConfigurableStop{
   val authorEmail: String = "xjzhu@cnic.cn"
   val description: String = "Merge data into one stop"
-  val inportList: List[String] = List(Port.AnyPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.AnyPort)
+  val outportList: List[String] = List(Port.DefaultPort)

   var inports : List[String] = _
@@ -33,8 +31,8 @@ class Merge extends ConfigurableStop{
     var descriptor : List[PropertyDescriptor] = List()
     val inports = new PropertyDescriptor()
       .name("inports")
-      .displayName("inports")
-      .description("inports string, seperated by ,.")
+      .displayName("Inports")
+      .description("Inports string are separated by commas")
       .defaultValue("")
       .required(true)
     descriptor = inports :: descriptor
@@ -46,7 +44,7 @@ class Merge extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
 }

View File

@@ -8,15 +8,14 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 class Route extends ConfigurableStop{
   val authorEmail: String = "xjzhu@cnic.cn"
-  val description: String = "Route data by customizedProperties, key is port & value is filter condition"
+  val description: String = "Route data by custom properties,key is port,value is filter"
   val inportList: List[String] = List(Port.DefaultPort)
   val outportList: List[String] = List(Port.RoutePort)

   override val isCustomized: Boolean = true

   override def setProperties(map: Map[String, Any]): Unit = {
-    //val outportStr = MapUtil.get(map,"outports").asInstanceOf[String]
-    //outports = outportStr.split(",").toList
   }

   override def initialize(ctx: ProcessContext): Unit = {
@@ -40,8 +39,7 @@ class Route extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    //val outports = new PropertyDescriptor().name("outports").displayName("outports").description("outports string, seperated by ,.").defaultValue("").required(true)
-    //descriptor = outports :: descriptor
     descriptor
   }
@@ -50,6 +48,6 @@ class Route extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
 }

View File

@@ -12,16 +12,16 @@ import scala.beans.BeanProperty
 class SelectField extends ConfigurableStop {
   val authorEmail: String = "xjzhu@cnic.cn"
-  val description: String = "Select data field"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Select data column"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)

-  var fields:String = _
+  var columnNames:String = _

   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val df = in.read()
-    val field = fields.split(",")
+    val field = columnNames.split(",")
     val columnArray : Array[Column] = new Array[Column](field.size)
     for(i <- 0 to field.size - 1){
       columnArray(i) = new Column(field(i))
@@ -36,17 +36,18 @@ class SelectField extends ConfigurableStop {
   }

   def setProperties(map : Map[String, Any]): Unit = {
-    fields = MapUtil.get(map,"fields").asInstanceOf[String]
+    columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
     val inports = new PropertyDescriptor()
-      .name("fields")
-      .displayName("Fields")
-      .description("The fields you want to select")
+      .name("columnNames")
+      .displayName("ColumnNames")
+      .description("Select the column you want,multiple columns separated by commas")
       .defaultValue("")
       .required(true)
+      .example("id,name")
     descriptor = inports :: descriptor
     descriptor
   }
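
The columnNames property ends up as a Column array passed to select. A minimal spark-shell sketch (the sample data is illustrative):

import org.apache.spark.sql.Column

val df = Seq((1, "alice", 20), (2, "bob", 30)).toDF("id", "name", "age")
val columnNames = "id,name"

val cols: Array[Column] = columnNames.split(",").map(c => new Column(c))
df.select(cols: _*).show()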

View File

@@ -10,9 +10,9 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 class Subtract extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  override val description: String = "Delete the data existing in the right table from the left table"
-  override val inportList: List[String] =List(Port.LeftPort.toString,Port.RightPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
+  override val description: String = "Delete the existing data in the right table from the left table"
+  override val inportList: List[String] =List(Port.LeftPort,Port.RightPort)
+  override val outportList: List[String] = List(Port.DefaultPort)

   override def setProperties(map: Map[String, Any]): Unit = {
   }
@@ -28,7 +28,7 @@ class Subtract extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }

   override def initialize(ctx: ProcessContext): Unit = {

View File

@@ -9,8 +9,8 @@ class Trager extends ConfigurableStop{
   val authorEmail: String = "ygang@cnic.cn"
   val description: String = "Upstream and downstream middleware"
-  val inportList: List[String] = List(Port.AnyPort.toString)
-  val outportList: List[String] = List(Port.AnyPort.toString)
+  val inportList: List[String] = List(Port.AnyPort)
+  val outportList: List[String] = List(Port.AnyPort)

   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -35,7 +35,7 @@ class Trager extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
 }

View File

@@ -12,8 +12,8 @@ class CsvParser extends ConfigurableStop{
   val authorEmail: String = "xjzhu@cnic.cn"
   val description: String = "Parse csv file"
-  val inportList: List[String] = List(Port.NonePort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)

   var csvPath: String = _
   var header: Boolean = _
@@ -72,7 +72,7 @@ class CsvParser extends ConfigurableStop{
       .description("The path of csv file")
       .defaultValue("")
       .required(true)
-      .example("")
+      .example("hdfs://192.168.3.138:8020/test/")
     descriptor = csvPath :: descriptor

     val header = new PropertyDescriptor()
@@ -82,7 +82,7 @@ class CsvParser extends ConfigurableStop{
       .defaultValue("false")
       .allowableValues(Set("true","false"))
       .required(true)
-      .example("")
+      .example("true")
     descriptor = header :: descriptor

     val delimiter = new PropertyDescriptor()
@@ -91,7 +91,7 @@ class CsvParser extends ConfigurableStop{
       .description("The delimiter of csv file")
       .defaultValue("")
       .required(true)
-      .example("")
+      .example(",")
     descriptor = delimiter :: descriptor

     val schema = new PropertyDescriptor()
@@ -100,7 +100,7 @@ class CsvParser extends ConfigurableStop{
       .description("The schema of csv file")
       .defaultValue("")
       .required(false)
-      .example("")
+      .example("id,name,gender,age")
     descriptor = schema :: descriptor
     descriptor
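
The new example values correspond to standard Spark CSV reader options. A minimal spark-shell sketch, assuming a local test file with four columns instead of the HDFS path used in the example values:

val csvPath = "/tmp/test.csv"            // stand-in for the HDFS example path
val schema = "id,name,gender,age"        // comma-separated column names, as in the example

val df = spark.read
  .option("header", "true")              // header property
  .option("delimiter", ",")              // delimiter property
  .csv(csvPath)
  .toDF(schema.split(","): _*)           // apply the simple name-only schema
df.show()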

View File

@@ -8,9 +8,9 @@ import org.apache.spark.sql.SaveMode
 class CsvSave extends ConfigurableStop{
   val authorEmail: String = "xjzhu@cnic.cn"
-  val description: String = "Save data into csv file."
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Save the data as a csv file."
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)

   var csvSavePath: String = _
   var header: Boolean = _
@@ -33,7 +33,7 @@ class CsvSave extends ConfigurableStop{
       .description("The save path of csv file")
       .defaultValue("")
       .required(true)
-      .example("")
+      .example("hdfs://192.168.3.138:8020/test/")
     descriptor = csvSavePath :: descriptor

     val header = new PropertyDescriptor()
@@ -42,6 +42,7 @@ class CsvSave extends ConfigurableStop{
       .description("Whether the csv file has a header")
       .defaultValue("")
       .required(true)
+      .example("true")
     descriptor = header :: descriptor

     val delimiter = new PropertyDescriptor()
@@ -50,16 +51,16 @@ class CsvSave extends ConfigurableStop{
       .description("The delimiter of csv file")
       .defaultValue(",")
       .required(true)
-      .example("")
+      .example(",")
     descriptor = delimiter :: descriptor

     val partition = new PropertyDescriptor()
       .name("partition")
       .displayName("Partition")
       .description("The partition of csv file")
-      .defaultValue("1")
+      .defaultValue("")
       .required(true)
-      .example("")
+      .example("3")
     descriptor = partition :: descriptor
     descriptor
@@ -79,6 +80,7 @@ class CsvSave extends ConfigurableStop{
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val df = in.read()
     df.repartition(partition.toInt).write
       .format("csv")
       .mode(SaveMode.Overwrite)
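
The partition, header and delimiter properties feed straight into the DataFrame writer. A minimal spark-shell sketch (the output path and sample data are illustrative):

val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val partition = "3"                       // number of output files

df.repartition(partition.toInt).write
  .format("csv")
  .mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("header", "true")
  .option("delimiter", ",")
  .save("/tmp/csv-save-sketch")           // stand-in for the HDFS example path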

View File

@@ -11,12 +11,12 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 class CsvStringParser extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  val inportList: List[String] = List(Port.NonePort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   override val description: String = "Parse csv string"

-  var Str:String=_
+  var str:String=_
   var delimiter: String = _
   var schema: String = _
@@ -25,7 +25,7 @@ class CsvStringParser extends ConfigurableStop{
     val session: SparkSession = pec.get[SparkSession]
     val context: SparkContext = session.sparkContext

-    val arrStr: Array[String] = Str.split("\n")
+    val arrStr: Array[String] = str.split("\n")
     var num:Int=0
     val listROW: List[Row] = arrStr.map(line => {
@@ -50,7 +50,7 @@ class CsvStringParser extends ConfigurableStop{
   override def setProperties(map: Map[String, Any]): Unit = {
-    Str = MapUtil.get(map,"Str").asInstanceOf[String]
+    str = MapUtil.get(map,"String").asInstanceOf[String]
     delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String]
     schema = MapUtil.get(map,"schema").asInstanceOf[String]
   }
@@ -58,17 +58,17 @@ class CsvStringParser extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val Str = new PropertyDescriptor()
+    val str = new PropertyDescriptor()
       .name("Str")
       .displayName("Str")
       .defaultValue("")
       .required(true)
       .example("")
-    descriptor = Str :: descriptor
+    descriptor = str :: descriptor

     val delimiter = new PropertyDescriptor()
       .name("delimiter")
-      .displayName("delimiter")
+      .displayName("Delimiter")
       .description("The delimiter of CSV string")
       .defaultValue("")
       .required(true)
@@ -77,7 +77,7 @@ class CsvStringParser extends ConfigurableStop{
     val schema = new PropertyDescriptor()
       .name("schema")
-      .displayName("schema")
+      .displayName("Schema")
       .description("The schema of CSV string")
       .defaultValue("")
       .required(false)

View File

@@ -7,12 +7,12 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import org.apache.spark.sql.SparkSession

-class FetchEs extends ConfigurableStop {
+class FetchElasticsearch extends ConfigurableStop {

   val authorEmail: String = "ygang@cnic.cn"
   val description: String = "Fetch data from Elasticsearch"
-  val inportList: List[String] = List(Port.NonePort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)

   var es_nodes : String = _
   var es_port : String = _

View File

@@ -7,12 +7,12 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import org.apache.spark.sql.SparkSession
 import org.elasticsearch.spark.sql.EsSparkSQL

-class PutEs extends ConfigurableStop {
+class PutElasticsearch extends ConfigurableStop {

   val authorEmail: String = "ygang@cnic.cn"
   val description: String = "Put data into Elasticsearch"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.NonePort.toString)
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)

   var es_nodes : String = _
   var es_port : String = _
@@ -20,7 +20,6 @@ class PutEs extends ConfigurableStop {
   var es_type : String = _
   var configuration_item:String = _

   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
     val inDf = in.read()

View File

@@ -6,12 +6,12 @@ import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import org.apache.spark.sql.SparkSession

-class QueryEs extends ConfigurableStop {
+class QueryElasticsearch extends ConfigurableStop {

   val authorEmail: String = "ygang@cnic.cn"
   val description: String = "Query data from Elasticsearch"
-  val inportList: List[String] = List(Port.NonePort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.NonePort)
+  val outportList: List[String] = List(Port.DefaultPort)

   var es_nodes : String = _
   var es_port : String = _

View File

@@ -6,13 +6,11 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import cn.piflow.conf._
 import org.apache.spark.sql.SparkSession

 class RegexTextProcess extends ConfigurableStop{
   val authorEmail: String = "06whuxx@163.com"
-  val description: String = "Use regex to replace text"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val description: String = "Replace values in a column with regex"
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   var regex:String =_
   var columnName:String=_
@@ -28,11 +26,9 @@ class RegexTextProcess extends ConfigurableStop{
     sqlContext.udf.register("regexPro",(str:String)=>str.replaceAll(regexText,replaceText))
     val sqlText:String="select *,regexPro("+columnName+") as "+columnName+"_new from thesis"
     val dfNew=sqlContext.sql(sqlText)
-    //dfNew.show()
     out.write(dfNew)
   }

   def initialize(ctx: ProcessContext): Unit = {
   }
@@ -47,31 +43,31 @@ class RegexTextProcess extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val regex = new PropertyDescriptor().name("regex")
+    val regex = new PropertyDescriptor()
+      .name("regex")
       .displayName("Regex")
       .description("regex")
       .defaultValue("")
       .required(true)
-      .example("")
+      .example("0001")
+    descriptor = regex :: descriptor

     val columnName = new PropertyDescriptor()
       .name("columnName")
       .displayName("ColumnName")
-      .description("Field name of schema")
+      .description("The columns you want to replace")
       .defaultValue("")
       .required(true)
-      .example("")
+      .example("id")
+    descriptor = columnName :: descriptor

     val replaceStr = new PropertyDescriptor()
       .name("replaceStr")
       .displayName("ReplaceStr")
-      .description("Replaced string")
+      .description("Value after replacement")
       .defaultValue("")
       .required(true)
-      .example("")
+      .example("1111")
-    descriptor = regex :: descriptor
-    descriptor = columnName :: descriptor
     descriptor = replaceStr :: descriptor
     descriptor
   }
@@ -81,6 +77,6 @@ class RegexTextProcess extends ConfigurableStop{
   }

   override def getGroup(): List[String] = {
-    List(StopGroup.FileGroup.toString)
+    List(StopGroup.FileGroup)
   }
 }
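
The three properties (regex, columnName, replaceStr) wire into a plain replaceAll UDF. A minimal spark-shell sketch using the new example values (the sample data is illustrative):

val df = Seq(("0001", "x"), ("0002", "y")).toDF("id", "tag")
val regex = "0001"; val columnName = "id"; val replaceStr = "1111"

// Register the replacement UDF and add a <column>_new column, mirroring the stop's SQL.
spark.udf.register("regexPro", (s: String) => s.replaceAll(regex, replaceStr))
df.createOrReplaceTempView("thesis")
spark.sql(s"select *, regexPro($columnName) as ${columnName}_new from thesis").show()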