From 079635644c6d742981b8da39be9ed4b1aa81d663 Mon Sep 17 00:00:00 2001
From: judy_0131
Date: Tue, 3 Sep 2019 18:17:49 +0800
Subject: [PATCH] add filter stop

---
 .../cn/piflow/bundle/common/Filter.scala           | 50 ++++++++++++++++++++++
 1 file changed, 50 insertions(+)
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala
new file mode 100644
index 0000000..0ca1595
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Filter.scala
@@ -0,0 +1,50 @@
+package cn.piflow.bundle.common
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import org.apache.spark.sql.DataFrame
+
+class Filter extends ConfigurableStop {
+  override val authorEmail: String = "xjzhu@cnic.cn"
+  override val description: String = "Filter rows by a SQL-style condition"
+  override val inportList: List[String] = List(PortEnum.DefaultPort)
+  override val outportList: List[String] = List(PortEnum.DefaultPort)
+
+  // SQL boolean expression applied to the incoming DataFrame, e.g. "age > 30"
+  var condition: String = _
+
+  override def setProperties(map: Map[String, Any]): Unit = {
+    condition = MapUtil.get(map, "condition").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor: List[PropertyDescriptor] = List()
+    val condition = new PropertyDescriptor().name("condition").displayName("condition").description("The condition to filter rows by").defaultValue("").required(true)
+    descriptor = condition :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("icon/common/SelectField.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroup.CommonGroup)
+  }
+
+  override def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+
+    val df = in.read()
+
+    // filter() parses the condition string as a SQL WHERE-clause expression
+    val filterDF: DataFrame = df.filter(condition)
+
+    out.write(filterDF)
+  }
+}
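
Note: the stop passes the condition string straight to Spark's Dataset.filter(conditionExpr: String), which evaluates it with the same grammar as a SQL WHERE clause. Below is a minimal, self-contained sketch of that behavior; the SparkSession setup, column names, and sample rows are illustrative assumptions, not piflow code.

import org.apache.spark.sql.SparkSession

object FilterConditionSketch {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration.
    val spark = SparkSession.builder()
      .appName("filter-condition-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the DataFrame that in.read() would return.
    val df = Seq(("alice", 34), ("bob", 19), ("carol", 52)).toDF("name", "age")

    // The stop's "condition" property is a SQL boolean expression, exactly
    // what Dataset.filter(conditionExpr: String) expects.
    val condition = "age > 30"
    val filtered = df.filter(condition)

    filtered.show() // expected: only the alice and carol rows
    spark.stop()
  }
}

Run locally (e.g. via spark-submit with the spark-sql dependency on the classpath), show() should print only the rows satisfying the condition, mirroring what the stop writes to its default outport.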