1. modify show data function

2. fix bug: when more than 5 requests of starting flow are posted, piflow server will get exception:No element passed in the last 1 minute
This commit is contained in:
judy0131 2019-03-27 15:09:34 +08:00
parent e9f354457f
commit a0764ceb68
5 changed files with 38 additions and 23 deletions

View File

@ -13,6 +13,7 @@ class FlowBean {
var checkpoint : String = _ var checkpoint : String = _
var checkpointParentProcessId : String = _ var checkpointParentProcessId : String = _
var runMode : String = _ var runMode : String = _
var showData : String = _
var stops : List[StopBean] = List() var stops : List[StopBean] = List()
var paths : List[PathBean] = List() var paths : List[PathBean] = List()
@ -25,6 +26,7 @@ class FlowBean {
this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String] this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String]
this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String] this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String]
this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String] this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String]
this.showData = flowMap.getOrElse("showData","0").asInstanceOf[String]
//construct StopBean List //construct StopBean List
val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]] val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]]

View File

@ -378,13 +378,12 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
def getDataFrame(port: String) = mapDataFrame(port); def getDataFrame(port: String) = mapDataFrame(port);
def showData() = { def showData(count:Int) = {
mapDataFrame.foreach(en => { mapDataFrame.foreach(en => {
val portName = if(en._1.equals("")) "default" else en._1 val portName = if(en._1.equals("")) "default" else en._1
println(portName + " port: ") println(portName + " port: ")
en._2.apply().show(count)
en._2.apply().show(PropertyUtil.getPropertyValue("data.show").toInt)
}) })
} }
@ -484,7 +483,10 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
runnerListener.onJobCompleted(pe.getContext()); runnerListener.onJobCompleted(pe.getContext());
//show data in log //show data in log
outputs.showData() val showDataCount = PropertyUtil.getPropertyValue("data.show").toInt
if(showDataCount > 0) {
outputs.showData(showDataCount)
}
} }
catch { catch {
case e: Throwable => case e: Throwable =>
@ -528,7 +530,10 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
} }
} }
//show data in log //show data in log
outputs.showData() val showDataCount = PropertyUtil.getPropertyValue("data.show").toInt
if(showDataCount > 0) {
outputs.showData(showDataCount)
}
//save data in debug mode //save data in debug mode
if(flow.getRunMode() == FlowRunMode.DEBUG) { if(flow.getRunMode() == FlowRunMode.DEBUG) {

View File

@ -10,6 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>piflow-server</artifactId> <artifactId>piflow-server</artifactId>
<properties>
<akka.version>2.4.14</akka.version>
<akka.http.version>10.0.0</akka.http.version>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>piflow</groupId> <groupId>piflow</groupId>
@ -30,40 +36,31 @@
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId> <artifactId>akka-remote_2.11</artifactId>
<version>2.3.14</version> <version>${akka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId> <artifactId>akka-actor_2.11</artifactId>
<version>2.3.14</version> <version>${akka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId> <artifactId>akka-slf4j_2.11</artifactId>
<version>2.3.14</version> <version>${akka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-experimental_2.11</artifactId> <artifactId>akka-http_2.11</artifactId>
<version>2.0.4</version> <version>${akka.http.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core-experimental_2.11</artifactId> <artifactId>akka-http-spray-json_2.11</artifactId>
<version>2.0.4</version> <version>${akka.http.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-spray-json-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,10 @@
PiFlowHTTPService{
shutdown-timeout:300s
}
akka {
http {
idle-timeout = 600 s
request-timeout = 200 s
}
}

View File

@ -19,7 +19,8 @@ import spray.json.DefaultJsonProtocol
object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{
implicit val system = ActorSystem("HTTPService", ConfigFactory.load()) implicit val config = ConfigFactory.load()
implicit val system = ActorSystem("PiFlowHTTPService", config)
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher implicit val executionContext = system.dispatcher
var processMap = Map[String, SparkAppHandle]() var processMap = Map[String, SparkAppHandle]()