1.add json util
2. fix bug: launch spark application for each flow in FlowGroup
This commit is contained in:
parent
76fa9280c2
commit
1754a233cd
|
@ -1,10 +1,13 @@
|
|||
package cn.piflow.conf.bean
|
||||
|
||||
import cn.piflow.conf.util.MapUtil
|
||||
import cn.piflow.conf.util.{JsonUtil, MapUtil}
|
||||
import cn.piflow.{FlowImpl, Path}
|
||||
import net.liftweb.json.JsonDSL._
|
||||
import net.liftweb.json._
|
||||
|
||||
import scala.util.parsing.json.JSONObject
|
||||
|
||||
|
||||
|
||||
class FlowBean {
|
||||
/*@BeanProperty*/
|
||||
|
@ -14,13 +17,27 @@ class FlowBean {
|
|||
var checkpointParentProcessId : String = _
|
||||
var runMode : String = _
|
||||
var showData : String = _
|
||||
|
||||
var stops : List[StopBean] = List()
|
||||
var paths : List[PathBean] = List()
|
||||
|
||||
//flow resource info
|
||||
var driverMem : String = _
|
||||
var executorNum : String = _
|
||||
var executorMem : String = _
|
||||
var executorCores : String = _
|
||||
|
||||
//flow json string
|
||||
var flowJson: String = _
|
||||
|
||||
def init(map : Map[String, Any]) = {
|
||||
|
||||
val flowJsonOjb = JsonUtil.toJson(map)
|
||||
this.flowJson = JsonUtil.format(flowJsonOjb)
|
||||
|
||||
val flowMap = MapUtil.get(map, "flow").asInstanceOf[Map[String, Any]]
|
||||
|
||||
|
||||
this.uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String]
|
||||
this.name = MapUtil.get(flowMap,"name").asInstanceOf[String]
|
||||
this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String]
|
||||
|
@ -28,6 +45,11 @@ class FlowBean {
|
|||
this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String]
|
||||
this.showData = flowMap.getOrElse("showData","0").asInstanceOf[String]
|
||||
|
||||
this.driverMem = flowMap.getOrElse("driverMemory","1g").asInstanceOf[String]
|
||||
this.executorNum = flowMap.getOrElse("executorNumber","1").asInstanceOf[String]
|
||||
this.executorMem= flowMap.getOrElse("executorMemory","1g").asInstanceOf[String]
|
||||
this.executorCores = flowMap.getOrElse("executorCores","1").asInstanceOf[String]
|
||||
|
||||
//construct StopBean List
|
||||
val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]]
|
||||
stopsList.foreach( stopMap => {
|
||||
|
@ -48,10 +70,16 @@ class FlowBean {
|
|||
def constructFlow()= {
|
||||
val flow = new FlowImpl();
|
||||
|
||||
flow.setFlowJson(this.flowJson)
|
||||
flow.setFlowName(this.name)
|
||||
flow.setCheckpointParentProcessId(this.checkpointParentProcessId)
|
||||
flow.setRunMode(this.runMode)
|
||||
|
||||
flow.setDriverMemory(this.driverMem)
|
||||
flow.setExecutorNum(this.executorNum)
|
||||
flow.setExecutorCores(this.executorCores)
|
||||
flow.setExecutorMem(this.executorMem)
|
||||
|
||||
this.stops.foreach( stopBean => {
|
||||
flow.addStop(stopBean.name,stopBean.constructStop())
|
||||
})
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
package cn.piflow.conf.bean
|
||||
|
||||
/**
|
||||
* Created by xjzhu@cnic.cn on 4/25/19
|
||||
*/
|
||||
class ProjectBean {
|
||||
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package cn.piflow.conf.util
|
||||
|
||||
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject}
|
||||
|
||||
/**
|
||||
* Created by xjzhu@cnic.cn on 4/30/19
|
||||
*/
|
||||
object JsonUtil {
|
||||
|
||||
def toJson(arr : List[Any]) : JSONArray = {
|
||||
JSONArray(arr.map {
|
||||
case (innerMap : Map[String, Any]) => toJson(innerMap)
|
||||
case (innerArray : List[Any]) => toJson(innerArray)
|
||||
case (other) => other
|
||||
})
|
||||
}
|
||||
|
||||
def toJson(map:Map[String,Any]):JSONObject = {
|
||||
JSONObject(map.map {
|
||||
case(key, innerMap:Map[String, Any]) => (key, toJson(innerMap))
|
||||
case(key, innerArray: List[Any]) => (key, toJson(innerArray))
|
||||
case(key, other) => (key, other)
|
||||
})
|
||||
}
|
||||
|
||||
def format(t:Any, i: Int = 0) : String = t match {
|
||||
case o: JSONObject =>
|
||||
o.obj.map{
|
||||
case (k, v) =>
|
||||
" "*(i+1) + JSONFormat.defaultFormatter(k) + ": " + format(v, i+1)
|
||||
}.mkString("{\n",",\n","\n" + " "*i + "}")
|
||||
case a: JSONArray =>
|
||||
a.list.map{
|
||||
e => " "*(i+1) + format(e, i+1)
|
||||
}.mkString("[\n",",\n","\n" + " "*i + "]")
|
||||
case _ => JSONFormat defaultFormatter t
|
||||
}
|
||||
|
||||
}
|
|
@ -6,6 +6,11 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
|
|||
|
||||
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
|
||||
import cn.piflow.Execution
|
||||
import cn.piflow.util.PropertyUtil
|
||||
import cn.piflow.util.PropertyUtil
|
||||
import org.apache.spark.launcher.SparkAppHandle.State
|
||||
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
/**
|
||||
* Created by bluejoe on 2018/6/27.
|
||||
|
@ -58,7 +63,7 @@ trait FlowGroupExecution extends Execution{
|
|||
|
||||
}
|
||||
|
||||
class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runner) extends FlowGroupExecution {
|
||||
/*class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runner) extends FlowGroupExecution {
|
||||
val flowGroupContext = createContext(runnerContext);
|
||||
val flowGroupExecution = this;
|
||||
|
||||
|
@ -126,6 +131,8 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
|
|||
}
|
||||
|
||||
private def startProcess(name: String, flow: Flow): Unit = {
|
||||
|
||||
//TODO
|
||||
val process = runner.start(flow);
|
||||
startedProcesses(name) = process;
|
||||
}
|
||||
|
@ -198,4 +205,161 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
|
|||
override def isEntryCompleted(name: String): Boolean = {
|
||||
completedProcesses(name);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runner) extends FlowGroupExecution {
|
||||
val flowGroupContext = createContext(runnerContext);
|
||||
val flowGroupExecution = this;
|
||||
|
||||
val mapFlowWithConditions: Map[String, (Flow, Condition[FlowGroupExecution])] = fg.mapFlowWithConditions();
|
||||
val completedProcesses = MMap[String, Boolean]();
|
||||
completedProcesses ++= mapFlowWithConditions.map(x => (x._1, false));
|
||||
val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size);
|
||||
|
||||
val startedProcesses = MMap[String, SparkAppHandle]();
|
||||
|
||||
val execution = this;
|
||||
val POLLING_INTERVAL = 1000;
|
||||
val latch = new CountDownLatch(1);
|
||||
var running = true;
|
||||
|
||||
|
||||
val runnerListener = runner.getListener()
|
||||
|
||||
|
||||
def isFlowGroupCompleted(): Boolean = {
|
||||
completedProcesses.foreach( en =>{
|
||||
if(en._2 == false){
|
||||
return false
|
||||
}
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
private def startProcess(name: String, flow: Flow): Unit = {
|
||||
|
||||
println(flow.getFlowJson())
|
||||
|
||||
var flowJson = flow.getFlowJson()
|
||||
flowJson = flowJson.replaceAll("}","}\n")
|
||||
//TODO
|
||||
var appId : String = ""
|
||||
val countDownLatch = new CountDownLatch(1)
|
||||
val launcher = new SparkLauncher
|
||||
val handle =launcher
|
||||
.setAppName(flow.getFlowName())
|
||||
.setMaster(PropertyUtil.getPropertyValue("spark.master"))
|
||||
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
|
||||
.setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
|
||||
.setVerbose(true)
|
||||
.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
|
||||
.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
|
||||
.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
|
||||
.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
|
||||
.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
|
||||
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
|
||||
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
|
||||
.setConf("spark.driver.memory", flow.getDriverMemory())
|
||||
.setConf("spark.num.executors", flow.getExecutorNum())
|
||||
.setConf("spark.executor.memory", flow.getExecutorMem())
|
||||
.setConf("spark.executor.cores",flow.getExecutorCores())
|
||||
.addFile(PropertyUtil.getConfigureFile())
|
||||
.setMainClass("cn.piflow.api.StartFlowMain")
|
||||
.addAppArgs(flowJson)
|
||||
.startApplication( new SparkAppHandle.Listener {
|
||||
override def stateChanged(handle: SparkAppHandle): Unit = {
|
||||
appId = handle.getAppId
|
||||
val sparkAppState = handle.getState
|
||||
if(appId != null){
|
||||
println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState)
|
||||
}else{
|
||||
println("Spark job's state changed to: " + sparkAppState)
|
||||
}
|
||||
|
||||
//TODO: get the process status
|
||||
if (handle.getState.equals(State.FINISHED)){
|
||||
completedProcesses(flow.getFlowName()) = true;
|
||||
numWaitingProcesses.decrementAndGet();
|
||||
}
|
||||
if (handle.getState().isFinal){
|
||||
countDownLatch.countDown()
|
||||
println("Task is finished!")
|
||||
}
|
||||
}
|
||||
override def infoChanged(handle: SparkAppHandle): Unit = {
|
||||
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
startedProcesses(name) = handle;
|
||||
}
|
||||
|
||||
val pollingThread = new Thread(new Runnable() {
|
||||
override def run(): Unit = {
|
||||
|
||||
runnerListener.onFlowGroupStarted(flowGroupContext)
|
||||
|
||||
while (numWaitingProcesses.get() > 0) {
|
||||
val todos = ArrayBuffer[(String, Flow)]();
|
||||
mapFlowWithConditions.foreach { en =>
|
||||
if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {
|
||||
todos += (en._1 -> en._2._1);
|
||||
}
|
||||
}
|
||||
|
||||
startedProcesses.synchronized {
|
||||
todos.foreach(en => startProcess(en._1, en._2));
|
||||
}
|
||||
|
||||
Thread.sleep(POLLING_INTERVAL);
|
||||
}
|
||||
|
||||
latch.countDown();
|
||||
finalizeExecution(true);
|
||||
|
||||
runnerListener.onFlowGroupCompleted(flowGroupContext)
|
||||
//TODO: how to define FlowGroup Failed
|
||||
//runnerListener.onFlowGroupFailed(ctx)
|
||||
}
|
||||
});
|
||||
|
||||
pollingThread.start();
|
||||
|
||||
override def awaitTermination(): Unit = {
|
||||
latch.await();
|
||||
finalizeExecution(true);
|
||||
}
|
||||
|
||||
override def stop(): Unit = {
|
||||
finalizeExecution(false);
|
||||
}
|
||||
|
||||
override def awaitTermination(timeout: Long, unit: TimeUnit): Unit = {
|
||||
if (!latch.await(timeout, unit))
|
||||
finalizeExecution(false);
|
||||
}
|
||||
|
||||
private def finalizeExecution(completed: Boolean): Unit = {
|
||||
if (running) {
|
||||
if (!completed) {
|
||||
pollingThread.interrupt();
|
||||
startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
|
||||
}
|
||||
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
private def createContext(runnerContext: Context): FlowGroupContext = {
|
||||
new CascadeContext(runnerContext) with FlowGroupContext {
|
||||
override def getFlowGroup(): FlowGroup = fg
|
||||
|
||||
override def getFlowGroupExecution(): FlowGroupExecution = flowGroupExecution
|
||||
};
|
||||
}
|
||||
|
||||
override def isEntryCompleted(name: String): Boolean = {
|
||||
completedProcesses(name);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import cn.piflow.util._
|
|||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.FileSystem
|
||||
import org.apache.kafka.common.serialization.StringDeserializer
|
||||
import org.apache.spark.launcher.SparkAppHandle
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
|
@ -101,6 +102,30 @@ trait Flow extends ProjectEntry{
|
|||
def hasStreamingStop() : Boolean;
|
||||
|
||||
def getStreamingStop() : (String, StreamingStop);
|
||||
|
||||
|
||||
//Flow Josn String API
|
||||
def setFlowJson(flowJson:String);
|
||||
|
||||
def getFlowJson() : String;
|
||||
|
||||
|
||||
// Flow resource API
|
||||
def setDriverMemory(driverMem:String) ;
|
||||
|
||||
def getDriverMemory() : String;
|
||||
|
||||
def setExecutorNum(executorNum:String) ;
|
||||
|
||||
def getExecutorNum() : String;
|
||||
|
||||
def setExecutorMem(executorMem:String) ;
|
||||
|
||||
def getExecutorMem() : String;
|
||||
|
||||
def setExecutorCores(executorCores:String) ;
|
||||
|
||||
def getExecutorCores() : String;
|
||||
}
|
||||
|
||||
class FlowImpl extends Flow {
|
||||
|
@ -110,6 +135,13 @@ class FlowImpl extends Flow {
|
|||
val checkpoints = ArrayBuffer[String]();
|
||||
var checkpointParentProcessId = ""
|
||||
var runMode = ""
|
||||
var flowJson = ""
|
||||
|
||||
//Flow Resource
|
||||
var driverMem = ""
|
||||
var executorNum = ""
|
||||
var executorMem= ""
|
||||
var executorCores = ""
|
||||
|
||||
def addStop(name: String, process: Stop) = {
|
||||
stops(name) = process;
|
||||
|
@ -248,6 +280,46 @@ class FlowImpl extends Flow {
|
|||
}}
|
||||
null
|
||||
}
|
||||
|
||||
override def setFlowJson(flowJson: String): Unit = {
|
||||
this.flowJson = flowJson
|
||||
}
|
||||
|
||||
override def getFlowJson(): String = {
|
||||
flowJson
|
||||
}
|
||||
|
||||
override def setDriverMemory(driverMem: String): Unit = {
|
||||
this.driverMem = driverMem
|
||||
}
|
||||
|
||||
override def getDriverMemory(): String = {
|
||||
this.driverMem
|
||||
}
|
||||
|
||||
override def setExecutorNum(executorNum: String): Unit = {
|
||||
this.executorNum = executorNum
|
||||
}
|
||||
|
||||
override def getExecutorNum(): String = {
|
||||
this.executorNum
|
||||
}
|
||||
|
||||
override def setExecutorMem(executorMem: String): Unit = {
|
||||
this.executorMem = executorMem
|
||||
}
|
||||
|
||||
override def getExecutorMem(): String = {
|
||||
this.executorMem
|
||||
}
|
||||
|
||||
override def setExecutorCores(executorCores: String): Unit = {
|
||||
this.executorCores = executorCores
|
||||
}
|
||||
|
||||
override def getExecutorCores(): String = {
|
||||
this.executorCores
|
||||
}
|
||||
}
|
||||
|
||||
trait AnalyzedFlowGraph {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.apache.spark.sql.SparkSession
|
|||
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
|
||||
import cn.piflow.{Process, Runner}
|
||||
import cn.piflow.api.util.{HdfsUtil, PropertyUtil}
|
||||
import cn.piflow.conf.bean.FlowGroupBean
|
||||
import cn.piflow.util.{FlowState, H2Util, HadoopFileUtil}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
|
@ -23,7 +24,25 @@ import scala.util.parsing.json.JSON
|
|||
|
||||
object API {
|
||||
|
||||
def startFlowGroup(flowGroupJson : String):(String,String,SparkAppHandle) = {
|
||||
def startFlowGroup(flowGroupJson : String) = {
|
||||
|
||||
println("StartFlowGroup API get json: \n" + flowGroupJson )
|
||||
|
||||
var appId:String = null
|
||||
val map = OptionUtil.getAny(JSON.parseFull(flowGroupJson)).asInstanceOf[Map[String, Any]]
|
||||
val flowGroupMap = MapUtil.get(map, "group").asInstanceOf[Map[String, Any]]
|
||||
|
||||
//create flowGroup
|
||||
val flowGroupBean = FlowGroupBean(map)
|
||||
val flowGroup = flowGroupBean.constructFlowGroup()
|
||||
|
||||
val process = Runner.create()
|
||||
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
|
||||
.start(flowGroup);
|
||||
}
|
||||
|
||||
/*def startFlowGroup(flowGroupJson : String):(String,String,SparkAppHandle) = {
|
||||
|
||||
var appId:String = null
|
||||
val map = OptionUtil.getAny(JSON.parseFull(flowGroupJson)).asInstanceOf[Map[String, Any]]
|
||||
|
@ -95,7 +114,7 @@ object API {
|
|||
}
|
||||
(appId, processId, handle)
|
||||
|
||||
}
|
||||
}*/
|
||||
|
||||
def startFlow(flowJson : String):(String,String,SparkAppHandle) = {
|
||||
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import cn.piflow.conf.util.{JsonUtil, MapUtil, OptionUtil}
|
||||
import org.apache.http.client.config.RequestConfig
|
||||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
|
||||
import org.apache.http.entity.StringEntity
|
||||
import org.apache.http.impl.client.HttpClientBuilder
|
||||
import org.apache.http.util.EntityUtils
|
||||
|
||||
import scala.util.parsing.json.{JSON, JSONArray, JSONObject}
|
||||
|
||||
object HTTPClientStartFlowGroup {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
@ -19,6 +22,9 @@ object HTTPClientStartFlowGroup {
|
|||
| "flow": {
|
||||
| "name": "one",
|
||||
| "uuid": "1234",
|
||||
| "executorNumber": "2",
|
||||
| "executorMemory": "1g",
|
||||
| "executorCores": "1",
|
||||
| "stops": [{
|
||||
| "uuid": "1111",
|
||||
| "name": "XmlParser",
|
||||
|
@ -112,10 +118,11 @@ object HTTPClientStartFlowGroup {
|
|||
| }
|
||||
| ],
|
||||
|
|
||||
| "conditions": [{
|
||||
| "entry": "one",
|
||||
| "after": "another"
|
||||
| }
|
||||
| "conditions": [
|
||||
| {
|
||||
| "entry": "one",
|
||||
| "after": "another"
|
||||
| }
|
||||
|
|
||||
| ]
|
||||
| }
|
||||
|
@ -141,6 +148,22 @@ object HTTPClientStartFlowGroup {
|
|||
val entity = response.getEntity
|
||||
val str = EntityUtils.toString(entity,"UTF-8")
|
||||
println("Code is " + str)
|
||||
|
||||
/*val map = OptionUtil.getAny(JSON.parseFull(json)).asInstanceOf[Map[String, Any]]
|
||||
|
||||
val jsonObj = JsonUtil.toJson(map)
|
||||
val str = JsonUtil.format(jsonObj)
|
||||
//convertJson = convertJson.replaceAll("\\{","\\{\t\n")
|
||||
|
||||
|
||||
//var convertJson1 = convertJson.replaceAll("{","{\n")
|
||||
//var convertJson2 = convertJson1.replaceAll("}","}\n")
|
||||
println("Convert Json===" + str)
|
||||
|
||||
val convertMap = OptionUtil.getAny(JSON.parseFull(str)).asInstanceOf[Map[String, Any]]
|
||||
println("Convert Map===" + convertMap)*/
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -226,10 +226,11 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
var flowGroupJson = data.utf8String
|
||||
flowGroupJson = flowGroupJson.replaceAll("}","}\n")
|
||||
//flowJson = JsonFormatTool.formatJson(flowJson)
|
||||
val (appId,pid,process) = API.startFlowGroup(flowGroupJson)
|
||||
processMap += (appId -> process)
|
||||
val result = "{\"flow\":{\"id\":\"" + appId + "\",\"pid\":\"" + pid + "\"}}"
|
||||
Future.successful(HttpResponse(entity = result))
|
||||
//val (appId,pid,process) = API.startFlowGroup(flowGroupJson)
|
||||
API.startFlowGroup(flowGroupJson)
|
||||
//processMap += (appId -> process)
|
||||
//val result = "{\"flow\":{\"id\":\"" + appId + "\",\"pid\":\"" + pid + "\"}}"
|
||||
Future.successful(HttpResponse(entity = "okokok!!!"))
|
||||
}
|
||||
|
||||
case ex => {
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import cn.piflow.Runner
|
||||
import cn.piflow.{Flow, Runner}
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import cn.piflow.conf.bean.FlowGroupBean
|
||||
import cn.piflow.conf.util.OptionUtil
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.parsing.json.JSON
|
||||
|
||||
object StartFlowGroupMain {
|
||||
|
@ -28,7 +29,7 @@ object StartFlowGroupMain {
|
|||
//val checkpointPath = spark.sparkContext.getConf.get("checkpoint.path")
|
||||
|
||||
val process = Runner.create()
|
||||
.bind(classOf[SparkSession].getName, spark)
|
||||
//.bind(classOf[SparkSession].getName, spark)
|
||||
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
|
||||
.start(flowGroup);
|
||||
|
|
Loading…
Reference in New Issue