add CsvSave

This commit is contained in:
judy0131 2018-08-02 17:24:47 +08:00
parent a7a11bd5dc
commit f6223baf68
3 changed files with 80 additions and 3 deletions

View File

@ -54,7 +54,7 @@
"name":"Fork",
"bundle":"cn.piflow.bundle.common.Fork",
"properties":{
"outports":["out1","out2"]
"outports":["out1","out2","out3"]
}
},
{
@ -64,6 +64,16 @@
"properties":{
"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
}
},
{
"uuid":"888",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
"header":"true",
"delimiter":","
}
}
],
"paths":[
@ -102,6 +112,12 @@
"outport":"out2",
"inport":"",
"to":"JsonSave"
},
{
"from":"Fork",
"outport":"out3",
"inport":"",
"to":"CsvSave"
}
]
}

View File

@ -7,8 +7,6 @@ import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.beans.BeanProperty
class CsvParser extends ConfigurableStop{

View File

@ -0,0 +1,63 @@
package cn.piflow.bundle.csv
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup}
import org.apache.spark.sql.SaveMode
class CsvSave extends ConfigurableStop{
override val inportCount: Int = 1
override val outportCount: Int = 0
var csvSavePath: String = _
var header: Boolean = _
var delimiter: String = _
override def setProperties(map: Map[String, Any]): Unit = {
csvSavePath = MapUtil.get(map,"csvSavePath").asInstanceOf[String]
header = MapUtil.get(map,"header").asInstanceOf[String].toBoolean
delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
//csvSavePath
val csvSavePath = new PropertyDescriptor().name("csvSavePath").displayName("csvSavePath").defaultValue("").required(true)
descriptor = csvSavePath :: descriptor
//header
val header = new PropertyDescriptor().name("header").displayName("header").defaultValue("header").required(true)
descriptor = header :: descriptor
//delimiter
val delimiter = new PropertyDescriptor().name("delimiter").displayName("delimiter").defaultValue(",").required(true)
descriptor = delimiter :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg")
}
override def getGroup(): StopGroup = {
CsvGroup
}
override def initialize(ctx: ProcessContext): Unit = {
}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val df = in.read()
df.show()
df.write
.format("csv")
.mode(SaveMode.Overwrite)
.option("header", header)
.save(csvSavePath)
}
}