This commit is contained in:
SongDz 2019-08-28 11:26:22 +08:00
commit d3887c170b
1 changed files with 14 additions and 2 deletions

View File

@ -6,7 +6,7 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.{SaveMode, SparkSession}
class PutHiveOverwrite extends ConfigurableStop { class PutHive extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn" val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Save data to hive by overwrite mode" val description: String = "Save data to hive by overwrite mode"
@ -15,12 +15,15 @@ class PutHiveOverwrite extends ConfigurableStop {
var database:String = _ var database:String = _
var table:String = _ var table:String = _
var saveFormat:String = _
var saveMode:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val inDF = in.read() val inDF = in.read()
inDF.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(database + "." + table) inDF.write.format("hive").mode(saveMode).saveAsTable(database + "." + table)
//inDF.show() //inDF.show()
//out.write(studentDF) //out.write(studentDF)
} }
@ -32,14 +35,23 @@ class PutHiveOverwrite extends ConfigurableStop {
def setProperties(map : Map[String, Any]) = { def setProperties(map : Map[String, Any]) = {
database = MapUtil.get(map,"database").asInstanceOf[String] database = MapUtil.get(map,"database").asInstanceOf[String]
table = MapUtil.get(map,"table").asInstanceOf[String] table = MapUtil.get(map,"table").asInstanceOf[String]
//saveFormat = MapUtil.get(map,"saveFormat").asInstanceOf[String]
saveMode = MapUtil.get(map,"saveMode").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
val saveModeOption = Set("append","overwrite","error","ignore")
val saveFormatOption = Set("parquet","orc","avro","csv","hive")
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val database=new PropertyDescriptor().name("database").displayName("DataBase").description("The database name").defaultValue("").required(true) val database=new PropertyDescriptor().name("database").displayName("DataBase").description("The database name").defaultValue("").required(true)
val table = new PropertyDescriptor().name("table").displayName("Table").description("The table name").defaultValue("").required(true) val table = new PropertyDescriptor().name("table").displayName("Table").description("The table name").defaultValue("").required(true)
val saveMode = new PropertyDescriptor().name("saveMode").displayName("SaveMode").description("The save mode for table").allowableValues(saveModeOption).defaultValue("ignore").required(true)
//val saveFormat = new PropertyDescriptor().name("saveFormat").displayName("saveFormat").description("The save format for table").allowableValues(saveFormatOption).defaultValue("csv").required(true)
descriptor = database :: descriptor descriptor = database :: descriptor
descriptor = table :: descriptor descriptor = table :: descriptor
//descriptor = saveFormat :: descriptor
descriptor = saveMode :: descriptor
descriptor descriptor
} }