Merge remote-tracking branch 'origin/master'

This commit is contained in:
bao319 2020-04-27 14:46:12 +08:00
commit 8b4269fa82
4 changed files with 536 additions and 24 deletions

View File

@ -1,14 +1,15 @@
package cn.piflow
import java.lang.Thread.UncaughtExceptionHandler
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
import cn.piflow.util._
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import org.apache.spark.launcher.SparkAppHandle.State
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
import scala.util.{Failure, Success, Try}
/**
@ -84,8 +85,8 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e
val mapGroupEntryWithConditions: Map[String, (GroupEntry, Condition[GroupExecution])] = group.mapFlowWithConditions();
val completedGroupEntry = MMap[String, Boolean]();
completedGroupEntry ++= mapGroupEntryWithConditions.map(x => (x._1, false));
val numWaitingGroupEntry = new AtomicInteger(mapGroupEntryWithConditions.size);
completedGroupEntry ++= mapGroupEntryWithConditions.map(x => (x._1, false))
val numWaitingGroupEntry = new AtomicInteger(mapGroupEntryWithConditions.size)
val startedProcesses = MMap[String, SparkAppHandle]();
val startedGroup = MMap[String, GroupExecution]()
@ -199,9 +200,9 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e
H2Util.updateFlowGroupId(appId, groupId)
}
startedProcesses(name) = handle;
startedProcesses(name) = handle
startedProcessesAppID(name) = appId
}
}
private def startGroup(name: String, group: Group, parentId: String): Unit = {
val groupExecution = runner.start(group);
@ -215,6 +216,9 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e
}
}
@volatile
var maybeException:Option[Throwable] = None
val pollingThread = new Thread(new Runnable() {
override def run(): Unit = {
@ -222,28 +226,25 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e
try{
while (numWaitingGroupEntry.get() > 0) {
val todosFlow = ArrayBuffer[(String, Flow)]();
val todosGroup = ArrayBuffer[(String, Group)]();
mapGroupEntryWithConditions.foreach { en =>
if(en._2._1.isInstanceOf[Flow]){
if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {
todosFlow += (en._1 -> en._2._1.asInstanceOf[Flow]);
}
}else if (en._2._1.isInstanceOf[Group]){
if (!startedGroup.contains(en._1) && en._2._2.matches(execution)) {
todosGroup += (en._1 -> en._2._1.asInstanceOf[Group]);
}
}
val (todosFlow, todosGroup) = getTodos()
if(todosFlow.size == 0 && todosGroup.size == 0 && H2Util.isGroupChildError(id) && !H2Util.isGroupChildRunning(id)){
val (todosFlow, todosGroup) = getTodos()
if(todosFlow.size == 0 && todosGroup.size == 0)
throw new GroupException("Group Failed!")
}
startedProcesses.synchronized {
todosFlow.foreach(en => startProcess(en._1, en._2.asInstanceOf[Flow],id));
todosFlow.foreach(en => {
startProcess(en._1, en._2.asInstanceOf[Flow],id)
});
}
startedGroup.synchronized{
todosGroup.foreach(en => startGroup(en._1, en._2.asInstanceOf[Group],id))
todosGroup.foreach(en => {
startGroup(en._1, en._2.asInstanceOf[Group],id)
})
}
Thread.sleep(POLLING_INTERVAL);
@ -254,7 +255,9 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e
}catch {
case e: Throwable =>
runnerListener.onGroupFailed(groupContext);
throw e;
println(e)
if(e.isInstanceOf[GroupException])
throw e
}
finally {
latch.countDown();
@ -263,7 +266,26 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e
}
});
pollingThread.start();
val doit = Try{
pollingThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler {
override def uncaughtException(thread: Thread, throwable: Throwable): Unit = {
maybeException = Some(throwable)
}
})
pollingThread.start()
//pollingThread.join()
}
doit match {
case Success(v) => {
println("Did not capture error!")
}
case Failure(v) =>{
println("Capture error!")
runnerListener.onGroupFailed(groupContext)
}
}
override def awaitTermination(): Unit = {
latch.await();
@ -319,6 +341,25 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e
};
}
private def getTodos() : (ArrayBuffer[(String, Flow)], ArrayBuffer[(String, Group)]) = {
val todosFlow = ArrayBuffer[(String, Flow)]();
val todosGroup = ArrayBuffer[(String, Group)]();
mapGroupEntryWithConditions.foreach { en =>
if(en._2._1.isInstanceOf[Flow]){
if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) {
todosFlow += (en._1 -> en._2._1.asInstanceOf[Flow]);
}
}else if (en._2._1.isInstanceOf[Group]){
if (!startedGroup.contains(en._1) && en._2._2.matches(execution)) {
todosGroup += (en._1 -> en._2._1.asInstanceOf[Group]);
}
}
}
(todosFlow, todosGroup)
}
override def getGroupId(): String = id
override def getChildCount(): Int = {

View File

@ -0,0 +1,5 @@
package cn.piflow
class GroupException(msg : String) extends Exception(msg){
}

View File

@ -7,6 +7,7 @@ import java.util.Date
import net.liftweb.json.compactRender
import net.liftweb.json.JsonDSL._
import org.h2.tools.Server
import scala.util.control.Breaks.{breakable}
object H2Util {
@ -461,6 +462,68 @@ object H2Util {
return groupState
}
def isGroupChildError( groupId : String) : Boolean = {
if(getGroupChildByStatus(groupId, GroupState.FAILED).size > 0 || getGroupChildByStatus(groupId, GroupState.KILLED).size > 0)
return true
else if(getFlowChildByStatus(groupId, FlowState.FAILED).size > 0 || getFlowChildByStatus(groupId, FlowState.KILLED).size > 0)
return true
else
return false
}
def isGroupChildRunning( groupId : String) : Boolean = {
if(getGroupChildByStatus(groupId, GroupState.STARTED).size > 0 )
return true
else if(getFlowChildByStatus(groupId, FlowState.STARTED).size > 0 )
return true
else
return false
}
def getGroupChildByStatus(groupId: String, status : String) : List[String] = {
val statement = getConnectionInstance().createStatement()
statement.setQueryTimeout(QUERY_TIME)
var failedList = List[String]()
//group children state
val groupRS : ResultSet = statement.executeQuery("select * from flowGroup where parentId='" + groupId +"'")
breakable{
while (groupRS.next()){
val groupName = groupRS.getString("name")
val groupState = groupRS.getString("state")
if(groupState == status){
failedList = groupName +: failedList
}
}
}
groupRS.close()
statement.close()
return failedList
}
def getFlowChildByStatus(groupId: String, status : String) : List[String] = {
val statement = getConnectionInstance().createStatement()
statement.setQueryTimeout(QUERY_TIME)
var failedList = List[String]()
//flow children state
val rs : ResultSet = statement.executeQuery("select * from flow where groupId='" + groupId +"'")
breakable{
while(rs.next()){
val flowName = rs.getString("name")
val flowState = rs.getString("state")
if(flowState == status){
failedList = flowName +: failedList
}
}
}
rs.close()
statement.close()
return failedList
}
def getFlowGroupInfo(groupId:String) : String = {
val flowGroupInfoMap = getGroupInfoMap(groupId)

View File

@ -10,7 +10,7 @@ object HTTPClientStartFlowGroup {
def main(args: Array[String]): Unit = {
val json =
/*val json =
"""
|{
| "group": {
@ -294,9 +294,412 @@ object HTTPClientStartFlowGroup {
| }
|}
|
""".stripMargin*/
/*val json=
"""
|{
| "group" : {
| "name" : "xjzhu",
| "flows" : [ {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow1",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f7220272",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f7220271"
| }
| } ],
| "groups" : [ {
| "group" : {
| "flows" : [ {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow3",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| }, {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow2",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f721026e",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases;"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f721026d"
| }
| },{
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow4",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| },{
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow5",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| }],
| "conditions" : [ {
| "entry" : "flow3",
| "after" : "flow2"
| },
| {
| "entry" : "flow5",
| "after" : "flow4"
| }],
| "name" : "group1",
| "uuid" : "8a80d88d712aa8c601717c68f71f0269"
| }
| } ],
| "conditions" : [ {
| "entry" : "group1",
| "after" : "flow1"
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71e0268"
| }
|}
""".stripMargin*/
val json =
"""
|{
| "group" : {
| "name" : "xjzhu",
| "flows" : [ {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow1",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f7220272",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f7220271"
| }
| } ],
| "groups" : [ {
| "group" : {
| "flows" : [ {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow3",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| }, {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow2",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f721026e",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases;"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f721026d"
| }
| },{
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow4",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| },{
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow5",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| }],
| "conditions" : [ {
| "entry" : "flow3",
| "after" : "flow2"
| },
| {
| "entry" : "flow5",
| "after" : "flow4"
| }],
| "name" : "group1",
| "uuid" : "8a80d88d712aa8c601717c68f71f0269"
| }
| },{
| "group" : {
| "flows" : [ {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow6",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases;"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| }, {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow7",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f721026e",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f721026d"
| }
| },{
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow8",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| },{
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow9",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f720026b",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71f026a"
| }
| }],
| "conditions" : [ {
| "entry" : "flow7",
| "after" : "flow6"
| },
| {
| "entry" : "flow9",
| "after" : "flow8"
| }],
| "name" : "group2",
| "uuid" : "8a80d88d712aa8c601717c68f71f0269"
| }
| } ],
| "conditions" : [ {
| "entry" : "group1",
| "after" : "flow1"
| },{
| "entry" : "group2",
| "after" : "flow1"
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71e0268"
| }
|}
""".stripMargin
val url = "http://10.0.88.13:8002/group/start"
/*val json=
"""
|{
| "group" : {
| "flows" : [ {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow1",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f7220272",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f7220271"
| }
| } ],
| "name" : "xjzhu",
| "groups" : [ {
| "group" : {
| "flows" : [ {
| "flow" : {
| "executorNumber" : "1",
| "driverMemory" : "1g",
| "executorMemory" : "1g",
| "executorCores" : "1",
| "paths" : [ ],
| "name" : "flow2",
| "stops" : [ {
| "customizedProperties" : { },
| "name" : "SelectHiveQL",
| "uuid" : "8a80d88d712aa8c601717c68f721026e",
| "bundle" : "cn.piflow.bundle.hive.SelectHiveQL",
| "properties" : {
| "hiveQL" : "show databases;"
| }
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f721026d"
| }
| }],
| "conditions" : [ ],
| "name" : "group1",
| "uuid" : "8a80d88d712aa8c601717c68f71f0269"
| }
| } ],
| "conditions" : [ {
| "entry" : "group1",
| "after" : "flow1"
| } ],
| "uuid" : "8a80d88d712aa8c601717c68f71e0268"
| }
|}
""".stripMargin*/
val url = "http://10.0.85.83:8001/group/start"
val timeout = 1800
val requestConfig = RequestConfig.custom()
.setConnectTimeout(timeout*1000)