add terminal flowGroup API

This commit is contained in:
judy0131 2019-05-09 17:38:19 +08:00
parent f36dd9cf64
commit 33e4561072
6 changed files with 154 additions and 97 deletions

View File

@ -1,12 +1,13 @@
package cn.piflow
import java.sql.Date
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
import cn.piflow.Execution
import cn.piflow.util.{FlowLauncher, PropertyUtil}
import cn.piflow.util.{FlowLauncher, FlowState, H2Util, PropertyUtil}
import org.apache.spark.launcher.SparkAppHandle.State
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import org.apache.spark.sql.SparkSession
@ -72,6 +73,7 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size);
val startedProcesses = MMap[String, SparkAppHandle]();
val startedProcessesAppID = MMap[String, String]()
val execution = this;
val POLLING_INTERVAL = 1000;
@ -111,11 +113,11 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
println("Spark job's state changed to: " + sparkAppState)
}
//TODO: get the process status
if (handle.getState.equals(State.FINISHED)){
if(H2Util.getFlowState(appId).equals(FlowState.COMPLETED)){
completedProcesses(flow.getFlowName()) = true;
numWaitingProcesses.decrementAndGet();
}
if (handle.getState().isFinal){
countDownLatch.countDown()
println("Task is finished!")
@ -128,7 +130,13 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
}
)
while (appId == null){
appId = handle.getAppId
Thread.sleep(100)
}
startedProcesses(name) = handle;
startedProcessesAppID(name) = appId
}
val pollingThread = new Thread(new Runnable() {
@ -136,27 +144,36 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
runnerListener.onFlowGroupStarted(flowGroupContext)
while (numWaitingProcesses.get() > 0) {
val todos = ArrayBuffer[(String, Flow)]();
mapFlowWithConditions.foreach { en =>
if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {
todos += (en._1 -> en._2._1);
try{
while (numWaitingProcesses.get() > 0) {
val todos = ArrayBuffer[(String, Flow)]();
mapFlowWithConditions.foreach { en =>
if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {
todos += (en._1 -> en._2._1);
}
}
startedProcesses.synchronized {
todos.foreach(en => startProcess(en._1, en._2));
}
Thread.sleep(POLLING_INTERVAL);
}
startedProcesses.synchronized {
todos.foreach(en => startProcess(en._1, en._2));
}
Thread.sleep(POLLING_INTERVAL);
runnerListener.onFlowGroupCompleted(flowGroupContext)
}catch {
case e: Throwable =>
runnerListener.onFlowGroupFailed(flowGroupContext);
throw e;
}
finally {
latch.countDown();
finalizeExecution(true);
}
latch.countDown();
finalizeExecution(true);
runnerListener.onFlowGroupCompleted(flowGroupContext)
//TODO: how to define FlowGroup Failed
//runnerListener.onFlowGroupFailed(ctx)
}
});
@ -179,8 +196,20 @@ class FlowGroupExecutionImpl(fg: FlowGroup, runnerContext: Context, runner: Runn
private def finalizeExecution(completed: Boolean): Unit = {
if (running) {
if (!completed) {
//startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
startedProcesses.filter(x => !isEntryCompleted(x._1)).foreach(x => {
x._2.stop()
val appID: String = startedProcessesAppID.getOrElse(x._1,"")
if(!appID.equals("")){
println("Stop Flow " + appID + " by FlowLauncher!")
FlowLauncher.stop(appID)
}
});
pollingThread.interrupt();
startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
}
running = false;

View File

@ -212,8 +212,8 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
if (running) {
if (!completed) {
pollingThread.interrupt();
startedProcesses.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
startedFlowGroup.filter(x => isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
startedProcesses.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
startedFlowGroup.filter(x => !isEntryCompleted(x._1)).map(_._2).foreach(_.stop());
}
runner.removeListener(listener);

View File

@ -1,8 +1,14 @@
package cn.piflow.util
import java.util.Date
import java.util.concurrent.CountDownLatch
import cn.piflow.Flow
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.launcher.SparkAppHandle.State
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
@ -49,4 +55,27 @@ object FlowLauncher {
sparkLauncher
}
def stop(appID: String) = {
println("Stop Flow !!!!!!!!!!!!!!!!!!!!!!!!!!")
//yarn application kill appId
val url = PropertyUtil.getPropertyValue("yarn.url") + appID + "/state"
val client = HttpClients.createDefault()
val put:HttpPut = new HttpPut(url)
val body ="{\"state\":\"KILLED\"}"
put.addHeader("Content-Type", "application/json")
put.setEntity(new StringEntity(body))
val response:CloseableHttpResponse = client.execute(put)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
//update db
println("Update flow state after Stop Flow !!!!!!!!!!!!!!!!!!!!!!!!!!")
H2Util.updateFlowState(appID, FlowState.KILLED)
H2Util.updateFlowFinishedTime(appID, new Date().toString)
"ok"
}
}

View File

@ -54,86 +54,14 @@ object API {
val flowGroupBean = FlowGroupBean(map)
val flowGroup = flowGroupBean.constructFlowGroup()
val process = Runner.create()
val flowGroupExecution = Runner.create()
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.start(flowGroup);
(flowGroupBean.name, flowGroupExecution)
}
/*def startFlowGroup(flowGroupJson : String):(String,String,SparkAppHandle) = {
var appId:String = null
val map = OptionUtil.getAny(JSON.parseFull(flowGroupJson)).asInstanceOf[Map[String, Any]]
val flowGroupMap = MapUtil.get(map, "group").asInstanceOf[Map[String, Any]]
/*val uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String]
val appName = MapUtil.get(flowMap,"name").asInstanceOf[String]*/
val dirverMem = flowGroupMap.getOrElse("driverMemory","1g").asInstanceOf[String]
val executorNum = flowGroupMap.getOrElse("executorNumber","1").asInstanceOf[String]
val executorMem= flowGroupMap.getOrElse("executorMemory","1g").asInstanceOf[String]
val executorCores = flowGroupMap.getOrElse("executorCores","1").asInstanceOf[String]
//val (stdout, stderr) = getLogFile(uuid, appName)
println("StartFlowGroup API get json: \n" + flowGroupJson )
val countDownLatch = new CountDownLatch(1)
val launcher = new SparkLauncher
val handle =launcher
.setAppName("TestFlowGroup")
.setMaster(PropertyUtil.getPropertyValue("spark.master"))
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
.setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
.setVerbose(true)
.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setConf("spark.driver.memory", dirverMem)
.setConf("spark.num.executors",executorNum)
.setConf("spark.executor.memory", executorMem)
.setConf("spark.executor.cores",executorCores)
//.setConf("spark.cores.max", "4")
//.setConf("spark.checkpoint", PropertyUtil.getPropertyValue("checkpoint.path"))
.addFile(PropertyUtil.getConfigureFile())
.setMainClass("cn.piflow.api.StartFlowGroupMain")
.addAppArgs(flowGroupJson.stripMargin)
//.redirectOutput(stdout)
//.redirectError(stderr)
.startApplication( new SparkAppHandle.Listener {
override def stateChanged(handle: SparkAppHandle): Unit = {
appId = handle.getAppId
val sparkAppState = handle.getState
if(appId != null){
println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState)
}else{
println("Spark job's state changed to: " + sparkAppState)
}
if (handle.getState().isFinal){
countDownLatch.countDown()
println("Task is finished!")
}
}
override def infoChanged(handle: SparkAppHandle): Unit = {
//println("Info:" + handle.getState().toString)
}
}
)
while (appId == null){
Thread.sleep(1000)
}
var processId = ""
while(processId.equals("")){
Thread.sleep(1000)
processId = H2Util.getFlowProcessId(appId)
}
(appId, processId, handle)
}*/
def startFlow(flowJson : String):(String,String,SparkAppHandle) = {
var appId:String = null

View File

@ -0,0 +1,24 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStopFlowGroup {
def main(args: Array[String]): Unit = {
val json = """{"groupName":"FlowGroup"}"""
val url = "http://10.0.86.98:8001/flowGroup/stop"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println(str)
}
}

View File

@ -7,6 +7,7 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import cn.piflow.FlowGroupExecution
import cn.piflow.api.util.PropertyUtil
import cn.piflow.conf.util.{MapUtil, OptionUtil}
import com.typesafe.config.ConfigFactory
@ -24,6 +25,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
var processMap = Map[String, SparkAppHandle]()
var flowGroupMap = Map[String, FlowGroupExecution]()
def toJson(entity: RequestEntity): Map[String, Any] = {
entity match {
@ -225,7 +227,8 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpEntity.Strict(_, data) =>{
var flowGroupJson = data.utf8String
flowGroupJson = flowGroupJson.replaceAll("}","}\n")
API.startFlowGroup(flowGroupJson)
val (flowName, flowGroupExecution) = API.startFlowGroup(flowGroupJson)
flowGroupMap += (flowName -> flowGroupExecution)
Future.successful(HttpResponse(entity = "start flow group ok!!!"))
}
@ -238,6 +241,28 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
}
case HttpRequest(POST, Uri.Path("/flowGroup/stop"), headers, entity, protocol) =>{
val data = toJson(entity)
val groupName = data.get("groupName").getOrElse("").asInstanceOf[String]
if(groupName.equals("") || !flowGroupMap.contains(groupName)){
Future.failed(new Exception("Can not found flowGroup Error!"))
}else{
flowGroupMap.get(groupName) match {
case Some(flowGroupExecution) =>
val result = flowGroupExecution.stop()
flowGroupMap.-(groupName)
Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!"))
case ex =>{
println(ex)
Future.successful(HttpResponse(entity = "Can not found FlowGroup Error!"))
}
}
}
}
case HttpRequest(POST, Uri.Path("/project/start"), headers, entity, protocol) =>{
entity match {
@ -257,6 +282,28 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
}
/*case HttpRequest(POST, Uri.Path("/project/stop"), headers, entity, protocol) =>{
val data = toJson(entity)
val projectName = data.get("projectName").getOrElse("").asInstanceOf[String]
if(projectName.equals("") || !flowGroupMap.contains(projectName)){
Future.failed(new Exception("Can not found flowGroup Error!"))
}else{
flowGroupMap.get(groupName) match {
case Some(flowGroupExecution) =>
val result = flowGroupExecution.stop()
flowGroupMap.-(groupName)
Future.successful(HttpResponse(entity = "Stop FlowGroup Ok!!!"))
case ex =>{
println(ex)
Future.successful(HttpResponse(entity = "Can not found FlowGroup Error!"))
}
}
}
}*/
case _: HttpRequest =>
Future.successful(HttpResponse(404, entity = "Unknown resource!"))
}