diff --git a/config.properties b/config.properties
index 61c91a5..4106c7f 100644
--- a/config.properties
+++ b/config.properties
@@ -18,6 +18,7 @@ piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
 
 yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
 checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
+debug.path=hdfs://10.0.86.89:9000/xjzhu/piflow/debug/
 
 data.show=10
 
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala
index 8ed9a74..9306944 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala
@@ -12,6 +12,7 @@ class FlowBean {
   var name : String = _
   var checkpoint : String = _
   var checkpointParentProcessId : String = _
+  var runMode : String = _
 
   var stops : List[StopBean] = List()
   var paths : List[PathBean] = List()
@@ -23,6 +24,7 @@
     this.name = MapUtil.get(flowMap,"name").asInstanceOf[String]
     this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String]
     this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String]
+    this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String]
 
     //construct StopBean List
     val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]]
@@ -46,6 +48,7 @@
 
     flow.setFlowName(this.name)
     flow.setCheckpointParentProcessId(this.checkpointParentProcessId)
+    flow.setRunMode(this.runMode)
 
     this.stops.foreach( stopBean => {
       flow.addStop(stopBean.name,stopBean.constructStop())
diff --git a/piflow-core/src/main/scala/cn/piflow/main.scala b/piflow-core/src/main/scala/cn/piflow/main.scala
index 5ba7ba3..9fc66ac 100644
--- a/piflow-core/src/main/scala/cn/piflow/main.scala
+++ b/piflow-core/src/main/scala/cn/piflow/main.scala
@@ -4,8 +4,7 @@ import java.io.IOException
 import java.net.URI
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import cn.piflow.util.PropertyUtil
-import cn.piflow.util.{HadoopFileUtil, IdGenerator, Logging, PropertyUtil}
+import cn.piflow.util._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
@@ -77,6 +76,10 @@ trait Flow {
   def getCheckpointParentProcessId() : String;
 
   def setCheckpointParentProcessId(checkpointParentProcessId : String);
+
+  def getRunMode() : String;
+
+  def setRunMode( runMode : String) : Unit;
 }
 
 class FlowImpl extends Flow {
@@ -85,6 +88,7 @@
   val stops = MMap[String, Stop]();
   val checkpoints = ArrayBuffer[String]();
   var checkpointParentProcessId = ""
+  var runMode = ""
 
   def addStop(name: String, process: Stop) = {
     stops(name) = process;
@@ -183,6 +187,14 @@
   override def setCheckpointParentProcessId(checkpointParentProcessId : String) = {
     this.checkpointParentProcessId = checkpointParentProcessId
   }
+
+  override def getRunMode(): String = {
+    this.runMode
+  }
+
+  override def setRunMode(runMode: String): Unit = {
+    this.runMode = runMode
+  }
 }
 
 trait AnalyzedFlowGraph {
@@ -323,6 +335,17 @@
     })
   }
 
+  def saveDataFrame(debugPath : String) = {
+
+
+    mapDataFrame.foreach(en => {
+      val portName = if(en._1.equals("")) "default" else en._1
+      val portDataPath = debugPath + "/" + portName
+      println(portDataPath)
+      en._2.apply().write.json(portDataPath)
+    })
+  }
+
 }
 
 class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProcess: Option[Process] = None)
@@ -422,14 +445,19 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
         var outputs : JobOutputStreamImpl = null
         try {
           runnerListener.onJobStarted(pe.getContext());
+          val debugPath = pe.getContext().get("debug.path").asInstanceOf[String].stripSuffix("/") + "/" + pe.getContext().getProcessContext().getProcess().pid() + "/" + pe.getContext().getStopJob().getStopName();
 
           //new flow process
           if (checkpointParentProcessId.equals("")) {
             println("Visit process " + stopName + "!!!!!!!!!!!!!")
             outputs = pe.perform(inputs);
             runnerListener.onJobCompleted(pe.getContext());
-            //TODO: test
+            //TODO: need to test
             outputs.showDataFrame()
+            if(flow.getRunMode() == FlowRunMode.DEBUG) {
+              outputs.saveDataFrame(debugPath)
+            }
+
             if (flow.hasCheckPoint(stopName)) {
               outputs.makeCheckPoint(pe.getContext());
             }
@@ -440,10 +468,21 @@
             val checkpointPath = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + checkpointParentProcessId + "/" + pec.getStopJob().getStopName();
             println("Visit process " + stopName + " by Checkpoint!!!!!!!!!!!!!")
             outputs.loadCheckPoint(pe.getContext(),checkpointPath)
+            //TODO: need to test
+            outputs.showDataFrame()
+            if(flow.getRunMode() == FlowRunMode.DEBUG) {
+              outputs.saveDataFrame(debugPath)
+            }
+
             runnerListener.onJobCompleted(pe.getContext());
           }else{
             println("Visit process " + stopName + "!!!!!!!!!!!!!")
             outputs = pe.perform(inputs);
+            //TODO: need to test
+            outputs.showDataFrame()
+            if(flow.getRunMode() == FlowRunMode.DEBUG) {
+              outputs.saveDataFrame(debugPath)
+            }
             runnerListener.onJobCompleted(pe.getContext());
           }
 
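With the core changes above, a flow whose runMode is DEBUG has each stop's output ports written to HDFS as JSON under <debug.path>/<processId>/<stopName>/<portName>. A minimal sketch of enabling it programmatically (the flow name is illustrative; a flow JSON can set "runMode": "DEBUG" instead, since FlowBean defaults the field to "RUN"):

    import cn.piflow.FlowImpl
    import cn.piflow.util.FlowRunMode

    // Hypothetical setup: everything else about the flow is configured as usual.
    val flow = new FlowImpl()
    flow.setFlowName("DebugFlow")
    flow.setRunMode(FlowRunMode.DEBUG)  // omit (or pass FlowRunMode.RUN) to skip the debug dumps
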
diff --git a/piflow-core/src/main/scala/cn/piflow/util/FlowRunMode.scala b/piflow-core/src/main/scala/cn/piflow/util/FlowRunMode.scala
new file mode 100644
index 0000000..d668a03
--- /dev/null
+++ b/piflow-core/src/main/scala/cn/piflow/util/FlowRunMode.scala
@@ -0,0 +1,8 @@
+package cn.piflow.util
+
+object FlowRunMode {
+
+  val RUN = "RUN"
+  val DEBUG = "DEBUG"
+
+}
diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala
index 498d4ee..f4aeea4 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/API.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala
@@ -1,15 +1,18 @@
 package cn.piflow.api
 
 import java.io.{File, FileOutputStream}
+import java.net.URI
 import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{Date, Properties}
 import java.util.concurrent.CountDownLatch
 
 import org.apache.spark.sql.SparkSession
 import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
 import cn.piflow.{Process, Runner}
-import cn.piflow.api.util.PropertyUtil
+import cn.piflow.api.util.{HdfsUtil, PropertyUtil}
 import cn.piflow.util.{FlowState, H2Util, HadoopFileUtil}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpPut}
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.HttpClients
@@ -187,6 +190,27 @@
 
   }
 
+  def getFlowDebugData(processID : String, stopName : String, port : String) : String = {
+    var result = ""
+    val debugPath = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + processID + "/" + stopName + "/" + port;
+    val properties = new Properties()
+    val hdfs = FileSystem.get(URI.create(debugPath), new Configuration())
+
+    val fileList = HdfsUtil.getFilesInFolder(PropertyUtil.getPropertyValue("debug.path"), debugPath)
+
+    fileList.filter(!_.endsWith("_SUCCESS")).foreach( file => {
+      var stream = hdfs.open(new Path(file))
+      def readLines = Stream.cons(stream.readLine(),Stream.continually(stream.readLine()))
+      readLines.takeWhile( _ != null).foreach( line => {
+
+        println(line)
+        result += line + "\n"
+      })
+    })
+
+    result.stripSuffix("\n")
+  }
+
   def getStopInfo(bundle : String) : String = {
 
     try{
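getFlowDebugData above stitches the per-port JSON part files back into one newline-separated string. For large outputs it may be more convenient to read the same directory with Spark; a sketch under the path layout saveDataFrame produces (the object name and arguments are hypothetical):

    import org.apache.spark.sql.SparkSession

    object ReadDebugOutput {
      def main(args: Array[String]): Unit = {
        // Layout written in DEBUG mode: <debug.path>/<processId>/<stopName>/<port>
        val Array(processId, stopName, port) = args
        val spark = SparkSession.builder().appName("ReadDebugOutput").getOrCreate()
        val path = "hdfs://10.0.86.89:9000/xjzhu/piflow/debug/" + processId + "/" + stopName + "/" + port
        spark.read.json(path).show(10)
        spark.stop()
      }
    }
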
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetFlowDebugData.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetFlowDebugData.scala
new file mode 100644
index 0000000..5fb578c
--- /dev/null
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetFlowDebugData.scala
@@ -0,0 +1,20 @@
+package cn.piflow.api
+
+import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+object HTTPClientGetFlowDebugData {
+
+  def main(args: Array[String]): Unit = {
+
+    val url = "http://10.0.86.98:8001/flow/debugData?processID=process_104b9840-c163-4a38-bafb-749d2d66b8ad_1&stopName=Merge&port=default"
+    val client = HttpClients.createDefault()
+    val getFlowInfo:HttpGet = new HttpGet(url)
+
+    val response:CloseableHttpResponse = client.execute(getFlowInfo)
+    val entity = response.getEntity
+    val str = EntityUtils.toString(entity,"UTF-8")
+    println("Response is " + str)
+  }
+}
diff --git a/piflow-server/src/main/scala/cn/piflow/api/util/HdfsUtil.scala b/piflow-server/src/main/scala/cn/piflow/api/util/HdfsUtil.scala
new file mode 100644
index 0000000..eabc1ec
--- /dev/null
+++ b/piflow-server/src/main/scala/cn/piflow/api/util/HdfsUtil.scala
@@ -0,0 +1,34 @@
+package cn.piflow.api.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+object HdfsUtil {
+
+  def getFilesInFolder(hdfsUrl: String, path: String): List[String] = {
+    var result : List[String] = List()
+
+    val config = new Configuration()
+    config.set("fs.defaultFS",hdfsUrl)
+    val fs = FileSystem.get(config)
+    val listf = new Path(path)
+
+    val statuses: Array[FileStatus] = fs.listStatus(listf)
+
+    for (f <- statuses) {
+      val fsPath = f.getPath().toString
+      //println(fsPath)
+
+      if (f.isDirectory) {
+        result = fsPath::result
+        result = getFilesInFolder(hdfsUrl, fsPath) ::: result
+
+      } else{
+
+        result = f.getPath.toString::result
+      }
+    }
+    result
+  }
+
+}
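A quick usage sketch for HdfsUtil, mirroring what getFlowDebugData does (all values illustrative): list a port directory and skip the _SUCCESS marker left by Spark's writer:

    import cn.piflow.api.util.HdfsUtil

    object ListDebugFiles {
      def main(args: Array[String]): Unit = {
        val hdfsUrl = "hdfs://10.0.86.89:9000"
        // the process id, stop and port segments here are made up for the example
        val dir = hdfsUrl + "/xjzhu/piflow/debug/process_xxx/Merge/default"
        HdfsUtil.getFilesInFolder(hdfsUrl, dir)
          .filter(!_.endsWith("_SUCCESS"))
          .foreach(println)
      }
    }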