From e97fac9d8441ce8e24e8a3678b0a86efe908f1b4 Mon Sep 17 00:00:00 2001
From: yanfqidong0604
Date: Thu, 22 Nov 2018 16:52:05 +0800
Subject: [PATCH] Schema substitution for dataframe

Add a ConvertSchema stop for renaming dataframe fields, a FileDownHDFS stop
for downloading network files to HDFS, and an UnzipFilesOnHDFS stop for
decompressing files on HDFS.

yang qidong
---
 piflow-bundle/src/main/resources/convert.jpg   | Bin 0 -> 11882 bytes
 .../src/main/resources/core-site.xml           |  17 ++
 .../src/main/resources/hdfs-site.xml           |  44 ++++
 .../piflow/bundle/common/ConvertSchema.scala   |  63 ++++++
 .../cn/piflow/bundle/http/FileDownHDFS.scala   |  93 ++++++++
 .../piflow/bundle/http/UnzipFilesOnHDFS.scala  | 204 ++++++++++++++++++
 6 files changed, 421 insertions(+)
 create mode 100644 piflow-bundle/src/main/resources/convert.jpg
 create mode 100644 piflow-bundle/src/main/resources/core-site.xml
 create mode 100644 piflow-bundle/src/main/resources/hdfs-site.xml
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/http/FileDownHDFS.scala
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala

diff --git a/piflow-bundle/src/main/resources/convert.jpg b/piflow-bundle/src/main/resources/convert.jpg
new file mode 100644
index 0000000000000000000000000000000000000000..67ba8c7b68d48986009d2fc49f4995e86388f0da
GIT binary patch
literal 11882
[binary image data for convert.jpg omitted]
diff --git a/piflow-bundle/src/main/resources/core-site.xml b/piflow-bundle/src/main/resources/core-site.xml
new file mode 100644
--- /dev/null
+++ b/piflow-bundle/src/main/resources/core-site.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://master:9000</value>
+    </property>
+
+    <property>
+        <name>io.file.buffer.size</name>
+        <value>131072</value>
+    </property>
+
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>/opt/hadoop-2.6.0/tmp</value>
+    </property>
+</configuration>
diff --git a/piflow-bundle/src/main/resources/hdfs-site.xml b/piflow-bundle/src/main/resources/hdfs-site.xml
new file mode 100644
index 0000000..1d98eac
--- /dev/null
+++ b/piflow-bundle/src/main/resources/hdfs-site.xml
@@ -0,0 +1,44 @@
+<configuration>
+    <property>
+        <name>dfs.namenode.http-address</name>
+        <value>master:50070</value>
+    </property>
+
+    <property>
+        <name>dfs.namenode.secondary.http-address</name>
+        <value>master:50090</value>
+    </property>
+
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+    </property>
+
+    <property>
+        <name>dfs.namenode.name.dir</name>
+        <value>file:/opt/hadoop-2.6.0/dfs/name</value>
+    </property>
+
+    <property>
+        <name>dfs.datanode.data.dir</name>
+        <value>file:/opt/hadoop-2.6.0/dfs/data</value>
+    </property>
+
+    <property>
+        <name>dfs.client.read.shortcircuit</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>dfs.domain.socket.path</name>
+        <value>/var/run/hadoop-hdfs/dn._PORT</value>
+    </property>
+
+    <property>
+        <name>dfs.client.file-block-storage-locations.timeout</name>
+        <value>10000</value>
+    </property>
+
+    <property>
+        <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
+        <value>true</value>
+    </property>
+</configuration>
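
These resource files are what let the new stops reach the cluster with a bare
new Configuration(): Hadoop picks up core-site.xml (and, once the HDFS client
classes are initialized, hdfs-site.xml) from the classpath, so FileSystem.get
resolves fs.defaultFS to hdfs://master:9000. A minimal sketch of that lookup,
assuming the two files above are on the runtime classpath (the object name is
illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object HdfsConfigCheck {
      def main(args: Array[String]): Unit = {
        // core-site.xml is read as a default resource; no explicit addResource needed
        val conf = new Configuration()
        println(conf.get("fs.defaultFS"))   // expected: hdfs://master:9000

        val fs = FileSystem.get(conf)
        println(fs.exists(new Path("/")))   // true when the NameNode is reachable
      }
    }
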
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala
new file mode 100644
index 0000000..230df82
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ConvertSchema.scala
@@ -0,0 +1,63 @@
+package cn.piflow.bundle.common
+
+import cn.piflow._
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+
+
+class ConvertSchema extends ConfigurableStop {
+
+  val authorEmail: String = "yangqidong@cnic.cn"
+  val description: String = "Convert (rename) data fields of a dataframe."
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+
+  var schema: String = _
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val df = in.read()
+
+    // schema format: oldField1->newField1, oldField2->newField2
+    val fields = schema.split(",").map(_.trim)
+
+    // withColumnRenamed returns a new DataFrame, so fold over the rename pairs
+    // instead of discarding each renamed result
+    val renamedDf = fields.foldLeft(df) { (data, field) =>
+      val old_new: Array[String] = field.split("->").map(_.trim)
+      data.withColumnRenamed(old_new(0), old_new(1))
+    }
+
+    println("###########################")
+    renamedDf.show(20)
+    println("###########################")
+
+    out.write(renamedDf)
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    schema = MapUtil.get(map, "schema").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor: List[PropertyDescriptor] = List()
+    val schemaDesc = new PropertyDescriptor().name("schema").displayName("schema").description("The field mapping to apply, written as: oldField1->newField1, oldField2->newField2").defaultValue("").required(true)
+    descriptor = schemaDesc :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("convert.jpg")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.CommonGroup.toString)
+  }
+
+}
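
The rename step in ConvertSchema is a left fold over the incoming DataFrame,
one withColumnRenamed call per oldField->newField pair. A standalone sketch of
the same idea, assuming a local SparkSession and a hypothetical two-column
input (none of these column names come from the patch itself):

    import org.apache.spark.sql.SparkSession

    object ConvertSchemaSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("convert-schema-sketch").getOrCreate()
        import spark.implicits._

        // hypothetical input carrying the old field names
        val df = Seq(("Alice", 30), ("Bob", 25)).toDF("name_old", "age_old")

        // same mapping format the stop accepts in its "schema" property
        val mapping = "name_old->name, age_old->age"

        val renamed = mapping.split(",").map(_.trim).foldLeft(df) { (data, pair) =>
          val Array(oldName, newName) = pair.split("->").map(_.trim)
          data.withColumnRenamed(oldName, newName)
        }

        renamed.show()
        spark.stop()
      }
    }
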
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/FileDownHDFS.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/FileDownHDFS.scala
new file mode 100644
index 0000000..9c824f0
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/FileDownHDFS.scala
@@ -0,0 +1,93 @@
+package cn.piflow.bundle.http
+
+import java.io.InputStream
+import java.net.{HttpURLConnection, URL}
+
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+class FileDownHDFS extends ConfigurableStop {
+  val authorEmail: String = "yangqidong@cnic.cn"
+  val description: String = "Download network files to HDFS"
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+
+  var url_str: String = _
+  var savePath: String = _
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+
+    val url = new URL(url_str)
+    val uc: HttpURLConnection = url.openConnection().asInstanceOf[HttpURLConnection]
+    uc.setDoInput(true)
+    uc.connect()
+    val inputStream: InputStream = uc.getInputStream()
+
+    val buffer = new Array[Byte](1024 * 1024 * 10)
+
+    val configuration: Configuration = new Configuration()
+    val fs = FileSystem.get(configuration)
+    val fdos: FSDataOutputStream = fs.create(new Path(savePath))
+
+    // read() returns -1 at end of stream; copy chunks until then
+    var byteRead = inputStream.read(buffer)
+    while (byteRead != -1) {
+      fdos.write(buffer, 0, byteRead)
+      fdos.flush()
+      byteRead = inputStream.read(buffer)
+    }
+
+    inputStream.close()
+    fdos.close()
+
+    // emit a one-row DataFrame carrying the HDFS path of the downloaded file
+    var seq: Seq[String] = Seq(savePath)
+    val row: Row = Row.fromSeq(seq)
+    val list: List[Row] = List(row)
+    val rdd: RDD[Row] = spark.sparkContext.makeRDD(list)
+    val fields: Array[StructField] = Array(StructField("savePath", StringType, nullable = true))
+    val schema: StructType = StructType(fields)
+    val df: DataFrame = spark.createDataFrame(rdd, schema)
+
+    out.write(df)
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    url_str = MapUtil.get(map, key = "url_str").asInstanceOf[String]
+    savePath = MapUtil.get(map, key = "savePath").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor: List[PropertyDescriptor] = List()
+    val url_str = new PropertyDescriptor().name("url_str").displayName("URL").description("Network address of the file").defaultValue("").required(true)
+    val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").description("The HDFS path and file name to save to, such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(true)
+    descriptor = url_str :: descriptor
+    descriptor = savePath :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("http.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.HttpGroup.toString)
+  }
+
+}
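
The body of FileDownHDFS.perform is a plain byte-stream copy from an HTTP
connection into an FSDataOutputStream, followed by a one-row DataFrame that
passes the save path downstream. Outside the stop, the same copy can be
written with Hadoop's IOUtils; a minimal sketch, where the URL and the
hdfs:// target are placeholders rather than values taken from the patch:

    import java.net.{HttpURLConnection, URL}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils

    object FileDownHdfsSketch {
      def main(args: Array[String]): Unit = {
        // placeholder source and target; substitute real values
        val url = new URL("http://example.org/data/a.gz")
        val savePath = "hdfs://master:9000/tmp/a.gz"

        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
        conn.setDoInput(true)
        conn.connect()
        val in = conn.getInputStream

        val fs = FileSystem.get(new Configuration())
        val out = fs.create(new Path(savePath))

        // copyBytes closes both streams when the last argument is true
        IOUtils.copyBytes(in, out, 4096, true)
      }
    }
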
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala
new file mode 100644
index 0000000..9297dc6
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala
@@ -0,0 +1,204 @@
+package cn.piflow.bundle.http
+
+import java.util.zip.GZIPInputStream
+
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+class UnzipFilesOnHDFS extends ConfigurableStop {
+  val authorEmail: String = "yangqidong@cnic.cn"
+  val description: String = "Unzip files on HDFS"
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+
+  var filePath: String = _
+  var fileType: String = _
+  var unzipPath: String = _
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+
+    val session: SparkSession = pec.get[SparkSession]()
+
+    val configuration: Configuration = new Configuration()
+    val fs = FileSystem.get(configuration)
+    val fdis: FSDataInputStream = fs.open(new Path(filePath))
+
+    // take the last non-empty path segment as the file name
+    val filePathArr: Array[String] = filePath.split("/")
+    var fileName: String = filePathArr.last
+    if (fileName.length == 0) {
+      fileName = filePathArr(filePathArr.size - 2)
+    }
+
+    if (fileType.equals("gz")) {
+
+      val gzip: GZIPInputStream = new GZIPInputStream(fdis)
+      val buf = new Array[Byte](10 * 1024 * 1024)
+      val savePath = new Path(unzipPath + fileName.replace(".gz", ""))
+      val fdos = fs.create(savePath)
+
+      // read() returns -1 at end of stream; copy chunks until then
+      var n = gzip.read(buf)
+      while (n != -1) {
+        fdos.write(buf, 0, n)
+        fdos.flush()
+        n = gzip.read(buf)
+      }
+
+      fdos.close()
+      gzip.close()
+
+    }
+    // The tar and tar.gz branches below are left commented out: they rely on a
+    // localPath value that is not defined in this stop and they extract to the
+    // local filesystem (java.io.File / FileOutputStream) rather than to HDFS.
+    /*else if(fileType.equals("tar")){
+
+      var entryNum:Int=0
+      var entryFileName:String=null
+      var entryFile:File=null
+      var subEntryFile:File=null
+      var subEntryFileName:String=null
+      var tarArchiveEntries:Array[TarArchiveEntry]=null
+      var fileList:List[String]=List()
+      var fos:FileOutputStream=null
+
+      var entry: TarArchiveEntry = null
+      val tarIs: TarArchiveInputStream = new TarArchiveInputStream(fdis)
+      while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
+        entryFileName= localPath +File.separator+entry.getName()
+        entryFile=new File(entryFileName)
+        entryNum += 1
+        if(entry.isDirectory()){
+          if(!entryFile.exists()){
+            entryFile.mkdirs()
+          }
+          tarArchiveEntries=entry.getDirectoryEntries()
+          for(i<-0 until tarArchiveEntries.length){
+            subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
+            subEntryFile=new File(subEntryFileName)
+            fileList=subEntryFileName::fileList
+            fos=new FileOutputStream(subEntryFile)
+            var mark = -1
+            val buf=new Array[Byte](4*1024)
+            while((mark=tarIs.read(buf)) != -1 && mark != -1){
+              fos.write(buf,0,mark)
+            }
+            fos.close()
+            fos=null
+          }
+        }else{
+          fileList = entryFileName :: fileList
+          fos=new FileOutputStream(entryFile)
+          var mark = -1
+          val buf=new Array[Byte](4*1024)
+          while((mark=tarIs.read(buf)) != -1 && mark != -1){
+            fos.write(buf,0,mark)
+          }
+          fos.close()
+          fos=null
+        }
+
+      }
+      if(entryNum==0){
+        println("there is no file!")
+      }
+
+    }else if(fileType.equals("tar.gz")){
+
+      var entryNum:Int=0
+      var entryFileName:String=null
+      var entryFile:File=null
+      var subEntryFile:File=null
+      var subEntryFileName:String=null
+      var tarArchiveEntries:Array[TarArchiveEntry]=null
+      var fileList:List[String]=List()
+      var fos:FileOutputStream=null
+
+      var entry: TarArchiveEntry = null
+      val gzip:GZIPInputStream=new GZIPInputStream(fdis)
+      val tarIs: TarArchiveInputStream = new TarArchiveInputStream(gzip)
+      while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
+        entryFileName=localPath +File.separator+entry.getName()
+        entryFile=new File(entryFileName)
+        entryNum += 1
+        if(entry.isDirectory()){
+          if(!entryFile.exists()){
+            entryFile.mkdirs()
+          }
+          tarArchiveEntries=entry.getDirectoryEntries()
+          for(i<-0 until tarArchiveEntries.length){
+            subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
+            subEntryFile=new File(subEntryFileName)
+            fileList=subEntryFileName::fileList
+            fos=new FileOutputStream(subEntryFile)
+            var mark = -1
+            val buf=new Array[Byte](4*1024)
+            while((mark=tarIs.read(buf)) != -1 && mark != -1){
+              fos.write(buf,0,mark)
+            }
+            fos.close()
+            fos=null
+          }
+        }else{
+          fileList = entryFileName :: fileList
+          fos=new FileOutputStream(entryFile)
+          var mark = -1
+          val buf=new Array[Byte](4*1024)
+          while((mark=tarIs.read(buf)) != -1 && mark != -1){
+            fos.write(buf,0,mark)
+          }
+          fos.close()
+          fos=null
+        }
+
+      }
+      if(entryNum==0){
+        println("there is no file!")
+      }
+    }*/
+    else {
+      throw new RuntimeException("Unsupported or incorrectly specified file type; only \"gz\" is currently handled.")
+    }
+
+    // emit a one-row DataFrame carrying the directory the file was unzipped into
+    var seq: Seq[String] = Seq(unzipPath)
+    val row: Row = Row.fromSeq(seq)
+    val list: List[Row] = List(row)
+    val rdd: RDD[Row] = session.sparkContext.makeRDD(list)
+    val fields: Array[StructField] = Array(StructField("unzipPath", StringType, nullable = true))
+    val schema: StructType = StructType(fields)
+    val df: DataFrame = session.createDataFrame(rdd, schema)
+
+    out.write(df)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    filePath = MapUtil.get(map, key = "filePath").asInstanceOf[String]
+    fileType = MapUtil.get(map, key = "fileType").asInstanceOf[String]
+    unzipPath = MapUtil.get(map, key = "unzipPath").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor: List[PropertyDescriptor] = List()
+
+    val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("The file to decompress, such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(true)
+    val fileType = new PropertyDescriptor().name("fileType").displayName("fileType").description("The archive type, such as gz").defaultValue("").required(true)
+    val unzipPath = new PropertyDescriptor().name("unzipPath").displayName("unzipPath").description("The directory to decompress into, such as hdfs://10.0.86.89:9000/b/").defaultValue("").required(true)
+    descriptor = filePath :: descriptor
+    descriptor = fileType :: descriptor
+    descriptor = unzipPath :: descriptor
+
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("http.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.HttpGroup.toString)
+  }
+
+}
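
Taken together, FileDownHDFS and UnzipFilesOnHDFS cover the download-then-decompress
pattern: the first emits a one-row DataFrame holding savePath, and the second reads a
gz archive from HDFS and writes the decompressed file back to HDFS. A standalone
sketch of the gz branch, with hypothetical hdfs:// paths standing in for real
property values:

    import java.util.zip.GZIPInputStream

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils

    object UnzipGzOnHdfsSketch {
      def main(args: Array[String]): Unit = {
        // hypothetical paths; substitute real values
        val filePath = "hdfs://master:9000/tmp/a.gz"
        val unzipPath = "hdfs://master:9000/tmp/unzipped/"

        val fs = FileSystem.get(new Configuration())

        // derive the output name by stripping the .gz suffix, as the stop does
        val fileName = filePath.split("/").last.stripSuffix(".gz")

        val in = new GZIPInputStream(fs.open(new Path(filePath)))
        val out = fs.create(new Path(unzipPath + fileName))

        // copyBytes closes both streams when the last argument is true
        IOUtils.copyBytes(in, out, 4096, true)
      }
    }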