fix bug: piflow can not run big flow/flowGroup, because flow json is too large

This commit is contained in:
judy_0131 2020-10-23 09:30:16 +08:00
parent 92461f0a8b
commit 434f86f82b
2 changed files with 113 additions and 25 deletions

View File

@ -12,8 +12,8 @@
<artifactId>piflow-server</artifactId>
<properties>
<akka.version>2.4.14</akka.version>
<akka.http.version>10.0.0</akka.http.version>
<akka.version>2.5.32</akka.version>
<akka.http.version>10.1.12</akka.http.version>
<akka.quartz.version>1.6.0-akka-2.4.x</akka.quartz.version>
</properties>

View File

@ -10,8 +10,12 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.StatusCodes.Success
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import cn.piflow.GroupExecution
import cn.piflow.api.HTTPService.pluginManager
import cn.piflow.conf.util.{MapUtil, OptionUtil, PluginManager}
@ -19,7 +23,7 @@ import cn.piflow.util._
import com.typesafe.akka.extension.quartz.QuartzSchedulerExtension
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.concurrent.{Await, Future}
import scala.util.parsing.json.JSON
import org.apache.spark.launcher.SparkAppHandle
import org.flywaydb.core.Flyway
@ -167,24 +171,44 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{
entity match {
case HttpEntity.Strict(_, data) =>{
var flowJson = data.utf8String
// flowJson = flowJson.replaceAll("}","}\n")
//flowJson = JsonFormatTool.formatJson(flowJson)
val (appId,process) = API.startFlow(flowJson)
processMap += (appId -> process)
val result = "{\"flow\":{\"id\":\"" + appId + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}
case ex => {
try{
/*entity match {
case HttpEntity.Strict(_, data) =>{
var flowJson = data.utf8String
// flowJson = flowJson.replaceAll("}","}\n")
//flowJson = JsonFormatTool.formatJson(flowJson)
val (appId,process) = API.startFlow(flowJson)
processMap += (appId -> process)
val result = "{\"flow\":{\"id\":\"" + appId + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}
case otherType => {
println(otherType)
val bodyFeature = Unmarshal(entity).to [String]
val flowJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
val (appId,process) = API.startFlow(flowJson)
processMap += (appId -> process)
val result = "{\"flow\":{\"id\":\"" + appId + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}
}*/
val bodyFeature = Unmarshal(entity).to [String]
val flowJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
val (appId,process) = API.startFlow(flowJson)
processMap += (appId -> process)
val result = "{\"flow\":{\"id\":\"" + appId + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}catch {
case ex : Exception => {
println(ex)
Future.successful(HttpResponse(FAIL_CODE, entity = "Can not start flow!"))
//Future.failed(/*new Exception("Can not start flow!")*/HttpResponse(entity = "Can not start flow!"))
}
}
}
@ -271,15 +295,33 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpRequest(POST, Uri.Path("/group/start"), headers, entity, protocol) =>{
entity match {
case HttpEntity.Strict(_, data) =>{
var flowGroupJson = data.utf8String
val flowGroupExecution = API.startGroup(flowGroupJson)
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}
try{
/*entity match {
case HttpEntity.Strict(_, data) =>{
var flowGroupJson = data.utf8String
val flowGroupExecution = API.startGroup(flowGroupJson)
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}
case otherType => {
println(otherType)
val bodyFeature = Unmarshal(entity).to [String]
val flowGroupJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
val flowGroupExecution = API.startGroup(flowGroupJson)
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}
}*/
val bodyFeature = Unmarshal(entity).to [String]
val flowGroupJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
val flowGroupExecution = API.startGroup(flowGroupJson)
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
}catch {
case ex => {
println(ex)
Future.successful(HttpResponse(FAIL_CODE, entity = "Can not start group!"))
@ -341,7 +383,53 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
//schedule related API
case HttpRequest(POST, Uri.Path("/schedule/start"), headers, entity, protocol) =>{
entity match {
try{
val bodyFeature = Unmarshal(entity).to [String]
val data = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
val dataMap = toJson(data)
val expression = dataMap.get("expression").getOrElse("").asInstanceOf[String]
val startDateStr = dataMap.get("startDate").getOrElse("").asInstanceOf[String]
val endDateStr = dataMap.get("endDate").getOrElse("").asInstanceOf[String]
val scheduleInstance = dataMap.get("schedule").getOrElse(Map[String, Any]()).asInstanceOf[Map[String, Any]]
val id : String = "schedule_" + IdGenerator.uuid() ;
var scheduleType = ""
if(!scheduleInstance.getOrElse("flow","").equals("")){
scheduleType = ScheduleType.FLOW
}else if(!scheduleInstance.getOrElse("group","").equals("")){
scheduleType = ScheduleType.GROUP
}else{
Future.successful(HttpResponse(FAIL_CODE, entity = "Can not schedule, please check the json format!"))
}
val flowActor = system.actorOf(Props(new ExecutionActor(id,scheduleType)))
scheduler.createSchedule(id,cronExpression = expression)
//scheduler.schedule(id,flowActor,JsonUtil.format(JsonUtil.toJson(scheduleInstance)))
if(startDateStr.equals("")){
scheduler.schedule(id,flowActor,JsonUtil.format(JsonUtil.toJson(scheduleInstance)))
}else{
val startDate : Option[Date] = Some(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startDateStr))
scheduler.schedule(id,flowActor,JsonUtil.format(JsonUtil.toJson(scheduleInstance)), startDate)
}
actorMap += (id -> flowActor)
H2Util.addScheduleInstance(id, expression, startDateStr, endDateStr, ScheduleState.STARTED)
//save schedule json file
val flowFile = FlowFileUtil.getScheduleFilePath(id)
FileUtil.writeFile(data, flowFile)
Future.successful(HttpResponse(SUCCESS_CODE, entity = id))
}catch {
case ex => {
println(ex)
Future.successful(HttpResponse(FAIL_CODE, entity = "Can not start flow!"))
}
}
/*entity match {
case HttpEntity.Strict(_, data) =>{
val dataMap = toJson(data.utf8String)
@ -383,7 +471,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
println(ex)
Future.successful(HttpResponse(FAIL_CODE, entity = "Can not start flow!"))
}
}
}*/
}