add checkpoint

This commit is contained in:
judy0131 2018-07-31 14:15:02 +08:00
parent 67d09e945e
commit 20a9bdd42e
2 changed files with 7 additions and 0 deletions

View File

@ -36,6 +36,7 @@ class FlowTest {
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
.start(flow);
process.awaitTermination();

View File

@ -12,6 +12,7 @@ class FlowBean {
/*@BeanProperty*/
var uuid : String = _
var name : String = _
var checkpoint : String = _
var stops : List[StopBean] = List()
var paths : List[PathBean] = List()
@ -21,6 +22,7 @@ class FlowBean {
this.uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String]
this.name = MapUtil.get(flowMap,"name").asInstanceOf[String]
this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String]
//construct StopBean List
val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]]
@ -48,6 +50,10 @@ class FlowBean {
flow.addPath(Path.from(pathBean.from).via(pathBean.outport, pathBean.inport).to(pathBean.to))
})
if(!this.checkpoint.equals("")){
flow.addCheckPoint(this.checkpoint)
}
flow
}