add stop trigger

This commit is contained in:
judy0131 2020-12-18 00:33:17 -05:00
parent 0a4204cbac
commit 34ac13f1dd
1 changed files with 57 additions and 0 deletions

View File

@ -0,0 +1,57 @@
package cn.piflow.bundle.inner
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
class StopTrigger extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val description: String = "Run stop with mock data"
val inportList: List[String] = List(Port.NonePort)
val outportList: List[String] = List(Port.DefaultPort)
var jsonString: String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val jsonRDD = spark.sparkContext.makeRDD(jsonString :: Nil)
val jsonDF = spark.read.json(jsonRDD)
out.write(jsonDF)
}
def initialize(ctx: ProcessContext): Unit = {
}
override def setProperties(map: Map[String, Any]): Unit = {
jsonString = MapUtil.get(map,"jsonString").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val jsonString = new PropertyDescriptor()
.name("jsonString")
.displayName("JsonString")
.description("The json string")
.defaultValue("")
.required(true)
.example("{\"id\":\"13\",\"name\":\"13\",\"score\":\"13\",\"school\":\"13\",\"class\":\"13\"}")
descriptor = jsonString :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/json/JsonStringParser.png")
}
override def getGroup(): List[String] = {
List(StopGroup.JsonGroup)
}
}