modify log
This commit is contained in:
parent
6b6a1588ec
commit
165965fda4
|
@ -1,5 +1,7 @@
|
|||
package cn.piflow
|
||||
|
||||
import java.util.Date
|
||||
|
||||
import cn.piflow.util.{FlowState, H2Util, Logging, StopState}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
|
@ -112,39 +114,43 @@ class RunnerLogger extends RunnerListener with Logging {
|
|||
override def onProcessStarted(ctx: ProcessContext): Unit = {
|
||||
val pid = ctx.getProcess().pid();
|
||||
val flowName = ctx.getFlow().toString;
|
||||
logger.debug(s"process started: $pid, flow: $flowName");
|
||||
println(s"process started: $pid, flow: $flowName")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"process started: $pid, flow: $flowName, time: $time");
|
||||
println(s"process started: $pid, flow: $flowName, time: $time")
|
||||
//update flow state to STARTED
|
||||
val appId = getAppId(ctx)
|
||||
H2Util.addFlow(appId,ctx.getFlow().getClass.toString)
|
||||
H2Util.updateFlowState(appId,FlowState.STARTED)
|
||||
H2Util.updateFlowStartTime(appId)
|
||||
H2Util.updateFlowStartTime(appId,time)
|
||||
};
|
||||
|
||||
override def onJobStarted(ctx: JobContext): Unit = {
|
||||
val jid = ctx.getStopJob().jid();
|
||||
val stopName = ctx.getStopJob().getStopName();
|
||||
logger.debug(s"job started: $jid, stop: $stopName");
|
||||
println(s"job started: $jid, stop: $stopName")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"job started: $jid, stop: $stopName, time: $time");
|
||||
println(s"job started: $jid, stop: $stopName, time: $time")
|
||||
//update stop state to STARTED
|
||||
H2Util.updateStopState(getAppId(ctx),stopName,StopState.STARTED)
|
||||
H2Util.updateStopStartTime(getAppId(ctx),stopName)
|
||||
H2Util.updateStopStartTime(getAppId(ctx),stopName,time)
|
||||
};
|
||||
|
||||
override def onJobFailed(ctx: JobContext): Unit = {
|
||||
ctx.getProcessContext()
|
||||
val stopName = ctx.getStopJob().getStopName();
|
||||
logger.debug(s"job failed: $stopName");
|
||||
println(s"job failed: $stopName")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"job failed: $stopName, time: $time");
|
||||
println(s"job failed: $stopName, time: $time")
|
||||
//update stop state to FAILED
|
||||
H2Util.updateStopState(getAppId(ctx),stopName,StopState.FAILED)
|
||||
H2Util.updateStopFinishedTime(getAppId(ctx),stopName)
|
||||
H2Util.updateStopFinishedTime(getAppId(ctx),stopName,time)
|
||||
};
|
||||
|
||||
override def onJobInitialized(ctx: JobContext): Unit = {
|
||||
val stopName = ctx.getStopJob().getStopName();
|
||||
logger.debug(s"job initialized: $stopName");
|
||||
println(s"job initialized: $stopName")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"job initialized: $stopName, time: $time");
|
||||
println(s"job initialized: $stopName, time: $time")
|
||||
//add stop into h2 db and update stop state to INIT
|
||||
val appId = getAppId(ctx)
|
||||
H2Util.addStop(appId,stopName)
|
||||
|
@ -153,49 +159,54 @@ class RunnerLogger extends RunnerListener with Logging {
|
|||
|
||||
override def onProcessCompleted(ctx: ProcessContext): Unit = {
|
||||
val pid = ctx.getProcess().pid();
|
||||
logger.debug(s"process completed: $pid");
|
||||
println(s"process completed: $pid")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"process completed: $pid, time: $time");
|
||||
println(s"process completed: $pid, time: $time")
|
||||
//update flow state to COMPLETED
|
||||
val appId = getAppId(ctx)
|
||||
H2Util.updateFlowState(appId,FlowState.COMPLETED)
|
||||
H2Util.updateFlowFinishedTime(appId)
|
||||
H2Util.updateFlowFinishedTime(appId,time)
|
||||
};
|
||||
|
||||
override def onJobCompleted(ctx: JobContext): Unit = {
|
||||
val stopName = ctx.getStopJob().getStopName();
|
||||
logger.debug(s"job completed: $stopName");
|
||||
println(s"job completed: $stopName")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"job completed: $stopName, time: $time");
|
||||
println(s"job completed: $stopName, time: $time")
|
||||
//update stop state to COMPLETED
|
||||
val appId = getAppId(ctx)
|
||||
H2Util.updateStopState(appId,stopName,StopState.COMPLETED)
|
||||
H2Util.updateStopFinishedTime(appId,stopName)
|
||||
H2Util.updateStopFinishedTime(appId,stopName,time)
|
||||
};
|
||||
|
||||
override def onProcessFailed(ctx: ProcessContext): Unit = {
|
||||
val pid = ctx.getProcess().pid();
|
||||
logger.debug(s"process failed: $pid");
|
||||
println(s"process failed: $pid")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"process failed: $pid, time: $time");
|
||||
println(s"process failed: $pid, time: $time")
|
||||
//update flow state to FAILED
|
||||
val appId = getAppId(ctx)
|
||||
H2Util.updateFlowState(getAppId(ctx),FlowState.FAILED)
|
||||
H2Util.updateFlowFinishedTime(appId)
|
||||
H2Util.updateFlowFinishedTime(appId,time)
|
||||
}
|
||||
|
||||
override def onProcessAborted(ctx: ProcessContext): Unit = {
|
||||
val pid = ctx.getProcess().pid();
|
||||
logger.debug(s"process aborted: $pid");
|
||||
println(s"process aborted: $pid")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"process aborted: $pid, time: $time");
|
||||
println(s"process aborted: $pid, time: $time")
|
||||
//update flow state to ABORTED
|
||||
val appId = getAppId(ctx)
|
||||
H2Util.updateFlowState(appId,FlowState.ABORTED)
|
||||
H2Util.updateFlowFinishedTime(appId)
|
||||
H2Util.updateFlowFinishedTime(appId,time)
|
||||
}
|
||||
|
||||
override def onProcessForked(ctx: ProcessContext, child: ProcessContext): Unit = {
|
||||
val pid = ctx.getProcess().pid();
|
||||
val cid = child.getProcess().pid();
|
||||
logger.debug(s"process forked: $pid, child flow execution: $cid");
|
||||
println(s"process forked: $pid, child flow execution: $cid")
|
||||
val time = new Date().toString
|
||||
logger.debug(s"process forked: $pid, child flow execution: $cid, time: $time");
|
||||
println(s"process forked: $pid, child flow execution: $cid, time: $time")
|
||||
//update flow state to FORK
|
||||
H2Util.updateFlowState(getAppId(ctx),FlowState.FORK)
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ object H2Util {
|
|||
val QUERY_TIME = 30
|
||||
val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
|
||||
val CREATE_STOP_TABLE = "create table if not exists stop (flowId varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
|
||||
//val CONNECTION_URL = "jdbc:h2:tcp://" + PropertyUtil.getPropertyValue("server.ip") + ":9092/~/piflow"
|
||||
val serverIP = PropertyUtil.getPropertyValue("server.ip") + ":" + PropertyUtil.getPropertyValue("h2.port")
|
||||
val CONNECTION_URL = "jdbc:h2:tcp://" + serverIP + "/~/piflow;AUTO_SERVER=true"
|
||||
var connection : Connection= null
|
||||
|
@ -47,25 +46,23 @@ object H2Util {
|
|||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
val updateSql = "update flow set state='" + state + "' where id='" + appId + "'"
|
||||
println(updateSql)
|
||||
//println(updateSql)
|
||||
statement.executeUpdate(updateSql)
|
||||
statement.close()
|
||||
}
|
||||
def updateFlowStartTime(appId:String) = {
|
||||
val startTime = new Date().toString
|
||||
def updateFlowStartTime(appId:String, startTime:String) = {
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
val updateSql = "update flow set startTime='" + startTime + "' where id='" + appId + "'"
|
||||
println(updateSql)
|
||||
//println(updateSql)
|
||||
statement.executeUpdate(updateSql)
|
||||
statement.close()
|
||||
}
|
||||
def updateFlowFinishedTime(appId:String) = {
|
||||
val endTime = new Date().toString
|
||||
def updateFlowFinishedTime(appId:String, endTime:String) = {
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
val updateSql = "update flow set endTime='" + endTime + "' where id='" + appId + "'"
|
||||
println(updateSql)
|
||||
//println(updateSql)
|
||||
statement.executeUpdate(updateSql)
|
||||
statement.close()
|
||||
}
|
||||
|
@ -77,7 +74,7 @@ object H2Util {
|
|||
val rs : ResultSet = statement.executeQuery("select * from flow where id='" + appId +"'")
|
||||
while(rs.next()){
|
||||
state = rs.getString("state")
|
||||
println("id:" + rs.getString("id") + "\tname:" + rs.getString("name") + "\tstate:" + rs.getString("state"))
|
||||
//println("id:" + rs.getString("id") + "\tname:" + rs.getString("name") + "\tstate:" + rs.getString("state"))
|
||||
}
|
||||
rs.close()
|
||||
statement.close()
|
||||
|
@ -129,14 +126,14 @@ object H2Util {
|
|||
val totalRS : ResultSet = statement.executeQuery("select count(*) as stopCount from stop where flowId='" + appId +"'")
|
||||
while(totalRS.next()){
|
||||
stopCount = totalRS.getInt("stopCount")
|
||||
println("stopCount:" + stopCount)
|
||||
//println("stopCount:" + stopCount)
|
||||
}
|
||||
totalRS.close()
|
||||
|
||||
val completedRS : ResultSet = statement.executeQuery("select count(*) as completedStopCount from stop where flowId='" + appId +"' and state='" + StopState.COMPLETED + "'")
|
||||
while(completedRS.next()){
|
||||
completedStopCount = completedRS.getInt("completedStopCount")
|
||||
println("completedStopCount:" + completedStopCount)
|
||||
//println("completedStopCount:" + completedStopCount)
|
||||
}
|
||||
completedRS.close()
|
||||
statement.close()
|
||||
|
@ -155,27 +152,25 @@ object H2Util {
|
|||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
val updateSql = "update stop set state='" + state + "' where flowId='" + appId + "' and name='" + name + "'"
|
||||
println(updateSql)
|
||||
//println(updateSql)
|
||||
statement.executeUpdate(updateSql)
|
||||
statement.close()
|
||||
}
|
||||
|
||||
def updateStopStartTime(appId:String, name:String) = {
|
||||
val startTime = new Date().toString
|
||||
def updateStopStartTime(appId:String, name:String, startTime:String) = {
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
val updateSql = "update stop set startTime='" + startTime + "' where flowId='" + appId + "' and name='" + name + "'"
|
||||
println(updateSql)
|
||||
//println(updateSql)
|
||||
statement.executeUpdate(updateSql)
|
||||
statement.close()
|
||||
}
|
||||
|
||||
def updateStopFinishedTime(appId:String, name:String) = {
|
||||
val endTime = new Date().toString
|
||||
def updateStopFinishedTime(appId:String, name:String, endTime:String) = {
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
statement.setQueryTimeout(QUERY_TIME)
|
||||
val updateSql = "update stop set endTime='" + endTime + "' where flowId='" + appId + "' and name='" + name + "'"
|
||||
println(updateSql)
|
||||
//println(updateSql)
|
||||
statement.executeUpdate(updateSql)
|
||||
statement.close()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue