commit 5bef166551
parent 03b2e6d439
bao319 2020-03-27 20:46:33 +08:00
39 changed files with 803 additions and 167 deletions

View File

@ -0,0 +1,45 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"checkpoint":"Merge",
"stops":[
{
"uuid":"0000",
"name":"JdbcReadFromOracle",
"bundle":"cn.piflow.bundle.jdbc.JdbcReadFromOracle",
"properties":{
"url":"jdbc:oracle:thin:@10.0.86.237:1521/newdb",
"user":"my",
"password":"bigdata",
"sql":"select * from typetype",
"schema":"mynum.number,mychar.varchar2,myblob.blob,myclob.clob,myxml.xmltype,mylong.long,mydate.date,mynclob.nclob"
}
},
{
"uuid":"1111",
"name":"CsvParser",
"bundle":"cn.piflow.bundle.csv.CsvParser",
"properties":{
"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
"header":"false",
"delimiter":",",
"schema":"title,author,pages"
}
}
],
"paths":[
{
"from":"JdbcReadFromOracle",
"outport":"",
"inport":"",
"to":"CsvParser"
}
]
}
}
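
For context, this new flow definition is exercised the same way as the JUnit tests added later in this commit: parse the JSON, build a FlowBean, and hand the constructed flow to Runner. A minimal sketch of that pattern (the local master and appName are illustrative; the resource path is the one used by this commit's tests):

```scala
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession

import scala.util.parsing.json.JSON

object RunFlowSketch {
  def main(args: Array[String]): Unit = {
    // Parse the flow JSON into a Map, exactly as the tests in this commit do.
    val flowJsonStr = FileUtil.fileReader("src/main/resources/JDBC/JdbcReadFromOracle.json")
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    // Build the flow from the parsed map and run it on a local SparkSession.
    val flow = FlowBean(map).constructFlow()
    val spark = SparkSession.builder().master("local[*]").appName("flow-sketch").getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "")
      .bind("debug.path", "")
      .start(flow)

    process.awaitTermination()
    spark.close()
  }
}
```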

View File

@ -6,8 +6,8 @@
 {
 "uuid":"1111",
-"name":"JdbcRead",
-"bundle":"cn.piflow.bundle.jdbc.JdbcRead",
+"name":"MysqlRead",
+"bundle":"cn.piflow.bundle.jdbc.MysqlRead",
 "properties":{
 "url": "jdbc:mysql://192.168.3.141:3306/piflow_web",
 "user": "root",
@ -23,7 +23,8 @@
 "csvSavePath":"hdfs://192.168.3.138:8020/test/",
 "header": "true",
 "delimiter":",",
-"partition":"1"
+"partition":"1",
+"saveMode": "append"
 }
 }
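
The new saveMode property presumably maps onto Spark's DataFrameWriter save modes (the CsvSaveAs* flows in this repo cover append, overwrite, error and ignore). A hedged, standalone sketch of the underlying Spark call; the data and target path are illustrative:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("savemode-sketch").getOrCreate()
    val df = spark.range(10).toDF("id")

    // "append" adds new part files under the target directory; the other
    // accepted values would be "overwrite", "ignore" and "error" (the default).
    df.write
      .mode(SaveMode.Append)
      .option("header", "true")
      .option("delimiter", ",")
      .csv("hdfs://192.168.3.138:8020/test/")

    spark.close()
  }
}
```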

View File

@ -0,0 +1,38 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"CsvParser",
"bundle":"cn.piflow.bundle.csv.CsvParser",
"properties":{
"csvPath":"hdfs://192.168.3.138:8020/test/",
"header": "true",
"delimiter":",",
"schema":"id,name,gender,age"
}
},
{
"uuid":"1111",
"name":"MysqlWrite",
"bundle":"cn.piflow.bundle.jdbc.MysqlWrite",
"properties":{
"url": "jdbc:mysql://192.168.3.141:3306/piflow_web",
"user": "root",
"password": "bigdata",
"dbtable":"MysqlWrite"
}
}
],
"paths":[
{
"from":"CsvParser",
"outport":"",
"inport":"",
"to":"MysqlWrite"
}
]
}
}

View File

@ -0,0 +1,42 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"OracleRead",
"bundle":"cn.piflow.bundle.jdbc.OracleRead",
"properties":{
"url": "jdbc:oracle:thin:@(DESCRIPTION =(ADDRESS_LIST =(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.2.237)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=RACDB_STANDBY)))",
"user": "dashuju",
"password": "DaShuju_732",
"sql":"select rpt_code,submit_date from egrant_isis.rpt_completion where rownum <=10"
}
},
{
"uuid":"1324",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://192.168.3.138:8020/test/",
"header": "true",
"delimiter":",",
"partition":"1",
"saveMode": "append"
}
}
],
"paths":[
{
"from":"OracleRead",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}

View File

@ -0,0 +1,46 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"OracleRead",
"bundle":"cn.piflow.bundle.jdbc.OracleRead",
"properties":{
"url": "jdbc:oracle:thin:@(DESCRIPTION =(ADDRESS_LIST =(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.2.237)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=RACDB_STANDBY)))",
"user": "dashuju",
"password": "DaShuju_732",
"sql":"select rpt_code,submit_date from egrant_isis.rpt_completion where rownum <=10",
"partitionColumn": "rpt_code",
"lowerBound": "1",
"upperBound": "20",
"numPartitions": "5"
}
},
{
"uuid":"1324",
"name":"CsvSave",
"bundle":"cn.piflow.bundle.csv.CsvSave",
"properties":{
"csvSavePath":"hdfs://192.168.3.138:8020/test/",
"header": "true",
"delimiter":",",
"partition":"1",
"saveMode": "append"
}
}
],
"paths":[
{
"from":"OracleRead",
"outport":"",
"inport":"",
"to":"CsvSave"
}
]
}
}
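
The four added properties mirror Spark's partitioned JDBC read, which splits the [lowerBound, upperBound) range of partitionColumn into numPartitions parallel queries. A hedged sketch of the equivalent direct DataFrameReader call; the simplified URL and credentials are illustrative:

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

object PartitionedJdbcReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("jdbc-partition-sketch").getOrCreate()

    val props = new Properties()
    props.put("user", "dashuju")
    props.put("password", "********")

    // Spark issues numPartitions parallel queries, striding partitionColumn
    // across the [lowerBound, upperBound) range.
    val df = spark.read.jdbc(
      "jdbc:oracle:thin:@192.168.2.237:1521/RACDB_STANDBY", // url (simplified, illustrative)
      "(select rpt_code, submit_date from egrant_isis.rpt_completion where rownum <= 10) t", // subquery as table
      "rpt_code", // partitionColumn
      1L,         // lowerBound
      20L,        // upperBound
      5,          // numPartitions
      props
    )
    df.show()
    spark.close()
  }
}
```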

View File

@ -0,0 +1,38 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"CsvParser",
"bundle":"cn.piflow.bundle.csv.CsvParser",
"properties":{
"csvPath":"hdfs://192.168.3.138:8020/test/",
"header": "true",
"delimiter":",",
"schema":"id,name,gender,age"
}
},
{
"uuid":"1111",
"name":"OracleWrite",
"bundle":"cn.piflow.bundle.jdbc.OracleWrite",
"properties":{
"url": "jdbc:mysql://192.168.3.141:3306/piflow_web",
"user": "root",
"password": "bigdata",
"dbtable":"test"
}
}
],
"paths":[
{
"from":"CsvParser",
"outport":"",
"inport":"",
"to":"test"
}
]
}
}

View File

@ -13,7 +13,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 class IdentityNumberClean extends ConfigurableStop{
 val authorEmail: String = "06whuxx@163.com"
-val description: String = "Cleaning data in ID format"
+val description: String = "Cleaning data in ID Card format"
 val inportList: List[String] = List(Port.DefaultPort)
 val outportList: List[String] = List(Port.DefaultPort)
@ -75,7 +75,7 @@ class IdentityNumberClean extends ConfigurableStop{
 .description("Column names are what you want to clean,multiple column names are separated by commas")
 .defaultValue("")
 .required(true)
-.example("idcard")
+.example("IdCard")
 descriptor = columnName :: descriptor
 descriptor

View File

@ -72,7 +72,7 @@ class CsvParser extends ConfigurableStop{
.description("The path of csv file") .description("The path of csv file")
.defaultValue("") .defaultValue("")
.required(true) .required(true)
.example("hdfs://192.168.3.138:8020/test/") .example("hdfs://127.0.0.1:9000/test/")
descriptor = csvPath :: descriptor descriptor = csvPath :: descriptor
val header = new PropertyDescriptor() val header = new PropertyDescriptor()

View File

@ -39,7 +39,7 @@ class CsvSave extends ConfigurableStop{
.description("The save path of csv file") .description("The save path of csv file")
.defaultValue("") .defaultValue("")
.required(true) .required(true)
.example("hdfs://192.168.3.138:8020/test/") .example("hdfs://127.0.0.1:9000/test/")
descriptor = csvSavePath :: descriptor descriptor = csvSavePath :: descriptor
val header = new PropertyDescriptor() val header = new PropertyDescriptor()

View File

@ -18,7 +18,7 @@ import scala.collection.mutable.ArrayBuffer
 class UnzipFilesOnHDFS extends ConfigurableStop {
 val authorEmail: String = "yangqidong@cnic.cn"
-val description: String = "Unzip files on hdfs"
+val description: String = "Extract files on hdfs"
 val inportList: List[String] = List(Port.DefaultPort)
 val outportList: List[String] = List(Port.DefaultPort)
@ -67,7 +67,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 typeStr="gz"
 }
 }else{
-throw new RuntimeException("File type fill in error, or do not support this type.")
+throw new RuntimeException("The file type is incorrect,or is not supported.")
 }
 typeStr
 }
@ -88,9 +88,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 fs
 }
 def unzipFile(fileHdfsPath: String, saveHdfsPath: String)= {
 var eachSavePath : String=""
@ -179,7 +176,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 .name("filePath")
 .displayName("FilePath")
 .defaultValue("")
-.description("File path of HDFS,such as '/work/a.gz'")
+.description("File path of HDFS")
 .required(false)
 .example("/work/a.gz ")
@ -195,9 +192,9 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 val savePath = new PropertyDescriptor()
 .name("savePath")
-.displayName("savePath")
+.displayName("SavePath")
 .description("This parameter can specify the location of the decompressed file, you can choose not to fill in, " +
-"the program will default to save the decompressed file in the folder where the source file is located. If you fill in, you can specify a folder, such as /A/AB/")
+"the program saves the decompressed file in the folder where the source file is located by default. If you fill in, you can specify a folder, such as /A/AB/")
 .defaultValue("")
 .required(false)
 .example("/work/aa/")
@ -209,7 +206,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 .displayName("isCustomize")
 .description("Whether to customize the compressed file path, if true, \n" +
 "you must specify the path where the compressed file is located . \n" +
-"If it is false, it will automatically find the file path data from the upstream port ")
+"If false, it will automatically find the file path data from the upstream port ")
 .defaultValue("false").allowableValues(Set("true","false")).required(true)
 descriptor = isCustomize :: descriptor
@ -225,6 +222,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 }
 override def getGroup(): List[String] = {
-List(StopGroup.HdfsGroup.toString)
+List(StopGroup.HdfsGroup)
 }
 }
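
For orientation, extracting a .gz file on HDFS as this stop does generally amounts to streaming between Hadoop FileSystem paths. The following is a rough sketch of that idea under assumed paths and namenode address, not the stop's actual implementation:

```scala
import java.util.zip.GZIPInputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

object UnzipGzOnHdfsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://10.0.86.89:9000") // namenode address is illustrative
    val fs = FileSystem.get(conf)

    // Wrap the compressed HDFS stream and copy the inflated bytes next to the source file.
    val in = new GZIPInputStream(fs.open(new Path("/work/a.gz")))
    val out = fs.create(new Path("/work/a"))
    IOUtils.copyBytes(in, out, 4096, true) // closes both streams when finished
  }
}
```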

View File

@ -9,9 +9,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 class PutHiveMode extends ConfigurableStop {
 val authorEmail: String = "xjzhu@cnic.cn"
-val description: String = "Save data to hive by overwrite mode"
-val inportList: List[String] = List(Port.DefaultPort.toString)
-val outportList: List[String] = List(Port.NonePort.toString)
+val description: String = "Modes for saving data hive"
+val inportList: List[String] = List(Port.DefaultPort)
+val outportList: List[String] = List(Port.DefaultPort)
 var database:String = _
 var table:String = _

View File

@ -11,8 +11,8 @@ class PutHiveQL extends ConfigurableStop {
val authorEmail: String = "xiaoxiao@cnic.cn" val authorEmail: String = "xiaoxiao@cnic.cn"
val description: String = "Execute hiveQL script" val description: String = "Execute hiveQL script"
val inportList: List[String] = List(Port.DefaultPort.toString) val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort.toString) val outportList: List[String] = List(Port.DefaultPort)
var database:String =_ var database:String =_
var hiveQL_Path:String =_ var hiveQL_Path:String =_

View File

@ -12,8 +12,8 @@ class PutHiveStreaming extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn" val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Save data to hive" val description: String = "Save data to hive"
val inportList: List[String] = List(Port.DefaultPort.toString) val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort.toString) val outportList: List[String] = List(Port.DefaultPort)
var database:String = _ var database:String = _
var table:String = _ var table:String = _

View File

@ -14,8 +14,8 @@ class SelectHiveQL extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn" val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Execute select clause of hiveQL" val description: String = "Execute select clause of hiveQL"
val inportList: List[String] = List(Port.DefaultPort.toString) val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort.toString) val outportList: List[String] = List(Port.DefaultPort)
var hiveQL:String = _ var hiveQL:String = _
@ -43,6 +43,7 @@ class SelectHiveQL extends ConfigurableStop {
.displayName("HiveQL") .displayName("HiveQL")
.defaultValue("") .defaultValue("")
.allowableValues(Set("")) .allowableValues(Set(""))
.description("Execute select clause of hiveQL")
.required(true) .required(true)
.example("select * from test.user1") .example("select * from test.user1")
descriptor = hiveQL :: descriptor descriptor = hiveQL :: descriptor
@ -54,7 +55,7 @@ class SelectHiveQL extends ConfigurableStop {
} }
override def getGroup(): List[String] = { override def getGroup(): List[String] = {
List(StopGroup.HiveGroup.toString) List(StopGroup.HiveGroup)
} }
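
The example value added for the HiveQL property corresponds to a plain spark.sql call against the Hive metastore; a minimal sketch:

```scala
import org.apache.spark.sql.SparkSession

object SelectHiveQlSketch {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport lets spark.sql resolve tables registered in the Hive metastore.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("hiveql-sketch")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("select * from test.user1").show()
    spark.close()
  }
}
```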

View File

@ -10,7 +10,7 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
 /**
  * HIVE JDBC DRIVER DESIGN FOR HIVE 1.2.1
  */
-class SelectHiveQLbyJDBC extends ConfigurableStop {
+class SelectHiveQLByJDBC extends ConfigurableStop {
 override val authorEmail: String = "xiaomeng7890@gmail.com"
 override val description: String = "some hive can only achieve by jdbc, this stop is designed for this"
 override val inportList: List[String] = List(Port.NonePort)
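
SelectHiveQLByJDBC goes through the Hive JDBC driver rather than the metastore-backed SparkSession. A hedged sketch of plain HiveServer2 access via that driver; host, port and credentials are illustrative:

```scala
import java.sql.DriverManager

object HiveJdbcSketch {
  def main(args: Array[String]): Unit = {
    // Hive 1.2.1 ships the HiveServer2 JDBC driver and uses the jdbc:hive2:// URL scheme.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://192.168.3.140:10000/default", "hive", "")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("select * from test.user1")
    while (rs.next()) {
      println(rs.getString(1))
    }
    rs.close(); stmt.close(); conn.close()
  }
}
```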

View File

@ -13,7 +13,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import scala.collection.mutable.ArrayBuffer
-class ReadImpala extends ConfigurableStop{
+class ImpalaRead extends ConfigurableStop{
 override val authorEmail: String = "yangqidong@cnic.cn"
 override val description: String = "Get data from impala"
 val inportList: List[String] = List(Port.NonePort.toString)

View File

@ -17,8 +17,8 @@ class JdbcReadFromOracle extends ConfigurableStop{
val authorEmail: String = "yangqidong@cnic.cn" val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "Read from oracle" val description: String = "Read from oracle"
val inportList: List[String] = List(Port.NonePort.toString) val inportList: List[String] = List(Port.NonePort)
val outportList: List[String] = List(Port.DefaultPort.toString) val outportList: List[String] = List(Port.DefaultPort)
var url:String = _ var url:String = _
var user:String = _ var user:String = _
@ -132,8 +132,6 @@ class JdbcReadFromOracle extends ConfigurableStop{
val df: DataFrame = session.createDataFrame(rdd,schemaNew) val df: DataFrame = session.createDataFrame(rdd,schemaNew)
out.write(df) out.write(df)
} }
def initialize(ctx: ProcessContext): Unit = { def initialize(ctx: ProcessContext): Unit = {
@ -157,6 +155,7 @@ class JdbcReadFromOracle extends ConfigurableStop{
.description("The Url, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb") .description("The Url, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb")
.defaultValue("") .defaultValue("")
.required(true) .required(true)
.example("jdbc:oracle:thin:@192.168.0.1:1521/newdb")
descriptor = url :: descriptor descriptor = url :: descriptor
val user=new PropertyDescriptor() val user=new PropertyDescriptor()
@ -165,6 +164,7 @@ class JdbcReadFromOracle extends ConfigurableStop{
.description("The user name of database") .description("The user name of database")
.defaultValue("") .defaultValue("")
.required(true) .required(true)
.example("root")
descriptor = user :: descriptor descriptor = user :: descriptor
val password=new PropertyDescriptor() val password=new PropertyDescriptor()
@ -173,6 +173,7 @@ class JdbcReadFromOracle extends ConfigurableStop{
.description("The password of database") .description("The password of database")
.defaultValue("") .defaultValue("")
.required(true) .required(true)
.example("123456")
descriptor = password :: descriptor descriptor = password :: descriptor
val sql=new PropertyDescriptor() val sql=new PropertyDescriptor()
@ -181,6 +182,7 @@ class JdbcReadFromOracle extends ConfigurableStop{
.description("The sql you want") .description("The sql you want")
.defaultValue("") .defaultValue("")
.required(true) .required(true)
.example("select * from type")
descriptor = sql :: descriptor descriptor = sql :: descriptor
val schema=new PropertyDescriptor() val schema=new PropertyDescriptor()
@ -189,6 +191,7 @@ class JdbcReadFromOracle extends ConfigurableStop{
.description("The name of the field of your SQL statement query, such as: ID.number, name.varchar") .description("The name of the field of your SQL statement query, such as: ID.number, name.varchar")
.defaultValue("") .defaultValue("")
.required(true) .required(true)
.example("ID.number, name.varchar")
descriptor = schema :: descriptor descriptor = schema :: descriptor
descriptor descriptor
@ -199,7 +202,7 @@ class JdbcReadFromOracle extends ConfigurableStop{
} }
override def getGroup(): List[String] = { override def getGroup(): List[String] = {
List(StopGroup.JdbcGroup.toString) List(StopGroup.JdbcGroup)
} }

View File

@ -7,12 +7,12 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import org.apache.spark.sql.SparkSession
-class JdbcRead extends ConfigurableStop {
+class MysqlRead extends ConfigurableStop {
 val authorEmail: String = "xjzhu@cnic.cn"
-val description: String = "Read data from jdbc database"
-val inportList: List[String] = List(Port.NonePort.toString)
-val outportList: List[String] = List(Port.DefaultPort.toString)
+val description: String = "Read data from mysql database with jdbc"
+val inportList: List[String] = List(Port.NonePort)
+val outportList: List[String] = List(Port.DefaultPort)
 var url:String = _
 var user:String = _
@ -51,7 +51,7 @@ class JdbcRead extends ConfigurableStop {
 val url=new PropertyDescriptor()
 .name("url")
 .displayName("Url")
-.description("The Url of database")
+.description("The Url of mysql database")
 .defaultValue("")
 .required(true)
 .example("jdbc:mysql://127.0.0.1/dbname")
@ -60,7 +60,7 @@ class JdbcRead extends ConfigurableStop {
 val user=new PropertyDescriptor()
 .name("user")
 .displayName("User")
-.description("The user name of database")
+.description("The user name of mysql database")
 .defaultValue("")
 .required(true)
 .example("root")
@ -69,7 +69,7 @@ class JdbcRead extends ConfigurableStop {
 val password=new PropertyDescriptor()
 .name("password")
 .displayName("Password")
-.description("The password of database")
+.description("The password of mysql database")
 .defaultValue("")
 .required(true)
 .example("12345")

View File

@ -17,14 +17,12 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{
 override val inportList: List[String] = List(Port.NonePort.toString)
 override val outportList: List[String] = List(Port.DefaultPort.toString)
-//var driver:String = _
 var url:String = _
 var user:String = _
 var password:String = _
 var sql:String = _
 override def setProperties(map: Map[String, Any]): Unit = {
-//driver = MapUtil.get(map,"driver").asInstanceOf[String]
 url = MapUtil.get(map,"url").asInstanceOf[String]
 user = MapUtil.get(map,"user").asInstanceOf[String]
 password = MapUtil.get(map,"password").asInstanceOf[String]
@ -36,25 +34,53 @@
 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()
-//val driver=new PropertyDescriptor().name("driver").displayName("Driver").description("The driver name, for example com.mysql.jdbc.Driver").defaultValue("").required(true)
-//descriptor = driver :: descriptor
-val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true)
+val url=new PropertyDescriptor()
+.name("url")
+.displayName("url")
+.description("The Url, for example jdbc:mysql://127.0.0.1/dbname")
+.defaultValue("")
+.required(true)
 descriptor = url :: descriptor
-val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
+val user=new PropertyDescriptor()
+.name("user")
+.displayName("user")
+.description("The user name of database")
+.defaultValue("")
+.required(true)
 descriptor = user :: descriptor
-val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
+val password=new PropertyDescriptor()
+.name("password")
+.displayName("password")
+.description("The password of database")
+.defaultValue("")
+.required(true)
 descriptor = password :: descriptor
-val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The sql sentence you want to execute").defaultValue("").required(true)
+val sql=new PropertyDescriptor()
+.name("sql")
+.displayName("sql")
+.description("The sql sentence you want to execute")
+.defaultValue("")
+.required(true)
 descriptor = sql :: descriptor
-val incrementalField=new PropertyDescriptor().name("incrementalField").displayName("incrementalField").description("The incremental field").defaultValue("").required(true)
+val incrementalField=new PropertyDescriptor()
+.name("incrementalField")
+.displayName("incrementalField")
+.description("The incremental field")
+.defaultValue("")
+.required(true)
 descriptor = incrementalField :: descriptor
-val incrementalStart=new PropertyDescriptor().name("incrementalStart").displayName("incrementalStart").description("The incremental start value").defaultValue("").required(true)
+val incrementalStart=new PropertyDescriptor()
+.name("incrementalStart")
+.displayName("incrementalStart")
+.description("The incremental start value")
+.defaultValue("")
+.required(true)
 descriptor = incrementalStart :: descriptor
 descriptor
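
The incrementalField/incrementalStart pair suggests the stop narrows each run to rows newer than the last recorded value. The helper below is purely hypothetical and only illustrates that idea; it is not the stop's actual code:

```scala
object IncrementalPredicateSketch {
  // Hypothetical helper: narrow a user-supplied query to rows past the last incremental value.
  def incrementalSql(sql: String, incrementalField: String, incrementalStart: String): String =
    s"select * from ($sql) t where $incrementalField > '$incrementalStart'"

  def main(args: Array[String]): Unit = {
    // e.g. only rows updated after the value recorded by the previous run
    println(incrementalSql("select * from user", "update_time", "2020-03-27 00:00:00"))
  }
}
```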

View File

@ -10,17 +10,18 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import scala.beans.BeanProperty
-class WriteMysql extends ConfigurableStop{
+class MysqlWrite extends ConfigurableStop{
 val authorEmail: String = "xjzhu@cnic.cn"
-val description: String = "Write data into jdbc database"
-val inportList: List[String] = List(Port.DefaultPort.toString)
-val outportList: List[String] = List(Port.NonePort.toString)
+val description: String = "Write data to mysql database with jdbc"
+val inportList: List[String] = List(Port.DefaultPort)
+val outportList: List[String] = List(Port.NonePort)
 var url:String = _
 var user:String = _
 var password:String = _
 var dbtable:String = _
 def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
 val spark = pec.get[SparkSession]()
 val jdbcDF = in.read()
@ -28,7 +29,6 @@ class WriteMysql extends ConfigurableStop{
 properties.put("user", user)
 properties.put("password", password)
 jdbcDF.write.mode(SaveMode.Append).jdbc(url,dbtable,properties)
-//jdbcDF.show(10)
 out.write(jdbcDF)
 }
@ -46,22 +46,42 @@ class WriteMysql extends ConfigurableStop{
 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()
-val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true)
-//descriptor = url :: descriptor
-val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
-//descriptor = user :: descriptor
-val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
-//descriptor = password :: descriptor
-val dbtable=new PropertyDescriptor().name("dbtable").displayName("dbtable").description("The table you want to write").defaultValue("").required(true)
-//descriptor = dbtable :: descriptor
+val url=new PropertyDescriptor()
+.name("url")
+.displayName("Url")
+.description("The Url, for example jdbc:mysql://127.0.0.1/dbname")
+.defaultValue("")
+.required(true)
+.example("jdbc:mysql://127.0.0.1/dbname")
 descriptor = url :: descriptor
+val user=new PropertyDescriptor()
+.name("user")
+.displayName("User")
+.description("The user name of database")
+.defaultValue("")
+.required(true)
+.example("root")
 descriptor = user :: descriptor
+val password=new PropertyDescriptor()
+.name("password")
+.displayName("Password")
+.description("The password of database")
+.defaultValue("")
+.required(true)
+.example("123456")
 descriptor = password :: descriptor
+val dbtable=new PropertyDescriptor()
+.name("dbtable")
+.displayName("DBTable")
+.description("The table you want to write")
+.defaultValue("")
+.required(true)
+.example("test")
 descriptor = dbtable :: descriptor
 descriptor
 }
@ -70,7 +90,7 @@ class WriteMysql extends ConfigurableStop{
 }
 override def getGroup(): List[String] = {
-List(StopGroup.JdbcGroup.toString)
+List(StopGroup.JdbcGroup)
 }
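
The refactored perform method above still boils down to the standard DataFrame JDBC write shown in the hunk. A standalone sketch of that same call; the connection details are illustrative:

```scala
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object MysqlWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mysql-write-sketch").getOrCreate()
    val jdbcDF = spark.range(10).toDF("id")

    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")

    // Same call as MysqlWrite.perform: append the DataFrame into the target table.
    jdbcDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1/dbname", "test", properties)
    spark.close()
  }
}
```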

View File

@ -44,16 +44,40 @@ class OracleRead extends ConfigurableStop{
 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()
-val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true)
+val url=new PropertyDescriptor()
+.name("url")
+.displayName("Url")
+.description("The Url, for example jdbc:oracle:thin:@10.0.86.237:1521/newdb")
+.defaultValue("")
+.required(true)
+.example("jdbc:oracle:thin:@10.0.86.237:1521/newdb")
 descriptor = url :: descriptor
-val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
+val user=new PropertyDescriptor()
+.name("user")
+.displayName("User")
+.description("The user name of database")
+.defaultValue("")
+.required(true)
+.example("root")
 descriptor = user :: descriptor
-val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
+val password=new PropertyDescriptor()
+.name("password")
+.displayName("Password")
+.description("The password of database")
+.defaultValue("")
+.required(true)
+.example("123456")
 descriptor = password :: descriptor
-val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The sql sentence you want to execute").defaultValue("").required(true)
+val sql=new PropertyDescriptor()
+.name("sql")
+.displayName("Sql")
+.description("The sql sentence you want to execute")
+.defaultValue("")
+.required(true)
+.example("select * from test")
 descriptor = sql :: descriptor
 descriptor
@ -64,7 +88,7 @@ class OracleRead extends ConfigurableStop{
 }
 override def getGroup(): List[String] = {
-List(StopGroup.JdbcGroup.toString)
+List(StopGroup.JdbcGroup)
 }
 override def initialize(ctx: ProcessContext): Unit = {}

View File

@ -38,28 +38,76 @@ class OracleReadByPartition extends ConfigurableStop{
 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()
-val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true)
+val url=new PropertyDescriptor()
+.name("url")
+.displayName("Url")
+.description("The Url,for example jdbc:oracle:thin:@10.0.86.237:1521/newdb")
+.defaultValue("")
+.required(true)
+.example("jdbc:oracle:thin:@10.0.86.237:1521/newdb")
 descriptor = url :: descriptor
-val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
+val user=new PropertyDescriptor()
+.name("user")
+.displayName("User")
+.description("The user name of database")
+.defaultValue("")
+.required(true)
+.example("root")
 descriptor = user :: descriptor
-val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
+val password=new PropertyDescriptor()
+.name("password")
+.displayName("Password")
+.description("The password of database")
+.defaultValue("")
+.required(true)
+.example("123456")
 descriptor = password :: descriptor
-val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The sql sentence you want to execute").defaultValue("").required(true)
+val sql=new PropertyDescriptor()
+.name("sql")
+.displayName("Sql")
+.description("The sql sentence you want to execute")
+.defaultValue("")
+.required(true)
+.example("select * from test")
 descriptor = sql :: descriptor
-val partitionColumn=new PropertyDescriptor().name("partitionColumn").displayName("partitionColumn").description("The partitionby column").defaultValue("").required(true)
+val partitionColumn=new PropertyDescriptor()
+.name("partitionColumn")
+.displayName("PartitionColumn")
+.description("partitioned column")
+.defaultValue("")
+.required(true)
+.example("id")
 descriptor = partitionColumn :: descriptor
-val lowerBound=new PropertyDescriptor().name("lowerBound").displayName("lowerBound").description("The lowerBound of partitioned column").defaultValue("").required(true)
+val lowerBound=new PropertyDescriptor()
+.name("lowerBound")
+.displayName("LowerBound")
+.description("The lowerbound of partitioned columns")
+.defaultValue("")
+.required(true)
+.example("1")
 descriptor = lowerBound :: descriptor
-val upperBound=new PropertyDescriptor().name("upperBound").displayName("upperBound").description("The upperBound of partitioned column").defaultValue("").required(true)
+val upperBound=new PropertyDescriptor()
+.name("upperBound")
+.displayName("UpperBound")
+.description("The upperBound of partitioned columns")
+.defaultValue("")
+.required(true)
+.example("20")
 descriptor = upperBound :: descriptor
-val numPartitions=new PropertyDescriptor().name("numPartitions").displayName("numPartitions").description("The number of partitions ").defaultValue("").required(true)
+val numPartitions=new PropertyDescriptor()
+.name("numPartitions")
+.displayName("NumPartitions")
+.description("The number of partitions ")
+.defaultValue("")
+.required(true)
+.example("5")
 descriptor = numPartitions :: descriptor
 descriptor
@ -70,7 +118,7 @@ class OracleReadByPartition extends ConfigurableStop{
 }
 override def getGroup(): List[String] = {
-List(StopGroup.JdbcGroup.toString)
+List(StopGroup.JdbcGroup)
 }
 override def initialize(ctx: ProcessContext): Unit = {}

View File

@ -8,19 +8,18 @@ import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import org.apache.spark.sql._
-class WriteOracle extends ConfigurableStop{
+class OracleWrite extends ConfigurableStop{
 val authorEmail: String = "yangqidong@cnic.cn"
 val description: String = "Write data to oracle"
-val inportList: List[String] = List(Port.NonePort.toString)
-val outportList: List[String] = List(Port.DefaultPort.toString)
+val inportList: List[String] = List(Port.NonePort)
+val outportList: List[String] = List(Port.DefaultPort)
 var url:String = _
 var user:String = _
 var password:String = _
 var table:String = _
 def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
 val session = pec.get[SparkSession]()
 val inDF: DataFrame = in.read()
@ -67,16 +66,40 @@ class WriteOracle extends ConfigurableStop{
 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()
-val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb").defaultValue("").required(true)
+val url=new PropertyDescriptor()
+.name("url")
+.displayName("Url")
+.description("The Url, for example jdbc:oracle:thin:@10.0.86.237:1521/newdb")
+.defaultValue("")
+.required(true)
+.example("jdbc:oracle:thin:@10.0.86.237:1521/newdb")
 descriptor = url :: descriptor
-val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true)
+val user=new PropertyDescriptor()
+.name("user")
+.displayName("User")
+.description("The user name of database")
+.defaultValue("")
+.required(true)
+.example("root")
 descriptor = user :: descriptor
-val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true)
+val password=new PropertyDescriptor()
+.name("password")
+.displayName("Password")
+.description("The password of database")
+.defaultValue("")
+.required(true)
+.example("123456")
 descriptor = password :: descriptor
-val table=new PropertyDescriptor().name("table").displayName("table").description("The table you want to write to").defaultValue("").required(true)
+val table=new PropertyDescriptor()
+.name("table")
+.displayName("table")
+.description("The table name")
+.defaultValue("")
+.required(true)
+.example("test")
 descriptor = table :: descriptor
 descriptor
@ -87,7 +110,7 @@ class WriteOracle extends ConfigurableStop{
 }
 override def getGroup(): List[String] = {
-List(StopGroup.JdbcGroup.toString)
+List(StopGroup.JdbcGroup)
 }

View File

@ -7,6 +7,7 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 class EvaluateJsonPath extends ConfigurableStop{
 override val authorEmail: String = "yangqidong@cnic.cn"
 val inportList: List[String] = List(Port.NonePort.toString)
 val outportList: List[String] = List(Port.DefaultPort.toString)

View File

@ -9,13 +9,13 @@ import org.junit.Test
 import scala.util.parsing.json.JSON
-class getOracleTest {
+class JdbcReadFromOracleTest {
 @Test
 def testFlow(): Unit ={
 //parse flow json
-val file = "src/main/resources/JDBC/getOracle.json"
+val file = "src/main/resources/JDBC/JdbcReadFromOracle.json"
 val flowJsonStr = FileUtil.fileReader(file)
 val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
 println(map)
@ -26,7 +26,6 @@ class getOracleTest {
 val h2Server = Server.createTcpServer("-tcp","-tcpAllowOthers","-tcpPort","50001").start()
 //execute flow
 val spark = SparkSession.builder()
 .master("spark://10.0.86.89:7077")

View File

@ -1,4 +1,4 @@
-package cn.piflow.bundle.csv
+package cn.piflow.bundle.JDBC
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
@ -10,13 +10,13 @@ import org.junit.Test
 import scala.util.parsing.json.JSON
-class CsvSaveAsErrorTest {
+class MysqlReadTest {
 @Test
 def testFlow(): Unit ={
 //parse flow json
-val file = "src/main/resources/flow/csv/CsvSaveAsError.json"
+val file = "src/main/resources/flow/jdbc/MysqlRead.json"
 val flowJsonStr = FileUtil.fileReader(file)
 val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
 println(map)
@ -24,12 +24,13 @@ class CsvSaveAsErrorTest {
 //create flow
 val flowBean = FlowBean(map)
 val flow = flowBean.constructFlow()
 val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
 //execute flow
 val spark = SparkSession.builder()
 .master("local[*]")
-.appName("piflow-hive-bundle")
+.appName("CsvParserTest")
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")

View File

@ -3,19 +3,20 @@ package cn.piflow.bundle.JDBC
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
 import scala.util.parsing.json.JSON
-class JdbcReadTest {
+class MysqlWriteTest {
 @Test
 def testFlow(): Unit ={
 //parse flow json
-val file = "src/main/resources/flow/jdbc/JdbcRead.json"
+val file = "src/main/resources/flow/jdbc/MysqlWrite.json"
 val flowJsonStr = FileUtil.fileReader(file)
 val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
 println(map)
@ -33,7 +34,7 @@ class JdbcReadTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()

View File

@ -1,23 +1,22 @@
-package cn.piflow.bundle.csv
+package cn.piflow.bundle.JDBC
-import cn.piflow.bundle.json.JsonSave
+import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
 import cn.piflow.util.PropertyUtil
-import cn.piflow.{FlowImpl, Path, Runner}
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
 import scala.util.parsing.json.JSON
-class CsvSaveAsAppendTest {
+class OracleReadByPartitionTest {
 @Test
 def testFlow(): Unit ={
 //parse flow json
-val file = "src/main/resources/flow/csv/CsvSaveAsAppend.json"
+val file = "src/main/resources/flow/jdbc/OracleReadByPartition.json"
 val flowJsonStr = FileUtil.fileReader(file)
 val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
 println(map)
@ -25,12 +24,13 @@ class CsvSaveAsAppendTest {
 //create flow
 val flowBean = FlowBean(map)
 val flow = flowBean.constructFlow()
 val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
 //execute flow
 val spark = SparkSession.builder()
 .master("local[*]")
-.appName("piflow-hive-bundle")
+.appName("CsvParserTest")
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")

View File

@ -0,0 +1,53 @@
package cn.piflow.bundle.JDBC
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.PropertyUtil
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class OracleReadTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/jdbc/OracleRead.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("CsvParserTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,53 @@
package cn.piflow.bundle.JDBC
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.PropertyUtil
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class OracleWriteTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/jdbc/OracleWrite.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("CsvParserTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -1,52 +0,0 @@
package cn.piflow.bundle.csv
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.PropertyUtil
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class CsvSaveAsIgnoreTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsIgnore.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,165 @@
package cn.piflow.bundle.csv
import cn.piflow.bundle.json.JsonSave
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.PropertyUtil
import cn.piflow.{FlowImpl, Path, Runner}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class CsvSaveTest {
@Test
def CsvSaveAsAppendFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsAppend.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
@Test
def CsvSaveAsOverWriteFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsOverWrite.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
@Test
def CsvSaveAsErrorFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsError.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
@Test
def CsvSaveAsIgnoreFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/csv/CsvSaveAsIgnore.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -0,0 +1,58 @@
package cn.piflow.bundle.graphx
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class LoadGraph {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/graphx/LoadGraph.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@ -3,6 +3,7 @@ package cn.piflow.bundle.hive
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@ -33,7 +34,7 @@ class PutHiveModeTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()
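
These test changes (repeated for the other hive tests below) replace the hard-coded metastore URI with a PropertyUtil lookup, so the value now comes from piflow's configuration. A hedged sketch of that lookup; the config file name and URI in the comment are illustrative:

```scala
import cn.piflow.util.PropertyUtil

object MetastoreUriSketch {
  def main(args: Array[String]): Unit = {
    // Expects an entry like the following in piflow's properties file (name and value illustrative):
    //   hive.metastore.uris=thrift://192.168.3.140:9083
    val uris = PropertyUtil.getPropertyValue("hive.metastore.uris")
    println(s"hive.metastore.uris = $uris")
  }
}
```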

View File

@ -3,6 +3,7 @@ package cn.piflow.bundle.hive
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@ -33,7 +34,7 @@ class PutHiveQLTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()

View File

@ -3,6 +3,7 @@ package cn.piflow.bundle.hive
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@ -33,7 +34,7 @@ class PutHiveStreamingTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()

View File

@ -3,19 +3,20 @@ package cn.piflow.bundle.hive
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
 import scala.util.parsing.json.JSON
-class OptionalSelectHiveQLTest {
+class SelectHiveQLByJdbcTest {
 @Test
 def testFlow(): Unit ={
 //parse flow json
-val file = "src/main/resources/flow/hive/OptionalSelectHiveQL.json"
+val file = "src/main/resources/flow/hive/SelectHiveQLByJdbc.json"
 val flowJsonStr = FileUtil.fileReader(file)
 val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
 println(map)
@ -33,7 +34,7 @@ class OptionalSelectHiveQLTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()

View File

@ -3,6 +3,7 @@ package cn.piflow.bundle.hive
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@ -33,7 +34,7 @@ class SelectHiveQLTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()