diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/xml/ProcessXMLInAvro.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/xml/ProcessXMLInAvro.scala index 1ca6458..8dddde8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/xml/ProcessXMLInAvro.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/xml/ProcessXMLInAvro.scala @@ -5,6 +5,7 @@ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} +import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} import org.apache.spark.sql.{SaveMode, SparkSession} class ProcessXMLInAvro extends ConfigurableStop{ @@ -15,6 +16,7 @@ class ProcessXMLInAvro extends ConfigurableStop{ var relevance :String = _ + var labeled :String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() @@ -38,6 +40,11 @@ class ProcessXMLInAvro extends ConfigurableStop{ } else { relevance="" } + if(labeled.equals("true")){ + labeled=",product_xml.product.pub_basic.labeled" + } else { + labeled="" + } val outDF = spark.sql( s""" @@ -52,7 +59,7 @@ class ProcessXMLInAvro extends ConfigurableStop{ |,split(parseAuthors(product_xml.product.pub_basic.authors),'\t<&\t')[4] as firsr_author |,split(parseAuthors(product_xml.product.pub_basic.authors),'\t<&\t')[5] as is_mine | - |,product_xml.product.pub_basic.authors_name + |,product_xml.product.pub_basic.authors_name |,product_xml.product.pub_basic.cited_times |,product_xml.product.pub_basic.create_date |,product_xml.product.pub_basic.en_pub_type_name @@ -65,7 +72,7 @@ class ProcessXMLInAvro extends ConfigurableStop{ |,product_xml.product.pub_basic.full_text.upload_date |,product_xml.product.pub_basic.full_text_img_url |,product_xml.product.pub_basic.has_full_text - |,product_xml.product.pub_basic.labeled + |${labeled} |,product_xml.product.pub_basic.language |,product_xml.product.pub_basic.list_bdzw |,product_xml.product.pub_basic.list_cssci @@ -103,7 +110,7 @@ class ProcessXMLInAvro extends ConfigurableStop{ |,product_xml.product.pub_basic.zh_source |,product_xml.product.pub_basic.zh_title | - |,split(parseExtend(product_xml.product.pub_extend,product_xml.product.pub_basic.pub_type_id),'\t<&\t')[0] as article_no + |,split(parseExtend(product_xml.product.pub_extend,product_xml.product.pub_basic.pub_type_id),'\t<&\t')[0] as article_no |,split(parseExtend(product_xml.product.pub_extend,product_xml.product.pub_basic.pub_type_id),'\t<&\t')[1] as begin_num |,split(parseExtend(product_xml.product.pub_extend,product_xml.product.pub_basic.pub_type_id),'\t<&\t')[2] as city |,split(parseExtend(product_xml.product.pub_extend,product_xml.product.pub_basic.pub_type_id),'\t<&\t')[3] as conf_end_day @@ -139,6 +146,7 @@ class ProcessXMLInAvro extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { relevance = MapUtil.get(map,"relevance").asInstanceOf[String] + labeled = MapUtil.get(map,"labeled").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { @@ -147,6 +155,10 @@ class ProcessXMLInAvro extends ConfigurableStop{ allowableValues(Set("true","false")).defaultValue("true").required(true) descriptor = relevance :: descriptor + val labeled = new PropertyDescriptor().name("labeled").displayName("labeled").description("Does the schema contain an 'labeled' field"). + allowableValues(Set("true","false")).defaultValue("true").required(true) + descriptor = labeled :: descriptor + descriptor }