iptimize code

This commit is contained in:
judy0131 2019-01-23 14:17:34 +08:00
parent b48ff0bfc2
commit 2ef86d8097
1 changed files with 10 additions and 20 deletions

View File

@ -341,7 +341,7 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
mapDataFrame.foreach(en => { mapDataFrame.foreach(en => {
val portName = if(en._1.equals("")) "default" else en._1 val portName = if(en._1.equals("")) "default" else en._1
val portDataPath = debugPath + "/" + portName val portDataPath = debugPath + "/" + portName
println(portDataPath) //println(portDataPath)
en._2.apply().write.json(portDataPath) en._2.apply().write.json(portDataPath)
}) })
} }
@ -451,16 +451,11 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
if (checkpointParentProcessId.equals("")) { if (checkpointParentProcessId.equals("")) {
println("Visit process " + stopName + "!!!!!!!!!!!!!") println("Visit process " + stopName + "!!!!!!!!!!!!!")
outputs = pe.perform(inputs); outputs = pe.perform(inputs);
runnerListener.onJobCompleted(pe.getContext());
//TODO: need to test
outputs.showDataFrame()
if(flow.getRunMode() == FlowRunMode.DEBUG) {
outputs.saveDataFrame(debugPath)
}
if (flow.hasCheckPoint(stopName)) { if (flow.hasCheckPoint(stopName)) {
outputs.makeCheckPoint(pe.getContext()); outputs.makeCheckPoint(pe.getContext());
} }
}else{//read checkpoint from old process }else{//read checkpoint from old process
if(flow.hasCheckPoint(stopName)){ if(flow.hasCheckPoint(stopName)){
val pec = pe.getContext() val pec = pe.getContext()
@ -468,25 +463,20 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
val checkpointPath = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + checkpointParentProcessId + "/" + pec.getStopJob().getStopName(); val checkpointPath = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + checkpointParentProcessId + "/" + pec.getStopJob().getStopName();
println("Visit process " + stopName + " by Checkpoint!!!!!!!!!!!!!") println("Visit process " + stopName + " by Checkpoint!!!!!!!!!!!!!")
outputs.loadCheckPoint(pe.getContext(),checkpointPath) outputs.loadCheckPoint(pe.getContext(),checkpointPath)
//TODO: need to test
outputs.showDataFrame()
if(flow.getRunMode() == FlowRunMode.DEBUG) {
outputs.saveDataFrame(debugPath)
}
runnerListener.onJobCompleted(pe.getContext());
}else{ }else{
println("Visit process " + stopName + "!!!!!!!!!!!!!") println("Visit process " + stopName + "!!!!!!!!!!!!!")
outputs = pe.perform(inputs); outputs = pe.perform(inputs);
//TODO: need to test
outputs.showDataFrame()
if(flow.getRunMode() == FlowRunMode.DEBUG) {
outputs.saveDataFrame(debugPath)
}
runnerListener.onJobCompleted(pe.getContext());
}
}
} }
//TODO: need to test
outputs.showDataFrame()
if(flow.getRunMode() == FlowRunMode.DEBUG) {
outputs.saveDataFrame(debugPath)
}
runnerListener.onJobCompleted(pe.getContext());
} }
catch { catch {
case e: Throwable => case e: Throwable =>