forked from opensci/piflow
delete jdbcwrite
This commit is contained in:
parent
c61e5b549a
commit
2ce3ccbc60
|
@ -1,78 +0,0 @@
|
||||||
package cn.piflow.bundle.jdbc
|
|
||||||
|
|
||||||
import java.util.Properties
|
|
||||||
|
|
||||||
import cn.piflow._
|
|
||||||
import cn.piflow.conf._
|
|
||||||
import cn.piflow.conf.bean.PropertyDescriptor
|
|
||||||
import cn.piflow.conf.util.{ImageUtil, MapUtil}
|
|
||||||
import org.apache.spark.sql.{SaveMode, SparkSession}
|
|
||||||
|
|
||||||
import scala.beans.BeanProperty
|
|
||||||
|
|
||||||
class
|
|
||||||
JdbcWrite extends ConfigurableStop{
|
|
||||||
|
|
||||||
val authorEmail: String = "xjzhu@cnic.cn"
|
|
||||||
val description: String = "Write data into jdbc database"
|
|
||||||
val inportList: List[String] = List(Port.DefaultPort.toString)
|
|
||||||
val outportList: List[String] = List(Port.NonePort.toString)
|
|
||||||
|
|
||||||
var url:String = _
|
|
||||||
var user:String = _
|
|
||||||
var password:String = _
|
|
||||||
var dbtable:String = _
|
|
||||||
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
|
|
||||||
val spark = pec.get[SparkSession]()
|
|
||||||
val jdbcDF = in.read()
|
|
||||||
val properties = new Properties()
|
|
||||||
properties.put("user", user)
|
|
||||||
properties.put("password", password)
|
|
||||||
jdbcDF.write.mode(SaveMode.Append).jdbc(url,dbtable,properties)
|
|
||||||
//jdbcDF.show(10)
|
|
||||||
out.write(jdbcDF)
|
|
||||||
}
|
|
||||||
|
|
||||||
def initialize(ctx: ProcessContext): Unit = {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
override def setProperties(map: Map[String, Any]): Unit = {
|
|
||||||
url = MapUtil.get(map,"url").asInstanceOf[String]
|
|
||||||
user = MapUtil.get(map,"user").asInstanceOf[String]
|
|
||||||
password = MapUtil.get(map,"password").asInstanceOf[String]
|
|
||||||
dbtable = MapUtil.get(map,"dbtable").asInstanceOf[String]
|
|
||||||
}
|
|
||||||
|
|
||||||
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
|
|
||||||
var descriptor : List[PropertyDescriptor] = List()
|
|
||||||
|
|
||||||
val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true)
|
|
||||||
//descriptor = url :: descriptor
|
|
||||||
|
|
||||||
val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
|
|
||||||
//descriptor = user :: descriptor
|
|
||||||
|
|
||||||
val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
|
|
||||||
//descriptor = password :: descriptor
|
|
||||||
|
|
||||||
val dbtable=new PropertyDescriptor().name("dbtable").displayName("dbtable").description("The table you want to write").defaultValue("").required(true)
|
|
||||||
//descriptor = dbtable :: descriptor
|
|
||||||
|
|
||||||
descriptor = url :: descriptor
|
|
||||||
descriptor = user :: descriptor
|
|
||||||
descriptor = password :: descriptor
|
|
||||||
descriptor = dbtable :: descriptor
|
|
||||||
descriptor
|
|
||||||
}
|
|
||||||
|
|
||||||
override def getIcon(): Array[Byte] = {
|
|
||||||
ImageUtil.getImage("icon/jdbc/jdbcWrite.png")
|
|
||||||
}
|
|
||||||
|
|
||||||
override def getGroup(): List[String] = {
|
|
||||||
List(StopGroup.JdbcGroup.toString)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue