Support multiple SparkContexts so that more than one flow can run at the same time (standalone mode only; YARN still has problems)

judy0131 2018-09-29 17:53:30 +08:00
parent 1b63b70c4d
commit 1825eb07ff
5 changed files with 101 additions and 23 deletions
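The core constraint behind this change: Spark allows only one active SparkContext per JVM, so flows sharing the server process cannot get independent contexts. A minimal illustration of the problem (a sketch, not part of the commit):

import org.apache.spark.sql.SparkSession

// A second "context" requested in the same JVM is silently the first one:
val spark1 = SparkSession.builder().master("local").appName("flow1").getOrCreate()
val spark2 = SparkSession.builder().master("local").appName("flow2").getOrCreate()
assert(spark1 eq spark2)  // same session, same SparkContext

Launching every flow as its own Spark application through SparkLauncher, as the diff below does, gives each flow a separate driver JVM and therefore a separate SparkContext.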

config.properties

@@ -1,10 +1,10 @@
server.ip=10.0.86.98
server.port=8001
-#spark.master=spark://10.0.86.89:7077
+spark.master=spark://10.0.86.89:7077
#spark.master=spark://10.0.86.191:7077
-spark.master=yarn
-spark.deploy.mode=client
+#spark.master=yarn
+spark.deploy.mode=cluster
yarn.resourcemanager.hostname=10.0.86.191
yarn.resourcemanager.address=10.0.86.191:8032
yarn.access.namenode=hdfs://10.0.86.191:9000
@@ -13,7 +13,8 @@ yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
hive.metastore.uris=thrift://10.0.86.191:9083
-piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
+#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
+piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
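The server reads these settings through PropertyUtil.getPropertyValue, used throughout the code below. PropertyUtil itself is not part of this commit; a minimal stand-in, assuming config.properties is available on the classpath, could look like:

import java.util.Properties

// Hypothetical equivalent of cn.piflow.api.util.PropertyUtil
object PropertyUtilSketch {
  private val props = new Properties()
  private val in = getClass.getResourceAsStream("/config.properties")
  if (in != null) try props.load(in) finally in.close()

  def getPropertyValue(key: String): String = props.getProperty(key)
}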

API.scala

@@ -1,21 +1,23 @@
package cn.piflow.api
+import java.util.concurrent.CountDownLatch
import cn.piflow.Runner
import cn.piflow.conf.bean.{FlowBean, PropertyDescriptor}
import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{ClassUtil, OptionUtil}
import cn.piflow.Process
-import cn.piflow.api.util.{PropertyUtil}
+import cn.piflow.api.util.PropertyUtil
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
-import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import scala.util.parsing.json.JSON
object API {
-def startFlow(flowJson : String):(String,Process) = {
+/*def startFlow(flowJson : String):(String,Process) = {
val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
println(map)
@@ -53,22 +55,57 @@ object API {
new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
(applicationId,process)
/*val launcher = new SparkLauncher
launcher.setMaster(PropertyUtil.getPropertyValue("spark.master"))
.setAppName("test")
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")).setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
.setConf("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setMainClass("lalla")
.addAppArgs(flowJson)*/
-}
+}*/
+def startFlow(flowJson : String): (String, SparkAppHandle) = {
+  var appId: String = null
+  val countDownLatch = new CountDownLatch(1)
+  val launcher = new SparkLauncher
+  val handle = launcher.setMaster(PropertyUtil.getPropertyValue("spark.master"))
+    //.setMaster(PropertyUtil.getPropertyValue("spark.master"))
+    //.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
+    .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
+    //.setVerbose(true)
+    .setConf("spark.driver.memory", "1g")
+    .setConf("spark.executor.memory", "1g")
+    .setConf("spark.cores.max", "1")
+    .setMainClass("cn.piflow.api.StartFlowMain")
+    .addAppArgs(flowJson)
+    .startApplication(new SparkAppHandle.Listener {
+      override def stateChanged(handle: SparkAppHandle): Unit = {
+        appId = handle.getAppId
+        val sparkAppState = handle.getState
+        if (appId != null) {
+          println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState)
+        } else {
+          println("Spark job's state changed to: " + sparkAppState)
+        }
+        if (handle.getState().isFinal) {
+          countDownLatch.countDown()
+          println("Task is finished!")
+        }
+      }
+      override def infoChanged(handle: SparkAppHandle): Unit = {
+        //println("Info:" + handle.getState().toString)
+      }
+    })
+  while (appId == null) {
+    Thread.sleep(1000)
+  }
+  //println("Task is executing, please wait...")
+  //countDownLatch.await()
+  //println("Task is finished!")
+  /*launcher.launch()
+  val sparkAppHandle : SparkAppHandle = launcher.startApplication()
+  while (sparkAppHandle.getState != SparkAppHandle.State.FINISHED) {
+    Thread.sleep(10000)
+    println("ApplicationId = " + sparkAppHandle.getAppId + "---Current State = " + sparkAppHandle.getState)
+  }*/
+  (appId, handle)
+}
-def stopFlow(process : Process): String = {
+def stopFlow(process : SparkAppHandle): String = {
  process.stop()
  "ok"
}
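Usage of the new pair, as wired up in HTTPService below (sketch; flowJson is a flow definition such as the one in HTTPClientStartFlow):

val (appId, handle) = API.startFlow(flowJson)
println(s"flow running as application $appId, state = ${handle.getState}")
// ... later, when a /flow/stop request arrives:
API.stopFlow(handle)  // delegates to SparkAppHandle.stop()

Note that startFlow polls until the launched driver reports its application id, so the caller always receives a usable key for later lookup.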

HTTPClientStartFlow.scala

@@ -9,7 +9,7 @@ import org.apache.http.util.EntityUtils
object HTTPClientStartFlow {
def main(args: Array[String]): Unit = {
val json = """{"flow":{"name":"test","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
val json = """{"flow":{"name":"Flow","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
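The hunk ends here; the method presumably goes on to post the JSON and read the response, along these lines (standard HttpClient calls; StringEntity is the one import not shown above):

import org.apache.http.entity.StringEntity

post.setEntity(new StringEntity(json))
val response = client.execute(post)
println("Response: " + EntityUtils.toString(response.getEntity))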

HTTPService.scala

@@ -16,6 +16,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.util.parsing.json.JSON
import cn.piflow.Process
+import org.apache.spark.launcher.SparkAppHandle
import spray.json.DefaultJsonProtocol
@@ -23,7 +24,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport
implicit val system = ActorSystem("HTTPService", ConfigFactory.load())
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
-var processMap = Map[String, Process]()
+var processMap = Map[String, SparkAppHandle]()
def toJson(entity: RequestEntity): Map[String, Any] = {
entity match {
@@ -77,7 +78,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport
processMap.get(appId) match {
case Some(process) =>
-val result = API.stopFlow(process.asInstanceOf[Process])
+val result = API.stopFlow(process)
Future.successful(HttpResponse(entity = result))
case _ =>
Future.successful(HttpResponse(entity = "Can not found process Error!"))

StartFlowMain.scala

@@ -0,0 +1,39 @@
+package cn.piflow.api
+
+import cn.piflow.Runner
+import cn.piflow.api.util.PropertyUtil
+import cn.piflow.conf.bean.FlowBean
+import cn.piflow.conf.util.OptionUtil
+import org.apache.spark.sql.SparkSession
+
+import scala.util.parsing.json.JSON
+
+object StartFlowMain {
+
+  def main(args: Array[String]): Unit = {
+    val flowJson = args(0)
+    val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
+    println(map)
+
+    //create flow
+    val flowBean = FlowBean(map)
+    val flow = flowBean.constructFlow()
+
+    //execute flow
+    val spark = SparkSession.builder()
+      .appName(flowBean.name)
+      .enableHiveSupport()
+      .getOrCreate()
+
+    val process = Runner.create()
+      .bind(classOf[SparkSession].getName, spark)
+      .bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
+      .start(flow)
+
+    val applicationId = spark.sparkContext.applicationId
+    process.awaitTermination()
+    spark.close()
+
+    /*new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
+    (applicationId,process)*/
+  }
+}
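Because every call to API.startFlow now spawns a separate driver JVM running this main class, flows can execute side by side in standalone mode (per the commit message, YARN still has problems). A sketch with two hypothetical flow definitions:

val (idA, handleA) = API.startFlow(flowJsonA)
val (idB, handleB) = API.startFlow(flowJsonB)
println(s"$idA -> ${handleA.getState}, $idB -> ${handleB.getState}")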