join stop

qidong yang
This commit is contained in:
yanfqidong0604 2019-01-11 15:14:12 +08:00
parent 180ef7a181
commit 2e1ecd0087
1 changed file with 7 additions and 19 deletions

View File

@@ -20,35 +20,23 @@ class Join extends ConfigurableStop{
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val dfs: Seq[DataFrame] = in.ports().map(in.read(_))
var df1: DataFrame = dfs(0).withColumnRenamed(correlationField,correlationField+"_1")
var df2: DataFrame = dfs(1).withColumnRenamed(correlationField,correlationField+"_2")
var df1: DataFrame = dfs(0)
var df2: DataFrame = dfs(1)
var column: Column = null
val correlationFieldArr: Array[String] = correlationField.split(",")
if(correlationFieldArr.size ==1){
df1 = df1.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_1")
df2 = df2.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_2")
column = df1(correlationFieldArr(0)+"_1")===df2(correlationFieldArr(0)+"_2")
}else if(correlationFieldArr.size > 1){
for(x <- (0 until correlationFieldArr.size)){
if(correlationFieldArr.size > 1){
for(x <- (1 until correlationFieldArr.size)){
var newColumn: Column =null
if(x == 0){
df1 = df1.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_1")
df2 = df2.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_2")
column = df1(correlationFieldArr(0)+"_1")===df2(correlationFieldArr(0)+"_2")
}else{
df1 = df1.withColumnRenamed(correlationFieldArr(x),correlationFieldArr(x)+"_1")
df2 = df2.withColumnRenamed(correlationFieldArr(x),correlationFieldArr(x)+"_2")
newColumn = df1(correlationFieldArr(x)+"_1")===df2(correlationFieldArr(x)+"_2")
column = column and newColumn
}
}
}else{
throw new Exception("Misdescription of fields associated with tables")
}
var df: DataFrame = null
joinMode match {