Merge branch 'master' of https://github.com/cas-bigdatalab/piflow
This commit is contained in:
commit
763644cfda
Binary file not shown.
|
@ -4,7 +4,7 @@ server.port=8001
|
|||
#spark.master=spark://10.0.86.89:7077
|
||||
#spark.master=spark://10.0.86.191:7077
|
||||
spark.master=yarn
|
||||
spark.deploy.mode=client
|
||||
spark.deploy.mode=cluster
|
||||
yarn.resourcemanager.hostname=10.0.86.191
|
||||
yarn.resourcemanager.address=10.0.86.191:8032
|
||||
yarn.access.namenode=hdfs://10.0.86.191:9000
|
||||
|
@ -13,7 +13,10 @@ yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
|
|||
|
||||
hive.metastore.uris=thrift://10.0.86.191:9083
|
||||
|
||||
piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
|
||||
#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
|
||||
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
|
||||
|
||||
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
|
||||
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/"
|
||||
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
|
||||
|
||||
log.path=/opt/project/piflow/logs
|
|
@ -26,7 +26,7 @@
|
|||
<dependency>
|
||||
<groupId>org.reflections</groupId>
|
||||
<artifactId>reflections</artifactId>
|
||||
<version>0.9.11</version>
|
||||
<version>0.9.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chuusai</groupId>
|
||||
|
@ -109,7 +109,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.junit.Test
|
|||
|
||||
import scala.util.parsing.json.JSON
|
||||
|
||||
class FlowTest {
|
||||
class FlowTest_XX {
|
||||
|
||||
@Test
|
||||
def testFlow(): Unit ={
|
||||
|
|
|
@ -1,21 +1,24 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import cn.piflow.Runner
|
||||
import cn.piflow.conf.bean.{FlowBean, PropertyDescriptor}
|
||||
import java.io.File
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import cn.piflow.conf.util.{ClassUtil, OptionUtil}
|
||||
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
|
||||
import cn.piflow.Process
|
||||
import cn.piflow.api.util.{PropertyUtil}
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
import org.apache.http.util.EntityUtils
|
||||
import org.apache.spark.launcher.SparkLauncher
|
||||
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
|
||||
|
||||
import scala.util.parsing.json.JSON
|
||||
|
||||
object API {
|
||||
|
||||
def startFlow(flowJson : String):(String,Process) = {
|
||||
/*def startFlow(flowJson : String):(String,Process) = {
|
||||
|
||||
val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
|
||||
println(map)
|
||||
|
@ -53,22 +56,75 @@ object API {
|
|||
new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
|
||||
(applicationId,process)
|
||||
|
||||
/*val launcher = new SparkLauncher
|
||||
launcher.setMaster(PropertyUtil.getPropertyValue("spark.master"))
|
||||
.setAppName("test")
|
||||
}*/
|
||||
def startFlow(flowJson : String):(String,SparkAppHandle) = {
|
||||
|
||||
var appId:String = null
|
||||
val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
|
||||
val flowMap = MapUtil.get(map, "flow").asInstanceOf[Map[String, Any]]
|
||||
val uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String]
|
||||
val appName = MapUtil.get(flowMap,"name").asInstanceOf[String]
|
||||
|
||||
val (stdout, stderr) = getLogFile(uuid, appName)
|
||||
|
||||
val countDownLatch = new CountDownLatch(1)
|
||||
val launcher = new SparkLauncher
|
||||
val handle =launcher
|
||||
.setAppName(appName)
|
||||
.setMaster(PropertyUtil.getPropertyValue("spark.master"))
|
||||
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
|
||||
.setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
|
||||
.setVerbose(true)
|
||||
.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
|
||||
.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")).setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
|
||||
.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
|
||||
.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
|
||||
.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
|
||||
.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
|
||||
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
|
||||
.setConf("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
|
||||
.setMainClass("lalla")
|
||||
.addAppArgs(flowJson)*/
|
||||
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
|
||||
.setConf("spark.driver.memory", "1g")
|
||||
.setConf("spark.executor.memory", "1g")
|
||||
.setConf("spark.cores.max", "2")
|
||||
.setMainClass("cn.piflow.api.StartFlowMain")
|
||||
.addAppArgs(flowJson)
|
||||
.redirectOutput(stdout)
|
||||
.redirectError(stderr)
|
||||
.startApplication( new SparkAppHandle.Listener {
|
||||
override def stateChanged(handle: SparkAppHandle): Unit = {
|
||||
appId = handle.getAppId
|
||||
val sparkAppState = handle.getState
|
||||
if(appId != null){
|
||||
println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState)
|
||||
}else{
|
||||
println("Spark job's state changed to: " + sparkAppState)
|
||||
}
|
||||
if (handle.getState().isFinal){
|
||||
countDownLatch.countDown()
|
||||
println("Task is finished!")
|
||||
}
|
||||
}
|
||||
override def infoChanged(handle: SparkAppHandle): Unit = {
|
||||
//println("Info:" + handle.getState().toString)
|
||||
}
|
||||
})
|
||||
while (appId == null){
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
//println("Task is executing, please wait...")
|
||||
//countDownLatch.await()
|
||||
//println("Task is finished!")
|
||||
|
||||
/*launcher.launch()
|
||||
val sparkAppHandle : SparkAppHandle = launcher.startApplication()
|
||||
|
||||
while(sparkAppHandle.getState != SparkAppHandle.State.FINISHED){
|
||||
Thread.sleep(10000)
|
||||
println("ApplicationId = " + sparkAppHandle.getAppId + "---Current State = " + sparkAppHandle.getState)
|
||||
}*/
|
||||
(appId, handle)
|
||||
}
|
||||
|
||||
def stopFlow(process : Process): String = {
|
||||
def stopFlow(process : SparkAppHandle): String = {
|
||||
process.stop()
|
||||
"ok"
|
||||
}
|
||||
|
@ -102,6 +158,20 @@ object API {
|
|||
"""{"groups":"""" + groups + """"}"""
|
||||
}
|
||||
|
||||
private def getLogFile(uuid : String, appName : String) : (File,File) = {
|
||||
val now : Date = new Date()
|
||||
val dataFormat : SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss")
|
||||
val nowDate = dataFormat.format(now)
|
||||
|
||||
val stdoutPathString = PropertyUtil.getPropertyValue("log.path") + "/" + appName + "_" + uuid + "_stdout_" + nowDate
|
||||
val stdout = new File(stdoutPathString)
|
||||
|
||||
val stderrPathString = PropertyUtil.getPropertyValue("log.path") + "/" + appName + "_" + uuid + "_stderr_" + nowDate
|
||||
val stderr = new File(stderrPathString)
|
||||
|
||||
(stdout, stderr)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class WaitProcessTerminateRunnable(spark : SparkSession, process: Process) extends Runnable {
|
||||
|
|
|
@ -9,7 +9,133 @@ import org.apache.http.util.EntityUtils
|
|||
object HTTPClientStartFlow {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val json = """{"flow":{"name":"test","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
|
||||
//val json:String = """{"flow":{"name":"Flow","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
|
||||
val json ="""
|
||||
|{
|
||||
| "flow":{
|
||||
| "name":"xml,csv-merge-fork-hive,json,csv",
|
||||
| "uuid":"1234",
|
||||
| "checkpoint":"Merge",
|
||||
| "stops":[
|
||||
| {
|
||||
| "uuid":"1111",
|
||||
| "name":"XmlParser",
|
||||
| "bundle":"cn.piflow.bundle.xml.XmlParser",
|
||||
| "properties":{
|
||||
| "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
|
||||
| "rowTag":"phdthesis"
|
||||
| }
|
||||
|
|
||||
| },
|
||||
| {
|
||||
| "uuid":"2222",
|
||||
| "name":"SelectField",
|
||||
| "bundle":"cn.piflow.bundle.common.SelectField",
|
||||
| "properties":{
|
||||
| "schema":"title,author,pages"
|
||||
| }
|
||||
|
|
||||
| },
|
||||
| {
|
||||
| "uuid":"3333",
|
||||
| "name":"PutHiveStreaming",
|
||||
| "bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
|
||||
| "properties":{
|
||||
| "database":"sparktest",
|
||||
| "table":"dblp_phdthesis"
|
||||
| }
|
||||
| },
|
||||
| {
|
||||
| "uuid":"4444",
|
||||
| "name":"CsvParser",
|
||||
| "bundle":"cn.piflow.bundle.csv.CsvParser",
|
||||
| "properties":{
|
||||
| "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
|
||||
| "header":"false",
|
||||
| "delimiter":",",
|
||||
| "schema":"title,author,pages"
|
||||
| }
|
||||
| },
|
||||
| {
|
||||
| "uuid":"555",
|
||||
| "name":"Merge",
|
||||
| "bundle":"cn.piflow.bundle.common.Merge",
|
||||
| "properties":{}
|
||||
| },
|
||||
| {
|
||||
| "uuid":"666",
|
||||
| "name":"Fork",
|
||||
| "bundle":"cn.piflow.bundle.common.Fork",
|
||||
| "properties":{
|
||||
| "outports":["out1","out2","out3"]
|
||||
| }
|
||||
| },
|
||||
| {
|
||||
| "uuid":"777",
|
||||
| "name":"JsonSave",
|
||||
| "bundle":"cn.piflow.bundle.json.JsonSave",
|
||||
| "properties":{
|
||||
| "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
|
||||
| }
|
||||
| },
|
||||
| {
|
||||
| "uuid":"888",
|
||||
| "name":"CsvSave",
|
||||
| "bundle":"cn.piflow.bundle.csv.CsvSave",
|
||||
| "properties":{
|
||||
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
|
||||
| "header":"true",
|
||||
| "delimiter":","
|
||||
| }
|
||||
| }
|
||||
| ],
|
||||
| "paths":[
|
||||
| {
|
||||
| "from":"XmlParser",
|
||||
| "outport":"",
|
||||
| "inport":"",
|
||||
| "to":"SelectField"
|
||||
| },
|
||||
| {
|
||||
| "from":"SelectField",
|
||||
| "outport":"",
|
||||
| "inport":"data1",
|
||||
| "to":"Merge"
|
||||
| },
|
||||
| {
|
||||
| "from":"CsvParser",
|
||||
| "outport":"",
|
||||
| "inport":"data2",
|
||||
| "to":"Merge"
|
||||
| },
|
||||
| {
|
||||
| "from":"Merge",
|
||||
| "outport":"",
|
||||
| "inport":"",
|
||||
| "to":"Fork"
|
||||
| },
|
||||
| {
|
||||
| "from":"Fork",
|
||||
| "outport":"out1",
|
||||
| "inport":"",
|
||||
| "to":"PutHiveStreaming"
|
||||
| },
|
||||
| {
|
||||
| "from":"Fork",
|
||||
| "outport":"out2",
|
||||
| "inport":"",
|
||||
| "to":"JsonSave"
|
||||
| },
|
||||
| {
|
||||
| "from":"Fork",
|
||||
| "outport":"out3",
|
||||
| "inport":"",
|
||||
| "to":"CsvSave"
|
||||
| }
|
||||
| ]
|
||||
| }
|
||||
|}
|
||||
""".stripMargin
|
||||
val url = "http://10.0.86.98:8001/flow/start"
|
||||
val client = HttpClients.createDefault()
|
||||
val post:HttpPost = new HttpPost(url)
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
|
||||
import org.apache.http.entity.StringEntity
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
import org.apache.http.util.EntityUtils
|
||||
|
||||
object HTTPClientStartFlow1 {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val json =
|
||||
"""
|
||||
|{
|
||||
| "flow":{
|
||||
| "name":"xml2csv",
|
||||
| "uuid":"1234",
|
||||
| "checkpoint":"Merge",
|
||||
| "stops":[
|
||||
| {
|
||||
| "uuid":"1111",
|
||||
| "name":"XmlParser",
|
||||
| "bundle":"cn.piflow.bundle.xml.XmlParser",
|
||||
| "properties":{
|
||||
| "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
|
||||
| "rowTag":"phdthesis"
|
||||
| }
|
||||
|
|
||||
| },
|
||||
| {
|
||||
| "uuid":"2222",
|
||||
| "name":"SelectField",
|
||||
| "bundle":"cn.piflow.bundle.common.SelectField",
|
||||
| "properties":{
|
||||
| "schema":"title,author,pages"
|
||||
| }
|
||||
|
|
||||
| },
|
||||
| {
|
||||
| "uuid":"888",
|
||||
| "name":"CsvSave",
|
||||
| "bundle":"cn.piflow.bundle.csv.CsvSave",
|
||||
| "properties":{
|
||||
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
|
||||
| "header":"true",
|
||||
| "delimiter":","
|
||||
| }
|
||||
| }
|
||||
| ],
|
||||
| "paths":[
|
||||
| {
|
||||
| "from":"XmlParser",
|
||||
| "outport":"",
|
||||
| "inport":"",
|
||||
| "to":"SelectField"
|
||||
| },
|
||||
| {
|
||||
| "from":"SelectField",
|
||||
| "outport":"",
|
||||
| "inport":"",
|
||||
| "to":"CsvSave"
|
||||
| }
|
||||
| ]
|
||||
| }
|
||||
|}
|
||||
""".stripMargin
|
||||
val url = "http://10.0.86.98:8001/flow/start"
|
||||
val client = HttpClients.createDefault()
|
||||
val post:HttpPost = new HttpPost(url)
|
||||
|
||||
post.addHeader("Content-Type", "application/json")
|
||||
post.setEntity(new StringEntity(json))
|
||||
|
||||
val response:CloseableHttpResponse = client.execute(post)
|
||||
val entity = response.getEntity
|
||||
val str = EntityUtils.toString(entity,"UTF-8")
|
||||
println("Code is " + str)
|
||||
}
|
||||
|
||||
}
|
|
@ -7,7 +7,7 @@ import org.apache.http.util.EntityUtils
|
|||
|
||||
object HTTPClientStopFlow {
|
||||
def main(args: Array[String]): Unit = {
|
||||
val json = """{"appID":"application_1536718350536_0023"}"""
|
||||
val json = """{"appID":"app-20180929163623-0059"}"""
|
||||
val url = "http://10.0.86.98:8001/flow/stop"
|
||||
val client = HttpClients.createDefault()
|
||||
val post:HttpPost = new HttpPost(url)
|
||||
|
|
|
@ -16,6 +16,7 @@ import com.typesafe.config.ConfigFactory
|
|||
import scala.concurrent.Future
|
||||
import scala.util.parsing.json.JSON
|
||||
import cn.piflow.Process
|
||||
import org.apache.spark.launcher.SparkAppHandle
|
||||
import spray.json.DefaultJsonProtocol
|
||||
|
||||
|
||||
|
@ -23,7 +24,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
implicit val system = ActorSystem("HTTPService", ConfigFactory.load())
|
||||
implicit val materializer = ActorMaterializer()
|
||||
implicit val executionContext = system.dispatcher
|
||||
var processMap = Map[String, Process]()
|
||||
var processMap = Map[String, SparkAppHandle]()
|
||||
|
||||
def toJson(entity: RequestEntity): Map[String, Any] = {
|
||||
entity match {
|
||||
|
@ -77,7 +78,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
|
||||
processMap.get(appId) match {
|
||||
case Some(process) =>
|
||||
val result = API.stopFlow(process.asInstanceOf[Process])
|
||||
val result = API.stopFlow(process)
|
||||
Future.successful(HttpResponse(entity = result))
|
||||
case _ =>
|
||||
Future.successful(HttpResponse(entity = "Can not found process Error!"))
|
||||
|
@ -105,7 +106,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
val stopGroups = API.getAllGroups()
|
||||
Future.successful(HttpResponse(entity = stopGroups))
|
||||
}catch {
|
||||
case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!"))
|
||||
case ex => {
|
||||
println(ex)
|
||||
Future.successful(HttpResponse(entity = "Can not found stop properties Error!"))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import cn.piflow.Runner
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import cn.piflow.conf.bean.FlowBean
|
||||
import cn.piflow.conf.util.OptionUtil
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import scala.util.parsing.json.JSON
|
||||
|
||||
object StartFlowMain {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val flowJson = args(0)
|
||||
println(flowJson)
|
||||
val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
|
||||
println(map)
|
||||
|
||||
//create flow
|
||||
val flowBean = FlowBean(map)
|
||||
val flow = flowBean.constructFlow()
|
||||
|
||||
//execute flow
|
||||
val spark = SparkSession.builder()
|
||||
.appName(flowBean.name)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate()
|
||||
|
||||
val process = Runner.create()
|
||||
.bind(classOf[SparkSession].getName, spark)
|
||||
//.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.bind("checkpoint.path","hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
|
||||
.start(flow);
|
||||
val applicationId = spark.sparkContext.applicationId
|
||||
process.awaitTermination();
|
||||
spark.close();
|
||||
/*new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
|
||||
(applicationId,process)*/
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue