From 5faf9d4f750f14ef49d308d62b818c2a571c2030 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Tue, 26 May 2020 17:58:03 +0800 Subject: [PATCH] add trim for stop's properties in common package --- .../main/scala/cn/piflow/bundle/common/ConvertSchema.scala | 4 ++-- .../src/main/scala/cn/piflow/bundle/common/DropField.scala | 2 +- .../src/main/scala/cn/piflow/bundle/common/Fork.scala | 3 ++- .../src/main/scala/cn/piflow/bundle/common/Join.scala | 2 +- .../src/main/scala/cn/piflow/bundle/common/Merge.scala | 2 +- .../src/main/scala/cn/piflow/bundle/common/SelectField.scala | 2 +- .../cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala | 2 ++ 7 files changed, 10 insertions(+), 7 deletions(-) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala index 06a5452..a8fb949 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala @@ -17,10 +17,10 @@ class ConvertSchema extends ConfigurableStop { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { var df = in.read() - val field = schema.split(",") + val field = schema.split(",").map(x => x.trim) field.foreach(f => { - val old_new: Array[String] = f.split("->") + val old_new: Array[String] = f.split("->").map(x => x.trim) df = df.withColumnRenamed(old_new(0),old_new(1)) }) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala index c1912c8..1378af7 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DropField.scala @@ -18,7 +18,7 @@ class DropField extends ConfigurableStop { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { var df = in.read() - val field = columnNames.split(",") + val field = columnNames.split(",").map(x => x.trim) for( x <- 0 until field.size){ df = df.drop(field(x)) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala index e650b72..279e67e 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala @@ -17,7 +17,8 @@ class Fork extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { val outportStr = MapUtil.get(map,"outports").asInstanceOf[String] - outports = outportStr.split(",").toList + outports = outportStr.split(",").map(x => x.trim).toList + } override def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala index 35203de..6dffbfa 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala @@ -22,7 +22,7 @@ class Join extends ConfigurableStop{ var seq: Seq[String]= Seq() correlationColumn.split(",").foreach(x=>{ - seq = seq .++(Seq(x.toString)) + seq = seq .++(Seq(x.trim.toString)) }) var df: DataFrame = null diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala index b9715fa..e5be9ea 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala @@ -24,7 +24,7 @@ class Merge extends ConfigurableStop{ def setProperties(map : Map[String, Any]): Unit = { val inportStr = MapUtil.get(map,"inports").asInstanceOf[String] - inports = inportStr.split(",").toList + inports = inportStr.split(",").map(x => x.trim).toList } override def getPropertyDescriptor(): List[PropertyDescriptor] = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala index df8ed79..b45cfe5 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala @@ -21,7 +21,7 @@ class SelectField extends ConfigurableStop { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val df = in.read() - val field = columnNames.split(",") + val field = columnNames.split(",").map(x => x.trim) val columnArray : Array[Column] = new Array[Column](field.size) for(i <- 0 to field.size - 1){ columnArray(i) = new Column(field(i)) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala index 9ff9ce2..835a3a9 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/distinct/HivePRDDistinct.scala @@ -209,6 +209,8 @@ class HivePRDDistinct extends ConfigurableStop { catch { case ignored: Exception => } finally if (connection != null) connection.close() + }catch { + case _ => print(_) } } keys