add DataFrameRowParser Stop for ShellExecutor

judy0131 2018-07-18 14:04:50 +08:00
parent 576fbd039e
commit faf07b49a0
3 changed files with 74 additions and 4 deletions

Flow configuration (JSON):

@@ -16,6 +16,15 @@
   },
   {
     "uuid":"2222",
+    "name":"DataFrameRowParser",
+    "bundle":"cn.piflow.bundle.script.DataFrameRowParser",
+    "properties":{
+      "schema":"id,name,gender,age",
+      "separator":","
+    }
+  },
+  {
+    "uuid":"3333",
     "name":"PutHiveStreaming",
     "bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
     "properties":{
@@ -29,6 +38,12 @@
     "from":"ShellExecutor",
     "outport":"",
     "inport":"",
+    "to":"DataFrameRowParser"
+  },
+  {
+    "from":"DataFrameRowParser",
+    "outport":"",
+    "inport":"",
     "to":"PutHiveStreaming"
   }
 ]
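
With these additions the sample flow runs ShellExecutor → DataFrameRowParser → PutHiveStreaming: the parser takes ShellExecutor's single-column output, splits each line on the configured separator, and labels the resulting string columns id, name, gender and age before PutHiveStreaming writes them out.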

DataFrameRowParser.scala (new file):

@@ -0,0 +1,54 @@
package cn.piflow.bundle.script

import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

class DataFrameRowParser extends ConfigurableStop {

  var schema: String = _
  var separator: String = _

  override def setProperties(map: Map[String, Any]): Unit = {
    schema = MapUtil.get(map, "schema").asInstanceOf[String]
    separator = MapUtil.get(map, "separator").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = ???  // not implemented yet

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("./src/main/resources/DataFrameParse.jpg")
  }

  override def getGroup(): StopGroup = {
    CommonGroup
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val inDF = in.read()

    // split each incoming line on the configured separator
    // (was a hard-coded "," that ignored the separator property)
    val rdd = inDF.rdd.map(row => {
      val fieldArray = row.get(0).asInstanceOf[String].split(separator)
      Row.fromSeq(fieldArray.toSeq)
    })

    // build an all-string schema from the comma-separated field names
    val fields = schema.split(",")
    val structFields: Array[StructField] =
      fields.map(name => StructField(name, StringType, nullable = true))
    val schemaStructType = StructType(structFields)

    // apply the schema and hand the DataFrame downstream
    val df = spark.createDataFrame(rdd, schemaStructType)
    df.show()
    out.write(df)
  }
}
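
The heart of the new stop is a DataFrame-to-RDD round trip: split every single-column row into fields, then re-apply an all-string schema. A minimal standalone sketch of the same technique, runnable outside piflow (the sample data, local[*] session, and object name are illustrative, not part of the commit):

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object DataFrameRowParserSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("parser-sketch").getOrCreate()
    import spark.implicits._

    // stand-in for ShellExecutor's output: one delimited string per row
    val lines = Seq("1,alice,female,30", "2,bob,male,25").toDF("row")

    val separator = ","
    val fieldNames = "id,name,gender,age".split(",")

    // split each line and rebuild Rows, as DataFrameRowParser.perform does
    val rdd = lines.rdd.map(r => Row.fromSeq(r.getString(0).split(separator).toSeq))
    val schema = StructType(fieldNames.map(name => StructField(name, StringType, nullable = true)))

    spark.createDataFrame(rdd, schema).show()
    spark.stop()
  }
}

Note that createDataFrame does not validate rows eagerly: a line whose field count differs from the schema only fails once the DataFrame is evaluated, e.g. by show().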

ShellExecutor.scala:

@@ -38,17 +38,18 @@ class ShellExecutor extends ConfigurableStop{
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val command = shellPath + " " + args
     val result = command!!
-    val rawData = result.split("\n").toList
     var rowList : List[Row] = List()
+    val rawData = result.split("\n").toList
     rawData.foreach( s => rowList = Row(s) :: rowList )

     val spark = pec.get[SparkSession]()
-    val rdd = spark.sparkContext.parallelize(rowList)
+    val rowsRdd = spark.sparkContext.parallelize(rowList)
     val schema = StructType(List(new StructField("row", StringType, nullable = true)))
-    val df = spark.createDataFrame(rdd, schema)
-    out.write(df)
+    val df = spark.createDataFrame(rowsRdd, schema)
+    df.show()
+    out.write(df)
   }
 }
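
For reference, `command!!` comes from scala.sys.process: it runs the command and returns its stdout as a single string, throwing if the exit status is non-zero. A sketch of the same capture-and-wrap step in isolation (the echo command and object name are placeholders):

import scala.sys.process._

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object ShellToDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("shell-sketch").getOrCreate()

    // run a command and capture stdout; !! throws on a non-zero exit code
    val result = Seq("echo", "1,alice,female,30\n2,bob,male,25").!!

    // wrap each output line in a single-column Row, as ShellExecutor.perform does
    val rowList = result.split("\n").toList.map(line => Row(line))
    val schema = StructType(List(StructField("row", StringType, nullable = true)))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(rowList), schema)
    df.show()
    spark.stop()
  }
}

One design note on the original: building rowList by prepending with `::` reverses the line order, and parallelize then distributes the list across partitions, so downstream stops such as DataFrameRowParser should not rely on line order.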