update join stop

This commit is contained in:
or 2019-09-02 10:50:55 +08:00
parent 7572fe96bc
commit c58a82a215
2 changed files with 16 additions and 24 deletions

View File

@@ -9,7 +9,7 @@ import org.apache.spark.sql.{Column, DataFrame}
class Join extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Table connection, including full connection, left connection, right connection and inner connection"
override val inportList: List[String] =List(PortEnum.AnyPort.toString)
override val inportList: List[String] =List(PortEnum.LeftPort.toString,PortEnum.RightPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var joinMode:String=_
@@ -19,31 +19,20 @@ class Join extends ConfigurableStop{
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val dfs: Seq[DataFrame] = in.ports().map(in.read(_))
var df1: DataFrame = dfs(0)
var df2: DataFrame = dfs(1)
var column: Column = null
val leftDF = in.read(PortEnum.LeftPort)
val rightDF = in.read(PortEnum.RightPort)
val correlationFieldArr: Array[String] = correlationField.split(",")
df1 = df1.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_1")
df2 = df2.withColumnRenamed(correlationFieldArr(0),correlationFieldArr(0)+"_2")
column = df1(correlationFieldArr(0)+"_1")===df2(correlationFieldArr(0)+"_2")
if(correlationFieldArr.size > 1){
for(x <- (1 until correlationFieldArr.size)){
var newColumn: Column =null
df1 = df1.withColumnRenamed(correlationFieldArr(x),correlationFieldArr(x)+"_1")
df2 = df2.withColumnRenamed(correlationFieldArr(x),correlationFieldArr(x)+"_2")
newColumn = df1(correlationFieldArr(x)+"_1")===df2(correlationFieldArr(x)+"_2")
column = column and newColumn
}
}
var seq: Seq[String]= Seq()
correlationField.split(",").foreach(x=>{
seq = seq .++(Seq(x.toString))
})
var df: DataFrame = null
joinMode match {
case "INNER" =>df = df1.join(df2, column)
case "LEFT" => df = df1.join(df2,column,"left_outer")
case "RIGHT" => df = df1.join(df2,column,"right_outer")
case "FULL" => df = df1.join(df2,column,"outer")
case "inner" =>df = leftDF.join(rightDF, seq)
case "left" => df = leftDF.join(rightDF,seq,"left_outer")
case "right" => df = leftDF.join(rightDF,seq,"right_outer")
case "full_outer" => df = leftDF.join(rightDF,seq,"outer")
}
out.write(df)
@@ -59,8 +48,9 @@ class Join extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val joinMode = new PropertyDescriptor().name("joinMode").displayName("joinMode").description("For table association, you can choose INNER, LEFT, RIGHT, FULL").defaultValue("").required(true)
val correlationField = new PropertyDescriptor().name("correlationField").displayName("correlationField").description("Fields associated with tables,If there are more than one, please use, separate").defaultValue("").required(true)
val joinMode = new PropertyDescriptor().name("joinMode").displayName("joinMode").description("For table association, you can choose INNER, LEFT, RIGHT, FULL")
.allowableValues(Set("inner","left","right","full_outer")).defaultValue("inner").required(true)
val correlationField = new PropertyDescriptor().name("correlationField").displayName("correlationField").description("Fields associated with tables,If there are more than one, please use , separate").defaultValue("").required(true)
descriptor = correlationField :: descriptor
descriptor = joinMode :: descriptor

View File

@@ -5,6 +5,8 @@ object PortEnum {
val AnyPort = "Any"
val DefaultPort = "Default"
val NonePort = "None"
val LeftPort = "Left"
val RightPort= "Right"
val scopus_articlePort = "scopus_articlePort"
val scopus_article_api_response = "scopus_article_api_response"
}