YangQiDong

yanfqidong0604 2019-01-18 16:13:22 +08:00
parent c2ae0447d7
commit d027f5e589
6 changed files with 2 additions and 32 deletions

@@ -55,10 +55,6 @@ class SelectImpala extends ConfigurableStop{
val rdd: RDD[Row] = session.sparkContext.makeRDD(rows)
val df: DataFrame = session.createDataFrame(rdd,schema)
//println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
//df.show(20)
//println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
out.write(df)
}
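
These first three hunks (SelectImpala, spider, JdbcReadFromOracle) remove the same kind of leftover debug block: each stop builds a DataFrame from an in-memory RDD plus an explicit schema and writes it to its output port, and the commented-out println/show lines around that step are deleted. A minimal sketch of the shared pattern, with illustrative names not taken from this commit:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build a string-typed DataFrame from in-memory rows; the real stops then
// call out.write(df). The commented show(20) is the kind of debug output
// this commit strips out.
def buildFrame(session: SparkSession, rows: Seq[Row], columns: Seq[String]): DataFrame = {
  val fields = columns.map(name => StructField(name, StringType, nullable = true))
  val schema: StructType = StructType(fields)
  val rdd: RDD[Row] = session.sparkContext.makeRDD(rows)
  val df: DataFrame = session.createDataFrame(rdd, schema)
  // df.show(20)
  df
}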

@@ -91,9 +91,6 @@ class spider extends ConfigurableStop{
val fields: Array[StructField] = keySet.toArray.map(d=>StructField(d,StringType,nullable = true))
val schema: StructType = StructType(fields)
val df: DataFrame = session.createDataFrame(rowRDD,schema)
//println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")
//df.show(10)
//println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")
out.write(df)
}

@@ -131,10 +131,6 @@ class JdbcReadFromOracle extends ConfigurableStop{
val rdd: RDD[Row] = session.sparkContext.makeRDD(rows)
val df: DataFrame = session.createDataFrame(rdd,schema)
//println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
//df.show(20)
//println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
out.write(df)

@@ -4,7 +4,6 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
class EvaluateJsonPath extends ConfigurableStop{
@@ -35,8 +34,6 @@ class EvaluateJsonPath extends ConfigurableStop{
}
}
//FinalDF.printSchema()
//FinalDF.show(10)
out.write(jsonDF)
}

@@ -15,7 +15,6 @@ import org.apache.spark.sql._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks.{break, breakable}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
@@ -25,11 +24,9 @@ class FolderJsonParser extends ConfigurableStop{
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String ="parser json folder"
var FolderPath:String = _
var tag : String = _
var openArrField:String=""
var ArrSchame:String=""
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -44,11 +41,6 @@ class FolderJsonParser extends ConfigurableStop{
FinalDF=writeDF
}
//println("##########################################################################")
//FinalDF.printSchema()
//FinalDF.show(20)
//println("##########################################################################")
out.write(FinalDF)
}
@@ -75,7 +67,6 @@ class FolderJsonParser extends ConfigurableStop{
for(d <- index+1 until(arrPath.length)){
if(getDf(arrPath(d),ss).count()!=0){
val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d))
// df1.printSchema()
val df2: DataFrame = df.union(df1).toDF()
df=df2
}
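
FolderJsonParser and MultiFolderJsonParser both contain the merge loop shown here: every non-empty JSON file is read with the multiline option and unioned onto the accumulated DataFrame. A condensed sketch of that loop, where the method and parameter names are assumptions rather than code from this commit:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Read each non-empty JSON file and union it onto the running result.
// union matches columns by position, so the files are assumed to share
// one schema, just as the original loop assumes implicitly.
def mergeJsonFiles(ss: SparkSession, arrPath: Seq[String]): Option[DataFrame] = {
  arrPath
    .map(path => ss.read.option("multiline", "true").json(path))
    .filter(_.count() != 0)
    .reduceOption((acc, next) => acc.union(next).toDF())
}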

@@ -11,17 +11,14 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.control.Breaks.{break, breakable}
class MultiFolderJsonParser extends ConfigurableStop{
val authorEmail: String = "yangqidong@cnic.cn"
val authorEmail: String = "yangqidong@cnic.cn"
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
val description: String = "Analysis of multiple JSON folders"
val description: String = "Analysis of multiple JSON folders"
var jsonPathes: String = _
var tag : String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val ss = pec.get[SparkSession]()
@@ -42,7 +39,6 @@ class MultiFolderJsonParser extends ConfigurableStop{
for(d <- index+1 until(arrPath.length)){
if(ss.read.json(arrPath(d)).count()!=0){
val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d))
// df1.printSchema()
val df2: DataFrame = FinalDF.union(df1).toDF()
FinalDF=df2
}
@@ -53,9 +49,6 @@ class MultiFolderJsonParser extends ConfigurableStop{
FinalDF=writeDF
}
//FinalDF.show(10)
out.write(FinalDF)
}
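
Most of the removed lines across the six files are commented-out println/printSchema/show calls used for ad-hoc debugging. If that output is needed again, one option (not something this commit introduces) is to gate it behind a flag instead of commenting it in and out; a hypothetical helper:

import org.apache.spark.sql.DataFrame

// Print the schema and a small sample only when debugging is enabled,
// so no commented-out debug lines accumulate in the stops.
def debugShow(df: DataFrame, enabled: Boolean, numRows: Int = 10): Unit = {
  if (enabled) {
    df.printSchema()
    df.show(numRows)
  }
}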