diff --git a/piflow-bundle/src/main/resources/flow/csv/CsvSaveAsError.json b/piflow-bundle/src/main/resources/flow/csv/CsvSaveAsError.json
index e4973de..0c33a1a 100644
--- a/piflow-bundle/src/main/resources/flow/csv/CsvSaveAsError.json
+++ b/piflow-bundle/src/main/resources/flow/csv/CsvSaveAsError.json
@@ -21,7 +21,7 @@
         "header": "true",
         "delimiter":",",
         "partition":"1",
-        "saveMode": "ErrorIfExists"
+        "saveMode": "error"
       }
     }
diff --git a/piflow-bundle/src/main/resources/flow/graphx/LoadGraph.json b/piflow-bundle/src/main/resources/flow/graphx/LoadGraph.json
new file mode 100644
index 0000000..4d896e3
--- /dev/null
+++ b/piflow-bundle/src/main/resources/flow/graphx/LoadGraph.json
@@ -0,0 +1,42 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"LoadGraph",
+        "bundle":"cn.piflow.bundle.graphx.LoadGraph",
+        "properties":{
+          "dataPath":"hdfs://192.168.3.138:8020/work/test/test.csv"
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"LabelPropagation",
+        "bundle":"cn.piflow.bundle.graphx.LabelPropagation",
+        "properties":{
+          "maxIter":"20"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"LoadGraph",
+        "outport":"edges",
+        "inport":"edgesIn",
+        "to":"LabelPropagation"
+      },
+      {
+        "from":"LoadGraph",
+        "outport":"vertex",
+        "inport":"vertexIn",
+        "to":"LabelPropagation"
+      }
+
+    ]
+  }
+}
\ No newline at end of file
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala
index 78b0a60..df8ed79 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala
@@ -57,7 +57,7 @@ class SelectField extends ConfigurableStop {
   }
 
   override def getGroup(): List[String] = {
-    List(StopGroup.CommonGroup.toString)
+    List(StopGroup.CommonGroup)
   }
 
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala
index c5ba4c9..3e48b7f 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala
@@ -30,7 +30,7 @@ class CsvSave extends ConfigurableStop{
 
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 
-    val saveModeOption = Set("append","overwrite","ErrorIfExists","ignore")
+    val saveModeOption = Set("append","overwrite","error","ignore")
     var descriptor : List[PropertyDescriptor] = List()
 
     val csvSavePath = new PropertyDescriptor()
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala
index 266be71..9e05a85 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala
@@ -11,8 +11,8 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
 class FolderCsvParser extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  val inportList: List[String] = List(Port.NonePort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.NonePort)
+  val outportList: List[String] = List(Port.DefaultPort)
   override val description: String = "Parse csv folder"
 
   var FolderPath:String=_
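Note on the saveMode rename above: CsvSave's allowable modes are now the lowercase strings that Spark's DataFrameWriter.mode(String) documents ("append", "overwrite", "error", "ignore"), so the sample flow must use the same spelling; "error" and "errorifexists" are both case-insensitive aliases for SaveMode.ErrorIfExists. A minimal, self-contained sketch (output path is hypothetical, not from this patch):

    // Spark save-mode strings map onto SaveMode; "error" === SaveMode.ErrorIfExists.
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object SaveModeDemo extends App {
      val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
      val df = spark.range(3).toDF("id")

      df.write
        .option("header", "true")
        .option("delimiter", ",")
        .mode("error")               // equivalent to .mode(SaveMode.ErrorIfExists)
        .csv("/tmp/csv-save-demo")   // hypothetical output path
      spark.stop()
    }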
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala
index e3109cb..96626d3 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala
@@ -10,7 +10,7 @@ class QueryElasticsearch extends ConfigurableStop {
 
   val authorEmail: String = "ygang@cnic.cn"
   val description: String = "Query data from Elasticsearch"
-  val inportList: List[String] = List(Port.NonePort)
+  val inportList: List[String] = List(Port.DefaultPort)
   val outportList: List[String] = List(Port.DefaultPort)
 
   var es_nodes : String = _
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/GetFile.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/GetFile.scala
index 02e78ad..1d81c76 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/GetFile.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/GetFile.scala
@@ -50,7 +50,8 @@ class GetFile extends ConfigurableStop{
       .description("Server IP where the local file is located")
       .defaultValue("")
       .required(true)
-      .description("192.168.3.139")
+      .example("192.168.3.139")
+    descriptor = IP :: descriptor
 
     val User = new PropertyDescriptor()
       .name("User")
@@ -59,6 +60,7 @@ class GetFile extends ConfigurableStop{
       .defaultValue("root")
       .required(true)
       .example("root")
+    descriptor = User :: descriptor
 
     val PassWord = new PropertyDescriptor()
       .name("PassWord")
@@ -67,7 +69,7 @@ class GetFile extends ConfigurableStop{
       .defaultValue("")
       .required(true)
       .example("123456")
-
+    descriptor = PassWord :: descriptor
 
     val hdfsFile = new PropertyDescriptor()
       .name("hdfsFile")
@@ -75,23 +77,18 @@ class GetFile extends ConfigurableStop{
       .description("path to file on hdfs")
       .required(true)
      .example("/work/test.csv")
-
+    descriptor = hdfsFile :: descriptor
 
     val localPath = new PropertyDescriptor()
       .name("localPath")
-      .displayName("localPath")
+      .displayName("LocalPath")
       .description("Local folder")
       .defaultValue("")
       .required(true)
       .example("/opt/")
-
-
-
-    descriptor = IP :: descriptor
-    descriptor = User :: descriptor
-    descriptor = PassWord :: descriptor
     descriptor = localPath :: descriptor
-    descriptor = hdfsFile :: descriptor
+
+
     descriptor
   }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala
index 6fd6686..8867f67 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala
@@ -51,7 +51,13 @@ class LabelPropagation extends ConfigurableStop {
 
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val maxIter = new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").defaultValue("").allowableValues(Set("")).required(false)
+    val maxIter = new PropertyDescriptor()
+      .name("maxIter")
+      .displayName("MAX_ITER")
+      .defaultValue("")
+      .allowableValues(Set(""))
+      .required(false)
+      .example("20")
     descriptor = maxIter :: descriptor
     descriptor
  }
@@ -61,7 +67,7 @@ class LabelPropagation extends ConfigurableStop {
   }
 
   override def getGroup(): List[String] = {
-    List(StopGroup.GraphX.toString)
+    List(StopGroup.GraphX)
   }
 
 }
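For context on the two GraphX stops: LoadGraph.json (added above) wires LoadGraph's two output ports ("edges", "vertex") into LabelPropagation's "edgesIn"/"vertexIn"; underneath, the stops wrap plain GraphX calls. A minimal sketch of those calls — the path and master are assumptions, not the stops' actual code:

    // Load an edge list into a Graph, then run label propagation for 20 supersteps.
    import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
    import org.apache.spark.graphx.lib.LabelPropagation
    import org.apache.spark.sql.SparkSession

    object LabelPropagationDemo extends App {
      val spark = SparkSession.builder().master("local[1]").appName("lpa").getOrCreate()

      val graph = GraphLoader
        .edgeListFile(spark.sparkContext, "data/edges.txt", true) // true = canonical orientation
        .partitionBy(PartitionStrategy.RandomVertexCut)

      // Each vertex ends up labelled with the id of its detected community.
      val communities = LabelPropagation.run(graph, 20)
      communities.vertices.take(5).foreach(println)
      spark.stop()
    }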
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala
index e85fbdd..571104c 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala
@@ -15,7 +15,6 @@ class LoadGraph extends ConfigurableStop {
 
   var edgePort : String = "edges"
   var vertexPort : String = "vertex"
-
   val outportList: List[String] = List(edgePort,vertexPort)
 
@@ -26,13 +25,13 @@ class LoadGraph extends ConfigurableStop {
 
     val sc=spark.sparkContext
     import spark.sqlContext.implicits._
-    var graph=GraphLoader.edgeListFile(sc,dataPath,true).partitionBy(PartitionStrategy.RandomVertexCut)
-    //val df=Seq((graphx.edges.to,graphx.vertices)).toDF()
+    var graph=GraphLoader
+      .edgeListFile(sc,dataPath,true)
+      .partitionBy(PartitionStrategy.RandomVertexCut)
 
     //TODO:can not transfer EdgeRdd to Dataset
     out.write(edgePort,graph.edges.toDF())
     out.write(vertexPort,graph.vertices.toDF())
-    //df.show()
   }
 
   def initialize(ctx: ProcessContext): Unit = {
@@ -45,7 +44,13 @@ class LoadGraph extends ConfigurableStop {
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val dataPath = new PropertyDescriptor().name("dataPath").displayName("DATA_PATH").defaultValue("").allowableValues(Set("")).required(true)
+    val dataPath = new PropertyDescriptor()
+      .name("dataPath")
+      .displayName("Data_Path")
+      .defaultValue("")
+      .allowableValues(Set(""))
+      .required(true)
+      .example("hdfs://192.168.3.138:8020/work/test/test.csv")
     descriptor = dataPath :: descriptor
     descriptor
   }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala
index bea6bcf..8d3e3e9 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala
@@ -12,15 +12,16 @@ import org.apache.hadoop.fs.Path
 
 class DeleteHdfs extends ConfigurableStop{
-  override val authorEmail: String = "ygang@cnic.com"
-  override val inportList: List[String] = List(Port.DefaultPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
-  override val description: String = "Delete file or directory on hdfs"
+  override val authorEmail: String = "ygang@cnic.com"
+  override val inportList: List[String] = List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
+  override val description: String = "Delete files or directories on HDFS"
 
   var hdfsUrl :String= _
   var hdfsPath :String = _
   var isCustomize:String=_
 
+
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
 
     val spark = pec.get[SparkSession]()
@@ -89,14 +90,14 @@ class DeleteHdfs extends ConfigurableStop{
 
     val isCustomize = new PropertyDescriptor()
       .name("isCustomize")
-      .displayName("isCustomize")
-      .description("Whether to customize the compressed file path, if true, " +
-        "you must specify the path where the compressed file is located . " +
-        "If it is false, it will automatically find the file path data from the upstream port ")
+      .displayName("IsCustomize")
+      .description("Whether to customize the compressed file path; if true, " +
+        "you must specify the path where the compressed file is located. " +
+        "If false, the file path is read from the upstream port.")
+ + "If false,automatically find the file path data from the upstream port") .defaultValue("true") .allowableValues(Set("true","false")) .required(true) - .example("true") + .example("true") descriptor = isCustomize :: descriptor @@ -108,7 +109,7 @@ class DeleteHdfs extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.HdfsGroup.toString) + List(StopGroup.HdfsGroup) } override def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/FileDownHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/FileDownHdfs.scala index 10d52cd..6e436d5 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/FileDownHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/FileDownHdfs.scala @@ -14,11 +14,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} class FileDownHdfs extends ConfigurableStop{ + val authorEmail: String = "yangqidong@cnic.cn" val description: String = "Download the data from the url to HDFS" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) - + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var hdfsUrl:String =_ var hdfsPath:String =_ @@ -38,13 +38,11 @@ class FileDownHdfs extends ConfigurableStop{ val configuration: Configuration = new Configuration() - configuration.set("fs.defaultFS",hdfsUrl) val fs = FileSystem.get(configuration) val fdos: FSDataOutputStream = fs.create(new Path(hdfsUrl+hdfsPath)) - while(((byteRead=inputStream.read(buffer)) != -1) && (byteRead != -1)){ fdos.write(buffer,0,byteRead) fdos.flush() @@ -63,7 +61,6 @@ class FileDownHdfs extends ConfigurableStop{ out.write(df) - } def initialize(ctx: ProcessContext): Unit = { @@ -79,22 +76,22 @@ class FileDownHdfs extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url_str = new PropertyDescriptor() .name("url_str") - .displayName("Url_str") + .displayName("Url_Str") .description("Network address of file") .defaultValue("") .required(true) + descriptor = url_str :: descriptor val hdfsPath = new PropertyDescriptor() - .name("hdfsDirPath") - .displayName("HdfsDirPath") + .name("hdfsPath") + .displayName("HdfsPath") .defaultValue("") - .description("File dir path of HDFS") + .description("File path of HDFS") .required(true) .example("/work/test.gz") - + descriptor = hdfsPath :: descriptor val hdfsUrl = new PropertyDescriptor() .name("hdfsUrl") @@ -103,11 +100,8 @@ class FileDownHdfs extends ConfigurableStop{ .description("URL address of HDFS") .required(true) .example("hdfs://192.168.3.138:8020") - - - descriptor = url_str :: descriptor descriptor = hdfsUrl :: descriptor - descriptor = hdfsPath :: descriptor + descriptor } @@ -116,7 +110,7 @@ class FileDownHdfs extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroup.HdfsGroup.toString) + List(StopGroup.HdfsGroup) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala index 831dccd..1670093 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala @@ -10,8 +10,8 @@ import org.apache.spark.sql.SparkSession class GetHdfs extends 
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala
index 831dccd..1670093 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala
@@ -10,8 +10,8 @@ import org.apache.spark.sql.SparkSession
 class GetHdfs extends ConfigurableStop{
   override val authorEmail: String = "ygang@cnic.com"
   override val description: String = "Get data from hdfs"
-  override val inportList: List[String] = List(Port.DefaultPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
+  override val inportList: List[String] = List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
 
   var hdfsUrl : String=_
   var hdfsPath :String= _
@@ -26,24 +26,20 @@ class GetHdfs extends ConfigurableStop{
 
     if (types == "json") {
       val rdd = spark.read.json(path)
-      //rdd.show()
       rdd.schema.printTreeString()
       out.write(rdd)
 
     } else if (types == "csv") {
-
       val rdd = spark.read.csv(path)
-      //rdd.show()
       rdd.schema.printTreeString()
       out.write(rdd)
 
     }else if (types == "parquet") {
       val rdd = spark.read.csv(path)
-      //rdd.show()
       rdd.schema.printTreeString()
       out.write(rdd)
-    }
-    else {
+
+    }else {
       val rdd = sc.textFile(path)
       val outDf = rdd.toDF()
       outDf.schema.printTreeString()
@@ -79,13 +75,12 @@ class GetHdfs extends ConfigurableStop{
 
     val types = new PropertyDescriptor().
       name("types")
-      .displayName("types")
+      .displayName("Types")
       .description("The type of file you want to load")
       .defaultValue("csv")
       .allowableValues(Set("txt","parquet","csv","json"))
       .required(true)
       .example("csv")
-
     descriptor = types :: descriptor
 
     descriptor
@@ -96,7 +91,7 @@ class GetHdfs extends ConfigurableStop{
   }
 
   override def getGroup(): List[String] = {
-    List(StopGroup.HdfsGroup.toString)
+    List(StopGroup.HdfsGroup)
   }
 
   override def initialize(ctx: ProcessContext): Unit = {
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala
index 0f752f4..ac58b6e 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala
@@ -15,15 +15,16 @@ import scala.collection.mutable.ArrayBuffer
 
 class ListHdfs extends ConfigurableStop{
-  override val authorEmail: String = "ygang@cnic.com"
+  override val authorEmail: String = "ygang@cnic.com"
+  override val description: String = "Retrieve a list of files from hdfs"
   override val inportList: List[String] = List(Port.DefaultPort.toString)
   override val outportList: List[String] = List(Port.DefaultPort.toString)
-  override val description: String = "Retrieve a list of files from hdfs"
 
   var hdfsPath :String= _
   var hdfsUrl :String= _
   var pathARR:ArrayBuffer[String]=ArrayBuffer()
 
+
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
     val sc = spark.sparkContext
@@ -58,7 +59,6 @@ class ListHdfs extends ConfigurableStop{
     for (f <- statuses) {
       val fsPath = f.getPath().toString
       if (f.isDirectory) {
-//        pathARR += fsPath
         iterationFile(fsPath)
       } else{
         pathARR += f.getPath.toString
@@ -100,7 +100,7 @@ class ListHdfs extends ConfigurableStop{
   }
 
   override def getGroup(): List[String] = {
-    List(StopGroup.HdfsGroup.toString)
+    List(StopGroup.HdfsGroup)
   }
 
   override def initialize(ctx: ProcessContext): Unit = {
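ListHdfs (and SaveToHdfs below) rely on the same recursive iterationFile traversal: listStatus on a directory, recurse into subdirectories, collect plain files. A self-contained sketch of that pattern, with an assumed HDFS address:

    // Recursively collect all file paths under a directory.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.ArrayBuffer

    object ListFiles extends App {
      val conf = new Configuration()
      conf.set("fs.defaultFS", "hdfs://192.168.3.138:8020") // assumed address
      val fs = FileSystem.get(conf)
      val pathARR = ArrayBuffer[String]()

      def iterationFile(dir: String): Unit =
        for (f <- fs.listStatus(new Path(dir)))
          if (f.isDirectory) iterationFile(f.getPath.toString) // descend; don't record dirs
          else pathARR += f.getPath.toString

      iterationFile("/work")
      pathARR.foreach(println)
    }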
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala
index 7929fe4..310db5d 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala
@@ -10,10 +10,11 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.SparkSession
 
 class PutHdfs extends ConfigurableStop{
+
   override val authorEmail: String = "ygang@cnic.com"
-  override val inportList: List[String] = List(Port.DefaultPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
-  override val description: String = "Put data to hdfs"
+  override val description: String = "Put data into hdfs"
+  override val inportList: List[String] = List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
 
   var hdfsPath :String= _
   var hdfsUrl :String= _
@@ -54,7 +55,7 @@ class PutHdfs extends ConfigurableStop{
       .description("File path of HDFS")
       .required(true)
       .example("/work/")
-
+    descriptor = hdfsPath :: descriptor
 
     val hdfsUrl = new PropertyDescriptor()
       .name("hdfsUrl")
@@ -63,29 +64,27 @@ class PutHdfs extends ConfigurableStop{
       .description("URL address of HDFS")
       .required(true)
       .example("hdfs://192.168.3.138:8020")
-
+    descriptor = hdfsUrl :: descriptor
 
     val types = new PropertyDescriptor()
       .name("types")
       .displayName("Types")
-      .description("What format do you want to write : json,csv,parquet")
+      .description("The format to write: json, csv or parquet")
       .defaultValue("csv")
       .allowableValues(Set("json","csv","parquet"))
       .required(true)
       .example("csv")
+    descriptor = types :: descriptor
 
     val partition = new PropertyDescriptor()
       .name("partition")
       .displayName("Partition")
-      .description("Write a few partitions")
+      .description("Number of partitions to write")
       .defaultValue("1")
       .required(true)
       .example("1")
-
     descriptor = partition :: descriptor
-    descriptor = types :: descriptor
-    descriptor = hdfsPath :: descriptor
-    descriptor = hdfsUrl :: descriptor
+
     descriptor
   }
 
@@ -94,7 +93,7 @@ class PutHdfs extends ConfigurableStop{
   }
 
   override def getGroup(): List[String] = {
-    List(StopGroup.HdfsGroup.toString)
+    List(StopGroup.HdfsGroup)
   }
 
   override def initialize(ctx: ProcessContext): Unit = {
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SaveToHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SaveToHdfs.scala
index b3df5ec..de401e2 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SaveToHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SaveToHdfs.scala
@@ -15,11 +15,10 @@ import scala.collection.mutable.ArrayBuffer
 
 class SaveToHdfs extends ConfigurableStop {
 
-  override val description: String = "Put data to hdfs "
   val authorEmail: String = "ygang@cnic.cn"
-
-  override val inportList: List[String] = List(Port.DefaultPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
+  override val description: String = "Put data into hdfs "
+  override val inportList: List[String] = List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
 
   var hdfsDirPath :String= _
   var hdfsUrl :String= _
@@ -57,7 +56,6 @@ class SaveToHdfs extends ConfigurableStop {
       inDF.repartition(1).write.text(hdfsDir)
     }
 
-
     iterationFile(hdfsDir)
 
     val oldPath = new Path(oldFilePath)
@@ -79,7 +77,6 @@ class SaveToHdfs extends ConfigurableStop {
     val outDF: DataFrame = spark.createDataFrame(rowRDD,schema)
 
     out.write(outDF)
-
   }
 
   // recursively traverse the folder
@@ -96,7 +93,6 @@ class SaveToHdfs extends ConfigurableStop {
     for (f <- statuses) {
       val fsPath = f.getPath().toString
       if (f.isDirectory) {
-        // pathARR += fsPath
         iterationFile(fsPath)
       } else{
         if (f.getPath.toString.contains("part")){
@@ -133,7 +129,7 @@ class SaveToHdfs extends ConfigurableStop {
       .description("File dir path of HDFS")
       .required(true)
       .example("/work/")
-
+    descriptor = hdfsDirPath :: descriptor
 
     val hdfsUrl = new PropertyDescriptor()
       .name("hdfsUrl")
@@ -142,46 +138,43 @@ class SaveToHdfs extends ConfigurableStop {
       .description("URL address of HDFS")
       .required(true)
       .example("hdfs://192.168.3.138:8020")
-
+    descriptor = hdfsUrl :: descriptor
 
     val fileName = new PropertyDescriptor()
       .name("fileName")
-      .displayName("fileName")
+      .displayName("FileName")
       .description("File name")
       .defaultValue("")
       .required(true)
       .example("test.csv")
+    descriptor = fileName :: descriptor
 
     val types = new PropertyDescriptor()
       .name("types")
       .displayName("json,csv,text")
-      .description("What format do you want to write : json,csv,parquet")
+      .description("The format to write: json, csv or text")
       .defaultValue("csv")
       .allowableValues(Set("json","csv","text"))
       .required(true)
       .example("csv")
+    descriptor = types :: descriptor
 
     val delimiter = new PropertyDescriptor()
       .name("delimiter")
       .displayName("delimiter")
-      .description("The delimiter of csv file,types is csv ,please set it ")
+      .description("The delimiter of the csv file; set it when types is csv")
      .defaultValue(",")
       .required(true)
+      .example(",")
+    descriptor = delimiter :: descriptor
 
-    //header
     val header = new PropertyDescriptor()
       .name("header")
       .displayName("header")
-      .description("Whether the csv file have header or not")
+      .description("Whether the csv file has a header")
       .defaultValue("true")
       .required(true)
-
     descriptor = header :: descriptor
-    descriptor = fileName :: descriptor
-    descriptor = delimiter :: descriptor
-    descriptor = hdfsDirPath :: descriptor
-    descriptor = hdfsUrl :: descriptor
-    descriptor = types :: descriptor
     descriptor
   }
 
@@ -190,7 +183,7 @@ class SaveToHdfs extends ConfigurableStop {
   }
 
   override def getGroup(): List[String] = {
-    List(StopGroup.HdfsGroup.toString)
+    List(StopGroup.HdfsGroup)
   }
 
 }
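The refactor this patch applies across GetFile, FileDownHdfs, PutHdfs and SaveToHdfs is worth naming: each descriptor is now pushed onto the list immediately after it is defined, instead of in a block of `descriptor = x :: descriptor` lines at the end, so a newly added property can no longer be defined but left out of the returned list. Schematically (PropertyDescriptor is piflow's own builder, with the methods visible in this diff):

    // Register each descriptor right where it is built.
    override def getPropertyDescriptor(): List[PropertyDescriptor] = {
      var descriptor: List[PropertyDescriptor] = List()

      val hdfsUrl = new PropertyDescriptor()
        .name("hdfsUrl")
        .displayName("HdfsUrl")
        .description("URL address of HDFS")
        .required(true)
        .example("hdfs://192.168.3.138:8020")
      descriptor = hdfsUrl :: descriptor  // immediate registration

      descriptor
    }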
descriptor + .example(".*.csv") descriptor = selectionConditions :: descriptor + descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala index a186a66..36256fc 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala @@ -19,8 +19,8 @@ import scala.collection.mutable.ArrayBuffer class UnzipFilesOnHDFS extends ConfigurableStop { val authorEmail: String = "yangqidong@cnic.cn" val description: String = "Unzip files on hdfs" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) var isCustomize:String=_ var hdfsUrl:String=_ @@ -30,12 +30,10 @@ class UnzipFilesOnHDFS extends ConfigurableStop { var session: SparkSession = null var arr:ArrayBuffer[Row]=ArrayBuffer() - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { session = pec.get[SparkSession]() - if(isCustomize.equals("true")){ unzipFile(hdfsUrl+filePath,savePath) @@ -55,10 +53,8 @@ class UnzipFilesOnHDFS extends ConfigurableStop { val df: DataFrame = session.createDataFrame(rdd,schema) out.write(df) - } - def whatType(p:String): String = { var typeStr:String="" val pathNames: Array[String] = p.split("\\.") @@ -197,7 +193,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop { .example("hdfs://192.168.3.138:8020") - val savePath = new PropertyDescriptor() .name("savePath") .displayName("savePath") diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/EmailCleanTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/EmailCleanTest.scala index d9601a0..e8bca3d 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/EmailCleanTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/EmailCleanTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.clean import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class EmailCleanTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/IdentityNumberCleanTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/IdentityNumberCleanTest.scala index d2069a0..e3d2a52 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/IdentityNumberCleanTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/IdentityNumberCleanTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.clean import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class IdentityNumberCleanTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", 
"thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/PhoneNumberCleanTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/PhoneNumberCleanTest.scala index 52c1878..6ce3e7e 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/PhoneNumberCleanTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/PhoneNumberCleanTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.clean import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class PhoneNumberCleanTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/ProvinceCleanTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/ProvinceCleanTest.scala index 009bbf6..8d193cf 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/ProvinceCleanTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/ProvinceCleanTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.clean import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class ProvinceCleanTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/TitleCleanTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/TitleCleanTest.scala index 0831135..34366a0 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/TitleCleanTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/clean/TitleCleanTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.clean import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test @@ -33,7 +34,7 @@ class TitleCleanTest { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala index aaf9f73..3a0001c 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala @@ -3,6 +3,7 @@ package cn.piflow.bundle.csv import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import 
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala
index aaf9f73..3a0001c 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvParserTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -33,7 +34,7 @@ class CsvParserTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsAppendTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsAppendTest.scala
index 59043aa..d94d1b9 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsAppendTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsAppendTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.bundle.json.JsonSave
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import cn.piflow.{FlowImpl, Path, Runner}
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
@@ -33,7 +34,7 @@ class CsvSaveAsAppendTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsErrorTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsErrorTest.scala
index 6261bb3..18d4335 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsErrorTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsErrorTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -32,7 +33,7 @@ class CsvSaveAsErrorTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsIgnoreTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsIgnoreTest.scala
index e0b5e45..823cb60 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsIgnoreTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsIgnoreTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -32,7 +33,7 @@ class CsvSaveAsIgnoreTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsOverwriteTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsOverwriteTest.scala
index fd56f0d..5845fad 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsOverwriteTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsOverwriteTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -32,7 +33,7 @@ class CsvSaveAsOverwriteTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvStringParserTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvStringParserTest.scala
index 183794c..4c151f9 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvStringParserTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvStringParserTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -31,7 +32,7 @@ class CsvStringParserTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/FolderCsvParserTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/FolderCsvParserTest.scala
index fb3d609..30a00fe 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/FolderCsvParserTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/FolderCsvParserTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -32,7 +33,7 @@ class FolderCsvParserTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala
index 2ae26c4..4a64502 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.es
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -33,7 +34,7 @@ class FetchElasticsearchTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/PutElasticsearchTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/es/PutElasticsearchTest.scala
index f9c6d0d..77865c3 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/PutElasticsearchTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/es/PutElasticsearchTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.es
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -33,7 +34,7 @@ class PutElasticsearchTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/QueryElasticsearchTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/es/QueryElasticsearchTest.scala
index f8fda68..e6d27db 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/QueryElasticsearchTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/es/QueryElasticsearchTest.scala
@@ -3,6 +3,7 @@ package cn.piflow.bundle.es
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
@@ -33,7 +34,7 @@ class QueryElasticsearchTest {
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()
.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/ftp/UploadToFtp.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/ftp/UploadToFtpTest.scala similarity index 89% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/ftp/UploadToFtp.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/ftp/UploadToFtpTest.scala index 08513d6..b47b5aa 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/ftp/UploadToFtp.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/ftp/UploadToFtpTest.scala @@ -3,13 +3,14 @@ package cn.piflow.bundle.ftp import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.util.PropertyUtil import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class UploadToFtp { +class UploadToFtpTest { @Test def testFlow(): Unit ={ @@ -33,7 +34,7 @@ class UploadToFtp { .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", "thrift://192.168.3.140:9083") + .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/Graphx.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/LabelPropagationTest.scala similarity index 98% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/Graphx.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/LabelPropagationTest.scala index c829fe4..abf9c07 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/Graphx.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/graphx/LabelPropagationTest.scala @@ -12,7 +12,7 @@ import org.junit.Test import scala.util.parsing.json.JSON -class Graphx { +class LabelPropagationTest { @Test def testFlow(): Unit ={