support multiple SparkContexts to run more than one flow at the same time (standalone mode only; YARN still has problems)

judy0131 2018-09-29 18:03:18 +08:00
parent 1825eb07ff
commit b34dec8110
3 changed files with 34 additions and 2 deletions
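Context for this change: with multiple SparkContexts allowed, two flows can be started concurrently through the HTTP API. Below is a minimal sketch of exercising that, not part of this commit; the endpoint URL matches the test clients in this diff, while the object name and the placeholder flow payloads are illustrative assumptions.

package cn.piflow.api

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

// Illustrative sketch (not in this commit): submit two flows concurrently.
// With this change, each flow should get its own SparkContext in standalone mode.
object HTTPClientStartTwoFlows {

  private def startFlow(json: String): String = {
    val client = HttpClients.createDefault()
    val post = new HttpPost("http://10.0.86.98:8001/flow/start")
    post.addHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(json))
    val response = client.execute(post)
    EntityUtils.toString(response.getEntity, "UTF-8")
  }

  def main(args: Array[String]): Unit = {
    // Placeholder payloads; real flow JSON looks like HTTPClientStartFlow1 below.
    val flow1 = """{"flow":{"name":"Flow1","uuid":"0001","stops":[],"paths":[]}}"""
    val flow2 = """{"flow":{"name":"Flow2","uuid":"0002","stops":[],"paths":[]}}"""
    val threads = Seq(flow1, flow2).map { json =>
      new Thread(new Runnable {
        override def run(): Unit = println(startFlow(json))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}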

cn/piflow/api/API.scala  View File

@@ -64,7 +64,14 @@ object API {
      //.setMaster(PropertyUtil.getPropertyValue("spark.master"))
      //.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
      .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
      //.setVerbose(true)
      .setVerbose(true)
      .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
      .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
      .setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
      .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
      .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
      .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
      .setConf("spark.hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
      .setConf("spark.driver.memory", "1g")
      .setConf("spark.executor.memory", "1g")
      .setConf("spark.cores.max", "1")

cn/piflow/api/HTTPClientStartFlow1.scala  View File

@@ -0,0 +1,25 @@
package cn.piflow.api

import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object HTTPClientStartFlow1 {

  def main(args: Array[String]): Unit = {
    // Flow1: XmlParser -> SelectField -> Merge (data1), CsvParser -> Merge (data2), Merge -> PutHiveStreaming.
    val json = """{"flow":{"name":"Flow1","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"PutHiveStreaming"}]}}"""
    val url = "http://10.0.86.98:8001/flow/start"
    val client = HttpClients.createDefault()
    val post: HttpPost = new HttpPost(url)
    post.addHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(json))
    val response: CloseableHttpResponse = client.execute(post)
    val entity = response.getEntity
    val str = EntityUtils.toString(entity, "UTF-8")
    println("Code is " + str)
  }
}
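If the request succeeds, the string printed above (despite the "Code is" label) presumably carries the id of the launched application; the stop client below posts exactly such an id back to /flow/stop. This reading is inferred from the two clients, not stated in the commit.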

cn/piflow/api/HTTPClientStopFlow.scala  View File

@@ -7,7 +7,7 @@ import org.apache.http.util.EntityUtils
object HTTPClientStopFlow {

  def main(args: Array[String]): Unit = {
    val json = """{"appID":"application_1536718350536_0023"}"""
    val json = """{"appID":"app-20180929163623-0059"}"""
    val url = "http://10.0.86.98:8001/flow/stop"
    val client = HttpClients.createDefault()
    val post: HttpPost = new HttpPost(url)
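Note the changed appID format in this file: application_1536718350536_0023 is a YARN application id, while app-20180929163623-0059 is a Spark standalone app id, matching the standalone-only scope of this commit.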