Merge remote-tracking branch 'origin/master'
Commit: 5828c2b5bc
@@ -268,7 +268,7 @@
        </execution>
        <execution>
          <id>install-external-2</id>
          <id>install-external-3</id>
          <goals>
            <goal>install-file</goal>
          </goals>
@@ -0,0 +1,128 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "checkpoint":"Merge",
    "checkpointParentProcessId":"process_3ae4741c-3aca-4254-8d2e-71b6045b083c_1",
    "stops":[
      {
        "uuid":"1111",
        "name":"XmlParser",
        "bundle":"cn.piflow.bundle.xml.XmlParser",
        "properties":{
          "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
          "rowTag":"phdthesis"
        }
      },
      {
        "uuid":"2222",
        "name":"SelectField",
        "bundle":"cn.piflow.bundle.common.SelectField",
        "properties":{
          "schema":"title,author,pages"
        }
      },
      {
        "uuid":"3333",
        "name":"PutHiveStreaming",
        "bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
        "properties":{
          "database":"sparktest",
          "table":"dblp_phdthesis"
        }
      },
      {
        "uuid":"4444",
        "name":"CsvParser",
        "bundle":"cn.piflow.bundle.csv.CsvParser",
        "properties":{
          "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
          "header":"false",
          "delimiter":",",
          "schema":"title,author,pages"
        }
      },
      {
        "uuid":"555",
        "name":"Merge",
        "bundle":"cn.piflow.bundle.common.Merge",
        "properties":{
          "inports":"data1,data2"
        }
      },
      {
        "uuid":"666",
        "name":"Fork",
        "bundle":"cn.piflow.bundle.common.Fork",
        "properties":{
          "outports":"out1,out2,out3"
        }
      },
      {
        "uuid":"777",
        "name":"JsonSave",
        "bundle":"cn.piflow.bundle.json.JsonSave",
        "properties":{
          "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
        }
      },
      {
        "uuid":"888",
        "name":"CsvSave",
        "bundle":"cn.piflow.bundle.csv.CsvSave",
        "properties":{
          "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
          "header":"true",
          "delimiter":","
        }
      }
    ],
    "paths":[
      {
        "from":"XmlParser",
        "outport":"",
        "inport":"",
        "to":"SelectField"
      },
      {
        "from":"SelectField",
        "outport":"",
        "inport":"data1",
        "to":"Merge"
      },
      {
        "from":"CsvParser",
        "outport":"",
        "inport":"data2",
        "to":"Merge"
      },
      {
        "from":"Merge",
        "outport":"",
        "inport":"",
        "to":"Fork"
      },
      {
        "from":"Fork",
        "outport":"out1",
        "inport":"",
        "to":"PutHiveStreaming"
      },
      {
        "from":"Fork",
        "outport":"out2",
        "inport":"",
        "to":"JsonSave"
      },
      {
        "from":"Fork",
        "outport":"out3",
        "inport":"",
        "to":"CsvSave"
      }
    ]
  }
}
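The flow above reads XML and CSV inputs, merges them, and forks the merged data to Hive, JSON and CSV sinks; the Merge stop is declared as a checkpoint, and checkpointParentProcessId names an earlier process whose checkpoint data can be reused. A minimal sketch of how a definition like this is loaded and turned into a runnable flow, mirroring the FlowTest change further down (the file path is an assumption taken from that test):

    // Sketch: parse the checkpoint flow JSON and build a Flow (see the FlowTest hunk below).
    import cn.piflow.conf.bean.FlowBean
    import cn.piflow.conf.util.{FileUtil, OptionUtil}
    import scala.util.parsing.json.JSON

    val flowJsonStr = FileUtil.fileReader("src/main/resources/flow_checkpoint.json") // assumed location
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    val flowBean = FlowBean(map)        // picks up checkpoint and checkpointParentProcessId
    val flow = flowBean.constructFlow() // run it with Runner after binding a SparkSession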
[Binary image file added, 2.9 KiB - not shown]
@@ -14,7 +14,7 @@ class CsvStringParser extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val description: String = ""
  override val description: String = "Parsing for CSV strings"

  var Str:String=_
@@ -14,7 +14,7 @@ class FolderCsvParser extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val description: String = ""
  override val description: String = "Parsing of CSV folder"

  var FolderPath:String=_

@@ -22,14 +22,9 @@ class FolderCsvParser extends ConfigurableStop{
  var schema: String = _

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {

    println("kkkkkkkkkkkkkkkk")

    val session: SparkSession = pec.get[SparkSession]
    val context: SparkContext = session.sparkContext

    val StrRDD: RDD[String] = context.textFile(FolderPath)
    var num:Int=0
    var s:String=delimiter

@@ -41,8 +36,6 @@ class FolderCsvParser extends ConfigurableStop{
      row
    })

    val schameARR: Array[String] = schema.split(",")
    val fields: Array[StructField] = schameARR.map(d=>StructField(d.trim,StringType,nullable = true))

@@ -62,7 +55,6 @@ class FolderCsvParser extends ConfigurableStop{
  }

  override def setProperties(map: Map[String, Any]): Unit = {
    FolderPath = MapUtil.get(map,"csvPath").asInstanceOf[String]
    delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String]
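The FolderCsvParser hunks above mainly drop debug output and blank lines; the stop itself reads a folder of CSV text into an RDD and applies a user-supplied schema. A minimal sketch of that core conversion under assumed property values (a simplification, not the full stop):

    // Sketch: folder of CSV text -> DataFrame with a comma-separated schema (assumed values).
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val spark = SparkSession.builder().master("local[*]").appName("folder-csv-sketch").getOrCreate()
    val delimiter = ","                        // assumed "delimiter" property
    val schema = "title,author,pages"          // assumed "schema" property
    val lines = spark.sparkContext.textFile("/path/to/csv/folder")  // hypothetical csvPath
    val rows = lines.map(l => Row.fromSeq(l.split(delimiter).toSeq))
    val fields = schema.split(",").map(n => StructField(n.trim, StringType, nullable = true))
    spark.createDataFrame(rows, StructType(fields)).show(10)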
@@ -61,7 +61,7 @@ class LabelPropagation extends ConfigurableStop {
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.GraphXGroup.toString)
    List(StopGroupEnum.GraphX.toString)
  }

}
@@ -55,7 +55,7 @@ class LoadGraph extends ConfigurableStop {
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.GraphXGroup.toString)
    List(StopGroupEnum.GraphX.toString)
  }

}
@@ -29,13 +29,10 @@ class SelectImpala extends ConfigurableStop{
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session: SparkSession = pec.get[SparkSession]()

    //jdbc:hive2://10.0.82.165:21050/;auth=noSasl

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val con: Connection = DriverManager.getConnection("jdbc:hive2://"+url+"/;auth=noSasl",user,password)
    val stmt: Statement = con.createStatement()
    // val rs: ResultSet = stmt.executeQuery("select * from kylin.test1 full join kylin.morg on kylin.test1.pid=kylin.morg.belongtocode")
    val rs: ResultSet = stmt.executeQuery(sql)

    val filedNames: Array[String] = schameString.split(",")

@@ -81,9 +78,9 @@ class SelectImpala extends ConfigurableStop{

    val url=new PropertyDescriptor().name("url").displayName("url").description("IP and port number, you need to write like this -- ip:port").defaultValue("").required(true)
    descriptor = url :: descriptor
    val user=new PropertyDescriptor().name("user").displayName("user").description("").defaultValue("").required(false)
    val user=new PropertyDescriptor().name("user").displayName("user").description("user").defaultValue("").required(false)
    descriptor = user :: descriptor
    val password=new PropertyDescriptor().name("password").displayName("password").description("").defaultValue("").required(false)
    val password=new PropertyDescriptor().name("password").displayName("password").description("password").defaultValue("").required(false)
    descriptor = password :: descriptor
    val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The name of the table has not changed.But you have to specify which database, such as database.table.").defaultValue("").required(true)
    descriptor = sql :: descriptor

@@ -101,8 +98,8 @@ class SelectImpala extends ConfigurableStop{
    List(StopGroupEnum.Mongodb.toString)
  }

  override def initialize(ctx: ProcessContext): Unit = {
  }

}
@@ -40,9 +40,7 @@ class spider extends ConfigurableStop{
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session: SparkSession = pec.get[SparkSession]()

    // Parse the first-level page and collect the elements whose href is needed to reach the second-level page
    var doc: Document =null
    // html>table>a>b/2/3: parse the tag path supplied by the user
    val selectArr: Array[String] = jumpDependence.split("/")
    try{
      doc = Jsoup.connect(firstUrl).timeout(50000).get()

@@ -55,42 +53,31 @@ class spider extends ConfigurableStop{
    }catch {
      case e:Exception => throw new RuntimeException("JumpDependence specified label path does not exist."+"\n"+"jumpDependence指定的标签路径不存在")
    }
    // Iterate over the elements: each text becomes the marker field, each href the jump dependency
    var choiceIntArr: Array[Int] =0.until(eles.size()).toArray
    // var choiceIntArr: Array[Int] =0.until(15).toArray

    // If the user specified a start position, iterate from that position
    if(selectArr.size>1){
      choiceIntArr=selectArr(1).toInt.until(eles.size()).toArray
    }
    for(x <- choiceIntArr){
      var num=x
      // If the user specified a selection mode, pick tags the way the user specified
      if(selectArr.size==3){
        num = x + countInt * (selectArr(2).toInt)
      }
      val ele = eles.get(num)
      // Store the marker field to distinguish each record
      var textStr: String = ele.text()
      if(textStr.length==0){
        textStr = " "
        throw new RuntimeException("Label field no value"+"\n"+"标记字段无值")
      }
      map+=(markupField->textStr)
      // Get the href and parse the second-level page
      val hrefStr: String = ele.attr("href")
      // Split out every field or download path the user wants
      val strArr: Array[String] = fileMap.split("\\+")
      parseNext(rootUrl+hrefStr,strArr)
      // Append the map to the array, then clear the map for the next record
      array+=map
      /*map.empty*/

      countInt+=1
    }
    // Reset the counter for the next run
    countInt=0
    // Crawling finished; store the collected data as a DataFrame
    var keySet: Set[String] =Set()
    val rows1: List[Row] = array.toList.map(map => {
      keySet = map.keySet

@@ -104,7 +91,7 @@ class spider extends ConfigurableStop{
    val fields: Array[StructField] = keySet.toArray.map(d=>StructField(d,StringType,nullable = true))
    val schema: StructType = StructType(fields)
    val df: DataFrame = session.createDataFrame(rowRDD,schema)

    println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")
    df.show(10)
    println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")

@@ -112,10 +99,7 @@ class spider extends ConfigurableStop{
  }

  // Parse the second-level page
  def parseNext(url:String,strArr: Array[String]): Unit = {
    // Get the doc object
    var doc: Document = null
    try {
      doc = Jsoup.connect(url).timeout(50000).get()

@@ -123,7 +107,6 @@ class spider extends ConfigurableStop{
    } catch {
      case e: IllegalArgumentException => throw new RuntimeException("The two level interface path does not exist."+"\n"+"二层界面路径不存在")
    }
    // Loop over every field the user wants
    var textDownFile:File =null
    for(each <- strArr){
      val fileMapArr: Array[String] = each.split("/")

@@ -135,21 +118,17 @@ class spider extends ConfigurableStop{
        val rangeArr: Array[String] = fileMapArr(4).split("-")
        for(r <- (rangeArr(0).toInt until rangeArr(1).toInt) ){
          val eachFileEle: Element = downList.get(r)
          // Create a local folder for each dataset
          if(downPath.size==0){
            downPath="/InternetWormDown/"
          }
          map+=("downPath"->downPath)
          // Record the current time
          val nowDate: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
          map+=("downDate"->nowDate)
          textDownFile = new File(downPath+map.get(markupField).get)
          if(! textDownFile.exists()){
            textDownFile.mkdirs()
          }
          // Create the file
          val eachFile: File = new File(downPath+map.get(markupField).get+"/"+eachFileEle.text())
          // If the local file does not exist, write it locally; if it exists, skip this file
          if(!eachFile.exists()){
            val in: InputStream = new URL(url+eachFileEle.attr("href")).openStream()
            val out: BufferedOutputStream = new BufferedOutputStream(new FileOutputStream(eachFile),10485760)

@@ -168,7 +147,6 @@ class spider extends ConfigurableStop{

        }
      }else{
        // Loop over the field names and tag paths the user provided and store them in turn
        if(fileMapArr.size > 2){
          map+=(fileMapArr(0) -> numEle.text())
        }else{

@@ -176,13 +154,6 @@ class spider extends ConfigurableStop{
        }
      }
    }

    // for(key <- fileMap.keySet){
    //   // Separate the tag path and the selection condition given by the user
    //   val selects = fileMap.get(key).get.split("/")
    //   val value: String = doc.select(selects(0)).get(selects(1).toInt).text()
    //   map+=(key -> value)
  }

  override def setProperties(map: Map[String, Any]): Unit = {
@@ -134,16 +134,6 @@ class JdbcReadFromOracle extends ConfigurableStop{
    df.show(20)
    println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")

    /*
    val v: Any = df.collect()(0).get(2)
    println("3333333333333333333333"+v.getClass)
    val ab: Array[Byte] = v.asInstanceOf[Seq[Byte]].toArray

    val fos: FileOutputStream = new FileOutputStream(new File("/aa.txt"))
    fos.write(ab)
    fos.close()
    */

    out.write(df)
@@ -51,9 +51,6 @@ class JdbcWriteToOracle extends ConfigurableStop{

      star.executeUpdate(insertSQL)
    })

  }

  def initialize(ctx: ProcessContext): Unit = {
@@ -18,15 +18,15 @@ class ComplementByMemcache extends ConfigurableStop {
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var servers:String=_ //服务器地址和端口号Server address and port number,If you have multiple servers, use "," segmentation.
  var keyFile:String=_ //你想用来作为查询条件的字段The field you want to use as a query condition
  var weights:String=_ //每台服务器的权重Weight of each server
  var maxIdle:String=_ //最大处理时间Maximum processing time
  var maintSleep:String=_ //主线程睡眠时间Main thread sleep time
  var nagle:String=_ //socket参数,若为true,则写数据时不缓冲立即发送If the socket parameter is true, the data is not buffered and sent immediately.
  var socketTO:String=_ //socket阻塞时候的超时时间Socket timeout during blocking
  var socketConnectTO:String=_ //连接建立时的超时控制Timeout control during connection establishment
  var replaceField:String=_ //你想得到的字段The fields you want to get
  var servers:String=_ //Server address and port number,If you have multiple servers, use "," segmentation.
  var keyFile:String=_ //The field you want to use as a query condition
  var weights:String=_ //Weight of each server
  var maxIdle:String=_ //Maximum processing time
  var maintSleep:String=_ //Main thread sleep time
  var nagle:String=_ //If the socket parameter is true, the data is not buffered and sent immediately.
  var socketTO:String=_ //Socket timeout during blocking
  var socketConnectTO:String=_ //Timeout control during connection establishment
  var replaceField:String=_ //The fields you want to get

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {

@@ -35,10 +35,8 @@ class ComplementByMemcache extends ConfigurableStop {

    val mcc: MemCachedClient =getMcc()

    // Get the schema information of the output DataFrame
    val replaceFields: mutable.Buffer[String] = replaceField.split(",").toBuffer

    // Collect all rows of inDF into an array so the data can be modified
    val rowArr: Array[Row] = inDF.collect()
    val fileNames: Array[String] = inDF.columns
    val data: Array[Map[String, String]] = rowArr.map(row => {

@@ -51,11 +49,9 @@ class ComplementByMemcache extends ConfigurableStop {
      map
    })

    // Query memcache and replace values in the DataFrame as the user requested
    val finalData: Array[Map[String, String]] = data.map(eachData => {
      var d: Map[String, String] = eachData
      val anyRef: AnyRef = mcc.get(d.get(keyFile).get)
      // Replace only when data is found in memcache
      if(anyRef.getClass.toString.equals("class scala.Some")){
        val map: Map[String, String] = anyRef.asInstanceOf[Map[String, String]]
        for (f <- replaceFields) {

@@ -65,7 +61,6 @@ class ComplementByMemcache extends ConfigurableStop {
      d
    })

    // Convert the schema and data into a DataFrame
    var arrKey: Array[String] = Array()
    val rows: List[Row] = finalData.toList.map(map => {
      arrKey = map.keySet.toArray

@@ -89,11 +84,8 @@ class ComplementByMemcache extends ConfigurableStop {
  }

  // Get the global singleton instance
  def getMcc(): MemCachedClient = {
    // Get the connection pool instance
    val pool: SockIOPool = SockIOPool.getInstance()
    // Connect to the servers
    var serversArr:Array[String]=servers.split(",")
    pool.setServers(serversArr)

@@ -118,7 +110,6 @@ class ComplementByMemcache extends ConfigurableStop {
    }

    pool.initialize()
    // Create the global singleton instance
    val mcc: MemCachedClient = new MemCachedClient()
    mcc
  }
@@ -19,15 +19,15 @@ class GetMemcache extends ConfigurableStop{
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var servers:String=_ //服务器地址和端口号Server address and port number,If you have multiple servers, use "," segmentation.
  var keyFile:String=_ //你想用来作为查询条件的字段The field you want to use as a query condition
  var weights:String=_ //每台服务器的权重Weight of each server
  var maxIdle:String=_ //最大处理时间Maximum processing time
  var maintSleep:String=_ //主线程睡眠时间Main thread sleep time
  var nagle:String=_ //socket参数,若为true,则写数据时不缓冲立即发送If the socket parameter is true, the data is not buffered and sent immediately.
  var socketTO:String=_ //socket阻塞时候的超时时间Socket timeout during blocking
  var socketConnectTO:String=_ //连接建立时的超时控制Timeout control during connection establishment
  var schame:String=_ //你想得到的字段The fields you want to get
  var servers:String=_ //Server address and port number,If you have multiple servers, use "," segmentation.
  var keyFile:String=_ //The field you want to use as a query condition
  var weights:String=_ //Weight of each server
  var maxIdle:String=_ //Maximum processing time
  var maintSleep:String=_ //Main thread sleep time
  var nagle:String=_ //If the socket parameter is true, the data is not buffered and sent immediately.
  var socketTO:String=_ //Socket timeout during blocking
  var socketConnectTO:String=_ //Timeout control during connection establishment
  var schame:String=_ //The fields you want to get

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session: SparkSession = pec.get[SparkSession]()

@@ -42,21 +42,17 @@ class GetMemcache extends ConfigurableStop{
      str.substring(1,str.length-1)
    })

    // Get the schema information of the output DataFrame
    var schameArr:Array[String] =Array()
    // If the user specified the fields to return, return only those
    if(schame.length>0){
      val schameArrBuff: mutable.Buffer[String] = schame.split(",").toBuffer
      schameArrBuff.insert(0,keyFile)
      schameArr = schameArrBuff.toArray
    }

    // Get the data of the output DataFrame
    var allFileDatas: ArrayBuffer[ArrayBuffer[String]] =ArrayBuffer()
    for(keyNum <- (0 until keys.size)){
      val map: Map[String, String] = mcc.get(keys(keyNum)).asInstanceOf[Map[String,String]]

      // If the user did not specify fields, return everything found in memcache
      if(schame.size==0){
        val arr: Array[String] = map.keySet.toArray
        val buffer: mutable.Buffer[String] = arr.toBuffer

@@ -64,7 +60,6 @@ class GetMemcache extends ConfigurableStop{
        schameArr = buffer.toArray
      }

      // Record all values of this record
      var values: ArrayBuffer[String] =ArrayBuffer()
      values+=keys(keyNum)
      for(x <- (1 until schameArr.size)){

@@ -73,7 +68,6 @@ class GetMemcache extends ConfigurableStop{
      allFileDatas+=values
    }

    // Convert the schema and data into a DataFrame
    val rowList: List[Row] = allFileDatas.map(arr => {Row.fromSeq(arr)}).toList
    val rowRDD: RDD[Row] = session.sparkContext.makeRDD(rowList)
    val fields: Array[StructField] = schameArr.map(d=>StructField(d,StringType,nullable = true))

@@ -87,11 +81,8 @@ class GetMemcache extends ConfigurableStop{
    out.write(df)
  }

  // Get the global singleton instance
  def getMcc(): MemCachedClient = {
    // Get the connection pool instance
    val pool: SockIOPool = SockIOPool.getInstance()
    // Connect to the servers
    var serversArr:Array[String]=servers.split(",")
    pool.setServers(serversArr)

@@ -116,7 +107,6 @@ class GetMemcache extends ConfigurableStop{
    }

    pool.initialize()
    // Create the global singleton instance
    val mcc: MemCachedClient = new MemCachedClient()
    mcc
  }
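ComplementByMemcache and GetMemcache share the same connection pattern: take the SockIOPool singleton, point it at the configured servers, initialize it, then read keys through a MemCachedClient. A minimal sketch of that pattern using only the calls visible in the diff; the import path and server address are assumptions:

    // Sketch of the memcache access pattern (import path assumed from the java memcached client).
    import com.danga.MemCached.{MemCachedClient, SockIOPool}

    val pool: SockIOPool = SockIOPool.getInstance()
    pool.setServers(Array("10.0.86.98:11211"))  // "ip:port[,ip:port...]", as the servers property describes
    pool.initialize()

    val mcc = new MemCachedClient()
    val value = mcc.get("someKey")              // the object stored under the key field, or null
    println(value)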
@@ -9,18 +9,18 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}

class PutMemcache extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "get data from memcache"
  override val description: String = "put data into memcache"
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.NonePort.toString)

  var servers:String=_ //服务器地址和端口号Server address and port number,If you have multiple servers, use "," segmentation.
  var keyFile:String=_ //你想用来作为key的字段You want to be used as a field for key.
  var weights:String=_ //每台服务器的权重Weight of each server
  var maxIdle:String=_ //最大处理时间Maximum processing time
  var maintSleep:String=_ //主线程睡眠时间Main thread sleep time
  var nagle:String=_ //socket参数,若为true,则写数据时不缓冲立即发送If the socket parameter is true, the data is not buffered and sent immediately.
  var socketTO:String=_ //socket阻塞时候的超时时间Socket timeout during blocking
  var socketConnectTO:String=_ //连接建立时的超时控制Timeout control during connection establishment
  var servers:String=_ //Server address and port number,If you have multiple servers, use "," segmentation.
  var keyFile:String=_ //You want to be used as a field for key.
  var weights:String=_ //Weight of each server
  var maxIdle:String=_ //Maximum processing time
  var maintSleep:String=_ //Main thread sleep time
  var nagle:String=_ //If the socket parameter is true, the data is not buffered and sent immediately.
  var socketTO:String=_ //Socket timeout during blocking
  var socketConnectTO:String=_ //Timeout control during connection establishment

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {

@@ -28,9 +28,7 @@ class PutMemcache extends ConfigurableStop{
    val session: SparkSession = pec.get[SparkSession]()
    val inDF: DataFrame = in.read()

    // Get the connection pool instance
    val pool: SockIOPool = SockIOPool.getInstance()
    // Connect to the servers
    var serversArr:Array[String]=servers.split(",")
    pool.setServers(serversArr)

@@ -55,7 +53,6 @@ class PutMemcache extends ConfigurableStop{
    }

    pool.initialize()
    // Create the global singleton instance
    val mcc: MemCachedClient = new MemCachedClient()

    val fileNames: Array[String] = inDF.columns
@@ -30,7 +30,6 @@ class GetMongo extends ConfigurableStop{

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session: SparkSession = pec.get[SparkSession]()
    // Set up the connection addresses
    var addressesArr: util.ArrayList[ServerAddress] = new util.ArrayList[ServerAddress]()
    val ipANDport: Array[String] = addresses.split(",")
    for(x <- (0 until ipANDport.size)){

@@ -39,7 +38,6 @@ class GetMongo extends ConfigurableStop{
      }
    }

    // Set up the connection credentials
    var credentialsArr: util.ArrayList[MongoCredential] = new util.ArrayList[MongoCredential]()
    if(credentials.length!=0){
      val name_database_password: Array[String] = credentials.split(",")

@@ -50,22 +48,16 @@ class GetMongo extends ConfigurableStop{
      }
    }

    // Connect to the database and collection
    val client: MongoClient = new MongoClient(addressesArr,credentialsArr)
    val db: MongoDatabase = client.getDatabase(dataBase)
    val col: MongoCollection[Document] = db.getCollection(collection)

    // Get all documents in the collection as an iterator
    val documents: FindIterable[Document] = col.find()
    val dataIterator: MongoCursor[Document] = documents.iterator()
    var document: Document =null
    // Record the field names
    var fileNamesArr: Array[String] =Array()
    // Record all rows
    var rowArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer()
    // Iterate over the documents
    while (dataIterator.hasNext){
      // Record each document
      var dataArr:ArrayBuffer[String]=ArrayBuffer()
      document = dataIterator.next()
      val fileNamesSet: util.Set[String] = document.keySet()

@@ -76,7 +68,6 @@ class GetMongo extends ConfigurableStop{
      rowArr+=dataArr
    }

    // Build the DataFrame
    var names:ArrayBuffer[String]=ArrayBuffer()
    for(n <- (1 until fileNamesArr.size )){
      names += fileNamesArr(n)

@@ -128,7 +119,7 @@ class GetMongo extends ConfigurableStop{
    descriptor
  }
  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("mongoDB/mongodb.png")
    ImageUtil.getImage("mongoDB/mongoDB.png")
  }

  override def getGroup(): List[String] = {
@@ -95,7 +95,7 @@ class PutMongo extends ConfigurableStop{
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("mongoDB/mongodb.png")
    ImageUtil.getImage("mongoDB/mongoDB.png")
  }

  override def getGroup(): List[String] = {
@@ -20,8 +20,8 @@ import scala.collection.mutable.ListBuffer

class GetFromSolr extends ConfigurableStop{
  override val authorEmail: String ="18525746364@163.com"
  override val description: String = ""
  override val authorEmail: String ="yangqidong@cnic.cn"
  override val description: String = "get from solr"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
@@ -14,7 +14,7 @@ import org.apache.spark.sql.DataFrame
class PutIntoSolr extends ConfigurableStop{

  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = ""
  override val description: String = "put into solr"
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.NonePort.toString)
@@ -24,7 +24,7 @@ class FlattenXmlParser extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val description: String = "paring the xml file"
  override val description: String = "Parse the XML file and expand the label you need."

  var xmlpath:String = _

@@ -33,9 +33,7 @@ class FlattenXmlParser extends ConfigurableStop{
  var returnField: String = _

  // Convert the iterator
  def javaIterToScalaIter(utilIter: util.Iterator[_]):Iterator[Element]={
    // asScala requires import scala.collection.JavaConverters._
    utilIter.asInstanceOf[util.Iterator[Element]].asScala
  }

@@ -45,44 +43,31 @@ class FlattenXmlParser extends ConfigurableStop{

    ss=pec.get[SparkSession]()
    // Read the HDFS file and wrap it in a stream
    val conf = new Configuration()
    val fs: FileSystem = FileSystem.get(URI.create(xmlpath), conf)
    val hdfsInputStream: FSDataInputStream = fs.open(new Path(xmlpath))
    val saxReader: SAXReader = new SAXReader()
    // Feed the stream into dom4j and parse it
    val document: Document = saxReader.read(hdfsInputStream)
    // Get the root element
    val rootElt: Element = document.getRootElement

    var finalDF:DataFrame=null

    // Build a list of all elements along the node path
    var arrbuffer:ArrayBuffer[Element]=ArrayBuffer()
    arrbuffer+=rootElt

    // Parse the tags the user asked for
    // papers,paper

    val arrLabel: Array[String] = tagPath.split(",")
    // Walk the tag path and collect each element into the array
    for(x<-(1 until arrLabel.length)){
      var ele: Element =arrbuffer(x-1).element(arrLabel(x))
      arrbuffer+=ele
    }

    // Create an iterator over the target tag and convert it to a Scala iterator
    val FatherElement: Element = arrbuffer(arrbuffer.size-1)
    val FatherInterator: util.Iterator[_] = FatherElement.elementIterator()
    val scalalInterator: Iterator[Element] = javaIterToScalaIter(FatherInterator)

    // Prepare the join key and value in advance
    var relationKey:String=""
    var relationValue:String=""

    // Parse and store the data
    var valueArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer()
    var value:ArrayBuffer[String]=ArrayBuffer()
    var keyArr:ArrayBuffer[String]=ArrayBuffer()

@@ -109,7 +94,6 @@ class FlattenXmlParser extends ConfigurableStop{

    // Parse the field names the user wants to show
    if(returnField.size>0){
      val returnARR: Array[String] = returnField.split(",")
      val seq: Seq[Column] = returnARR.map(x=>finalDF(x)).toSeq

@@ -119,7 +103,6 @@ class FlattenXmlParser extends ConfigurableStop{
    }

    // If the user wants to expand a particular field
    var smallDF: DataFrame = null
    if(openTag.size>0){
      val eleExplode: Element = FatherElement.element(openTag)

@@ -153,7 +136,6 @@ class FlattenXmlParser extends ConfigurableStop{
      val sonSchame: StructType = StructType(field)
      smallDF = ss.createDataFrame(ss.sparkContext.makeRDD(valueRows),sonSchame)

      // Join the two tables
      val df: DataFrame = smallDF.join(finalDF,finalDF(relationKey)===smallDF(relationKey+"_"),"left")
      finalDF= df.drop(relationKey+"_")
    }

@@ -163,12 +145,10 @@ class FlattenXmlParser extends ConfigurableStop{
    finalDF.show(20)
    println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")

    out.write(finalDF)
  }

  override def setProperties(map: Map[String, Any]): Unit = {
    xmlpath = MapUtil.get(map,"xmlpath").asInstanceOf[String]
    tagPath = MapUtil.get(map,"tagPath").asInstanceOf[String]
@@ -18,7 +18,7 @@ class XmlStringParser extends ConfigurableStop {
  override val authorEmail: String = "yangqidong@cnic.cn"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val description: String = ""
  override val description: String = "Parsing XML string"

  var XmlString:String=_
  var label:String=_
@@ -11,6 +11,7 @@ class FlowBean {
  var uuid : String = _
  var name : String = _
  var checkpoint : String = _
  var checkpointParentProcessId : String = _
  var stops : List[StopBean] = List()
  var paths : List[PathBean] = List()

@@ -21,6 +22,7 @@ class FlowBean {
    this.uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String]
    this.name = MapUtil.get(flowMap,"name").asInstanceOf[String]
    this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String]
    this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String]

    //construct StopBean List
    val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]]

@@ -43,6 +45,7 @@ class FlowBean {
    val flow = new FlowImpl();

    flow.setFlowName(this.name)
    flow.setCheckpointParentProcessId(this.checkpointParentProcessId)

    this.stops.foreach( stopBean => {
      flow.addStop(stopBean.name,stopBean.constructStop())
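The FlowBean change reads checkpointParentProcessId with a default of the empty string and forwards it to the FlowImpl; downstream, ProcessImpl treats the empty string as a fresh run with no checkpoint reuse. A small sketch of that default behaviour (the map values are illustrative):

    // Sketch: when the flow JSON omits checkpointParentProcessId, the bean falls back to "".
    val flowMap: Map[String, Any] = Map("uuid" -> "1234", "name" -> "test")
    val checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String]
    // == "" -> every stop runs; a non-empty id makes checkpointed stops load parquet data instead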
@@ -4,6 +4,7 @@ import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test

import scala.util.parsing.json.JSON

@@ -14,7 +15,7 @@ class FlowTest {
  def testFlow(): Unit ={

    //parse flow json
    val file = "src/main/resources/flow.json"
    val file = "src/main/resources/flow_checkpoint.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

@@ -23,16 +24,34 @@ class FlowTest {
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()

    //execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .appName("piflow-hive-bundle-xjzhu")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
      .enableHiveSupport()
      .getOrCreate()
    // val spark = SparkSession.builder()
    //   .master("yarn")
    //   .appName(flowBean.name)
    //   .config("spark.deploy.mode","cluster")
    //   .config("spark.hadoop.yarn.resourcemanager.hostname", "10.0.86.191")
    //   .config("spark.hadoop.yarn.resourcemanager.address", "10.0.86.191:8032")
    //   .config("spark.yarn.access.namenode", "hdfs://10.0.86.191:9000")
    //   .config("spark.yarn.stagingDir", "hdfs://10.0.86.191:9000/tmp")
    //   .config("spark.yarn.jars", "hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar")
    //   //.config("spark.driver.memory", "1g")
    //   //.config("spark.executor.memory", "1g")
    //   //.config("spark.cores.max", "2")
    //   .config("spark.jars", "/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar")
    //   .config("hive.metastore.uris","thrift://10.0.86.191:9083")
    //   .enableHiveSupport()
    //   .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
@@ -1,8 +1,13 @@
package cn.piflow

import java.io.IOException
import java.net.URI
import java.util.concurrent.{CountDownLatch, TimeUnit}

import cn.piflow.util.{IdGenerator, Logging}
import cn.piflow.util.{HadoopFileUtil, IdGenerator, Logging}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._

import scala.collection.mutable.{ArrayBuffer, Map => MMap}

@@ -20,6 +25,8 @@ trait JobInputStream {
trait JobOutputStream {
  def makeCheckPoint(pec: JobContext): Unit;

  def loadCheckPoint(pec: JobContext, path : String) : Unit;

  def write(data: DataFrame);

  def write(bundle: String, data: DataFrame);

@@ -65,6 +72,10 @@ trait Flow {
  def getFlowName(): String;

  def setFlowName(flowName : String): Unit;

  def getCheckpointParentProcessId() : String;

  def setCheckpointParentProcessId(checkpointParentProcessId : String);
}

class FlowImpl extends Flow {

@@ -72,6 +83,7 @@ class FlowImpl extends Flow {
  val edges = ArrayBuffer[Edge]();
  val stops = MMap[String, Stop]();
  val checkpoints = ArrayBuffer[String]();
  var checkpointParentProcessId = ""

  def addStop(name: String, process: Stop) = {
    stops(name) = process;

@@ -113,8 +125,15 @@ class FlowImpl extends Flow {
      outgoingEdges.getOrElseUpdate(edge.stopFrom, ArrayBuffer[Edge]()) += edge;
    }

    private def _visitProcess[T](processName: String, op: (String, Map[Edge, T]) => T, visited: MMap[String, T]): T = {
    private def _visitProcess[T](flow: Flow, processName: String, op: (String, Map[Edge, T]) => T, visited: MMap[String, T]): T = {
      if (!visited.contains(processName)) {

        //TODO: need to check whether the checkpoint's data exist!!!!
        if(flow.hasCheckPoint(processName) && !flow.getCheckpointParentProcessId().equals("")){
          val ret = op(processName, null);
          visited(processName) = ret;
          return ret;
        }
        //executes dependent processes
        val inputs =
          if (incomingEdges.contains(processName)) {

@@ -122,7 +141,7 @@ class FlowImpl extends Flow {
            val edges = incomingEdges(processName);
            edges.map { edge =>
              edge ->
                _visitProcess(edge.stopFrom, op, visited);
                _visitProcess(flow, edge.stopFrom, op, visited);
            }.toMap
          }
          else {

@@ -138,11 +157,11 @@ class FlowImpl extends Flow {
      }
    }

    override def visit[T](op: (String, Map[Edge, T]) => T): Unit = {
    override def visit[T](flow: Flow, op: (String, Map[Edge, T]) => T): Unit = {
      val ends = stops.keys.filterNot(outgoingEdges.contains(_));
      val visited = MMap[String, T]();
      ends.foreach {
        _visitProcess(_, op, visited);
        _visitProcess(flow, _, op, visited);
      }
    }
  }

@@ -154,10 +173,19 @@ class FlowImpl extends Flow {
  override def setFlowName(flowName : String): Unit = {
    this.name = flowName;
  }

  //get the processId
  override def getCheckpointParentProcessId() : String = {
    this.checkpointParentProcessId
  }

  override def setCheckpointParentProcessId(checkpointParentProcessId : String) = {
    this.checkpointParentProcessId = checkpointParentProcessId
  }
}

trait AnalyzedFlowGraph {
  def visit[T](op: (String, Map[Edge, T]) => T): Unit;
  def visit[T](flow: Flow, op: (String, Map[Edge, T]) => T): Unit;
}

trait Process {

@@ -208,18 +236,69 @@ class JobInputStreamImpl() extends JobInputStream {
}

class JobOutputStreamImpl() extends JobOutputStream with Logging {
  private val defaultPort = "default"

  override def makeCheckPoint(pec: JobContext) {
    mapDataFrame.foreach(en => {
      val path = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + pec.getProcessContext().getProcess().pid() + "/" + pec.getStopJob().jid();
      val port = if(en._1.equals("")) defaultPort else en._1
      val path = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + pec.getProcessContext().getProcess().pid() + "/" + pec.getStopJob().getStopName() + "/" + port;
      println("MakeCheckPoint Path: " + path)
      //val path = getCheckPointPath(pec)
      logger.debug(s"writing data on checkpoint: $path");

      en._2.apply().write.parquet(path);
      mapDataFrame(en._1) = () => {
        logger.debug(s"loading data from checkpoint: $path");
        pec.get[SparkSession].read.parquet(path)
        pec.get[SparkSession].read.parquet(path)//default port?
      };
    })
  }

  //load the checkpoint by path and port
  override def loadCheckPoint(pec: JobContext, checkpointPath : String): Unit = {

    val ports = getCheckPointPorts(pec, checkpointPath)
    ports.foreach{ port => {

      val mdf = () => {
        val checkpointPortPath = checkpointPath + "/" + port
        logger.debug(s"loading data from checkpoint: $checkpointPortPath")
        println(s"loading data from checkpoint: $checkpointPortPath")
        pec.get[SparkSession].read.parquet(checkpointPortPath)
      };

      val newPort = if(port.equals(defaultPort)) "" else port
      mapDataFrame(newPort) = mdf

    }}
  }

  //get the checkpoint path
  private def getCheckPointPath(pec: JobContext) : String = {
    val pathStr = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + pec.getProcessContext().getProcess().pid() + "/" + pec.getStopJob().getStopName();
    val conf:Configuration = new Configuration()
    try{
      val fs:FileSystem = FileSystem.get(URI.create(pathStr), conf)
      val path = new org.apache.hadoop.fs.Path(pathStr)
      if(fs.exists(path)){
        pathStr
      }else{
        ""
      }
    }catch{
      case ex:IOException =>{
        println(ex)
        ""
      }
      case _ => ""
    }
  }

  //get the checkpoint ports list
  private def getCheckPointPorts(pec: JobContext, checkpointPath : String) : List[String] = {
    HadoopFileUtil.getFileInHadoopPath(checkpointPath)
  }

  val mapDataFrame = MMap[String, () => DataFrame]();

  override def write(data: DataFrame): Unit = write("", data);

@@ -233,6 +312,7 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
  def contains(port: String) = mapDataFrame.contains(port);

  def getDataFrame(port: String) = mapDataFrame(port);

}

class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProcess: Option[Process] = None)

@@ -265,9 +345,18 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
        }

        val analyzed = flow.analyze();
        val checkpointParentProcessId = flow.getCheckpointParentProcessId()
        analyzed.visit[JobOutputStreamImpl](flow,performStopByCheckpoint)

        /*if (checkpointParentProcessId == "" ){
          analyzed.visit[JobOutputStreamImpl](performStop)

        }else{
          analyzed.visit[JobOutputStreamImpl](performStopByCheckpoint)
        }*/

        //runs processes
        analyzed.visit[JobOutputStreamImpl]((stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) => {
        /*analyzed.visit[JobOutputStreamImpl]((stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) => {
          val pe = jobs(stopName);
          var outputs: JobOutputStreamImpl = null;
          try {

@@ -289,7 +378,71 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc

            outputs;
          }
        );
        );*/

        def performStop(stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) = {
          val pe = jobs(stopName);
          var outputs: JobOutputStreamImpl = null;
          try {
            runnerListener.onJobStarted(pe.getContext());
            outputs = pe.perform(inputs);
            runnerListener.onJobCompleted(pe.getContext());

            //is a checkpoint?
            if (flow.hasCheckPoint(stopName)) {
              //store dataset
              outputs.makeCheckPoint(pe.getContext());
            }
          }
          catch {
            case e: Throwable =>
              runnerListener.onJobFailed(pe.getContext());
              throw e;
          }

          outputs;
        }

        //perform stop using checkpoint
        def performStopByCheckpoint(stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) = {
          val pe = jobs(stopName);

          var outputs : JobOutputStreamImpl = null
          try {
            runnerListener.onJobStarted(pe.getContext());

            //new flow process
            if (checkpointParentProcessId.equals("")) {
              println("Visit process " + stopName + "!!!!!!!!!!!!!")
              outputs = pe.perform(inputs);
              runnerListener.onJobCompleted(pe.getContext());
              if (flow.hasCheckPoint(stopName)) {
                outputs.makeCheckPoint(pe.getContext());
              }
            }else{//read checkpoint from old process
              if(flow.hasCheckPoint(stopName)){
                val pec = pe.getContext()
                outputs = pec.getOutputStream().asInstanceOf[JobOutputStreamImpl];
                val checkpointPath = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + checkpointParentProcessId + "/" + pec.getStopJob().getStopName();
                println("Visit process " + stopName + " by Checkpoint!!!!!!!!!!!!!")
                outputs.loadCheckPoint(pe.getContext(),checkpointPath)
                runnerListener.onJobCompleted(pe.getContext());
              }else{
                println("Visit process " + stopName + "!!!!!!!!!!!!!")
                outputs = pe.perform(inputs);
                runnerListener.onJobCompleted(pe.getContext());
              }

            }
          }
          catch {
            case e: Throwable =>
              runnerListener.onJobFailed(pe.getContext());
              throw e;
          }

          outputs;
        }
      }

  override def run(): Unit = {
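makeCheckPoint now writes each output port of a checkpointed stop to its own folder, and loadCheckPoint reads those folders back for the parent process. A sketch of the resulting path layout, building the path the same way the code does (the checkpoint root is an assumed value of checkpoint.path; the process id is the one used in the flow JSON above):

    // Sketch of the checkpoint layout: <checkpoint.path>/<processId>/<stopName>/<port>/ (parquet files)
    val checkpointRoot = "hdfs://10.0.86.89:9000/piflow/checkpoints"   // assumed checkpoint.path value
    val processId = "process_3ae4741c-3aca-4254-8d2e-71b6045b083c_1"
    val stopName = "Merge"
    val port = "default"                                               // the "" port maps to defaultPort
    val portPath = s"${checkpointRoot.stripSuffix("/")}/$processId/$stopName/$port"
    // loadCheckPoint(pec, s"$checkpointRoot/$processId/$stopName") then reads every port folder under that stop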
@@ -119,7 +119,7 @@ class RunnerLogger extends RunnerListener with Logging {
    println(s"process started: $pid, flow: $flowName, time: $time")
    //update flow state to STARTED
    val appId = getAppId(ctx)
    H2Util.addFlow(appId,ctx.getFlow().getFlowName())
    H2Util.addFlow(appId,pid,ctx.getFlow().getFlowName())
    H2Util.updateFlowState(appId,FlowState.STARTED)
    H2Util.updateFlowStartTime(appId,time)
  };
@@ -9,7 +9,7 @@ import net.liftweb.json.JsonDSL._
object H2Util {

  val QUERY_TIME = 30
  val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
  val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), pid varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
  val CREATE_STOP_TABLE = "create table if not exists stop (flowId varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
  val serverIP = PropertyUtil.getPropertyValue("server.ip") + ":" + PropertyUtil.getPropertyValue("h2.port")
  val CONNECTION_URL = "jdbc:h2:tcp://" + serverIP + "/~/piflow;AUTO_SERVER=true"

@@ -36,11 +36,11 @@ object H2Util {
    connection
  }

  def addFlow(appId:String,name:String)={
  def addFlow(appId:String,pId:String, name:String)={
    val startTime = new Date().toString
    val statement = getConnectionInstance().createStatement()
    statement.setQueryTimeout(QUERY_TIME)
    statement.executeUpdate("insert into flow(id, name) values('" + appId + "','" + name + "')")
    statement.executeUpdate("insert into flow(id, pid, name) values('" + appId + "','" + pId + "','" + name + "')")
    statement.close()
  }
  def updateFlowState(appId:String, state:String) = {

@@ -82,6 +82,19 @@ object H2Util {
    state
  }

  def getFlowProcessId(appId:String) : String = {
    var pid = ""
    val statement = getConnectionInstance().createStatement()
    statement.setQueryTimeout(QUERY_TIME)
    val rs : ResultSet = statement.executeQuery("select pid from flow where id='" + appId +"'")
    while(rs.next()){
      pid = rs.getString("pid")
    }
    rs.close()
    statement.close()
    pid
  }

  def getFlowInfo(appId:String) : String = {
    val statement = getConnectionInstance().createStatement()
    statement.setQueryTimeout(QUERY_TIME)

@@ -90,6 +103,7 @@ object H2Util {
    val flowRS : ResultSet = statement.executeQuery("select * from flow where id='" + appId +"'")
    while (flowRS.next()){
      flowInfo = "{\"flow\":{\"id\":\"" + flowRS.getString("id") +
        "\",\"pid\":\"" + flowRS.getString("pid") +
        "\",\"name\":\"" + flowRS.getString("name") +
        "\",\"state\":\"" + flowRS.getString("state") +
        "\",\"startTime\":\"" + flowRS.getString("startTime") +

@@ -124,7 +138,7 @@ object H2Util {

    var stopCount = 0
    var completedStopCount = 0
    val totalRS : ResultSet = statement.executeQuery("select count(*) as stopCount from stop where flowId='" + appId +"'")
    val totalRS : ResultSet = statement.executeQuery("select count(*) as stopCount from stop where flowId='" + appId +"' and state!='" + StopState.INIT + "'")
    while(totalRS.next()){
      stopCount = totalRS.getInt("stopCount")
      //println("stopCount:" + stopCount)
@@ -0,0 +1,27 @@
package cn.piflow.util

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object HadoopFileUtil {

  def getFileInHadoopPath(filePath : String) : List[String] = {
    var fileList = List[String]()
    if(!filePath.equals("")){
      try{
        val fs:FileSystem = FileSystem.get(URI.create(filePath), new Configuration())
        val path = new org.apache.hadoop.fs.Path(filePath)
        val status = fs.listStatus(path)
        status.foreach{ s =>
          fileList = s.getPath.getName +: fileList
        }
      }catch{
        case ex:Exception => println(ex)
      }
    }
    fileList
  }

}
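This helper backs both getCheckPointPorts in Flow.scala and getFlowCheckpoint in the API: it lists the child names under an HDFS path and returns an empty list when the path is empty or unreadable. A short usage sketch (the path is hypothetical, reusing the process id from the HTTP client example further down):

    // Sketch: list the checkpointed stops (or port folders) under a process's checkpoint directory.
    import cn.piflow.util.HadoopFileUtil

    val checkpointPath = "hdfs://10.0.86.89:9000/piflow/checkpoints/process_088b2604-806e-4ccf-99cc-696aeaef730a_1"
    val children: List[String] = HadoopFileUtil.getFileInHadoopPath(checkpointPath)
    children.foreach(println)   // e.g. "Merge", or the port folders written by makeCheckPoint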
@@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
import cn.piflow.{Process, Runner}
import cn.piflow.api.util.PropertyUtil
import cn.piflow.util.H2Util
import cn.piflow.util.{H2Util, HadoopFileUtil}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

@@ -58,7 +58,7 @@ object API {
    (applicationId,process)

  }*/
  def startFlow(flowJson : String):(String,SparkAppHandle) = {
  def startFlow(flowJson : String):(String,String,SparkAppHandle) = {

    var appId:String = null
    val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]

@@ -122,7 +122,12 @@ object API {
    while (appId == null){
      Thread.sleep(1000)
    }
    (appId, handle)
    var processId = ""
    while(processId.equals("")){
      Thread.sleep(1000)
      processId = H2Util.getFlowProcessId(appId)
    }
    (appId, processId, handle)

  }

@@ -151,6 +156,12 @@ object API {
    str
  }

  def getFlowCheckpoint(processID:String) : String = {
    val checkpointPath = PropertyUtil.getPropertyValue("checkpoint.path").stripSuffix("/") + "/" + processID
    val checkpointList = HadoopFileUtil.getFileInHadoopPath(checkpointPath)
    """{"checkpoints":"""" + checkpointList.mkString(",") + """"}"""
  }

  def getStopInfo(bundle : String) : String = {
    try{
@@ -61,7 +61,7 @@ object HTTPClientStartFlow {
        |          "name":"Merge",
        |          "bundle":"cn.piflow.bundle.common.Merge",
        |          "properties":{
        |            "inports":""
        |            "inports":"data1,data2"
        |          }
        |        },
        |        {

@@ -138,7 +138,7 @@ object HTTPClientStartFlow {
        |  }
        |}
      """.stripMargin
    val url = "http://10.0.86.191:8002/flow/start"
    val url = "http://10.0.86.98:8001/flow/start"
    val client = HttpClients.createDefault()
    val post:HttpPost = new HttpPost(url)
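With this commit, /flow/start no longer returns the bare application id; HTTPService (below) wraps both the id and the new process id in a small JSON object. A sketch of handling that reply on the client side, using the JSON parser already used elsewhere in this code base (the reply string is an illustrative example of the shape built in HTTPService):

    // Sketch: parse {"flow":{"id":"...","pid":"..."}} returned by /flow/start.
    import cn.piflow.conf.util.OptionUtil
    import scala.util.parsing.json.JSON

    val reply = """{"flow":{"id":"application_1234_0001","pid":"process_088b2604-806e-4ccf-99cc-696aeaef730a_1"}}"""
    val flowMap = OptionUtil.getAny(JSON.parseFull(reply)).asInstanceOf[Map[String, Any]]("flow").asInstanceOf[Map[String, Any]]
    val appId = flowMap("id").asInstanceOf[String]
    val pid = flowMap("pid").asInstanceOf[String]   // pass this as processID to /flow/checkpoints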
@@ -1,79 +0,0 @@
package cn.piflow.api

import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object HTTPClientStartFlow1 {

  def main(args: Array[String]): Unit = {
    val json =
      """
        |{
        |  "flow":{
        |    "name":"xml2csv",
        |    "uuid":"1234",
        |    "checkpoint":"Merge",
        |    "stops":[
        |      {
        |        "uuid":"1111",
        |        "name":"XmlParser",
        |        "bundle":"cn.piflow.bundle.xml.XmlParser",
        |        "properties":{
        |          "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
        |          "rowTag":"phdthesis"
        |        }
        |      },
        |      {
        |        "uuid":"2222",
        |        "name":"SelectField",
        |        "bundle":"cn.piflow.bundle.common.SelectField",
        |        "properties":{
        |          "schema":"title,author,pages"
        |        }
        |      },
        |      {
        |        "uuid":"888",
        |        "name":"CsvSave",
        |        "bundle":"cn.piflow.bundle.csv.CsvSave",
        |        "properties":{
        |          "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
        |          "header":"true",
        |          "delimiter":","
        |        }
        |      }
        |    ],
        |    "paths":[
        |      {
        |        "from":"XmlParser",
        |        "outport":"",
        |        "inport":"",
        |        "to":"SelectField"
        |      },
        |      {
        |        "from":"SelectField",
        |        "outport":"",
        |        "inport":"",
        |        "to":"CsvSave"
        |      }
        |    ]
        |  }
        |}
      """.stripMargin
    val url = "http://10.0.86.98:8001/flow/start"
    val client = HttpClients.createDefault()
    val post:HttpPost = new HttpPost(url)

    post.addHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(json))

    val response:CloseableHttpResponse = client.execute(post)
    val entity = response.getEntity
    val str = EntityUtils.toString(entity,"UTF-8")
    println("Code is " + str)
  }

}
@@ -0,0 +1,153 @@
package cn.piflow.api

import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object HTTPClientStartFlowByCheckPoint {

  def main(args: Array[String]): Unit = {
    val json ="""
        |{
        |  "flow":{
        |    "name":"xml,csv-merge-fork-hive,json,csv",
        |    "uuid":"1234",
        |    "checkpoint":"Merge",
        |    "checkpointParentProcessId":"process_67adfebe-1792-4baa-abc0-1591d29e0d49_1",
        |    "stops":[
        |      {
        |        "uuid":"1111",
        |        "name":"XmlParser",
        |        "bundle":"cn.piflow.bundle.xml.XmlParser",
        |        "properties":{
        |          "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
        |          "rowTag":"phdthesis"
        |        }
        |      },
        |      {
        |        "uuid":"2222",
        |        "name":"SelectField",
        |        "bundle":"cn.piflow.bundle.common.SelectField",
        |        "properties":{
        |          "schema":"title,author,pages"
        |        }
        |      },
        |      {
        |        "uuid":"3333",
        |        "name":"PutHiveStreaming",
        |        "bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
        |        "properties":{
        |          "database":"sparktest",
        |          "table":"dblp_phdthesis"
        |        }
        |      },
        |      {
        |        "uuid":"4444",
        |        "name":"CsvParser",
        |        "bundle":"cn.piflow.bundle.csv.CsvParser",
        |        "properties":{
        |          "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
        |          "header":"false",
        |          "delimiter":",",
        |          "schema":"title,author,pages"
        |        }
        |      },
        |      {
        |        "uuid":"555",
        |        "name":"Merge",
        |        "bundle":"cn.piflow.bundle.common.Merge",
        |        "properties":{
        |          "inports":"data1,data2"
        |        }
        |      },
        |      {
        |        "uuid":"666",
        |        "name":"Fork",
        |        "bundle":"cn.piflow.bundle.common.Fork",
        |        "properties":{
        |          "outports":"out1,out2,out3"
        |        }
        |      },
        |      {
        |        "uuid":"777",
        |        "name":"JsonSave",
        |        "bundle":"cn.piflow.bundle.json.JsonSave",
        |        "properties":{
        |          "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
        |        }
        |      },
        |      {
        |        "uuid":"888",
        |        "name":"CsvSave",
        |        "bundle":"cn.piflow.bundle.csv.CsvSave",
        |        "properties":{
        |          "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
        |          "header":"true",
        |          "delimiter":","
        |        }
        |      }
        |    ],
        |    "paths":[
        |      {
        |        "from":"XmlParser",
        |        "outport":"",
        |        "inport":"",
        |        "to":"SelectField"
        |      },
        |      {
        |        "from":"SelectField",
        |        "outport":"",
        |        "inport":"data1",
        |        "to":"Merge"
        |      },
        |      {
        |        "from":"CsvParser",
        |        "outport":"",
        |        "inport":"data2",
        |        "to":"Merge"
        |      },
        |      {
        |        "from":"Merge",
        |        "outport":"",
        |        "inport":"",
        |        "to":"Fork"
        |      },
        |      {
        |        "from":"Fork",
        |        "outport":"out1",
        |        "inport":"",
        |        "to":"PutHiveStreaming"
        |      },
        |      {
        |        "from":"Fork",
        |        "outport":"out2",
        |        "inport":"",
        |        "to":"JsonSave"
        |      },
        |      {
        |        "from":"Fork",
        |        "outport":"out3",
        |        "inport":"",
        |        "to":"CsvSave"
        |      }
        |    ]
        |  }
        |}
      """.stripMargin
    val url = "http://10.0.86.98:8001/flow/start"
    val client = HttpClients.createDefault()
    val post:HttpPost = new HttpPost(url)

    post.addHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(json))

    val response:CloseableHttpResponse = client.execute(post)
    val entity = response.getEntity
    val str = EntityUtils.toString(entity,"UTF-8")
    println("Code is " + str)
  }

}
@@ -93,6 +93,18 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup

    }

    case HttpRequest(GET, Uri.Path("/flow/checkpoints"), headers, entity, protocol) => {

      val processID = req.getUri().query().getOrElse("processID","")
      if(!processID.equals("")){
        val result = API.getFlowCheckpoint(processID)
        Future.successful(HttpResponse(entity = result))
      }else{
        Future.successful(HttpResponse(entity = "processID is null or flow does not exist!"))
      }

    }

    case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{

      entity match {

@@ -100,9 +112,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
          var flowJson = data.utf8String
          flowJson = flowJson.replaceAll("}","}\n")
          //flowJson = JsonFormatTool.formatJson(flowJson)
          val (appId,process) = API.startFlow(flowJson)
          val (appId,pid,process) = API.startFlow(flowJson)
          processMap += (appId -> process)
          Future.successful(HttpResponse(entity = appId))
          val result = "{\"flow\":{\"id\":\"" + appId + "\",\"pid\":\"" + pid + "\"}}"
          Future.successful(HttpResponse(entity = result))
        }

        case ex => {
@@ -0,0 +1,21 @@
package cn.piflow.api

import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object HttpClientGetFlowCheckpoints {

  def main(args: Array[String]): Unit = {

    val url = "http://10.0.86.98:8001/flow/checkpoints?processID=process_088b2604-806e-4ccf-99cc-696aeaef730a_1"
    val client = HttpClients.createDefault()
    val getFlowInfo:HttpGet = new HttpGet(url)

    val response:CloseableHttpResponse = client.execute(getFlowInfo)
    val entity = response.getEntity
    val str = EntityUtils.toString(entity,"UTF-8")
    println("Flow checkpoint is " + str)
  }

}