fix stop flow bug

This commit is contained in:
judy0131 2018-12-14 18:12:21 +08:00
parent 3d48146b99
commit 212f60479a
6 changed files with 51 additions and 3 deletions

View File

@ -7,4 +7,5 @@ object FlowState {
val FAILED = "FAILED"
val ABORTED = "ABORTED"
val FORK = "FORK"
val KILLED = "KILLED"
}

View File

@ -57,6 +57,14 @@ object H2Util {
val statement = getConnectionInstance().createStatement()
statement.setQueryTimeout(QUERY_TIME)
val updateSql = "update flow set state='" + state + "' where id='" + appId + "'"
//update related stop stop when flow state is KILLED
if(state.equals(FlowState.KILLED)){
val startedStopList = getStartedStop(appId)
startedStopList.foreach(stopName => {
updateStopState(appId,stopName,StopState.KILLED)
})
}
//println(updateSql)
statement.executeUpdate(updateSql)
statement.close()
@ -239,6 +247,21 @@ object H2Util {
statement.close()
}
def getStartedStop(appId:String) : List[String] = {
val statement = getConnectionInstance().createStatement()
statement.setQueryTimeout(QUERY_TIME)
var stopList:List[String] = List()
val rs : ResultSet = statement.executeQuery("select * from stop where flowId='" + appId +"' and state = '" + StopState.STARTED + "'")
while(rs.next()){
stopList = rs.getString("name") +: stopList
}
rs.close()
statement.close()
stopList
}
def main(args: Array[String]): Unit = {
/*try{

View File

@ -5,4 +5,5 @@ object StopState {
val STARTED = "STARTED"
val COMPLETED = "COMPLETED"
val FAILED = "FAILED"
val KILLED = "KILLED"
}

View File

@ -9,8 +9,9 @@ import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
import cn.piflow.{Process, Runner}
import cn.piflow.api.util.PropertyUtil
import cn.piflow.util.{H2Util, HadoopFileUtil}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import cn.piflow.util.{FlowState, H2Util, HadoopFileUtil}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpPut}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
@ -136,6 +137,28 @@ object API {
"ok"
}
def stopFlow(appID : String, process : SparkAppHandle) : String = {
//yarn application kill appId
val url = PropertyUtil.getPropertyValue("yarn.url") + appID + "/state"
val client = HttpClients.createDefault()
val put:HttpPut = new HttpPut(url)
val body ="{\"state\":\"KILLED\"}"
put.addHeader("Content-Type", "application/json")
put.setEntity(new StringEntity(body))
val response:CloseableHttpResponse = client.execute(put)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
//process kill
process.kill()
//update db
H2Util.updateFlowState(appID, FlowState.KILLED)
"ok"
}
def getFlowInfo(appID : String) : String = {
val flowInfo = H2Util.getFlowInfo(appID)
flowInfo

View File

@ -136,7 +136,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
processMap.get(appId) match {
case Some(process) =>
val result = API.stopFlow(process)
val result = API.stopFlow(appId, process)
processMap.-(appId)
Future.successful(HttpResponse(entity = result))
case ex =>{