remove spark conf

judy0131 2019-07-23 18:22:57 +08:00
parent d71b720dba
commit 44352f6898
1 changed file with 33 additions and 20 deletions


@@ -103,31 +103,38 @@ object API {
 val countDownLatch = new CountDownLatch(1)
 val launcher = new SparkLauncher
-val handle = launcher
+val sparkLauncher = launcher
   .setAppName(appName)
   .setMaster(PropertyUtil.getPropertyValue("spark.master"))
   .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
   .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
   .setVerbose(true)
-  .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
-  .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
-  .setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
-  .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
-  .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
   .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
   .setConf("spark.hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
   .setConf("spark.driver.memory", dirverMem)
   .setConf("spark.num.executors", executorNum)
   .setConf("spark.executor.memory", executorMem)
   .setConf("spark.executor.cores", executorCores)
   //.setConf("spark.cores.max", "4")
   //.setConf("spark.checkpoint", PropertyUtil.getPropertyValue("checkpoint.path"))
   .addFile(PropertyUtil.getConfigureFile())
   .setMainClass("cn.piflow.api.StartFlowMain")
   .addAppArgs(flowJson.stripMargin)
   //.redirectOutput(stdout)
   //.redirectError(stderr)
-  .startApplication(new SparkAppHandle.Listener {
+
+if (PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname") != null)
+  sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
+if (PropertyUtil.getPropertyValue("yarn.resourcemanager.address") != null)
+  sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
+if (PropertyUtil.getPropertyValue("yarn.access.namenode") != null)
+  sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
+if (PropertyUtil.getPropertyValue("yarn.stagingDir") != null)
+  sparkLauncher.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
+if (PropertyUtil.getPropertyValue("yarn.jars") != null)
+  sparkLauncher.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
+
+val handle = sparkLauncher.startApplication(new SparkAppHandle.Listener {
   override def stateChanged(handle: SparkAppHandle): Unit = {
     appId = handle.getAppId
     val sparkAppState = handle.getState
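The five YARN-specific entries are no longer set unconditionally: each one is now applied only when its property resolves, so a missing property no longer aborts the launch (SparkLauncher rejects null conf values) when running against a non-YARN master such as spark.master=local[*]. The repeated null checks could be folded into a small helper; a minimal sketch, assuming the PropertyUtil and sparkLauncher from this file (setConfIfPresent is hypothetical, not part of this commit):

// Hypothetical helper: copy a PiFlow property into a Spark conf key
// only when the property is actually configured.
def setConfIfPresent(launcher: SparkLauncher, confKey: String, propKey: String): Unit = {
  val value = PropertyUtil.getPropertyValue(propKey)
  if (value != null) launcher.setConf(confKey, value)
}

// Equivalent to the five guarded calls above:
Seq(
  "spark.hadoop.yarn.resourcemanager.hostname" -> "yarn.resourcemanager.hostname",
  "spark.hadoop.yarn.resourcemanager.address" -> "yarn.resourcemanager.address",
  "spark.yarn.access.namenode" -> "yarn.access.namenode",
  "spark.yarn.stagingDir" -> "yarn.stagingDir",
  "spark.yarn.jars" -> "yarn.jars"
).foreach { case (conf, prop) => setConfIfPresent(sparkLauncher, conf, prop) }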
@@ -167,15 +174,7 @@ object API {
 def stopFlow(appID : String, process : SparkAppHandle) : String = {
   //yarn application kill appId
-  val url = PropertyUtil.getPropertyValue("yarn.url") + appID + "/state"
-  val client = HttpClients.createDefault()
-  val put: HttpPut = new HttpPut(url)
-  val body = "{\"state\":\"KILLED\"}"
-  put.addHeader("Content-Type", "application/json")
-  put.setEntity(new StringEntity(body))
-  val response: CloseableHttpResponse = client.execute(put)
-  val entity = response.getEntity
-  val str = EntityUtils.toString(entity, "UTF-8")
+  stopFlowOnYarn(appID)
   //process kill
   process.kill()
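stopFlow is now a two-step shutdown: stopFlowOnYarn (added below) asks the ResourceManager to kill the YARN application, and process.kill() then terminates the local launcher process. A minimal caller sketch, assuming the appId captured in stateChanged and the handle returned by startApplication above:

// appId was captured in stateChanged; handle is the SparkAppHandle
// returned by sparkLauncher.startApplication(...).
val result = API.stopFlow(appId, handle)  // returns "ok" once the kill is issued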
@@ -187,6 +186,20 @@ object API {
"ok"
}
def stopFlowOnYarn(appID : String) : String = {
//yarn application kill appId
val url = PropertyUtil.getPropertyValue("yarn.url") + appID + "/state"
val client = HttpClients.createDefault()
val put:HttpPut = new HttpPut(url)
val body ="{\"state\":\"KILLED\"}"
put.addHeader("Content-Type", "application/json")
put.setEntity(new StringEntity(body))
val response:CloseableHttpResponse = client.execute(put)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
str
}
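The extracted stopFlowOnYarn drives the YARN ResourceManager REST API: a PUT to /ws/v1/cluster/apps/{appId}/state with body {"state":"KILLED"} asks YARN to kill the application. That implies the yarn.url property carries the apps endpoint prefix, e.g. http://<rm-host>:8088/ws/v1/cluster/apps/ (assumed; the configured value is not shown in this diff). Note that neither client nor response is ever closed; a hedged variant of the tail that releases the connection, using the same Apache HttpClient API already imported above:

val response: CloseableHttpResponse = client.execute(put)
try {
  EntityUtils.toString(response.getEntity, "UTF-8")
} finally {
  response.close()  // release the pooled connection
  client.close()    // shut down the underlying connection manager
}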

 def getFlowInfo(appID : String) : String = {
   val flowInfo = H2Util.getFlowInfo(appID)
   flowInfo