diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWrite.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWrite.scala deleted file mode 100644 index a524b03..0000000 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWrite.scala +++ /dev/null @@ -1,78 +0,0 @@ -package cn.piflow.bundle.jdbc - -import java.util.Properties - -import cn.piflow._ -import cn.piflow.conf._ -import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.{ImageUtil, MapUtil} -import org.apache.spark.sql.{SaveMode, SparkSession} - -import scala.beans.BeanProperty - -class -JdbcWrite extends ConfigurableStop{ - - val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Write data into jdbc database" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.NonePort.toString) - - var url:String = _ - var user:String = _ - var password:String = _ - var dbtable:String = _ - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { - val spark = pec.get[SparkSession]() - val jdbcDF = in.read() - val properties = new Properties() - properties.put("user", user) - properties.put("password", password) - jdbcDF.write.mode(SaveMode.Append).jdbc(url,dbtable,properties) - //jdbcDF.show(10) - out.write(jdbcDF) - } - - def initialize(ctx: ProcessContext): Unit = { - - } - - override def setProperties(map: Map[String, Any]): Unit = { - url = MapUtil.get(map,"url").asInstanceOf[String] - user = MapUtil.get(map,"user").asInstanceOf[String] - password = MapUtil.get(map,"password").asInstanceOf[String] - dbtable = MapUtil.get(map,"dbtable").asInstanceOf[String] - } - - override def getPropertyDescriptor(): List[PropertyDescriptor] = { - var descriptor : List[PropertyDescriptor] = List() - - val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true) - //descriptor = url :: descriptor - - val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true) - //descriptor = user :: descriptor - - val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true) - //descriptor = password :: descriptor - - val dbtable=new PropertyDescriptor().name("dbtable").displayName("dbtable").description("The table you want to write").defaultValue("").required(true) - //descriptor = dbtable :: descriptor - - descriptor = url :: descriptor - descriptor = user :: descriptor - descriptor = password :: descriptor - descriptor = dbtable :: descriptor - descriptor - } - - override def getIcon(): Array[Byte] = { - ImageUtil.getImage("icon/jdbc/jdbcWrite.png") - } - - override def getGroup(): List[String] = { - List(StopGroup.JdbcGroup.toString) - } - - -}