This commit is contained in:
bao319 2020-03-26 17:03:04 +08:00
parent b7fcbcd780
commit acebfc6af2
24 changed files with 1098 additions and 126 deletions

View File

@ -20,7 +20,8 @@
"csvSavePath":"hdfs://192.168.3.138:8020/test/",
"header": "true",
"delimiter":",",
"partition":"1"
"partition":"1",
"saveMode": "append"
}
}

View File

@ -0,0 +1,39 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.csvparser"
}
},
{
"uuid":"1324",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://192.168.3.138:8020/test/",
"header": "true",
"delimiter":",",
"partition":"1",
"saveMode": "ErrorIfExists"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}

View File

@ -0,0 +1,39 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.csvparser"
}
},
{
"uuid":"1324",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://192.168.3.138:8020/test/",
"header": "true",
"delimiter":",",
"partition":"1",
"saveMode": "ignore"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}

View File

@ -0,0 +1,39 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.csvparser"
}
},
{
"uuid":"1324",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://192.168.3.138:8020/test/overwrite/",
"header": "true",
"delimiter":",",
"partition":"1",
"saveMode": "overwrite"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}

View File

@ -0,0 +1,45 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"LoadFromFtpToHDFS",
"bundle":"cn.piflow.bundle.ftp.LoadFromFtpToHDFS",
"properties": {
"ftp_url":"",
"port":"",
"username":"",
"password":"",
"ftpFile": "",
"HDFSUrl":"",
"HDFSPath":"",
"isFile":"",
"filterByName":""
}
},
{
"uuid":"1324",
"name":"PutHiveMode",
"bundle":"cn.piflow.bundle.hive.PutHiveMode",
"properties":{
"database":"test",
"table" :"userMode",
"saveMode":"overwrite"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"PutHiveMode"
}
]
}
}

View File

@ -0,0 +1,43 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"UploadToFtp",
"bundle":"cn.piflow.bundle.ftp.UploadToFtp",
"properties": {
"url_str":"",
"port":"",
"username":"",
"password":"",
"ftpFile": "",
"localPath":"",
"ftpClient":""
}
},
{
"uuid":"1324",
"name":"PutHiveMode",
"bundle":"cn.piflow.bundle.hive.PutHiveMode",
"properties":{
"database":"test",
"table" :"userMode",
"saveMode":"overwrite"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"PutHiveMode"
}
]
}
}

View File

@ -16,15 +16,21 @@ class CsvSave extends ConfigurableStop{
var header: Boolean = _
var delimiter: String = _
var partition :String= _
var saveMode:String = _
override def setProperties(map: Map[String, Any]): Unit = {
csvSavePath = MapUtil.get(map,"csvSavePath").asInstanceOf[String]
header = MapUtil.get(map,"header").asInstanceOf[String].toBoolean
delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String]
partition = MapUtil.get(map,key="partition").asInstanceOf[String]
saveMode = MapUtil.get(map,"saveMode").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
val saveModeOption = Set("append","overwrite","ErrorIfExists","ignore")
var descriptor : List[PropertyDescriptor] = List()
val csvSavePath = new PropertyDescriptor()
@ -63,6 +69,16 @@ class CsvSave extends ConfigurableStop{
.example("3")
descriptor = partition :: descriptor
val saveMode = new PropertyDescriptor()
.name("saveMode")
.displayName("SaveMode")
.description("The save mode for csv file")
.allowableValues(saveModeOption)
.defaultValue("append")
.required(true)
.example("append")
descriptor = saveMode :: descriptor
descriptor
}
@ -83,7 +99,7 @@ class CsvSave extends ConfigurableStop{
df.repartition(partition.toInt).write
.format("csv")
.mode(SaveMode.Overwrite)
.mode(saveMode)
.option("header", header)
.option("delimiter",delimiter)
.save(csvSavePath)

View File

@ -10,7 +10,7 @@ import org.apache.spark.sql.SparkSession
class GetFile extends ConfigurableStop{
override val authorEmail: String = "ygang@cnic.cn"
override val description: String = "Upload local files to hdfs"
override val description: String = "Download fields from hdfs to local"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
@ -29,7 +29,6 @@ class GetFile extends ConfigurableStop{
executor.exec(s"hdfs dfs -get ${hdfsFile} ${localPath}")
}

View File

@ -15,10 +15,11 @@ import org.apache.log4j.Logger
import scala.collection.mutable.ArrayBuffer
class LoadFromFtpToHDFS extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Download files or folders to HDFS"
override val inportList: List[String] = List(Port.NonePort.toString)
override val outportList: List[String] = List(Port.NonePort.toString)
override val description: String = "Upload files or folders to HDFS"
override val inportList: List[String] = List(Port.DefaultPort)
override val outportList: List[String] = List(Port.DefaultPort)
var ftp_url:String =_
var port:String=_
@ -50,7 +51,7 @@ class LoadFromFtpToHDFS extends ConfigurableStop{
}catch {
case e : Exception => {
isConnect = false
log.warn("Retry the connection")
log.warn("Retry connection")
Thread.sleep(1000*60*3)
}
}
@ -158,7 +159,7 @@ class LoadFromFtpToHDFS extends ConfigurableStop{
try{
ftp.disconnect()
}catch {
case e :Exception => log.warn("Failed to disconnect FTP server")
case e :Exception => log.warn("Failed to disconnect the ftp server")
}
fileArr.foreach(eachFile => {
val fileName: String = eachFile.getName
@ -215,7 +216,7 @@ class LoadFromFtpToHDFS extends ConfigurableStop{
}else if(isFile.equals("false") || isFile.equals("FALSE")){
downFileDir(ftpFile,HDFSPath)
}else{
throw new Exception("Please specify whether it is a file or directory.")
throw new Exception("Please specify a file or directory.")
}
if(errorFile.size > 0){
@ -239,27 +240,80 @@ class LoadFromFtpToHDFS extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url_str = new PropertyDescriptor().name("url_str").displayName("URL").description("IP of FTP server, such as 128.136.0.1 or ftp.ei.addfc.gak").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").description("Port of FTP server").required(false)
val username = new PropertyDescriptor().name("username").displayName("USER_NAME").description("").required(false)
val password = new PropertyDescriptor().name("password").displayName("PASSWORD").description("").required(false)
val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("FTP_File").description("The path of the file to the FTP server, such as /test/Ab/ or /test/Ab/test.txt").required(true)
val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").description("The URL of the HDFS file system, such as hdfs://10.0.88.70:9000").required(true)
val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").description("The save path of the HDFS file system, such as /test/Ab/").required(true)
val isFile = new PropertyDescriptor().name("isFile").displayName("isFile").description("Whether the path is a file or not, if true is filled in, only a single file specified by the path is downloaded. If false is filled in, all files under the folder are downloaded recursively.").required(true)
val filterByName = new PropertyDescriptor().name("filterByName").displayName("filterByName").description("If you choose to download the entire directory, you can use this parameter to filter the files you need to download. " +
"What you need to fill in here is standard Java regular expressions. For example, you need to download all files in the /A/ directory that end in .gz " +
"You can fill in .*.gz here. If there are multiple screening conditions, need to use ; segmentation.").required(false)
descriptor = filterByName :: descriptor
descriptor = isFile :: descriptor
val url_str = new PropertyDescriptor()
.name("url_str")
.displayName("URL")
.description("IP of FTP server")
.required(true)
.example("128.136.0.1 or ftp.ei.addfc.gak")
descriptor = url_str :: descriptor
val port = new PropertyDescriptor()
.name("port")
.displayName("Port")
.description("Port of FTP server")
.required(false)
.example("")
descriptor = port :: descriptor
val username = new PropertyDescriptor()
.name("username")
.displayName("User_Name")
.description("")
.required(false)
.example("root")
descriptor = username :: descriptor
val password = new PropertyDescriptor()
.name("password")
.displayName("Password")
.description("")
.required(false)
.example("123456")
descriptor = password :: descriptor
val ftpFile = new PropertyDescriptor()
.name("ftpFile")
.displayName("FTP_File")
.description("The path of the file to the FTP server")
.required(true)
.example("/test/Ab/ or /test/Ab/test.txt")
descriptor = ftpFile :: descriptor
val HDFSUrl = new PropertyDescriptor()
.name("hdfsUrl")
.displayName("HdfsUrl")
.description("The URL of the HDFS file system")
.required(true)
.example("hdfs://10.0.88.70:9000")
descriptor = HDFSUrl :: descriptor
val HDFSPath = new PropertyDescriptor()
.name("HDFSPath")
.displayName("HdfsPath")
.description("The save path of the HDFS file system")
.required(true)
.example("test/Ab/")
descriptor = HDFSPath :: descriptor
val isFile = new PropertyDescriptor()
.name("isFile")
.displayName("IsFile")
.description("Whether the path is a file or not, if true,only a single file specified by the path is downloaded. If false,recursively download all files in the folder")
.required(true)
.example("true")
descriptor = isFile :: descriptor
val filterByName = new PropertyDescriptor()
.name("filterByName")
.displayName("FilterByName")
.description("If you choose to download the entire directory, you can use this parameter to filter the files that need to be downloaded. " +
"Here you need to fill in a standard Java regular expressions. For example, you need to download all files ending in the /A/ directory," +
"you can fill in .*.gz here. If there are multiple filters,they need to be separated by commas")
.required(false)
.example(".*. gz")
descriptor = filterByName :: descriptor
descriptor
}
@ -268,7 +322,7 @@ class LoadFromFtpToHDFS extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.FtpGroup.toString)
List(StopGroup.FtpGroup)
}
override def initialize(ctx: ProcessContext): Unit = {

View File

@ -15,8 +15,9 @@ import scala.reflect.io.Directory
class UploadToFtp extends ConfigurableStop {
val authorEmail: String = "xiaoxiao@cnic.cn"
val description: String = "Upload file to ftp server"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.NonePort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var url_str: String = _
var port: Int = _
var username: String = _
@ -38,6 +39,7 @@ class UploadToFtp extends ConfigurableStop{
import sun.net.ftp.FtpProtocolException
import java.io.IOException
import java.net.InetSocketAddress
def connectFTP(url: String, port: Int, username: String, password: String): FtpClient = { //创建ftp
var ftp: FtpClient = null
try { //创建地址
@ -78,8 +80,10 @@ class UploadToFtp extends ConfigurableStop{
import java.io.FileInputStream
import java.io.FileNotFoundException
import java.io.IOException
/**
* 上传文件到FTP
*
* @param sourcePath
* @param filePath 要保存在FTP上的路径文件夹
* @param fileName 文件名test001.jpg
@ -134,15 +138,10 @@ class UploadToFtp extends ConfigurableStop{
}
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map: Map[String, Any]): Unit = {
url_str = MapUtil.get(map, key = "url_str").asInstanceOf[String]
port = Integer.parseInt(MapUtil.get(map, key = "port").toString)
@ -154,18 +153,54 @@ class UploadToFtp extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor: List[PropertyDescriptor] = List()
val url_str = new PropertyDescriptor().name("url_str").displayName("URL").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val username = new PropertyDescriptor().name("username").displayName("USER_NAME").defaultValue("").required(true)
val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("").required(true)
val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("FTP_File").defaultValue("").required(true)
val localPath = new PropertyDescriptor().name("localPath").displayName("Local_Path").defaultValue("").required(true)
val url_str = new PropertyDescriptor()
.name("url_str")
.displayName("URL")
.defaultValue("")
.required(true)
.example("")
descriptor = url_str :: descriptor
val port = new PropertyDescriptor()
.name("port")
.displayName("Port")
.defaultValue("")
.required(true)
.example("")
descriptor = port :: descriptor
val username = new PropertyDescriptor()
.name("username")
.displayName("User_Name")
.defaultValue("")
.required(true)
.example("")
descriptor = username :: descriptor
val password = new PropertyDescriptor()
.name("password")
.displayName("Password")
.defaultValue("")
.required(true)
.example("")
descriptor = password :: descriptor
val ftpFile = new PropertyDescriptor()
.name("ftpFile")
.displayName("FTP_File")
.defaultValue("")
.required(true)
.example("")
descriptor = ftpFile :: descriptor
val localPath = new PropertyDescriptor()
.name("localPath")
.displayName("Local_Path")
.defaultValue("")
.required(true)
.example("")
descriptor = localPath :: descriptor
descriptor
}
@ -174,6 +209,6 @@ class UploadToFtp extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.FtpGroup.toString)
List(StopGroup.FtpGroup)
}
}

View File

@ -63,7 +63,7 @@ class PutHiveMode extends ConfigurableStop {
.displayName("SaveMode")
.description("The save mode for table")
.allowableValues(saveModeOption)
.defaultValue("ignore")
.defaultValue("append")
.required(true)
.example("append")
descriptor = saveMode :: descriptor

View File

@ -0,0 +1,130 @@
package cn.piflow.bundle.hive
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
/**
* HIVE JDBC DRIVER DESIGN FOR HIVE 1.2.1
*/
class SelectHiveQLbyJDBC extends ConfigurableStop {
override val authorEmail: String = "xiaomeng7890@gmail.com"
override val description: String = "some hive can only achieve by jdbc, this stop is designed for this"
override val inportList: List[String] = List(Port.NonePort)
override val outportList: List[String] = List(Port.DefaultPort)
private val driverName = "org.apache.hive.jdbc.HiveDriver"
var hiveUser : String = _
var hivePassword : String = _
var jdbcUrl : String = _
var sql : String = _
override def setProperties(map: Map[String, Any]): Unit = {
hiveUser = MapUtil.get(map,"hiveUser").asInstanceOf[String]
hivePassword = MapUtil.get(map,"hivePassword").asInstanceOf[String]
jdbcUrl = MapUtil.get(map,"jdbcUrl").asInstanceOf[String]
sql = MapUtil.get(map,"sql").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val hiveUser = new PropertyDescriptor()
.name("hive user")
.displayName("Hive User")
.description("Users connected to hive")
.defaultValue("root")
.required(true)
.example("root")
descriptor = hiveUser :: descriptor
val hivePassword = new PropertyDescriptor().
name("hive password")
.displayName("Hive Password")
.description("Password to connect to hive")
.defaultValue("123456")
.required(true)
.example("123456")
descriptor = hivePassword :: descriptor
val jdbcUrl = new PropertyDescriptor().
name("jdbcUrl")
.displayName("JdbcUrl")
.description("URL for hive to connect to JDBC")
.defaultValue("jdbc:hive2://packone12:2181,packone13:2181,packone11:2181/middle;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2")
.required(true)
.example("jdbc:hive2://192.168.3.140:10000/default")
descriptor = jdbcUrl :: descriptor
val sql = new PropertyDescriptor().
name("query")
.displayName("Hive Query")
.description("SQL query statement of hive")
.defaultValue("select * from middle.m_person")
.required(true)
.example("select * from test.user1")
descriptor = sql :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/hive/OptionalPutHiveQL.png")
}
override def getGroup(): List[String] = {
List(StopGroup.HiveGroup)
}
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val sc = pec.get[SparkSession]()
val df = getDF (sc.sqlContext, sc.sparkContext, sql)
out.write(df)
}
def getDF(sqlContext : SQLContext, sc : SparkContext, tableName : String) : DataFrame = {
var df = sqlContext.sql(sql)
val count = df.count()
if (count == 0) {
println("Cant read by normal read, using JDBC <== this will cost a lot of time")
df = getJDBCDF(sqlContext, sc, tableName)
}
df
}
def getJDBCDF(sqlContext : SQLContext, sc : SparkContext, tableName : String) : DataFrame = {
import java.sql.DriverManager
try
Class.forName(driverName)
catch {
case e: ClassNotFoundException =>
e.printStackTrace()
System.exit(1)
}
val conn = DriverManager.getConnection(jdbcUrl, hiveUser, hivePassword)
val ptsm = conn.prepareStatement(sql)
println(ptsm)
val rs = ptsm.executeQuery()
var rows = Seq[Row]()
val meta = rs.getMetaData
for (i <- 1 to meta.getColumnCount) {
println(meta.getColumnName(i))
}
while (rs.next) {
var row = Seq[String]()
for (i <- 1 to meta.getColumnCount) {
row = row.:+(rs.getString(i))
}
rows = rows.:+(Row.fromSeq(row))
}
val organizationRDD = sc.makeRDD(rows)
sqlContext.createDataFrame(organizationRDD, sqlContext.read.table(tableName).schema)
}
}

View File

@ -0,0 +1,102 @@
package cn.piflow.bundle.jdbc
import java.sql.{Connection, DriverManager, ResultSet, Statement}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable.ArrayBuffer
class ReadImpala extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Get data from impala"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var url:String=_
var user:String=_
var password:String=_
var sql:String=_
var schameString : String=_
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session: SparkSession = pec.get[SparkSession]()
Class.forName("org.apache.hive.jdbc.HiveDriver")
val con: Connection = DriverManager.getConnection("jdbc:hive2://"+url+"/;auth=noSasl",user,password)
val stmt: Statement = con.createStatement()
val rs: ResultSet = stmt.executeQuery(sql)
val filedNames: Array[String] = schameString.split(",")
var rowsArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer()
while (rs.next()){
var rowArr:ArrayBuffer[String]=ArrayBuffer()
for(fileName <- filedNames){
rowArr+=rs.getString(fileName)
}
rowsArr+=rowArr
}
val fields: Array[StructField] = filedNames.map(d=>StructField(d,StringType,nullable = true))
val schema: StructType = StructType(fields)
val rows: List[Row] = rowsArr.toList.map(arr => {
val row: Row = Row.fromSeq(arr)
row
})
val rdd: RDD[Row] = session.sparkContext.makeRDD(rows)
val df: DataFrame = session.createDataFrame(rdd,schema)
out.write(df)
}
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,"url").asInstanceOf[String]
user = MapUtil.get(map,"user").asInstanceOf[String]
password = MapUtil.get(map,"password").asInstanceOf[String]
sql = MapUtil.get(map,"sql").asInstanceOf[String]
schameString = MapUtil.get(map,"schameString").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url=new PropertyDescriptor().name("url").displayName("url").description("IP and port number, you need to write like this -- ip:port").defaultValue("").required(true)
descriptor = url :: descriptor
val user=new PropertyDescriptor().name("user").displayName("user").description("user").defaultValue("").required(false)
descriptor = user :: descriptor
val password=new PropertyDescriptor().name("password").displayName("password").description("password").defaultValue("").required(false)
descriptor = password :: descriptor
val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The name of the table has not changed.But you have to specify which database," +
" such as database.table.").defaultValue("").required(true)
descriptor = sql :: descriptor
val schameString=new PropertyDescriptor().name("schameString").displayName("schameString").description("The field of SQL statement query results is divided by ,").defaultValue("").required(true)
descriptor = schameString :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/jdbc/SelectImpala.png")
}
override def getGroup(): List[String] = {
List(StopGroup.JdbcGroup)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}

View File

@ -0,0 +1,77 @@
package cn.piflow.bundle.jdbc
import java.util.Properties
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.beans.BeanProperty
class WriteMysql extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Write data into jdbc database"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.NonePort.toString)
var url:String = _
var user:String = _
var password:String = _
var dbtable:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val jdbcDF = in.read()
val properties = new Properties()
properties.put("user", user)
properties.put("password", password)
jdbcDF.write.mode(SaveMode.Append).jdbc(url,dbtable,properties)
//jdbcDF.show(10)
out.write(jdbcDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,"url").asInstanceOf[String]
user = MapUtil.get(map,"user").asInstanceOf[String]
password = MapUtil.get(map,"password").asInstanceOf[String]
dbtable = MapUtil.get(map,"dbtable").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true)
//descriptor = url :: descriptor
val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
//descriptor = user :: descriptor
val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
//descriptor = password :: descriptor
val dbtable=new PropertyDescriptor().name("dbtable").displayName("dbtable").description("The table you want to write").defaultValue("").required(true)
//descriptor = dbtable :: descriptor
descriptor = url :: descriptor
descriptor = user :: descriptor
descriptor = password :: descriptor
descriptor = dbtable :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/jdbc/jdbcWrite.png")
}
override def getGroup(): List[String] = {
List(StopGroup.JdbcGroup.toString)
}
}

View File

@ -0,0 +1,94 @@
package cn.piflow.bundle.jdbc
import java.sql.{Connection, DriverManager, Statement}
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql._
class WriteOracle extends ConfigurableStop{
val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "Write data to oracle"
val inportList: List[String] = List(Port.NonePort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
var url:String = _
var user:String = _
var password:String = _
var table:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]()
val inDF: DataFrame = in.read()
Class.forName("oracle.jdbc.driver.OracleDriver")
val con: Connection = DriverManager.getConnection(url,user,password)
val star: Statement = con.createStatement()
val fileNames: Array[String] = inDF.columns
var fileNameStr:String=""
var createSQL:String="create table "+table+"("
fileNames.foreach(name => {
fileNameStr+=(","+name)
createSQL+=(name+" varchar2(100),")
})
star.executeUpdate(createSQL.substring(0,createSQL.length-1)+")")
inDF.collect().foreach(r => {
var insertSQL:String="insert into "+table+"("+fileNameStr.substring(1)+") Values("
var rowStr:String=""
val rs: Array[String] = r.toString().substring(1, r.toString().length - 1).split(",")
for(x <- rs){
rowStr+=(",'"+x+"'")
}
insertSQL+=(rowStr.substring(1)+")")
star.executeUpdate(insertSQL)
})
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,"url").asInstanceOf[String]
user = MapUtil.get(map,"user").asInstanceOf[String]
password = MapUtil.get(map,"password").asInstanceOf[String]
table = MapUtil.get(map,"table").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb").defaultValue("").required(true)
descriptor = url :: descriptor
val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
descriptor = user :: descriptor
val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
descriptor = password :: descriptor
val table=new PropertyDescriptor().name("table").displayName("table").description("The table you want to write to").defaultValue("").required(true)
descriptor = table :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/jdbc/jdbcWriteToOracle.png")
}
override def getGroup(): List[String] = {
List(StopGroup.JdbcGroup.toString)
}
}

View File

@ -10,13 +10,13 @@ import org.junit.Test
import scala.util.parsing.json.JSON
class CsvSaveTest {
class CsvSaveAsAppendTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSave.json"
val file = "src/main/resources/flow/csv/CsvSaveAsAppend.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)

View File

@ -0,0 +1,51 @@
package cn.piflow.bundle.csv
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class CsvSaveAsErrorTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsError.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,51 @@
package cn.piflow.bundle.csv
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class CsvSaveAsIgnoreTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsIgnore.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,51 @@
package cn.piflow.bundle.csv
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class CsvSaveAsOverwriteTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsOverWrite.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -9,7 +9,7 @@ import org.junit.Test
import scala.util.parsing.json.JSON
class FecthESTest {
class FetchElasticsearchTest {
@Test
def testEs(): Unit ={

View File

@ -9,7 +9,7 @@ import org.junit.Test
import scala.util.parsing.json.JSON
class PutESTest {
class PutElasticsearchTest {
@Test
def testEs(): Unit ={

View File

@ -9,13 +9,13 @@ import org.junit.Test
import scala.util.parsing.json.JSON
class QueryESTest {
class QueryElasticsearchTest {
@Test
def testEs(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvParser.json"
val file = "src/main/resources/flow/es/QueryEs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)

View File

@ -0,0 +1,53 @@
package cn.piflow.bundle.ftp
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class LoadFromFtpToHDFS {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/ftp/LoadFromFtpToHDFS.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("CsvParserTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,53 @@
package cn.piflow.bundle.ftp
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class UploadToFtp {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/ftp/UploadToFtp.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("CsvParserTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}