forked from opensci/piflow
add trim for stop's properties in common package
This commit is contained in:
parent
fe2ebe96e6
commit
5faf9d4f75
|
@ -17,10 +17,10 @@ class ConvertSchema extends ConfigurableStop {
|
|||
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
|
||||
var df = in.read()
|
||||
|
||||
val field = schema.split(",")
|
||||
val field = schema.split(",").map(x => x.trim)
|
||||
|
||||
field.foreach(f => {
|
||||
val old_new: Array[String] = f.split("->")
|
||||
val old_new: Array[String] = f.split("->").map(x => x.trim)
|
||||
df = df.withColumnRenamed(old_new(0),old_new(1))
|
||||
})
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ class DropField extends ConfigurableStop {
|
|||
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
|
||||
var df = in.read()
|
||||
|
||||
val field = columnNames.split(",")
|
||||
val field = columnNames.split(",").map(x => x.trim)
|
||||
for( x <- 0 until field.size){
|
||||
df = df.drop(field(x))
|
||||
}
|
||||
|
|
|
@ -17,7 +17,8 @@ class Fork extends ConfigurableStop{
|
|||
|
||||
override def setProperties(map: Map[String, Any]): Unit = {
|
||||
val outportStr = MapUtil.get(map,"outports").asInstanceOf[String]
|
||||
outports = outportStr.split(",").toList
|
||||
outports = outportStr.split(",").map(x => x.trim).toList
|
||||
|
||||
}
|
||||
|
||||
override def initialize(ctx: ProcessContext): Unit = {
|
||||
|
|
|
@ -22,7 +22,7 @@ class Join extends ConfigurableStop{
|
|||
|
||||
var seq: Seq[String]= Seq()
|
||||
correlationColumn.split(",").foreach(x=>{
|
||||
seq = seq .++(Seq(x.toString))
|
||||
seq = seq .++(Seq(x.trim.toString))
|
||||
})
|
||||
|
||||
var df: DataFrame = null
|
||||
|
|
|
@ -24,7 +24,7 @@ class Merge extends ConfigurableStop{
|
|||
|
||||
def setProperties(map : Map[String, Any]): Unit = {
|
||||
val inportStr = MapUtil.get(map,"inports").asInstanceOf[String]
|
||||
inports = inportStr.split(",").toList
|
||||
inports = inportStr.split(",").map(x => x.trim).toList
|
||||
}
|
||||
|
||||
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
|
||||
|
|
|
@ -21,7 +21,7 @@ class SelectField extends ConfigurableStop {
|
|||
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
|
||||
val df = in.read()
|
||||
|
||||
val field = columnNames.split(",")
|
||||
val field = columnNames.split(",").map(x => x.trim)
|
||||
val columnArray : Array[Column] = new Array[Column](field.size)
|
||||
for(i <- 0 to field.size - 1){
|
||||
columnArray(i) = new Column(field(i))
|
||||
|
|
|
@ -209,6 +209,8 @@ class HivePRDDistinct extends ConfigurableStop {
|
|||
catch {
|
||||
case ignored: Exception =>
|
||||
} finally if (connection != null) connection.close()
|
||||
}catch {
|
||||
case _ => print(_)
|
||||
}
|
||||
}
|
||||
keys
|
||||
|
|
Loading…
Reference in New Issue