From f6223baf6851a104b10ae153a421428aac9b3490 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Thu, 2 Aug 2018 17:24:47 +0800 Subject: [PATCH] add CsvSave --- piflow-bundle/src/main/resources/flow.json | 18 +++++- .../cn/piflow/bundle/csv/CsvParser.scala | 2 - .../scala/cn/piflow/bundle/csv/CsvSave.scala | 63 +++++++++++++++++++ 3 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala diff --git a/piflow-bundle/src/main/resources/flow.json b/piflow-bundle/src/main/resources/flow.json index 023b128..3d01e85 100644 --- a/piflow-bundle/src/main/resources/flow.json +++ b/piflow-bundle/src/main/resources/flow.json @@ -54,7 +54,7 @@ "name":"Fork", "bundle":"cn.piflow.bundle.common.Fork", "properties":{ - "outports":["out1","out2"] + "outports":["out1","out2","out3"] } }, { @@ -64,6 +64,16 @@ "properties":{ "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json" } + }, + { + "uuid":"888", + "name":"CsvSave", + "bundle":"cn.piflow.bundle.csv.CsvSave", + "properties":{ + "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv", + "header":"true", + "delimiter":"," + } } ], "paths":[ @@ -102,6 +112,12 @@ "outport":"out2", "inport":"", "to":"JsonSave" + }, + { + "from":"Fork", + "outport":"out3", + "inport":"", + "to":"CsvSave" } ] } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala index 42dbdd2..b84014a 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala @@ -7,8 +7,6 @@ import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, SparkSession} -import scala.beans.BeanProperty - class CsvParser extends ConfigurableStop{ diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala 
b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala new file mode 100644 index 0000000..52cad43 --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala @@ -0,0 +1,63 @@ +package cn.piflow.bundle.csv + +import cn.piflow.conf.bean.PropertyDescriptor +import cn.piflow.conf.util.{ImageUtil, MapUtil} +import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} +import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup} +import org.apache.spark.sql.SaveMode +
+class CsvSave extends ConfigurableStop{ + override val inportCount: Int = 1 + override val outportCount: Int = 0 + + var csvSavePath: String = _ + var header: Boolean = _ + var delimiter: String = _ + + override def setProperties(map: Map[String, Any]): Unit = { + csvSavePath = MapUtil.get(map,"csvSavePath").asInstanceOf[String] + header = MapUtil.get(map,"header").asInstanceOf[String].toBoolean + delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String] + } + + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + + //csvSavePath + val csvSavePath = new PropertyDescriptor().name("csvSavePath").displayName("csvSavePath").defaultValue("").required(true) + descriptor = csvSavePath :: descriptor + + //header + val header = new PropertyDescriptor().name("header").displayName("header").defaultValue("true").required(true) + descriptor = header :: descriptor + + //delimiter + val delimiter = new PropertyDescriptor().name("delimiter").displayName("delimiter").defaultValue(",").required(true) + descriptor = delimiter :: descriptor + + descriptor + } + + override def getIcon(): Array[Byte] = { + ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg") + } + + override def getGroup(): StopGroup = { + CsvGroup + } + + override def initialize(ctx: ProcessContext): Unit = { + + } + + override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val df = 
in.read() + df.show() + df.write + .format("csv") + .mode(SaveMode.Overwrite) + .option("header", header).option("delimiter", delimiter) + .save(csvSavePath) + } +} +