add histogram stop
parent 6d880ca8ee
commit b9d01b5c0c
@@ -38,8 +38,6 @@ class GetFile extends ConfigurableStop{
    PassWord = MapUtil.get(map,key="PassWord").asInstanceOf[String]
    hdfsFile = MapUtil.get(map,key="hdfsFile").asInstanceOf[String]
    localPath = MapUtil.get(map,key="localPath").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
@@ -0,0 +1,86 @@
package cn.piflow.bundle.visualization

import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableVisualizationStop, Port, StopGroup, VisualizationType}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession

class Histogram extends ConfigurableVisualizationStop {

  override val authorEmail: String = "xjzhu@cnic.cn"
  override val description: String = "Show data with a histogram. " +
    "X represents the abscissa; the ordinate is given by customizedProperties, " +
    "where each key is a dimension and each value is the operation applied to it, such as SUM."
  override val inportList: List[String] = List(Port.DefaultPort)
  override val outportList: List[String] = List(Port.DefaultPort)

  var x: String = _

  override var visualizationType: String = VisualizationType.Histogram
  override val isCustomized: Boolean = true
  override val customizedAllowValue: List[String] = List("COUNT", "SUM", "AVG", "MAX", "MIN")

  override def setProperties(map: Map[String, Any]): Unit = {
    x = MapUtil.get(map, key = "x").asInstanceOf[String]
    //dimension = MapUtil.get(map, key = "dimension").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val abscissa = new PropertyDescriptor()
      .name("x")
      .displayName("x")
      .description("The abscissa of the histogram")
      .defaultValue("")
      .example("year")
      .required(true)
    /*val dimension = new PropertyDescriptor()
      .name("dimension")
      .displayName("Dimension")
      .description("The dimensions of the histogram; multiple dimensions are separated by commas")
      .defaultValue("")
      .required(true)*/
    descriptor = abscissa :: descriptor
    //descriptor = dimension :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("icon/visualization/histogram.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroup.Visualization)
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val dataFrame = in.read()
    dataFrame.createOrReplaceTempView("Histogram")

    if (this.customizedProperties != null && this.customizedProperties.nonEmpty) {

      println("dimension is " + this.customizedProperties.keySet.mkString(","))

      var dimensionActionArray = List[String]()
      val it = this.customizedProperties.keySet.iterator
      while (it.hasNext) {
        val dimension = it.next()
        val action = MapUtil.get(this.customizedProperties, dimension).asInstanceOf[String]
        val dimensionAction = action + "(" + dimension + ") as " + dimension + "_" + action
        dimensionActionArray = dimensionActionArray :+ dimensionAction
      }

      val sqlText = "select " + x + "," + dimensionActionArray.mkString(",") + " from Histogram group by " + x
      println("Histogram Sql: " + sqlText)
      val histogramDF = spark.sql(sqlText)
      out.write(histogramDF.repartition(1))
    } else {
      out.write(dataFrame)
    }
  }
}
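A minimal usage sketch of the new stop (not part of the diff; the column names "year" and "sales" and the way customizedProperties gets populated are illustrative assumptions):

    // Assumed example: abscissa column "year", one aggregated ordinate "sales".
    val histogram = new cn.piflow.bundle.visualization.Histogram()
    histogram.setProperties(Map("x" -> "year"))
    // If the flow supplies customizedProperties = Map("sales" -> "SUM"),
    // perform() builds and runs:
    //   select year,SUM(sales) as sales_SUM from Histogram group by year
    // and writes the aggregated DataFrame (repartitioned to 1) to the default port;
    // with no customized properties it simply forwards the input DataFrame.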
@@ -4,5 +4,6 @@ object VisualizationType {
  val LineChart = "LINECHART"
  val PieChart = "PIECHART"
  val Histogram = "HISTOGRAM"

}
|
|
@ -297,7 +297,7 @@ object API {
|
|||
val jsonMapList = getJsonMapList(visuanlizationPath + "/data")
|
||||
|
||||
|
||||
if(VisualizationType.LineChart == visualizationType){
|
||||
if(VisualizationType.LineChart == visualizationType || VisualizationType.Histogram == visualizationType){
|
||||
|
||||
var visualizationTuple = List[Tuple2[String,String]]()
|
||||
|
||||
|
@@ -317,9 +317,22 @@ object API {
      //dimensionMap
      var lineChartMap = Map[String, Any]()
      val x = schemaArray(0)
      val xMap = Map(x -> OptionUtil.getAny(dimensionMap.get(schemaArray(0))))
      lineChartMap += {"x" -> xMap}
      lineChartMap += {"y" -> dimensionMap.filterKeys(!_.equals(x))}
      lineChartMap += {"xAxis" -> Map("type" -> x, "data" -> OptionUtil.getAny(dimensionMap.get(schemaArray(0))))}
      lineChartMap += {"yAxis" -> Map("type" -> "value")}
      var seriesList = List[Map[String, Any]]()
      dimensionMap.filterKeys(!_.equals(x)).foreach(item => {
        val name_action = item._1
        val data = item._2
        val name = name_action.split("_")(0)
        val action = name_action.split("_")(1)
        val vType = visualizationType match {
          case VisualizationType.LineChart => "line"
          case VisualizationType.Histogram => "bar"
        }
        val map = Map("name" -> name, "type" -> vType, "stack" -> action, "data" -> data)
        seriesList = map +: seriesList
      })
      lineChartMap += {"series" -> seriesList}

      val visualizationJsonData = JsonUtil.format(JsonUtil.toJson(lineChartMap))
      println(visualizationJsonData)
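For VisualizationType.Histogram the map assembled above carries the same x/y/xAxis/yAxis keys as the line chart, but its series entries use type "bar". A sketch of the resulting structure (the data values and the year/sales_SUM column names are illustrative assumptions):

    // Assumed example: x = "year", one aggregation column "sales_SUM".
    val expectedShape = Map(
      "x"      -> Map("year" -> List(2019, 2020)),
      "y"      -> Map("sales_SUM" -> List(100, 200)),
      "xAxis"  -> Map("type" -> "year", "data" -> List(2019, 2020)),
      "yAxis"  -> Map("type" -> "value"),
      "series" -> List(Map("name" -> "sales", "type" -> "bar", "stack" -> "SUM", "data" -> List(100, 200)))
    )
    // visualizationJsonData above is JsonUtil.format(JsonUtil.toJson(...)) applied to a map of this shape.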