show flow debug data by path

judy0131 2019-01-23 11:27:05 +08:00
parent 11a15d6ca7
commit b48ff0bfc2
7 changed files with 134 additions and 5 deletions

View File

@@ -18,6 +18,7 @@ piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
debug.path=hdfs://10.0.86.89:9000/xjzhu/piflow/debug/
data.show=10
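
Note: debug.path is the folder where per-stop output lands in DEBUG mode, and data.show presumably caps the rows printed by showDataFrame(); the data.show call site is not part of this diff, so the snippet below is an assumed consumer of the two new keys.

import cn.piflow.util.PropertyUtil

//assumed consumer of the new keys; PropertyUtil.getPropertyValue is the
//accessor this commit already uses in API.getFlowDebugData
val debugBase = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/")
val showRows = PropertyUtil.getPropertyValue("data.show").toInt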

View File

@@ -12,6 +12,7 @@ class FlowBean {
var name : String = _
var checkpoint : String = _
var checkpointParentProcessId : String = _
var runMode : String = _
var stops : List[StopBean] = List()
var paths : List[PathBean] = List()
@@ -23,6 +24,7 @@ class FlowBean {
this.name = MapUtil.get(flowMap,"name").asInstanceOf[String]
this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String]
this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String]
this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String]
//construct StopBean List
val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]]
@@ -46,6 +48,7 @@ class FlowBean {
flow.setFlowName(this.name)
flow.setCheckpointParentProcessId(this.checkpointParentProcessId)
flow.setRunMode(this.runMode)
this.stops.foreach( stopBean => {
flow.addStop(stopBean.name,stopBean.constructStop())
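
For reference, a flow map opting into debug mode would carry the new field like this (a sketch; required stop fields beyond the name are elided, and runMode falls back to "RUN" when absent):

val flowMap = Map(
  "name" -> "debugExample",
  "runMode" -> "DEBUG",
  "stops" -> List(Map[String, Any]("name" -> "Merge")),
  "paths" -> List[Map[String, Any]]()
)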

View File

@@ -4,8 +4,7 @@ import java.io.IOException
import java.net.URI
import java.util.concurrent.{CountDownLatch, TimeUnit}
import cn.piflow.util.PropertyUtil
import cn.piflow.util.{HadoopFileUtil, IdGenerator, Logging, PropertyUtil}
import cn.piflow.util._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
@@ -77,6 +76,10 @@ trait Flow {
def getCheckpointParentProcessId() : String;
def setCheckpointParentProcessId(checkpointParentProcessId : String);
def getRunMode() : String;
def setRunMode( runMode : String) : Unit;
}
class FlowImpl extends Flow {
@@ -85,6 +88,7 @@ class FlowImpl extends Flow {
val stops = MMap[String, Stop]();
val checkpoints = ArrayBuffer[String]();
var checkpointParentProcessId = ""
var runMode = ""
def addStop(name: String, process: Stop) = {
stops(name) = process;
@@ -183,6 +187,14 @@ class FlowImpl extends Flow {
override def setCheckpointParentProcessId(checkpointParentProcessId : String) = {
this.checkpointParentProcessId = checkpointParentProcessId
}
override def getRunMode(): String = {
this.runMode
}
override def setRunMode(runMode: String): Unit = {
this.runMode = runMode
}
}
trait AnalyzedFlowGraph {
@@ -323,6 +335,17 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
})
}
def saveDataFrame(debugPath : String) = {
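//write each port's DataFrame as JSON under <debugPath>/<portName>; the unnamed default port is saved as "default"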
mapDataFrame.foreach(en => {
val portName = if(en._1.equals("")) "default" else en._1
val portDataPath = debugPath + "/" + portName
println(portDataPath)
en._2.apply().write.json(portDataPath)
})
}
}
class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProcess: Option[Process] = None)
@@ -422,14 +445,19 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
var outputs : JobOutputStreamImpl = null
try {
runnerListener.onJobStarted(pe.getContext());
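//per-stop debug output goes to <debug.path>/<processId>/<stopName>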
val debugPath = pe.getContext().get("debug.path").asInstanceOf[String].stripSuffix("/") + "/" + pe.getContext().getProcessContext().getProcess().pid() + "/" + pe.getContext().getStopJob().getStopName();
//new flow process
if (checkpointParentProcessId.equals("")) {
println("Visit process " + stopName + "!!!!!!!!!!!!!")
outputs = pe.perform(inputs);
runnerListener.onJobCompleted(pe.getContext());
//TODO: test
//TODO: need to test
outputs.showDataFrame()
if(flow.getRunMode() == FlowRunMode.DEBUG) {
outputs.saveDataFrame(debugPath)
}
if (flow.hasCheckPoint(stopName)) {
outputs.makeCheckPoint(pe.getContext());
}
@@ -440,10 +468,21 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
val checkpointPath = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + checkpointParentProcessId + "/" + pec.getStopJob().getStopName();
println("Visit process " + stopName + " by Checkpoint!!!!!!!!!!!!!")
outputs.loadCheckPoint(pe.getContext(),checkpointPath)
//TODO: need to test
outputs.showDataFrame()
if(flow.getRunMode() == FlowRunMode.DEBUG) {
outputs.saveDataFrame(debugPath)
}
runnerListener.onJobCompleted(pe.getContext());
}else{
println("Visit process " + stopName + "!!!!!!!!!!!!!")
outputs = pe.perform(inputs);
//TODO: need to test
outputs.showDataFrame()
if(flow.getRunMode() == FlowRunMode.DEBUG) {
outputs.saveDataFrame(debugPath)
}
runnerListener.onJobCompleted(pe.getContext());
}
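
The showDataFrame/saveDataFrame pair is now repeated in all three branches above; a hypothetical helper along these lines (not part of this commit, name illustrative) would keep the branches in sync:

//sketch only
def emitDebugOutput(outputs: JobOutputStreamImpl, debugPath: String): Unit = {
  outputs.showDataFrame()
  if (flow.getRunMode() == FlowRunMode.DEBUG) {
    outputs.saveDataFrame(debugPath)
  }
}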

View File

@@ -0,0 +1,8 @@
package cn.piflow.util
object FlowRunMode {
val RUN = "RUN"
val DEBUG = "DEBUG"
}
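
Because RUN and DEBUG are plain String constants, the flow.getRunMode() == FlowRunMode.DEBUG checks above compare by value (Scala's == delegates to equals).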

View File

@@ -1,15 +1,18 @@
package cn.piflow.api
import java.io.{File, FileOutputStream}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
import java.util.{Date, Properties}
import java.util.concurrent.CountDownLatch
import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
import cn.piflow.{Process, Runner}
import cn.piflow.api.util.PropertyUtil
import cn.piflow.api.util.{HdfsUtil, PropertyUtil}
import cn.piflow.util.{FlowState, H2Util, HadoopFileUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpPut}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
@@ -187,6 +190,27 @@ object API {
}
def getFlowDebugData(processID : String, stopName : String, port : String) : String = {
var result = ""
val debugPath = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + processID + "/" + stopName + "/" + port;
val hdfs = FileSystem.get(URI.create(debugPath), new Configuration())
val fileList = HdfsUtil.getFilesInFolder(PropertyUtil.getPropertyValue("debug.path"), debugPath)
//skip Spark's _SUCCESS marker; fileList holds full paths, so match on the suffix
fileList.filter(!_.endsWith("_SUCCESS")).foreach( file => {
val stream = hdfs.open(new Path(file))
def readLines = Stream.cons(stream.readLine(),Stream.continually(stream.readLine()))
readLines.takeWhile( _ != null).foreach( line => {
result += line + "\n"
})
stream.close()
})
result.stripSuffix("\n")
}
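
The HTTPClientGetFlowDebugData example in the next file exercises this method through the /flow/debugData endpoint shown in its URL.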
def getStopInfo(bundle : String) : String = {
try{

View File

@@ -0,0 +1,20 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientGetFlowDebugData {
def main(args: Array[String]): Unit = {
val url = "http://10.0.86.98:8001/flow/debugData?processID=process_104b9840-c163-4a38-bafb-749d2d66b8ad_1&stopName=Merge&port=default"
val client = HttpClients.createDefault()
val getFlowDebugData:HttpGet = new HttpGet(url)
val response:CloseableHttpResponse = client.execute(getFlowDebugData)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
}
}

View File

@@ -0,0 +1,34 @@
package cn.piflow.api.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
object HdfsUtil {
def getFilesInFolder(hdfsUrl: String, path: String): List[String] = {
var result : List[String] = List()
val config = new Configuration()
config.set("fs.defaultFS",hdfsUrl)
val fs = FileSystem.get(config)
val listf = new Path(path)
val statuses: Array[FileStatus] = fs.listStatus(listf)
for (f <- statuses) {
val fsPath = f.getPath.toString
if (f.isDirectory) {
//recurse into the sub-folder and keep its files; the recursive result was previously discarded
result = getFilesInFolder(hdfsUrl, fsPath) ::: result
} else {
result = fsPath::result
}
}
result
}
}
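
A quick usage sketch (the process id segment is made up for illustration):

//list every part file under one port's debug folder
val files = HdfsUtil.getFilesInFolder(
  "hdfs://10.0.86.89:9000",
  "hdfs://10.0.86.89:9000/xjzhu/piflow/debug/process_xxx/Merge/default")
files.foreach(println)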