From ceb223975554da645ed328061044d8cd4fe6dadf Mon Sep 17 00:00:00 2001
From: yanfqidong0604
Date: Mon, 24 Dec 2018 15:58:46 +0800
Subject: [PATCH] Increased functionality for decompressing stop

QiDong Yang
---
 .../piflow/bundle/ftp/LoadFromFtpToHDFS.scala |   2 +
 .../piflow/bundle/http/UnzipFilesOnHDFS.scala | 211 +++++++++++-------
 2 files changed, 128 insertions(+), 85 deletions(-)

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpToHDFS.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpToHDFS.scala
index 211ba74..47bfa5b 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpToHDFS.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpToHDFS.scala
@@ -88,6 +88,8 @@ class LoadFromFtpToHDFS extends ConfigurableStop {
     con = new FTPClientConfig(FTPClientConfig.SYST_NT)
     con.setServerLanguageCode("zh")
     ftp.setFileType(FTP.BINARY_FILE_TYPE)
+    ftp.setDataTimeout(600000)
+    ftp.setConnectTimeout(600000)
     ftp
   }

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala
index e3f7719..c79eef4 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnzipFilesOnHDFS.scala
@@ -1,5 +1,6 @@
 package cn.piflow.bundle.http

+import java.io._
 import java.util.zip.GZIPInputStream

 import cn.piflow.conf._
@@ -7,10 +8,11 @@ import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.tools.tar.{TarEntry, TarInputStream}

 import scala.collection.mutable.ArrayBuffer

@@ -21,55 +23,110 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
   val outportList: List[String] = List(PortEnum.DefaultPort.toString)

   var isCustomize:String=_
+  var hdfsUrl:String=_
   var filePath:String=_
-  var fileType:String=_
-  var unzipPath:String=_
-
+  var savePath:String=_
   var session: SparkSession = null
+  var arr:ArrayBuffer[Row]=ArrayBuffer()
+
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+
+    session = pec.get[SparkSession]()
+
+
+    if(isCustomize.equals("true")){
+
+      unzipFile(filePath,savePath)
+
+    }else if (isCustomize.equals("false")){
+
+      val inDf: DataFrame = in.read()
+      inDf.collect().foreach(row => {
+        filePath = row.get(0).asInstanceOf[String]
+        unzipFile(filePath,savePath)
+
+      })
-  def unzipFile(hdfsFilePath: String, zipFileType: String, unzipHdfsPath: String):String = {
-    var zft: String = ""
-    if(zipFileType.length < 1){
-      zft = hdfsFilePath.split("\\.").last
-    }else{
-      zft = zipFileType
     }
-    val configuration: Configuration = new Configuration()
-    val pathARR: Array[String] = hdfsFilePath.split("\\/")
-    var hdfsUrl:String=""
-    for (x <- (0 until 3)){
+    val rdd: RDD[Row] = session.sparkContext.makeRDD(arr.toList)
+    val fields: Array[StructField] =Array(StructField("savePath",StringType,nullable = true))
+    val schema: StructType = StructType(fields)
+    val df: DataFrame = session.createDataFrame(rdd,schema)
-      hdfsUrl+=(pathARR(x) +"/")
-    }
-    configuration.set("fs.defaultFS",hdfsUrl)
+    println("##################################################################################################")
+    // println(df.count())
+    df.show(20)
+    println("##################################################################################################")
-    var uhp : String=""
-    if(unzipHdfsPath.length < 1){
-      for (x <- (0 until pathARR.length-1)){
-        uhp+=(pathARR(x) +"/")
+    out.write(df)
+
+  }
+
+
+  def whatType(p:String): String = {
+    var typeStr:String=""
+    val pathNames: Array[String] = p.split("\\.")
+    val lastStr: String = pathNames.last
+    if(lastStr.equals("gz")){
+      val penultStr: String = pathNames(pathNames.length-2)
+      if(penultStr.equals("tar")){
+        typeStr="tar.gz"
+      }else {
+        typeStr="gz"
       }
+    }else{
+      throw new RuntimeException("File type fill in error, or do not support this type.")
+    }
+    typeStr
+  }
+
+
+  def getFs(fileHdfsPath: String): FileSystem = {
+    var configuration: Configuration = new Configuration()
+    var fs: FileSystem =null
+    if (isCustomize.equals("false")) {
+      val pathARR: Array[String] = fileHdfsPath.split("\\/")
+      hdfsUrl = ""
+      for (x <- (0 until 3)) {
+        hdfsUrl += (pathARR(x) + "/")
+      }
+    }
+    configuration.set("fs.defaultFS", hdfsUrl)
+    fs = FileSystem.get(configuration)
+    fs
+  }
+
+
+
+
+  def unzipFile(fileHdfsPath: String, saveHdfsPath: String)= {
+    var eachSavePath : String=""
+
+    var unType: String = whatType(fileHdfsPath)
+    var fileName: String = fileHdfsPath.split("\\/").last
+    var fs: FileSystem= getFs(fileHdfsPath)
+
+    var sp:String=""
+    if(saveHdfsPath.length < 1){
+      sp=fileHdfsPath.replace(fileName,"")
     }else{
-      uhp=unzipHdfsPath
+      sp = saveHdfsPath
     }
-    val fs = FileSystem.get(configuration)
-    val fdis: FSDataInputStream = fs.open(new Path(hdfsFilePath))
-    val filePathArr: Array[String] = hdfsFilePath.split("/")
-    var fileName: String = filePathArr.last
-    if(fileName.length == 0){
-      fileName = filePathArr(filePathArr.size-2)
-    }
+    val fdis: FSDataInputStream = fs.open(new Path(fileHdfsPath))
-    var savePath:String=""
-    if(zft.equals("gz")){
+    if(unType.equals("gz")){
       val gzip: GZIPInputStream = new GZIPInputStream(fdis)
       var n = -1
       val buf=new Array[Byte](10*1024*1024)
-      savePath = uhp +fileName.replace(".gz","")
-      val path = new Path(savePath)
+
+      eachSavePath = sp +fileName.replace(".gz","")
+      arr += Row.fromSeq(Array(eachSavePath))
+      val path = new Path(eachSavePath)
       val fdos = fs.create(path)
       while((n=gzip.read(buf)) != -1 && n != -1){
         fdos.write(buf,0,n)
@@ -78,60 +135,45 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
       fdos.close()
       gzip.close()
       fdis.close()
-    }else{
-      throw new RuntimeException("File type fill in error, or do not support this type.")
-    }
+    }else if(unType.equals("tar.gz")){
-    savePath
+      try {
+        val gzip = new GZIPInputStream(new BufferedInputStream(fdis))
+        val tarIn = new TarInputStream(gzip, 1024 * 2)
-  }
+        fs.create(new Path(sp)).close()
-  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+        var entry: TarEntry = null
-    session = pec.get[SparkSession]()
+        while ((entry = tarIn.getNextEntry) != null && entry !=null) {
-    var savePath: String = ""
-    var arr:ArrayBuffer[Row]=ArrayBuffer()
+          if (entry.isDirectory()) {
+            val outPath = sp + "/" + entry.getName
+            fs.create(new Path(outPath)).close()
+          } else {
+            val outPath = sp + "/" + entry.getName
-    if(isCustomize.equals("true")){
-      println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+            arr += Row.fromSeq(Array(outPath))
+            val fos: FSDataOutputStream = fs.create(new Path(outPath))
-      savePath = unzipFile(filePath,fileType,unzipPath)
-
-
-      println("savepath : "+savePath)
-
-      arr += Row.fromSeq(Array(savePath))
-
-    }else if (isCustomize.equals("false")){
-
-      val inDf: DataFrame = in.read()
-      inDf.collect().foreach(row => {
-
-        filePath = row.get(0).asInstanceOf[String]
-        savePath = unzipFile(filePath,"","")
-        arr += Row.fromSeq(Array(savePath))
-        savePath = ""
-
-      })
+            var lenth = 0
+            val buff = new Array[Byte](1024)
+            while ((lenth = tarIn.read(buff)) != -1 && (lenth != -1)) {
+              fos.write(buff, 0, lenth)
+            }
+            fos.close()
+          }
+        }
+      }catch {
+        case e: IOException =>
+          e.printStackTrace()
+      }
     }
-
-    val rdd: RDD[Row] = session.sparkContext.makeRDD(arr.toList)
-    val fields: Array[StructField] =Array(StructField("unzipPath",StringType,nullable = true))
-    val schema: StructType = StructType(fields)
-    val df: DataFrame = session.createDataFrame(rdd,schema)
-
-    println("##################################################################################################")
-//    println(df.count())
-    df.show(20)
-    println("##################################################################################################")
-
-    out.write(df)
-  }
+
   def initialize(ctx: ProcessContext): Unit = {
   }
@@ -139,25 +181,24 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
   def setProperties(map : Map[String, Any]) = {
     isCustomize=MapUtil.get(map,key="isCustomize").asInstanceOf[String]
     filePath=MapUtil.get(map,key="filePath").asInstanceOf[String]
-    fileType=MapUtil.get(map,key="fileType").asInstanceOf[String]
-    unzipPath=MapUtil.get(map,key="unzipPath").asInstanceOf[String]
+    hdfsUrl=MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
+    savePath=MapUtil.get(map,key="savePath").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("file path,such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(false)
-    val fileType = new PropertyDescriptor().name("fileType").displayName("fileType").description("file type,such as gz").defaultValue("").required(false)
-    val unzipPath = new PropertyDescriptor().name("unzipPath").displayName("unzipPath").description("unzip path, such as hdfs://10.0.86.89:9000/b/").defaultValue("").required(true)
+    val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("file path, such as /a/a.gz").defaultValue("").required(false)
+    val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").description("the url of HDFS, such as hdfs://10.0.86.89:9000").defaultValue("").required(false)
+    val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").description("unzip dir path, such as /b/").defaultValue("").required(true)
     val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " +
-      "you must specify the path where the compressed file is located and the saved path after decompression. " +
-      "If it is fals, it will automatically find the file path data from the upstream port and " +
-      "save it to the original folder after decompression.")
" + + "If it is fals, it will automatically find the file path data from the upstream port ") .defaultValue("").required(false) descriptor = isCustomize :: descriptor descriptor = filePath :: descriptor - descriptor = fileType :: descriptor - descriptor = unzipPath :: descriptor + descriptor = hdfsUrl :: descriptor + descriptor = savePath :: descriptor descriptor }