This commit is contained in:
xiaoxiao 2018-08-13 17:02:00 +08:00
parent e7014c206d
commit 6c82b4c452
1 changed files with 124 additions and 22 deletions

View File

@ -1,13 +1,14 @@
package cn.piflow.bundle.http package cn.piflow.bundle.http
import java.io.{ByteArrayInputStream, File, FileOutputStream} import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.lang.Exception
import java.util.zip.GZIPInputStream import java.util.zip.GZIPInputStream
import cn.piflow.conf.{ConfigurableStop, HiveGroup, HttpGroup, StopGroup} import cn.piflow.conf.{ConfigurableStop, HiveGroup, HttpGroup, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.{DataFrame, SparkSession}
class UnGZip extends ConfigurableStop { class UnGZip extends ConfigurableStop {
val inportCount: Int = 0 val inportCount: Int = 0
@ -18,16 +19,24 @@ class UnGZip extends ConfigurableStop {
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val df = in.read() val df = in.read()
var outDF:DataFrame=null
import spark.sqlContext.implicits._
val bis:ByteArrayInputStream=new ByteArrayInputStream(df.head().get(0).asInstanceOf[Array[Byte]]) val bis:ByteArrayInputStream=new ByteArrayInputStream(df.head().get(0).asInstanceOf[Array[Byte]])
val filename=df.head().get(1).asInstanceOf[String]//.replace(".gz","") val filename=df.head().get(1).asInstanceOf[String]//.replace(".gz","")
val extention=getExtension(filename) val extention=getExtension(filename)
var filePath:String=null var filePath:String=null
var filePathList:List[String]=null
if(extention.equals("gz")){ if(extention.equals("gz")){
filePath=unGz(bis,filename) filePath=unGz(bis,filename)
outDF=Seq((filePath)).toDF()
}
if(extention.equals("tar")){
filePathList=unTar(bis,filename)
outDF=Seq(filePathList).toDF()
} }
import spark.sqlContext.implicits._
val outDF=Seq((filePath)).toDF()
out.write(outDF) out.write(outDF)
} }
@ -50,38 +59,131 @@ class UnGZip extends ConfigurableStop {
} }
} }
def unGz(bis:ByteArrayInputStream,filename:String):String={ def unGz(bis:ByteArrayInputStream,srcFileName:String):String={
val gzip:GZIPInputStream=new GZIPInputStream(bis) val gzip:GZIPInputStream=new GZIPInputStream(bis)
createDir(outPutDir,null)
val savePath=new File(outPutDir) val savePath=new File(outPutDir)
createDir(outPutDir,null)
/*if(!savePath.exists()){ /*if(!savePath.exists()){
savePath.mkdir() savePath.mkdir()
}*/ }*/
val filePath:String=savePath+"/"+filename.replace("gz","") val fileName:String=savePath+"/"+srcFileName.replace(".gz","")
val fos:FileOutputStream=new FileOutputStream(filePath) val fos:FileOutputStream=new FileOutputStream(fileName)
var mark = -1 var mark = -1
val buf=new Array[Byte](4*1024) val buf=new Array[Byte](4*1024)
while((mark=gzip.read(buf)) != -1 && mark != -1){ while((mark=gzip.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark) fos.write(buf,0,mark)
} }
return filePath return fileName
} }
def unTar(bis:ByteArrayInputStream,filename:String):String={ def unTar(bis:ByteArrayInputStream,filename:String):List[String]={
val tarIs:TarInputStream=new TarInputStream(bis) var entryNum:Int=0
val savePath=new File("/unzip") var entryFileName:String=null
if(!savePath.exists()){ var entryFile:File=null
savePath.mkdirs() var subEntryFile:File=null
var subEntryFileName:String=null
var tarArchiveEntries:Array[TarArchiveEntry]=null
var fileList:List[String]=List()
var fos:FileOutputStream=null
var entry: TarArchiveEntry = null
val tarIs: TarArchiveInputStream = new TarArchiveInputStream(bis)
while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
entryFileName=outPutDir+File.separator+entry.getName()
entryFile=new File(entryFileName)
entryNum += 1
if(entry.isDirectory()){
if(!entryFile.exists()){
entryFile.mkdirs()
} }
val filePath:String=savePath+"/"+filename.replace("gz","") tarArchiveEntries=entry.getDirectoryEntries()
val fos:FileOutputStream=new FileOutputStream(filePath) for(i<-0 until tarArchiveEntries.length){
subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
subEntryFile=new File(subEntryFileName)
fileList=subEntryFileName::fileList
fos=new FileOutputStream(subEntryFile)
var mark = -1 var mark = -1
val buf=new Array[Byte](4*1024) val buf=new Array[Byte](4*1024)
while((mark=gzip.read(buf)) != -1 && mark != -1){ while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark) fos.write(buf,0,mark)
} }
return filePath fos.close()
fos=null
} }
}else{
fileList = entryFileName :: fileList
fos=new FileOutputStream(entryFile)
var mark = -1
val buf=new Array[Byte](4*1024)
while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark)
}
fos.close()
fos=null
}
}
if(entryNum==0){
println("there is no file!")
}
fileList
}
def unTarGz(bis:ByteArrayInputStream,filename:String):List[String]={
var entryNum:Int=0
var entryFileName:String=null
var entryFile:File=null
var subEntryFile:File=null
var subEntryFileName:String=null
var tarArchiveEntries:Array[TarArchiveEntry]=null
var fileList:List[String]=List()
var fos:FileOutputStream=null
var entry: TarArchiveEntry = null
val gzip:GZIPInputStream=new GZIPInputStream(bis)
val tarIs: TarArchiveInputStream = new TarArchiveInputStream(gzip)
while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
entryFileName=outPutDir+File.separator+entry.getName()
entryFile=new File(entryFileName)
entryNum += 1
if(entry.isDirectory()){
if(!entryFile.exists()){
entryFile.mkdirs()
}
tarArchiveEntries=entry.getDirectoryEntries()
for(i<-0 until tarArchiveEntries.length){
subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
subEntryFile=new File(subEntryFileName)
fileList=subEntryFileName::fileList
fos=new FileOutputStream(subEntryFile)
var mark = -1
val buf=new Array[Byte](4*1024)
while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark)
}
fos.close()
fos=null
}
}else{
fileList = entryFileName :: fileList
fos=new FileOutputStream(entryFile)
var mark = -1
val buf=new Array[Byte](4*1024)
while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark)
}
fos.close()
fos=null
}
}
if(entryNum==0){
println("there is no file!")
}
fileList
}
def initialize(ctx: ProcessContext): Unit = { def initialize(ctx: ProcessContext): Unit = {