From ed8d9faaf2dd82982761cd12a8761fb5310991fa Mon Sep 17 00:00:00 2001 From: yanfqidong0604 Date: Mon, 19 Nov 2018 22:03:18 +0800 Subject: [PATCH 1/3] Standardization of historical stop yang qidong --- .../src/main/resources/mongoDB/mongoDB.png | Bin 0 -> 2986 bytes .../piflow/bundle/csv/CsvStringParser.scala | 2 +- .../piflow/bundle/csv/FolderCsvParser.scala | 10 +----- .../piflow/bundle/impala/SelectImpala.scala | 9 ++--- .../piflow/bundle/internetWorm/spider.scala | 31 +----------------- .../bundle/jdbc/JdbcReadFromOracle.scala | 10 ------ .../bundle/jdbc/JdbcWriteToOracle.scala | 3 -- .../memcache/ComplementByMemcache.scala | 27 +++++---------- .../piflow/bundle/memcache/GetMemcache.scala | 28 +++++----------- .../piflow/bundle/memcache/PutMemcache.scala | 21 +++++------- .../cn/piflow/bundle/mongodb/GetMongo.scala | 11 +------ .../cn/piflow/bundle/mongodb/PutMongo.scala | 2 +- .../cn/piflow/bundle/solr/GetFromSolr.scala | 4 +-- .../cn/piflow/bundle/solr/PutIntoSolr.scala | 2 +- .../piflow/bundle/xml/FlattenXmlParser.scala | 22 +------------ .../piflow/bundle/xml/XmlStringParser.scala | 2 +- 16 files changed, 40 insertions(+), 144 deletions(-) create mode 100644 piflow-bundle/src/main/resources/mongoDB/mongoDB.png diff --git a/piflow-bundle/src/main/resources/mongoDB/mongoDB.png b/piflow-bundle/src/main/resources/mongoDB/mongoDB.png new file mode 100644 index 0000000000000000000000000000000000000000..9901a5867f3c45091cc1e7695b5b5911d1a1cd6e GIT binary patch literal 2986 zcmV;b3sv-qP)Px#1ZP1_K>z@;j|==^1poj532;bRa{vGi!vFvd!vV){sAK>D3qMIjK~#8N?V5>K z6jv6;{cGk-CW%p@o26-JdWU95P>~?67^87taZMa224ryqK^h2)h_a{{9rsK!&YYOl z(M%@MB$`B{6XUoq(cF38g{rRCT`0$MBKAzxIp68-`|f*H^?vp4y{}-%@DY*pq#8Cn zLe=k{A7d-8KW<;OUXz%(2mwzT8bYEk24$iVbNuzzv@I z2NxYpDJ$GmxGR%#cchTB(n(Q?F_Mn~Zt&DEIM*yYjqOUOal0}YoTt%Ej#3Ai;sQPf zxWSVyxVX$X%56`fyv{T#=njB`5{gB>i!#6swnD%mulY85DQ9OYjS=AT#i|2DPlEsl z)w{_uCe8pi$O-{x&5x&y?LHbQR@tbIG!B{P-JufngQlAq0GBIP9l~Xf`0e%!Xnk_|U1S+&HNXv`ByiDQ3njOBDXq;%nPQb8T#gptMk?Uy zTr8K6eXZR9H;9tJS&I@VsWpjG+mfYlnJqv9ma45Ta@IP@>;ZkpKL%*FQaE>Y$d0Udmw=GEumze@2KtWE8Y%6RAIH8pE zY}&lJiq@?wqqj;*sl2>`Hf^e+>gpQWQd2|g%F1c{q#}xpi4W-;5oM+Fib^UeUB`V( z#Tb>9>uJNrjZ`5%r@uCv%obZnKM9=E?;(%Kalzp-qcxeFH3C+U>?NYQG{8|rw3V`R z@~FGBliodgggQGrC@m|8d;;i{snf~tucsr24$`qBhZ)98k+Evu7e_>sD=C$#s;aqf zcXt`<~sqB;azW6yRLV zUh)cX$pV}bF4M8%1SqW>LWz}u{han>|F4=mZy`@IR+V16DJFsT?&)ElIdkVrZFzm* z{C>Y=kG9(B@WBJTVq?U2#V&!fH@L|uz)9h<-N!4=QSA&-Vp(8IpgozMf}b$94Ks(> z^z?_9OdL}5+O&FJGoD^P(bGW2RrEMt1srmgUOO~@;S%=g*}Yp@K?=Cqnp(-TTy@xRga>(?n`3YO1-9M2dyo7LP3i?{HEZ4?x_tTH?0f(IeY$n)7S+}@ zNIsKanL&Gc_R*CqS7`r%Lo{#EQo4Eb7Cm_IfXALTW47e4sIjS)&VTnkojH4!?%lgb z7cN}n>wtaE{KcxDIo3vhdH*FW!ZDa`_$2AF+wui_WROW<&#{KCmRA;13mE1mqu zDVkXHD!F|bw6n9DZ9n~Vg?9J6L%Dh5XxeLY==SZ~4A7U6BV8CK@aL;n$&%pW)ryeu z@h6}07&|(;RKLQBQ|a4t=LpA*8#k%BbvxVqjZJI=K<&8Di4Q-fufO@0pLkp|;6Hfq zkRLWciivlypUss_SAPDPR{yDlhDOA&aLu@0_y)rp2soT`-`?JEnVyxa+7)nEU1)!& zr-u&g-^ahV*4B~T?UTllz{M0=$u7Vt;WD`|T>6^4DlDy{^DPoMMQckdc*1e*!EJMJV62z**K1~xYJ$|hzHWVqt zOcu6XH~3WLs~i*a?sRL+ZMU4Sd7MRfTR^r%r#))tz?2vU10=KMD)Q1}Q5}Z{;FCZQfGLm2XsBV1K057hinI{pT-U zD%rC~<%|1!D6LRv>9Q3(!S#N*@>Nv4bUD8XSzNqSdPty&lV9P!&CSh{9V+e{chQn% z{2N;xU2yojT-;2X2$voXm-+r20SwR_WL=QJ|40NsJ#Yvs$X5t;lc!Es?WojHp`lWY zjSs|jRDd|Jg;2r{Zfe(_y*$RTci-cp1fWpbBg~<4RrT*A=T=j;i zYJ2Yic~T!76cbGe3llk9E^nGnwn_o5*hVHxz$X|SLI?n00`at-{`{Zx*18G~hqzhn znGh~do;ocqDwCmM1;H26R)v@pOgM5PU}MaE`w!B{(J!k$5Gp+QK-gGc~o==}Nb=#x)Ra#+|s zBIe_P!9>DWDcxa18|?TT5oJ~TU?l;-q@w8zX2O(dT*+b|h(jCVAzUN!93q9@8=zjF z!2P1=z`f|*&!eQw&5Pn3kL&wra6%S~g?6@AQfg{SNShwGXI)D8vwH$>Efmd|HIG+b 
zO3=%IXHky@2l?wt$?g#Qwjg^rg2QVu9Kn?$gq(~Js@>c_s~Qlvt+i$3bh$(9;Rp_I zYOtO7`v)f|DK(3W7KBse&a-E~mNx6pvIYb$CpVj{@uDyZvWFu$2-_8GHSp@}$4i%Z zn}U^?oECTm^{i_^;QCXzg8N^P0WM5sfD2O@;KEb}xG-OfZvX%Q literal 0 HcmV?d00001 diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala index 89c0aae..ef66bb9 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvStringParser.scala @@ -14,7 +14,7 @@ class CsvStringParser extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" val inportList: List[String] = List(PortEnum.NonePort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override val description: String = "" + override val description: String = "Parsing for CSV strings" var Str:String=_ diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala index 180300c..e806166 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/FolderCsvParser.scala @@ -14,7 +14,7 @@ class FolderCsvParser extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" val inportList: List[String] = List(PortEnum.NonePort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override val description: String = "" + override val description: String = "Parsing of CSV folder" var FolderPath:String=_ @@ -22,14 +22,9 @@ class FolderCsvParser extends ConfigurableStop{ var schema: String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { - - println("kkkkkkkkkkkkkkkk") - val session: SparkSession = pec.get[SparkSession] val context: SparkContext = session.sparkContext - - val StrRDD: RDD[String] = context.textFile(FolderPath) var num:Int=0 var s:String=delimiter @@ -41,8 +36,6 @@ class FolderCsvParser extends ConfigurableStop{ row }) - - val schameARR: Array[String] = schema.split(",") val fields: Array[StructField] = schameARR.map(d=>StructField(d.trim,StringType,nullable = true)) @@ -62,7 +55,6 @@ class FolderCsvParser extends ConfigurableStop{ } - override def setProperties(map: Map[String, Any]): Unit = { FolderPath = MapUtil.get(map,"csvPath").asInstanceOf[String] delimiter = MapUtil.get(map,"delimiter").asInstanceOf[String] diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/impala/SelectImpala.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/impala/SelectImpala.scala index 1109b80..9afa2b2 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/impala/SelectImpala.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/impala/SelectImpala.scala @@ -29,13 +29,10 @@ class SelectImpala extends ConfigurableStop{ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session: SparkSession = pec.get[SparkSession]() - //jdbc:hive2://10.0.82.165:21050/;auth=noSasl - Class.forName("org.apache.hive.jdbc.HiveDriver") val con: Connection = DriverManager.getConnection("jdbc:hive2://"+url+"/;auth=noSasl",user,password) val stmt: Statement = con.createStatement() - // val rs: ResultSet = stmt.executeQuery("select * from kylin.test1 full join kylin.morg on kylin.test1.pid=kylin.morg.belongtocode") val rs: ResultSet = stmt.executeQuery(sql) val filedNames: Array[String] = 
schameString.split(",") @@ -81,9 +78,9 @@ class SelectImpala extends ConfigurableStop{ val url=new PropertyDescriptor().name("url").displayName("url").description("IP and port number, you need to write like this -- ip:port").defaultValue("").required(true) descriptor = url :: descriptor - val user=new PropertyDescriptor().name("user").displayName("user").description("").defaultValue("").required(false) + val user=new PropertyDescriptor().name("user").displayName("user").description("user").defaultValue("").required(false) descriptor = user :: descriptor - val password=new PropertyDescriptor().name("password").displayName("password").description("").defaultValue("").required(false) + val password=new PropertyDescriptor().name("password").displayName("password").description("password").defaultValue("").required(false) descriptor = password :: descriptor val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The name of the table has not changed.But you have to specify which database, such as database.table.").defaultValue("").required(true) descriptor = sql :: descriptor @@ -101,8 +98,8 @@ class SelectImpala extends ConfigurableStop{ List(StopGroupEnum.Mongodb.toString) } + override def initialize(ctx: ProcessContext): Unit = { } - } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/internetWorm/spider.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/internetWorm/spider.scala index 6eea32b..821f30e 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/internetWorm/spider.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/internetWorm/spider.scala @@ -40,9 +40,7 @@ class spider extends ConfigurableStop{ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session: SparkSession = pec.get[SparkSession]() - //解析第一层界面,得到跳转到第二层界面所需的href所在标签的ele集合 var doc: Document =null - //html>table>a>b/2/3 将用户提供的标签路径解析 val selectArr: Array[String] = jumpDependence.split("/") try{ doc = Jsoup.connect(firstUrl).timeout(50000).get() @@ -55,42 +53,31 @@ class spider extends ConfigurableStop{ }catch { case e:Exception => throw new RuntimeException("JumpDependence specified label path does not exist."+"\n"+"jumpDependence指定的标签路径不存在") } - //循环集合得到每一个text作为标记字段,得到href作为跳转依赖 var choiceIntArr: Array[Int] =0.until(eles.size()).toArray -// var choiceIntArr: Array[Int] =0.until(15).toArray - //若用户制定了截取位置,从客户指定位置开始循环 if(selectArr.size>1){ choiceIntArr=selectArr(1).toInt.until(eles.size()).toArray } for(x <- choiceIntArr){ var num=x - //若用户指定了选择方式,按用户指定的方式选取标签 if(selectArr.size==3){ num = x + countInt * (selectArr(2).toInt) } val ele = eles.get(num) - //将标记字段存储,区分每条数据 var textStr: String = ele.text() if(textStr.length==0){ textStr = " " throw new RuntimeException("Label field no value"+"\n"+"标记字段无值") } map+=(markupField->textStr) -// 得到href,分析第二层界面 val hrefStr: String = ele.attr("href") - //将用户需要的每一个字段或者下载路径分离 val strArr: Array[String] = fileMap.split("\\+") parseNext(rootUrl+hrefStr,strArr) -//将map添加进去array,并清空map,开始下一条数据的存储 array+=map - /*map.empty*/ countInt+=1 } - //将计数器清空,留待下次使用 countInt=0 -//循环爬取结束,将所得数据存储为df var keySet: Set[String] =Set() val rows1: List[Row] = array.toList.map(map => { keySet = map.keySet @@ -104,7 +91,7 @@ class spider extends ConfigurableStop{ val fields: Array[StructField] = keySet.toArray.map(d=>StructField(d,StringType,nullable = true)) val schema: StructType = StructType(fields) val df: DataFrame = session.createDataFrame(rowRDD,schema) - + println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%") df.show(10) 
println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%") @@ -112,10 +99,7 @@ class spider extends ConfigurableStop{ } - - //二层界面的解析 def parseNext(url:String,strArr: Array[String]): Unit = { -// 获取doc对象 var doc: Document = null try { doc = Jsoup.connect(url).timeout(50000).get() @@ -123,7 +107,6 @@ class spider extends ConfigurableStop{ } catch { case e: IllegalArgumentException => throw new RuntimeException("The two level interface path does not exist."+"\n"+"二层界面路径不存在") } - //循环用户需要的每一个字段 var textDownFile:File =null for(each <- strArr){ val fileMapArr: Array[String] = each.split("/") @@ -135,21 +118,17 @@ class spider extends ConfigurableStop{ val rangeArr: Array[String] = fileMapArr(4).split("-") for(r <- (rangeArr(0).toInt until rangeArr(1).toInt) ){ val eachFileEle: Element = downList.get(r) - // 建立每个数据集的本地文件夹 if(downPath.size==0){ downPath="/InternetWormDown/" } map+=("downPath"->downPath) - //获取当前时间,并记录 val nowDate: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) map+=("downDate"->nowDate) textDownFile = new File(downPath+map.get(markupField).get) if(! textDownFile.exists()){ textDownFile.mkdirs() } - //建立文件 val eachFile: File = new File(downPath+map.get(markupField).get+"/"+eachFileEle.text()) - // 如果本地文件不存在,写入本地,若存在,跳过此文件 if(!eachFile.exists()){ val in: InputStream = new URL(url+eachFileEle.attr("href")).openStream() val out: BufferedOutputStream = new BufferedOutputStream(new FileOutputStream(eachFile),10485760) @@ -168,7 +147,6 @@ class spider extends ConfigurableStop{ } }else{ - //循环用户给定的需求字段名称和标签地址,依次存储au if(fileMapArr.size > 2){ map+=(fileMapArr(0) -> numEle.text()) }else{ @@ -176,13 +154,6 @@ class spider extends ConfigurableStop{ } } } - - - // for(key <- fileMap.keySet){ - // //将用户给的标签路径和选择条件分离 - // val selects = fileMap.get(key).get.split("/") - // val value: String = doc.select(selects(0)).get(selects(1).toInt).text() - // map+=(key -> value) } override def setProperties(map: Map[String, Any]): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala index 4a9f78e..d4faa94 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala @@ -134,16 +134,6 @@ class JdbcReadFromOracle extends ConfigurableStop{ df.show(20) println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") -/* - val v: Any = df.collect()(0).get(2) - println("3333333333333333333333"+v.getClass) - val ab: Array[Byte] = v.asInstanceOf[Seq[Byte]].toArray - - val fos: FileOutputStream = new FileOutputStream(new File("/aa.txt")) - fos.write(ab) - fos.close() -*/ - out.write(df) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWriteToOracle.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWriteToOracle.scala index 1d3fe43..de05483 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWriteToOracle.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWriteToOracle.scala @@ -51,9 +51,6 @@ class JdbcWriteToOracle extends ConfigurableStop{ star.executeUpdate(insertSQL) }) - - - } def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/ComplementByMemcache.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/ComplementByMemcache.scala index a859779..dd3acf1 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/ComplementByMemcache.scala 
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/ComplementByMemcache.scala @@ -18,15 +18,15 @@ class ComplementByMemcache extends ConfigurableStop { val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - var servers:String=_ //服务器地址和端口号Server address and port number,If you have multiple servers, use "," segmentation. - var keyFile:String=_ //你想用来作为查询条件的字段The field you want to use as a query condition - var weights:String=_ //每台服务器的权重Weight of each server - var maxIdle:String=_ //最大处理时间Maximum processing time - var maintSleep:String=_ //主线程睡眠时间Main thread sleep time - var nagle:String=_ //socket参数,若为true,则写数据时不缓冲立即发送If the socket parameter is true, the data is not buffered and sent immediately. - var socketTO:String=_ //socket阻塞时候的超时时间Socket timeout during blocking - var socketConnectTO:String=_ //连接建立时的超时控制Timeout control during connection establishment - var replaceField:String=_ //你想得到的字段The fields you want to get + var servers:String=_ //Server address and port number,If you have multiple servers, use "," segmentation. + var keyFile:String=_ //The field you want to use as a query condition + var weights:String=_ //Weight of each server + var maxIdle:String=_ //Maximum processing time + var maintSleep:String=_ //Main thread sleep time + var nagle:String=_ //If the socket parameter is true, the data is not buffered and sent immediately. + var socketTO:String=_ //Socket timeout during blocking + var socketConnectTO:String=_ //Timeout control during connection establishment + var replaceField:String=_ //The fields you want to get override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { @@ -35,10 +35,8 @@ class ComplementByMemcache extends ConfigurableStop { val mcc: MemCachedClient =getMcc() - //获得输出df的描述信息 val replaceFields: mutable.Buffer[String] = replaceField.split(",").toBuffer - //获取inDF中所有数据到数组,用于更改数据 val rowArr: Array[Row] = inDF.collect() val fileNames: Array[String] = inDF.columns val data: Array[Map[String, String]] = rowArr.map(row => { @@ -51,11 +49,9 @@ class ComplementByMemcache extends ConfigurableStop { map }) - //查询memcache中数据,并按用户需求替换df中数据 val finalData: Array[Map[String, String]] = data.map(eachData => { var d: Map[String, String] = eachData val anyRef: AnyRef = mcc.get(d.get(keyFile).get) - //当从memcache中得到数据时,替换 if(anyRef.getClass.toString.equals("class scala.Some")){ val map: Map[String, String] = anyRef.asInstanceOf[Map[String, String]] for (f <- replaceFields) { @@ -65,7 +61,6 @@ class ComplementByMemcache extends ConfigurableStop { d }) - //将schame和数据转换为df var arrKey: Array[String] = Array() val rows: List[Row] = finalData.toList.map(map => { arrKey = map.keySet.toArray @@ -89,11 +84,8 @@ class ComplementByMemcache extends ConfigurableStop { } - //得到全局唯一实例 def getMcc(): MemCachedClient = { - //获取连接池实例对象 val pool: SockIOPool = SockIOPool.getInstance() - // 链接到数据库 var serversArr:Array[String]=servers.split(",") pool.setServers(serversArr) @@ -118,7 +110,6 @@ class ComplementByMemcache extends ConfigurableStop { } pool.initialize() - //建立全局唯一实例 val mcc: MemCachedClient = new MemCachedClient() mcc } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala index a547670..c3548ef 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala @@ 
-19,15 +19,15 @@ class GetMemcache extends ConfigurableStop{ val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - var servers:String=_ //服务器地址和端口号Server address and port number,If you have multiple servers, use "," segmentation. - var keyFile:String=_ //你想用来作为查询条件的字段The field you want to use as a query condition - var weights:String=_ //每台服务器的权重Weight of each server - var maxIdle:String=_ //最大处理时间Maximum processing time - var maintSleep:String=_ //主线程睡眠时间Main thread sleep time - var nagle:String=_ //socket参数,若为true,则写数据时不缓冲立即发送If the socket parameter is true, the data is not buffered and sent immediately. - var socketTO:String=_ //socket阻塞时候的超时时间Socket timeout during blocking - var socketConnectTO:String=_ //连接建立时的超时控制Timeout control during connection establishment - var schame:String=_ //你想得到的字段The fields you want to get + var servers:String=_ //Server address and port number,If you have multiple servers, use "," segmentation. + var keyFile:String=_ //The field you want to use as a query condition + var weights:String=_ //Weight of each server + var maxIdle:String=_ //Maximum processing time + var maintSleep:String=_ //Main thread sleep time + var nagle:String=_ //If the socket parameter is true, the data is not buffered and sent immediately. + var socketTO:String=_ //Socket timeout during blocking + var socketConnectTO:String=_ //Timeout control during connection establishment + var schame:String=_ //The fields you want to get override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session: SparkSession = pec.get[SparkSession]() @@ -42,21 +42,17 @@ class GetMemcache extends ConfigurableStop{ str.substring(1,str.length-1) }) - //获得输出df的描述信息 var schameArr:Array[String] =Array() - //若用户指定返回信息,返回用户需要的信息 if(schame.length>0){ val schameArrBuff: mutable.Buffer[String] = schame.split(",").toBuffer schameArrBuff.insert(0,keyFile) schameArr = schameArrBuff.toArray } - //获得输出df的数据 var allFileDatas: ArrayBuffer[ArrayBuffer[String]] =ArrayBuffer() for(keyNum <- (0 until keys.size)){ val map: Map[String, String] = mcc.get(keys(keyNum)).asInstanceOf[Map[String,String]] - //若用户没有指定返回字段,则将memcache中查到的数据全部返回 if(schame.size==0){ val arr: Array[String] = map.keySet.toArray val buffer: mutable.Buffer[String] = arr.toBuffer @@ -64,7 +60,6 @@ class GetMemcache extends ConfigurableStop{ schameArr = buffer.toArray } - //记录这一条数据的所有值 var values: ArrayBuffer[String] =ArrayBuffer() values+=keys(keyNum) for(x <- (1 until schameArr.size)){ @@ -73,7 +68,6 @@ class GetMemcache extends ConfigurableStop{ allFileDatas+=values } - //将schame和数据转换为df val rowList: List[Row] = allFileDatas.map(arr => {Row.fromSeq(arr)}).toList val rowRDD: RDD[Row] = session.sparkContext.makeRDD(rowList) val fields: Array[StructField] = schameArr.map(d=>StructField(d,StringType,nullable = true)) @@ -87,11 +81,8 @@ class GetMemcache extends ConfigurableStop{ out.write(df) } - //得到全局唯一实例 def getMcc(): MemCachedClient = { - //获取连接池实例对象 val pool: SockIOPool = SockIOPool.getInstance() - // 链接到数据库 var serversArr:Array[String]=servers.split(",") pool.setServers(serversArr) @@ -116,7 +107,6 @@ class GetMemcache extends ConfigurableStop{ } pool.initialize() - //建立全局唯一实例 val mcc: MemCachedClient = new MemCachedClient() mcc } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala index 4d69773..17bd48c 100644 --- 
a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala @@ -9,18 +9,18 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession} class PutMemcache extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - override val description: String = "get data from memcache" + override val description: String = "put data into memcache" val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.NonePort.toString) - var servers:String=_ //服务器地址和端口号Server address and port number,If you have multiple servers, use "," segmentation. - var keyFile:String=_ //你想用来作为key的字段You want to be used as a field for key. - var weights:String=_ //每台服务器的权重Weight of each server - var maxIdle:String=_ //最大处理时间Maximum processing time - var maintSleep:String=_ //主线程睡眠时间Main thread sleep time - var nagle:String=_ //socket参数,若为true,则写数据时不缓冲立即发送If the socket parameter is true, the data is not buffered and sent immediately. - var socketTO:String=_ //socket阻塞时候的超时时间Socket timeout during blocking - var socketConnectTO:String=_ //连接建立时的超时控制Timeout control during connection establishment + var servers:String=_ //Server address and port number,If you have multiple servers, use "," segmentation. + var keyFile:String=_ //You want to be used as a field for key. + var weights:String=_ //Weight of each server + var maxIdle:String=_ //Maximum processing time + var maintSleep:String=_ //Main thread sleep time + var nagle:String=_ //If the socket parameter is true, the data is not buffered and sent immediately. + var socketTO:String=_ //Socket timeout during blocking + var socketConnectTO:String=_ //Timeout control during connection establishment override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { @@ -28,9 +28,7 @@ class PutMemcache extends ConfigurableStop{ val session: SparkSession = pec.get[SparkSession]() val inDF: DataFrame = in.read() - //获取连接池实例对象 val pool: SockIOPool = SockIOPool.getInstance() - // 链接到数据库 var serversArr:Array[String]=servers.split(",") pool.setServers(serversArr) @@ -55,7 +53,6 @@ class PutMemcache extends ConfigurableStop{ } pool.initialize() - //建立全局唯一实例 val mcc: MemCachedClient = new MemCachedClient() val fileNames: Array[String] = inDF.columns diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/GetMongo.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/GetMongo.scala index 903c457..ceecc99 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/GetMongo.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/GetMongo.scala @@ -30,7 +30,6 @@ class GetMongo extends ConfigurableStop{ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session: SparkSession = pec.get[SparkSession]() -//注入链接地址 var addressesArr: util.ArrayList[ServerAddress] = new util.ArrayList[ServerAddress]() val ipANDport: Array[String] = addresses.split(",") for(x <- (0 until ipANDport.size)){ @@ -39,7 +38,6 @@ class GetMongo extends ConfigurableStop{ } } -//注入链接凭证 var credentialsArr: util.ArrayList[MongoCredential] = new util.ArrayList[MongoCredential]() if(credentials.length!=0){ val name_database_password: Array[String] = credentials.split(",") @@ -50,22 +48,16 @@ class GetMongo extends ConfigurableStop{ } } -//链接到数据库和表 val client: MongoClient = new MongoClient(addressesArr,credentialsArr) val db: MongoDatabase = 
client.getDatabase(dataBase) val col: MongoCollection[Document] = db.getCollection(collection) -//获取表内全部数据 得到迭代器 val documents: FindIterable[Document] = col.find() val dataIterator: MongoCursor[Document] = documents.iterator() var document: Document =null -//记录字段名字 var fileNamesArr: Array[String] =Array() -//记录所有数据 var rowArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer() -//遍历数据 while (dataIterator.hasNext){ - //记录每一条数据 var dataArr:ArrayBuffer[String]=ArrayBuffer() document = dataIterator.next() val fileNamesSet: util.Set[String] = document.keySet() @@ -76,7 +68,6 @@ class GetMongo extends ConfigurableStop{ rowArr+=dataArr } -//生成df var names:ArrayBuffer[String]=ArrayBuffer() for(n <- (1 until fileNamesArr.size )){ names += fileNamesArr(n) @@ -128,7 +119,7 @@ class GetMongo extends ConfigurableStop{ descriptor } override def getIcon(): Array[Byte] = { - ImageUtil.getImage("mongoDB/mongodb.png") + ImageUtil.getImage("mongoDB/mongoDB.png") } override def getGroup(): List[String] = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/PutMongo.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/PutMongo.scala index 4309e0d..94aadfb 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/PutMongo.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/mongodb/PutMongo.scala @@ -95,7 +95,7 @@ class PutMongo extends ConfigurableStop{ } override def getIcon(): Array[Byte] = { - ImageUtil.getImage("mongoDB/mongodb.png") + ImageUtil.getImage("mongoDB/mongoDB.png") } override def getGroup(): List[String] = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/GetFromSolr.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/GetFromSolr.scala index f364140..8c32044 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/GetFromSolr.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/GetFromSolr.scala @@ -20,8 +20,8 @@ import scala.collection.mutable.ListBuffer class GetFromSolr extends ConfigurableStop{ - override val authorEmail: String ="18525746364@163.com" - override val description: String = "" + override val authorEmail: String ="yangqidong@cnic.cn" + override val description: String = "get from solr" val inportList: List[String] = List(PortEnum.NonePort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/PutIntoSolr.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/PutIntoSolr.scala index 5700825..b6bc432 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/PutIntoSolr.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/solr/PutIntoSolr.scala @@ -14,7 +14,7 @@ import org.apache.spark.sql.DataFrame class PutIntoSolr extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - override val description: String = "" + override val description: String = "put into solr" val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.NonePort.toString) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FlattenXmlParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FlattenXmlParser.scala index 255c993..0ac6c6c 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FlattenXmlParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FlattenXmlParser.scala @@ -24,7 +24,7 @@ class FlattenXmlParser extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" val inportList: 
List[String] = List(PortEnum.NonePort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override val description: String = "paring the xml file" + override val description: String = "Parse the XML file and expand the label you need." var xmlpath:String = _ @@ -33,9 +33,7 @@ class FlattenXmlParser extends ConfigurableStop{ var returnField: String = _ - //迭代器转换 def javaIterToScalaIter(utilIter: util.Iterator[_]):Iterator[Element]={ - // asscala需要导入import scala.collection.JavaConverters._ utilIter.asInstanceOf[util.Iterator[Element]].asScala } @@ -45,44 +43,31 @@ class FlattenXmlParser extends ConfigurableStop{ ss=pec.get[SparkSession]() -//读取hdfs文件,封装到流 val conf = new Configuration() val fs: FileSystem = FileSystem.get(URI.create(xmlpath), conf) val hdfsInputStream: FSDataInputStream = fs.open(new Path(xmlpath)) val saxReader: SAXReader = new SAXReader() -// 将流放入dom4j中,解析数据 val document: Document = saxReader.read(hdfsInputStream) -//获取根节点 val rootElt: Element = document.getRootElement - var finalDF:DataFrame=null - -//建立节点路径的沿途所有ele的list var arrbuffer:ArrayBuffer[Element]=ArrayBuffer() arrbuffer+=rootElt -// 解析客户要解析的标签 -// papers,paper - val arrLabel: Array[String] = tagPath.split(",") -// 循环标签路径,将沿途ele依次放入数组 for(x<-(1 until arrLabel.length)){ var ele: Element =arrbuffer(x-1).element(arrLabel(x)) arrbuffer+=ele } -// 生成客户想解析标签的迭代器,并转换为scala迭代器 val FatherElement: Element = arrbuffer(arrbuffer.size-1) val FatherInterator: util.Iterator[_] = FatherElement.elementIterator() val scalalInterator: Iterator[Element] = javaIterToScalaIter(FatherInterator) -// 提前准备关联的key和value var relationKey:String="" var relationValue:String="" - //解析、存储数据 var valueArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer() var value:ArrayBuffer[String]=ArrayBuffer() var keyArr:ArrayBuffer[String]=ArrayBuffer() @@ -109,7 +94,6 @@ class FlattenXmlParser extends ConfigurableStop{ - // 解析用户要显示的字段名称 if(returnField.size>0){ val returnARR: Array[String] = returnField.split(",") val seq: Seq[Column] = returnARR.map(x=>finalDF(x)).toSeq @@ -119,7 +103,6 @@ class FlattenXmlParser extends ConfigurableStop{ } - //如果用户希望展开某一字段 var smallDF: DataFrame = null if(openTag.size>0){ val eleExplode: Element = FatherElement.element(openTag) @@ -153,7 +136,6 @@ class FlattenXmlParser extends ConfigurableStop{ val sonSchame: StructType = StructType(field) smallDF = ss.createDataFrame(ss.sparkContext.makeRDD(valueRows),sonSchame) - // 两表join val df: DataFrame = smallDF.join(finalDF,finalDF(relationKey)===smallDF(relationKey+"_"),"left") finalDF= df.drop(relationKey+"_") } @@ -163,12 +145,10 @@ class FlattenXmlParser extends ConfigurableStop{ finalDF.show(20) println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%") - out.write(finalDF) } - override def setProperties(map: Map[String, Any]): Unit = { xmlpath = MapUtil.get(map,"xmlpath").asInstanceOf[String] tagPath = MapUtil.get(map,"tagPath").asInstanceOf[String] diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlStringParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlStringParser.scala index b8eca49..9bfc3fa 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlStringParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlStringParser.scala @@ -18,7 +18,7 @@ class XmlStringParser extends ConfigurableStop { override val authorEmail: String = "yangqidong@cnic.cn" val inportList: List[String] = List(PortEnum.NonePort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - 
override val description: String = "" + override val description: String = "Parsing XML string" var XmlString:String=_ var label:String=_ From dd11c444a255381f586f1c98b07490e534f3663d Mon Sep 17 00:00:00 2001 From: judy0131 Date: Tue, 20 Nov 2018 16:46:47 +0800 Subject: [PATCH 2/3] 1.implement checkpoint --- piflow-bundle/pom.xml | 2 +- .../src/main/resources/flow_checkpoint.json | 128 +++++++++++++ .../bundle/graphx/LabelPropagation.scala | 2 +- .../cn/piflow/bundle/graphx/LoadGraph.scala | 2 +- .../scala/cn/piflow/conf/bean/FlowBean.scala | 3 + .../scala/cn/piflow/bundle/FlowTest.scala | 23 ++- .../src/main/scala/cn/piflow/main.scala | 173 +++++++++++++++++- .../src/main/scala/cn/piflow/runner.scala | 2 +- .../main/scala/cn/piflow/util/H2Util.scala | 20 +- .../scala/cn/piflow/util/HadoopFileUtil.scala | 27 +++ .../src/main/scala/cn/piflow/api/API.scala | 17 +- .../cn/piflow/api/HTTPClientStartFlow.scala | 4 +- .../cn/piflow/api/HTTPClientStartFlow1.scala | 79 -------- .../api/HTTPClientStartFlowByCheckPoint.scala | 153 ++++++++++++++++ .../scala/cn/piflow/api/HTTPService.scala | 17 +- .../api/HttpClientGetFlowCheckpoints.scala | 21 +++ 16 files changed, 568 insertions(+), 105 deletions(-) create mode 100644 piflow-bundle/src/main/resources/flow_checkpoint.json create mode 100644 piflow-core/src/main/scala/cn/piflow/util/HadoopFileUtil.scala delete mode 100644 piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala create mode 100644 piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowByCheckPoint.scala create mode 100644 piflow-server/src/main/scala/cn/piflow/api/HttpClientGetFlowCheckpoints.scala diff --git a/piflow-bundle/pom.xml b/piflow-bundle/pom.xml index ef65499..969241d 100644 --- a/piflow-bundle/pom.xml +++ b/piflow-bundle/pom.xml @@ -251,7 +251,7 @@ - install-external-2 + install-external-3 install-file diff --git a/piflow-bundle/src/main/resources/flow_checkpoint.json b/piflow-bundle/src/main/resources/flow_checkpoint.json new file mode 100644 index 0000000..5255966 --- /dev/null +++ b/piflow-bundle/src/main/resources/flow_checkpoint.json @@ -0,0 +1,128 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "checkpoint":"Merge", + "checkpointParentProcessId":"process_3ae4741c-3aca-4254-8d2e-71b6045b083c_1", + "stops":[ + { + "uuid":"1111", + "name":"XmlParser", + "bundle":"cn.piflow.bundle.xml.XmlParser", + "properties":{ + "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", + "rowTag":"phdthesis" + } + }, + { + "uuid":"2222", + "name":"SelectField", + "bundle":"cn.piflow.bundle.common.SelectField", + "properties":{ + "schema":"title,author,pages" + } + + }, + { + "uuid":"3333", + "name":"PutHiveStreaming", + "bundle":"cn.piflow.bundle.hive.PutHiveStreaming", + "properties":{ + "database":"sparktest", + "table":"dblp_phdthesis" + } + }, + { + "uuid":"4444", + "name":"CsvParser", + "bundle":"cn.piflow.bundle.csv.CsvParser", + "properties":{ + "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv", + "header":"false", + "delimiter":",", + "schema":"title,author,pages" + } + }, + { + "uuid":"555", + "name":"Merge", + "bundle":"cn.piflow.bundle.common.Merge", + "properties":{ + "inports":"data1,data2" + } + + }, + { + "uuid":"666", + "name":"Fork", + "bundle":"cn.piflow.bundle.common.Fork", + "properties":{ + "outports":"out1,out2,out3" + } + }, + { + "uuid":"777", + "name":"JsonSave", + "bundle":"cn.piflow.bundle.json.JsonSave", + "properties":{ + "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json" + } + }, + { + "uuid":"888", + 
"name":"CsvSave", + "bundle":"cn.piflow.bundle.csv.CsvSave", + "properties":{ + "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv", + "header":"true", + "delimiter":"," + } + } + ], + "paths":[ + { + "from":"XmlParser", + "outport":"", + "inport":"", + "to":"SelectField" + }, + { + "from":"SelectField", + "outport":"", + "inport":"data1", + "to":"Merge" + }, + { + "from":"CsvParser", + "outport":"", + "inport":"data2", + "to":"Merge" + }, + { + "from":"Merge", + "outport":"", + "inport":"", + "to":"Fork" + }, + { + "from":"Fork", + "outport":"out1", + "inport":"", + "to":"PutHiveStreaming" + }, + { + "from":"Fork", + "outport":"out2", + "inport":"", + "to":"JsonSave" + }, + { + "from":"Fork", + "outport":"out3", + "inport":"", + "to":"CsvSave" + } + ] + } +} + diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala index ac7b4fb..fffe597 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LabelPropagation.scala @@ -61,7 +61,7 @@ class LabelPropagation extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroupEnum.GraphXGroup.toString) + List(StopGroupEnum.GraphX.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala index d33477f..96c830f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/graphx/LoadGraph.scala @@ -55,7 +55,7 @@ class LoadGraph extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroupEnum.GraphXGroup.toString) + List(StopGroupEnum.GraphX.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala index ed4244e..afcf97f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala @@ -11,6 +11,7 @@ class FlowBean { var uuid : String = _ var name : String = _ var checkpoint : String = _ + var checkpointParentProcessId : String = _ var stops : List[StopBean] = List() var paths : List[PathBean] = List() @@ -21,6 +22,7 @@ class FlowBean { this.uuid = MapUtil.get(flowMap,"uuid").asInstanceOf[String] this.name = MapUtil.get(flowMap,"name").asInstanceOf[String] this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String] + this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String] //construct StopBean List val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]] @@ -43,6 +45,7 @@ class FlowBean { val flow = new FlowImpl(); flow.setFlowName(this.name) + flow.setCheckpointParentProcessId(this.checkpointParentProcessId) this.stops.foreach( stopBean => { flow.addStop(stopBean.name,stopBean.constructStop()) diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala index 93ff00c..66c87f3 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest.scala @@ -4,6 +4,7 @@ import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} import 
org.apache.spark.sql.SparkSession +import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON @@ -14,7 +15,7 @@ class FlowTest { def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow.json" + val file = "src/main/resources/flow_checkpoint.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -23,16 +24,34 @@ class FlowTest { val flowBean = FlowBean(map) val flow = flowBean.constructFlow() + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start() + //execute flow val spark = SparkSession.builder() .master("spark://10.0.86.89:7077") - .appName("piflow-hive-bundle") + .appName("piflow-hive-bundle-xjzhu") .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") .config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar") .enableHiveSupport() .getOrCreate() +// val spark = SparkSession.builder() +// .master("yarn") +// .appName(flowBean.name) +// .config("spark.deploy.mode","cluster") +// .config("spark.hadoop.yarn.resourcemanager.hostname", "10.0.86.191") +// .config("spark.hadoop.cdefnprst.resourcemanager.address", "10.0.86.191:8032") +// .config("spark.yarn.access.namenode", "hdfs://10.0.86.191:9000") +// .config("spark.yarn.stagingDir", "hdfs://10.0.86.191:9000/tmp") +// .config("spark.yarn.jars", "hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar") +// //.config("spark.driver.memory", "1g") +// //.config("spark.executor.memory", "1g") +// //.config("spark.cores.max", "2") +// .config("spark.jars", "/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar") +// .config("hive.metastore.uris","thrift://10.0.86.191:9083") +// .enableHiveSupport() +// .getOrCreate() val process = Runner.create() .bind(classOf[SparkSession].getName, spark) diff --git a/piflow-core/src/main/scala/cn/piflow/main.scala b/piflow-core/src/main/scala/cn/piflow/main.scala index d65ffbe..85798fd 100644 --- a/piflow-core/src/main/scala/cn/piflow/main.scala +++ b/piflow-core/src/main/scala/cn/piflow/main.scala @@ -1,8 +1,13 @@ package cn.piflow +import java.io.IOException +import java.net.URI import java.util.concurrent.{CountDownLatch, TimeUnit} -import cn.piflow.util.{IdGenerator, Logging} +import cn.piflow.util.{HadoopFileUtil, IdGenerator, Logging} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import scala.collection.mutable.{ArrayBuffer, Map => MMap} @@ -20,6 +25,8 @@ trait JobInputStream { trait JobOutputStream { def makeCheckPoint(pec: JobContext): Unit; + def loadCheckPoint(pec: JobContext, path : String) : Unit; + def write(data: DataFrame); def write(bundle: String, data: DataFrame); @@ -65,6 +72,10 @@ trait Flow { def getFlowName(): String; def setFlowName(flowName : String): Unit; + + def getCheckpointParentProcessId() : String; + + def setCheckpointParentProcessId(checkpointParentProcessId : String); } class FlowImpl extends Flow { @@ -72,6 +83,7 @@ class FlowImpl extends Flow { val edges = ArrayBuffer[Edge](); val stops = MMap[String, Stop](); val checkpoints = ArrayBuffer[String](); + var checkpointParentProcessId = "" def addStop(name: String, process: Stop) = { stops(name) = process; @@ -113,8 +125,15 @@ class FlowImpl extends Flow { outgoingEdges.getOrElseUpdate(edge.stopFrom, ArrayBuffer[Edge]()) += edge; } - private def 
_visitProcess[T](processName: String, op: (String, Map[Edge, T]) => T, visited: MMap[String, T]): T = { + private def _visitProcess[T](flow: Flow, processName: String, op: (String, Map[Edge, T]) => T, visited: MMap[String, T]): T = { if (!visited.contains(processName)) { + + //TODO: need to check whether the checkpoint's data exist!!!! + if(flow.hasCheckPoint(processName) && !flow.getCheckpointParentProcessId().equals("")){ + val ret = op(processName, null); + visited(processName) = ret; + return ret; + } //executes dependent processes val inputs = if (incomingEdges.contains(processName)) { @@ -122,7 +141,7 @@ class FlowImpl extends Flow { val edges = incomingEdges(processName); edges.map { edge => edge -> - _visitProcess(edge.stopFrom, op, visited); + _visitProcess(flow, edge.stopFrom, op, visited); }.toMap } else { @@ -138,11 +157,11 @@ class FlowImpl extends Flow { } } - override def visit[T](op: (String, Map[Edge, T]) => T): Unit = { + override def visit[T](flow: Flow, op: (String, Map[Edge, T]) => T): Unit = { val ends = stops.keys.filterNot(outgoingEdges.contains(_)); val visited = MMap[String, T](); ends.foreach { - _visitProcess(_, op, visited); + _visitProcess(flow, _, op, visited); } } } @@ -154,10 +173,19 @@ class FlowImpl extends Flow { override def setFlowName(flowName : String): Unit = { this.name = flowName; } + + //get the processId + override def getCheckpointParentProcessId() : String = { + this.checkpointParentProcessId + } + + override def setCheckpointParentProcessId(checkpointParentProcessId : String) = { + this.checkpointParentProcessId = checkpointParentProcessId + } } trait AnalyzedFlowGraph { - def visit[T](op: (String, Map[Edge, T]) => T): Unit; + def visit[T](flow: Flow, op: (String, Map[Edge, T]) => T): Unit; } trait Process { @@ -208,18 +236,69 @@ class JobInputStreamImpl() extends JobInputStream { } class JobOutputStreamImpl() extends JobOutputStream with Logging { + private val defaultPort = "default" + override def makeCheckPoint(pec: JobContext) { mapDataFrame.foreach(en => { - val path = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + pec.getProcessContext().getProcess().pid() + "/" + pec.getStopJob().jid(); + val port = if(en._1.equals("")) defaultPort else en._1 + val path = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + pec.getProcessContext().getProcess().pid() + "/" + pec.getStopJob().getStopName() + "/" + port; + println("MakeCheckPoint Path: " + path) + //val path = getCheckPointPath(pec) logger.debug(s"writing data on checkpoint: $path"); + en._2.apply().write.parquet(path); mapDataFrame(en._1) = () => { logger.debug(s"loading data from checkpoint: $path"); - pec.get[SparkSession].read.parquet(path) + pec.get[SparkSession].read.parquet(path)//default port? 
}; }) } + //load the checkpoint by path and port + override def loadCheckPoint(pec: JobContext, checkpointPath : String): Unit = { + + val ports = getCheckPointPorts(pec, checkpointPath) + ports.foreach{ port => { + + val mdf = () => { + val checkpointPortPath = checkpointPath + "/" + port + logger.debug(s"loading data from checkpoint: $checkpointPortPath") + println(s"loading data from checkpoint: $checkpointPortPath") + pec.get[SparkSession].read.parquet(checkpointPortPath) + }; + + val newPort = if(port.equals(defaultPort)) "" else port + mapDataFrame(newPort) = mdf + + }} + } + + //get then checkpoint path + private def getCheckPointPath(pec: JobContext) : String = { + val pathStr = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + pec.getProcessContext().getProcess().pid() + "/" + pec.getStopJob().getStopName(); + val conf:Configuration = new Configuration() + try{ + val fs:FileSystem = FileSystem.get(URI.create(pathStr), conf) + val path = new org.apache.hadoop.fs.Path(pathStr) + if(fs.exists(path)){ + pathStr + }else{ + "" + } + }catch{ + case ex:IOException =>{ + println(ex) + "" + } + case _ => "" + } + } + + //get the checkpoint ports list + private def getCheckPointPorts(pec: JobContext, checkpointPath : String) : List[String] = { + HadoopFileUtil.getFileInHadoopPath(checkpointPath) + } + val mapDataFrame = MMap[String, () => DataFrame](); override def write(data: DataFrame): Unit = write("", data); @@ -233,6 +312,7 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging { def contains(port: String) = mapDataFrame.contains(port); def getDataFrame(port: String) = mapDataFrame(port); + } class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProcess: Option[Process] = None) @@ -265,9 +345,18 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc } val analyzed = flow.analyze(); + val checkpointParentProcessId = flow.getCheckpointParentProcessId() + analyzed.visit[JobOutputStreamImpl](flow,performStopByCheckpoint) + + /*if (checkpointParentProcessId == "" ){ + analyzed.visit[JobOutputStreamImpl](performStop) + + }else{ + analyzed.visit[JobOutputStreamImpl](performStopByCheckpoint) + }*/ //runs processes - analyzed.visit[JobOutputStreamImpl]((stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) => { + /*analyzed.visit[JobOutputStreamImpl]((stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) => { val pe = jobs(stopName); var outputs: JobOutputStreamImpl = null; try { @@ -289,7 +378,71 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc outputs; } - ); + );*/ + + def performStop(stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) = { + val pe = jobs(stopName); + var outputs: JobOutputStreamImpl = null; + try { + runnerListener.onJobStarted(pe.getContext()); + outputs = pe.perform(inputs); + runnerListener.onJobCompleted(pe.getContext()); + + //is a checkpoint? 
+ if (flow.hasCheckPoint(stopName)) { + //store dataset + outputs.makeCheckPoint(pe.getContext()); + } + } + catch { + case e: Throwable => + runnerListener.onJobFailed(pe.getContext()); + throw e; + } + + outputs; + } + + //perform stop use checkpoint + def performStopByCheckpoint(stopName: String, inputs: Map[Edge, JobOutputStreamImpl]) = { + val pe = jobs(stopName); + + var outputs : JobOutputStreamImpl = null + try { + runnerListener.onJobStarted(pe.getContext()); + + //new flow process + if (checkpointParentProcessId.equals("")) { + println("Visit process " + stopName + "!!!!!!!!!!!!!") + outputs = pe.perform(inputs); + runnerListener.onJobCompleted(pe.getContext()); + if (flow.hasCheckPoint(stopName)) { + outputs.makeCheckPoint(pe.getContext()); + } + }else{//read checkpoint from old process + if(flow.hasCheckPoint(stopName)){ + val pec = pe.getContext() + outputs = pec.getOutputStream().asInstanceOf[JobOutputStreamImpl]; + val checkpointPath = pec.get("checkpoint.path").asInstanceOf[String].stripSuffix("/") + "/" + checkpointParentProcessId + "/" + pec.getStopJob().getStopName(); + println("Visit process " + stopName + " by Checkpoint!!!!!!!!!!!!!") + outputs.loadCheckPoint(pe.getContext(),checkpointPath) + runnerListener.onJobCompleted(pe.getContext()); + }else{ + println("Visit process " + stopName + "!!!!!!!!!!!!!") + outputs = pe.perform(inputs); + runnerListener.onJobCompleted(pe.getContext()); + } + + } + } + catch { + case e: Throwable => + runnerListener.onJobFailed(pe.getContext()); + throw e; + } + + outputs; + } } override def run(): Unit = { diff --git a/piflow-core/src/main/scala/cn/piflow/runner.scala b/piflow-core/src/main/scala/cn/piflow/runner.scala index 3e66732..d42a8d0 100644 --- a/piflow-core/src/main/scala/cn/piflow/runner.scala +++ b/piflow-core/src/main/scala/cn/piflow/runner.scala @@ -119,7 +119,7 @@ class RunnerLogger extends RunnerListener with Logging { println(s"process started: $pid, flow: $flowName, time: $time") //update flow state to STARTED val appId = getAppId(ctx) - H2Util.addFlow(appId,ctx.getFlow().getFlowName()) + H2Util.addFlow(appId,pid,ctx.getFlow().getFlowName()) H2Util.updateFlowState(appId,FlowState.STARTED) H2Util.updateFlowStartTime(appId,time) }; diff --git a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala index 1d0283d..bf27ce9 100644 --- a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala +++ b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala @@ -9,7 +9,7 @@ import net.liftweb.json.JsonDSL._ object H2Util { val QUERY_TIME = 30 - val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))" + val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), pid varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))" val CREATE_STOP_TABLE = "create table if not exists stop (flowId varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))" val serverIP = PropertyUtil.getPropertyValue("server.ip") + ":" + PropertyUtil.getPropertyValue("h2.port") val CONNECTION_URL = "jdbc:h2:tcp://" + serverIP + "/~/piflow;AUTO_SERVER=true" @@ -36,11 +36,11 @@ object H2Util { connection } - def addFlow(appId:String,name:String)={ + def addFlow(appId:String,pId:String, name:String)={ val startTime = new Date().toString val statement = getConnectionInstance().createStatement() 
statement.setQueryTimeout(QUERY_TIME) - statement.executeUpdate("insert into flow(id, name) values('" + appId + "','" + name + "')") + statement.executeUpdate("insert into flow(id, pid, name) values('" + appId + "','" + pId + "','" + name + "')") statement.close() } def updateFlowState(appId:String, state:String) = { @@ -82,6 +82,19 @@ object H2Util { state } + def getFlowProcessId(appId:String) : String = { + var pid = "" + val statement = getConnectionInstance().createStatement() + statement.setQueryTimeout(QUERY_TIME) + val rs : ResultSet = statement.executeQuery("select pid from flow where id='" + appId +"'") + while(rs.next()){ + pid = rs.getString("pid") + } + rs.close() + statement.close() + pid + } + def getFlowInfo(appId:String) : String = { val statement = getConnectionInstance().createStatement() statement.setQueryTimeout(QUERY_TIME) @@ -90,6 +103,7 @@ object H2Util { val flowRS : ResultSet = statement.executeQuery("select * from flow where id='" + appId +"'") while (flowRS.next()){ flowInfo = "{\"flow\":{\"id\":\"" + flowRS.getString("id") + + "\",\"pid\":\"" + flowRS.getString("pid") + "\",\"name\":\"" + flowRS.getString("name") + "\",\"state\":\"" + flowRS.getString("state") + "\",\"startTime\":\"" + flowRS.getString("startTime") + diff --git a/piflow-core/src/main/scala/cn/piflow/util/HadoopFileUtil.scala b/piflow-core/src/main/scala/cn/piflow/util/HadoopFileUtil.scala new file mode 100644 index 0000000..9132100 --- /dev/null +++ b/piflow-core/src/main/scala/cn/piflow/util/HadoopFileUtil.scala @@ -0,0 +1,27 @@ +package cn.piflow.util + +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem + +object HadoopFileUtil { + + def getFileInHadoopPath(filePath : String) : List[String] = { + var fileList = List[String]() + if(!filePath.equals("")){ + try{ + val fs:FileSystem = FileSystem.get(URI.create(filePath), new Configuration()) + val path = new org.apache.hadoop.fs.Path(filePath) + val status = fs.listStatus(path) + status.foreach{ s => + fileList = s.getPath.getName +: fileList + } + }catch{ + case ex:Exception => println(ex) + } + } + fileList + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 58d16b8..af5a717 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil} import cn.piflow.{Process, Runner} import cn.piflow.api.util.PropertyUtil -import cn.piflow.util.H2Util +import cn.piflow.util.{H2Util, HadoopFileUtil} import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost} import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils @@ -58,7 +58,7 @@ object API { (applicationId,process) }*/ - def startFlow(flowJson : String):(String,SparkAppHandle) = { + def startFlow(flowJson : String):(String,String,SparkAppHandle) = { var appId:String = null val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]] @@ -122,7 +122,12 @@ object API { while (appId == null){ Thread.sleep(1000) } - (appId, handle) + var processId = "" + while(processId.equals("")){ + Thread.sleep(1000) + processId = H2Util.getFlowProcessId(appId) + } + (appId, processId, handle) } @@ -151,6 +156,12 @@ object API { str } + def getFlowCheckpoint(processID:String) : String = { + val checkpointPath = 
PropertyUtil.getPropertyValue("checkpoint.path").stripSuffix("/") + "/" + processID + val checkpointList = HadoopFileUtil.getFileInHadoopPath(checkpointPath) + """{"checkpoints":"""" + checkpointList.mkString(",") + """"}""" + } + def getStopInfo(bundle : String) : String = { try{ diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala index c6b171d..4a0ba87 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala @@ -61,7 +61,7 @@ object HTTPClientStartFlow { | "name":"Merge", | "bundle":"cn.piflow.bundle.common.Merge", | "properties":{ - | "inports":"" + | "inports":"data1,data2" | } | }, | { @@ -138,7 +138,7 @@ object HTTPClientStartFlow { | } |} """.stripMargin - val url = "http://10.0.86.191:8002/flow/start" + val url = "http://10.0.86.98:8001/flow/start" val client = HttpClients.createDefault() val post:HttpPost = new HttpPost(url) diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala deleted file mode 100644 index a714f1f..0000000 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow1.scala +++ /dev/null @@ -1,79 +0,0 @@ -package cn.piflow.api - -import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost} -import org.apache.http.entity.StringEntity -import org.apache.http.impl.client.HttpClients -import org.apache.http.util.EntityUtils - -object HTTPClientStartFlow1 { - - def main(args: Array[String]): Unit = { - val json = - """ - |{ - | "flow":{ - | "name":"xml2csv", - | "uuid":"1234", - | "checkpoint":"Merge", - | "stops":[ - | { - | "uuid":"1111", - | "name":"XmlParser", - | "bundle":"cn.piflow.bundle.xml.XmlParser", - | "properties":{ - | "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", - | "rowTag":"phdthesis" - | } - | - | }, - | { - | "uuid":"2222", - | "name":"SelectField", - | "bundle":"cn.piflow.bundle.common.SelectField", - | "properties":{ - | "schema":"title,author,pages" - | } - | - | }, - | { - | "uuid":"888", - | "name":"CsvSave", - | "bundle":"cn.piflow.bundle.csv.CsvSave", - | "properties":{ - | "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv", - | "header":"true", - | "delimiter":"," - | } - | } - | ], - | "paths":[ - | { - | "from":"XmlParser", - | "outport":"", - | "inport":"", - | "to":"SelectField" - | }, - | { - | "from":"SelectField", - | "outport":"", - | "inport":"", - | "to":"CsvSave" - | } - | ] - | } - |} - """.stripMargin - val url = "http://10.0.86.98:8001/flow/start" - val client = HttpClients.createDefault() - val post:HttpPost = new HttpPost(url) - - post.addHeader("Content-Type", "application/json") - post.setEntity(new StringEntity(json)) - - val response:CloseableHttpResponse = client.execute(post) - val entity = response.getEntity - val str = EntityUtils.toString(entity,"UTF-8") - println("Code is " + str) - } - -} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowByCheckPoint.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowByCheckPoint.scala new file mode 100644 index 0000000..3ebbd4c --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlowByCheckPoint.scala @@ -0,0 +1,153 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost} +import org.apache.http.entity.StringEntity +import 
org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HTTPClientStartFlowByCheckPoint { + + def main(args: Array[String]): Unit = { + val json =""" + |{ + | "flow":{ + | "name":"xml,csv-merge-fork-hive,json,csv", + | "uuid":"1234", + | "checkpoint":"Merge", + | "checkpointParentProcessId":"process_67adfebe-1792-4baa-abc0-1591d29e0d49_1", + | "stops":[ + | { + | "uuid":"1111", + | "name":"XmlParser", + | "bundle":"cn.piflow.bundle.xml.XmlParser", + | "properties":{ + | "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", + | "rowTag":"phdthesis" + | } + | + | }, + | { + | "uuid":"2222", + | "name":"SelectField", + | "bundle":"cn.piflow.bundle.common.SelectField", + | "properties":{ + | "schema":"title,author,pages" + | } + | + | }, + | { + | "uuid":"3333", + | "name":"PutHiveStreaming", + | "bundle":"cn.piflow.bundle.hive.PutHiveStreaming", + | "properties":{ + | "database":"sparktest", + | "table":"dblp_phdthesis" + | } + | }, + | { + | "uuid":"4444", + | "name":"CsvParser", + | "bundle":"cn.piflow.bundle.csv.CsvParser", + | "properties":{ + | "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv", + | "header":"false", + | "delimiter":",", + | "schema":"title,author,pages" + | } + | }, + | { + | "uuid":"555", + | "name":"Merge", + | "bundle":"cn.piflow.bundle.common.Merge", + | "properties":{ + | "inports":"data1,data2" + | } + | }, + | { + | "uuid":"666", + | "name":"Fork", + | "bundle":"cn.piflow.bundle.common.Fork", + | "properties":{ + | "outports":"out1,out2,out3" + | } + | }, + | { + | "uuid":"777", + | "name":"JsonSave", + | "bundle":"cn.piflow.bundle.json.JsonSave", + | "properties":{ + | "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json" + | } + | }, + | { + | "uuid":"888", + | "name":"CsvSave", + | "bundle":"cn.piflow.bundle.csv.CsvSave", + | "properties":{ + | "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv", + | "header":"true", + | "delimiter":"," + | } + | } + | ], + | "paths":[ + | { + | "from":"XmlParser", + | "outport":"", + | "inport":"", + | "to":"SelectField" + | }, + | { + | "from":"SelectField", + | "outport":"", + | "inport":"data1", + | "to":"Merge" + | }, + | { + | "from":"CsvParser", + | "outport":"", + | "inport":"data2", + | "to":"Merge" + | }, + | { + | "from":"Merge", + | "outport":"", + | "inport":"", + | "to":"Fork" + | }, + | { + | "from":"Fork", + | "outport":"out1", + | "inport":"", + | "to":"PutHiveStreaming" + | }, + | { + | "from":"Fork", + | "outport":"out2", + | "inport":"", + | "to":"JsonSave" + | }, + | { + | "from":"Fork", + | "outport":"out3", + | "inport":"", + | "to":"CsvSave" + | } + | ] + | } + |} + """.stripMargin + val url = "http://10.0.86.98:8001/flow/start" + val client = HttpClients.createDefault() + val post:HttpPost = new HttpPost(url) + + post.addHeader("Content-Type", "application/json") + post.setEntity(new StringEntity(json)) + + val response:CloseableHttpResponse = client.execute(post) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println("Code is " + str) + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index cae3762..c38e857 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -93,6 +93,18 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup } + case HttpRequest(GET, Uri.Path("/flow/checkpoints"), headers, 
entity, protocol) => { + + val processID = req.getUri().query().getOrElse("processID","") + if(!processID.equals("")){ + val result = API.getFlowCheckpoint(processID) + Future.successful(HttpResponse(entity = result)) + }else{ + Future.successful(HttpResponse(entity = "processID is null or flow does not exist!")) + } + + } + case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{ entity match { @@ -100,9 +112,10 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup var flowJson = data.utf8String flowJson = flowJson.replaceAll("}","}\n") //flowJson = JsonFormatTool.formatJson(flowJson) - val (appId,process) = API.startFlow(flowJson) + val (appId,pid,process) = API.startFlow(flowJson) processMap += (appId -> process) - Future.successful(HttpResponse(entity = appId)) + val result = "{\"flow\":{\"id\":\"" + appId + "\",\"pid\":\"" + pid + "\"}}" + Future.successful(HttpResponse(entity = result)) } case ex => { diff --git a/piflow-server/src/main/scala/cn/piflow/api/HttpClientGetFlowCheckpoints.scala b/piflow-server/src/main/scala/cn/piflow/api/HttpClientGetFlowCheckpoints.scala new file mode 100644 index 0000000..3d3ab35 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HttpClientGetFlowCheckpoints.scala @@ -0,0 +1,21 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet} +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HttpClientGetFlowCheckpoints { + + def main(args: Array[String]): Unit = { + + val url = "http://10.0.86.98:8001/flow/checkpoints?processID=process_088b2604-806e-4ccf-99cc-696aeaef730a_1" + val client = HttpClients.createDefault() + val getFlowInfo:HttpGet = new HttpGet(url) + + val response:CloseableHttpResponse = client.execute(getFlowInfo) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println("Flow checkpoint is " + str) + } + +} From 7449cbf1dcc9837599fa5446189cbd2abecb5516 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Wed, 21 Nov 2018 11:26:45 +0800 Subject: [PATCH 3/3] fix bug: get flow progress using checkpoint --- piflow-core/src/main/scala/cn/piflow/util/H2Util.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala index bf27ce9..01f14d4 100644 --- a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala +++ b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala @@ -138,7 +138,7 @@ object H2Util { var stopCount = 0 var completedStopCount = 0 - val totalRS : ResultSet = statement.executeQuery("select count(*) as stopCount from stop where flowId='" + appId +"'") + val totalRS : ResultSet = statement.executeQuery("select count(*) as stopCount from stop where flowId='" + appId +"' and state!='" + StopState.INIT + "'") while(totalRS.next()){ stopCount = totalRS.getInt("stopCount") //println("stopCount:" + stopCount)
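
The query change above narrows the denominator of the flow progress figure to stops that were actually scheduled in this run, so stops restored from a checkpoint (which remain in INIT state) no longer drag the percentage down. As a rough illustration only — not the PiFlow source — here is a minimal sketch of that calculation, assuming a StopState.COMPLETED constant and a completed-stop count query symmetric to the one shown; both are assumptions, while getConnectionInstance(), QUERY_TIME, and StopState.INIT appear in the patch itself.

// Sketch only: the method name and StopState.COMPLETED are assumptions, not verbatim PiFlow code.
def getFlowProgressSketch(appId: String): String = {
  val statement = getConnectionInstance().createStatement()
  statement.setQueryTimeout(QUERY_TIME)
  // Denominator: stops scheduled in this run; checkpointed stops stay in INIT and are excluded.
  val totalRS = statement.executeQuery(
    "select count(*) as stopCount from stop where flowId='" + appId + "' and state!='" + StopState.INIT + "'")
  val stopCount = if (totalRS.next()) totalRS.getInt("stopCount") else 0
  totalRS.close()
  // Numerator: stops that have already completed.
  val completedRS = statement.executeQuery(
    "select count(*) as completedStopCount from stop where flowId='" + appId + "' and state='" + StopState.COMPLETED + "'")
  val completedStopCount = if (completedRS.next()) completedRS.getInt("completedStopCount") else 0
  completedRS.close()
  statement.close()
  if (stopCount == 0) "0%" else (completedStopCount * 100 / stopCount) + "%"
}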