forked from opensci/piflow
add terminal project api
This commit is contained in:
parent
33e4561072
commit
760a564c19
|
@ -4,7 +4,7 @@ import java.sql.Date
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
import cn.piflow.util.{FlowLauncher, PropertyUtil}
|
||||
import cn.piflow.util.{FlowLauncher, FlowState, H2Util, PropertyUtil}
|
||||
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
|
||||
import org.apache.spark.launcher.SparkAppHandle.State
|
||||
|
||||
|
@ -65,6 +65,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
val startedProcesses = MMap[String, SparkAppHandle]();
|
||||
val startedFlowGroup = MMap[String, FlowGroupExecution]()
|
||||
|
||||
val startedProcessesAppID = MMap[String, String]()
|
||||
|
||||
val execution = this;
|
||||
val POLLING_INTERVAL = 1000;
|
||||
val latch = new CountDownLatch(1);
|
||||
|
@ -120,6 +122,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
|
||||
var flowJson = flow.getFlowJson()
|
||||
flowJson = flowJson.replaceAll("}","}\n")
|
||||
|
||||
var appId : String = ""
|
||||
val countDownLatch = new CountDownLatch(1)
|
||||
|
||||
|
@ -133,11 +136,11 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
println("Spark job's state changed to: " + sparkAppState)
|
||||
}
|
||||
|
||||
//TODO: get the process status
|
||||
if (handle.getState.equals(State.FINISHED)){
|
||||
if(H2Util.getFlowState(appId).equals(FlowState.COMPLETED)){
|
||||
completedProjectEntry(flow.getFlowName()) = true;
|
||||
numWaitingProjectEntry.decrementAndGet();
|
||||
}
|
||||
|
||||
if (handle.getState().isFinal){
|
||||
countDownLatch.countDown()
|
||||
println("Task is finished!")
|
||||
|
@ -150,7 +153,13 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
}
|
||||
)
|
||||
|
||||
|
||||
while (appId == null){
|
||||
appId = handle.getAppId
|
||||
Thread.sleep(100)
|
||||
}
|
||||
startedProcesses(name) = handle;
|
||||
startedProcessesAppID(name) = appId
|
||||
}
|
||||
|
||||
private def startFlowGroup(name: String, flowGroup: FlowGroup): Unit = {
|
||||
|
@ -211,9 +220,21 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
|
|||
private def finalizeExecution(completed: Boolean): Unit = {
|
||||
if (running) {
|
||||
if (!completed) {
|
||||
pollingThread.interrupt();
|
||||
startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
|
||||
//pollingThread.interrupt();
|
||||
//startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
|
||||
startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => {
|
||||
|
||||
x._2.stop()
|
||||
val appID: String = startedProcessesAppID.getOrElse(x._1,"")
|
||||
if(!appID.equals("")){
|
||||
println("Stop Flow " + appID + " by FlowLauncher!")
|
||||
FlowLauncher.stop(appID)
|
||||
}
|
||||
|
||||
});
|
||||
startedFlowGroup.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
|
||||
pollingThread.interrupt();
|
||||
|
||||
}
|
||||
|
||||
runner.removeListener(listener);
|
||||
|
|
|
@ -36,10 +36,13 @@ object API {
|
|||
val projectBean = ProjectBean(map)
|
||||
val project = projectBean.constructProject()
|
||||
|
||||
val process = Runner.create()
|
||||
val projectExecution = Runner.create()
|
||||
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
|
||||
.start(project);
|
||||
|
||||
(projectBean.name,projectExecution)
|
||||
|
||||
}
|
||||
|
||||
def startFlowGroup(flowGroupJson : String) = {
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
|
||||
import org.apache.http.entity.StringEntity
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
import org.apache.http.util.EntityUtils
|
||||
|
||||
object HTTPClientStopProject {
|
||||
def main(args: Array[String]): Unit = {
|
||||
val json = """{"projectName":"TestProject"}"""
|
||||
val url = "http://10.0.86.98:8001/project/stop"
|
||||
val client = HttpClients.createDefault()
|
||||
val post:HttpPost = new HttpPost(url)
|
||||
|
||||
post.addHeader("Content-Type", "application/json")
|
||||
post.setEntity(new StringEntity(json))
|
||||
|
||||
val response:CloseableHttpResponse = client.execute(post)
|
||||
val entity = response.getEntity
|
||||
val str = EntityUtils.toString(entity,"UTF-8")
|
||||
println(str)
|
||||
}
|
||||
|
||||
}
|
|
@ -7,7 +7,7 @@ import akka.http.scaladsl.model._
|
|||
import akka.http.scaladsl.model.HttpMethods._
|
||||
import akka.http.scaladsl.server.Directives
|
||||
import akka.stream.ActorMaterializer
|
||||
import cn.piflow.FlowGroupExecution
|
||||
import cn.piflow.{FlowGroupExecution, ProjectExecution}
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import cn.piflow.conf.util.{MapUtil, OptionUtil}
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
@ -26,6 +26,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
implicit val executionContext = system.dispatcher
|
||||
var processMap = Map[String, SparkAppHandle]()
|
||||
var flowGroupMap = Map[String, FlowGroupExecution]()
|
||||
var projectMap = Map[String, ProjectExecution]()
|
||||
|
||||
def toJson(entity: RequestEntity): Map[String, Any] = {
|
||||
entity match {
|
||||
|
@ -269,7 +270,8 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
case HttpEntity.Strict(_, data) =>{
|
||||
var projectJson = data.utf8String
|
||||
projectJson = projectJson.replaceAll("}","}\n")
|
||||
API.startProject(projectJson)
|
||||
val (projectName, projectExecution) = API.startProject(projectJson)
|
||||
projectMap += (projectName -> projectExecution)
|
||||
Future.successful(HttpResponse(entity = "start project ok!!!"))
|
||||
}
|
||||
|
||||
|
@ -282,27 +284,27 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
|
||||
}
|
||||
|
||||
/*case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{
|
||||
case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{
|
||||
val data = toJson(entity)
|
||||
val projectName = data.get("projectName").getOrElse("").asInstanceOf[String]
|
||||
if(projectName.equals("") || !flowGroupMap.contains(projectName)){
|
||||
Future.failed(new Exception("Can not found flowGroup Error!"))
|
||||
if(projectName.equals("") || !projectMap.contains(projectName)){
|
||||
Future.failed(new Exception("Can not found project Error!"))
|
||||
}else{
|
||||
|
||||
flowGroupMap.get(groupName) match {
|
||||
case Some(flowGroupExecution) =>
|
||||
val result = flowGroupExecution.stop()
|
||||
flowGroupMap.-(groupName)
|
||||
Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!"))
|
||||
projectMap.get(projectName) match {
|
||||
case Some(projectExecution) =>
|
||||
val result = projectExecution.stop()
|
||||
projectMap.-(projectName)
|
||||
Future.successful(HttpResponse(entity = "Stop project Ok!!!"))
|
||||
case ex =>{
|
||||
println(ex)
|
||||
Future.successful(HttpResponse(entity = "Can not found FlowGroup Error!"))
|
||||
Future.successful(HttpResponse(entity = "Can not found project Error!"))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
case _: HttpRequest =>
|
||||
Future.successful(HttpResponse(404, entity = "Unknown resource!"))
|
||||
|
|
Loading…
Reference in New Issue