From 718e8793125945bf6720270986e3229ab5799ada Mon Sep 17 00:00:00 2001 From: bao319 <826013665@qq.com> Date: Thu, 26 Mar 2020 15:17:27 +0800 Subject: [PATCH] clean --- .../main/resources/flow/common/distinct.json | 2 +- .../main/resources/flow/common/dropField.json | 2 +- .../resources/flow/common/executeSql.json | 2 +- .../src/main/resources/flow/common/join.json | 2 +- .../resources/flow/common/selectField.json | 2 +- .../{DropField.png => DropColumnNames.png} | Bin .../cn/piflow/bundle/clean/EmailClean.scala | 11 +++--- .../bundle/clean/IdentityNumberClean.scala | 8 ++--- .../bundle/clean/PhoneNumberClean.scala | 8 ++--- .../piflow/bundle/clean/ProvinceClean.scala | 8 ++--- .../cn/piflow/bundle/clean/TitleClean.scala | 11 +++--- .../cn/piflow/bundle/common/AddUUIDStop.scala | 21 +++++------ .../piflow/bundle/common/ConvertSchema.scala | 8 ++--- .../cn/piflow/bundle/common/Distinct.scala | 23 ++++++------ .../cn/piflow/bundle/common/DropField.scala | 22 ++++++------ .../piflow/bundle/common/ExecuteSQLStop.scala | 24 ++++++------- .../cn/piflow/bundle/common/Filter.scala | 2 +- .../scala/cn/piflow/bundle/common/Fork.scala | 10 +++--- .../scala/cn/piflow/bundle/common/Join.scala | 33 ++++++++++-------- .../scala/cn/piflow/bundle/common/Merge.scala | 12 +++---- .../scala/cn/piflow/bundle/common/Route.scala | 10 +++--- .../cn/piflow/bundle/common/SelectField.scala | 19 +++++----- .../cn/piflow/bundle/common/Subtract.scala | 8 ++--- .../cn/piflow/bundle/common/Trager.scala | 6 ++-- .../cn/piflow/bundle/csv/CsvParser.scala | 12 +++---- .../scala/cn/piflow/bundle/csv/CsvSave.scala | 16 +++++---- .../piflow/bundle/csv/CsvStringParser.scala | 18 +++++----- ...FetchEs.scala => FetchElasticsearch.scala} | 6 ++-- .../{PutEs.scala => PutElasticsearch.scala} | 7 ++-- ...QueryEs.scala => QueryElasticsearch.scala} | 6 ++-- .../piflow/bundle/file/RegexTextProcess.scala | 32 ++++++++--------- 31 files changed, 174 insertions(+), 177 deletions(-) rename piflow-bundle/src/main/resources/icon/common/{DropField.png => DropColumnNames.png} (100%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/es/{FetchEs.scala => FetchElasticsearch.scala} (94%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/es/{PutEs.scala => PutElasticsearch.scala} (95%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/es/{QueryEs.scala => QueryElasticsearch.scala} (94%) diff --git a/piflow-bundle/src/main/resources/flow/common/distinct.json b/piflow-bundle/src/main/resources/flow/common/distinct.json index 1863da0..04c28d2 100644 --- a/piflow-bundle/src/main/resources/flow/common/distinct.json +++ b/piflow-bundle/src/main/resources/flow/common/distinct.json @@ -15,7 +15,7 @@ "name":"Distinct", "bundle":"cn.piflow.bundle.common.Distinct", "properties":{ - "fields":"id" + "columnNames":"id" } } diff --git a/piflow-bundle/src/main/resources/flow/common/dropField.json b/piflow-bundle/src/main/resources/flow/common/dropField.json index 1bec0c9..0d8ba88 100644 --- a/piflow-bundle/src/main/resources/flow/common/dropField.json +++ b/piflow-bundle/src/main/resources/flow/common/dropField.json @@ -15,7 +15,7 @@ "name":"ConvertSchema", "bundle":"cn.piflow.bundle.common.DropField", "properties":{ - "fields":"id" + "columnNames":"id" } } diff --git a/piflow-bundle/src/main/resources/flow/common/executeSql.json b/piflow-bundle/src/main/resources/flow/common/executeSql.json index 67e4058..93d5c3a 100644 --- a/piflow-bundle/src/main/resources/flow/common/executeSql.json +++ b/piflow-bundle/src/main/resources/flow/common/executeSql.json @@ 
-16,7 +16,7 @@ "bundle":"cn.piflow.bundle.common.ExecuteSQLStop", "properties":{ "sql":"select * from temp where id = 0001", - "tempViewName": "temp" + "viewName": "temp" } } ], diff --git a/piflow-bundle/src/main/resources/flow/common/join.json b/piflow-bundle/src/main/resources/flow/common/join.json index f5bf7d2..f025fb6 100644 --- a/piflow-bundle/src/main/resources/flow/common/join.json +++ b/piflow-bundle/src/main/resources/flow/common/join.json @@ -23,7 +23,7 @@ "name":"Join", "bundle":"cn.piflow.bundle.common.Join", "properties":{ - "correlationField": "id", + "correlationColumn": "id", "joinMode": "left" } diff --git a/piflow-bundle/src/main/resources/flow/common/selectField.json b/piflow-bundle/src/main/resources/flow/common/selectField.json index f215474..2d1f11c 100644 --- a/piflow-bundle/src/main/resources/flow/common/selectField.json +++ b/piflow-bundle/src/main/resources/flow/common/selectField.json @@ -15,7 +15,7 @@ "name":"SelectField", "bundle":"cn.piflow.bundle.common.SelectField", "properties":{ - "fields":"id" + "columnNames":"id" } } diff --git a/piflow-bundle/src/main/resources/icon/common/DropField.png b/piflow-bundle/src/main/resources/icon/common/DropColumnNames.png similarity index 100% rename from piflow-bundle/src/main/resources/icon/common/DropField.png rename to piflow-bundle/src/main/resources/icon/common/DropColumnNames.png diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala index 91971bb..43ac683 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala @@ -10,9 +10,9 @@ import org.apache.spark.sql.SparkSession class EmailClean extends ConfigurableStop{ val authorEmail: String = "songdongze@cnic.cn" - val description: String = "Clean email format data." - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Cleaning data in email format" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var columnName:String= _ @@ -54,7 +54,6 @@ class EmailClean extends ConfigurableStop{ out.write(dfNew1) } - def initialize(ctx: ProcessContext): Unit = { } @@ -69,8 +68,8 @@ class EmailClean extends ConfigurableStop{ var descriptor : List[PropertyDescriptor] = List() val columnName = new PropertyDescriptor() .name("columnName") - .displayName("Column_Name") - .description("The columnName you want to clean,Multiple are separated by commas") + .displayName("Column Name") + .description("The column names you want to clean; multiple column names are separated by commas") .defaultValue("") .required(true) .example("email") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala index 222d48a..43a9ad8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala @@ -13,9 +13,9 @@ import org.apache.spark.sql.{DataFrame, SparkSession} class IdentityNumberClean extends ConfigurableStop{ val authorEmail: String = "06whuxx@163.com" - val description: String = "Clean Id Card data."
- val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Cleaning data in ID card format" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var columnName:String=_ @@ -72,7 +72,7 @@ class IdentityNumberClean extends ConfigurableStop{ val columnName = new PropertyDescriptor() .name("columnName") .displayName("Column_Name") - .description("The columnName you want to clean,Multiple are separated by commas") + .description("The column names you want to clean; multiple column names are separated by commas") .defaultValue("") .required(true) .example("idcard") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala index b96afba..d0a65d3 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala @@ -11,9 +11,9 @@ import org.apache.spark.sql.types.StructField class PhoneNumberClean extends ConfigurableStop{ val authorEmail: String = "06whuxx@163.com" - val description: String = "Clean phone number format data." - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Cleaning data in mobile number format" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var columnName:String=_ @@ -72,7 +72,7 @@ class PhoneNumberClean extends ConfigurableStop{ val columnName = new PropertyDescriptor() .name("columnName") .displayName("Column_Name") - .description("The columnName you want to clean,Multiple are separated by commas") + .description("The column names you want to clean; multiple column names are separated by commas") .defaultValue("") .required(true) .example("phonenumber") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/ProvinceClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/ProvinceClean.scala index a7824f0..0918834 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/ProvinceClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/ProvinceClean.scala @@ -9,9 +9,9 @@ import org.apache.spark.sql.SparkSession class ProvinceClean extends ConfigurableStop{ val authorEmail: String = "songdongze@cnic.cn" - val description: String = "Clean email format data."
- val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Cleaning province data" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var columnName:String=_ @@ -69,7 +69,7 @@ class ProvinceClean extends ConfigurableStop{ val columnName = new PropertyDescriptor() .name("columnName") .displayName("Column_Name") - .description("The columnName you want to clean,Multiple are separated by commas") + .description("The column names you want to clean; multiple column names are separated by commas") .defaultValue("") .required(true) .example("province") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala index b002381..a176980 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala @@ -10,9 +10,9 @@ import org.apache.spark.sql.types.StructField class TitleClean extends ConfigurableStop{ val authorEmail: String = "06whuxx@163.com" - val description: String = "Clean title format data." - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Cleaning title data" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var columnName:String=_ @@ -65,15 +65,16 @@ class TitleClean extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val columnName = new PropertyDescriptor() .name("columnName") .displayName("Column_Name") - .description("The columnName you want to clean,Multiple are separated by commas") + .description("The column names you want to clean; multiple column names are separated by commas") .defaultValue("") .required(true) .example("title") descriptor = columnName :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/AddUUIDStop.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/AddUUIDStop.scala index 0880b14..d01c375 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/AddUUIDStop.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/AddUUIDStop.scala @@ -9,14 +9,11 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.{DataFrame, SparkSession} class AddUUIDStop extends ConfigurableStop{ + override val authorEmail: String = "ygang@cnic.cn" - - - - override val description: String = "Add UUID column" - override val inportList: List[String] =List(Port.DefaultPort.toString) - override val outportList: List[String] = List(Port.DefaultPort.toString) - + override val description: String = "Add UUID column" + override val inportList: List[String] =List(Port.DefaultPort) + override val outportList: List[String] = List(Port.DefaultPort) var column:String=_ @@ -24,11 +21,10 @@ class AddUUIDStop extends ConfigurableStop{ val spark = pec.get[SparkSession]() var df = in.read() - - spark.udf.register("generaterUUID",()=>UUID.randomUUID().toString.replace("-","")) + spark.udf.register("generateUUID",()=>UUID.randomUUID().toString.replace("-","")) df.createOrReplaceTempView("temp") - df = spark.sql(s"select generaterUUID() as ${column},* from temp") + df =
spark.sql(s"select generateUUID() as ${column},* from temp") out.write(df) @@ -44,9 +40,10 @@ class AddUUIDStop extends ConfigurableStop{ var descriptor : List[PropertyDescriptor] = List() - val column = new PropertyDescriptor().name("column") + val column = new PropertyDescriptor() + .name("column") .displayName("Column") - .description("The column is you want to add uuid column's name,") + .description("The name of the UUID column to add") .defaultValue("uuid") .required(true) .example("uuid") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala index 32012e4..06a5452 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala @@ -10,8 +10,8 @@ class ConvertSchema extends ConfigurableStop { val authorEmail: String = "yangqidong@cnic.cn" val description: String = "Change field name" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var schema:String = _ def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { @@ -40,7 +40,7 @@ class ConvertSchema extends ConfigurableStop { var descriptor : List[PropertyDescriptor] = List() val inports = new PropertyDescriptor().name("schema") .displayName("Schema") - .description("Change field name, you can write oldField1 -> newField1, Multiple separated by commas, Such as 'oldField1->newField1, oldField2->newField2' ") + .description("Change column names, written as oldName->newName; multiple mappings are separated by commas") .defaultValue("") .required(true) .example("id->uuid") @@ -53,7 +53,7 @@ class ConvertSchema extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Distinct.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Distinct.scala index fbb84e7..38e493e 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Distinct.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Distinct.scala @@ -8,22 +8,23 @@ import org.apache.spark.sql.DataFrame class Distinct extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - override val description: String = "De duplicate data according to all fields or specified fields " - override val inportList: List[String] =List(Port.DefaultPort.toString) - override val outportList: List[String] = List(Port.DefaultPort.toString) + override val description: String = "Deduplicate based on the specified column names or all column names" + override val inportList: List[String] =List(Port.DefaultPort) + override val outportList: List[String] = List(Port.DefaultPort) - var fields:String=_ + var columnNames:String=_ override def setProperties(map: Map[String, Any]): Unit = { - fields = MapUtil.get(map,"fields").asInstanceOf[String] + columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val fields = new PropertyDescriptor().name("fields") - .displayName("Fields") - .description("De duplicate data according to all fields or specified fields,Multiple separated by commas ;
If not, all fields will be de duplicated") + val fields = new PropertyDescriptor() + .name("columnNames") + .displayName("ColumnNames") + .description("Fill in the column names you want to deduplicate; multiple column names are separated by commas. If empty, all columns are deduplicated") .defaultValue("") .required(false) .example("id") @@ -37,7 +38,7 @@ class Distinct extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } @@ -48,8 +49,8 @@ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val inDf: DataFrame = in.read() var outDf: DataFrame = null - if(fields.length > 0){ - val fileArr: Array[String] = fields.split(",") + if(columnNames.length > 0){ + val fileArr: Array[String] = columnNames.split(",") outDf = inDf.dropDuplicates(fileArr) }else{ outDf = inDf.distinct() diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala index 7c925f1..c1912c8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala @@ -9,16 +9,16 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil} class DropField extends ConfigurableStop { val authorEmail: String = "ygang@cnic.cn" - val description: String = "Delete fields in schema" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Delete one or more columns" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) - var fields:String = _ + var columnNames:String = _ def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { var df = in.read() - val field = fields.split(",") + val field = columnNames.split(",") for( x <- 0 until field.size){ df = df.drop(field(x)) } @@ -31,13 +31,15 @@ class DropField extends ConfigurableStop { } def setProperties(map : Map[String, Any]): Unit = { - fields = MapUtil.get(map,"fields").asInstanceOf[String] + columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val inports = new PropertyDescriptor().name("fields").displayName("Fields") - .description("Delete fields in schema,multiple are separated by commas") + val inports = new PropertyDescriptor() + .name("columnNames") + .displayName("ColumnNames") + .description("Fill in the columns you want to delete; multiple column names are separated by commas") .defaultValue("") .required(true) .example("id") @@ -46,11 +48,11 @@ class DropField extends ConfigurableStop { } override def getIcon(): Array[Byte] = { - ImageUtil.getImage("icon/common/DropField.png") + ImageUtil.getImage("icon/common/DropColumnNames.png") } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala index c928a2b..4c0fbca 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala @@ -16,18 +16,18 @@ class ExecuteSQLStop extends
ConfigurableStop{ val authorEmail: String = "ygang@cnic.cn" val description: String = "Create temporary view table to execute sql" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var sql: String = _ - var tempViewName: String = _ + var viewName: String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val inDF = in.read() - inDF.createOrReplaceTempView(tempViewName) + inDF.createOrReplaceTempView(viewName) val frame: DataFrame = spark.sql(sql) out.write(frame) @@ -36,7 +36,7 @@ class ExecuteSQLStop extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { sql = MapUtil.get(map,"sql").asInstanceOf[String] - tempViewName = MapUtil.get(map,"tempViewName").asInstanceOf[String] + viewName = MapUtil.get(map,"viewName").asInstanceOf[String] } override def initialize(ctx: ProcessContext): Unit = { @@ -49,18 +49,18 @@ class ExecuteSQLStop extends ConfigurableStop{ .description("Sql string") .defaultValue("") .required(true) - .example("select * from temp") + .example("select * from temp") descriptor = sql :: descriptor - val tableName = new PropertyDescriptor() - .name("tempViewName") - .displayName("TempViewName") - .description(" Temporary view table") + val viewName = new PropertyDescriptor() + .name("viewName") + .displayName("ViewName") + .description("Name of the temporary view table") .defaultValue("temp") .required(true) .example("temp") - descriptor = tableName :: descriptor + descriptor = viewName :: descriptor descriptor } @@ -69,7 +69,7 @@ class ExecuteSQLStop extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala index 268b95b..fe2b164 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala @@ -8,7 +8,7 @@ import org.apache.spark.sql.{Column, DataFrame} class Filter extends ConfigurableStop{ override val authorEmail: String = "xjzhu@cnic.cn" - override val description: String = "Do filter by condition" + override val description: String = "Filter by condition" override val inportList: List[String] = List(Port.DefaultPort) override val outportList: List[String] = List(Port.DefaultPort) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala index 36c9853..e650b72 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala @@ -9,9 +9,9 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} class Fork extends ConfigurableStop{ val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Fork data into different stops" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.AnyPort.toString) + val description: String = "Forking data to different stops" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.AnyPort) var outports : List[String] = _ @@ -33,7 +33,7 @@
class Fork extends ConfigurableStop{ var descriptor : List[PropertyDescriptor] = List() val outports = new PropertyDescriptor().name("outports") .displayName("outports") - .description("outports string, seperated by ,.") + .description("Output port names, separated by commas") .defaultValue("") .required(true) descriptor = outports :: descriptor @@ -45,6 +45,6 @@ class Fork extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala index 3764bbd..35203de 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala @@ -8,12 +8,12 @@ import org.apache.spark.sql.{Column, DataFrame} class Join extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - override val description: String = "Table connection, including full connection, left connection, right connection and inner connection" - override val inportList: List[String] =List(Port.LeftPort.toString,Port.RightPort.toString) - override val outportList: List[String] = List(Port.DefaultPort.toString) + override val description: String = "Table joins include full join, left join, right join and inner join" + override val inportList: List[String] =List(Port.LeftPort,Port.RightPort) + override val outportList: List[String] = List(Port.DefaultPort) var joinMode:String=_ - var correlationField:String=_ + var correlationColumn:String=_ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { @@ -21,7 +21,7 @@ class Join extends ConfigurableStop{ val rightDF = in.read(Port.RightPort) var seq: Seq[String]= Seq() - correlationField.split(",").foreach(x=>{ + correlationColumn.split(",").foreach(x=>{ seq = seq .++(Seq(x.toString)) }) @@ -37,27 +37,30 @@ class Join extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { joinMode = MapUtil.get(map,"joinMode").asInstanceOf[String] - correlationField = MapUtil.get(map,"correlationField").asInstanceOf[String] + correlationColumn = MapUtil.get(map,"correlationColumn").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val joinMode = new PropertyDescriptor().name("joinMode") - .displayName("joinMode") - .description("For table association, you can choose INNER, LEFT, RIGHT, FULL") + val joinMode = new PropertyDescriptor() + .name("joinMode") + .displayName("JoinMode") + .description("The join mode; you can choose inner, left, right or full_outer") .allowableValues(Set("inner","left","right","full_outer")) .defaultValue("inner") .required(true) + .example("left") descriptor = joinMode :: descriptor - val correlationField = new PropertyDescriptor() - .name("correlationField") - .displayName("correlationField") - .description("Fields associated with tables,If there are more than one, please use , separate") + val correlationColumn = new PropertyDescriptor() + .name("correlationColumn") + .displayName("CorrelationColumn") + .description("The columns to join on; multiple column names are separated by commas") .defaultValue("") .required(true) - descriptor = correlationField :: descriptor + .example("id,name") + descriptor = correlationColumn :: descriptor descriptor } @@ -67,7 +70,7 @@ class Join extends ConfigurableStop{ } override def
getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala index 8e371c6..b9715fa 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala @@ -5,14 +5,12 @@ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import scala.beans.BeanProperty - class Merge extends ConfigurableStop{ val authorEmail: String = "xjzhu@cnic.cn" val description: String = "Merge data into one stop" - val inportList: List[String] = List(Port.AnyPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.AnyPort) + val outportList: List[String] = List(Port.DefaultPort) var inports : List[String] = _ @@ -33,8 +31,8 @@ class Merge extends ConfigurableStop{ var descriptor : List[PropertyDescriptor] = List() val inports = new PropertyDescriptor() .name("inports") - .displayName("inports") - .description("inports string, seperated by ,.") + .displayName("Inports") + .description("Inport names, separated by commas") .defaultValue("") .required(true) descriptor = inports :: descriptor @@ -46,7 +44,7 @@ class Merge extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Route.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Route.scala index 6a648e1..955a73d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Route.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Route.scala @@ -8,15 +8,14 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} class Route extends ConfigurableStop{ val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Route data by customizedProperties, key is port & value is filter condition" + val description: String = "Route data by custom properties; the key is a port and the value is a filter condition" val inportList: List[String] = List(Port.DefaultPort) val outportList: List[String] = List(Port.RoutePort) override val isCustomized: Boolean = true override def setProperties(map: Map[String, Any]): Unit = { - //val outportStr = MapUtil.get(map,"outports").asInstanceOf[String] - //outports = outportStr.split(",").toList + } override def initialize(ctx: ProcessContext): Unit = { @@ -40,8 +39,7 @@ class Route extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - //val outports = new PropertyDescriptor().name("outports").displayName("outports").description("outports string, seperated by ,.").defaultValue("").required(true) - //descriptor = outports :: descriptor + descriptor } @@ -50,6 +48,6 @@ class Route extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala index 71ea4a2..78b0a60 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala +++
b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala @@ -12,16 +12,16 @@ import scala.beans.BeanProperty class SelectField extends ConfigurableStop { val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Select data field" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Select data column" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) - var fields:String = _ + var columnNames:String = _ def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val df = in.read() - val field = fields.split(",") + val field = columnNames.split(",") val columnArray : Array[Column] = new Array[Column](field.size) for(i <- 0 to field.size - 1){ columnArray(i) = new Column(field(i)) @@ -36,17 +36,18 @@ class SelectField extends ConfigurableStop { } def setProperties(map : Map[String, Any]): Unit = { - fields = MapUtil.get(map,"fields").asInstanceOf[String] + columnNames = MapUtil.get(map,"columnNames").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() val inports = new PropertyDescriptor() - .name("fields") - .displayName("Fields") - .description("The fields you want to select") + .name("columnNames") + .displayName("ColumnNames") + .description("The columns you want to select; multiple column names are separated by commas") .defaultValue("") .required(true) + .example("id,name") descriptor = inports :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Subtract.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Subtract.scala index 0924aaa..1f99f91 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Subtract.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Subtract.scala @@ -10,9 +10,9 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession} class Subtract extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - override val description: String = "Delete the data existing in the right table from the left table" - override val inportList: List[String] =List(Port.LeftPort.toString,Port.RightPort.toString) - override val outportList: List[String] = List(Port.DefaultPort.toString) + override val description: String = "Remove rows that exist in the right table from the left table" + override val inportList: List[String] =List(Port.LeftPort,Port.RightPort) + override val outportList: List[String] = List(Port.DefaultPort) override def setProperties(map: Map[String, Any]): Unit = { } @@ -28,7 +28,7 @@ class Subtract extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } override def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Trager.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Trager.scala index 434f755..cbf7bae 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Trager.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Trager.scala @@ -9,8 +9,8 @@ class Trager extends ConfigurableStop{ val authorEmail: String = "ygang@cnic.cn" val description: String = "Upstream and downstream middleware" - val inportList: List[String] = List(Port.AnyPort.toString) - val outportList: List[String] =
List(Port.AnyPort.toString) + val inportList: List[String] = List(Port.AnyPort) + val outportList: List[String] = List(Port.AnyPort) def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { @@ -35,7 +35,7 @@ class Trager extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.CommonGroup.toString) + List(StopGroup.CommonGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala index a945242..cdf2e8b 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala @@ -12,8 +12,8 @@ class CsvParser extends ConfigurableStop{ val authorEmail: String = "xjzhu@cnic.cn" val description: String = "Parse csv file" - val inportList: List[String] = List(Port.NonePort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var csvPath: String = _ var header: Boolean = _ @@ -72,7 +72,7 @@ class CsvParser extends ConfigurableStop{ .description("The path of csv file") .defaultValue("") .required(true) - .example("") + .example("hdfs://192.168.3.138:8020/test/") descriptor = csvPath :: descriptor val header = new PropertyDescriptor() @@ -82,7 +82,7 @@ class CsvParser extends ConfigurableStop{ .defaultValue("false") .allowableValues(Set("true","false")) .required(true) - .example("") + .example("true") descriptor = header :: descriptor val delimiter = new PropertyDescriptor() @@ -91,7 +91,7 @@ class CsvParser extends ConfigurableStop{ .description("The delimiter of csv file") .defaultValue("") .required(true) - .example("") + .example(",") descriptor = delimiter :: descriptor val schema = new PropertyDescriptor() @@ -100,7 +100,7 @@ class CsvParser extends ConfigurableStop{ .description("The schema of csv file") .defaultValue("") .required(false) - .example("") + .example("id,name,gender,age") descriptor = schema :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala index efec529..50195d8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala @@ -8,9 +8,9 @@ import org.apache.spark.sql.SaveMode class CsvSave extends ConfigurableStop{ val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Save data into csv file." - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Save the data as a csv file." 
+ val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var csvSavePath: String = _ var header: Boolean = _ @@ -33,7 +33,7 @@ class CsvSave extends ConfigurableStop{ .description("The save path of csv file") .defaultValue("") .required(true) - .example("") + .example("hdfs://192.168.3.138:8020/test/") descriptor = csvSavePath :: descriptor val header = new PropertyDescriptor() @@ -42,6 +42,7 @@ class CsvSave extends ConfigurableStop{ .description("Whether the csv file has a header") .defaultValue("") .required(true) + .example("true") descriptor = header :: descriptor val delimiter = new PropertyDescriptor() @@ -50,16 +51,16 @@ class CsvSave extends ConfigurableStop{ .description("The delimiter of csv file") .defaultValue(",") .required(true) - .example("") + .example(",") descriptor = delimiter :: descriptor val partition = new PropertyDescriptor() .name("partition") .displayName("Partition") .description("The partition of csv file") - .defaultValue("1") + .defaultValue("") .required(true) - .example("") + .example("3") descriptor = partition :: descriptor descriptor @@ -79,6 +80,7 @@ class CsvSave extends ConfigurableStop{ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val df = in.read() + df.repartition(partition.toInt).write .format("csv") .mode(SaveMode.Overwrite) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala index 1957478..2525469 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala @@ -11,12 +11,12 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession} class CsvStringParser extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - val inportList: List[String] = List(Port.NonePort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) override val description: String = "Parse csv string" - var Str:String=_ + var str:String=_ var delimiter: String = _ var schema: String = _ @@ -25,7 +25,7 @@ class CsvStringParser extends ConfigurableStop{ val session: SparkSession = pec.get[SparkSession] val context: SparkContext = session.sparkContext - val arrStr: Array[String] = Str.split("\n") + val arrStr: Array[String] = str.split("\n") var num:Int=0 val listROW: List[Row] = arrStr.map(line => { @@ -50,7 +50,7 @@ class CsvStringParser extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { - Str = MapUtil.get(map,"Str").asInstanceOf[String] + str = MapUtil.get(map,"Str").asInstanceOf[String] delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String] schema = MapUtil.get(map,"schema").asInstanceOf[String] } @@ -58,17 +58,17 @@ class CsvStringParser extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val Str = new PropertyDescriptor() + val str = new PropertyDescriptor() .name("Str") .displayName("Str") .defaultValue("") .required(true) .example("") - descriptor = Str :: descriptor + descriptor = str :: descriptor val delimiter = new PropertyDescriptor() .name("delimiter") - .displayName("delimiter") + .displayName("Delimiter") .description("The delimiter of CSV string") .defaultValue("")
.required(true) @@ -77,7 +77,7 @@ class CsvStringParser extends ConfigurableStop{ val schema = new PropertyDescriptor() .name("schema") - .displayName("schema") + .displayName("Schema") .description("The schema of CSV string") .defaultValue("") .required(false) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchElasticsearch.scala similarity index 94% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchElasticsearch.scala index e1db5d7..7b535e0 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchElasticsearch.scala @@ -7,12 +7,12 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.SparkSession -class FetchEs extends ConfigurableStop { +class FetchElasticsearch extends ConfigurableStop { val authorEmail: String = "ygang@cnic.cn" val description: String = "Fetch data from Elasticsearch" - val inportList: List[String] = List(Port.NonePort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var es_nodes : String = _ var es_port : String = _ diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutElasticsearch.scala similarity index 95% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutElasticsearch.scala index 58e0539..849a8b7 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutElasticsearch.scala @@ -7,12 +7,12 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.SparkSession import org.elasticsearch.spark.sql.EsSparkSQL -class PutEs extends ConfigurableStop { +class PutElasticsearch extends ConfigurableStop { val authorEmail: String = "ygang@cnic.cn" val description: String = "Put data into Elasticsearch" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.NonePort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var es_nodes : String = _ var es_port : String = _ @@ -20,7 +20,6 @@ class PutEs extends ConfigurableStop { var es_type : String = _ var configuration_item:String = _ - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val inDf = in.read() diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala similarity index 94% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala index d2c1a22..e3109cb 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala @@ -6,12 +6,12 @@ import cn.piflow.conf.{ConfigurableStop, Port, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.SparkSession -class QueryEs extends 
ConfigurableStop { +class QueryElasticsearch extends ConfigurableStop { val authorEmail: String = "ygang@cnic.cn" val description: String = "Query data from Elasticsearch" - val inportList: List[String] = List(Port.NonePort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.NonePort) + val outportList: List[String] = List(Port.DefaultPort) var es_nodes : String = _ var es_port : String = _ diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala index f252538..ff4f0f5 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala @@ -6,15 +6,13 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.conf._ import org.apache.spark.sql.SparkSession - - class RegexTextProcess extends ConfigurableStop{ val authorEmail: String = "06whuxx@163.com" - val description: String = "Use regex to replace text" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Replace values in a column with regex" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) - var regex:String =_ + var regex:String =_ var columnName:String=_ var replaceStr:String=_ @@ -28,11 +26,9 @@ class RegexTextProcess extends ConfigurableStop{ sqlContext.udf.register("regexPro",(str:String)=>str.replaceAll(regexText,replaceText)) val sqlText:String="select *,regexPro("+columnName+") as "+columnName+"_new from thesis" val dfNew=sqlContext.sql(sqlText) - //dfNew.show() out.write(dfNew) } - def initialize(ctx: ProcessContext): Unit = { } @@ -47,31 +43,31 @@ class RegexTextProcess extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val regex = new PropertyDescriptor().name("regex") + val regex = new PropertyDescriptor() + .name("regex") .displayName("Regex") .description("regex") .defaultValue("") .required(true) - .example("") + .example("0001") + descriptor = regex :: descriptor val columnName = new PropertyDescriptor() .name("columnName") .displayName("ColumnName") - .description("Field name of schema") + .description("The column whose values you want to replace") .defaultValue("") .required(true) - .example("") + .example("id") + descriptor = columnName :: descriptor val replaceStr = new PropertyDescriptor() .name("replaceStr") .displayName("ReplaceStr") - .description("Replaced string") + .description("The value used to replace regex matches") .defaultValue("") .required(true) - .example("") - - descriptor = regex :: descriptor - descriptor = columnName :: descriptor + .example("1111") descriptor = replaceStr :: descriptor descriptor } @@ -81,6 +77,6 @@ class RegexTextProcess extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.FileGroup.toString) + List(StopGroup.FileGroup) } }
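
Note on the property renames in this patch: a stop's property wiring only works when three names agree, namely the key under "properties" in the flow JSON, the key passed to MapUtil.get in setProperties, and the .name(...) of the matching PropertyDescriptor, which must also be prepended to the descriptor list. Below is a minimal sketch of that contract, assuming the same ConfigurableStop API used throughout this patch; the class name ViewNameExample, the author email, and the reuse of the DropColumnNames icon are illustrative assumptions, not part of this patch.

import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}

class ViewNameExample extends ConfigurableStop {

  val authorEmail: String = "example@example.com"  // hypothetical author
  val description: String = "Illustrates the property-key contract"
  val inportList: List[String] = List(Port.DefaultPort)
  val outportList: List[String] = List(Port.DefaultPort)

  var viewName: String = _

  // The key read here must match the descriptor's .name(...) below,
  // which in turn is the key the flow JSON uses under "properties".
  def setProperties(map: Map[String, Any]): Unit = {
    viewName = MapUtil.get(map, "viewName").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val viewNameDesc = new PropertyDescriptor()
      .name("viewName")  // must equal the MapUtil.get key above
      .displayName("ViewName")
      .description("Name of the temporary view table")
      .defaultValue("temp")
      .required(true)
      .example("temp")
    // Without this cons the property never reaches the UI.
    descriptor = viewNameDesc :: descriptor
    descriptor
  }

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    // Pass data through unchanged; this stop only demonstrates configuration.
    out.write(in.read())
  }

  def initialize(ctx: ProcessContext): Unit = {}

  override def getIcon(): Array[Byte] = ImageUtil.getImage("icon/common/DropColumnNames.png")

  override def getGroup(): List[String] = List(StopGroup.CommonGroup)
}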