From 1825eb07ffdba9734c07787d0b48063dbefdee90 Mon Sep 17 00:00:00 2001
From: judy0131
Date: Sat, 29 Sep 2018 17:53:30 +0800
Subject: [PATCH] support multiple SparkContexts to run more than one flow at
 the same time (standalone mode only; YARN still has problems)

---
 conf/config.properties                             |  9 +--
 .../src/main/scala/cn/piflow/api/API.scala         | 69 ++++++++++++++-----
 .../cn/piflow/api/HTTPClientStartFlow.scala        |  2 +-
 .../scala/cn/piflow/api/HTTPService.scala          |  5 +-
 .../scala/cn/piflow/api/StartFlowMain.scala        | 39 +++++++++++
 5 files changed, 101 insertions(+), 23 deletions(-)
 create mode 100644 piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala

diff --git a/conf/config.properties b/conf/config.properties
index 3becfa6..0d0d186 100644
--- a/conf/config.properties
+++ b/conf/config.properties
@@ -1,10 +1,10 @@
 server.ip=10.0.86.98
 server.port=8001
 
-#spark.master=spark://10.0.86.89:7077
+spark.master=spark://10.0.86.89:7077
 #spark.master=spark://10.0.86.191:7077
-spark.master=yarn
-spark.deploy.mode=client
+#spark.master=yarn
+spark.deploy.mode=cluster
 yarn.resourcemanager.hostname=10.0.86.191
 yarn.resourcemanager.address=10.0.86.191:8032
 yarn.access.namenode=hdfs://10.0.86.191:9000
@@ -13,7 +13,8 @@ yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
 
 hive.metastore.uris=thrift://10.0.86.191:9083
 
-piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
+#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
+piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
 yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
 checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/"
\ No newline at end of file
diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala
index e26dbd3..6258bbe 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/API.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala
@@ -1,21 +1,23 @@
 package cn.piflow.api
 
+import java.util.concurrent.CountDownLatch
+
 import cn.piflow.Runner
 import cn.piflow.conf.bean.{FlowBean, PropertyDescriptor}
 import org.apache.spark.sql.SparkSession
 import cn.piflow.conf.util.{ClassUtil, OptionUtil}
 import cn.piflow.Process
-import cn.piflow.api.util.{PropertyUtil}
+import cn.piflow.api.util.PropertyUtil
 import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
 import org.apache.http.impl.client.HttpClients
 import org.apache.http.util.EntityUtils
-import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
 
 import scala.util.parsing.json.JSON
 
 object API {
 
-  def startFlow(flowJson : String):(String,Process) = {
+  /*def startFlow(flowJson : String):(String,Process) = {
 
     val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
     println(map)
@@ -53,22 +55,57 @@ object API {
     new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
     (applicationId,process)
 
-    /*val launcher = new SparkLauncher
-    launcher.setMaster(PropertyUtil.getPropertyValue("spark.master"))
-      .setAppName("test")
-      .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
-      .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
-      .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")).setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
-      .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
-      .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
-      .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
-      .setConf("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
-      .setMainClass("lalla")
-      .addAppArgs(flowJson)*/
+  }*/
+
+  def startFlow(flowJson : String):(String,SparkAppHandle) = {
+
+    var appId:String = null
+    val countDownLatch = new CountDownLatch(1)
+    val launcher = new SparkLauncher
+    val handle = launcher.setMaster(PropertyUtil.getPropertyValue("spark.master"))
+      //.setMaster(PropertyUtil.getPropertyValue("spark.master"))
+      //.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
+      .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
+      //.setVerbose(true)
+      .setConf("spark.driver.memory", "1g")
+      .setConf("spark.executor.memory", "1g")
+      .setConf("spark.cores.max", "1")
+      .setMainClass("cn.piflow.api.StartFlowMain")
+      .addAppArgs(flowJson)
+      .startApplication( new SparkAppHandle.Listener {
+        override def stateChanged(handle: SparkAppHandle): Unit = {
+          appId = handle.getAppId
+          val sparkAppState = handle.getState
+          if(appId != null){
+            println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState)
+          }else{
+            println("Spark job's state changed to: " + sparkAppState)
+          }
+          if (handle.getState().isFinal){
+            countDownLatch.countDown()
+            println("Task is finished!")
+          }
+        }
+        override def infoChanged(handle: SparkAppHandle): Unit = {
+          //println("Info:" + handle.getState().toString)
+        }
+      })
+
+    while (appId == null){
+      Thread.sleep(1000)
+    }
+    //println("Task is executing, please wait...")
+    //countDownLatch.await()
+    //println("Task is finished!")
+
+    /*launcher.launch()
+    val sparkAppHandle : SparkAppHandle = launcher.startApplication()
+
+    while(sparkAppHandle.getState != SparkAppHandle.State.FINISHED){
+      Thread.sleep(10000)
+      println("ApplicationId = " + sparkAppHandle.getAppId + "---Current State = " + sparkAppHandle.getState)
+    }*/
+
+    (appId, handle)
   }
 
-  def stopFlow(process : Process): String = {
+  def stopFlow(process : SparkAppHandle): String = {
     process.stop()
     "ok"
   }
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala
index 3c37be0..9f75b95 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala
@@ -9,7 +9,7 @@ import org.apache.http.util.EntityUtils
 
 object HTTPClientStartFlow {
 
   def main(args: Array[String]): Unit = {
-    val json = """{"flow":{"name":"test","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
+    val json = """{"flow":{"name":"Flow","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
     val url = "http://10.0.86.98:8001/flow/start"
     val client = HttpClients.createDefault()
     val post:HttpPost = new HttpPost(url)
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
index 408fd8e..3f30290 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
@@ -16,6 +16,7 @@ import com.typesafe.config.ConfigFactory
 import scala.concurrent.Future
 import scala.util.parsing.json.JSON
 import cn.piflow.Process
+import org.apache.spark.launcher.SparkAppHandle
 import spray.json.DefaultJsonProtocol
 
 
@@ -23,7 +24,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
   implicit val system = ActorSystem("HTTPService", ConfigFactory.load())
   implicit val materializer = ActorMaterializer()
   implicit val executionContext = system.dispatcher
-  var processMap = Map[String, Process]()
+  var processMap = Map[String, SparkAppHandle]()
 
   def toJson(entity: RequestEntity): Map[String, Any] = {
     entity match {
@@ -77,7 +78,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
 
       processMap.get(appId) match {
         case Some(process) =>
-          val result = API.stopFlow(process.asInstanceOf[Process])
+          val result = API.stopFlow(process)
           Future.successful(HttpResponse(entity = result))
         case _ => Future.successful(HttpResponse(entity = "Can not found process Error!"))
 
diff --git a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala
new file mode 100644
index 0000000..7b0b1cc
--- /dev/null
+++ b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala
@@ -0,0 +1,39 @@
+package cn.piflow.api
+
+import cn.piflow.Runner
+import cn.piflow.api.util.PropertyUtil
+import cn.piflow.conf.bean.FlowBean
+import cn.piflow.conf.util.OptionUtil
+import org.apache.spark.sql.SparkSession
+
+import scala.util.parsing.json.JSON
+
+object StartFlowMain {
+
+  def main(args: Array[String]): Unit = {
+    val flowJson = args(0)
+    val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
+    println(map)
+
+    //create flow
+    val flowBean = FlowBean(map)
+    val flow = flowBean.constructFlow()
+
+    //execute flow
+    val spark = SparkSession.builder()
+      .appName(flowBean.name)
+      .enableHiveSupport()
+      .getOrCreate()
+
+    val process = Runner.create()
+      .bind(classOf[SparkSession].getName, spark)
+      .bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
+      .start(flow);
+    val applicationId = spark.sparkContext.applicationId
+    process.awaitTermination();
+    spark.close();
+    /*new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
+    (applicationId,process)*/
+  }
+
+}
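For completeness, a rough sketch of how the appId-keyed map in HTTPService cooperates with the new stopFlow; the helper object and method names here are hypothetical, not part of this patch. SparkAppHandle.stop() asks the launched application to exit, while kill() would terminate the child process outright.

    import org.apache.spark.launcher.SparkAppHandle

    // Hypothetical registry mirroring HTTPService.processMap + API.stopFlow:
    // handles are stored under their Spark application id at start time and
    // looked up again when a /flow/stop request arrives.
    object HandleRegistrySketch {
      private var handles = Map.empty[String, SparkAppHandle]

      def register(appId: String, handle: SparkAppHandle): Unit =
        synchronized { handles += appId -> handle }

      def stop(appId: String): String = synchronized {
        handles.get(appId) match {
          case Some(h) =>
            h.stop()            // polite stop; h.kill() would be forcible
            handles -= appId
            "ok"
          case None =>
            "no such application"
        }
      }
    }

Guarding the map matters because the HTTP routes and the launcher's listener threads can touch it concurrently; the patch's plain var Map works for one request at a time but is worth revisiting.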