This commit is contained in:
bao319 2020-03-26 15:17:27 +08:00
parent 1b57b79022
commit 718e879312
31 changed files with 174 additions and 177 deletions

View File

@ -15,7 +15,7 @@
"name":"Distinct",
"bundle":"cn.piflow.bundle.common.Distinct",
"properties":{
"fields":"id"
"columnNames":"id"
}
}

View File

@ -15,7 +15,7 @@
"name":"ConvertSchema",
"bundle":"cn.piflow.bundle.common.DropField",
"properties":{
"fields":"id"
"columnNames":"id"
}
}

View File

@ -16,7 +16,7 @@
"bundle":"cn.piflow.bundle.common.ExecuteSQLStop",
"properties":{
"sql":"select * from temp where id = 0001",
"tempViewName": "temp"
"ViewName": "temp"
}
}
],

View File

@ -23,7 +23,7 @@
"name":"Join",
"bundle":"cn.piflow.bundle.common.Join",
"properties":{
"correlationField": "id",
"correlationColumn": "id",
"joinMode": "left"
}

View File

@ -15,7 +15,7 @@
"name":"SelectField",
"bundle":"cn.piflow.bundle.common.SelectField",
"properties":{
"fields":"id"
"columnNames":"id"
}
}

View File

Icon image changed (6.8 KiB before and after).

View File

@ -10,9 +10,9 @@ import org.apache.spark.sql.SparkSession
class EmailClean extends ConfigurableStop{
val authorEmail: String = "songdongze@cnic.cn"
val description: String = "Clean email format data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Cleaning data in email format"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var columnName:String= _
@ -54,7 +54,6 @@ class EmailClean extends ConfigurableStop{
out.write(dfNew1)
}
def initialize(ctx: ProcessContext): Unit = {
}
@ -69,8 +68,8 @@ class EmailClean extends ConfigurableStop{
var descriptor : List[PropertyDescriptor] = List()
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.displayName("Column Name")
.description("Column names are what you want to clean,multiple column names are separated by commas")
.defaultValue("")
.required(true)
.example("email")

View File

@ -13,9 +13,9 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
class IdentityNumberClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Clean Id Card data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Cleaning data in ID format"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var columnName:String=_
@ -72,7 +72,7 @@ class IdentityNumberClean extends ConfigurableStop{
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.description("Column names are what you want to clean,multiple column names are separated by commas")
.defaultValue("")
.required(true)
.example("idcard")

View File

@ -11,9 +11,9 @@ import org.apache.spark.sql.types.StructField
class PhoneNumberClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Clean phone number format data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Cleaning data in mobile number format"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var columnName:String=_
@ -72,7 +72,7 @@ class PhoneNumberClean extends ConfigurableStop{
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.description("Column names are what you want to clean,multiple column names are separated by commas")
.defaultValue("")
.required(true)
.example("phonenumber")

View File

@ -9,9 +9,9 @@ import org.apache.spark.sql.SparkSession
class ProvinceClean extends ConfigurableStop{
val authorEmail: String = "songdongze@cnic.cn"
val description: String = "Clean email format data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Cleaning province data"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var columnName:String=_
@ -69,7 +69,7 @@ class ProvinceClean extends ConfigurableStop{
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.description("Column names are what you want to clean,multiple column names are separated by commas")
.defaultValue("")
.required(true)
.example("province")

View File

@ -10,9 +10,9 @@ import org.apache.spark.sql.types.StructField
class TitleClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Clean title format data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Cleaning title data"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var columnName:String=_
@ -65,15 +65,16 @@ class TitleClean extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.description("Column names are what you want to clean,multiple column names are separated by commas")
.defaultValue("")
.required(true)
.example("title")
descriptor = columnName :: descriptor
descriptor
}

View File

@ -9,14 +9,11 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
class AddUUIDStop extends ConfigurableStop{
override val authorEmail: String = "ygang@cnic.cn"
override val description: String = "Add UUID column"
override val inportList: List[String] =List(Port.DefaultPort.toString)
override val outportList: List[String] = List(Port.DefaultPort.toString)
override val description: String = "Add UUID column"
override val inportList: List[String] =List(Port.DefaultPort)
override val outportList: List[String] = List(Port.DefaultPort)
var column:String=_
@ -24,11 +21,10 @@ class AddUUIDStop extends ConfigurableStop{
val spark = pec.get[SparkSession]()
var df = in.read()
spark.udf.register("generaterUUID",()=>UUID.randomUUID().toString.replace("-",""))
spark.udf.register("generateUUID",()=>UUID.randomUUID().toString.replace("-",""))
df.createOrReplaceTempView("temp")
df = spark.sql(s"select generaterUUID() as ${column},* from temp")
df = spark.sql(s"select generateUUID() as ${column},* from temp")
out.write(df)
@ -44,9 +40,10 @@ class AddUUIDStop extends ConfigurableStop{
var descriptor : List[PropertyDescriptor] = List()
val column = new PropertyDescriptor().name("column")
val column = new PropertyDescriptor()
.name("column")
.displayName("Column")
.description("The column is you want to add uuid column's name,")
.description("The column is the name of the uuid you want to add")
.defaultValue("uuid")
.required(true)
.example("uuid")

View File

@ -10,8 +10,8 @@ class ConvertSchema extends ConfigurableStop {
val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "Change field name"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var schema:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@ -40,7 +40,7 @@ class ConvertSchema extends ConfigurableStop {
var descriptor : List[PropertyDescriptor] = List()
val inports = new PropertyDescriptor().name("schema")
.displayName("Schema")
.description("Change field name, you can write oldField1 -> newField1, Multiple separated by commas, Such as 'oldField1->newField1, oldField2->newField2' ")
.description("Change column names,multiple column names are separated by commas")
.defaultValue("")
.required(true)
.example("id->uuid")
@ -53,7 +53,7 @@ class ConvertSchema extends ConfigurableStop {
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
}

View File

@ -8,22 +8,23 @@ import org.apache.spark.sql.DataFrame
class Distinct extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "De duplicate data according to all fields or specified fields "
override val inportList: List[String] =List(Port.DefaultPort.toString)
override val outportList: List[String] = List(Port.DefaultPort.toString)
override val description: String = "Duplicate based on the specified column name or all colume names"
override val inportList: List[String] =List(Port.DefaultPort)
override val outportList: List[String] = List(Port.DefaultPort)
var fields:String=_
var columnNames:String=_
override def setProperties(map: Map[String, Any]): Unit = {
fields = MapUtil.get(map,"fields").asInstanceOf[String]
columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val fields = new PropertyDescriptor().name("fields")
.displayName("Fields")
.description("De duplicate data according to all fields or specified fields,Multiple separated by commas ; If not, all fields will be de duplicated")
val fields = new PropertyDescriptor()
.name("columnNames")
.displayName("ColumnNames")
.description("Fill in the column names you want to duplicate,multiple columns names separated by commas,if not,all the columns will be deduplicated")
.defaultValue("")
.required(false)
.example("id")
@ -37,7 +38,7 @@ class Distinct extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
@ -48,8 +49,8 @@ class Distinct extends ConfigurableStop{
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val inDf: DataFrame = in.read()
var outDf: DataFrame = null
if(fields.length > 0){
val fileArr: Array[String] = fields.split(",")
if(columnNames.length > 0){
val fileArr: Array[String] = columnNames.split(",")
outDf = inDf.dropDuplicates(fileArr)
}else{
outDf = inDf.distinct()
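
A spark-shell style sketch of the branching above with the renamed columnNames property (sample data is illustrative):
import spark.implicits._
val inDf = Seq((1, "a"), (1, "a"), (2, "b")).toDF("id", "name")
val columnNames = "id"                                     // empty string means deduplicate on all columns
val outDf = if (columnNames.nonEmpty) inDf.dropDuplicates(columnNames.split(",")) else inDf.distinct()
outDf.show()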

View File

@ -9,16 +9,16 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
class DropField extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Delete fields in schema"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Delete one or more columns"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var fields:String = _
var columnNames:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
var df = in.read()
val field = fields.split(",")
val field = columnNames.split(",")
for( x <- 0 until field.size){
df = df.drop(field(x))
}
@ -31,13 +31,15 @@ class DropField extends ConfigurableStop {
}
def setProperties(map : Map[String, Any]): Unit = {
fields = MapUtil.get(map,"fields").asInstanceOf[String]
columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val inports = new PropertyDescriptor().name("fields").displayName("Fields")
.description("Delete fields in schema,multiple are separated by commas")
val inports = new PropertyDescriptor()
.name("columnNames")
.displayName("ColumnNames")
.description("Fill in the columns you want to delete,multiple columns names separated by commas")
.defaultValue("")
.required(true)
.example("id")
@ -46,11 +48,11 @@ class DropField extends ConfigurableStop {
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/common/DropField.png")
ImageUtil.getImage("icon/common/DropColumnNames.png")
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
}
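
A spark-shell style sketch of dropping the configured columnNames (sample data is illustrative):
import spark.implicits._
val columnNames = "id"                                     // the columnNames property
val df = Seq((1, "a", 20), (2, "b", 21)).toDF("id", "name", "age")
// drop accepts varargs, so the comma-separated list can be passed directly.
df.drop(columnNames.split(","): _*).show()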

View File

@ -16,18 +16,18 @@ class ExecuteSQLStop extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Create temporary view table to execute sql"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var sql: String = _
var tempViewName: String = _
var ViewName: String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val inDF = in.read()
inDF.createOrReplaceTempView(tempViewName)
inDF.createOrReplaceTempView(ViewName)
val frame: DataFrame = spark.sql(sql)
out.write(frame)
@ -36,7 +36,7 @@ class ExecuteSQLStop extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
sql = MapUtil.get(map,"sql").asInstanceOf[String]
tempViewName = MapUtil.get(map,"tempViewName").asInstanceOf[String]
ViewName = MapUtil.get(map,"ViewName").asInstanceOf[String]
}
override def initialize(ctx: ProcessContext): Unit = {
@ -49,18 +49,18 @@ class ExecuteSQLStop extends ConfigurableStop{
.description("Sql string")
.defaultValue("")
.required(true)
.example("select * from temp")
.example("select * from temp")
descriptor = sql :: descriptor
val tableName = new PropertyDescriptor()
.name("tempViewName")
.displayName("TempViewName")
.description(" Temporary view table")
val ViewName = new PropertyDescriptor()
.name("viewName")
.displayName("ViewName")
.description("Name of the temporary view table")
.defaultValue("temp")
.required(true)
.example("temp")
descriptor = tableName :: descriptor
descriptor = ViewName :: descriptor
descriptor
}
@ -69,7 +69,7 @@ class ExecuteSQLStop extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
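
A spark-shell style sketch of the view registration plus query, using the example values from the descriptors:
import spark.implicits._
val viewName = "temp"                                      // the view-name property
val sql = "select * from temp where id = 1"                // the sql property
val inDF = Seq((1, "a"), (2, "b")).toDF("id", "name")
inDF.createOrReplaceTempView(viewName)
spark.sql(sql).show()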

View File

@ -8,7 +8,7 @@ import org.apache.spark.sql.{Column, DataFrame}
class Filter extends ConfigurableStop{
override val authorEmail: String = "xjzhu@cnic.cn"
override val description: String = "Do filter by condition"
override val description: String = "Filter by condition"
override val inportList: List[String] = List(Port.DefaultPort)
override val outportList: List[String] = List(Port.DefaultPort)

View File

@ -9,9 +9,9 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
class Fork extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Fork data into different stops"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.AnyPort.toString)
val description: String = "Forking data to different stops"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.AnyPort)
var outports : List[String] = _
@ -33,7 +33,7 @@ class Fork extends ConfigurableStop{
var descriptor : List[PropertyDescriptor] = List()
val outports = new PropertyDescriptor().name("outports")
.displayName("outports")
.description("outports string, seperated by ,.")
.description("Output ports string with comma")
.defaultValue("")
.required(true)
descriptor = outports :: descriptor
@ -45,6 +45,6 @@ class Fork extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
}

View File

@ -8,12 +8,12 @@ import org.apache.spark.sql.{Column, DataFrame}
class Join extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Table connection, including full connection, left connection, right connection and inner connection"
override val inportList: List[String] =List(Port.LeftPort.toString,Port.RightPort.toString)
override val outportList: List[String] = List(Port.DefaultPort.toString)
override val description: String = "Table joins include full join, left join, right join and inner join"
override val inportList: List[String] =List(Port.LeftPort,Port.RightPort)
override val outportList: List[String] = List(Port.DefaultPort)
var joinMode:String=_
var correlationField:String=_
var correlationColumn:String=_
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@ -21,7 +21,7 @@ class Join extends ConfigurableStop{
val rightDF = in.read(Port.RightPort)
var seq: Seq[String]= Seq()
correlationField.split(",").foreach(x=>{
correlationColumn.split(",").foreach(x=>{
seq = seq .++(Seq(x.toString))
})
@ -37,27 +37,30 @@ class Join extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
joinMode = MapUtil.get(map,"joinMode").asInstanceOf[String]
correlationField = MapUtil.get(map,"correlationField").asInstanceOf[String]
correlationColumn = MapUtil.get(map,"correlationColumn").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val joinMode = new PropertyDescriptor().name("joinMode")
.displayName("joinMode")
.description("For table association, you can choose INNER, LEFT, RIGHT, FULL")
val joinMode = new PropertyDescriptor()
.name("joinMode")
.displayName("JoinMode")
.description("For table associations, you can choose inner,left,right,full")
.allowableValues(Set("inner","left","right","full_outer"))
.defaultValue("inner")
.required(true)
.example("left")
descriptor = joinMode :: descriptor
val correlationField = new PropertyDescriptor()
.name("correlationField")
.displayName("correlationField")
.description("Fields associated with tables,If there are more than one, please use , separate")
val correlationColumn = new PropertyDescriptor()
.name("correlationColumn")
.displayName("CorrelationColumn")
.description("Columns associated with tables,if multiple are separated by commas")
.defaultValue("")
.required(true)
descriptor = correlationField :: descriptor
.example("id,name")
descriptor = correlationColumn :: descriptor
descriptor
}
@ -67,7 +70,7 @@ class Join extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
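
A spark-shell style sketch of the renamed correlationColumn and joinMode properties (sample data is illustrative):
import spark.implicits._
val correlationColumn = "id"                               // comma-separated for composite keys
val joinMode = "left"                                      // inner, left, right or full_outer
val leftDF = Seq((1, "a"), (2, "b")).toDF("id", "name")
val rightDF = Seq((1, 20), (3, 30)).toDF("id", "age")
leftDF.join(rightDF, correlationColumn.split(",").map(_.trim).toSeq, joinMode).show()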

View File

@ -5,14 +5,12 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import scala.beans.BeanProperty
class Merge extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Merge data into one stop"
val inportList: List[String] = List(Port.AnyPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.AnyPort)
val outportList: List[String] = List(Port.DefaultPort)
var inports : List[String] = _
@ -33,8 +31,8 @@ class Merge extends ConfigurableStop{
var descriptor : List[PropertyDescriptor] = List()
val inports = new PropertyDescriptor()
.name("inports")
.displayName("inports")
.description("inports string, seperated by ,.")
.displayName("Inports")
.description("Inports string are separated by commas")
.defaultValue("")
.required(true)
descriptor = inports :: descriptor
@ -46,7 +44,7 @@ class Merge extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
}

View File

@ -8,15 +8,14 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
class Route extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Route data by customizedProperties, key is port & value is filter condition"
val description: String = "Route data by custom properties,key is port,value is filter"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.RoutePort)
override val isCustomized: Boolean = true
override def setProperties(map: Map[String, Any]): Unit = {
//val outportStr = MapUtil.get(map,"outports").asInstanceOf[String]
//outports = outportStr.split(",").toList
}
override def initialize(ctx: ProcessContext): Unit = {
@ -40,8 +39,7 @@ class Route extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
//val outports = new PropertyDescriptor().name("outports").displayName("outports").description("outports string, seperated by ,.").defaultValue("").required(true)
//descriptor = outports :: descriptor
descriptor
}
@ -50,6 +48,6 @@ class Route extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
}

View File

@ -12,16 +12,16 @@ import scala.beans.BeanProperty
class SelectField extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Select data field"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Select data column"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var fields:String = _
var columnNames:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val df = in.read()
val field = fields.split(",")
val field = columnNames.split(",")
val columnArray : Array[Column] = new Array[Column](field.size)
for(i <- 0 to field.size - 1){
columnArray(i) = new Column(field(i))
@ -36,17 +36,18 @@ class SelectField extends ConfigurableStop {
}
def setProperties(map : Map[String, Any]): Unit = {
fields = MapUtil.get(map,"fields").asInstanceOf[String]
columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val inports = new PropertyDescriptor()
.name("fields")
.displayName("Fields")
.description("The fields you want to select")
.name("columnNames")
.displayName("ColumnNames")
.description("Select the column you want,multiple columns separated by commas")
.defaultValue("")
.required(true)
.example("id,name")
descriptor = inports :: descriptor
descriptor
}
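
A spark-shell style sketch of selecting the configured columnNames (sample data is illustrative):
import org.apache.spark.sql.Column
import spark.implicits._
val columnNames = "id,name"                                // the columnNames property
val df = Seq((1, "a", 20), (2, "b", 21)).toDF("id", "name", "age")
val cols: Array[Column] = columnNames.split(",").map(c => new Column(c.trim))
df.select(cols: _*).show()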

View File

@ -10,9 +10,9 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class Subtract extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Delete the data existing in the right table from the left table"
override val inportList: List[String] =List(Port.LeftPort.toString,Port.RightPort.toString)
override val outportList: List[String] = List(Port.DefaultPort.toString)
override val description: String = "Delete the existing data in the right table from the left table"
override val inportList: List[String] =List(Port.LeftPort,Port.RightPort)
override val outportList: List[String] = List(Port.DefaultPort)
override def setProperties(map: Map[String, Any]): Unit = {
}
@ -28,7 +28,7 @@ class Subtract extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
override def initialize(ctx: ProcessContext): Unit = {

View File

@ -9,8 +9,8 @@ class Trager extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Upstream and downstream middleware"
val inportList: List[String] = List(Port.AnyPort.toString)
val outportList: List[String] = List(Port.AnyPort.toString)
val inportList: List[String] = List(Port.AnyPort)
val outportList: List[String] = List(Port.AnyPort)
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@ -35,7 +35,7 @@ class Trager extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup.toString)
List(StopGroup.CommonGroup)
}
}

View File

@ -12,8 +12,8 @@ class CsvParser extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Parse csv file"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var csvPath: String = _
var header: Boolean = _
@ -72,7 +72,7 @@ class CsvParser extends ConfigurableStop{
.description("The path of csv file")
.defaultValue("")
.required(true)
.example("")
.example("hdfs://192.168.3.138:8020/test/")
descriptor = csvPath :: descriptor
val header = new PropertyDescriptor()
@ -82,7 +82,7 @@ class CsvParser extends ConfigurableStop{
.defaultValue("false")
.allowableValues(Set("true","false"))
.required(true)
.example("")
.example("true")
descriptor = header :: descriptor
val delimiter = new PropertyDescriptor()
@ -91,7 +91,7 @@ class CsvParser extends ConfigurableStop{
.description("The delimiter of csv file")
.defaultValue("")
.required(true)
.example("")
.example(",")
descriptor = delimiter :: descriptor
val schema = new PropertyDescriptor()
@ -100,7 +100,7 @@ class CsvParser extends ConfigurableStop{
.description("The schema of csv file")
.defaultValue("")
.required(false)
.example("")
.example("id,name,gender,age")
descriptor = schema :: descriptor
descriptor
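
A spark-shell style sketch of how the four properties could map onto the DataFrame reader; the HDFS path is the descriptor's example value and the schema handling is an assumption, since the stop body is not shown in this diff:
import org.apache.spark.sql.types.{StringType, StructField, StructType}
// Build a string-typed schema from the comma-separated column names (assumed behaviour).
val schema = StructType("id,name,gender,age".split(",").map(n => StructField(n.trim, StringType, nullable = true)))
val df = spark.read
  .option("header", "false")                               // header property
  .option("delimiter", ",")                                // delimiter property
  .schema(schema)
  .csv("hdfs://192.168.3.138:8020/test/")                  // csvPath property
df.show()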

View File

@ -8,9 +8,9 @@ import org.apache.spark.sql.SaveMode
class CsvSave extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Save data into csv file."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Save the data as a csv file."
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var csvSavePath: String = _
var header: Boolean = _
@ -33,7 +33,7 @@ class CsvSave extends ConfigurableStop{
.description("The save path of csv file")
.defaultValue("")
.required(true)
.example("")
.example("hdfs://192.168.3.138:8020/test/")
descriptor = csvSavePath :: descriptor
val header = new PropertyDescriptor()
@ -42,6 +42,7 @@ class CsvSave extends ConfigurableStop{
.description("Whether the csv file has a header")
.defaultValue("")
.required(true)
.example("true")
descriptor = header :: descriptor
val delimiter = new PropertyDescriptor()
@ -50,16 +51,16 @@ class CsvSave extends ConfigurableStop{
.description("The delimiter of csv file")
.defaultValue(",")
.required(true)
.example("")
.example(",")
descriptor = delimiter :: descriptor
val partition = new PropertyDescriptor()
.name("partition")
.displayName("Partition")
.description("The partition of csv file")
.defaultValue("1")
.defaultValue("")
.required(true)
.example("")
.example("3")
descriptor = partition :: descriptor
descriptor
@ -79,6 +80,7 @@ class CsvSave extends ConfigurableStop{
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val df = in.read()
df.repartition(partition.toInt).write
.format("csv")
.mode(SaveMode.Overwrite)
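
A spark-shell style sketch of the writer side, mirroring the repartition/format/mode chain above with the descriptor examples:
import org.apache.spark.sql.SaveMode
import spark.implicits._
val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.repartition(3)                                          // partition property
  .write
  .format("csv")
  .option("header", "true")                                // header property
  .option("delimiter", ",")                                // delimiter property
  .mode(SaveMode.Overwrite)
  .save("hdfs://192.168.3.138:8020/test/")                 // csvSavePath property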

View File

@ -11,12 +11,12 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class CsvStringParser extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
override val description: String = "Parse csv string"
var Str:String=_
var str:String=_
var delimiter: String = _
var schema: String = _
@ -25,7 +25,7 @@ class CsvStringParser extends ConfigurableStop{
val session: SparkSession = pec.get[SparkSession]
val context: SparkContext = session.sparkContext
val arrStr: Array[String] = Str.split("\n")
val arrStr: Array[String] = str.split("\n")
var num:Int=0
val listROW: List[Row] = arrStr.map(line => {
@ -50,7 +50,7 @@ class CsvStringParser extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
Str = MapUtil.get(map,"Str").asInstanceOf[String]
str = MapUtil.get(map,"String").asInstanceOf[String]
delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String]
schema = MapUtil.get(map,"schema").asInstanceOf[String]
}
@ -58,17 +58,17 @@ class CsvStringParser extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val Str = new PropertyDescriptor()
val str = new PropertyDescriptor()
.name("Str")
.displayName("Str")
.defaultValue("")
.required(true)
.example("")
descriptor = Str :: descriptor
descriptor = str :: descriptor
val delimiter = new PropertyDescriptor()
.name("delimiter")
.displayName("delimiter")
.displayName("Delimiter")
.description("The delimiter of CSV string")
.defaultValue("")
.required(true)
@ -77,7 +77,7 @@ class CsvStringParser extends ConfigurableStop{
val schema = new PropertyDescriptor()
.name("schema")
.displayName("schema")
.displayName("Schema")
.description("The schema of CSV string")
.defaultValue("")
.required(false)
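
A spark-shell style sketch of turning a csv string into a DataFrame; the exact row construction is an assumption, since only part of the stop is shown in this diff:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val str = "1,a\n2,b"                                       // the csv string property
val delimiter = ","
val schemaNames = "id,name"
val rows = str.split("\n").map(line => Row.fromSeq(line.split(delimiter).toSeq)).toSeq
val schema = StructType(schemaNames.split(",").map(n => StructField(n.trim, StringType, nullable = true)))
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema).show()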

View File

@ -7,12 +7,12 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
class FetchEs extends ConfigurableStop {
class FetchElasticsearch extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Fetch data from Elasticsearch"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var es_nodes : String = _
var es_port : String = _

View File

@ -7,12 +7,12 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL
class PutEs extends ConfigurableStop {
class PutElasticsearch extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Put data into Elasticsearch"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.NonePort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var es_nodes : String = _
var es_port : String = _
@ -20,7 +20,6 @@ class PutEs extends ConfigurableStop {
var es_type : String = _
var configuration_item:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val inDf = in.read()
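
The write itself is outside this hunk; a spark-shell style sketch of how the es_* properties are typically passed to EsSparkSQL (an assumption about the unchanged body, with illustrative values):
import org.elasticsearch.spark.sql.EsSparkSQL
import spark.implicits._
val (es_nodes, es_port, es_index, es_type) = ("192.168.3.138", "9200", "test_index", "doc")  // illustrative values
val inDf = Seq((1, "a"), (2, "b")).toDF("id", "name")
// Resource is "index/type"; connection settings go in the options map.
EsSparkSQL.saveToEs(inDf, s"$es_index/$es_type", Map("es.nodes" -> es_nodes, "es.port" -> es_port))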

View File

@ -6,12 +6,12 @@ import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
class QueryEs extends ConfigurableStop {
class QueryElasticsearch extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Query data from Elasticsearch"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.NonePort)
val outportList: List[String] = List(Port.DefaultPort)
var es_nodes : String = _
var es_port : String = _

View File

@ -6,15 +6,13 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf._
import org.apache.spark.sql.SparkSession
class RegexTextProcess extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Use regex to replace text"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val description: String = "Replace values in a column with regex"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var regex:String =_
var regex:String =_
var columnName:String=_
var replaceStr:String=_
@ -28,11 +26,9 @@ class RegexTextProcess extends ConfigurableStop{
sqlContext.udf.register("regexPro",(str:String)=>str.replaceAll(regexText,replaceText))
val sqlText:String="select *,regexPro("+columnName+") as "+columnName+"_new from thesis"
val dfNew=sqlContext.sql(sqlText)
//dfNew.show()
out.write(dfNew)
}
def initialize(ctx: ProcessContext): Unit = {
}
@ -47,31 +43,31 @@ class RegexTextProcess extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val regex = new PropertyDescriptor().name("regex")
val regex = new PropertyDescriptor()
.name("regex")
.displayName("Regex")
.description("regex")
.defaultValue("")
.required(true)
.example("")
.example("0001")
descriptor = regex :: descriptor
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("ColumnName")
.description("Field name of schema")
.description("The columns you want to replace")
.defaultValue("")
.required(true)
.example("")
.example("id")
descriptor = columnName :: descriptor
val replaceStr = new PropertyDescriptor()
.name("replaceStr")
.displayName("ReplaceStr")
.description("Replaced string")
.description("Value after replacement")
.defaultValue("")
.required(true)
.example("")
descriptor = regex :: descriptor
descriptor = columnName :: descriptor
.example("1111")
descriptor = replaceStr :: descriptor
descriptor
}
@ -81,6 +77,6 @@ class RegexTextProcess extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.FileGroup.toString)
List(StopGroup.FileGroup)
}
}
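
A spark-shell style sketch of the perform() above, using the descriptor examples (regex "0001", replacement "1111", column "id"):
import spark.implicits._
val (columnName, regexText, replaceText) = ("id", "0001", "1111")
val df = Seq("0001", "0002").toDF(columnName)
df.createOrReplaceTempView("thesis")
spark.udf.register("regexPro", (s: String) => s.replaceAll(regexText, replaceText))
spark.sql(s"select *, regexPro($columnName) as ${columnName}_new from thesis").show(false)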