Increased functionality of the decompression stop

QiDong Yang
yanfqidong0604 2018-12-24 15:58:46 +08:00
parent a509a6d70a
commit ceb2239755
2 changed files with 128 additions and 85 deletions
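
In broad terms, the commit replaces the fileType and unzipPath properties of UnzipFilesOnHDFS with hdfsUrl and savePath, adds tar.gz support to the decompression logic, and gives LoadFromFtpToHDFS explicit FTP timeouts. A minimal sketch of how the reworked stop might be configured after this change (paths and host are placeholders, and instantiating the stop directly is only for illustration):

import cn.piflow.bundle.http.UnzipFilesOnHDFS

// Placeholder values; "hdfsUrl" and "savePath" are the properties introduced by this commit.
val stop = new UnzipFilesOnHDFS()
stop.setProperties(Map(
  "isCustomize" -> "true",                   // take the explicit paths below
  "filePath"    -> "/a/a.tar.gz",            // compressed file on HDFS
  "hdfsUrl"     -> "hdfs://10.0.86.89:9000", // HDFS namenode URL
  "savePath"    -> "/b/"                     // directory for the decompressed output
))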


@@ -88,6 +88,8 @@ class LoadFromFtpToHDFS extends ConfigurableStop {
con = new FTPClientConfig(FTPClientConfig.SYST_NT)
con.setServerLanguageCode("zh")
ftp.setFileType(FTP.BINARY_FILE_TYPE)
ftp.setDataTimeout(600000) // time out idle data transfers after 600000 ms (10 minutes)
ftp.setConnectTimeout(600000) // time out connection attempts after 600000 ms (10 minutes)
ftp
}
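
The two new calls give both the control connection and data transfers a ten-minute budget. A standalone sketch of the same setup with Apache Commons Net, assuming the int-millisecond overload of setDataTimeout (older commons-net versions; newer ones prefer a Duration):

import org.apache.commons.net.ftp.{FTP, FTPClient}

val ftp = new FTPClient()
ftp.setConnectTimeout(600000)  // fail connect() after 10 minutes
ftp.connect("ftp.example.org") // placeholder host
ftp.setDataTimeout(600000)     // allow 10 minutes of inactivity on data sockets
ftp.setFileType(FTP.BINARY_FILE_TYPE)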


@@ -1,5 +1,6 @@
package cn.piflow.bundle.http
import java.io._
import java.util.zip.GZIPInputStream
import cn.piflow.conf._
@@ -7,10 +8,11 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.tools.tar.{TarEntry, TarInputStream}
import scala.collection.mutable.ArrayBuffer
@@ -21,105 +23,36 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var isCustomize:String=_
var hdfsUrl:String=_
var filePath:String=_
var fileType:String=_
var unzipPath:String=_
var savePath:String=_
var session: SparkSession = null
var arr:ArrayBuffer[Row]=ArrayBuffer()
def unzipFile(hdfsFilePath: String, zipFileType: String, unzipHdfsPath: String):String = {
var zft: String = ""
if(zipFileType.length < 1){
zft = hdfsFilePath.split("\\.").last
}else{
zft = zipFileType
}
val configuration: Configuration = new Configuration()
val pathARR: Array[String] = hdfsFilePath.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var uhp : String=""
if(unzipHdfsPath.length < 1){
for (x <- (0 until pathARR.length-1)){
uhp+=(pathARR(x) +"/")
}
}else{
uhp=unzipHdfsPath
}
val fs = FileSystem.get(configuration)
val fdis: FSDataInputStream = fs.open(new Path(hdfsFilePath))
val filePathArr: Array[String] = hdfsFilePath.split("/")
var fileName: String = filePathArr.last
if(fileName.length == 0){
fileName = filePathArr(filePathArr.size-2)
}
var savePath:String=""
if(zft.equals("gz")){
val gzip: GZIPInputStream = new GZIPInputStream(fdis)
var n = -1
val buf=new Array[Byte](10*1024*1024)
savePath = uhp +fileName.replace(".gz","")
val path = new Path(savePath)
val fdos = fs.create(path)
n = gzip.read(buf)
while (n != -1) {
fdos.write(buf, 0, n)
fdos.flush()
n = gzip.read(buf)
}
fdos.close()
gzip.close()
fdis.close()
}else{
throw new RuntimeException("The file type is wrong, or this type is not supported.")
}
savePath
}
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
session = pec.get[SparkSession]()
var savePath: String = ""
var arr:ArrayBuffer[Row]=ArrayBuffer()
if(isCustomize.equals("true")){
println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
savePath = unzipFile(filePath,fileType,unzipPath)
println("savepath : "+savePath)
arr += Row.fromSeq(Array(savePath))
unzipFile(filePath,savePath)
}else if (isCustomize .equals("false")){
val inDf: DataFrame = in.read()
inDf.collect().foreach(row => {
filePath = row.get(0).asInstanceOf[String]
savePath = unzipFile(filePath,"","")
arr += Row.fromSeq(Array(savePath))
savePath = ""
unzipFile(filePath,savePath)
})
}
val rdd: RDD[Row] = session.sparkContext.makeRDD(arr.toList)
val fields: Array[StructField] =Array(StructField("unzipPath",StringType,nullable = true))
val fields: Array[StructField] =Array(StructField("savePath",StringType,nullable = true))
val schema: StructType = StructType(fields)
val df: DataFrame = session.createDataFrame(rdd,schema)
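
When isCustomize is "false", perform reads file paths from the first column of the upstream DataFrame; something like the following input would drive the loop above (the SparkSession setup is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// One path per row; row.get(0) in perform reads the first column.
val upstream = Seq(
  "hdfs://10.0.86.89:9000/a/a.gz",
  "hdfs://10.0.86.89:9000/a/b.tar.gz"
).toDF("filePath")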
@@ -132,6 +65,115 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
}
def whatType(p:String): String = {
var typeStr:String=""
val pathNames: Array[String] = p.split("\\.")
val lastStr: String = pathNames.last
if(lastStr.equals("gz")){
val penultStr: String = pathNames(pathNames.length-2)
if(penultStr.equals("tar")){
typeStr="tar.gz"
}else {
typeStr="gz"
}
}else{
throw new RuntimeException("The file type is wrong, or this type is not supported.")
}
typeStr
}
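
whatType inspects only the last two dot-separated segments, so /a/b.tar.gz yields "tar.gz" and /a/c.gz yields "gz"; anything else throws. An equivalent sketch using endsWith (the name whatTypeAlt is hypothetical, not part of the commit):

def whatTypeAlt(p: String): String =
  if (p.endsWith(".tar.gz")) "tar.gz"
  else if (p.endsWith(".gz")) "gz"
  else throw new RuntimeException("The file type is wrong, or this type is not supported.")

// whatTypeAlt("/a/b.tar.gz") == "tar.gz"
// whatTypeAlt("/a/c.gz")     == "gz"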
def getFs(fileHdfsPath: String): FileSystem = {
var configuration: Configuration = new Configuration()
var fs: FileSystem =null
if (isCustomize.equals("false")) {
// derive the namenode URL from the first three "/"-separated segments of the file path
val pathARR: Array[String] = fileHdfsPath.split("\\/")
hdfsUrl = ""
for (x <- (0 until 3)) {
hdfsUrl += (pathARR(x) + "/")
}
}
configuration.set("fs.defaultFS", hdfsUrl)
fs = FileSystem.get(configuration)
fs
}
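
A worked example of the URL derivation in getFs: splitting hdfs://10.0.86.89:9000/a/a.gz on "/" gives ("hdfs:", "", "10.0.86.89:9000", "a", "a.gz"), and rejoining the first three segments reproduces the namenode URL (the path value is illustrative):

val path = "hdfs://10.0.86.89:9000/a/a.gz"
val segs = path.split("\\/")                // Array("hdfs:", "", "10.0.86.89:9000", "a", "a.gz")
val url  = segs.take(3).mkString("/") + "/" // "hdfs://10.0.86.89:9000/"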
def unzipFile(fileHdfsPath: String, saveHdfsPath: String)= {
var eachSavePath : String=""
var unType: String = whatType(fileHdfsPath)
var fileName: String = fileHdfsPath.split("\\/").last
var fs: FileSystem= getFs(fileHdfsPath)
var sp:String=""
if(saveHdfsPath.length < 1){
sp=fileHdfsPath.replace(fileName,"")
}else{
sp = saveHdfsPath
}
val fdis: FSDataInputStream = fs.open(new Path(fileHdfsPath))
if(unType.equals("gz")){
val gzip: GZIPInputStream = new GZIPInputStream(fdis)
var n = -1
val buf=new Array[Byte](10*1024*1024)
eachSavePath = sp +fileName.replace(".gz","")
arr += Row.fromSeq(Array(eachSavePath))
val path = new Path(eachSavePath)
val fdos = fs.create(path)
n = gzip.read(buf)
while (n != -1) {
fdos.write(buf, 0, n)
fdos.flush()
n = gzip.read(buf)
}
fdos.close()
gzip.close()
fdis.close()
}else if(unType.equals("tar.gz")){
try {
val gzip = new GZIPInputStream(new BufferedInputStream(fdis))
val tarIn = new TarInputStream(gzip, 1024 * 2)
fs.mkdirs(new Path(sp)) // make sure the target directory exists on HDFS
var entry: TarEntry = tarIn.getNextEntry
while (entry != null) {
if (entry.isDirectory()) {
val outPath = sp + "/" + entry.getName
fs.mkdirs(new Path(outPath)) // a directory entry: recreate the directory on HDFS
} else {
val outPath = sp + "/" + entry.getName
arr += Row.fromSeq(Array(outPath))
val fos: FSDataOutputStream = fs.create(new Path(outPath))
val buff = new Array[Byte](1024)
var length = tarIn.read(buff)
while (length != -1) {
fos.write(buff, 0, length)
length = tarIn.read(buff)
}
fos.close()
}
entry = tarIn.getNextEntry
}
}catch {
case e: IOException =>
e.printStackTrace()
}
}
}
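
Both branches of unzipFile repeat the same read/write loop; a small helper like this hypothetical copyStream (not part of the commit) would capture the pattern once:

import java.io.InputStream
import org.apache.hadoop.fs.FSDataOutputStream

def copyStream(in: InputStream, out: FSDataOutputStream,
               bufSize: Int = 1024 * 1024): Unit = {
  val buf = new Array[Byte](bufSize)
  var n = in.read(buf)
  while (n != -1) {     // read until end of stream
    out.write(buf, 0, n)
    n = in.read(buf)
  }
  out.flush()
  out.close()
}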
def initialize(ctx: ProcessContext): Unit = {
}
@@ -139,25 +181,24 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
def setProperties(map : Map[String, Any]) = {
isCustomize=MapUtil.get(map,key="isCustomize").asInstanceOf[String]
filePath=MapUtil.get(map,key="filePath").asInstanceOf[String]
fileType=MapUtil.get(map,key="fileType").asInstanceOf[String]
unzipPath=MapUtil.get(map,key="unzipPath").asInstanceOf[String]
hdfsUrl=MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
savePath=MapUtil.get(map,key="savePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("file path, such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(false)
val fileType = new PropertyDescriptor().name("fileType").displayName("fileType").description("file type, such as gz").defaultValue("").required(false)
val unzipPath = new PropertyDescriptor().name("unzipPath").displayName("unzipPath").description("unzip path, such as hdfs://10.0.86.89:9000/b/").defaultValue("").required(true)
val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("file path, such as /a/a.gz").defaultValue("").required(false)
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").description("the URL of HDFS, such as hdfs://10.0.86.89:9000").defaultValue("").required(false)
val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").description("unzip directory path, such as /b/").defaultValue("").required(true)
val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path. If true, " +
"you must specify the path where the compressed file is located and the save path after decompression. " +
"If it is false, it will automatically find the file path data from the upstream port and " +
"save it to the original folder after decompression.")
"you must specify the path where the compressed file is located. " +
"If it is false, it will automatically find the file path data from the upstream port.")
.defaultValue("").required(false)
descriptor = isCustomize :: descriptor
descriptor = filePath :: descriptor
descriptor = fileType :: descriptor
descriptor = unzipPath :: descriptor
descriptor = hdfsUrl :: descriptor
descriptor = savePath :: descriptor
descriptor
}