1. Fix MockData bug (off-by-one in the generated row count)

2. Improve the property examples of stops
3. Add a visualization type for visualization stops
This commit is contained in:
judy0131 2020-09-09 05:10:33 -04:00
parent 4ad23a182e
commit 6d880ca8ee
13 changed files with 212 additions and 30 deletions

View File

@ -0,0 +1,44 @@
{
"flow": {
"name": "test",
"executorMemory": "1g",
"executorNumber": "1",
"uuid": "8a80da1b73d7047f0173eaf27c1f007a",
"paths": [
{
"inport": "",
"from": "SelectHiveQL",
"to": "PieChart",
"outport": ""
}
],
"executorCores": "1",
"driverMemory": "1g",
"stops": [
{
"name": "SelectHiveQL",
"bundle": "cn.piflow.bundle.hive.SelectHiveQL",
"uuid": "8a80da1b73d7047f0173eaf27c230080",
"properties": {
"hiveQL": "select * from test.student"
},
"customizedProperties": {
}
},
{
"name": "PieChart",
"bundle": "cn.piflow.bundle.visualization.PieChart",
"uuid": "8a80da1b73d7047f0173eaf27c20007c",
"properties": {
"dimension": "birthday",
"indicator": "name",
"indicatorOption": "COUNT"
},
"customizedProperties": {
}
}
]
}
}

View File

@ -90,7 +90,7 @@ class MockData extends ConfigurableStop{
}
val schemaStructType = StructType(structFieldArray)
val rnd : Random = new Random()
val df = spark.read.schema(schemaStructType).json((0 to count).map{ _ => compact(randomJson(rnd,schemaStructType))}.toDS())
val df = spark.read.schema(schemaStructType).json((0 to count -1 ).map{ _ => compact(randomJson(rnd,schemaStructType))}.toDS())
out.write(df)
}

View File

@ -54,7 +54,7 @@ class MysqlRead extends ConfigurableStop {
.description("The Url of mysql database")
.defaultValue("")
.required(true)
.example("jdbc:mysql://127.0.0.1/dbname")
.example("jdbc:mysql://127.0.0.1:3306/dbname")
descriptor = url :: descriptor
val user=new PropertyDescriptor()

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.visualization
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableVisualizationStop, Port, StopGroup}
import cn.piflow.conf.{ConfigurableVisualizationStop, Port, StopGroup, VisualizationType}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
@ -14,9 +14,10 @@ class LineChart extends ConfigurableVisualizationStop{
override val inportList: List[String] = List(Port.DefaultPort)
override val outportList: List[String] = List(Port.DefaultPort)
var x:String =_
//var dimension:String =_
var x:String =_
override var visualizationType: String = VisualizationType.LineChart
override val isCustomized: Boolean = true
override val customizedAllowValue: List[String] = List("COUNT","SUM","AVG","MAX","MIN")
@ -32,6 +33,7 @@ class LineChart extends ConfigurableVisualizationStop{
.displayName("x")
.description("The abscissa of line chart")
.defaultValue("")
.example("year")
.required(true)
/*val dimension = new PropertyDescriptor()
.name("dimension")

View File

@ -0,0 +1,84 @@
package cn.piflow.bundle.visualization
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableVisualizationStop, Port, StopGroup, VisualizationType}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
/**
 * Visualization stop that aggregates its input by one column and emits the
 * result in the shape expected by a pie-chart renderer: one row per slice,
 * `(dimension, aggregate(indicator))`.
 */
class PieChart extends ConfigurableVisualizationStop {

  override val authorEmail: String = "xjzhu@cnic.cn"
  override val description: String = "Show data with pie chart. "
  override val inportList: List[String] = List(Port.DefaultPort)
  override val outportList: List[String] = List(Port.DefaultPort)

  // Column whose distinct values become the pie slices (the "name" axis).
  var dimension: String = _
  // Column the aggregate function is applied to (the "value" axis).
  var indicator: String = _
  // Aggregate applied to the indicator column: COUNT, SUM, AVG, MAX or MIN.
  var indicatorOption: String = _

  // Tag consumed downstream to pick the pie-chart data formatting.
  override var visualizationType: String = VisualizationType.PieChart

  /** Reads the three pie-chart properties from the stop's configuration map. */
  override def setProperties(map: Map[String, Any]): Unit = {
    dimension = MapUtil.get(map, key = "dimension").asInstanceOf[String]
    indicator = MapUtil.get(map, key = "indicator").asInstanceOf[String]
    indicatorOption = MapUtil.get(map, key = "indicatorOption").asInstanceOf[String]
  }

  /** Describes the configurable properties exposed to the flow editor UI. */
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val dimension = new PropertyDescriptor()
      .name("dimension")
      .displayName("Dimension")
      .description("The dimension of pie chart")
      .defaultValue("")
      .example("type")
      .required(true)
    val indicator = new PropertyDescriptor()
      .name("indicator")
      .displayName("Indicator")
      .description("The indicator of pie chart")
      .defaultValue("")
      .example("project")
      .required(true)
    val indicatorOption = new PropertyDescriptor()
      .name("indicatorOption")
      .displayName("IndicatorOption")
      .description("The indicator option of pie chart")
      .allowableValues(Set("COUNT","SUM","AVG","MAX","MIN"))
      .defaultValue("COUNT")
      .example("COUNT")
      .required(true)
    // Successive prepends yield the list [indicatorOption, indicator, dimension].
    descriptor = dimension :: descriptor
    descriptor = indicator :: descriptor
    descriptor = indicatorOption :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("icon/visualization/pie-chart.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroup.Visualization)
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  /**
   * Registers the input as a temp view, runs a group-by aggregation over it and
   * writes the result repartitioned to a single partition (so the visualization
   * data lands in one output file).
   */
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val dataFrame = in.read()
    dataFrame.createOrReplaceTempView("PieChart")
    // NOTE(review): dimension/indicator/indicatorOption come straight from the
    // flow configuration and are interpolated into SQL. That assumes trusted
    // config; this is not safe against untrusted input.
    val sqlText = s"select $dimension,$indicatorOption($indicator) from PieChart group by $dimension"
    println("PieChart Sql: " + sqlText)
    val pieChartDF = spark.sql(sqlText)
    out.write(pieChartDF.repartition(1))
  }
}

View File

@ -14,6 +14,7 @@ abstract class ConfigurableStop extends Stop{
val inportList : List[String] //= List(PortEnum.DefaultPort.toString)
val outportList : List[String] //= List(PortEnum.DefaultPort.toString)
//Have customized properties or not
val isCustomized = false
val customizedAllowKey = List[String]()
val customizedAllowValue = List[String]()

View File

@ -0,0 +1,8 @@
package cn.piflow.conf
/**
 * String constants identifying the chart type a visualization stop produces.
 * The values are matched against the `visualizationType` request parameter
 * when formatting visualization data for the front end.
 */
object VisualizationType {

  /** Line chart: x-axis series plus one or more y-axis value series. */
  val LineChart: String = "LINECHART"

  /** Pie chart: (name, value) pairs, one slice per dimension value. */
  val PieChart: String = "PIECHART"
}

View File

@ -32,7 +32,9 @@ class StopBean {
def constructStop() : ConfigurableStop = {
try{
println("Construct stop: " + this.bundle + "!!!!!!!!!!!!!!!!!!!!!")
val stop = ClassUtil.findConfigurableStop(this.bundle)
println("Construct stop: " + stop + "!!!!!!!!!!!!!!!!!!!!!")
//init ConfigurableIncrementalStop
@ -62,7 +64,8 @@ class StopBean {
stop.asInstanceOf[ConfigurableVisualizationStop].init(name)
stop.setProperties(this.properties)
}else {
stop.setProperties(this.properties)
print(this.properties)
stop.asInstanceOf[ConfigurableStop].setProperties(this.properties)
}
stop.setCustomizedProperties(this.customizedProperties)

View File

@ -214,7 +214,8 @@ object ClassUtil {
("groups" -> stop.getGroup().mkString(",")) ~
("isCustomized" -> stop.getCustomized().toString) ~
/*("customizedAllowKey" -> "") ~
("customizedAllowValue" -> "") ~*/
("customizedAllowValue" -> "")
("visualizationType" -> "") ~*/
("description" -> stop.description) ~
("icon" -> base64Encoder.encode(stop.getIcon())) ~
("properties" ->

View File

@ -93,6 +93,7 @@ trait VisualizationStop extends Stop{
var processId : String
var stopName : String
var visualizationPath : String
var visualizationType : String
def init(stopName : String): Unit
def getVisualizationPath(processId : String) : String

View File

@ -68,6 +68,17 @@ object FlowLauncher {
})
}
/*val classPath = PropertyUtil.getClassPath()
val classPathFile = new File(classPath)
if(classPathFile.exists()){
FileUtil.getJarFile(new File(classPath)).foreach(f => {
sparkLauncher.addJar(f.getPath)
})
}*/
val scalaPath = PropertyUtil.getScalaPath()
val scalaPathFile = new File(scalaPath)
if(scalaPathFile.exists()){

View File

@ -6,6 +6,7 @@ import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.CountDownLatch
import cn.piflow.conf.VisualizationType
import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil, PluginManager, ScalaExecutorUtil}
import cn.piflow.{GroupExecution, Process, Runner}
@ -286,7 +287,7 @@ object API {
result
}
def getFlowVisualizationData(appId : String, stopName : String) : String = {
def getFlowVisualizationData(appId : String, stopName : String, visualizationType : String) : String = {
var dimensionMap = Map[String, List[String]]()
val visuanlizationPath :String = ConfigureUtil.getVisualizationPath().stripSuffix("/") + "/" + appId + "/" + stopName + "/"
@ -295,32 +296,57 @@ object API {
val schemaArray = visualizationSchema.split(",")
val jsonMapList = getJsonMapList(visuanlizationPath + "/data")
var visuanlizationTuple = List[Tuple2[String,String]]()
val jsonTupleList = jsonMapList.flatMap( map => map.toSeq)
if(VisualizationType.LineChart == visualizationType){
val visualizationInfo = jsonTupleList.groupBy(_._1)
visualizationInfo.foreach(dimension => {
var valueList = List[String]()
val dimensionList = dimension._2
dimensionList.foreach( dimensionAndCountPair => {
val v = String.valueOf(dimensionAndCountPair._2)
println(v)
valueList = valueList :+ v
var visualizationTuple = List[Tuple2[String,String]]()
val jsonTupleList = jsonMapList.flatMap( map => map.toSeq)
val visualizationInfo = jsonTupleList.groupBy(_._1)
visualizationInfo.foreach(dimension => {
var valueList = List[String]()
val dimensionList = dimension._2
dimensionList.foreach( dimensionAndCountPair => {
val v = String.valueOf(dimensionAndCountPair._2)
println(v)
valueList = valueList :+ v
})
dimensionMap += (dimension._1 -> valueList)
})
dimensionMap += (dimension._1 -> valueList)
})
//dimensionMap
//dimensionMap
var lineChartMap = Map[String, Any]()
val x = schemaArray(0)
val xMap = Map(x -> OptionUtil.getAny(dimensionMap.get(schemaArray(0))) )
lineChartMap += {"x" -> xMap}
lineChartMap += {"y" -> dimensionMap.filterKeys(!_.equals(x))}
val visualizationJsonData = JsonUtil.format(JsonUtil.toJson(lineChartMap))
println(visualizationJsonData)
visualizationJsonData
}else if(VisualizationType.PieChart == visualizationType){
val schemaArray = visualizationSchema.split(",")
val schemaReplaceMap = Map(schemaArray(1)->"value", schemaArray(0)->"name")
val jsonMapList = getJsonMapList(visuanlizationPath + "/data")
var pieChartList = List[Map[String, Any]]()
jsonMapList.foreach(map => {
var lineMap = Map[String, Any]()
for(i <- 0 to schemaArray.size-1){
val column = schemaArray(i)
lineMap += (schemaReplaceMap.getOrElse(column,"")-> map.getOrElse(column,""))
}
pieChartList = lineMap +: pieChartList
})
val pieChartMap = Map("data" -> pieChartList)
val visualizationJsonData = JsonUtil.format(JsonUtil.toJson(pieChartMap))
println(visualizationJsonData)
visualizationJsonData
}else{
""
}
var lineChartMap = Map[String, Any]()
val x = schemaArray(0)
val xMap = Map(x -> OptionUtil.getAny(dimensionMap.get(schemaArray(0))) )
lineChartMap += {"x" -> xMap}
lineChartMap += {"y" -> dimensionMap.filterKeys(!_.equals(x))}
val visualizationJsonData = JsonUtil.format(JsonUtil.toJson(lineChartMap))
println(visualizationJsonData)
visualizationJsonData
}
def getStopInfo(bundle : String) : String = {

View File

@ -154,9 +154,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
val appID = req.getUri().query().getOrElse("appID","")
val stopName = req.getUri().query().getOrElse("stopName","")
val visualizationType = req.getUri().query().getOrElse("visualizationType","")
//val port = req.getUri().query().getOrElse("port","default")
if(!appID.equals("") && !stopName.equals()){
val result = API.getFlowVisualizationData(appID, stopName)
val result = API.getFlowVisualizationData(appID, stopName,visualizationType)
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}else{
Future.successful(HttpResponse(FAIL_CODE, entity = "appID is null or stop does not have visualization data!"))