bao319 2020-03-25 09:06:49 +08:00
parent a2109eb2f6
commit e1f41a82a7
5 changed files with 62 additions and 37 deletions

View File: cn/piflow/bundle/clean/EmailClean.scala

@@ -1,22 +1,20 @@
package cn.piflow.bundle.clean
import java.beans.Transient
import cn.piflow.bundle.util.CleanUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructField
class EmailClean extends ConfigurableStop{
val authorEmail: String = "songdongze@cnic.cn"
val description: String = "Clean email format data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var columnName:String=_
var columnName:String= _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
@@ -50,10 +48,9 @@ class EmailClean extends ConfigurableStop{
}
schemaStr ++= ","
})
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
val sqlTextNew:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlTextNew)
//dfNew.show()
out.write(dfNew1)
}
@@ -70,7 +67,14 @@ class EmailClean extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.defaultValue("")
.required(true)
.example("")
descriptor = columnName :: descriptor
descriptor
}
@@ -81,7 +85,7 @@ class EmailClean extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CleanGroup.toString)
List(StopGroup.CleanGroup)
}
}
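Across all five stops, the one-line PropertyDescriptor is rewritten as a fluent builder chain and gains an empty .example("") entry, but none of the hunks shows how the declared property is read back into the stop. The sketch below illustrates the usual shape of that step; the property-map helper is a hypothetical stand-in, so the names and signatures here are assumptions rather than code from this commit.

object PropertySketch {
  // Hypothetical stand-ins for the stop's mutable field and the piflow map helper.
  var columnName: String = _

  def get(map: Map[String, Any], key: String): Any = map.getOrElse(key, "")

  // Assumed shape of setProperties: the key must match the name("columnName")
  // set on the PropertyDescriptor above.
  def setProperties(map: Map[String, Any]): Unit = {
    columnName = get(map, "columnName").asInstanceOf[String]
  }

  def main(args: Array[String]): Unit = {
    setProperties(Map("columnName" -> "email"))
    println(columnName) // email
  }
}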

View File: cn/piflow/bundle/clean/IdentityNumberClean.scala

@@ -1,27 +1,23 @@
package cn.piflow.bundle.clean
import java.text.SimpleDateFormat
import java.util.regex.Pattern
import java.util.{Calendar, Date}
import cn.piflow.bundle.util.CleanUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.reflect.macros.ParseException
class IdentityNumberClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Clean Id Card data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
//var regex:String =_
var columnName:String=_
//var replaceStr:String=_
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
@@ -55,10 +51,9 @@ class IdentityNumberClean extends ConfigurableStop{
}
schemaStr ++= ","
})
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
val sqlTextNew:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlTextNew)
//dfNew.show()
out.write(dfNew1)
}
@@ -74,7 +69,14 @@ class IdentityNumberClean extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.defaultValue("")
.required(true)
.example("")
descriptor = columnName :: descriptor
descriptor
}
@@ -84,6 +86,6 @@ class IdentityNumberClean extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CleanGroup.toString)
List(StopGroup.CleanGroup)
}
}
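In every file the generated query string is only renamed (sqlText1 to sqlTextNew); the construction itself is unchanged. The loop appends a comma after each select expression, so substring(0, schemaStr.length - 1) exists solely to drop the trailing comma. A minimal, self-contained sketch with hypothetical column expressions:

object SelectBuilderSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical expressions; the real stops derive them from the input schema.
    val columns = Seq("cleanEmail(email) as email", "title")
    var schemaStr = ""
    columns.foreach { c =>
      schemaStr = schemaStr + c + "," // every expression gets a trailing comma
    }
    // substring(0, length - 1) drops the last comma left by the loop
    val sqlTextNew = "select " + schemaStr.substring(0, schemaStr.length - 1) + " from thesis"
    println(sqlTextNew) // select cleanEmail(email) as email,title from thesis
  }
}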

View File: cn/piflow/bundle/clean/PhoneNumberClean.scala

@@ -9,10 +9,12 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructField
class PhoneNumberClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
val description: String = "Clean phone number format data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var columnName:String=_
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -47,10 +49,9 @@ class PhoneNumberClean extends ConfigurableStop{
}
schemaStr ++= ","
})
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
val sqlTextNew:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlTextNew)
//dfNew.show()
out.write(dfNew1)
}
@@ -68,7 +69,13 @@ class PhoneNumberClean extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.defaultValue("")
.required(true)
.example("")
descriptor = columnName :: descriptor
descriptor
}
@@ -78,7 +85,7 @@ class PhoneNumberClean extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CleanGroup.toString)
List(StopGroup.CleanGroup)
}
}
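The last hunk of each file replaces List(StopGroup.CleanGroup.toString) with List(StopGroup.CleanGroup), which only fits getGroup()'s List[String] return type if CleanGroup is already usable as a String. A hypothetical sketch of the shape StopGroup presumably has after this change; the actual definition lives elsewhere in piflow and is not part of this diff.

object StopGroupSketch {
  // Hypothetical: group names held as plain String constants.
  object StopGroup {
    val CleanGroup: String = "CleanGroup"
  }

  // With a String constant, List(StopGroup.CleanGroup) is already a List[String],
  // so the previous .toString call becomes redundant.
  def getGroup(): List[String] = List(StopGroup.CleanGroup)

  def main(args: Array[String]): Unit = println(getGroup())
}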

View File: cn/piflow/bundle/clean/ProvinceClean.scala

@@ -47,10 +47,9 @@ class ProvinceClean extends ConfigurableStop{
}
schemaStr ++= ","
})
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
val sqlTextNew:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlTextNew)
//dfNew.show()
out.write(dfNew1)
}
@@ -67,8 +66,15 @@ class ProvinceClean extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.defaultValue("")
.required(true)
.example("")
descriptor = columnName :: descriptor
descriptor
}
@@ -78,7 +84,7 @@ class ProvinceClean extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CleanGroup.toString)
List(StopGroup.CleanGroup)
}
}
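Each descriptor's description states that multiple column names are separated by commas, but none of the shown hunks includes the splitting logic. A small hypothetical sketch of how such a comma-separated columnName value would typically be broken up before each column is cleaned; the split-and-trim step is an assumption, not code from this commit.

object ColumnSplitSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical property value as a flow configuration might supply it.
    val columnName = "email,phone,title"
    // Split on commas and trim stray whitespace before cleaning each column.
    val columns = columnName.split(",").map(_.trim)
    columns.foreach(c => println(s"column to clean: $c"))
  }
}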

View File: cn/piflow/bundle/clean/TitleClean.scala

@@ -13,6 +13,7 @@ class TitleClean extends ConfigurableStop{
val description: String = "Clean title format data."
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var columnName:String=_
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -47,14 +48,12 @@ class TitleClean extends ConfigurableStop{
}
schemaStr ++= ","
})
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
val sqlTextNew:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlTextNew)
//dfNew.show()
out.write(dfNew1)
}
def initialize(ctx: ProcessContext): Unit = {
}
@@ -66,7 +65,14 @@ class TitleClean extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
val columnName = new PropertyDescriptor()
.name("columnName")
.displayName("Column_Name")
.description("The columnName you want to clean,Multiple are separated by commas")
.defaultValue("")
.required(true)
.example("")
descriptor = columnName :: descriptor
descriptor
}
@@ -76,7 +82,7 @@ class TitleClean extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.CleanGroup.toString)
List(StopGroup.CleanGroup)
}
}
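All five stops run the generated query against a table called "thesis" and then call out.write(dfNew1). That table is presumably a temporary view registered from the stop's input DataFrame before sqlContext.sql runs; the registration is outside the shown hunks, so the sketch below (hypothetical data, standard Spark API) only illustrates the pattern.

import org.apache.spark.sql.SparkSession

// Minimal Spark sketch with hypothetical rows; createOrReplaceTempView and spark.sql
// are standard Spark API, the data and column names are illustration only.
object ThesisViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ThesisViewSketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("a@b.com", "t1")).toDF("email", "title") // hypothetical input
    df.createOrReplaceTempView("thesis")                   // the table the generated select reads from
    val dfNew = spark.sql("select email, title from thesis")
    dfNew.show()
    spark.stop()
  }
}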