forked from opensci/piflow

add trim for stop's properties

commit fd67efcaa5 (parent 5faf9d4f75)
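The change applies one pattern across the stops touched below: every delimited property value is trimmed element by element right after the split, so whitespace that users type around commas or semicolons no longer leaks into field names, HBase qualifiers, or HDFS paths. A minimal before/after sketch of that pattern (standalone Scala; the sample property value is made up):

object TrimPropertySketch {
  def main(args: Array[String]): Unit = {
    // A stop property as a user might type it, with spaces around the commas.
    val schema = "id, name , age"

    // Before this commit: the whitespace survives the split -> "id", " name ", " age".
    val untrimmed: Array[String] = schema.split(",")

    // After this commit: each piece is trimmed right after the split.
    val trimmed: Array[String] = schema.split(",").map(x => x.trim)

    println(untrimmed.mkString("[", "|", "]"))  // [id| name | age]
    println(trimmed.mkString("[", "|", "]"))    // [id|name|age]
  }
}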
@@ -2,12 +2,12 @@ spark.master=yarn
 spark.deploy.mode=cluster

 #hdfs default file system
-fs.defaultFS=hdfs://10.0.88.13:9000
+fs.defaultFS=hdfs://10.0.86.191:9000
 #yarn resourcemanager hostname
-yarn.resourcemanager.hostname=10.0.88.13
+yarn.resourcemanager.hostname=10.0.86.191

 #if you want to use hive, set hive metastore uris
-hive.metastore.uris=thrift://10.0.88.13:9083
+hive.metastore.uris=thrift://10.0.86.191:9083

 #show data in log, set 0 if you do not show the logs
 data.show=10
@@ -33,7 +33,7 @@ class CsvParser extends ConfigurableStop{
 }else{

-val field = schema.split(",")
+val field = schema.split(",").map(x => x.trim)
 val structFieldArray : Array[StructField] = new Array[StructField](field.size)
 for(i <- 0 to field.size - 1){
 structFieldArray(i) = new StructField(field(i), StringType, nullable = true)
@@ -25,11 +25,11 @@ class CsvStringParser extends ConfigurableStop{
 val session: SparkSession = pec.get[SparkSession]
 val context: SparkContext = session.sparkContext

-val arrStr: Array[String] = string.split("\n")
+val arrStr: Array[String] = string.split("\n").map(x => x.trim)

 var num:Int=0
 val listROW: List[Row] = arrStr.map(line => {
-val seqSTR: Seq[String] = line.split(delimiter).toSeq
+val seqSTR: Seq[String] = line.split(delimiter).map(x=>x.trim).toSeq
 num=seqSTR.size
 val row = Row.fromSeq(seqSTR)
 row
@@ -30,8 +30,8 @@ class PutElasticsearch extends ConfigurableStop {
 "es.port"->es_port)

 if(configuration_item.length > 0){
-configuration_item.split(",").foreach(each =>{
-options += (each.split("->")(0) -> each.split("->")(1))
+configuration_item.split(",").map(x => x.trim).foreach(each =>{
+options += (each.split("->")(0).trim -> each.split("->")(1).trim)
 })
 }
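PutElasticsearch goes one step further: its configuration_item property is a comma-separated list of key->value pairs, so the keys and values are trimmed as well, not just the pairs. A rough sketch of how such a string ends up in the options map (standalone Scala; the option names and values are only illustrative):

import scala.collection.mutable

object EsOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative configuration_item value with careless spacing around the delimiters.
    val configuration_item = "es.index.auto.create -> true , es.batch.size.entries -> 1000"

    val options: mutable.Map[String, String] = mutable.Map()

    if (configuration_item.length > 0) {
      configuration_item.split(",").map(x => x.trim).foreach(each => {
        // Trim key and value too, mirroring the change in the stop above.
        options += (each.split("->")(0).trim -> each.split("->")(1).trim)
      })
    }

    // Both keys and values arrive without the stray spaces.
    println(options)
  }
}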
@@ -207,10 +207,10 @@ class LoadFromFtpToHDFS extends ConfigurableStop{
 if(filterByName.length == 0){
 filterByName=".*"
 }
-filters = filterByName.split(";")
+filters = filterByName.split(";").map(x => x.trim)

 if(isFile.equals("true") || isFile.equals("TRUE")){
-val fileNmae: String = ftpFile.split("/").last
+val fileNmae: String = ftpFile.split("/").last.trim
 val fileDir = ftpFile.replace(fileNmae,"")
 copyFile(fileDir,fileNmae,HDFSPath)
 }else if(isFile.equals("false") || isFile.equals("FALSE")){
@@ -50,7 +50,7 @@ class PutHbase extends ConfigurableStop{

 val df = in.read()

-val qualifiers=qualifier.split(",")
+val qualifiers=qualifier.split(",").map(x => x.trim)

 df.rdd.map(row =>{
 val rowkey = nullHandle(row.getAs[String](rowid))
@@ -42,8 +42,8 @@ class ReadHbase extends ConfigurableStop{
 classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
 classOf[org.apache.hadoop.hbase.client.Result])

-val schema: Array[String] = qualifier.split(",")
-val families=family.split(",")
+val schema: Array[String] = qualifier.split(",").map(x=>x.trim)
+val families=family.split(",").map(x=>x.trim)

 val col_str=rowid+","+qualifier
 val newSchema:Array[String]=col_str.split(",")
@@ -47,7 +47,7 @@ class DeleteHdfs extends ConfigurableStop{
 })

 } else {
-val array = hdfsPath.split(",")
+val array = hdfsPath.split(",").map(x=>x.trim)

 for (i<- 0 until array.length){
 val hdfsPath = hdfsUrl+"/"+array(i)
@@ -34,7 +34,7 @@ class SelectFilesByName extends ConfigurableStop{
 val pathStr = each.getPath.toString
 if(each.isFile){
 val fileName: String = pathStr.split("/").last
-selectArr = selectionConditions.split(",")
+selectArr = selectionConditions.split(",").map(x => x.trim)
 var b: Boolean =false
 for(x <- selectArr){
 b = Pattern.matches(x,fileName)
@@ -67,7 +67,7 @@ class GetUrl extends ConfigurableStop{
 var arrbuffer:ArrayBuffer[Element]=ArrayBuffer()
 arrbuffer+=rootElt

-val arrLabel: Array[String] = label.split(",")
+val arrLabel: Array[String] = label.split(",").map(x => x.trim)
 for(x<-(1 until arrLabel.length)){
 var ele: Element =null
 if(x==1){
@@ -80,7 +80,7 @@ class GetUrl extends ConfigurableStop{

 val FatherElement: Element = arrbuffer(arrbuffer.size-2)

-val arrSchame: Array[String] = schema.split(",")
+val arrSchame: Array[String] = schema.split(",").map(x=>x.trim)

 var list:ListBuffer[String]=ListBuffer()
@@ -98,7 +98,7 @@ class GetUrl extends ConfigurableStop{
 }

 val listRows: List[Row] = list.toList.map(line => {
-val seq: Seq[String] = line.split(",").toSeq
+val seq: Seq[String] = line.split(",").map(x => x.trim).toSeq
 val row = Row.fromSeq(seq)
 row
 })
@@ -35,7 +35,7 @@ class ImpalaRead extends ConfigurableStop{
 val stmt: Statement = con.createStatement()
 val rs: ResultSet = stmt.executeQuery(sql)

-val filedNames: Array[String] = schameString.split(",")
+val filedNames: Array[String] = schameString.split(",").map(x => x.trim)
 var rowsArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer()
 while (rs.next()){
 var rowArr:ArrayBuffer[String]=ArrayBuffer()
@@ -48,7 +48,7 @@ class JdbcReadFromOracle extends ConfigurableStop{
 val rs: ResultSet = pre.executeQuery()

-val filedNames: Array[String] = schema.split(",")
+val filedNames: Array[String] = schema.split(",").map(x => x.trim)
 var rowsArr:ArrayBuffer[ArrayBuffer[Any]]=ArrayBuffer()
 var rowArr:ArrayBuffer[Any]=ArrayBuffer()
 while (rs.next()){