From 2e1ecd0087a613b1055b3a9157e0f1dfcfcf5f7d Mon Sep 17 00:00:00 2001 From: yanfqidong0604 Date: Fri, 11 Jan 2019 15:14:12 +0800 Subject: [PATCH] join stop qidong yang --- .../scala/cn/piflow/bundle/common/Join.scala | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala index d36ab76..7dd94cf 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Join.scala @@ -20,34 +20,22 @@ class Join extends ConfigurableStop{ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val dfs: Seq[DataFrame] = in.ports().map(in.read(_)) - var df1: DataFrame = dfs(0).withColumnRenamed(correlationField,correlationField+"_1") - var df2: DataFrame = dfs(1).withColumnRenamed(correlationField,correlationField+"_2") + var df1: DataFrame = dfs(0) + var df2: DataFrame = dfs(1) var column: Column = null val correlationFieldArr: Array[String] = correlationField.split(",") - if(correlationFieldArr.size ==1){ - df1 = df1.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_1") - df2 = df2.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_2") - column = df1(correlationFieldArr(0)+"_1")===df2(correlationFieldArr(0)+"_2") - - }else if(correlationFieldArr.size > 1){ - for(x <- (0 until correlationFieldArr.size)){ + df1 = df1.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_1") + df2 = df2.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_2") + column = df1(correlationFieldArr(0)+"_1")===df2(correlationFieldArr(0)+"_2") + if(correlationFieldArr.size > 1){ + for(x <- (1 until correlationFieldArr.size)){ var newColumn: Column =null - if(x == 0){ - df1 = df1.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_1") - df2 = df2.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_2") - column = df1(correlationFieldArr(0)+"_1")===df2(correlationFieldArr(0)+"_2") - - }else{ df1 = df1.withColumnRenamed(correlationFieldArr(x),correlationFieldArr(x)+"_1") df2 = df2.withColumnRenamed(correlationFieldArr(x),correlationFieldArr(x)+"_2") newColumn = df1(correlationFieldArr(x)+"_1")===df2(correlationFieldArr(x)+"_2") column = column and newColumn - - } } - }else{ - throw new Exception("Misdescription of fields associated with tables") } var df: DataFrame = null