fix checkpoint bug
This commit is contained in:
parent
7921775508
commit
808a8e1693
|
@ -47,14 +47,16 @@
|
|||
"uuid":"555",
|
||||
"name":"Merge",
|
||||
"bundle":"cn.piflow.bundle.common.Merge",
|
||||
"properties":{}
|
||||
"properties":{
|
||||
"inports":"data1,data2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"uuid":"666",
|
||||
"name":"Fork",
|
||||
"bundle":"cn.piflow.bundle.common.Fork",
|
||||
"properties":{
|
||||
"outports":["out1","out2","out3"]
|
||||
"outports":"out1,out2,out3"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.apache.spark.ml.clustering.{DistributedLDAModel, LDAModel, LocalLDAMo
|
|||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
class LDAPrediction extends ConfigurableStop{
|
||||
val authorEmail: String = "06whuxx@163.com
|
||||
val authorEmail: String = "06whuxx@163.com"
|
||||
val description: String = "Make use of a exist LDAModel to predict."
|
||||
val inportList: List[String] = List(PortEnum.NonePort.toString)
|
||||
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
|
||||
|
|
|
@ -55,7 +55,8 @@ class FlowBean {
|
|||
})
|
||||
|
||||
if(!this.checkpoint.equals("")){
|
||||
flow.addCheckPoint(this.checkpoint)
|
||||
val checkpointList = this.checkpoint.split(",")
|
||||
checkpointList.foreach{checkpoint => flow.addCheckPoint(checkpoint)}
|
||||
}
|
||||
|
||||
flow
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package cn.piflow.bundle
|
||||
|
||||
import cn.piflow.{FlowImpl, Runner}
|
||||
import cn.piflow.bundle.url.{GetUrl, PostUrl}
|
||||
import cn.piflow.conf.bean.FlowBean
|
||||
import cn.piflow.conf.util.{FileUtil, OptionUtil}
|
||||
import org.apache.commons.httpclient.HttpClient
|
||||
|
|
|
@ -5,6 +5,7 @@ import java.util.Date
|
|||
|
||||
import net.liftweb.json.compactRender
|
||||
import net.liftweb.json.JsonDSL._
|
||||
import org.h2.tools.Server
|
||||
|
||||
object H2Util {
|
||||
|
||||
|
@ -19,8 +20,6 @@ object H2Util {
|
|||
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
//statement.executeUpdate("drop table if exists flow")
|
||||
//statement.executeUpdate("drop table if exists stop")
|
||||
statement.executeUpdate(CREATE_FLOW_TABLE)
|
||||
statement.executeUpdate(CREATE_STOP_TABLE)
|
||||
statement.close()
|
||||
|
@ -36,6 +35,17 @@ object H2Util {
|
|||
connection
|
||||
}
|
||||
|
||||
def cleanDatabase() = {
|
||||
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort",PropertyUtil.getPropertyValue("h2.port")).start()
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
statement.executeUpdate("drop table if exists flow")
|
||||
statement.executeUpdate("drop table if exists stop")
|
||||
statement.close()
|
||||
h2Server.shutdown()
|
||||
|
||||
}
|
||||
|
||||
def addFlow(appId:String,pId:String, name:String)={
|
||||
val startTime = new Date().toString
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
|
|
|
@ -120,7 +120,8 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
|
||||
case ex => {
|
||||
println(ex)
|
||||
Future.failed(new Exception("Can not start flow!"))
|
||||
Future.successful(HttpResponse(entity = "Can not start flow!"))
|
||||
//Future.failed(/*new Exception("Can not start flow!")*/HttpResponse(entity = "Can not start flow!"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue