diff --git a/classpath/piflow-external.jar b/classpath/piflow-external.jar
index 5633af9..8a449b8 100644
Binary files a/classpath/piflow-external.jar and b/classpath/piflow-external.jar differ
diff --git a/conf/config.properties b/conf/config.properties
index 3becfa6..1dd10ae 100644
--- a/conf/config.properties
+++ b/conf/config.properties
@@ -4,7 +4,7 @@ server.port=8001
 #spark.master=spark://10.0.86.89:7077
 #spark.master=spark://10.0.86.191:7077
 spark.master=yarn
-spark.deploy.mode=client
+spark.deploy.mode=cluster
 yarn.resourcemanager.hostname=10.0.86.191
 yarn.resourcemanager.address=10.0.86.191:8032
 yarn.access.namenode=hdfs://10.0.86.191:9000
@@ -13,7 +13,10 @@ yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
 hive.metastore.uris=thrift://10.0.86.191:9083
 
-piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
+#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
+piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
 
 yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
 
-checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/"
\ No newline at end of file
+checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
+
+log.path=/opt/project/piflow/logs
\ No newline at end of file
diff --git a/piflow-bundle/pom.xml b/piflow-bundle/pom.xml
index 13fd46e..976a36e 100644
--- a/piflow-bundle/pom.xml
+++ b/piflow-bundle/pom.xml
@@ -26,7 +26,7 @@
         <dependency>
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
-            <version>0.9.11</version>
+            <version>0.9.9</version>
         </dependency>
         <dependency>
             <groupId>com.chuusai</groupId>
@@ -109,7 +109,6 @@
-
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
index 93ff00c..0309ef1 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
@@ -8,7 +8,7 @@ import org.junit.Test
 
 import scala.util.parsing.json.JSON
 
-class FlowTest {
+class FlowTest_XX {
 
   @Test
   def testFlow(): Unit ={
diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala
index e26dbd3..d76a5d3 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/API.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala
@@ -1,21 +1,24 @@
 package cn.piflow.api
 
-import cn.piflow.Runner
-import cn.piflow.conf.bean.{FlowBean, PropertyDescriptor}
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.concurrent.CountDownLatch
+
 import org.apache.spark.sql.SparkSession
-import cn.piflow.conf.util.{ClassUtil, OptionUtil}
+import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
 import cn.piflow.Process
-import cn.piflow.api.util.{PropertyUtil}
+import cn.piflow.api.util.PropertyUtil
 import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
 import org.apache.http.impl.client.HttpClients
 import org.apache.http.util.EntityUtils
-import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
 
 import scala.util.parsing.json.JSON
 
 object API {
 
-  def startFlow(flowJson : String):(String,Process) = {
+  /*def startFlow(flowJson : String):(String,Process) = {
 
     val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
     println(map)
@@ -53,22 +56,75 @@ object API {
     new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
 
     (applicationId,process)
-
-    /*val launcher = new SparkLauncher
-    launcher.setMaster(PropertyUtil.getPropertyValue("spark.master"))
-      .setAppName("test")
+  }*/
+
+  def startFlow(flowJson : String):(String,SparkAppHandle) = {
+
+    var appId:String = null
+    val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
+    val flowMap = MapUtil.get(map, "flow").asInstanceOf[Map[String, Any]]
+    val uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String]
+    val appName = MapUtil.get(flowMap,"name").asInstanceOf[String]
+
+    val (stdout, stderr) = getLogFile(uuid, appName)
+
+    val countDownLatch = new CountDownLatch(1)
+    val launcher = new SparkLauncher
+    val handle =launcher
+      .setAppName(appName)
+      .setMaster(PropertyUtil.getPropertyValue("spark.master"))
       .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
+      .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
+      .setVerbose(true)
       .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
-      .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")).setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
+      .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
+      .setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
       .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
       .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
       .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
-      .setConf("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
-      .setMainClass("lalla")
-      .addAppArgs(flowJson)*/
+      .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
+      .setConf("spark.driver.memory", "1g")
+      .setConf("spark.executor.memory", "1g")
+      .setConf("spark.cores.max", "2")
+      .setMainClass("cn.piflow.api.StartFlowMain")
+      .addAppArgs(flowJson)
+      .redirectOutput(stdout)
+      .redirectError(stderr)
+      .startApplication( new SparkAppHandle.Listener {
+        override def stateChanged(handle: SparkAppHandle): Unit = {
+          appId = handle.getAppId
+          val sparkAppState = handle.getState
+          if(appId != null){
+            println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState)
+          }else{
+            println("Spark job's state changed to: " + sparkAppState)
+          }
+          if (handle.getState().isFinal){
+            countDownLatch.countDown()
+            println("Task is finished!")
+          }
+        }
+        override def infoChanged(handle: SparkAppHandle): Unit = {
+          //println("Info:" + handle.getState().toString)
+        }
+      })
+
+    while (appId == null){
+      Thread.sleep(1000)
+    }
+    //println("Task is executing, please wait...")
+    //countDownLatch.await()
+    //println("Task is finished!")
+    /*launcher.launch()
+    val sparkAppHandle : SparkAppHandle = launcher.startApplication()
+
+    while(sparkAppHandle.getState != SparkAppHandle.State.FINISHED){
+      Thread.sleep(10000)
+      println("ApplicationId = " + sparkAppHandle.getAppId + "---Current State = " + sparkAppHandle.getState)
+    }*/
+    (appId, handle)
   }
 
-  def stopFlow(process : Process): String = {
+  def stopFlow(process : SparkAppHandle): String = {
     process.stop()
     "ok"
   }
@@ -102,6 +158,20 @@ object API {
     """{"groups":"""" + groups + """"}"""
   }
 
+  private def getLogFile(uuid : String, appName : String) : (File,File) = {
+    val now : Date = new Date()
+    val dataFormat : SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss")
+    val nowDate = dataFormat.format(now)
+
+    val stdoutPathString = PropertyUtil.getPropertyValue("log.path") + "/" + appName + "_" + uuid + "_stdout_" + nowDate
+    val stdout = new File(stdoutPathString)
+
+    val stderrPathString = PropertyUtil.getPropertyValue("log.path") + "/" + appName + "_" + uuid + "_stderr_" + nowDate
+    val stderr = new File(stderrPathString)
+
+    (stdout, stderr)
+  }
+
 }
 
 class WaitProcessTerminateRunnable(spark : SparkSession, process: Process) extends Runnable {
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala
index 3c37be0..9b74529 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala
@@ -9,7 +9,133 @@ import org.apache.http.util.EntityUtils
 object HTTPClientStartFlow {
 
   def main(args: Array[String]): Unit = {
-    val json = """{"flow":{"name":"test","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
+    //val json:String = """{"flow":{"name":"Flow","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
+    val json ="""
+        |{
+        |  "flow":{
+        |    "name":"xml,csv-merge-fork-hive,json,csv",
+        |    "uuid":"1234",
+        |    "checkpoint":"Merge",
+        |    "stops":[
+        |      {
+        |        "uuid":"1111",
+        |        "name":"XmlParser",
+        |        "bundle":"cn.piflow.bundle.xml.XmlParser",
+        |        "properties":{
+        |          "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
+        |          "rowTag":"phdthesis"
+        |        }
+        |
+        |      },
+        |      {
+        |        "uuid":"2222",
+        |        "name":"SelectField",
+        |        "bundle":"cn.piflow.bundle.common.SelectField",
+        |        "properties":{
+        |          "schema":"title,author,pages"
+        |        }
+        |
+        |      },
+        |      {
+        |        "uuid":"3333",
+        |        "name":"PutHiveStreaming",
+        |        "bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
+        |        "properties":{
+        |          "database":"sparktest",
+        |          "table":"dblp_phdthesis"
+        |        }
+        |      },
+        |      {
+        |        "uuid":"4444",
+        |        "name":"CsvParser",
+        |        "bundle":"cn.piflow.bundle.csv.CsvParser",
+        |        "properties":{
+        |          "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
+        |          "header":"false",
+        |          "delimiter":",",
+        |          "schema":"title,author,pages"
+        |        }
+        |      },
+        |      {
+        |        "uuid":"555",
+        |        "name":"Merge",
+        |        "bundle":"cn.piflow.bundle.common.Merge",
+        |        "properties":{}
+        |      },
+        |      {
+        |        "uuid":"666",
+        |        "name":"Fork",
+        |        "bundle":"cn.piflow.bundle.common.Fork",
+        |        "properties":{
+        |          "outports":["out1","out2","out3"]
+        |        }
+        |      },
+        |      {
+        |        "uuid":"777",
+        |        "name":"JsonSave",
+        |        "bundle":"cn.piflow.bundle.json.JsonSave",
+        |        "properties":{
+        |          "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
+        |        }
+        |      },
+        |      {
+        |        "uuid":"888",
+        |        "name":"CsvSave",
+        |        "bundle":"cn.piflow.bundle.csv.CsvSave",
+        |        "properties":{
+        |          "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
+        |          "header":"true",
+        |          "delimiter":","
+        |        }
+        |      }
+        |    ],
+        |    "paths":[
+        |      {
+        |        "from":"XmlParser",
+        |        "outport":"",
+        |        "inport":"",
+        |        "to":"SelectField"
+        |      },
+        |      {
+        |        "from":"SelectField",
+        |        "outport":"",
+        |        "inport":"data1",
+        |        "to":"Merge"
+        |      },
+        |      {
+        |        "from":"CsvParser",
+        |        "outport":"",
+        |        "inport":"data2",
+        |        "to":"Merge"
+        |      },
+        |      {
+        |        "from":"Merge",
+        |        "outport":"",
+        |        "inport":"",
+        |        "to":"Fork"
+        |      },
+        |      {
+        |        "from":"Fork",
+        |        "outport":"out1",
+        |        "inport":"",
+        |        "to":"PutHiveStreaming"
+        |      },
+        |      {
+        |        "from":"Fork",
+        |        "outport":"out2",
+        |        "inport":"",
+        |        "to":"JsonSave"
+        |      },
+        |      {
+        |        "from":"Fork",
+        |        "outport":"out3",
+        |        "inport":"",
+        |        "to":"CsvSave"
+        |      }
+        |    ]
+        |  }
+        |}
+      """.stripMargin
     val url = "http://10.0.86.98:8001/flow/start"
     val client = HttpClients.createDefault()
     val post:HttpPost = new HttpPost(url)
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala
new file mode 100644
index 0000000..a714f1f
--- /dev/null
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala
@@ -0,0 +1,79 @@
+package cn.piflow.api
+
+import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+object HTTPClientStartFlow1 {
+
+  def main(args: Array[String]): Unit = {
+    val json =
+      """
+        |{
+        |  "flow":{
+        |    "name":"xml2csv",
+        |    "uuid":"1234",
+        |    "checkpoint":"Merge",
+        |    "stops":[
+        |      {
+        |        "uuid":"1111",
+        |        "name":"XmlParser",
+        |        "bundle":"cn.piflow.bundle.xml.XmlParser",
+        |        "properties":{
+        |          "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
+        |          "rowTag":"phdthesis"
+        |        }
+        |
+        |      },
+        |      {
+        |        "uuid":"2222",
+        |        "name":"SelectField",
+        |        "bundle":"cn.piflow.bundle.common.SelectField",
+        |        "properties":{
+        |          "schema":"title,author,pages"
+        |        }
+        |
+        |      },
+        |      {
+        |        "uuid":"888",
+        |        "name":"CsvSave",
+        |        "bundle":"cn.piflow.bundle.csv.CsvSave",
+        |        "properties":{
+        |          "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
+        |          "header":"true",
+        |          "delimiter":","
+        |        }
+        |      }
+        |    ],
+        |    "paths":[
+        |      {
+        |        "from":"XmlParser",
+        |        "outport":"",
+        |        "inport":"",
+        |        "to":"SelectField"
+        |      },
+        |      {
+        |        "from":"SelectField",
+        |        "outport":"",
+        |        "inport":"",
+        |        "to":"CsvSave"
+        |      }
+        |    ]
+        |  }
+        |}
+      """.stripMargin
+    val url = "http://10.0.86.98:8001/flow/start"
+    val client = HttpClients.createDefault()
+    val post:HttpPost = new HttpPost(url)
+
+    post.addHeader("Content-Type", "application/json")
+    post.setEntity(new StringEntity(json))
+
+    val response:CloseableHttpResponse = client.execute(post)
+    val entity = response.getEntity
+    val str = EntityUtils.toString(entity,"UTF-8")
+    println("Code is " + str)
+  }
+
+}
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlow.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlow.scala
index 87f5788..6939d0e 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlow.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlow.scala
@@ -7,7 +7,7 @@ import org.apache.http.util.EntityUtils
 object HTTPClientStopFlow {
 
   def main(args: Array[String]): Unit = {
-    val json = """{"appID":"application_1536718350536_0023"}"""
+    val json = """{"appID":"app-20180929163623-0059"}"""
     val url = "http://10.0.86.98:8001/flow/stop"
     val client = HttpClients.createDefault()
     val post:HttpPost = new HttpPost(url)
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
index 408fd8e..9b1f133 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
@@ -16,6 +16,7 @@ import com.typesafe.config.ConfigFactory
 import scala.concurrent.Future
 import scala.util.parsing.json.JSON
 import cn.piflow.Process
+import org.apache.spark.launcher.SparkAppHandle
 import spray.json.DefaultJsonProtocol
 
 
@@ -23,7 +24,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport {
   implicit val system = ActorSystem("HTTPService", ConfigFactory.load())
   implicit val materializer = ActorMaterializer()
   implicit val executionContext = system.dispatcher
-  var processMap = Map[String, Process]()
+  var processMap = Map[String, SparkAppHandle]()
 
   def toJson(entity: RequestEntity): Map[String, Any] = {
     entity match {
@@ -77,7 +78,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport {
 
       processMap.get(appId) match {
         case Some(process) =>
-          val result = API.stopFlow(process.asInstanceOf[Process])
+          val result = API.stopFlow(process)
           Future.successful(HttpResponse(entity = result))
         case _ =>
           Future.successful(HttpResponse(entity = "Can not found process Error!"))
@@ -105,7 +106,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport {
         val stopGroups = API.getAllGroups()
         Future.successful(HttpResponse(entity = stopGroups))
       }catch {
-        case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!"))
+        case ex => {
+          println(ex)
+          Future.successful(HttpResponse(entity = "Can not found stop properties Error!"))
+        }
       }
     }
diff --git a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala
new file mode 100644
index 0000000..435cd35
--- /dev/null
+++ b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala
@@ -0,0 +1,41 @@
+package cn.piflow.api
+
+import cn.piflow.Runner
+import cn.piflow.api.util.PropertyUtil
+import cn.piflow.conf.bean.FlowBean
+import cn.piflow.conf.util.OptionUtil
+import org.apache.spark.sql.SparkSession
+
+import scala.util.parsing.json.JSON
+
+object StartFlowMain {
+
+  def main(args: Array[String]): Unit = {
+    val flowJson = args(0)
+    println(flowJson)
+    val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
+    println(map)
+
+    //create flow
+    val flowBean = FlowBean(map)
+    val flow = flowBean.constructFlow()
+
+    //execute flow
+    val spark = SparkSession.builder()
+      .appName(flowBean.name)
+      .enableHiveSupport()
+      .getOrCreate()
+
+    val process = Runner.create()
+      .bind(classOf[SparkSession].getName, spark)
+      //.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
+      .bind("checkpoint.path","hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
+      .start(flow);
+    val applicationId = spark.sparkContext.applicationId
+    process.awaitTermination();
+    spark.close();
+    /*new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
+    (applicationId,process)*/
+  }
+
+}
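
Usage note: a minimal sketch (not part of the patch) of how a caller might consume the (appId, handle) pair now returned by API.startFlow. The object name and the polling interval are illustrative assumptions; SparkAppHandle.getState and State.isFinal are the spark-launcher API already used in the patch above.

    package cn.piflow.api

    import org.apache.spark.launcher.SparkAppHandle

    // Hypothetical caller: launch a flow via the new launcher-based startFlow,
    // then poll the returned SparkAppHandle until the Spark application
    // reaches a final state (FINISHED, FAILED or KILLED).
    object StartFlowHandleExample {
      def main(args: Array[String]): Unit = {
        val flowJson = args(0)  // flow definition JSON, as in HTTPClientStartFlow
        val (appId, handle) = API.startFlow(flowJson)
        println("Launched flow as application: " + appId)
        while (!handle.getState.isFinal) {
          Thread.sleep(2000)  // assumed polling interval
          println("Current state: " + handle.getState)
        }
        println("Final state: " + handle.getState)
      }
    }

The same handle is what HTTPService now keeps in processMap, so the /flow/stop route ultimately calls stop() on exactly this object via API.stopFlow.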