From ac4e3043d6295f92229d90530d76c0f43604a676 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Thu, 16 May 2019 16:06:26 +0800 Subject: [PATCH] add getProjectInfo API --- .../main/scala/cn/piflow/util/H2Util.scala | 52 ++++++++++++++++++- .../src/main/scala/cn/piflow/api/API.scala | 11 +++- .../piflow/api/HTTPClientGetProjectInfo.scala | 21 ++++++++ .../scala/cn/piflow/api/HTTPService.scala | 17 +++++- 4 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetProjectInfo.scala diff --git a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala index 1e32fbc..bfcfeb4 100644 --- a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala +++ b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala @@ -470,7 +470,7 @@ object H2Util { statement.close() - Map[String, Any]("flowGroup" -> flowGroupInfoMap) + Map[String, Any]("group" -> flowGroupInfoMap) } //project related api @@ -515,6 +515,56 @@ object H2Util { statement.close() } + def getProjectInfo(projectId:String) : String = { + + val projectInfoMap = getProjectInfoMap(projectId) + JsonUtil.format(JsonUtil.toJson(projectInfoMap)) + + } + + def getProjectInfoMap(projectId:String) : Map[String, Any] = { + val statement = getConnectionInstance().createStatement() + statement.setQueryTimeout(QUERY_TIME) + + + var projectInfoMap = Map[String, Any]() + + val projectRS : ResultSet = statement.executeQuery("select * from project where id='" + projectId +"'") + while (projectRS.next()){ + + projectInfoMap += ("id" -> projectRS.getString("id")) + projectInfoMap += ("name" -> projectRS.getString("name")) + projectInfoMap += ("state" -> projectRS.getString("state")) + projectInfoMap += ("startTime" -> projectRS.getString("startTime")) + projectInfoMap += ("endTime" -> projectRS.getString("endTime")) + } + projectRS.close() + + //get flowGroups info + var flowGroupList:List[Map[String, Any]] = List() + val flowGroupRS : ResultSet = statement.executeQuery("select * from flowGroup where projectId='" + projectId +"'") + while (flowGroupRS.next()){ + val flowGroupId = flowGroupRS.getString("id") + flowGroupList = getFlowGroupInfoMap(flowGroupId) +: flowGroupList + } + flowGroupRS.close() + projectInfoMap += ("groups" -> flowGroupList) + + //get flow info + var flowList:List[Map[String, Any]] = List() + val flowRS : ResultSet = statement.executeQuery("select * from flow where projectId='" + projectId +"'") + while (flowRS.next()){ + val appId = flowRS.getString("id") + flowList = getFlowInfoMap(appId) +: flowList + } + flowRS.close() + projectInfoMap += ("flows" -> flowList) + + statement.close() + + Map[String, Any]("flowGroup" -> projectInfoMap) + } + def main(args: Array[String]): Unit = { diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 74891a1..141831c 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch import org.apache.spark.sql.SparkSession import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil} -import cn.piflow.{FlowGroupExecution, Process, Runner} +import cn.piflow.{FlowGroupExecution, Process, ProjectExecution, Runner} import cn.piflow.api.util.{HdfsUtil, PropertyUtil} import cn.piflow.conf.bean.{FlowGroupBean, ProjectBean} import cn.piflow.util.{FlowState, H2Util, HadoopFileUtil} @@ -45,6 +45,15 @@ object API { } + def stopProject(projectExecution : ProjectExecution): Unit ={ + projectExecution.stop() + } + + def getProjectInfo(projectId : String) : String = { + val projectInfo = H2Util.getProjectInfo(projectId) + projectInfo + } + def startFlowGroup(flowGroupJson : String) = { println("StartFlowGroup API get json: \n" + flowGroupJson ) diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetProjectInfo.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetProjectInfo.scala new file mode 100644 index 0000000..2907bc4 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetProjectInfo.scala @@ -0,0 +1,21 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet} +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HTTPClientGetProjectInfo { + + def main(args: Array[String]): Unit = { + + val url = "http://10.0.86.98:8001/project/info?projectId=project_9ababcea-0e5c-4825-a005-4591c09ec9c4" + val client = HttpClients.createDefault() + val getFlowGroupInfoData:HttpGet = new HttpGet(url) + + val response:CloseableHttpResponse = client.execute(getFlowGroupInfoData) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println("Code is " + str) + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index 91ee511..9f2bfde 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -274,7 +274,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup Future.successful(HttpResponse(entity = result)) }else{ - Future.successful(HttpResponse(entity = "groupId is null or flow run failed!")) + Future.successful(HttpResponse(entity = "groupId is null or flowGroup run failed!")) } } @@ -308,7 +308,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup projectMap.get(projectId) match { case Some(projectExecution) => - val result = projectExecution.stop() + val result = API.stopProject(projectExecution) projectMap.-(projectId) Future.successful(HttpResponse(entity = "Stop project Ok!!!")) case ex =>{ @@ -321,6 +321,19 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup } } + case HttpRequest(GET, Uri.Path("/project/info"), headers, entity, protocol) =>{ + + val projectId = req.getUri().query().getOrElse("projectId","") + if(!projectId.equals("")){ + var result = API.getProjectInfo(projectId) + println("getProjectInfo result: " + result) + + Future.successful(HttpResponse(entity = result)) + }else{ + Future.successful(HttpResponse(entity = "projectId is null or project run failed!")) + } + } + case _: HttpRequest => Future.successful(HttpResponse(404, entity = "Unknown resource!")) }