fix bug NullPointException (ProcessXMLInAvro)

This commit is contained in:
or 2019-09-12 17:57:41 +08:00
parent 463f530a57
commit 3b64a5f174
1 changed files with 15 additions and 3 deletions

View File

@ -5,6 +5,7 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.{SaveMode, SparkSession}
class ProcessXMLInAvro extends ConfigurableStop{ class ProcessXMLInAvro extends ConfigurableStop{
@ -15,6 +16,7 @@ class ProcessXMLInAvro extends ConfigurableStop{
var relevance :String = _ var relevance :String = _
var labeled :String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
@ -38,6 +40,11 @@ class ProcessXMLInAvro extends ConfigurableStop{
} else { } else {
relevance="" relevance=""
} }
if(labeled.equals("true")){
labeled=",product_xml.product.pub_basic.labeled"
} else {
labeled=""
}
val outDF = spark.sql( val outDF = spark.sql(
s""" s"""
@ -65,7 +72,7 @@ class ProcessXMLInAvro extends ConfigurableStop{
|,product_xml.product.pub_basic.full_text.upload_date |,product_xml.product.pub_basic.full_text.upload_date
|,product_xml.product.pub_basic.full_text_img_url |,product_xml.product.pub_basic.full_text_img_url
|,product_xml.product.pub_basic.has_full_text |,product_xml.product.pub_basic.has_full_text
|,product_xml.product.pub_basic.labeled |${labeled}
|,product_xml.product.pub_basic.language |,product_xml.product.pub_basic.language
|,product_xml.product.pub_basic.list_bdzw |,product_xml.product.pub_basic.list_bdzw
|,product_xml.product.pub_basic.list_cssci |,product_xml.product.pub_basic.list_cssci
@ -139,6 +146,7 @@ class ProcessXMLInAvro extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = { override def setProperties(map: Map[String, Any]): Unit = {
relevance = MapUtil.get(map,"relevance").asInstanceOf[String] relevance = MapUtil.get(map,"relevance").asInstanceOf[String]
labeled = MapUtil.get(map,"labeled").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
@ -147,6 +155,10 @@ class ProcessXMLInAvro extends ConfigurableStop{
allowableValues(Set("true","false")).defaultValue("true").required(true) allowableValues(Set("true","false")).defaultValue("true").required(true)
descriptor = relevance :: descriptor descriptor = relevance :: descriptor
val labeled = new PropertyDescriptor().name("labeled").displayName("labeled").description("Does the schema contain an 'labeled' field").
allowableValues(Set("true","false")).defaultValue("true").required(true)
descriptor = labeled :: descriptor
descriptor descriptor
} }