run application on yarn by SparkLauncher(Now checkpoint.path is fixed, we can not read this config in config.properties, because yarn will find config.properties in path /opt/hadoop-2.6.0/tmp/nm-local-dir/usercache/root/appcache/appId/containerId/conf/config.properties)

This commit is contained in:
judy0131 2018-10-08 13:26:00 +08:00
parent 8b5fb4e619
commit f577b6ad79
4 changed files with 59 additions and 4 deletions

View File

@ -17,4 +17,4 @@ hive.metastore.uris=thrift://10.0.86.191:9083
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/"
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/

View File

@ -8,7 +8,7 @@ import org.junit.Test
import scala.util.parsing.json.JSON
class FlowTest {
class FlowTest_XX {
@Test
def testFlow(): Unit ={

View File

@ -8,7 +8,61 @@ import org.apache.http.util.EntityUtils
object HTTPClientStartFlow1 {
def main(args: Array[String]): Unit = {
val json = """{"flow":{"name":"Flow1","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"PutHiveStreaming"}]}}"""
val json =
"""
|{
| "flow":{
| "name":"test",
| "uuid":"1234",
| "checkpoint":"Merge",
| "stops":[
| {
| "uuid":"1111",
| "name":"XmlParser",
| "bundle":"cn.piflow.bundle.xml.XmlParser",
| "properties":{
| "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
| "rowTag":"phdthesis"
| }
|
| },
| {
| "uuid":"2222",
| "name":"SelectField",
| "bundle":"cn.piflow.bundle.common.SelectField",
| "properties":{
| "schema":"title,author,pages"
| }
|
| },
| {
| "uuid":"888",
| "name":"CsvSave",
| "bundle":"cn.piflow.bundle.csv.CsvSave",
| "properties":{
| "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
| "header":"true",
| "delimiter":","
| }
| }
| ],
| "paths":[
| {
| "from":"XmlParser",
| "outport":"",
| "inport":"",
| "to":"SelectField"
| },
| {
| "from":"SelectField",
| "outport":"",
| "inport":"",
| "to":"CsvSave"
| }
| ]
| }
|}
""".stripMargin
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)

View File

@ -29,7 +29,8 @@ object StartFlowMain {
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
//.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("checkpoint.path","hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
.start(flow);
val applicationId = spark.sparkContext.applicationId
process.awaitTermination();