Merge remote-tracking branch 'origin/master'

bao319 2020-03-27 20:47:26 +08:00
commit 259da94bbe
7 changed files with 252 additions and 2 deletions

View File

@@ -0,0 +1,19 @@
{
"flow":{
"name":"mockData",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"MockData",
"bundle":"cn.piflow.bundle.common.MockData",
"properties":{
"schema": "name:String:true, age:Int, weight:Double, totalMoney:Float, isStudent:Boolean",
"count": "10"
}
}
],
"paths":[]
}
}
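
For reference, a minimal sketch of the Spark schema the schema string above is expected to map to, based on the column:columnType:isNullable mapping in MockData.scala below (nullability defaults to false when the third field is omitted):

import org.apache.spark.sql.types._

// "name:String:true" -> nullable string column; the remaining columns omit
// the isNullable flag, so they default to non-nullable.
val expectedSchema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false),
  StructField("weight", DoubleType, nullable = false),
  StructField("totalMoney", FloatType, nullable = false),
  StructField("isStudent", BooleanType, nullable = false)
))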

Binary image file added (31 KiB); not shown in this view.

View File

@@ -0,0 +1,36 @@
{
"flow":{
"name":"scalaTest",
"uuid":"1234567890",
"stops":[
{
"uuid":"1111",
"name":"CsvParser",
"bundle":"cn.piflow.bundle.csv.CsvParser",
"properties":{
"csvPath":"hdfs://10.0.88.13:9000/xjzhu/test.csv",
"header":"false",
"delimiter":",",
"schema":"title,author"
}
},
{
"uuid":"2222",
"name":"ExecuteScala",
"bundle":"cn.piflow.bundle.script.ExecuteScala",
"properties":{
"script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n return dictInfo",
"execFunction": "listFunction"
}
}
],
"paths":[
{
"from":"CsvParser",
"outport":"",
"inport":"",
"to":"ExecuteScala"
}
]
}
}
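
The script value above is a Python-style placeholder even though the stop is ExecuteScala. Purely as a hypothetical sketch, assuming the stop hands the upstream DataFrame to the function named by execFunction and emits whatever it returns (the actual contract is defined by cn.piflow.bundle.script.ExecuteScala), a Scala listFunction could look like:

import org.apache.spark.sql.DataFrame

// Hypothetical script body: pass the CsvParser output through unchanged.
def listFunction(df: DataFrame): DataFrame = {
  df
}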

View File

@@ -0,0 +1,136 @@
package cn.piflow.bundle.common
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.json4s
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import scala.util.Random
class MockData extends ConfigurableStop{
override val authorEmail: String = "xjzhu@cnic.cn"
override val description: String = "Mock dataframe."
override val inportList: List[String] = List(Port.NonePort)
override val outportList: List[String] = List(Port.DefaultPort)
var schema: String = _
var count: Int = _
override def setProperties(map: Map[String, Any]): Unit = {
schema = MapUtil.get(map,"schema").asInstanceOf[String]
count = MapUtil.get(map,"count").asInstanceOf[String].toInt
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val schema = new PropertyDescriptor()
.name("schema")
.displayName("Schema")
.description("The schema of mock data, schema's format is column:columnType:isNullable." +
"columnType can be String/Int/Long/Float/Double/Boolean. " +
"isNullable can be left blank, the default value is false")
.defaultValue("")
.required(true)
.example("id:String,name:String,age:Int")
descriptor = schema :: descriptor
val count = new PropertyDescriptor()
.name("count")
.displayName("Count")
.description("The count of dataframe")
.defaultValue("")
.required(true)
.example("10")
descriptor = count :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/common/MockData.png")
}
override def getGroup(): List[String] = {
List(StopGroup.CommonGroup)
}
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val field = this.schema.split(",").map(_.trim)
val structFieldArray : Array[StructField] = new Array[StructField](field.size)
for(i <- field.indices){
val columnInfo = field(i).split(":").map(_.trim)
val column = columnInfo(0)
val columnType = columnInfo(1)
var isNullable = false
if(columnInfo.size == 3){
isNullable = columnInfo(2).toBoolean
}
columnType match {
case "String" => structFieldArray(i) = new StructField(column, StringType, isNullable)
case "Int" => structFieldArray(i) = new StructField(column, IntegerType, isNullable)
case "Double" => structFieldArray(i) = new StructField(column, DoubleType, isNullable)
case "Float" => structFieldArray(i) = new StructField(column, FloatType, isNullable)
case "Long" => structFieldArray(i) = new StructField(column, LongType, isNullable)
case "Boolean" => structFieldArray(i) = new StructField(column, BooleanType, isNullable)
//case "Date" => structFieldArray(i) = new StructField(column, DateType, nullable = true)
//case "Timestamp" => structFieldArray(i) = new StructField(column, TimestampType, nullable = true)
case _ => throw new IllegalArgumentException("Unsupported column type: " + columnType)
}
}
val schemaStructType = StructType(structFieldArray)
val spark = pec.get[SparkSession]()
import spark.implicits._
val rnd : Random = new Random()
val df = spark.read.schema(schemaStructType).json((1 to count).map{ _ => compact(randomJson(rnd, schemaStructType)) }.toDS())
out.write(df)
}
private def randomJson( rnd: Random, dataType : DataType): JValue ={
val alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
dataType match {
case v:DoubleType =>
json4s.JDouble(rnd.nextDouble())
case v:StringType =>
JString((1 to 10).map(_ => alpha(rnd.nextInt(alpha.length))).mkString)
case v:IntegerType =>
JInt(rnd.nextInt(100))
case v:LongType =>
JInt(rnd.nextLong())
case v:FloatType =>
JDouble(rnd.nextFloat())
case v:BooleanType =>
JBool(rnd.nextBoolean())
case v:ArrayType =>
val size = rnd.nextInt(10)
JArray(
(0 to size).map(_ => randomJson(rnd, v.elementType)).toList
)
case v:StructType =>
JObject(
v.fields.flatMap{
f =>
if( f.nullable && rnd.nextBoolean())
None
else
Some(JField(f.name, randomJson(rnd, f.dataType)))
}.toList
)
}
}
}
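
As a usage-style illustration of the pattern perform uses above (generate one JSON document per row, then read them all against an explicit schema), here is a standalone sketch with a hypothetical two-column schema and plain string interpolation instead of json4s:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.util.Random

object MockDataSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("mock-data-sketch").getOrCreate()
    import spark.implicits._

    val rnd = new Random()
    val schema = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = false)
    ))

    // One JSON string per row; the explicit schema keeps the column types stable.
    val rows = (1 to 10).map { _ =>
      val name = (1 to 5).map(_ => ('a' + rnd.nextInt(26)).toChar).mkString
      s"""{"name":"$name","age":${rnd.nextInt(100)}}"""
    }
    val df = spark.read.schema(schema).json(rows.toDS())
    df.show()

    spark.stop()
  }
}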

View File

@@ -214,7 +214,8 @@ object ClassUtil {
("defaultValue" -> property.defaultValue) ~
("allowableValues" -> property.allowableValues) ~
("required" -> property.required.toString) ~
("sensitive" -> property.sensitive.toString)) }) )
("sensitive" -> property.sensitive.toString) ~
("example" -> property.example)) }) )
val jsonString = compactRender(json)
jsonString
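
For illustration only, the per-property JSON fragment rendered by this block now carries the example field alongside the existing ones; filled in with the Schema descriptor from MockData above (allowableValues and sensitive are assumed to default to empty and false, since their defaults are outside this hunk):

{
  "defaultValue": "",
  "allowableValues": "",
  "required": "true",
  "sensitive": "false",
  "example": "id:String,name:String,age:Int"
}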

View File

@@ -0,0 +1,58 @@
package cn.piflow.bundle.common
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class MockDataTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/common/mockData.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow)
process.awaitTermination()
val pid = process.pid()
println("flow process pid: " + pid)
spark.close()
}
}

View File

@@ -8,7 +8,7 @@ object HTTPClientGetStopInfo {
def main(args: Array[String]): Unit = {
val url = "http://10.0.86.98:8001/stop/info?bundle=cn.piflow.bundle.jdbc.JdbcWrite"
val url = "http://10.0.85.83:8001/stop/info?bundle=cn.piflow.bundle.csv.CsvParser"
val client = HttpClients.createDefault()
val getFlowInfo:HttpGet = new HttpGet(url)