Process all .xml files in a folder

lj044500 2018-09-04 10:40:06 +08:00
parent 508f0639a9
commit 1250dee09f
1 changed file with 107 additions and 0 deletions


@@ -0,0 +1,107 @@
package cn.piflow.bundle.xml

import java.net.URI
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import cn.piflow.conf.{ConfigurableStop, StopGroup, XmlGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._
/**
* Created by admin on 2018/8/27.
*/
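// Scans a folder (recursively) for .xml files, parses each one with
// spark-xml using the configured rowTag, and emits their union as a DataFrame.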
class FolderXmlParser extends ConfigurableStop {
  override val inportCount: Int = -1
  override val outportCount: Int = 1

  var rowTag: String = _     // XML element treated as one row by spark-xml
  var xmlpath: String = _    // folder to scan (recursively) for .xml files
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    // collect every .xml path under the folder, parse, and union the results
    val pathArr = getFileName(xmlpath)
    val xmlDF = getResDf(pathArr, spark)
    xmlDF.show(30)
    out.write(xmlDF)
  }
  override def setProperties(map: Map[String, Any]): Unit = {
    xmlpath = MapUtil.get(map, "xmlpath").asInstanceOf[String]
    rowTag = MapUtil.get(map, "rowTag").asInstanceOf[String]
  }
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    // expose the two properties that setProperties actually reads
    val xmlpath = new PropertyDescriptor().name("xmlpath").displayName("xmlpath").defaultValue("").required(true)
    val rowTag = new PropertyDescriptor().name("rowTag").displayName("rowTag").defaultValue("").required(true)
    descriptor = xmlpath :: rowTag :: descriptor
    descriptor
  }
  override def getIcon(): Array[Byte] = ???

  override def getGroup(): StopGroup = {
    XmlGroup
  }

  override def initialize(ctx: ProcessContext): Unit = {}
  // Recursively collect the paths of all .xml files under `path`.
  def getFileName(path: String): ArrayBuffer[String] = {
    var arr = ArrayBuffer[String]()
    val conf = new Configuration()
    val fs = FileSystem.get(URI.create(path), conf)
    val statuses = fs.listStatus(new Path(path))
    for (i <- statuses) {
      if (!i.isDirectory) {
        if (i.getPath.getName.endsWith(".xml")) {
          arr += i.getPath.toString
        }
      } else {
        arr ++= getFileName(i.getPath.toString)
      }
    }
    arr
  }
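  // Note: an alternative sketch would use Hadoop's FileSystem.listFiles(path, true),
  // which returns a RemoteIterator over every file in the tree without explicit recursion.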
  // Parse a single xml file into a DataFrame with spark-xml.
  def getDf(path: String, sparkSession: SparkSession): DataFrame = {
    sparkSession.read.format("com.databricks.spark.xml")
      .option("rowTag", rowTag)   // row element configured via setProperties
      .option("treatEmptyValuesAsNulls", true)
      .load(path)
  }
  // Union the DataFrames of every non-empty xml file into the folder's final DataFrame.
  def getResDf(pathArr: ArrayBuffer[String], spark: SparkSession): DataFrame = {
    // find the first file that yields at least one row; its schema seeds the union
    var index = 0
    breakable {
      for (i <- pathArr.indices) {
        if (getDf(pathArr(i), spark).count() != 0) {
          index = i
          break
        }
      }
    }
    var df = getDf(pathArr(index), spark)
    for (d <- index + 1 until pathArr.length) {
      val df1 = getDf(pathArr(d), spark)
      if (df1.count() != 0) {
        df = df.union(df1)
      }
    }
    df
  }
}
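
A minimal usage sketch, outside the PiFlow runtime: the property keys below match what setProperties reads, but the folder path is an illustrative assumption, and the rowTag value simply reuses the "phdthesis" tag that appeared in the original code. Running perform still requires a real JobContext.

  // hypothetical wiring, for illustration only
  val stop = new FolderXmlParser()
  stop.setProperties(Map(
    "xmlpath" -> "hdfs://master:9000/xml/dblp/",   // assumed input folder
    "rowTag" -> "phdthesis"
  ))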