Duplicate stop

parent 40f277bb30
commit f186ab3cec
@@ -21,7 +21,7 @@
 "header": "true",
 "delimiter":",",
 "partition":"1",
-"saveMode": "ErrorIfExists"
+"saveMode": "error"
 }

 }

@@ -0,0 +1,42 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"LoadGraph",
+        "bundle":"cn.piflow.bundle.graphx.LoadGraph",
+        "properties":{
+          "dataPath":"hdfs://192.168.3.138:8020/work/test/test.csv"
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"LabelPropagation",
+        "bundle":"cn.piflow.bundle.graphx.LabelPropagation",
+        "properties":{
+          "maxIter":"20"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"LoadGraph",
+        "outport":"edges",
+        "inport":"edgesIn",
+        "to":"LabelPropagation"
+      },
+      {
+        "from":"LoadGraph",
+        "outport":"vertex",
+        "inport":"vertexIn",
+        "to":"LabelPropagation"
+      }
+
+    ]
+  }
+}
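The new flow above wires LoadGraph's edges/vertex outports into LabelPropagation. For reference, a minimal plain-Spark GraphX sketch of the same computation (this bypasses the piflow stops entirely; the HDFS path and maxIter value are the ones configured in the flow, and the object/app names are made up):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
    import org.apache.spark.graphx.lib.LabelPropagation

    object LabelPropagationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("lpa-sketch").master("local[*]").getOrCreate()
        val sc = spark.sparkContext
        // edge list loaded from the same HDFS path the flow configures for LoadGraph
        val graph = GraphLoader
          .edgeListFile(sc, "hdfs://192.168.3.138:8020/work/test/test.csv")
          .partitionBy(PartitionStrategy.RandomVertexCut)
        // the flow's maxIter = "20" maps onto GraphX's maxSteps
        val communities = LabelPropagation.run(graph, maxSteps = 20)
        communities.vertices.take(10).foreach(println)
        spark.stop()
      }
    }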
@@ -57,7 +57,7 @@ class SelectField extends ConfigurableStop {
 }

 override def getGroup(): List[String] = {
-List(StopGroup.CommonGroup.toString)
+List(StopGroup.CommonGroup)
 }

 }

@@ -30,7 +30,7 @@ class CsvSave extends ConfigurableStop{

 override def getPropertyDescriptor(): List[PropertyDescriptor] = {

-val saveModeOption = Set("append","overwrite","ErrorIfExists","ignore")
+val saveModeOption = Set("append","overwrite","error","ignore")
 var descriptor : List[PropertyDescriptor] = List()

 val csvSavePath = new PropertyDescriptor()

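The allowable saveMode values change because Spark's DataFrameWriter.mode(String) expects the lower-case names "append", "overwrite", "ignore" and "error"/"errorifexists", which correspond to the SaveMode enum. A small stand-alone sketch (paths and data are placeholders):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object SaveModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("savemode-sketch").master("local[*]").getOrCreate()
        import spark.implicits._
        val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

        // string form, as the stop's "saveMode" property is passed through: "append" | "overwrite" | "error" | "ignore"
        df.write.mode("error").option("header", "true").csv("/tmp/csv-save-sketch")

        // equivalent enum form
        df.write.mode(SaveMode.ErrorIfExists).option("header", "true").csv("/tmp/csv-save-sketch-2")
        spark.stop()
      }
    }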
@@ -11,8 +11,8 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}

 class FolderCsvParser extends ConfigurableStop{
 override val authorEmail: String = "yangqidong@cnic.cn"
-val inportList: List[String] = List(Port.NonePort.toString)
-val outportList: List[String] = List(Port.DefaultPort.toString)
+val inportList: List[String] = List(Port.NonePort)
+val outportList: List[String] = List(Port.DefaultPort)
 override val description: String = "Parse csv folder"

 var FolderPath:String=_

@@ -10,7 +10,7 @@ class QueryElasticsearch extends ConfigurableStop {

 val authorEmail: String = "ygang@cnic.cn"
 val description: String = "Query data from Elasticsearch"
-val inportList: List[String] = List(Port.NonePort)
+val inportList: List[String] = List(Port.DefaultPort)
 val outportList: List[String] = List(Port.DefaultPort)

 var es_nodes : String = _

@@ -50,7 +50,8 @@ class GetFile extends ConfigurableStop{
 .description("Server IP where the local file is located")
 .defaultValue("")
 .required(true)
-.description("192.168.3.139")
+.example("192.168.3.139")
 descriptor = IP :: descriptor

 val User = new PropertyDescriptor()
 .name("User")
@@ -59,6 +60,7 @@ class GetFile extends ConfigurableStop{
 .defaultValue("root")
 .required(true)
+.example("root")
 descriptor = User :: descriptor

 val PassWord = new PropertyDescriptor()
 .name("PassWord")
@@ -67,7 +69,7 @@ class GetFile extends ConfigurableStop{
 .defaultValue("")
 .required(true)
+.example("123456")

 descriptor = PassWord :: descriptor

 val hdfsFile = new PropertyDescriptor()
 .name("hdfsFile")
@@ -75,23 +77,18 @@ class GetFile extends ConfigurableStop{
 .description("path to file on hdfs")
 .required(true)
+.example("/work/test.csv")

 descriptor = hdfsFile :: descriptor

 val localPath = new PropertyDescriptor()
 .name("localPath")
-.displayName("localPath")
+.displayName("LocalPath")
 .description("Local folder")
 .defaultValue("")
 .required(true)
+.example("/opt/")


-descriptor = IP :: descriptor
-descriptor = User :: descriptor
-descriptor = PassWord :: descriptor
 descriptor = localPath :: descriptor
-descriptor = hdfsFile :: descriptor

 descriptor
 }

@@ -51,7 +51,13 @@ class LabelPropagation extends ConfigurableStop {

 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()
-val maxIter = new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").defaultValue("").allowableValues(Set("")).required(false)
+val maxIter = new PropertyDescriptor()
+  .name("maxIter")
+  .displayName("MAX_ITER")
+  .defaultValue("")
+  .allowableValues(Set(""))
+  .required(false)
+  .example("20")
 descriptor = maxIter :: descriptor
 descriptor
 }
@@ -61,7 +67,7 @@ class LabelPropagation extends ConfigurableStop {
 }

 override def getGroup(): List[String] = {
-List(StopGroup.GraphX.toString)
+List(StopGroup.GraphX)
 }

 }

@@ -15,7 +15,6 @@ class LoadGraph extends ConfigurableStop {

 var edgePort : String = "edges"
 var vertexPort : String = "vertex"

 val outportList: List[String] = List(edgePort,vertexPort)
-

@@ -26,13 +25,13 @@ class LoadGraph extends ConfigurableStop {
 val sc=spark.sparkContext

 import spark.sqlContext.implicits._
-var graph=GraphLoader.edgeListFile(sc,dataPath,true).partitionBy(PartitionStrategy.RandomVertexCut)
-//val df=Seq((graphx.edges.to,graphx.vertices)).toDF()
+var graph=GraphLoader
+  .edgeListFile(sc,dataPath,true)
+  .partitionBy(PartitionStrategy.RandomVertexCut)
 //TODO:can not transfer EdgeRdd to Dataset
 out.write(edgePort,graph.edges.toDF())
 out.write(vertexPort,graph.vertices.toDF())

 //df.show()
 }

 def initialize(ctx: ProcessContext): Unit = {
@@ -45,7 +44,13 @@ class LoadGraph extends ConfigurableStop {

 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()
-val dataPath = new PropertyDescriptor().name("dataPath").displayName("DATA_PATH").defaultValue("").allowableValues(Set("")).required(true)
+val dataPath = new PropertyDescriptor()
+  .name("dataPath")
+  .displayName("Data_Path")
+  .defaultValue("")
+  .allowableValues(Set(""))
+  .required(true)
+  .example("hdfs://192.168.3.138:8020/work/test/test.csv")
 descriptor = dataPath :: descriptor
 descriptor
 }

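For reference, a plain-Spark sketch of turning the loaded graph's edge and vertex RDDs into DataFrames by mapping to tuples first, which sidesteps the encoder concern the TODO above mentions (the path, object name and column names are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

    object GraphToDataFrameSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("graph-df-sketch").master("local[*]").getOrCreate()
        import spark.implicits._
        val sc = spark.sparkContext

        val graph = GraphLoader
          .edgeListFile(sc, "hdfs://192.168.3.138:8020/work/test/test.csv")
          .partitionBy(PartitionStrategy.RandomVertexCut)

        // map Edge[Int] and (VertexId, Int) to plain tuples before toDF,
        // so only primitive encoders are needed
        val edgeDF = graph.edges.map(e => (e.srcId, e.dstId, e.attr)).toDF("src", "dst", "attr")
        val vertexDF = graph.vertices.map { case (id, attr) => (id, attr) }.toDF("id", "attr")

        edgeDF.show(5)
        vertexDF.show(5)
        spark.stop()
      }
    }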
@@ -12,15 +12,16 @@ import org.apache.hadoop.fs.Path


 class DeleteHdfs extends ConfigurableStop{
-override val authorEmail: String = "ygang@cnic.com"
-
-override val inportList: List[String] = List(Port.DefaultPort.toString)
-override val outportList: List[String] = List(Port.DefaultPort.toString)
-override val description: String = "Delete file or directory on hdfs"
+override val authorEmail: String = "ygang@cnic.com"
+override val inportList: List[String] = List(Port.DefaultPort)
+override val outportList: List[String] = List(Port.DefaultPort)
+override val description: String = "Delete files or directories on HDFS"

 var hdfsUrl :String= _
 var hdfsPath :String = _
 var isCustomize:String=_

 override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {

 val spark = pec.get[SparkSession]()
@@ -89,14 +90,14 @@ class DeleteHdfs extends ConfigurableStop{

 val isCustomize = new PropertyDescriptor()
 .name("isCustomize")
-.displayName("isCustomize")
-.description("Whether to customize the compressed file path, if true, " +
-"you must specify the path where the compressed file is located . " +
-"If it is false, it will automatically find the file path data from the upstream port ")
+.displayName("IsCustomize")
+.description("Whether to customize the compressed file path, if true," +
+"you must specify the path where the compressed file is located." +
+"If false,automatically find the file path data from the upstream port")
 .defaultValue("true")
 .allowableValues(Set("true","false"))
 .required(true)
 .example("true")
 descriptor = isCustomize :: descriptor


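Behind the isCustomize property, the delete itself is a single Hadoop FileSystem call. A stand-alone sketch (the URL comes from the descriptor examples; the path to delete is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object DeleteHdfsSketch {
      def main(args: Array[String]): Unit = {
        val hdfsUrl = "hdfs://192.168.3.138:8020"   // example value from the descriptor
        val hdfsPath = "/work/test/old-output"      // hypothetical path to delete

        val conf = new Configuration()
        conf.set("fs.defaultFS", hdfsUrl)
        val fs = FileSystem.get(conf)

        // recursive = true removes directories together with their contents
        val deleted = fs.delete(new Path(hdfsPath), true)
        println(s"deleted=$deleted")
        fs.close()
      }
    }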
@@ -108,7 +109,7 @@ class DeleteHdfs extends ConfigurableStop{
 }

 override def getGroup(): List[String] = {
-List(StopGroup.HdfsGroup.toString)
+List(StopGroup.HdfsGroup)
 }

 override def initialize(ctx: ProcessContext): Unit = {

@@ -14,11 +14,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}

 class FileDownHdfs extends ConfigurableStop{

 val authorEmail: String = "yangqidong@cnic.cn"
 val description: String = "Download the data from the url to HDFS"
-val inportList: List[String] = List(Port.DefaultPort.toString)
-val outportList: List[String] = List(Port.DefaultPort.toString)
+val inportList: List[String] = List(Port.DefaultPort)
+val outportList: List[String] = List(Port.DefaultPort)

 var hdfsUrl:String =_
 var hdfsPath:String =_
@@ -38,13 +38,11 @@ class FileDownHdfs extends ConfigurableStop{

 val configuration: Configuration = new Configuration()
-

 configuration.set("fs.defaultFS",hdfsUrl)

 val fs = FileSystem.get(configuration)
 val fdos: FSDataOutputStream = fs.create(new Path(hdfsUrl+hdfsPath))
-

 while(((byteRead=inputStream.read(buffer)) != -1) && (byteRead != -1)){
 fdos.write(buffer,0,byteRead)
 fdos.flush()
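Note that in the loop above the assignment inside the first condition evaluates to Unit in Scala, so only the second byteRead != -1 test actually terminates the read. A more conventional copy loop with the same Hadoop types, as a stand-alone sketch (the source URL is hypothetical; the HDFS values mirror the descriptor examples):

    import java.io.InputStream
    import java.net.URL
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object FileDownHdfsSketch {
      def main(args: Array[String]): Unit = {
        val hdfsUrl = "hdfs://192.168.3.138:8020"          // example from the descriptors
        val hdfsPath = "/work/test.gz"                     // example from the descriptors
        val urlStr = "http://example.org/data/test.gz"     // hypothetical source URL

        val conf = new Configuration()
        conf.set("fs.defaultFS", hdfsUrl)
        val fs = FileSystem.get(conf)
        val in: InputStream = new URL(urlStr).openStream()
        val out = fs.create(new Path(hdfsUrl + hdfsPath))

        val buffer = new Array[Byte](4096)
        var byteRead = in.read(buffer)
        while (byteRead != -1) {             // single, explicit termination test
          out.write(buffer, 0, byteRead)
          byteRead = in.read(buffer)
        }
        out.close()
        in.close()
        fs.close()
      }
    }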
@@ -63,7 +61,6 @@ class FileDownHdfs extends ConfigurableStop{

 out.write(df)
-

 }

 def initialize(ctx: ProcessContext): Unit = {
@@ -79,22 +76,22 @@ class FileDownHdfs extends ConfigurableStop{
 override def getPropertyDescriptor(): List[PropertyDescriptor] = {
 var descriptor : List[PropertyDescriptor] = List()


 val url_str = new PropertyDescriptor()
 .name("url_str")
-.displayName("Url_str")
+.displayName("Url_Str")
 .description("Network address of file")
 .defaultValue("")
 .required(true)
+descriptor = url_str :: descriptor

 val hdfsPath = new PropertyDescriptor()
-.name("hdfsDirPath")
-.displayName("HdfsDirPath")
+.name("hdfsPath")
+.displayName("HdfsPath")
 .defaultValue("")
-.description("File dir path of HDFS")
+.description("File path of HDFS")
 .required(true)
+.example("/work/test.gz")

+descriptor = hdfsPath :: descriptor

 val hdfsUrl = new PropertyDescriptor()
 .name("hdfsUrl")
@@ -103,11 +100,8 @@ class FileDownHdfs extends ConfigurableStop{
 .description("URL address of HDFS")
 .required(true)
+.example("hdfs://192.168.3.138:8020")


-descriptor = url_str :: descriptor
 descriptor = hdfsUrl :: descriptor
-descriptor = hdfsPath :: descriptor

 descriptor
 }

@@ -116,7 +110,7 @@ class FileDownHdfs extends ConfigurableStop{
 }

 override def getGroup(): List[String] = {
-List(StopGroup.HdfsGroup.toString)
+List(StopGroup.HdfsGroup)
 }


@@ -10,8 +10,8 @@ import org.apache.spark.sql.SparkSession
 class GetHdfs extends ConfigurableStop{
 override val authorEmail: String = "ygang@cnic.com"
 override val description: String = "Get data from hdfs"
-override val inportList: List[String] = List(Port.DefaultPort.toString)
-override val outportList: List[String] = List(Port.DefaultPort.toString)
+override val inportList: List[String] = List(Port.DefaultPort)
+override val outportList: List[String] = List(Port.DefaultPort)

 var hdfsUrl : String=_
 var hdfsPath :String= _
@@ -26,24 +26,20 @@ class GetHdfs extends ConfigurableStop{

 if (types == "json") {
 val rdd = spark.read.json(path)
 //rdd.show()
 rdd.schema.printTreeString()
 out.write(rdd)
-
 } else if (types == "csv") {
-
 val rdd = spark.read.csv(path)
 //rdd.show()
 rdd.schema.printTreeString()
 out.write(rdd)
-
 }else if (types == "parquet") {
 val rdd = spark.read.csv(path)
 //rdd.show()
 rdd.schema.printTreeString()
 out.write(rdd)
-}
-else {
+}else {
 val rdd = sc.textFile(path)
 val outDf = rdd.toDF()
 outDf.schema.printTreeString()
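The branches above differ only in which reader is called (and the parquet branch in this revision still goes through spark.read.csv). The same dispatch can be sketched stand-alone like this (path and type are placeholders):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object GetHdfsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("get-hdfs-sketch").master("local[*]").getOrCreate()
        val path = "hdfs://192.168.3.138:8020/work/test/test.csv"   // example path
        val types = "csv"                                           // "txt" | "parquet" | "csv" | "json"

        val df: DataFrame = types match {
          case "json"    => spark.read.json(path)
          case "csv"     => spark.read.csv(path)
          case "parquet" => spark.read.parquet(path)
          case _         => spark.read.text(path)   // plain text lines as a single "value" column
        }
        df.printSchema()
        df.show(5)
        spark.stop()
      }
    }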
@@ -79,13 +75,12 @@ class GetHdfs extends ConfigurableStop{

 val types = new PropertyDescriptor().
 name("types")
-.displayName("types")
+.displayName("Types")
 .description("The type of file you want to load")
 .defaultValue("csv")
 .allowableValues(Set("txt","parquet","csv","json"))
 .required(true)
 .example("csv")
-
 descriptor = types :: descriptor

 descriptor
@@ -96,7 +91,7 @@ class GetHdfs extends ConfigurableStop{
 }

 override def getGroup(): List[String] = {
-List(StopGroup.HdfsGroup.toString)
+List(StopGroup.HdfsGroup)
 }

 override def initialize(ctx: ProcessContext): Unit = {

@@ -15,15 +15,16 @@ import scala.collection.mutable.ArrayBuffer


 class ListHdfs extends ConfigurableStop{
-override val authorEmail: String = "ygang@cnic.com"
-
+override val authorEmail: String = "ygang@cnic.com"
+override val description: String = "Retrieve a list of files from hdfs"
 override val inportList: List[String] = List(Port.DefaultPort.toString)
 override val outportList: List[String] = List(Port.DefaultPort.toString)
-override val description: String = "Retrieve a list of files from hdfs"

 var hdfsPath :String= _
 var hdfsUrl :String= _
 var pathARR:ArrayBuffer[String]=ArrayBuffer()

 override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
 val spark = pec.get[SparkSession]()
 val sc = spark.sparkContext
@@ -58,7 +59,6 @@ class ListHdfs extends ConfigurableStop{
 for (f <- statuses) {
 val fsPath = f.getPath().toString
 if (f.isDirectory) {
 // pathARR += fsPath
 iterationFile(fsPath)
 } else{
 pathARR += f.getPath.toString
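The iterationFile helper used here is a depth-first walk over HDFS. A stand-alone sketch of the same idea with the Hadoop FileSystem API (URL and starting directory are examples):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.ArrayBuffer

    object ListHdfsSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.set("fs.defaultFS", "hdfs://192.168.3.138:8020")   // example value
        val fs = FileSystem.get(conf)
        val pathARR = ArrayBuffer[String]()

        // depth-first walk: recurse into directories, collect plain files
        def iterationFile(dir: String): Unit = {
          for (f <- fs.listStatus(new Path(dir))) {
            if (f.isDirectory) iterationFile(f.getPath.toString)
            else pathARR += f.getPath.toString
          }
        }

        iterationFile("/work/test")   // hypothetical starting directory
        pathARR.foreach(println)
        fs.close()
      }
    }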
@@ -100,7 +100,7 @@ class ListHdfs extends ConfigurableStop{
 }

 override def getGroup(): List[String] = {
-List(StopGroup.HdfsGroup.toString)
+List(StopGroup.HdfsGroup)
 }

 override def initialize(ctx: ProcessContext): Unit = {

@@ -10,10 +10,11 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.SparkSession

 class PutHdfs extends ConfigurableStop{

 override val authorEmail: String = "ygang@cnic.com"
-override val inportList: List[String] = List(Port.DefaultPort.toString)
-override val outportList: List[String] = List(Port.DefaultPort.toString)
-override val description: String = "Put data to hdfs"
+override val description: String = "Put data into hdfs"
+override val inportList: List[String] = List(Port.DefaultPort)
+override val outportList: List[String] = List(Port.DefaultPort)

 var hdfsPath :String= _
 var hdfsUrl :String= _
@@ -54,7 +55,7 @@ class PutHdfs extends ConfigurableStop{
 .description("File path of HDFS")
 .required(true)
+.example("/work/")

 descriptor = hdfsPath :: descriptor

 val hdfsUrl = new PropertyDescriptor()
 .name("hdfsUrl")
@@ -63,29 +64,27 @@ class PutHdfs extends ConfigurableStop{
 .description("URL address of HDFS")
 .required(true)
+.example("hdfs://192.168.3.138:8020")

 descriptor = hdfsUrl :: descriptor

 val types = new PropertyDescriptor()
 .name("types")
 .displayName("Types")
-.description("What format do you want to write : json,csv,parquet")
+.description("The format you want to write is json,csv,parquet")
 .defaultValue("csv")
 .allowableValues(Set("json","csv","parquet"))
 .required(true)
+.example("csv")
+descriptor = types :: descriptor

 val partition = new PropertyDescriptor()
 .name("partition")
 .displayName("Partition")
-.description("Write a few partitions")
+.description("Write several partitions")
 .defaultValue("1")
 .required(true)
+.example("1")

 descriptor = partition :: descriptor
-descriptor = types :: descriptor
-descriptor = hdfsPath :: descriptor
-descriptor = hdfsUrl :: descriptor

 descriptor
 }

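The partition and types properties map directly onto repartition and the DataFrameWriter format. A stand-alone sketch with placeholder data (values mirror the descriptor defaults and examples):

    import org.apache.spark.sql.SparkSession

    object PutHdfsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("put-hdfs-sketch").master("local[*]").getOrCreate()
        import spark.implicits._
        val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

        val hdfsUrl = "hdfs://192.168.3.138:8020"   // example from the descriptor
        val hdfsPath = "/work/"                     // example from the descriptor
        val types = "csv"                           // "json" | "csv" | "parquet"
        val partition = 1                           // number of output files

        // "partition" maps to repartition(n); "types" selects the writer format
        df.repartition(partition)
          .write
          .format(types)
          .option("header", "true")
          .save(hdfsUrl + hdfsPath + "put-hdfs-sketch")
        spark.stop()
      }
    }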
@@ -94,7 +93,7 @@ class PutHdfs extends ConfigurableStop{
 }

 override def getGroup(): List[String] = {
-List(StopGroup.HdfsGroup.toString)
+List(StopGroup.HdfsGroup)
 }

 override def initialize(ctx: ProcessContext): Unit = {

@@ -15,11 +15,10 @@ import scala.collection.mutable.ArrayBuffer

 class SaveToHdfs extends ConfigurableStop {

-override val description: String = "Put data to hdfs "
 val authorEmail: String = "ygang@cnic.cn"
-
-override val inportList: List[String] = List(Port.DefaultPort.toString)
-override val outportList: List[String] = List(Port.DefaultPort.toString)
+override val description: String = "Put data into hdfs "
+override val inportList: List[String] = List(Port.DefaultPort)
+override val outportList: List[String] = List(Port.DefaultPort)

 var hdfsDirPath :String= _
 var hdfsUrl :String= _
@@ -57,7 +56,6 @@ class SaveToHdfs extends ConfigurableStop {
 inDF.repartition(1).write.text(hdfsDir)
 }
-

 iterationFile(hdfsDir)

 val oldPath = new Path(oldFilePath)
@@ -79,7 +77,6 @@ class SaveToHdfs extends ConfigurableStop {
 val outDF: DataFrame = spark.createDataFrame(rowRDD,schema)
-
 out.write(outDF)

 }

 // recursively traverse the folder
@@ -96,7 +93,6 @@ class SaveToHdfs extends ConfigurableStop {
 for (f <- statuses) {
 val fsPath = f.getPath().toString
 if (f.isDirectory) {
 // pathARR += fsPath
 iterationFile(fsPath)
 } else{
 if (f.getPath.toString.contains("part")){
@@ -133,7 +129,7 @@ class SaveToHdfs extends ConfigurableStop {
 .description("File dir path of HDFS")
 .required(true)
+.example("/work/")

 descriptor = hdfsDirPath :: descriptor

 val hdfsUrl = new PropertyDescriptor()
 .name("hdfsUrl")
@@ -142,46 +138,43 @@ class SaveToHdfs extends ConfigurableStop {
 .description("URL address of HDFS")
 .required(true)
+.example("hdfs://192.168.3.138:8020")

 descriptor = hdfsUrl :: descriptor

 val fileName = new PropertyDescriptor()
 .name("fileName")
-.displayName("fileName")
+.displayName("FileName")
 .description("File name")
 .defaultValue("")
 .required(true)
+.example("test.csv")
+descriptor = fileName :: descriptor

 val types = new PropertyDescriptor()
 .name("types")
 .displayName("json,csv,text")
-.description("What format do you want to write : json,csv,parquet")
+.description("The format you want to write is json,csv,parquet")
 .defaultValue("csv")
 .allowableValues(Set("json","csv","text"))
 .required(true)
+.example("csv")
+descriptor = types :: descriptor

 val delimiter = new PropertyDescriptor()
 .name("delimiter")
 .displayName("delimiter")
-.description("The delimiter of csv file,types is csv ,please set it ")
+.description("Please set the separator for the type of csv file")
 .defaultValue(",")
 .required(true)
+.example(",")
+descriptor = delimiter :: descriptor

 //header
 val header = new PropertyDescriptor()
 .name("header")
 .displayName("header")
-.description("Whether the csv file have header or not")
+.description("Does the csv file have a header")
 .defaultValue("true")
 .required(true)

 descriptor = header :: descriptor
-descriptor = fileName :: descriptor
-descriptor = delimiter :: descriptor
-descriptor = hdfsDirPath :: descriptor
-descriptor = hdfsUrl :: descriptor
-descriptor = types :: descriptor
 descriptor
 }

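The header, delimiter and types properties end up as DataFrameWriter options, and repartition(1) is what makes the later rename-to-fileName step possible. A stand-alone sketch with placeholder data (the rename of the part file is left out):

    import org.apache.spark.sql.SparkSession

    object SaveToHdfsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("save-to-hdfs-sketch").master("local[*]").getOrCreate()
        import spark.implicits._
        val df = Seq(("1", "a"), ("2", "b")).toDF("id", "value")

        val hdfsDir = "hdfs://192.168.3.138:8020/work/"   // hdfsUrl + hdfsDirPath from the descriptors

        // repartition(1) produces one part file; header/delimiter come straight from the properties
        df.repartition(1)
          .write
          .option("header", "true")
          .option("delimiter", ",")
          .csv(hdfsDir + "save-to-hdfs-sketch")
        spark.stop()
      }
    }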
@@ -190,7 +183,7 @@ class SaveToHdfs extends ConfigurableStop {
 }

 override def getGroup(): List[String] = {
-List(StopGroup.HdfsGroup.toString)
+List(StopGroup.HdfsGroup)
 }

 }

@@ -17,8 +17,8 @@ import scala.collection.mutable.ArrayBuffer
 class SelectFilesByName extends ConfigurableStop{
 override val authorEmail: String = "yangqidong@cnic.cn"
 override val description: String = "Select files by file name"
-override val inportList: List[String] = List(Port.DefaultPort.toString)
-override val outportList: List[String] = List(Port.DefaultPort.toString)
+override val inportList: List[String] = List(Port.DefaultPort)
+override val outportList: List[String] = List(Port.DefaultPort)

 var hdfsUrl:String=_
 var hdfsPath:String=_
@@ -88,7 +88,7 @@ class SelectFilesByName extends ConfigurableStop{
 .description("File path of HDFS")
 .required(true)
+.example("/work/")

 descriptor = hdfsPath :: descriptor

 val hdfsUrl = new PropertyDescriptor()
 .name("hdfsUrl")
@@ -97,18 +97,17 @@ class SelectFilesByName extends ConfigurableStop{
 .description("URL address of HDFS")
 .required(true)
+.example("hdfs://192.168.3.138:8020")
+descriptor = hdfsUrl :: descriptor

 val selectionConditions = new PropertyDescriptor()
 .name("selectionConditions")
 .displayName("SelectionConditions")
-.description("To select conditions, you need to fill in regular expressions in java, such as '.*.csv'")
+.description("To select a conditions,a regular expression needs to be populated in java")
 .defaultValue("")
 .required(true)
-.example("")
+.example(".*.csv")

-descriptor = hdfsUrl :: descriptor
-descriptor = hdfsPath :: descriptor
 descriptor = selectionConditions :: descriptor

 descriptor
 }

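selectionConditions is matched against file names as a Java regular expression, so the new example .*.csv selects every file whose name ends in csv. A tiny stand-alone sketch (the file list is made up):

    object SelectFilesByNameSketch {
      def main(args: Array[String]): Unit = {
        val selectionConditions = ".*.csv"   // example pattern, as in the descriptor
        val fileNames = Seq("test.csv", "readme.txt", "part-00000.csv", "data.json")   // hypothetical listing

        // String.matches applies the full-match semantics of java.util.regex
        val selected = fileNames.filter(_.matches(selectionConditions))
        selected.foreach(println)   // prints test.csv and part-00000.csv
      }
    }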
@@ -19,8 +19,8 @@ import scala.collection.mutable.ArrayBuffer
 class UnzipFilesOnHDFS extends ConfigurableStop {
 val authorEmail: String = "yangqidong@cnic.cn"
 val description: String = "Unzip files on hdfs"
-val inportList: List[String] = List(Port.DefaultPort.toString)
-val outportList: List[String] = List(Port.DefaultPort.toString)
+val inportList: List[String] = List(Port.DefaultPort)
+val outportList: List[String] = List(Port.DefaultPort)

 var isCustomize:String=_
 var hdfsUrl:String=_
@@ -30,12 +30,10 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 var session: SparkSession = null
 var arr:ArrayBuffer[Row]=ArrayBuffer()
-

 def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {

 session = pec.get[SparkSession]()
-

 if(isCustomize.equals("true")){

 unzipFile(hdfsUrl+filePath,savePath)
@@ -55,10 +53,8 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 val df: DataFrame = session.createDataFrame(rdd,schema)
-
 out.write(df)
-
 }

 def whatType(p:String): String = {
 var typeStr:String=""
 val pathNames: Array[String] = p.split("\\.")
@@ -197,7 +193,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
 .example("hdfs://192.168.3.138:8020")
-


 val savePath = new PropertyDescriptor()
 .name("savePath")
 .displayName("savePath")
@@ -3,6 +3,7 @@ package cn.piflow.bundle.clean
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class EmailCleanTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.clean
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class IdentityNumberCleanTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.clean
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class PhoneNumberCleanTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.clean
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class ProvinceCleanTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.clean
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class TitleCleanTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class CsvParserTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.bundle.json.JsonSave
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import cn.piflow.{FlowImpl, Path, Runner}
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server

@@ -33,7 +34,7 @@ class CsvSaveAsAppendTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -32,7 +33,7 @@ class CsvSaveAsErrorTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -32,7 +33,7 @@ class CsvSaveAsIgnoreTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -32,7 +33,7 @@ class CsvSaveAsOverwriteTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -31,7 +32,7 @@ class CsvStringParserTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.csv
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -32,7 +33,7 @@ class FolderCsvParserTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.es
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class FetchElasticsearchTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.es
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class PutElasticsearchTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,6 +3,7 @@ package cn.piflow.bundle.es
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

@@ -33,7 +34,7 @@ class QueryElasticsearchTest {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,13 +3,14 @@ package cn.piflow.bundle.ftp
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

 import scala.util.parsing.json.JSON

-class LoadFromFtpToHDFS {
+class LoadFromFtpToHDFSTest {

 @Test
 def testFlow(): Unit ={

@@ -33,7 +34,7 @@ class LoadFromFtpToHDFS {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -3,13 +3,14 @@ package cn.piflow.bundle.ftp
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test

 import scala.util.parsing.json.JSON

-class UploadToFtp {
+class UploadToFtpTest {

 @Test
 def testFlow(): Unit ={

@@ -33,7 +34,7 @@ class UploadToFtp {
 .config("spark.driver.memory", "1g")
 .config("spark.executor.memory", "2g")
 .config("spark.cores.max", "2")
-.config("hive.metastore.uris", "thrift://192.168.3.140:9083")
+.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
 .enableHiveSupport()
 .getOrCreate()


@@ -12,7 +12,7 @@ import org.junit.Test

 import scala.util.parsing.json.JSON

-class Graphx {
+class LabelPropagationTest {

 @Test
 def testFlow(): Unit ={
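All of the test changes above replace the hard-coded thrift URI with PropertyUtil.getPropertyValue("hive.metastore.uris"). PropertyUtil is piflow's own utility, so the following is only a generic stand-in for the idea, reading an assumed config.properties from the classpath with java.util.Properties:

    import java.util.Properties

    object PropertyLookupSketch {
      // reads key=value pairs from a config.properties resource, e.g.
      //   hive.metastore.uris=thrift://192.168.3.140:9083
      def getPropertyValue(key: String): String = {
        val props = new Properties()
        val in = getClass.getResourceAsStream("/config.properties")   // assumed location
        if (in == null) sys.error("config.properties not found on the classpath")
        try props.load(in) finally in.close()
        props.getProperty(key)
      }

      def main(args: Array[String]): Unit = {
        println(getPropertyValue("hive.metastore.uris"))
      }
    }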