From 4c3385ea9fcefd257873acb93247ee50e5216aa1 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Tue, 25 Sep 2018 17:35:11 +0800 Subject: [PATCH] add stop groups api --- .../src/main/scala/cn/piflow/api/API.scala | 23 +++++++++++++++++-- .../cn/piflow/api/HTTPClientGetGroups.scala | 21 +++++++++++++++++ .../scala/cn/piflow/api/HTTPService.scala | 10 ++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetGroups.scala diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 0babe70..f57a65b 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -10,7 +10,7 @@ import cn.piflow.conf.util.ClassUtil.findConfigurableStopPropertyDescriptor import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost} import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils - +import org.apache.spark.launcher.SparkLauncher import scala.util.parsing.json.JSON @@ -53,6 +53,20 @@ object API { //spark.close(); new Thread( new WaitProcessTerminateRunnable(spark, process)).start() (applicationId,process) + + /*val launcher = new SparkLauncher + launcher.setMaster(PropertyUtil.getPropertyValue("spark.master")) + .setAppName("test") + .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode")) + .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) + .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")).setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode")) + .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir")) + .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars")) + .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle")) + .setConf("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .setMainClass("lalla") + .addAppArgs(flowJson)*/ + } def stopFlow(process : Process): String = { @@ -89,11 +103,16 @@ object API { } + def getAllGroups() = { + val groups = ClassUtil.findAllGroups().mkString(",") + """{"groups":"""" + groups + """"}""" + } + } class WaitProcessTerminateRunnable(spark : SparkSession, process: Process) extends Runnable { override def run(): Unit = { process.awaitTermination() - spark.close() + //spark.close() } } diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetGroups.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetGroups.scala new file mode 100644 index 0000000..c554eb1 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetGroups.scala @@ -0,0 +1,21 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet} +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HTTPClientGetGroups { + + def main(args: Array[String]): Unit = { + + val url = "http://10.0.86.98:8001/stop/groups" + val client = HttpClients.createDefault() + val getGroups:HttpGet = new HttpGet(url) + + val response:CloseableHttpResponse = client.execute(getGroups) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println("Groups is: " + str) + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index 9287236..8be709c 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -98,6 +98,16 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!")) } } + } + case HttpRequest(GET, Uri.Path("/stop/groups"), headers, entity, protocol) =>{ + + try{ + val stopGroups = API.getAllGroups() + Future.successful(HttpResponse(entity = stopGroups)) + }catch { + case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!")) + } + } case _: HttpRequest =>