diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 6fcd4bd..059dfc3 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -103,31 +103,38 @@ object API { val countDownLatch = new CountDownLatch(1) val launcher = new SparkLauncher - val handle =launcher + + + val sparkLauncher =launcher .setAppName(appName) .setMaster(PropertyUtil.getPropertyValue("spark.master")) .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode")) .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle")) .setVerbose(true) - .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) - .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")) - .setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode")) - .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir")) - .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars")) - .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle")) .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .setConf("spark.driver.memory", dirverMem) .setConf("spark.num.executors",executorNum) .setConf("spark.executor.memory", executorMem) .setConf("spark.executor.cores",executorCores) - //.setConf("spark.cores.max", "4") - //.setConf("spark.checkpoint", PropertyUtil.getPropertyValue("checkpoint.path")) .addFile(PropertyUtil.getConfigureFile()) .setMainClass("cn.piflow.api.StartFlowMain") .addAppArgs(flowJson.stripMargin) //.redirectOutput(stdout) - //.redirectError(stderr) - .startApplication( new SparkAppHandle.Listener { + + + + if(PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname") != null) + sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) + if(PropertyUtil.getPropertyValue("yarn.resourcemanager.address") != null) + sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")) + if(PropertyUtil.getPropertyValue("yarn.access.namenode") != null) + sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode")) + if(PropertyUtil.getPropertyValue("yarn.stagingDir") != null) + sparkLauncher.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir")) + if(PropertyUtil.getPropertyValue("yarn.jars") != null) + sparkLauncher.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars")) + + val handle = sparkLauncher.startApplication( new SparkAppHandle.Listener { override def stateChanged(handle: SparkAppHandle): Unit = { appId = handle.getAppId val sparkAppState = handle.getState @@ -167,15 +174,7 @@ object API { def stopFlow(appID : String, process : SparkAppHandle) : String = { //yarn application kill appId - val url = PropertyUtil.getPropertyValue("yarn.url") + appID + "/state" - val client = HttpClients.createDefault() - val put:HttpPut = new HttpPut(url) - val body ="{\"state\":\"KILLED\"}" - put.addHeader("Content-Type", "application/json") - put.setEntity(new StringEntity(body)) - val response:CloseableHttpResponse = client.execute(put) - val entity = response.getEntity - val str = EntityUtils.toString(entity,"UTF-8") + stopFlowOnYarn(appID) //process kill process.kill() @@ -187,6 +186,20 @@ object API { "ok" } + def stopFlowOnYarn(appID : String) : String = { + //yarn application kill appId + val url = PropertyUtil.getPropertyValue("yarn.url") + appID + "/state" + val client = HttpClients.createDefault() + val put:HttpPut = new HttpPut(url) + val body ="{\"state\":\"KILLED\"}" + put.addHeader("Content-Type", "application/json") + put.setEntity(new StringEntity(body)) + val response:CloseableHttpResponse = client.execute(put) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + str + } + def getFlowInfo(appID : String) : String = { val flowInfo = H2Util.getFlowInfo(appID) flowInfo