For JSON file read, folder read and file merge

QiDong Yang
yanfqidong0604 2018-10-18 14:22:57 +08:00
parent c696d1366a
commit a19e66e3da
8 changed files with 350 additions and 20 deletions

View File

@@ -5,13 +5,16 @@
"stops":[
{
"uuid":"1111",
"name":"FolderJsonParser",
"bundle":"cn.piflow.bundle.json.FolderJsonParser",
"properties":{
"FolderPath":"hdfs://10.0.86.89:9000/aYQDJson/",
"tag":"students"
"tag":"students_student"
}
},
{

View File

@@ -0,0 +1,37 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"MultiFolderJsonParser",
"bundle":"cn.piflow.bundle.json.MultiFolderJsonParser",
"properties":{
"jsonPathes":"hdfs://10.0.86.89:9000/aYQDJson/student.json;hdfs://10.0.86.89:9000/aYQDJson/student01.json;hdfs://10.0.86.89:9000/aYQDJson/student02.json",
"tag":"students_student"
}
},
{
"uuid":"1111",
"name":"JsonSave",
"bundle":"cn.piflow.bundle.json.JsonSave",
"properties":{
"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/yqd02.json"
}
}
],
"paths":[
{
"from":"MultiFolderJsonParser",
"outport":"",
"inport":"",
"to":"JsonSave"
}
]
}
}

View File

@@ -3,16 +3,21 @@ package cn.piflow.bundle.json
import java.net.URI
import cn.piflow.bundle.util.JsonUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks.{break, breakable}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
class FolderJsonParser extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
@@ -24,21 +29,37 @@ class FolderJsonParser extends ConfigurableStop{
var FolderPath:String = _
var tag : String = _
var openArrField:String=""
var ArrSchame:String=""
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sql: SQLContext = spark.sqlContext
val arrPath: ArrayBuffer[String] = getFileName(FolderPath)
val FinalDF = getFinalDF(arrPath,spark)
var FinalDF: DataFrame = getFinalDF(arrPath,sql)
if(tag.length>0){
val writeDF: DataFrame = JsonUtil.ParserJsonDF(FinalDF,tag)
FinalDF=writeDF
}
println("##########################################################################")
FinalDF.printSchema()
FinalDF.show(20)
println("##########################################################################")
out.write(FinalDF)
}
def getDf(Path: String,ss:SparkSession): DataFrame ={
//Get a DataFrame from the given path
def getDf(Path: String,ss:SQLContext): DataFrame ={
val frame: DataFrame = ss.read.json(Path)
frame
}
def getFinalDF(arrPath: ArrayBuffer[String],ss:SparkSession): DataFrame = {
//Iterate over the paths to build the combined DataFrame
def getFinalDF(arrPath: ArrayBuffer[String],ss:SQLContext): DataFrame = {
var index: Int = 0
breakable {
for (i <- 0 until arrPath.length) {
@@ -50,12 +71,13 @@ class FolderJsonParser extends ConfigurableStop{
}
val df01 = ss.read.option("multiline","true").json(arrPath(index))
var df: DataFrame = df01.select(tag)
df.printSchema()
var aaa:String="name"
var df: DataFrame = df01
for(d <- index+1 until(arrPath.length)){
if(getDf(arrPath(d),ss).count()!=0){
val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d)).select(tag)
df1.printSchema()
val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d))
// df1.printSchema()
val df2: DataFrame = df.union(df1).toDF()
df=df2
}
@@ -64,7 +86,7 @@ class FolderJsonParser extends ConfigurableStop{
}
//Get the paths of all .xml files
//Get the paths of all .json files
def getFileName(path:String):ArrayBuffer[String]={
val conf: Configuration = new Configuration()
val hdfs: FileSystem = FileSystem.get(URI.create(path),conf)
@@ -82,6 +104,7 @@ class FolderJsonParser extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
FolderPath = MapUtil.get(map,"FolderPath").asInstanceOf[String]
tag = MapUtil.get(map,"tag").asInstanceOf[String]
}
@@ -90,8 +113,10 @@ class FolderJsonParser extends ConfigurableStop{
var descriptor : List[PropertyDescriptor] = List()
val FolderPath = new PropertyDescriptor().name("FolderPath").displayName("FolderPath").description("The path of the json folder").defaultValue("").required(true)
descriptor = FolderPath :: descriptor
val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse").defaultValue("").required(true)
val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse. To expand an array field, write it as MasterField_ChildField, e.g. links_name").defaultValue("").required(false)
descriptor = tag :: descriptor
descriptor
}
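
Taken together, the hunks above make FolderJsonParser list the files under FolderPath on HDFS, skip empty ones, union the rest into a single DataFrame, and optionally expand the tag through JsonUtil. A minimal standalone sketch of that folder-merge approach, assuming a running SparkSession named spark (the function and variable names here are illustrative, not part of the commit):

// Sketch only: union all non-empty .json files under an HDFS folder into one DataFrame.
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

def mergeJsonFolder(spark: SparkSession, folderPath: String): Option[DataFrame] = {
  val fs = FileSystem.get(URI.create(folderPath), new Configuration())
  // collect the .json file paths, mirroring getFileName above
  val files = fs.listStatus(new Path(folderPath)).map(_.getPath.toString).filter(_.endsWith(".json"))
  // read each file and union the non-empty ones, mirroring getFinalDF above
  val frames = files.map(p => spark.read.option("multiline", "true").json(p)).filter(_.count() != 0)
  frames.reduceOption(_ union _)
}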

View File

@@ -1,10 +1,11 @@
package cn.piflow.bundle.json
import cn.piflow._
import cn.piflow.bundle.util.JsonUtil
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.beans.BeanProperty
@@ -22,10 +23,15 @@ class JsonParser extends ConfigurableStop{
val spark = pec.get[SparkSession]()
val jsonDF = spark.read.option("multiline","true").json(jsonPath)
val jsonDFNew = jsonDF.select(tag)
jsonDFNew.printSchema()
jsonDFNew.show(10)
var jsonDF = spark.read.json(jsonPath)
if(tag.length>0){
val writeDF: DataFrame = JsonUtil.ParserJsonDF(jsonDF,tag)
jsonDF=writeDF
}
jsonDF.printSchema()
jsonDF.show(10)
out.write(jsonDF)
}
@@ -41,7 +47,7 @@ class JsonParser extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("jsonPath").description("The path of the json file").defaultValue("").required(true)
val tag=new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse").defaultValue("").required(true)
val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse. To expand an array field, write it as MasterField_ChildField, e.g. links_name").defaultValue("").required(false)
descriptor = jsonPath :: descriptor
descriptor = tag :: descriptor
descriptor
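
As a concrete illustration of the tag convention in the updated descriptor, a hypothetical direct configuration of the stop (the property values below are examples only, not taken from the commit):

// Hypothetical use outside a flow JSON; the keys match the descriptors above.
// "links_url" follows the MasterField_ChildField form: explode the "links" array
// and select its "url" child, while "name" is kept as a plain top-level column.
import cn.piflow.bundle.json.JsonParser

val parser = new JsonParser()
parser.setProperties(Map(
  "jsonPath" -> "hdfs://10.0.86.89:9000/aYQDJson/student.json",
  "tag"      -> "name,links_url"
))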

View File

@@ -0,0 +1,86 @@
package cn.piflow.bundle.json
import cn.piflow.bundle.util.JsonUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.control.Breaks.{break, breakable}
class MultiFolderJsonParser extends ConfigurableStop{
val authorEmail: String = "yangqidong@cnic.cn"
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
val description: String = "Merge JSON files"
var jsonPathes: String = _
var tag : String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val ss = pec.get[SparkSession]()
val arrPath: Array[String] = jsonPathes.split(";")
var index: Int = 0
breakable {
for (i <- 0 until arrPath.length) {
if (ss.read.json(arrPath(i)).count() != 0) {
index = i
break
}
}
}
var FinalDF = ss.read.option("multiline","true").json(arrPath(index))
for(d <- index+1 until(arrPath.length)){
if(ss.read.json(arrPath(d)).count()!=0){
val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d))
// df1.printSchema()
val df2: DataFrame = FinalDF.union(df1).toDF()
FinalDF=df2
}
}
if(tag.length>0){
val writeDF: DataFrame = JsonUtil.ParserJsonDF(FinalDF,tag)
FinalDF=writeDF
}
FinalDF.show(10)
out.write(FinalDF)
}
override def setProperties(map: Map[String, Any]): Unit = {
jsonPathes = MapUtil.get(map,"jsonPathes").asInstanceOf[String]
tag = MapUtil.get(map,"tag").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val jsonPathes = new PropertyDescriptor().name("jsonPathes").displayName("jsonPathes").description("The paths of the json files, delimited by ;").defaultValue("").required(true)
val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse. To expand an array field, write it as MasterField_ChildField, e.g. links_name").defaultValue("").required(false)
descriptor = jsonPathes :: descriptor
descriptor = tag :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.JsonGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}
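
The jsonPathes property is a single string of paths separated by ";", matching the split(";") in perform() above. A small sketch of assembling that value (the paths are the ones from the flow config earlier in this commit):

// Sketch: build the ';'-delimited jsonPathes value expected by MultiFolderJsonParser.
val jsonPathes = Seq(
  "hdfs://10.0.86.89:9000/aYQDJson/student.json",
  "hdfs://10.0.86.89:9000/aYQDJson/student01.json",
  "hdfs://10.0.86.89:9000/aYQDJson/student02.json"
).mkString(";")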

View File

@@ -0,0 +1,82 @@
package cn.piflow.bundle.util
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.{Column, DataFrame, SQLContext, SparkSession}
import scala.collection.mutable.ArrayBuffer
object JsonUtil extends Serializable{
// The tag to parse. To expand an array field, write it as MasterField_ChildField, e.g. links_name
def ParserJsonDF(df:DataFrame,tag:String): DataFrame = {
var openArrField:String=""
var ArrSchame:String=""
var tagARR: Array[String] = tag.split(",")
var tagNew:String=""
for(tt<-tagARR){
if(tt.indexOf("_")> -1){
//the tag contains "_", i.e. an array field to expand
val openField: Array[String] = tt.split("_")
openArrField=openField(0)
ArrSchame+=(openField(1)+",")
}else{
tagNew+=(tt+",")
}
}
tagNew+=openArrField
ArrSchame=ArrSchame.substring(0,ArrSchame.length-1)
tagARR = tagNew.split(",")
var FinalDF:DataFrame=df
//If the user specified which fields to return
var strings: Seq[Column] =tagNew.split(",").toSeq.map(p => new Column(p))
if(tag.length>0){
val df00 = FinalDF.select(strings : _*)
FinalDF=df00
}
//If the user chose an array field to expand and provided its schema
if(openArrField.length>0&&ArrSchame.length>0){
val schames: Array[String] = ArrSchame.split(",")
var selARR:ArrayBuffer[String]=ArrayBuffer()//field names to select after the array field is expanded
//iterate over the fields and wrap each in a Column
var coARR:ArrayBuffer[Column]=ArrayBuffer()//used by select() when expanding the array field
val sss = tagNew.split(",")//used by toDF() after the array field is expanded
var co: Column =null
for(each<-tagARR){
if(each==openArrField){
co = explode(FinalDF(openArrField))
for(x<-schames){
selARR+=(openArrField+"."+x)
}
}else{
selARR+=each
co=FinalDF(each)
}
coARR+=co
}
println("###################")
selARR.foreach(println(_))
var selSEQ: Seq[Column] = selARR.toSeq.map(q => new Column(q))
var df01: DataFrame = FinalDF.select(coARR : _*).toDF(sss:_*)
FinalDF = df01.select(selSEQ : _*)
}
FinalDF
}
}
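
A minimal usage sketch of JsonUtil.ParserJsonDF, assuming a local SparkSession and an input shaped like the sample document in JsonFolderTest below; the tag "name,links_url" keeps the top-level name column and explodes the links array to expose each link's url:

// Sketch only: the SparkSession settings and input path are illustrative, not part of the commit.
import org.apache.spark.sql.SparkSession
import cn.piflow.bundle.util.JsonUtil

val spark = SparkSession.builder().master("local[*]").appName("ParserJsonDF-sketch").getOrCreate()
val df = spark.read.option("multiline", "true").json("/tmp/sample.json")
val parsed = JsonUtil.ParserJsonDF(df, "name,links_url")   // result columns: name, url (one row per link)
parsed.printSchema()
parsed.show(10)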

View File

@@ -13,6 +13,35 @@ class JsonFolderTest {
@Test
def testFlow(): Unit ={
//Test data
/* {
"name": "BeJson",
"url": "http://www.bejson.com",
"page": 88,
"isNonProfit": true,
"address": {
"street": "科技园路.",
"city": "江苏苏州",
"country": "中国"
},
"links": [
{
"name": "Google",
"url": "http://www.google.com"
},
{
"name": "Baidu",
"url": "http://www.baidu.com"
},
{
"name": "SoSo",
"url": "http://www.SoSo.com"
}
]
}*/
//parse flow json
val file = "src/main/resources/JsonFolderTest.json"
val flowJsonStr = FileUtil.fileReader(file)
@@ -24,7 +53,6 @@ class JsonFolderTest {
val flow = flowBean.constructFlow()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")

View File

@@ -0,0 +1,63 @@
package cn.piflow.bundle
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.junit.Test
import scala.util.parsing.json.JSON
class MultiFolderJsonParserTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/MultiFolderJsonParser.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/root/Desktop/gitWORK/out/artifacts/piflow_bundle/piflow_bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
@Test
def testFlow2json() = {
//parse flow json
val file = "src/main/resources/flow.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
//create flow
val flowBean = FlowBean(map)
val flowJson = flowBean.toJson()
println(flowJson)
}
}