From a81f18cc9cee82e2c39dc46cdfaff7a9a817849f Mon Sep 17 00:00:00 2001 From: judy0131 Date: Wed, 22 Apr 2020 14:48:30 +0800 Subject: [PATCH 1/2] fix bug: catch group exception to monitor the status --- .../src/main/scala/cn/piflow/Group.scala | 57 ++++++++++++++--- .../main/scala/cn/piflow/GroupException.scala | 5 ++ .../main/scala/cn/piflow/util/H2Util.scala | 63 +++++++++++++++++++ 3 files changed, 116 insertions(+), 9 deletions(-) create mode 100644 piflow-core/src/main/scala/cn/piflow/GroupException.scala diff --git a/piflow-core/src/main/scala/cn/piflow/Group.scala b/piflow-core/src/main/scala/cn/piflow/Group.scala index a067e05..3299a06 100644 --- a/piflow-core/src/main/scala/cn/piflow/Group.scala +++ b/piflow-core/src/main/scala/cn/piflow/Group.scala @@ -1,14 +1,15 @@ package cn.piflow +import java.lang.Thread.UncaughtExceptionHandler import java.util.Date import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CountDownLatch, TimeUnit} import cn.piflow.util._ import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} -import org.apache.spark.launcher.SparkAppHandle.State import scala.collection.mutable.{ArrayBuffer, Map => MMap} +import scala.util.{Failure, Success, Try} /** @@ -84,8 +85,10 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e val mapGroupEntryWithConditions: Map[String, (GroupEntry, Condition[GroupExecution])] = group.mapFlowWithConditions(); val completedGroupEntry = MMap[String, Boolean](); - completedGroupEntry ++= mapGroupEntryWithConditions.map(x => (x._1, false)); - val numWaitingGroupEntry = new AtomicInteger(mapGroupEntryWithConditions.size); + completedGroupEntry ++= mapGroupEntryWithConditions.map(x => (x._1, false)) + val numWaitingGroupEntry = new AtomicInteger(mapGroupEntryWithConditions.size) + var notExecuteGroupEntry = new AtomicInteger(mapGroupEntryWithConditions.size) + var numErrorGroupEntry = 0 val startedProcesses = MMap[String, SparkAppHandle](); val startedGroup = MMap[String, GroupExecution]() @@ -199,8 +202,11 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e H2Util.updateFlowGroupId(appId, groupId) } - startedProcesses(name) = handle; + startedProcesses(name) = handle startedProcessesAppID(name) = appId + notExecuteGroupEntry.decrementAndGet() + println("notExecuteGroupEntry = " + notExecuteGroupEntry.get()) + println("FlowName = " + name + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!notExecuteGroupEntry = " + notExecuteGroupEntry.get()) } private def startGroup(name: String, group: Group, parentId: String): Unit = { @@ -213,8 +219,13 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e if(parentId != ""){ H2Util.updateGroupParent(groupId,parentId) } + notExecuteGroupEntry.decrementAndGet() + println("GroupName = " + name + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!notExecuteGroupEntry = " + notExecuteGroupEntry.get()) } + + @volatile + var maybeException:Option[Throwable] = None val pollingThread = new Thread(new Runnable() { override def run(): Unit = { @@ -224,6 +235,7 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e while (numWaitingGroupEntry.get() > 0) { val todosFlow = ArrayBuffer[(String, Flow)](); val todosGroup = ArrayBuffer[(String, Group)](); + mapGroupEntryWithConditions.foreach { en => if(en._2._1.isInstanceOf[Flow]){ @@ -238,12 +250,18 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e } + if(todosFlow.size == 0 && todosGroup.size == 0 && H2Util.isGroupChildError(id) && !H2Util.isGroupChildRunning(id)) + throw new GroupException("Group Failed!") + startedProcesses.synchronized { - todosFlow.foreach(en => startProcess(en._1, en._2.asInstanceOf[Flow],id)); + todosFlow.foreach(en => { + startProcess(en._1, en._2.asInstanceOf[Flow],id) + }); } startedGroup.synchronized{ - - todosGroup.foreach(en => startGroup(en._1, en._2.asInstanceOf[Group],id)) + todosGroup.foreach(en => { + startGroup(en._1, en._2.asInstanceOf[Group],id) + }) } Thread.sleep(POLLING_INTERVAL); @@ -254,7 +272,9 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e }catch { case e: Throwable => runnerListener.onGroupFailed(groupContext); - throw e; + println(e) + if(e.isInstanceOf[GroupException]) + throw e } finally { latch.countDown(); @@ -263,7 +283,26 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e } }); - pollingThread.start(); + val doit = Try{ + pollingThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { + override def uncaughtException(thread: Thread, throwable: Throwable): Unit = { + maybeException = Some(throwable) + } + }) + pollingThread.start() + //pollingThread.join() + } + + doit match { + case Success(v) => { + println("Did not capture error!") + } + case Failure(v) =>{ + println("Capture error!") + runnerListener.onGroupFailed(groupContext) + } + + } override def awaitTermination(): Unit = { latch.await(); diff --git a/piflow-core/src/main/scala/cn/piflow/GroupException.scala b/piflow-core/src/main/scala/cn/piflow/GroupException.scala new file mode 100644 index 0000000..14e3fb4 --- /dev/null +++ b/piflow-core/src/main/scala/cn/piflow/GroupException.scala @@ -0,0 +1,5 @@ +package cn.piflow + +class GroupException(msg : String) extends Exception(msg){ + +} diff --git a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala index 420802e..2ace598 100644 --- a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala +++ b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala @@ -7,6 +7,7 @@ import java.util.Date import net.liftweb.json.compactRender import net.liftweb.json.JsonDSL._ import org.h2.tools.Server +import scala.util.control.Breaks.{breakable} object H2Util { @@ -461,6 +462,68 @@ object H2Util { return groupState } + def isGroupChildError( groupId : String) : Boolean = { + + if(getGroupChildByStatus(groupId, GroupState.FAILED).size > 0 || getGroupChildByStatus(groupId, GroupState.KILLED).size > 0) + return true + else if(getFlowChildByStatus(groupId, FlowState.FAILED).size > 0 || getFlowChildByStatus(groupId, FlowState.KILLED).size > 0) + return true + else + return false + } + + def isGroupChildRunning( groupId : String) : Boolean = { + + if(getGroupChildByStatus(groupId, GroupState.STARTED).size > 0 ) + return true + else if(getFlowChildByStatus(groupId, FlowState.STARTED).size > 0 ) + return true + else + return false + } + + def getGroupChildByStatus(groupId: String, status : String) : List[String] = { + val statement = getConnectionInstance().createStatement() + statement.setQueryTimeout(QUERY_TIME) + var failedList = List[String]() + + //group children state + val groupRS : ResultSet = statement.executeQuery("select * from flowGroup where parentId='" + groupId +"'") + breakable{ + while (groupRS.next()){ + val groupName = groupRS.getString("name") + val groupState = groupRS.getString("state") + if(groupState == status){ + failedList = groupName +: failedList + } + } + } + groupRS.close() + statement.close() + return failedList + } + def getFlowChildByStatus(groupId: String, status : String) : List[String] = { + val statement = getConnectionInstance().createStatement() + statement.setQueryTimeout(QUERY_TIME) + var failedList = List[String]() + + //flow children state + val rs : ResultSet = statement.executeQuery("select * from flow where groupId='" + groupId +"'") + breakable{ + while(rs.next()){ + val flowName = rs.getString("name") + val flowState = rs.getString("state") + if(flowState == status){ + failedList = flowName +: failedList + } + } + } + + rs.close() + statement.close() + return failedList + } + def getFlowGroupInfo(groupId:String) : String = { val flowGroupInfoMap = getGroupInfoMap(groupId) From ab132af31819ebb55d42e3f5a998e9f755a1b095 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Thu, 23 Apr 2020 11:23:27 +0800 Subject: [PATCH 2/2] fix bug: monitor the status of group --- .../src/main/scala/cn/piflow/Group.scala | 48 ++- .../piflow/api/HTTPClientStartFlowGroup.scala | 407 +++++++++++++++++- 2 files changed, 430 insertions(+), 25 deletions(-) diff --git a/piflow-core/src/main/scala/cn/piflow/Group.scala b/piflow-core/src/main/scala/cn/piflow/Group.scala index 3299a06..f5b1868 100644 --- a/piflow-core/src/main/scala/cn/piflow/Group.scala +++ b/piflow-core/src/main/scala/cn/piflow/Group.scala @@ -87,8 +87,6 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e val completedGroupEntry = MMap[String, Boolean](); completedGroupEntry ++= mapGroupEntryWithConditions.map(x => (x._1, false)) val numWaitingGroupEntry = new AtomicInteger(mapGroupEntryWithConditions.size) - var notExecuteGroupEntry = new AtomicInteger(mapGroupEntryWithConditions.size) - var numErrorGroupEntry = 0 val startedProcesses = MMap[String, SparkAppHandle](); val startedGroup = MMap[String, GroupExecution]() @@ -204,10 +202,7 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e startedProcesses(name) = handle startedProcessesAppID(name) = appId - notExecuteGroupEntry.decrementAndGet() - println("notExecuteGroupEntry = " + notExecuteGroupEntry.get()) - println("FlowName = " + name + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!notExecuteGroupEntry = " + notExecuteGroupEntry.get()) - } + } private def startGroup(name: String, group: Group, parentId: String): Unit = { val groupExecution = runner.start(group); @@ -219,8 +214,6 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e if(parentId != ""){ H2Util.updateGroupParent(groupId,parentId) } - notExecuteGroupEntry.decrementAndGet() - println("GroupName = " + name + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!notExecuteGroupEntry = " + notExecuteGroupEntry.get()) } @@ -233,26 +226,16 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e try{ while (numWaitingGroupEntry.get() > 0) { - val todosFlow = ArrayBuffer[(String, Flow)](); - val todosGroup = ArrayBuffer[(String, Group)](); - mapGroupEntryWithConditions.foreach { en => + val (todosFlow, todosGroup) = getTodos() - if(en._2._1.isInstanceOf[Flow]){ - if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) { - todosFlow += (en._1 -> en._2._1.asInstanceOf[Flow]); - } - }else if (en._2._1.isInstanceOf[Group]){ - if (!startedGroup.contains(en._1) && en._2._2.matches(execution)) { - todosGroup += (en._1 -> en._2._1.asInstanceOf[Group]); - } - } + if(todosFlow.size == 0 && todosGroup.size == 0 && H2Util.isGroupChildError(id) && !H2Util.isGroupChildRunning(id)){ + val (todosFlow, todosGroup) = getTodos() + if(todosFlow.size == 0 && todosGroup.size == 0) + throw new GroupException("Group Failed!") } - if(todosFlow.size == 0 && todosGroup.size == 0 && H2Util.isGroupChildError(id) && !H2Util.isGroupChildRunning(id)) - throw new GroupException("Group Failed!") - startedProcesses.synchronized { todosFlow.foreach(en => { startProcess(en._1, en._2.asInstanceOf[Flow],id) @@ -358,6 +341,25 @@ class GroupExecutionImpl(group: Group, runnerContext: Context, runner: Runner) e }; } + private def getTodos() : (ArrayBuffer[(String, Flow)], ArrayBuffer[(String, Group)]) = { + + val todosFlow = ArrayBuffer[(String, Flow)](); + val todosGroup = ArrayBuffer[(String, Group)](); + mapGroupEntryWithConditions.foreach { en => + if(en._2._1.isInstanceOf[Flow]){ + if (!startedProcesses.contains(en._1) && en._2._2.matches(execution)) { + todosFlow += (en._1 -> en._2._1.asInstanceOf[Flow]); + } + }else if (en._2._1.isInstanceOf[Group]){ + if (!startedGroup.contains(en._1) && en._2._2.matches(execution)) { + todosGroup += (en._1 -> en._2._1.asInstanceOf[Group]); + } + } + + } + (todosFlow, todosGroup) + } + override def getGroupId(): String = id override def getChildCount(): Int = { diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowGroup.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowGroup.scala index 168817c..3b53a5f 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowGroup.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowGroup.scala @@ -10,7 +10,7 @@ object HTTPClientStartFlowGroup { def main(args: Array[String]): Unit = { - val json = + /*val json = """ |{ | "group": { @@ -294,9 +294,412 @@ object HTTPClientStartFlowGroup { | } |} | + """.stripMargin*/ + /*val json= + """ + |{ + | "group" : { + | "name" : "xjzhu", + | "flows" : [ { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow1", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f7220272", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f7220271" + | } + | } ], + | "groups" : [ { + | "group" : { + | "flows" : [ { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow3", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | }, { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow2", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f721026e", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases;" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f721026d" + | } + | },{ + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow4", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | },{ + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow5", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | }], + | "conditions" : [ { + | "entry" : "flow3", + | "after" : "flow2" + | }, + | { + | "entry" : "flow5", + | "after" : "flow4" + | }], + | "name" : "group1", + | "uuid" : "8a80d88d712aa8c601717c68f71f0269" + | } + | } ], + | "conditions" : [ { + | "entry" : "group1", + | "after" : "flow1" + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71e0268" + | } + |} + """.stripMargin*/ + + val json = + """ + |{ + | "group" : { + | "name" : "xjzhu", + | "flows" : [ { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow1", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f7220272", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f7220271" + | } + | } ], + | "groups" : [ { + | "group" : { + | "flows" : [ { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow3", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | }, { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow2", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f721026e", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases;" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f721026d" + | } + | },{ + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow4", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | },{ + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow5", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | }], + | "conditions" : [ { + | "entry" : "flow3", + | "after" : "flow2" + | }, + | { + | "entry" : "flow5", + | "after" : "flow4" + | }], + | "name" : "group1", + | "uuid" : "8a80d88d712aa8c601717c68f71f0269" + | } + | },{ + | "group" : { + | "flows" : [ { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow6", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases;" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | }, { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow7", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f721026e", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f721026d" + | } + | },{ + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow8", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | },{ + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow9", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f720026b", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71f026a" + | } + | }], + | "conditions" : [ { + | "entry" : "flow7", + | "after" : "flow6" + | }, + | { + | "entry" : "flow9", + | "after" : "flow8" + | }], + | "name" : "group2", + | "uuid" : "8a80d88d712aa8c601717c68f71f0269" + | } + | } ], + | "conditions" : [ { + | "entry" : "group1", + | "after" : "flow1" + | },{ + | "entry" : "group2", + | "after" : "flow1" + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71e0268" + | } + |} """.stripMargin - val url = "http://10.0.88.13:8002/group/start" + /*val json= + """ + |{ + | "group" : { + | "flows" : [ { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow1", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f7220272", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f7220271" + | } + | } ], + | "name" : "xjzhu", + | "groups" : [ { + | "group" : { + | "flows" : [ { + | "flow" : { + | "executorNumber" : "1", + | "driverMemory" : "1g", + | "executorMemory" : "1g", + | "executorCores" : "1", + | "paths" : [ ], + | "name" : "flow2", + | "stops" : [ { + | "customizedProperties" : { }, + | "name" : "SelectHiveQL", + | "uuid" : "8a80d88d712aa8c601717c68f721026e", + | "bundle" : "cn.piflow.bundle.hive.SelectHiveQL", + | "properties" : { + | "hiveQL" : "show databases;" + | } + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f721026d" + | } + | }], + | "conditions" : [ ], + | "name" : "group1", + | "uuid" : "8a80d88d712aa8c601717c68f71f0269" + | } + | } ], + | "conditions" : [ { + | "entry" : "group1", + | "after" : "flow1" + | } ], + | "uuid" : "8a80d88d712aa8c601717c68f71e0268" + | } + |} + """.stripMargin*/ + + val url = "http://10.0.85.83:8001/group/start" val timeout = 1800 val requestConfig = RequestConfig.custom() .setConnectTimeout(timeout*1000)