forked from opensci/piflow
monitor flow info and stop info
This commit is contained in:
parent
8d626a876a
commit
80e3e874d6
|
@ -0,0 +1,23 @@
|
|||
server.ip=10.0.86.98
|
||||
server.port=8001
|
||||
|
||||
#spark.master=spark://10.0.86.89:7077
|
||||
#spark.master=spark://10.0.86.191:7077
|
||||
spark.master=yarn
|
||||
spark.deploy.mode=cluster
|
||||
yarn.resourcemanager.hostname=10.0.86.191
|
||||
yarn.resourcemanager.address=10.0.86.191:8032
|
||||
yarn.access.namenode=hdfs://10.0.86.191:9000
|
||||
yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
|
||||
yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
|
||||
|
||||
hive.metastore.uris=thrift://10.0.86.191:9083
|
||||
|
||||
#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
|
||||
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
|
||||
|
||||
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
|
||||
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
|
||||
|
||||
log.path=/opt/project/piflow/logs
|
||||
icon.path=/opt/project/piflow/icon
|
|
@ -1,6 +1,7 @@
|
|||
package cn.piflow
|
||||
|
||||
import cn.piflow.util.Logging
|
||||
import cn.piflow.util.{FlowState, H2Util, Logging, StopState}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
|
@ -113,6 +114,11 @@ class RunnerLogger extends RunnerListener with Logging {
|
|||
val flowName = ctx.getFlow().toString;
|
||||
logger.debug(s"process started: $pid, flow: $flowName");
|
||||
println(s"process started: $pid, flow: $flowName")
|
||||
//update flow state to STARTED
|
||||
val appId = getAppId(ctx)
|
||||
H2Util.addFlow(appId,flowName)
|
||||
H2Util.updateFlowState(appId,FlowState.STARTED)
|
||||
H2Util.updateFlowStartTime(appId)
|
||||
};
|
||||
|
||||
override def onJobStarted(ctx: JobContext): Unit = {
|
||||
|
@ -120,42 +126,69 @@ class RunnerLogger extends RunnerListener with Logging {
|
|||
val stopName = ctx.getStopJob().getStopName();
|
||||
logger.debug(s"job started: $jid, stop: $stopName");
|
||||
println(s"job started: $jid, stop: $stopName")
|
||||
//update stop state to STARTED
|
||||
H2Util.updateStopState(getAppId(ctx),stopName,StopState.STARTED)
|
||||
H2Util.updateStopStartTime(getAppId(ctx),stopName)
|
||||
};
|
||||
|
||||
/**
 * Listener hook fired when a stop (job) fails.
 * Marks the stop FAILED in the H2 monitor table and records its finish time.
 */
override def onJobFailed(ctx: JobContext): Unit = {
  // NOTE(review): result of this accessor is discarded in the original; kept for safety.
  ctx.getProcessContext()
  val stopName = ctx.getStopJob().getStopName();
  logger.debug(s"job failed: $stopName");
  println(s"job failed: $stopName")
  //update stop state to FAILED
  // Resolve the applicationId once (the original called getAppId(ctx) twice,
  // doing two context lookups), matching the pattern of the sibling handlers.
  val appId = getAppId(ctx)
  H2Util.updateStopState(appId, stopName, StopState.FAILED)
  H2Util.updateStopFinishedTime(appId, stopName)
};
|
||||
|
||||
/**
 * Listener hook fired when a stop (job) is initialized.
 * Registers the stop in the H2 monitor table and stamps it INIT.
 */
override def onJobInitialized(ctx: JobContext): Unit = {
  val name = ctx.getStopJob().getStopName()
  val message = s"job initialized: $name"
  logger.debug(message)
  println(message)
  // Insert the stop row first, then set its initial state.
  val applicationId = getAppId(ctx)
  H2Util.addStop(applicationId, name)
  H2Util.updateStopState(applicationId, name, StopState.INIT)
}
|
||||
|
||||
/**
 * Listener hook fired when the whole process finishes successfully.
 * Marks the flow COMPLETED in H2 and records its finish time.
 */
override def onProcessCompleted(ctx: ProcessContext): Unit = {
  val pid = ctx.getProcess().pid()
  val message = s"process completed: $pid"
  logger.debug(message)
  println(message)
  // Terminal bookkeeping for this flow run.
  val applicationId = getAppId(ctx)
  H2Util.updateFlowState(applicationId, FlowState.COMPLETED)
  H2Util.updateFlowFinishedTime(applicationId)
}
|
||||
|
||||
/**
 * Listener hook fired when a stop (job) finishes successfully.
 * Marks the stop COMPLETED in H2 and records its finish time.
 */
override def onJobCompleted(ctx: JobContext): Unit = {
  val name = ctx.getStopJob().getStopName()
  val message = s"job completed: $name"
  logger.debug(message)
  println(message)
  // Terminal bookkeeping for this stop in the monitor tables.
  val applicationId = getAppId(ctx)
  H2Util.updateStopState(applicationId, name, StopState.COMPLETED)
  H2Util.updateStopFinishedTime(applicationId, name)
}
|
||||
|
||||
/**
 * Listener hook fired when the whole process fails.
 * Marks the flow FAILED in H2 and records its finish time.
 */
override def onProcessFailed(ctx: ProcessContext): Unit = {
  val pid = ctx.getProcess().pid();
  logger.debug(s"process failed: $pid");
  println(s"process failed: $pid")
  //update flow state to FAILED
  val appId = getAppId(ctx)
  // Use the cached appId: the original resolved getAppId(ctx) a second time here.
  H2Util.updateFlowState(appId, FlowState.FAILED)
  H2Util.updateFlowFinishedTime(appId)
}
|
||||
|
||||
/**
 * Listener hook fired when the whole process is aborted.
 * Marks the flow ABORTED in H2 and records its finish time.
 */
override def onProcessAborted(ctx: ProcessContext): Unit = {
  val pid = ctx.getProcess().pid()
  val message = s"process aborted: $pid"
  logger.debug(message)
  println(message)
  // Terminal bookkeeping for this flow run.
  val applicationId = getAppId(ctx)
  H2Util.updateFlowState(applicationId, FlowState.ABORTED)
  H2Util.updateFlowFinishedTime(applicationId)
}
|
||||
|
||||
override def onProcessForked(ctx: ProcessContext, child: ProcessContext): Unit = {
|
||||
|
@ -163,5 +196,12 @@ class RunnerLogger extends RunnerListener with Logging {
|
|||
val cid = child.getProcess().pid();
|
||||
logger.debug(s"process forked: $pid, child flow execution: $cid");
|
||||
println(s"process forked: $pid, child flow execution: $cid")
|
||||
//update flow state to FORK
|
||||
H2Util.updateFlowState(getAppId(ctx),FlowState.FORK)
|
||||
}
|
||||
|
||||
/**
 * Resolves the Spark applicationId for this run from the context-bound
 * SparkSession; this id keys every row written to the H2 monitor tables.
 */
private def getAppId(ctx: Context): String = {
  val session = ctx.get(classOf[SparkSession].getName).asInstanceOf[SparkSession]
  session.sparkContext.applicationId
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
package cn.piflow.util
|
||||
|
||||
/**
 * Lifecycle states a flow row in the H2 `flow` table can take.
 * Values are stored as plain strings in the database.
 */
object FlowState {
  val STARTED   = "STARTED"
  val COMPLETED = "COMPLETED"
  val FAILED    = "FAILED"
  val ABORTED   = "ABORTED"
  val FORK      = "FORK"
}
|
|
@ -0,0 +1,198 @@
|
|||
package cn.piflow.util
|
||||
|
||||
import java.sql.{Connection, DriverManager, ResultSet}
import java.util.Date

import org.h2.tools.Server

import scala.util.control.NonFatal
|
||||
|
||||
/**
 * Persists flow / stop monitoring state into an H2 database reached over TCP
 * (the server is started by the API's Main entry point).
 *
 * The `flow` table is keyed by the Spark applicationId; the `stop` table by
 * (flowId, name). Timestamps are stored as `Date.toString` text.
 *
 * Fixes over the previous version:
 *  - getFlowInfo: the stop JSON was missing the opening quote around the name
 *    value, and the assembled stop list was discarded so the method always
 *    returned "" — it now returns the assembled JSON.
 *  - SQL is executed via PreparedStatement parameter binding instead of string
 *    concatenation (flow/stop names come from user-defined flow configs).
 *  - catch-alls use NonFatal so VM errors are not swallowed.
 *  - getFlowProgress no longer yields "NaN" when a flow has no stops yet.
 */
object H2Util {

  // Per-statement timeout in seconds, so a hung H2 server cannot block a run forever.
  val QUERY_TIME = 30
  val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
  val CREATE_STOP_TABLE = "create table if not exists stop (flowId varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
  //val CONNECTION_URL = "jdbc:h2:tcp://" + PropertyUtil.getPropertyValue("server.ip") + ":9092/~/piflow"
  val CONNECTION_URL = "jdbc:h2:tcp://" + PropertyUtil.getPropertyValue("server.ip") + "/~/piflow"
  var connection : Connection = null

  // Create the tables when the object is first loaded. Failures are logged and
  // swallowed so that class initialization cannot crash the JVM when the H2
  // server is unreachable.
  try {
    val statement = getConnectionInstance().createStatement()
    statement.setQueryTimeout(QUERY_TIME)
    statement.executeUpdate(CREATE_FLOW_TABLE)
    statement.executeUpdate(CREATE_STOP_TABLE)
    statement.close()
  } catch {
    // NonFatal lets OutOfMemoryError etc. propagate instead of being printed away.
    case NonFatal(ex) => println(ex)
  }

  /**
   * Lazily opens (and caches) the single JDBC connection.
   * NOTE(review): not synchronized — assumes single-threaded access; confirm.
   */
  def getConnectionInstance() : Connection = {
    if (connection == null) {
      Class.forName("org.h2.Driver")
      connection = DriverManager.getConnection(CONNECTION_URL)
    }
    connection
  }

  /** Runs a parameterized update; binding parameters prevents SQL injection. */
  private def executeUpdate(sql: String, params: String*): Unit = {
    val ps = getConnectionInstance().prepareStatement(sql)
    try {
      ps.setQueryTimeout(QUERY_TIME)
      for (i <- params.indices) ps.setString(i + 1, params(i))
      ps.executeUpdate()
    } finally {
      ps.close()
    }
  }

  /** Runs a parameterized single-value count query; 0 when no row comes back. */
  private def queryCount(sql: String, params: String*): Int = {
    val ps = getConnectionInstance().prepareStatement(sql)
    try {
      ps.setQueryTimeout(QUERY_TIME)
      for (i <- params.indices) ps.setString(i + 1, params(i))
      val rs = ps.executeQuery()
      val count = if (rs.next()) rs.getInt(1) else 0
      rs.close()
      count
    } finally {
      ps.close()
    }
  }

  /** Inserts a new flow row; state and time columns are filled in later. */
  def addFlow(appId:String,name:String) = {
    executeUpdate("insert into flow(id, name) values(?,?)", appId, name)
  }

  /** Sets the flow's state column (see FlowState for the expected values). */
  def updateFlowState(appId:String, state:String) = {
    val sql = "update flow set state=? where id=?"
    println(sql + " [" + state + ", " + appId + "]")
    executeUpdate(sql, state, appId)
  }

  /** Stamps the flow's start time with the current wall clock. */
  def updateFlowStartTime(appId:String) = {
    val startTime = new Date().toString
    val sql = "update flow set startTime=? where id=?"
    println(sql + " [" + startTime + ", " + appId + "]")
    executeUpdate(sql, startTime, appId)
  }

  /** Stamps the flow's end time with the current wall clock. */
  def updateFlowFinishedTime(appId:String) = {
    val endTime = new Date().toString
    val sql = "update flow set endTime=? where id=?"
    println(sql + " [" + endTime + ", " + appId + "]")
    executeUpdate(sql, endTime, appId)
  }

  /** Returns the flow's current state, or "" when the flow id is unknown. */
  def getFlowState(appId: String): String = {
    var state = ""
    val ps = getConnectionInstance().prepareStatement("select * from flow where id=?")
    try {
      ps.setQueryTimeout(QUERY_TIME)
      ps.setString(1, appId)
      val rs = ps.executeQuery()
      while (rs.next()) {
        state = rs.getString("state")
        println("id:" + rs.getString("id") + "\tname:" + rs.getString("name") + "\tstate:" + rs.getString("state"))
      }
      rs.close()
    } finally {
      ps.close()
    }
    state
  }

  /**
   * Builds a JSON description of the flow and all of its stops; "" when the
   * flow id is unknown.
   * NOTE(review): overall JSON shape inferred from the stop fragment format
   * the previous version printed — confirm against the API consumer.
   */
  def getFlowInfo(appId:String) : String = {
    var flowJson = ""
    val flowPs = getConnectionInstance().prepareStatement("select * from flow where id=?")
    try {
      flowPs.setQueryTimeout(QUERY_TIME)
      flowPs.setString(1, appId)
      val flowRS = flowPs.executeQuery()
      while (flowRS.next()) {
        flowJson = "{\"flow\":{\"id\":\"" + flowRS.getString("id") +
          "\",\"name\":\"" + flowRS.getString("name") +
          "\",\"state\":\"" + flowRS.getString("state") +
          "\",\"startTime\":\"" + flowRS.getString("startTime") +
          "\",\"endTime\":\"" + flowRS.getString("endTime") + "\""
      }
      flowRS.close()
    } finally {
      flowPs.close()
    }

    if (flowJson.isEmpty) {
      ""
    } else {
      var stopList: List[String] = List()
      val stopPs = getConnectionInstance().prepareStatement("select * from stop where flowId=?")
      try {
        stopPs.setQueryTimeout(QUERY_TIME)
        stopPs.setString(1, appId)
        val rs = stopPs.executeQuery()
        while (rs.next()) {
          val stopStr = "{\"stop\":{\"name\":\"" + rs.getString("name") +
            "\",\"state\":\"" + rs.getString("state") +
            "\",\"startTime\":\"" + rs.getString("startTime") +
            "\",\"endTime\":\"" + rs.getString("endTime") + "\"}}"
          println(stopStr)
          stopList = stopStr +: stopList
        }
        rs.close()
      } finally {
        stopPs.close()
      }
      // Stops were prepended while iterating, so reverse to restore query order.
      flowJson + ",\"stops\":[" + stopList.reverse.mkString(",") + "]}}"
    }
  }

  /**
   * Returns the percentage of the flow's stops that are COMPLETED, as a
   * string. "0.0" when the flow has no stops yet (the previous version
   * produced "NaN" via 0.0/0 in that case).
   */
  def getFlowProgress(appId:String) : String = {
    val stopCount = queryCount("select count(*) from stop where flowId=?", appId)
    println("stopCount:" + stopCount)
    val completedStopCount = queryCount("select count(*) from stop where flowId=? and state=?", appId, StopState.COMPLETED)
    println("completedStopCount:" + completedStopCount)
    if (stopCount == 0) "0.0"
    else (completedStopCount.toDouble / stopCount * 100).toString
  }

  /** Inserts a new stop row under the given flow. */
  def addStop(appId:String,name:String) = {
    executeUpdate("insert into stop(flowId, name) values(?,?)", appId, name)
  }

  /** Sets a stop's state column (see StopState for the expected values). */
  def updateStopState(appId:String, name:String, state:String) = {
    val sql = "update stop set state=? where flowId=? and name=?"
    println(sql + " [" + state + ", " + appId + ", " + name + "]")
    executeUpdate(sql, state, appId, name)
  }

  /** Stamps a stop's start time with the current wall clock. */
  def updateStopStartTime(appId:String, name:String) = {
    val startTime = new Date().toString
    val sql = "update stop set startTime=? where flowId=? and name=?"
    println(sql + " [" + startTime + ", " + appId + ", " + name + "]")
    executeUpdate(sql, startTime, appId, name)
  }

  /** Stamps a stop's end time with the current wall clock. */
  def updateStopFinishedTime(appId:String, name:String) = {
    val endTime = new Date().toString
    val sql = "update stop set endTime=? where flowId=? and name=?"
    println(sql + " [" + endTime + ", " + appId + ", " + name + "]")
    executeUpdate(sql, endTime, appId, name)
  }

  /** Manual smoke test; requires a running H2 TCP server. */
  def main(args: Array[String]): Unit = {
    try {
      val appId = "111"
      addFlow(appId, "xjzhu")
      updateFlowState(appId, "running")
      val state2 = getFlowState(appId)

      val stop1 = "stop1"
      val stop2 = "stop2"
      addStop(appId, stop1)
      updateStopState(appId, stop1, StopState.COMPLETED)
      addStop(appId, stop2)
      updateStopState(appId, stop2, StopState.STARTED)

      val process = getFlowProgress(appId)
      println("appId=" + appId + "'s process is " + process + "%")
    } catch {
      case NonFatal(ex) => println(ex)
    }
  }

}
|
|
@ -1,9 +1,9 @@
|
|||
package cn.piflow.api.util
|
||||
package cn.piflow.util
|
||||
|
||||
import java.io.{FileInputStream, InputStream}
|
||||
import java.util.Properties
|
||||
|
||||
object ContainerPropertyUtil {
|
||||
object PropertyUtil {
|
||||
private val prop: Properties = new Properties()
|
||||
var fis: InputStream = null
|
||||
var path :String = ""
|
|
@ -0,0 +1,8 @@
|
|||
package cn.piflow.util
|
||||
|
||||
/**
 * Lifecycle states a stop row in the H2 `stop` table can take.
 * Values are stored as plain strings in the database.
 */
object StopState {
  val INIT      = "INIT"
  val STARTED   = "STARTED"
  val COMPLETED = "COMPLETED"
  val FAILED    = "FAILED"
}
|
|
@ -13,6 +13,7 @@ import com.typesafe.config.ConfigFactory
|
|||
import scala.concurrent.Future
|
||||
import scala.util.parsing.json.JSON
|
||||
import org.apache.spark.launcher.SparkAppHandle
|
||||
import org.h2.tools.Server
|
||||
import spray.json.DefaultJsonProtocol
|
||||
|
||||
|
||||
|
@ -163,5 +164,6 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
/**
 * Entry point: boots the HTTP API, then exposes the embedded H2 database over
 * TCP so flow executors on other hosts can write monitoring state to it.
 */
object Main {
  def main(argv: Array[String]): Unit = {
    HTTPService.run
    // -tcpAllowOthers accepts connections from remote hosts —
    // NOTE(review): confirm this exposure is intended outside development.
    Server.createTcpServer("-tcp", "-tcpAllowOthers").start()
  }
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import cn.piflow.Runner
|
||||
import cn.piflow.api.util.ContainerPropertyUtil
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import cn.piflow.conf.bean.FlowBean
|
||||
import cn.piflow.conf.util.OptionUtil
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
@ -29,7 +29,7 @@ object StartFlowMain {
|
|||
|
||||
val process = Runner.create()
|
||||
.bind(classOf[SparkSession].getName, spark)
|
||||
.bind("checkpoint.path",ContainerPropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.start(flow);
|
||||
val applicationId = spark.sparkContext.applicationId
|
||||
process.awaitTermination();
|
||||
|
|
|
@ -11,7 +11,7 @@ object PropertyUtil {
|
|||
//val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath
|
||||
//fis = this.getClass.getResourceAsStream("")
|
||||
val userDir = System.getProperty("user.dir")
|
||||
path = userDir + "/conf/" + "config.properties"
|
||||
path = userDir + "/config.properties"
|
||||
prop.load(new FileInputStream(path))
|
||||
} catch{
|
||||
case ex: Exception => ex.printStackTrace()
|
||||
|
|
5
pom.xml
5
pom.xml
|
@ -54,6 +54,11 @@
|
|||
<artifactId>spark-yarn_2.11</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>1.4.197</version>
|
||||
</dependency>
|
||||
<!--<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
|
|
Loading…
Reference in New Issue