Support running a project via the API

judy0131 2019-05-05 15:34:35 +08:00
parent 1754a233cd
commit 7deb8946c7
9 changed files with 471 additions and 44 deletions

View File

@@ -1,4 +1,4 @@
server.ip=10.0.86.124
server.ip=10.0.86.98
server.port=8001
#spark.master=spark://10.0.86.89:7077
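(Note: the new server.ip, 10.0.86.98, is the same host that HTTPClientStartProject below posts to at http://10.0.86.98:8001.)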

View File

@@ -1,7 +1,7 @@
package cn.piflow.conf.bean
import cn.piflow.conf.util.{JsonUtil, MapUtil}
import cn.piflow.{FlowImpl, Path}
import cn.piflow.{FlowImpl, Path, ProjectEntry}
import net.liftweb.json.JsonDSL._
import net.liftweb.json._
@@ -9,7 +9,7 @@ import scala.util.parsing.json.JSONObject
class FlowBean {
class FlowBean extends ProjectEntryBean{
/*@BeanProperty*/
var uuid : String = _
var name : String = _

View File

@@ -1,17 +1,16 @@
package cn.piflow.conf.bean
import cn.piflow.{Condition, FlowGroupExecution, FlowGroupImpl}
import cn.piflow._
import cn.piflow.conf.util.MapUtil
/**
* Created by xjzhu@cnic.cn on 4/25/19
*/
class FlowGroupBean {
class FlowGroupBean extends ProjectEntryBean {
var uuid : String = _
var name : String = _
var flows : List[FlowBean] = List()
//var conditions : [ConditionBean] = List()
var conditions = scala.collection.mutable.Map[String, ConditionBean]()
def init(map : Map[String, Any]) = {
@@ -69,30 +68,6 @@ class FlowGroupBean {
flowGroup
}
/*def toJson():String = {
val json =
("flow" ->
("uuid" -> this.uuid) ~
("name" -> this.name) ~
("stops" ->
stops.map { stop =>(
("uuid" -> stop.uuid) ~
("name" -> stop.name)~
("bundle" -> stop.bundle) )}) ~
("paths" ->
paths.map { path => (
("from" -> path.from) ~
("outport" -> path.outport) ~
("inport" -> path.inport) ~
("to" -> path.to)
)}))
val jsonString = compactRender(json)
//println(jsonString)
jsonString
}*/
}
object FlowGroupBean{

View File

@@ -1,8 +1,131 @@
package cn.piflow.conf.bean
import cn.piflow.{Condition, ProjectImpl}
import cn.piflow.conf.util.MapUtil
/**
* Created by xjzhu@cnic.cn on 4/25/19
*/
class ProjectBean {
var uuid : String = _
var name : String = _
var projectEntries : List[ProjectEntryBean] = List()
var conditions = scala.collection.mutable.Map[String, ConditionBean]()
def init(map : Map[String, Any]) = {
val projectMap = MapUtil.get(map, "project").asInstanceOf[Map[String, Any]]
this.uuid = MapUtil.get(projectMap,"uuid").asInstanceOf[String]
this.name = MapUtil.get(projectMap,"name").asInstanceOf[String]
//construct FlowGroupBean List
val groupList = MapUtil.get(projectMap,"groups").asInstanceOf[List[Map[String, Any]]]
groupList.foreach( flowGroupMap => {
val flowGroup = FlowGroupBean(flowGroupMap.asInstanceOf[Map[String, Any]])
this.projectEntries = flowGroup +: this.projectEntries
})
//construct FlowBean List
val flowList = MapUtil.get(projectMap,"flows").asInstanceOf[List[Map[String, Any]]]
flowList.foreach( flowMap => {
val flow = FlowBean(flowMap.asInstanceOf[Map[String, Any]])
this.projectEntries = flow +: this.projectEntries
})
//construct ConditionBean List
val conditionList = MapUtil.get(projectMap,"conditions").asInstanceOf[List[Map[String, Any]]]
conditionList.foreach( conditionMap => {
val conditionBean = ConditionBean(conditionMap.asInstanceOf[Map[String, Any]])
conditions(conditionBean.entry) = conditionBean
})
}
def constructProject() = {
val project = new ProjectImpl();
project.setProjectName(name)
this.projectEntries.foreach( projectEntryBean => {
if( !conditions.contains(projectEntryBean.name) ){
if (projectEntryBean.isInstanceOf[FlowBean]){
val bean = projectEntryBean.asInstanceOf[FlowBean]
project.addProjectEntry(projectEntryBean.name,bean.constructFlow())
}else{
val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean]
project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup())
}
}
else{
val conditionBean = conditions(projectEntryBean.name)
if(conditionBean.after.size == 0){
println(projectEntryBean.name + " do not have after flow " + "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
if (projectEntryBean.isInstanceOf[FlowBean]){
val bean = projectEntryBean.asInstanceOf[FlowBean]
project.addProjectEntry(projectEntryBean.name,bean.constructFlow())
}else{
val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean]
project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup())
}
}
else if(conditionBean.after.size == 1){
println(projectEntryBean.name + " after " + conditionBean.after(0) + "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
if (projectEntryBean.isInstanceOf[FlowBean]){
val bean = projectEntryBean.asInstanceOf[FlowBean]
project.addProjectEntry(projectEntryBean.name,bean.constructFlow(),Condition.after(conditionBean.after(0)))
}else{
val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean]
project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup(), Condition.after(conditionBean.after(0)))
}
}
else {
println(projectEntryBean.name + " after " + conditionBean.after.toSeq + "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
// all entries after the first; the original copyToArray(other, 1) wrote to
// destination offset 1, leaving other(0) null and dropping the last entry
val other = conditionBean.after.drop(1).toArray
if (projectEntryBean.isInstanceOf[FlowBean]){
val bean = projectEntryBean.asInstanceOf[FlowBean]
project.addProjectEntry(projectEntryBean.name,bean.constructFlow(),Condition.after(conditionBean.after(0),other: _*))
}else{
val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean]
project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup(), Condition.after(conditionBean.after(0),other: _*))
}
}
}
})
project
}
/*private def addProjectEntry(project:ProjectImpl, projectEntryBean:ProjectEntryBean, conditionBean: ConditionBean): Unit ={
if (projectEntryBean.isInstanceOf[FlowBean]){
val bean = projectEntryBean.asInstanceOf[FlowBean]
project.addProjectEntry(projectEntryBean.name,bean.constructFlow(),Condition.after(conditionBean.after(0),other: _*))
}else{
val groupBean = projectEntryBean.asInstanceOf[FlowGroupBean]
project.addProjectEntry(groupBean.name,groupBean.constructFlowGroup(), Condition.after(conditionBean.after(0),other: _*))
}
}*/
}
object ProjectBean {
def apply(map : Map[String, Any]): ProjectBean = {
val projectBean = new ProjectBean()
projectBean.init(map)
projectBean
}
}
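Taken together: init pulls the top-level "project" object apart into flow-group beans, flow beans, and per-entry conditions, and constructProject() registers each entry on a ProjectImpl with Condition.after wiring. A minimal usage sketch (the empty-project JSON literal is hypothetical; the parse-then-construct sequence mirrors API.startProject later in this commit):

// Sketch: parse a project JSON string and build the ProjectImpl.
import scala.util.parsing.json.JSON
import cn.piflow.conf.util.OptionUtil

val projectJson = """{"project": {"uuid": "1", "name": "demo", "groups": [], "flows": [], "conditions": []}}"""
val map = OptionUtil.getAny(JSON.parseFull(projectJson)).asInstanceOf[Map[String, Any]]
val projectBean = ProjectBean(map)           // companion apply: new ProjectBean + init(map)
val project = projectBean.constructProject() // a ProjectImpl with entries and conditions wired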

View File

@@ -0,0 +1,19 @@
package cn.piflow.conf.bean
import cn.piflow.conf.util.MapUtil
import cn.piflow.{Condition, FlowGroupImpl, ProjectEntry}
/**
* Created by xjzhu@cnic.cn on 4/25/19
*/
trait ProjectEntryBean {
var uuid : String
var name : String
def init(map : Map[String, Any])
}
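This trait is what lets ProjectBean keep flows and flow groups in one List[ProjectEntryBean]: FlowBean and FlowGroupBean (modified above) both satisfy it. The isInstanceOf/asInstanceOf chains in constructProject() amount to the following dispatch, sketched here as a pattern match (not the committed code):

// Equivalent type dispatch as a pattern match:
def constructEntry(entry: ProjectEntryBean): Any = entry match {
  case flow: FlowBean       => flow.constructFlow()        // builds a Flow
  case group: FlowGroupBean => group.constructFlowGroup()  // builds a FlowGroup
}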

View File

@@ -4,6 +4,10 @@ import java.sql.Date
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
import cn.piflow.util.PropertyUtil
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import org.apache.spark.launcher.SparkAppHandle.State
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
@@ -15,10 +19,17 @@ trait Project {
def addProjectEntry(name: String, flowOrGroup: ProjectEntry, con: Condition[ProjectExecution] = Condition.AlwaysTrue[ProjectExecution]);
def mapFlowWithConditions(): Map[String, (ProjectEntry, Condition[ProjectExecution])];
def getProjectName(): String;
def setProjectName(projectName : String): Unit;
}
class ProjectImpl extends Project {
var name = ""
var uuid = ""
val _mapFlowWithConditions = MMap[String, (ProjectEntry, Condition[ProjectExecution])]();
def addProjectEntry(name: String, flowOrGroup: ProjectEntry, con: Condition[ProjectExecution] = Condition.AlwaysTrue[ProjectExecution]) = {
@@ -26,6 +37,14 @@ class ProjectImpl extends Project {
}
def mapFlowWithConditions(): Map[String, (ProjectEntry, Condition[ProjectExecution])] = _mapFlowWithConditions.toMap;
override def getProjectName(): String = {
this.name
}
override def setProjectName(projectName: String): Unit = {
this.name = projectName
}
}
trait ProjectExecution extends Execution{
@@ -43,7 +62,7 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
completedProcesses ++= mapFlowWithConditions.map(x => (x._1, false));
val numWaitingProcesses = new AtomicInteger(mapFlowWithConditions.size);
val startedProcesses = MMap[String, Process]();
val startedProcesses = MMap[String, SparkAppHandle]();
val startedFlowGroup = MMap[String, FlowGroupExecution]()
val execution = this;
@@ -60,10 +79,6 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
}
override def onProcessCompleted(ctx: ProcessContext): Unit = {
startedProcesses.filter(_._2 == ctx.getProcess()).foreach { x =>
completedProcesses(x._1) = true;
numWaitingProcesses.decrementAndGet();
}
}
override def onJobStarted(ctx: JobContext): Unit = {}
@@ -100,8 +115,62 @@ class ProjectExecutionImpl(project: Project, runnerContext: Context, runner: Run
}
private def startProcess(name: String, flow: Flow): Unit = {
val process = runner.start(flow);
startedProcesses(name) = process;
println(flow.getFlowJson())
var flowJson = flow.getFlowJson()
flowJson = flowJson.replaceAll("}","}\n")
//TODO
var appId : String = ""
val countDownLatch = new CountDownLatch(1)
val launcher = new SparkLauncher
val handle = launcher
.setAppName(flow.getFlowName())
.setMaster(PropertyUtil.getPropertyValue("spark.master"))
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
.setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
.setVerbose(true)
.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setConf("spark.driver.memory", flow.getDriverMemory())
.setConf("spark.num.executors", flow.getExecutorNum())
.setConf("spark.executor.memory", flow.getExecutorMem())
.setConf("spark.executor.cores",flow.getExecutorCores())
.addFile(PropertyUtil.getConfigureFile())
.setMainClass("cn.piflow.api.StartFlowMain")
.addAppArgs(flowJson)
.startApplication( new SparkAppHandle.Listener {
override def stateChanged(handle: SparkAppHandle): Unit = {
appId = handle.getAppId
val sparkAppState = handle.getState
if(appId != null){
println("Spark job with app id: " + appId + ",\t State changed to: " + sparkAppState)
}else{
println("Spark job's state changed to: " + sparkAppState)
}
//TODO: get the process status
if (handle.getState.equals(State.FINISHED)){
completedProcesses(flow.getFlowName()) = true;
numWaitingProcesses.decrementAndGet();
}
if (handle.getState().isFinal){
countDownLatch.countDown()
println("Task is finished!")
}
}
override def infoChanged(handle: SparkAppHandle): Unit = {
}
}
)
startedProcesses(name) = handle;
}
private def startFlowGroup(name: String, flowGroup: FlowGroup): Unit = {

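The change above replaces in-process execution (runner.start(flow), tracked as Process) with a separate Spark application per flow, submitted through SparkLauncher and tracked as SparkAppHandle. The visible hunk creates countDownLatch but never awaits it; a caller that needs to block until the application reaches a final state would look roughly like this (a sketch reusing the imports already in this file, not the committed code):

// Sketch: launch, then block until the Spark app reaches a final state.
val latch = new CountDownLatch(1)
val handle = new SparkLauncher()
  .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
  .setMainClass("cn.piflow.api.StartFlowMain")
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit =
      if (h.getState.isFinal) latch.countDown() // FINISHED, FAILED, or KILLED
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })
latch.await()
println("final state: " + handle.getState)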
View File

@@ -10,7 +10,7 @@ import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
import cn.piflow.{Process, Runner}
import cn.piflow.api.util.{HdfsUtil, PropertyUtil}
import cn.piflow.conf.bean.FlowGroupBean
import cn.piflow.conf.bean.{FlowGroupBean, ProjectBean}
import cn.piflow.util.{FlowState, H2Util, HadoopFileUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -24,6 +24,24 @@ import scala.util.parsing.json.JSON
object API {
def startProject(projectJson : String) = {
println("StartProject API get json: \n" + projectJson )
var appId:String = null
val map = OptionUtil.getAny(JSON.parseFull(projectJson)).asInstanceOf[Map[String, Any]]
val projectMap = MapUtil.get(map, "project").asInstanceOf[Map[String, Any]]
//create flowGroup
val projectBean = ProjectBean(map)
val project = projectBean.constructProject()
val process = Runner.create()
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.start(project);
}
def startFlowGroup(flowGroupJson : String) = {
println("StartFlowGroup API get json: \n" + flowGroupJson )

View File

@@ -0,0 +1,208 @@
package cn.piflow.api
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
object HTTPClientStartProject {
def main(args: Array[String]): Unit = {
val json =
"""
|{
| "project": {
| "name": "TestFlowGroup",
| "uuid": "1111111111111",
| "groups": [{
| "group": {
| "name": "TestFlowGroup",
| "uuid": "1111111111111",
| "flows": [{
| "flow": {
| "name": "one",
| "uuid": "1234",
| "executorNumber": "2",
| "executorMemory": "1g",
| "executorCores": "1",
| "stops": [{
| "uuid": "1111",
| "name": "XmlParser",
| "bundle": "cn.piflow.bundle.xml.XmlParser",
| "properties": {
| "xmlpath": "hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
| "rowTag": "phdthesis"
| }
| },
| {
| "uuid": "2222",
| "name": "SelectField",
| "bundle": "cn.piflow.bundle.common.SelectField",
| "properties": {
| "schema": "title,author,pages"
| }
|
| },
| {
| "uuid": "3333",
| "name": "PutHiveStreaming",
| "bundle": "cn.piflow.bundle.hive.PutHiveStreaming",
| "properties": {
| "database": "sparktest",
| "table": "dblp_phdthesis"
| }
| }
| ],
| "paths": [{
| "from": "XmlParser",
| "outport": "",
| "inport": "",
| "to": "SelectField"
| },
| {
| "from": "SelectField",
| "outport": "",
| "inport": "",
| "to": "PutHiveStreaming"
| }
| ]
| }
| },
| {
| "flow": {
| "name": "two",
| "uuid": "5678",
| "stops": [{
| "uuid": "1111",
| "name": "XmlParser",
| "bundle": "cn.piflow.bundle.xml.XmlParser",
| "properties": {
| "xmlpath": "hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
| "rowTag": "phdthesis"
| }
| },
| {
| "uuid": "2222",
| "name": "SelectField",
| "bundle": "cn.piflow.bundle.common.SelectField",
| "properties": {
| "schema": "title,author,pages"
| }
|
| },
| {
| "uuid": "3333",
| "name": "PutHiveStreaming",
| "bundle": "cn.piflow.bundle.hive.PutHiveStreaming",
| "properties": {
| "database": "sparktest",
| "table": "dblp_phdthesis"
| }
| }
| ],
| "paths": [{
| "from": "XmlParser",
| "outport": "",
| "inport": "",
| "to": "SelectField"
| },
| {
| "from": "SelectField",
| "outport": "",
| "inport": "",
| "to": "PutHiveStreaming"
| }
| ]
| }
|
| }
| ],
|
| "conditions": [{
| "entry": "two",
| "after": "one"
| }]
| }
| }],
| "flows": [{
| "flow": {
| "name": "three",
| "uuid": "91011",
| "executorNumber": "2",
| "executorMemory": "1g",
| "executorCores": "1",
| "stops": [{
| "uuid": "1111",
| "name": "XmlParser",
| "bundle": "cn.piflow.bundle.xml.XmlParser",
| "properties": {
| "xmlpath": "hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
| "rowTag": "phdthesis"
| }
| },
| {
| "uuid": "2222",
| "name": "SelectField",
| "bundle": "cn.piflow.bundle.common.SelectField",
| "properties": {
| "schema": "title,author,pages"
| }
|
| },
| {
| "uuid": "3333",
| "name": "PutHiveStreaming",
| "bundle": "cn.piflow.bundle.hive.PutHiveStreaming",
| "properties": {
| "database": "sparktest",
| "table": "dblp_phdthesis"
| }
| }
| ],
| "paths": [{
| "from": "XmlParser",
| "outport": "",
| "inport": "",
| "to": "SelectField"
| },
| {
| "from": "SelectField",
| "outport": "",
| "inport": "",
| "to": "PutHiveStreaming"
| }
| ]
| }
| }],
| "conditions": [{
| "entry": "three",
| "after": "TestFlowGroup"
| }]
| }
|}
""".stripMargin
val url = "http://10.0.86.98:8001/project/start"
val timeout = 1800
val requestConfig = RequestConfig.custom()
.setConnectTimeout(timeout*1000)
.setConnectionRequestTimeout(timeout*1000)
.setSocketTimeout(timeout*1000).build()
val client = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}
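The client above reads only the response body. A variant of the last few lines that also checks the status code and releases the connection (same HttpClient 4.x API, sketched):

val status = response.getStatusLine.getStatusCode // e.g. 200 on success
val body = EntityUtils.toString(response.getEntity, "UTF-8")
println("Status: " + status + ", body: " + body)
response.close()
client.close()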

View File

@@ -225,17 +225,32 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpEntity.Strict(_, data) =>{
var flowGroupJson = data.utf8String
flowGroupJson = flowGroupJson.replaceAll("}","}\n")
//flowJson = JsonFormatTool.formatJson(flowJson)
//val (appId,pid,process) = API.startFlowGroup(flowGroupJson)
API.startFlowGroup(flowGroupJson)
//processMap += (appId -> process)
//val result = "{\"flow\":{\"id\":\"" + appId + "\",\"pid\":\"" + pid + "\"}}"
Future.successful(HttpResponse(entity = "okokok!!!"))
Future.successful(HttpResponse(entity = "start flow group ok!!!"))
}
case ex => {
println(ex)
Future.successful(HttpResponse(entity = "Can not start flow!"))
Future.successful(HttpResponse(entity = "Can not start flow group!"))
//Future.failed(/*new Exception("Can not start flow!")*/HttpResponse(entity = "Can not start flow!"))
}
}
}
case HttpRequest(POST, Uri.Path("/project/start"), headers, entity, protocol) =>{
entity match {
case HttpEntity.Strict(_, data) =>{
var projectJson = data.utf8String
projectJson = projectJson.replaceAll("}","}\n")
API.startProject(projectJson)
Future.successful(HttpResponse(entity = "start project ok!!!"))
}
case ex => {
println(ex)
Future.successful(HttpResponse(entity = "Can not start project!"))
//Future.failed(/*new Exception("Can not start flow!")*/HttpResponse(entity = "Can not start flow!"))
}
}