forked from opensci/piflow
modification stop(Multiple fields can be parsed)
This commit is contained in:
parent
bcb919f897
commit
049388a8e7
|
@ -16,20 +16,41 @@ class XmlParserWithJson extends ConfigurableStop {
|
|||
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
|
||||
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
|
||||
|
||||
var xmlColumn:String = _
|
||||
|
||||
var xmlColumns:String = _
|
||||
|
||||
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
|
||||
|
||||
val spark = pec.get[SparkSession]()
|
||||
val inDf = in.read().select(xmlColumn)
|
||||
val df = in.read()
|
||||
|
||||
val rdd: RDD[String] = inDf.rdd.map(x => {
|
||||
XmlToJson.xmlParse(x.get(0).toString).replace("\n", "")
|
||||
spark.sqlContext.udf.register("xmlToJson",(str:String)=>{
|
||||
XmlToJson.xmlParse(str)
|
||||
})
|
||||
val xmlDF: DataFrame = spark.read.json(rdd)
|
||||
val columns: Array[String] = xmlColumns.split(",")
|
||||
|
||||
val fields: Array[String] = df.schema.fieldNames
|
||||
var fieldString = new StringBuilder
|
||||
fields.foreach(x=>{
|
||||
if (columns.contains(x)){
|
||||
fieldString.append(s"xmlToJson(${x}) as ${x} ,")
|
||||
} else {
|
||||
fieldString.append(s"${x},")
|
||||
}
|
||||
})
|
||||
|
||||
df.createOrReplaceTempView("temp")
|
||||
val sqlText = "select " +fieldString.stripSuffix(",")+ " from temp"
|
||||
val frame: DataFrame = spark.sql(sqlText)
|
||||
|
||||
val rdd: RDD[String] = frame.toJSON.rdd.map(x => {
|
||||
x.toString().replace("\\n", "").replace("}\"", "}").replace(":\"{", ":{").replace("\\","")
|
||||
})
|
||||
|
||||
val outDF: DataFrame = spark.read.json(rdd)
|
||||
outDF.printSchema()
|
||||
|
||||
out.write(outDF)
|
||||
|
||||
out.write(xmlDF)
|
||||
}
|
||||
|
||||
def initialize(ctx: ProcessContext): Unit = {
|
||||
|
@ -37,13 +58,13 @@ class XmlParserWithJson extends ConfigurableStop {
|
|||
}
|
||||
|
||||
def setProperties(map : Map[String, Any]) = {
|
||||
xmlColumn = MapUtil.get(map,"xmlColumn").asInstanceOf[String]
|
||||
xmlColumns = MapUtil.get(map,"xmlColumns").asInstanceOf[String]
|
||||
}
|
||||
|
||||
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
|
||||
var descriptor : List[PropertyDescriptor] = List()
|
||||
val xmlColumn = new PropertyDescriptor().name("xmlColumn").displayName("xmlColumn").description("the Column Contains XML String").defaultValue("").required(true)
|
||||
descriptor = xmlColumn :: descriptor
|
||||
val xmlColumns = new PropertyDescriptor().name("xmlColumns").displayName("xmlColumns").description("you want to parse contains XML fields ,Multiple are separated by commas").defaultValue("").required(true)
|
||||
descriptor = xmlColumns :: descriptor
|
||||
descriptor
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue