From 2ef86d8097493944a902c9d6f73791d4b466293b Mon Sep 17 00:00:00 2001 From: judy0131 Date: Wed, 23 Jan 2019 14:17:34 +0800 Subject: [PATCH] iptimize code --- .../src/main/scala/cn/piflow/main.scala | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/piflow-core/src/main/scala/cn/piflow/main.scala b/piflow-core/src/main/scala/cn/piflow/main.scala index 9fc66ac..366344d 100644 --- a/piflow-core/src/main/scala/cn/piflow/main.scala +++ b/piflow-core/src/main/scala/cn/piflow/main.scala @@ -341,7 +341,7 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging { mapDataFrame.foreach(en => { val portName = if(en._1.equals("")) "default" else en._1 val portDataPath = debugPath + "/" + portName - println(portDataPath) + //println(portDataPath) en._2.apply().write.json(portDataPath) }) } @@ -451,16 +451,11 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc if (checkpointParentProcessId.equals("")) { println("Visit process " + stopName + "!!!!!!!!!!!!!") outputs = pe.perform(inputs); - runnerListener.onJobCompleted(pe.getContext()); - //TODO: need to test - outputs.showDataFrame() - if(flow.getRunMode() == FlowRunMode.DEBUG) { - outputs.saveDataFrame(debugPath) - } if (flow.hasCheckPoint(stopName)) { outputs.makeCheckPoint(pe.getContext()); } + }else{//read checkpoint from old process if(flow.hasCheckPoint(stopName)){ val pec = pe.getContext() @@ -468,25 +463,20 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc val checkpointPath = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + checkpointParentProcessId + "/" + pec.getStopJob().getStopName(); println("Visit process " + stopName + " by Checkpoint!!!!!!!!!!!!!") outputs.loadCheckPoint(pe.getContext(),checkpointPath) - //TODO: need to test - outputs.showDataFrame() - if(flow.getRunMode() == FlowRunMode.DEBUG) { - outputs.saveDataFrame(debugPath) - } - runnerListener.onJobCompleted(pe.getContext()); }else{ println("Visit process " + stopName + "!!!!!!!!!!!!!") outputs = pe.perform(inputs); - //TODO: need to test - outputs.showDataFrame() - if(flow.getRunMode() == FlowRunMode.DEBUG) { - outputs.saveDataFrame(debugPath) - } - runnerListener.onJobCompleted(pe.getContext()); - } + } } + //TODO: need to test + outputs.showDataFrame() + if(flow.getRunMode() == FlowRunMode.DEBUG) { + outputs.saveDataFrame(debugPath) + } + + runnerListener.onJobCompleted(pe.getContext()); } catch { case e: Throwable =>