diff --git a/piflow-bundle/src/main/resources/flow/hive/OptionalSelectHiveQL.json b/piflow-bundle/src/main/resources/flow/hive/SelectHiveQLByJdbc.json similarity index 100% rename from piflow-bundle/src/main/resources/flow/hive/OptionalSelectHiveQL.json rename to piflow-bundle/src/main/resources/flow/hive/SelectHiveQLByJdbc.json diff --git a/piflow-bundle/src/main/resources/flow/jdbc/JdbcReadFromOracle.json b/piflow-bundle/src/main/resources/flow/jdbc/JdbcReadFromOracle.json new file mode 100644 index 0000000..c9e70cc --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/jdbc/JdbcReadFromOracle.json @@ -0,0 +1,45 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "checkpoint":"Merge", + "stops":[ + + { + "uuid":"0000", + "name":"JdbcReadFromOracle", + "bundle":"cn.piflow.bundle.jdbc.JdbcReadFromOracle", + "properties":{ + "url":"jdbc:oracle:thin:@10.0.86.237:1521/newdb", + "user":"my", + "password":"bigdata", + "sql":"select * from typetype", + "schema":"mynum.number,mychar.varchar2,myblob.blob,myclob.clob,myxml.xmltype,mylong.long,mydate.date,mynclob.nclob" + } + + }, + { + "uuid":"1111", + "name":"CsvParser", + "bundle":"cn.piflow.bundle.csv.CsvParser", + "properties":{ + "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv", + "header":"false", + "delimiter":",", + "schema":"title,author,pages" + } + } + + ], + "paths":[ + { + "from":"JdbcReadFromOracle", + "outport":"", + "inport":"", + "to":"CsvParser" + } + + + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/flow/jdbc/JdbcRead.json b/piflow-bundle/src/main/resources/flow/jdbc/MysqlRead.json similarity index 83% rename from piflow-bundle/src/main/resources/flow/jdbc/JdbcRead.json rename to piflow-bundle/src/main/resources/flow/jdbc/MysqlRead.json index 953bd8f..61ae0ac 100644 --- a/piflow-bundle/src/main/resources/flow/jdbc/JdbcRead.json +++ b/piflow-bundle/src/main/resources/flow/jdbc/MysqlRead.json @@ -6,8 +6,8 @@ { "uuid":"1111", - "name":"JdbcRead", - "bundle":"cn.piflow.bundle.jdbc.JdbcRead", + "name":"MysqlRead", + "bundle":"cn.piflow.bundle.jdbc.MysqlRead", "properties":{ "url": "jdbc:mysql://192.168.3.141:3306/piflow_web", "user": "root", @@ -23,7 +23,8 @@ "csvSavePath":"hdfs://192.168.3.138:8020/test/", "header": "true", "delimiter":",", - "partition":"1" + "partition":"1", + "saveMode": "append" } } diff --git a/piflow-bundle/src/main/resources/flow/jdbc/MysqlWrite.json b/piflow-bundle/src/main/resources/flow/jdbc/MysqlWrite.json new file mode 100644 index 0000000..470f10e --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/jdbc/MysqlWrite.json @@ -0,0 +1,38 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"CsvParser", + "bundle":"cn.piflow.bundle.csv.CsvParser", + "properties":{ + "csvPath":"hdfs://192.168.3.138:8020/test/", + "header": "true", + "delimiter":",", + "schema":"id,name,gender,age" + } + }, + { + "uuid":"1111", + "name":"MysqlWrite", + "bundle":"cn.piflow.bundle.jdbc.MysqlWrite", + "properties":{ + "url": "jdbc:mysql://192.168.3.141:3306/piflow_web", + "user": "root", + "password": "bigdata", + "dbtable":"MysqlWrite" + } + } + ], + "paths":[ + { + "from":"CsvParser", + "outport":"", + "inport":"", + "to":"MysqlWrite" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/flow/jdbc/OracleRead.json b/piflow-bundle/src/main/resources/flow/jdbc/OracleRead.json new file mode 100644 index 0000000..83dd0d4 --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/jdbc/OracleRead.json @@ -0,0 
+1,42 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + + { + "uuid":"1111", + "name":"OracleRead", + "bundle":"cn.piflow.bundle.jdbc.OracleRead", + "properties":{ + "url": "jdbc:oracle:thin:@(DESCRIPTION =(ADDRESS_LIST =(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.2.237)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=RACDB_STANDBY)))", + "user": "dashuju", + "password": "DaShuju_732", + "sql":"select rpt_code,submit_date from egrant_isis.rpt_completion where rownum <=10" + } + }, + { + "uuid":"1324", + "name":"CsvSave", + "bundle":"cn.piflow.bundle.csv.CsvSave", + "properties":{ + "csvSavePath":"hdfs://192.168.3.138:8020/test/", + "header": "true", + "delimiter":",", + "partition":"1", + "saveMode": "append" + } + + } + + ], + "paths":[ + { + "from":"OracleRead", + "outport":"", + "inport":"", + "to":"CsvSave" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/flow/jdbc/OracleReadByPartition.json b/piflow-bundle/src/main/resources/flow/jdbc/OracleReadByPartition.json new file mode 100644 index 0000000..43bd7ac --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/jdbc/OracleReadByPartition.json @@ -0,0 +1,46 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + + { + "uuid":"1111", + "name":"OracleRead", + "bundle":"cn.piflow.bundle.jdbc.OracleRead", + "properties":{ + "url": "jdbc:oracle:thin:@(DESCRIPTION =(ADDRESS_LIST =(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.2.237)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=RACDB_STANDBY)))", + "user": "dashuju", + "password": "DaShuju_732", + "sql":"select rpt_code,submit_date from egrant_isis.rpt_completion where rownum <=10", + "partitionColumn": "rpt_code", + "lowerBound": "1", + "upperBound": "20", + "numPartitions": "5" + } + }, + { + "uuid":"1324", + "name":"CsvSave", + "bundle":"cn.piflow.bundle.csv.CsvSave", + "properties":{ + "csvSavePath":"hdfs://192.168.3.138:8020/test/", + "header": "true", + "delimiter":",", + "partition":"1", + "saveMode": "append" + } + + } + + ], + "paths":[ + { + "from":"OracleRead", + "outport":"", + "inport":"", + "to":"CsvSave" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/flow/jdbc/OracleWrite.json b/piflow-bundle/src/main/resources/flow/jdbc/OracleWrite.json new file mode 100644 index 0000000..e89ca59 --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/jdbc/OracleWrite.json @@ -0,0 +1,38 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"CsvParser", + "bundle":"cn.piflow.bundle.csv.CsvParser", + "properties":{ + "csvPath":"hdfs://192.168.3.138:8020/test/", + "header": "true", + "delimiter":",", + "schema":"id,name,gender,age" + } + }, + { + "uuid":"1111", + "name":"OracleWrite", + "bundle":"cn.piflow.bundle.jdbc.OracleWrite", + "properties":{ + "url": "jdbc:oracle:thin:@(DESCRIPTION =(ADDRESS_LIST =(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.2.237)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=RACDB_STANDBY)))", + "user": "dashuju", + "password": "DaShuju_732", + "dbtable":"test" + } + } + ], + "paths":[ + { + "from":"CsvParser", + "outport":"", + "inport":"", + "to":"OracleWrite" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala index 43a9ad8..2d2c933 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala @@ -13,7 +13,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} class IdentityNumberClean extends
ConfigurableStop{ val authorEmail: String = "06whuxx@163.com" - val description: String = "Cleaning data in ID format" + val description: String = "Cleaning data in ID Card format" val inportList: List[String] = List(Port.DefaultPort) val outportList: List[String] = List(Port.DefaultPort) @@ -75,7 +75,7 @@ class IdentityNumberClean extends ConfigurableStop{ .description("Column names are what you want to clean,multiple column names are separated by commas") .defaultValue("") .required(true) - .example("idcard") + .example("IdCard") descriptor = columnName :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala index cdf2e8b..19b9a49 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala @@ -72,7 +72,7 @@ class CsvParser extends ConfigurableStop{ .description("The path of csv file") .defaultValue("") .required(true) - .example("hdfs://192.168.3.138:8020/test/") + .example("hdfs://127.0.0.1:9000/test/") descriptor = csvPath :: descriptor val header = new PropertyDescriptor() diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala index 3e48b7f..fdd370a 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala @@ -39,7 +39,7 @@ class CsvSave extends ConfigurableStop{ .description("The save path of csv file") .defaultValue("") .required(true) - .example("hdfs://192.168.3.138:8020/test/") + .example("hdfs://127.0.0.1:9000/test/") descriptor = csvSavePath :: descriptor val header = new PropertyDescriptor() diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala index 36256fc..72b307d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala @@ -18,7 +18,7 @@ import scala.collection.mutable.ArrayBuffer class UnzipFilesOnHDFS extends ConfigurableStop { val authorEmail: String = "yangqidong@cnic.cn" - val description: String = "Unzip files on hdfs" + val description: String = "Extract files on hdfs" val inportList: List[String] = List(Port.DefaultPort) val outportList: List[String] = List(Port.DefaultPort) @@ -67,7 +67,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop { typeStr="gz" } }else{ - throw new RuntimeException("File type fill in error, or do not support this type.") + throw new RuntimeException("The file type is incorrect, or is not supported.") } typeStr } @@ -88,9 +88,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop { fs } - - - def unzipFile(fileHdfsPath: String, saveHdfsPath: String)= { var eachSavePath : String="" @@ -179,7 +176,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop { .name("filePath") .displayName("FilePath") .defaultValue("") - .description("File path of HDFS,such as '/work/a.gz'") + .description("File path of HDFS") .required(false) .example("/work/a.gz ") @@ -195,9 +192,9 @@ class UnzipFilesOnHDFS extends ConfigurableStop { val savePath = new PropertyDescriptor() .name("savePath") - .displayName("savePath") + .displayName("SavePath") .description("This parameter can specify the location of the decompressed file, you can choose not to fill in, " + - "the program will default to save the decompressed file in the folder where the source file is located. If you fill in, you can specify a folder, such as /A/AB/") + "the program saves the decompressed file in the folder where the source file is located by default. If you fill in, you can specify a folder, such as /A/AB/") .defaultValue("") .required(false) .example("/work/aa/") @@ -209,7 +206,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop { .displayName("isCustomize") .description("Whether to customize the compressed file path, if true, \n" + "you must specify the path where the compressed file is located . \n" + - "If it is false, it will automatically find the file path data from the upstream port ") + "If false, it will automatically find the file path data from the upstream port ") .defaultValue("false").allowableValues(Set("true","false")).required(true) descriptor = isCustomize :: descriptor @@ -225,6 +222,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroup.HdfsGroup.toString) + List(StopGroup.HdfsGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveMode.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveMode.scala index bc4b306..ea9bf89 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveMode.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveMode.scala @@ -9,9 +9,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession} class PutHiveMode extends ConfigurableStop { val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Save data to hive by overwrite mode" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.NonePort.toString) + val description: String = "Modes for saving data to hive" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var database:String = _ var table:String = _ diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala index d405ccc..35eead3 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala @@ -11,8 +11,8 @@ class PutHiveQL extends ConfigurableStop { val authorEmail: String = "xiaoxiao@cnic.cn" val description: String = "Execute hiveQL script" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var database:String =_ var hiveQL_Path:String =_ diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala index f1e6bd4..fb43eda 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala @@ -12,8 +12,8 @@ class PutHiveStreaming extends ConfigurableStop { val authorEmail: String = "xjzhu@cnic.cn" val description: String = "Save data to hive" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var database:String = _ var table:String = _
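Note: PutHiveMode above carries database and table fields and imports SaveMode, so a mode-selectable hive write presumably reduces to the minimal Scala sketch below. The helper name, the mode-string mapping via SaveMode.valueOf, and the expected values "Append"/"Overwrite"/"ErrorIfExists"/"Ignore" are illustrative assumptions, not code from this patch.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hedged sketch: write a DataFrame into a hive table under a user-selected save mode.
def saveToHive(df: DataFrame, database: String, table: String, mode: String): Unit = {
  // SaveMode.valueOf maps the configured string onto Spark's enum;
  // saveAsTable writes through the hive metastore configured on the SparkSession.
  df.write.mode(SaveMode.valueOf(mode)).saveAsTable(s"$database.$table")
}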
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala index b48ca2a..71e228e 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala @@ -14,8 +14,8 @@ class SelectHiveQL extends ConfigurableStop { val authorEmail: String = "xjzhu@cnic.cn" val description: String = "Execute select clause of hiveQL" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var hiveQL:String = _ @@ -43,6 +43,7 @@ class SelectHiveQL extends ConfigurableStop { .displayName("HiveQL") .defaultValue("") .allowableValues(Set("")) + .description("The select clause of hiveQL to execute") .required(true) .example("select * from test.user1") descriptor = hiveQL :: descriptor @@ -54,7 +55,7 @@ } override def getGroup(): List[String] = { - List(StopGroup.HiveGroup.toString) + List(StopGroup.HiveGroup) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQLbyJDBC.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQLByJDBC.scala similarity index 98% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQLbyJDBC.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQLByJDBC.scala index 0472bbd..d46924b 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQLbyJDBC.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQLByJDBC.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} /** * HIVE JDBC DRIVER DESIGN FOR HIVE 1.2.1 */ -class SelectHiveQLbyJDBC extends ConfigurableStop { +class SelectHiveQLByJDBC extends ConfigurableStop { override val authorEmail: String = "xiaomeng7890@gmail.com" override val description: String = "some hive can only achieve by jdbc, this stop is designed for this" override val inportList: List[String] = List(Port.NonePort) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/ReadImpala.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/ImpalaRead.scala similarity index 98% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/ReadImpala.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/ImpalaRead.scala index a587143..32613ad 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/ReadImpala.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/ImpalaRead.scala @@ -13,7 +13,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession} import scala.collection.mutable.ArrayBuffer -class ReadImpala extends ConfigurableStop{ +class ImpalaRead extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" override val description: String = "Get data from impala" val inportList: List[String] = List(Port.NonePort.toString) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala index 019cafd..68fbda5 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala @@ -17,8 +17,8 @@ class JdbcReadFromOracle extends
ConfigurableStop{ val authorEmail: String = "yangqidong@cnic.cn" val description: String = "Read from oracle" - val inportList: List[String] = List(Port.NonePort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.NonePort) + val outportList: List[String] = List(Port.DefaultPort) var url:String = _ var user:String = _ @@ -132,8 +132,6 @@ class JdbcReadFromOracle extends ConfigurableStop{ val df: DataFrame = session.createDataFrame(rdd,schemaNew) out.write(df) - - } def initialize(ctx: ProcessContext): Unit = { @@ -157,6 +155,7 @@ class JdbcReadFromOracle extends ConfigurableStop{ .description("The Url, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb") .defaultValue("") .required(true) + .example("jdbc:oracle:thin:@192.168.0.1:1521/newdb") descriptor = url :: descriptor val user=new PropertyDescriptor() @@ -165,6 +164,7 @@ class JdbcReadFromOracle extends ConfigurableStop{ .description("The user name of database") .defaultValue("") .required(true) + .example("root") descriptor = user :: descriptor val password=new PropertyDescriptor() @@ -173,6 +173,7 @@ class JdbcReadFromOracle extends ConfigurableStop{ .description("The password of database") .defaultValue("") .required(true) + .example("123456") descriptor = password :: descriptor val sql=new PropertyDescriptor() @@ -181,6 +182,7 @@ class JdbcReadFromOracle extends ConfigurableStop{ .description("The sql you want") .defaultValue("") .required(true) + .example("select * from type") descriptor = sql :: descriptor val schema=new PropertyDescriptor() @@ -189,6 +191,7 @@ class JdbcReadFromOracle extends ConfigurableStop{ .description("The name of the field of your SQL statement query, such as: ID.number, name.varchar") .defaultValue("") .required(true) + .example("ID.number, name.varchar") descriptor = schema :: descriptor descriptor @@ -199,7 +202,7 @@ class JdbcReadFromOracle extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.JdbcGroup.toString) + List(StopGroup.JdbcGroup) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcRead.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlRead.scala similarity index 85% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcRead.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlRead.scala index 3ae6d92..96e3449 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcRead.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlRead.scala @@ -7,12 +7,12 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil} import org.apache.spark.sql.SparkSession -class JdbcRead extends ConfigurableStop { +class MysqlRead extends ConfigurableStop { val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Read data from jdbc database" - val inportList: List[String] = List(Port.NonePort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val description: String = "Read data from mysql database with jdbc" + val inportList: List[String] = List(Port.NonePort) + val outportList: List[String] = List(Port.DefaultPort) var url:String = _ var user:String = _ @@ -51,7 +51,7 @@ class JdbcRead extends ConfigurableStop { val url=new PropertyDescriptor() .name("url") .displayName("Url") - .description("The Url of database") + .description("The Url of mysql database") .defaultValue("") .required(true) .example("jdbc:mysql://127.0.0.1/dbname") @@ -60,7 +60,7 @@ class JdbcRead extends ConfigurableStop { val 
user=new PropertyDescriptor() .name("user") .displayName("User") - .description("The user name of database") + .description("The user name of mysql database") .defaultValue("") .required(true) .example("root") @@ -69,7 +69,7 @@ class JdbcRead extends ConfigurableStop { val password=new PropertyDescriptor() .name("password") .displayName("Password") - .description("The password of database") + .description("The password of mysql database") .defaultValue("") .required(true) .example("12345") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlReadIncremental.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlReadIncremental.scala index 28b9847..3951ea5 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlReadIncremental.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlReadIncremental.scala @@ -17,14 +17,12 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{ override val inportList: List[String] = List(Port.NonePort.toString) override val outportList: List[String] = List(Port.DefaultPort.toString) - //var driver:String = _ var url:String = _ var user:String = _ var password:String = _ var sql:String = _ override def setProperties(map: Map[String, Any]): Unit = { - //driver = MapUtil.get(map,"driver").asInstanceOf[String] url = MapUtil.get(map,"url").asInstanceOf[String] user = MapUtil.get(map,"user").asInstanceOf[String] password = MapUtil.get(map,"password").asInstanceOf[String] @@ -36,25 +34,53 @@ class MysqlReadIncremental extends ConfigurableIncrementalStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - //val driver=new PropertyDescriptor().name("driver").displayName("Driver").description("The driver name, for example com.mysql.jdbc.Driver").defaultValue("").required(true) - //descriptor = driver :: descriptor - val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true) + val url=new PropertyDescriptor() + .name("url") + .displayName("url") + .description("The Url, for example jdbc:mysql://127.0.0.1/dbname") + .defaultValue("") + .required(true) descriptor = url :: descriptor - val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true) + val user=new PropertyDescriptor() + .name("user") + .displayName("user") + .description("The user name of database") + .defaultValue("") + .required(true) descriptor = user :: descriptor - val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true) + val password=new PropertyDescriptor() + .name("password") + .displayName("password") + .description("The password of database") + .defaultValue("") + .required(true) descriptor = password :: descriptor - val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The sql sentence you want to execute").defaultValue("").required(true) + val sql=new PropertyDescriptor() + .name("sql") + .displayName("sql") + .description("The sql sentence you want to execute") + .defaultValue("") + .required(true) descriptor = sql :: descriptor - val incrementalField=new PropertyDescriptor().name("incrementalField").displayName("incrementalField").description("The incremental field").defaultValue("").required(true) + val incrementalField=new PropertyDescriptor() + .name("incrementalField") + 
.displayName("incrementalField") + .description("The incremental field") + .defaultValue("") + .required(true) descriptor = incrementalField :: descriptor - val incrementalStart=new PropertyDescriptor().name("incrementalStart").displayName("incrementalStart").description("The incremental start value").defaultValue("").required(true) + val incrementalStart=new PropertyDescriptor() + .name("incrementalStart") + .displayName("incrementalStart") + .description("The incremental start value") + .defaultValue("") + .required(true) descriptor = incrementalStart :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/WriteMysql.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlWrite.scala similarity index 60% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/WriteMysql.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlWrite.scala index f310305..e0369da 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/WriteMysql.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/MysqlWrite.scala @@ -10,17 +10,18 @@ import org.apache.spark.sql.{SaveMode, SparkSession} import scala.beans.BeanProperty -class WriteMysql extends ConfigurableStop{ +class MysqlWrite extends ConfigurableStop{ val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Write data into jdbc database" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.NonePort.toString) + val description: String = "Write data to mysql database with jdbc" + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.NonePort) var url:String = _ var user:String = _ var password:String = _ var dbtable:String = _ + def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val jdbcDF = in.read() @@ -28,7 +29,6 @@ class WriteMysql extends ConfigurableStop{ properties.put("user", user) properties.put("password", password) jdbcDF.write.mode(SaveMode.Append).jdbc(url,dbtable,properties) - //jdbcDF.show(10) out.write(jdbcDF) } @@ -46,22 +46,42 @@ class WriteMysql extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true) - //descriptor = url :: descriptor - - val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true) - //descriptor = user :: descriptor - - val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true) - //descriptor = password :: descriptor - - val dbtable=new PropertyDescriptor().name("dbtable").displayName("dbtable").description("The table you want to write").defaultValue("").required(true) - //descriptor = dbtable :: descriptor - + val url=new PropertyDescriptor() + .name("url") + .displayName("Url") + .description("The Url, for example jdbc:mysql://127.0.0.1/dbname") + .defaultValue("") + .required(true) + .example("jdbc:mysql://127.0.0.1/dbname") descriptor = url :: descriptor + + val user=new PropertyDescriptor() + .name("user") + .displayName("User") + .description("The user name of database") + .defaultValue("") + .required(true) + .example("root") descriptor = user 
:: descriptor + + val password=new PropertyDescriptor() + .name("password") + .displayName("Password") + .description("The password of database") + .defaultValue("") + .required(true) + .example("123456") descriptor = password :: descriptor + + val dbtable=new PropertyDescriptor() + .name("dbtable") + .displayName("DBTable") + .description("The table you want to write") + .defaultValue("") + .required(true) + .example("test") descriptor = dbtable :: descriptor + descriptor } @@ -70,7 +90,7 @@ class WriteMysql extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.JdbcGroup.toString) + List(StopGroup.JdbcGroup) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleRead.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleRead.scala index d826359..afc48ee 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleRead.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleRead.scala @@ -44,16 +44,40 @@ class OracleRead extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true) + val url=new PropertyDescriptor() + .name("url") + .displayName("Url") + .description("The Url, for example jdbc:oracle:thin:@10.0.86.237:1521/newdb") + .defaultValue("") + .required(true) + .example("jdbc:oracle:thin:@10.0.86.237:1521/newdb") descriptor = url :: descriptor - val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true) + val user=new PropertyDescriptor() + .name("user") + .displayName("User") + .description("The user name of database") + .defaultValue("") + .required(true) + .example("root") descriptor = user :: descriptor - val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true) + val password=new PropertyDescriptor() + .name("password") + .displayName("Password") + .description("The password of database") + .defaultValue("") + .required(true) + .example("123456") descriptor = password :: descriptor - val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The sql sentence you want to execute").defaultValue("").required(true) + val sql=new PropertyDescriptor() + .name("sql") + .displayName("Sql") + .description("The sql sentence you want to execute") + .defaultValue("") + .required(true) + .example("select * from test") descriptor = sql :: descriptor descriptor @@ -64,7 +88,7 @@ class OracleRead extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.JdbcGroup.toString) + List(StopGroup.JdbcGroup) } override def initialize(ctx: ProcessContext): Unit = {} diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala index 7a5f7f2..498a404 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala @@ -38,28 +38,76 @@ class OracleReadByPartition extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url=new 
PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:mysql://127.0.0.1/dbname").defaultValue("").required(true) + val url=new PropertyDescriptor() + .name("url") + .displayName("Url") + .description("The Url,for example jdbc:oracle:thin:@10.0.86.237:1521/newdb") + .defaultValue("") + .required(true) + .example("jdbc:oracle:thin:@10.0.86.237:1521/newdb") descriptor = url :: descriptor - val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true) + val user=new PropertyDescriptor() + .name("user") + .displayName("User") + .description("The user name of database") + .defaultValue("") + .required(true) + .example("root") descriptor = user :: descriptor - val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true) + val password=new PropertyDescriptor() + .name("password") + .displayName("Password") + .description("The password of database") + .defaultValue("") + .required(true) + .example("123456") descriptor = password :: descriptor - val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The sql sentence you want to execute").defaultValue("").required(true) + val sql=new PropertyDescriptor() + .name("sql") + .displayName("Sql") + .description("The sql sentence you want to execute") + .defaultValue("") + .required(true) + .example("select * from test") descriptor = sql :: descriptor - val partitionColumn=new PropertyDescriptor().name("partitionColumn").displayName("partitionColumn").description("The partitionby column").defaultValue("").required(true) + val partitionColumn=new PropertyDescriptor() + .name("partitionColumn") + .displayName("PartitionColumn") + .description("partitioned column") + .defaultValue("") + .required(true) + .example("id") descriptor = partitionColumn :: descriptor - val lowerBound=new PropertyDescriptor().name("lowerBound").displayName("lowerBound").description("The lowerBound of partitioned column").defaultValue("").required(true) + val lowerBound=new PropertyDescriptor() + .name("lowerBound") + .displayName("LowerBound") + .description("The lowerbound of partitioned columns") + .defaultValue("") + .required(true) + .example("1") descriptor = lowerBound :: descriptor - val upperBound=new PropertyDescriptor().name("upperBound").displayName("upperBound").description("The upperBound of partitioned column").defaultValue("").required(true) + val upperBound=new PropertyDescriptor() + .name("upperBound") + .displayName("UpperBound") + .description("The upperBound of partitioned columns") + .defaultValue("") + .required(true) + .example("20") descriptor = upperBound :: descriptor - val numPartitions=new PropertyDescriptor().name("numPartitions").displayName("numPartitions").description("The number of partitions ").defaultValue("").required(true) + val numPartitions=new PropertyDescriptor() + .name("numPartitions") + .displayName("NumPartitions") + .description("The number of partitions ") + .defaultValue("") + .required(true) + .example("5") descriptor = numPartitions :: descriptor descriptor @@ -70,7 +118,7 @@ class OracleReadByPartition extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.JdbcGroup.toString) + List(StopGroup.JdbcGroup) } override def initialize(ctx: ProcessContext): Unit = {} diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/WriteOracle.scala 
b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleWrite.scala similarity index 68% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/WriteOracle.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleWrite.scala index 1bda7e6..679d06b 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/WriteOracle.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleWrite.scala @@ -8,19 +8,18 @@ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import org.apache.spark.sql._ -class WriteOracle extends ConfigurableStop{ +class OracleWrite extends ConfigurableStop{ val authorEmail: String = "yangqidong@cnic.cn" val description: String = "Write data to oracle" - val inportList: List[String] = List(Port.NonePort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.NonePort) + val outportList: List[String] = List(Port.DefaultPort) var url:String = _ var user:String = _ var password:String = _ var table:String = _ - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session = pec.get[SparkSession]() val inDF: DataFrame = in.read() @@ -67,16 +66,40 @@ class WriteOracle extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url=new PropertyDescriptor().name("url").displayName("url").description("The Url, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb").defaultValue("").required(true) + val url=new PropertyDescriptor() + .name("url") + .displayName("Url") + .description("The Url, for example jdbc:oracle:thin:@10.0.86.237:1521/newdb") + .defaultValue("") + .required(true) + .example("jdbc:oracle:thin:@10.0.86.237:1521/newdb") descriptor = url :: descriptor - val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of database").defaultValue("").required(true) + val user=new PropertyDescriptor() + .name("user") + .displayName("User") + .description("The user name of database") + .defaultValue("") + .required(true) + .example("root") descriptor = user :: descriptor - val password=new PropertyDescriptor().name("password").displayName("password").description("The password of database").defaultValue("").required(true) + val password=new PropertyDescriptor() + .name("password") + .displayName("Password") + .description("The password of database") + .defaultValue("") + .required(true) + .example("123456") descriptor = password :: descriptor - val table=new PropertyDescriptor().name("table").displayName("table").description("The table you want to write to").defaultValue("").required(true) + val table=new PropertyDescriptor() + .name("table") + .displayName("table") + .description("The table name") + .defaultValue("") + .required(true) + .example("test") descriptor = table :: descriptor descriptor @@ -87,7 +110,7 @@ class WriteOracle extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.JdbcGroup.toString) + List(StopGroup.JdbcGroup) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/json/EvaluateJsonPath.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/json/EvaluateJsonPath.scala index b64ad4f..9275d7d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/json/EvaluateJsonPath.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/json/EvaluateJsonPath.scala @@ -7,6 +7,7 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil} 
import org.apache.spark.sql.{DataFrame, SparkSession} class EvaluateJsonPath extends ConfigurableStop{ + override val authorEmail: String = "yangqidong@cnic.cn" val inportList: List[String] = List(Port.NonePort.toString) val outportList: List[String] = List(Port.DefaultPort.toString) diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/getOracleTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/JdbcReadFromOracleTest.scala similarity index 94% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/getOracleTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/JdbcReadFromOracleTest.scala index 7a429b9..416f0ea 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/getOracleTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/JdbcReadFromOracleTest.scala @@ -9,13 +9,13 @@ import org.junit.Test import scala.util.parsing.json.JSON -class getOracleTest { +class JdbcReadFromOracleTest { @Test def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/JDBC/getOracle.json" + val file = "src/main/resources/flow/jdbc/JdbcReadFromOracle.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -26,7 +26,6 @@ class getOracleTest { val h2Server = Server.createTcpServer("-tcp","-tcpAllowOthers","-tcpPort","50001").start() - //execute flow val spark = SparkSession.builder() .master("spark://10.0.86.89:7077") diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsErrorTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/MysqlReadTest.scala similarity index 89% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsErrorTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/MysqlReadTest.scala index 18d4335..4ced82a 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsErrorTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/MysqlReadTest.scala @@ -1,4 +1,4 @@ -package cn.piflow.bundle.csv +package cn.piflow.bundle.JDBC import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean @@ -10,13 +10,13 @@ import org.junit.Test import scala.util.parsing.json.JSON -class CsvSaveAsErrorTest { +class MysqlReadTest { @Test def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow/csv/CsvSaveAsError.json" + val file = "src/main/resources/flow/jdbc/MysqlRead.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -24,12 +24,13 @@ //create flow val flowBean = FlowBean(map) val flow = flowBean.constructFlow() + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() //execute flow val spark = SparkSession.builder() .master("local[*]") - .appName("piflow-hive-bundle") + .appName("MysqlReadTest") .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/JdbcReadTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/MysqlWriteTest.scala similarity index 85% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/JdbcReadTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/MysqlWriteTest.scala index 7479ff1..9bff58c 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/JdbcReadTest.scala +++ 
b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/MysqlWriteTest.scala @@ -3,19 +3,20 @@ package cn.piflow.bundle.JDBC import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class JdbcReadTest { +class MysqlWriteTest { @Test def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow/jdbc/JdbcRead.json" + val file = "src/main/resources/flow/jdbc/MysqlWrite.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -33,7 +34,7 @@ .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsAppendTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleReadByPartitionTest.scala similarity index 84% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsAppendTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleReadByPartitionTest.scala index d94d1b9..2890859 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsAppendTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleReadByPartitionTest.scala @@ -1,23 +1,22 @@ -package cn.piflow.bundle.csv +package cn.piflow.bundle.JDBC -import cn.piflow.bundle.json.JsonSave +import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} import cn.piflow.util.PropertyUtil -import cn.piflow.{FlowImpl, Path, Runner} import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class CsvSaveAsAppendTest { +class OracleReadByPartitionTest { @Test def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow/csv/CsvSaveAsAppend.json" + val file = "src/main/resources/flow/jdbc/OracleReadByPartition.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -25,12 +24,13 @@ //create flow val flowBean = FlowBean(map) val flow = flowBean.constructFlow() + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() //execute flow val spark = SparkSession.builder() .master("local[*]") - .appName("piflow-hive-bundle") + .appName("OracleReadByPartitionTest") .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleReadTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleReadTest.scala new file mode 100644 index 0000000..d4c174b --- /dev/null +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleReadTest.scala @@ -0,0 +1,53 @@ +package cn.piflow.bundle.JDBC + +import cn.piflow.Runner +import cn.piflow.conf.bean.FlowBean +import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil +import org.apache.spark.sql.SparkSession +import org.h2.tools.Server +import org.junit.Test + +import
scala.util.parsing.json.JSON + +class OracleReadTest { + + @Test + def testFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/flow/jdbc/OracleRead.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + //create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + + //execute flow + val spark = SparkSession.builder() + .master("local[*]") + .appName("OracleReadTest") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", "2") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "") + .bind("debug.path","") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + +} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleWriteTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleWriteTest.scala new file mode 100644 index 0000000..fba5305 --- /dev/null +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/JDBC/OracleWriteTest.scala @@ -0,0 +1,53 @@ +package cn.piflow.bundle.JDBC + +import cn.piflow.Runner +import cn.piflow.conf.bean.FlowBean +import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil +import org.apache.spark.sql.SparkSession +import org.h2.tools.Server +import org.junit.Test + +import scala.util.parsing.json.JSON + +class OracleWriteTest { + + @Test + def testFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/flow/jdbc/OracleWrite.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + //create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + + //execute flow + val spark = SparkSession.builder() + .master("local[*]") + .appName("OracleWriteTest") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", "2") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "") + .bind("debug.path","") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + +} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsIgnoreTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsIgnoreTest.scala deleted file mode 100644 index 823cb60..0000000 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsIgnoreTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -package cn.piflow.bundle.csv - -import cn.piflow.Runner -import cn.piflow.conf.bean.FlowBean -import cn.piflow.conf.util.{FileUtil, OptionUtil} -import cn.piflow.util.PropertyUtil -import org.apache.spark.sql.SparkSession -import org.h2.tools.Server -import org.junit.Test - -import scala.util.parsing.json.JSON - -class CsvSaveAsIgnoreTest { - - @Test - 
def testFlow(): Unit ={ - - //parse flow json - val file = "src/main/resources/flow/csv/CsvSaveAsIgnore.json" - val flowJsonStr = FileUtil.fileReader(file) - val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] - println(map) - - //create flow - val flowBean = FlowBean(map) - val flow = flowBean.constructFlow() - val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() - - //execute flow - val spark = SparkSession.builder() - .master("local[*]") - .appName("piflow-hive-bundle") - .config("spark.driver.memory", "1g") - .config("spark.executor.memory", "2g") - .config("spark.cores.max", "2") - .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) - .enableHiveSupport() - .getOrCreate() - - val process = Runner.create() - .bind(classOf[SparkSession].getName, spark) - .bind("checkpoint.path", "") - .bind("debug.path","") - .start(flow); - - process.awaitTermination(); - val pid = process.pid(); - println(pid + "!!!!!!!!!!!!!!!!!!!!!") - spark.close(); - } - -} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveTest.scala new file mode 100644 index 0000000..2957e50 --- /dev/null +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveTest.scala @@ -0,0 +1,165 @@ +package cn.piflow.bundle.csv + +import cn.piflow.bundle.json.JsonSave +import cn.piflow.conf.bean.FlowBean +import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil +import cn.piflow.{FlowImpl, Path, Runner} +import org.apache.spark.sql.SparkSession +import org.h2.tools.Server +import org.junit.Test + +import scala.util.parsing.json.JSON + +class CsvSaveTest { + + @Test + def CsvSaveAsAppendFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/flow/csv/CsvSaveAsAppend.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + //create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + + //execute flow + val spark = SparkSession.builder() + .master("local[*]") + .appName("piflow-hive-bundle") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", "2") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "") + .bind("debug.path","") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + + @Test + def CsvSaveAsOverWriteFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/flow/csv/CsvSaveAsOverWrite.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + //create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + + //execute flow + val spark = SparkSession.builder() + .master("local[*]") + .appName("piflow-hive-bundle") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", 
"2") + .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "") + .bind("debug.path","") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + + @Test + def CsvSaveAsErrorFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/flow/csv/CsvSaveAsError.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + //create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + + //execute flow + val spark = SparkSession.builder() + .master("local[*]") + .appName("piflow-hive-bundle") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", "2") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "") + .bind("debug.path","") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + @Test + def CsvSaveAsIgnoreFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/flow/csv/CsvSaveAsIgnore.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + //create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + + //execute flow + val spark = SparkSession.builder() + .master("local[*]") + .appName("piflow-hive-bundle") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", "2") + .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "") + .bind("debug.path","") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + + + +} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/LoadGraph.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/LoadGraph.scala new file mode 100644 index 0000000..4ac538e --- /dev/null +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/LoadGraph.scala @@ -0,0 +1,58 @@ +package cn.piflow.bundle.graphx + +import java.net.InetAddress + +import cn.piflow.Runner +import cn.piflow.conf.bean.FlowBean +import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.{PropertyUtil, ServerIpUtil} +import org.apache.spark.sql.SparkSession +import org.h2.tools.Server +import org.junit.Test + +import scala.util.parsing.json.JSON + +class LoadGraph { + + @Test + def testFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/flow/graphx/LoadGraph.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + 
//create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + + + val ip = InetAddress.getLocalHost.getHostAddress + cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile()) + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start() + //execute flow + val spark = SparkSession.builder() + .master("local[12]") + .appName("hive") + .config("spark.driver.memory", "4g") + .config("spark.executor.memory", "8g") + .config("spark.cores.max", "8") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "") + .bind("debug.path","") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + + +} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveModeTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveModeTest.scala index 722f069..62e2f23 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveModeTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveModeTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.hive import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class PutHiveModeTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveQLTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveQLTest.scala index 40173e2..830241c 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveQLTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveQLTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.hive import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class PutHiveQLTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveStreamingTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveStreamingTest.scala index 8cf222e..332dbe8 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveStreamingTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/PutHiveStreamingTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.hive import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class 
PutHiveStreamingTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/OptionalSelectHiveQLTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/SelectHiveQLByJdbcTest.scala similarity index 84% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/hive/OptionalSelectHiveQLTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/hive/SelectHiveQLByJdbcTest.scala index 368f2a7..2100779 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/OptionalSelectHiveQLTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/SelectHiveQLByJdbcTest.scala @@ -3,19 +3,20 @@ package cn.piflow.bundle.hive import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class OptionalSelectHiveQLTest { +class SelectHiveQLByJdbcTest { @Test def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow/hive/OptionalSelectHiveQL.json" + val file = "src/main/resources/flow/hive/SelectHiveQLByJdbc.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -33,7 +34,7 @@ class OptionalSelectHiveQLTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/SelectHiveQLTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/SelectHiveQLTest.scala index 82e0f81..1ce932a 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/SelectHiveQLTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/hive/SelectHiveQLTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.hive import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class SelectHiveQLTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate()
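Note on the partition properties used by OracleReadByPartition (partitionColumn, lowerBound, upperBound, numPartitions, exercised by OracleReadByPartition.json above): they presumably map straight onto Spark's partitioned JDBC read, which splits the scan into numPartitions range queries over the partition column. Below is a minimal sketch of that mapping, reusing the test values from this patch; wrapping the configured sql as a dbtable subquery is an assumption, and an Oracle JDBC driver must be on the classpath.

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("JdbcPartitionSketch").getOrCreate()
val props = new Properties()
props.put("user", "dashuju")         // "user" property
props.put("password", "DaShuju_732") // "password" property
// Five parallel queries, each scanning a slice of rpt_code between the bounds.
val df = spark.read.jdbc(
  "jdbc:oracle:thin:@(DESCRIPTION =(ADDRESS_LIST =(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.2.237)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=RACDB_STANDBY)))",
  "(select rpt_code,submit_date from egrant_isis.rpt_completion) t", // "sql" as a subquery table
  "rpt_code", // partitionColumn
  1L,         // lowerBound
  20L,        // upperBound
  5,          // numPartitions
  props)
df.show()
spark.close()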