clean bug

This commit is contained in:
SongDz 2019-08-28 13:42:17 +08:00
parent d3887c170b
commit c399533b91
4 changed files with 125 additions and 13 deletions

View File

@@ -8,6 +8,7 @@ import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructField
class EmailClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
@@ -23,10 +24,37 @@ class EmailClean extends ConfigurableStop{
// Fragment of EmailClean.perform (the method signature is above this diff hunk):
// cleans the configured e-mail column(s) through a "regexPro" UDF and writes the
// cleaned DataFrame downstream.
val dfOld = in.read()
dfOld.createOrReplaceTempView("thesis")
// Register a UDF wrapping CleanUtil.processEmail for use in the SQL built below.
sqlContext.udf.register("regexPro",(str:String)=>CleanUtil.processEmail(str))
// NOTE(review): this `val sqlText` is the pre-change (removed) diff line; it
// duplicates the `val sqlText` defined further down and the two cannot coexist
// in one scope — only the later multi-column version should remain.
val sqlText:String="select *,regexPro("+columnName+") as "+columnName+"_new from thesis"
val structFields: Array[String] = dfOld.schema.fieldNames
// `columnName` may list several columns separated by commas.
val columnNames = columnName.split(",").toSet
val sqlNewFieldStr = new StringBuilder
columnNames.foreach(c=>{
// NOTE(review): `columnNames.contains(c)` is always true while iterating over
// columnNames itself — presumably `structFields.contains(c)` was intended, to
// skip configured columns absent from the input schema. TODO confirm.
if (columnNames.contains(c)) {
sqlNewFieldStr ++= ",regexPro("
sqlNewFieldStr ++= c
sqlNewFieldStr ++= ") as "
sqlNewFieldStr ++= c
sqlNewFieldStr ++= "_new "
}
})
// Builds e.g. "select * ,regexPro(col) as col_new  from thesis".
val sqlText:String="select * " + sqlNewFieldStr + " from thesis"
val dfNew=sqlContext.sql(sqlText)
dfNew.createOrReplaceTempView("thesis")
// Re-select the original column order, substituting each cleaned "<col>_new"
// column back under its original name.
val schemaStr = new StringBuilder
structFields.foreach(field => {
schemaStr ++= field
if (columnNames.contains(field)) {
schemaStr ++= "_new as "
schemaStr ++= field
}
schemaStr ++= ","
})
// substring(0, length - 1) drops the trailing comma; assumes the input schema
// is non-empty — an empty schema would make this throw. TODO confirm upstream.
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
//dfNew.show()
// NOTE(review): `out.write(dfNew)` is the pre-change (removed) diff line,
// superseded by `out.write(dfNew1)` below — only one write should remain.
out.write(dfNew)
out.write(dfNew1)
}
@@ -42,7 +70,7 @@ class EmailClean extends ConfigurableStop{
/** Describes the configurable properties of this stop.
  *
  * @return a single-element list holding the `columnName` descriptor; the value
  *         is a comma-separated list of the columns to clean.
  */
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
  var descriptor : List[PropertyDescriptor] = List()
  // The diff rendering showed both the removed and the added `val columnName`
  // line; only the post-change descriptor (with description) is kept — two
  // `val columnName` definitions in one scope would not compile.
  val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
  descriptor = columnName :: descriptor
  descriptor
}

View File

@@ -9,7 +9,8 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.reflect.macros.ParseException
@@ -28,10 +29,37 @@ class IdentityNumberClean extends ConfigurableStop{
// Fragment of IdentityNumberClean.perform (the method signature is above this
// diff hunk): cleans the configured ID-card column(s) through a "regexPro" UDF
// and writes the cleaned DataFrame downstream.
val dfOld = in.read()
dfOld.createOrReplaceTempView("thesis")
// Register a UDF wrapping CleanUtil.processCardCode for use in the SQL below.
sqlContext.udf.register("regexPro",(str:String)=>CleanUtil.processCardCode(str))
// NOTE(review): this `val sqlText` is the pre-change (removed) diff line; it
// duplicates the `val sqlText` defined further down and the two cannot coexist
// in one scope — only the later multi-column version should remain.
val sqlText:String="select *,regexPro("+columnName+") as "+columnName+"_new from thesis"
val structFields: Array[String] = dfOld.schema.fieldNames
// `columnName` may list several columns separated by commas.
val columnNames = columnName.split(",").toSet
val sqlNewFieldStr = new StringBuilder
columnNames.foreach(c=>{
// NOTE(review): `columnNames.contains(c)` is always true while iterating over
// columnNames itself — presumably `structFields.contains(c)` was intended, to
// skip configured columns absent from the input schema. TODO confirm.
if (columnNames.contains(c)) {
sqlNewFieldStr ++= ",regexPro("
sqlNewFieldStr ++= c
sqlNewFieldStr ++= ") as "
sqlNewFieldStr ++= c
sqlNewFieldStr ++= "_new "
}
})
// Builds e.g. "select * ,regexPro(col) as col_new  from thesis".
val sqlText:String="select * " + sqlNewFieldStr + " from thesis"
val dfNew=sqlContext.sql(sqlText)
dfNew.createOrReplaceTempView("thesis")
// Re-select the original column order, substituting each cleaned "<col>_new"
// column back under its original name.
val schemaStr = new StringBuilder
structFields.foreach(field => {
schemaStr ++= field
if (columnNames.contains(field)) {
schemaStr ++= "_new as "
schemaStr ++= field
}
schemaStr ++= ","
})
// substring(0, length - 1) drops the trailing comma; assumes the input schema
// is non-empty — an empty schema would make this throw. TODO confirm upstream.
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
//dfNew.show()
// NOTE(review): `out.write(dfNew)` is the pre-change (removed) diff line,
// superseded by `out.write(dfNew1)` below — only one write should remain.
out.write(dfNew)
out.write(dfNew1)
}
@@ -46,7 +74,7 @@ class IdentityNumberClean extends ConfigurableStop{
/** Describes the configurable properties of this stop.
  *
  * @return a single-element list holding the `columnName` descriptor; the value
  *         is a comma-separated list of the columns to clean.
  */
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
  var descriptor : List[PropertyDescriptor] = List()
  // The diff rendering showed both the removed and the added `val columnName`
  // line; only the post-change descriptor (with description) is kept — two
  // `val columnName` definitions in one scope would not compile.
  val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
  descriptor = columnName :: descriptor
  descriptor
}

View File

@@ -6,6 +6,7 @@ import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructField
class PhoneNumberClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
@@ -20,10 +21,37 @@ class PhoneNumberClean extends ConfigurableStop{
// Fragment of PhoneNumberClean.perform (the method signature is above this diff
// hunk): cleans the configured phone-number column(s) through a "regexPro" UDF
// and writes the cleaned DataFrame downstream.
val dfOld = in.read()
dfOld.createOrReplaceTempView("thesis")
// Register a UDF wrapping CleanUtil.processPhonenum for use in the SQL below.
sqlContext.udf.register("regexPro",(str:String)=>CleanUtil.processPhonenum(str))
// NOTE(review): this `val sqlText` is the pre-change (removed) diff line; it
// duplicates the `val sqlText` defined further down and the two cannot coexist
// in one scope — only the later multi-column version should remain.
val sqlText:String="select *,regexPro("+columnName+") as "+columnName+"_new from thesis"
val structFields: Array[String] = dfOld.schema.fieldNames
// `columnName` may list several columns separated by commas.
val columnNames = columnName.split(",").toSet
val sqlNewFieldStr = new StringBuilder
columnNames.foreach(c=>{
// NOTE(review): `columnNames.contains(c)` is always true while iterating over
// columnNames itself — presumably `structFields.contains(c)` was intended, to
// skip configured columns absent from the input schema. TODO confirm.
if (columnNames.contains(c)) {
sqlNewFieldStr ++= ",regexPro("
sqlNewFieldStr ++= c
sqlNewFieldStr ++= ") as "
sqlNewFieldStr ++= c
sqlNewFieldStr ++= "_new "
}
})
// Builds e.g. "select * ,regexPro(col) as col_new  from thesis".
val sqlText:String="select * " + sqlNewFieldStr + " from thesis"
val dfNew=sqlContext.sql(sqlText)
dfNew.createOrReplaceTempView("thesis")
// Re-select the original column order, substituting each cleaned "<col>_new"
// column back under its original name.
val schemaStr = new StringBuilder
structFields.foreach(field => {
schemaStr ++= field
if (columnNames.contains(field)) {
schemaStr ++= "_new as "
schemaStr ++= field
}
schemaStr ++= ","
})
// substring(0, length - 1) drops the trailing comma; assumes the input schema
// is non-empty — an empty schema would make this throw. TODO confirm upstream.
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
//dfNew.show()
// NOTE(review): `out.write(dfNew)` is the pre-change (removed) diff line,
// superseded by `out.write(dfNew1)` below — only one write should remain.
out.write(dfNew)
out.write(dfNew1)
}
@@ -40,7 +68,7 @@ class PhoneNumberClean extends ConfigurableStop{
/** Describes the configurable properties of this stop.
  *
  * @return a single-element list holding the `columnName` descriptor; the value
  *         is a comma-separated list of the columns to clean.
  */
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
  var descriptor : List[PropertyDescriptor] = List()
  // The diff rendering showed both the removed and the added `val columnName`
  // line; only the post-change descriptor (with description) is kept — two
  // `val columnName` definitions in one scope would not compile.
  val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
  descriptor = columnName :: descriptor
  descriptor
}

View File

@@ -6,6 +6,7 @@ import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructField
class TitleClean extends ConfigurableStop{
val authorEmail: String = "06whuxx@163.com"
@@ -20,10 +21,37 @@ class TitleClean extends ConfigurableStop{
// Fragment of TitleClean.perform (the method signature is above this diff
// hunk): cleans the configured title column(s) through a "regexPro" UDF and
// writes the cleaned DataFrame downstream.
val dfOld = in.read()
dfOld.createOrReplaceTempView("thesis")
// Register a UDF wrapping CleanUtil.processTitle for use in the SQL below.
sqlContext.udf.register("regexPro",(str:String)=>CleanUtil.processTitle(str))
// NOTE(review): this `val sqlText` is the pre-change (removed) diff line; it
// duplicates the `val sqlText` defined further down and the two cannot coexist
// in one scope — only the later multi-column version should remain.
val sqlText:String="select *,regexPro("+columnName+") as "+columnName+"_new from thesis"
val structFields: Array[String] = dfOld.schema.fieldNames
// `columnName` may list several columns separated by commas.
val columnNames = columnName.split(",").toSet
val sqlNewFieldStr = new StringBuilder
columnNames.foreach(c=>{
// NOTE(review): `columnNames.contains(c)` is always true while iterating over
// columnNames itself — presumably `structFields.contains(c)` was intended, to
// skip configured columns absent from the input schema. TODO confirm.
if (columnNames.contains(c)) {
sqlNewFieldStr ++= ",regexPro("
sqlNewFieldStr ++= c
sqlNewFieldStr ++= ") as "
sqlNewFieldStr ++= c
sqlNewFieldStr ++= "_new "
}
})
// Builds e.g. "select * ,regexPro(col) as col_new  from thesis".
val sqlText:String="select * " + sqlNewFieldStr + " from thesis"
val dfNew=sqlContext.sql(sqlText)
dfNew.createOrReplaceTempView("thesis")
// Re-select the original column order, substituting each cleaned "<col>_new"
// column back under its original name.
val schemaStr = new StringBuilder
structFields.foreach(field => {
schemaStr ++= field
if (columnNames.contains(field)) {
schemaStr ++= "_new as "
schemaStr ++= field
}
schemaStr ++= ","
})
// substring(0, length - 1) drops the trailing comma; assumes the input schema
// is non-empty — an empty schema would make this throw. TODO confirm upstream.
val sqlText1:String = "select " + schemaStr.substring(0,schemaStr.length -1) + " from thesis"
val dfNew1=sqlContext.sql(sqlText1)
//dfNew.show()
// NOTE(review): `out.write(dfNew)` is the pre-change (removed) diff line,
// superseded by `out.write(dfNew1)` below — only one write should remain.
out.write(dfNew)
out.write(dfNew1)
}
@@ -38,7 +66,7 @@ class TitleClean extends ConfigurableStop{
/** Describes the configurable properties of this stop.
  *
  * @return a single-element list holding the `columnName` descriptor; the value
  *         is a comma-separated list of the columns to clean.
  */
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
  var descriptor : List[PropertyDescriptor] = List()
  // The diff rendering showed both the removed and the added `val columnName`
  // line; only the post-change descriptor (with description) is kept — two
  // `val columnName` definitions in one scope would not compile.
  val columnName = new PropertyDescriptor().name("columnName").displayName("COLUMN_NAME").description("The columnName you want to clean,Multiple are separated by commas").defaultValue("").required(true)
  descriptor = columnName :: descriptor
  descriptor
}