package cn.piflow.bundle.clean

import cn.piflow.bundle.util.CleanUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession

/**
 * A configurable Stop that cleans phone numbers held in one column of the
 * incoming DataFrame.
 *
 * It registers a Spark SQL UDF backed by [[CleanUtil.processPhonenum]] and
 * emits the original columns plus a cleaned column named `<columnName>_new`.
 */
class PhoneNumberClean extends ConfigurableStop {

  val inportCount: Int = 0
  val outportCount: Int = 1

  // Name of the input column holding raw phone numbers; bound in setProperties.
  var columnName: String = _

  /**
   * Reads the upstream DataFrame, applies the phone-number cleaning UDF to
   * `columnName`, and writes the widened result downstream.
   *
   * @param in  source of the input DataFrame
   * @param out sink for the cleaned DataFrame
   * @param pec job context providing the shared SparkSession
   */
  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val sqlContext = spark.sqlContext

    val dfOld = in.read()
    dfOld.createOrReplaceTempView("thesis")

    sqlContext.udf.register("regexPro", (str: String) => CleanUtil.processPhonenum(str))

    // Backticks quote the identifier so column names containing special
    // characters do not break the query.
    // NOTE(review): columnName is operator-supplied configuration spliced into
    // SQL text; validating it against dfOld.columns would be safer — confirm.
    val sqlText = s"select *, regexPro(`$columnName`) as `${columnName}_new` from thesis"
    val dfNew = sqlContext.sql(sqlText)

    out.write(dfNew)
  }

  def initialize(ctx: ProcessContext): Unit = {}

  /** Binds the required "columnName" property from the stop configuration map. */
  def setProperties(map: Map[String, Any]): Unit = {
    columnName = MapUtil.get(map, key = "columnName").asInstanceOf[String]
  }

  /** Describes the configurable properties: a single required column name. */
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    val columnNameDescriptor = new PropertyDescriptor()
      .name("columnName")
      .displayName("COLUMN_NAME")
      .defaultValue("")
      .required(true)
    List(columnNameDescriptor)
  }

  // TODO: provide a real icon; `???` throws NotImplementedError if ever called.
  override def getIcon(): Array[Byte] = ???

  override def getGroup(): StopGroup = CleanGroup
}