Schema substitution for DataFrame

Download network files to HDFS
Decompress files on HDFS
yang qidong
This commit is contained in:
yanfqidong0604 2018-11-22 16:52:05 +08:00
parent 2e26f7cbe3
commit e97fac9d84
6 changed files with 421 additions and 0 deletions

Binary image file added (12 KiB; content not shown).


@@ -0,0 +1,17 @@
<configuration>
<!-- Specify the default filesystem for HDFS -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:9000</value>
</property>
<!-- Size of read/write buffer used in SequenceFiles. -->
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<!-- Specify the Hadoop temporary directory; create it manually -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.6.0/tmp</value>
</property>
</configuration>
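
As a quick sanity check for the core-site.xml above, the sketch below connects to the configured fs.defaultFS and lists the root directory. This is a minimal example and not part of the commit; the object name HdfsSmokeTest is hypothetical, and master:9000 simply mirrors the value configured above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Mirrors fs.defaultFS from core-site.xml; adjust if your NameNode differs.
    conf.set("fs.defaultFS", "hdfs://master:9000")
    val fs = FileSystem.get(conf)
    // Listing the root directory confirms the NameNode at master:9000 is reachable.
    fs.listStatus(new Path("/")).foreach(status => println(status.getPath))
    fs.close()
  }
}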


@@ -0,0 +1,44 @@
<configuration>
<!-- Set the HTTP address of the NameNode -->
<property>
<name>dfs.namenode.http-address</name>
<value>master:50070</value>
</property>
<!-- Set the HTTP address of the SecondaryNameNode -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:50090</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- Set the storage path of the NameNode -->
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/opt/hadoop-2.6.0/dfs/name</value>
</property>
<!-- Set the storage path of the DataNode -->
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/opt/hadoop-2.6.0/dfs/data</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hadoop-hdfs/dn._PORT</value>
</property>
<property>
<name>dfs.client.file-block-storage-locations.timeout</name>
<value>10000</value>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>
</configuration>
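
The last four properties enable short-circuit local reads (through the dn._PORT domain socket) and expose block-location metadata to clients. A minimal sketch of loading these site files explicitly on the client side and verifying that the setting was picked up; the /opt/hadoop-2.6.0/etc/hadoop/ location is an assumption based on the install path used above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// Assumed site-file locations; point these at wherever the XML files actually live.
conf.addResource(new Path("/opt/hadoop-2.6.0/etc/hadoop/core-site.xml"))
conf.addResource(new Path("/opt/hadoop-2.6.0/etc/hadoop/hdfs-site.xml"))
// Should print true if hdfs-site.xml was found and short-circuit reads are enabled.
println(conf.getBoolean("dfs.client.read.shortcircuit", false))
val fs = FileSystem.get(conf)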


@@ -0,0 +1,63 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
class ConvertSchema extends ConfigurableStop {
val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "convert data field."
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var schema:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val df = in.read()
// The mapping is written as: oldField1->newField1, oldField2->newField2
// withColumnRenamed returns a new DataFrame, so fold the renames into one result
// instead of discarding each intermediate DataFrame.
val renamedDf = schema.split(",").map(_.trim).foldLeft(df) { (currentDf, mapping) =>
val Array(oldName, newName) = mapping.split("->").map(_.trim)
currentDf.withColumnRenamed(oldName, newName)
}
renamedDf.show(20)
out.write(renamedDf)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
schema = MapUtil.get(map,"schema").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("The column mapping to apply, written as: oldField1->newField1, oldField2->newField2").defaultValue("").required(true)
descriptor = schema :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("convert.jpg")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.CommonGroup.toString)
}
}
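
For reference, a standalone sketch of the same rename semantics outside of piflow; the SparkSession, column names, and mapping string below are illustrative assumptions, not values from this commit.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rename-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "label")
// Same mapping format as the stop's schema property.
val mapping = "id->userId, label->tag"

val renamed = mapping.split(",").map(_.trim).foldLeft(df) { (d, m) =>
  val Array(oldName, newName) = m.split("->").map(_.trim)
  d.withColumnRenamed(oldName, newName)
}
renamed.printSchema()  // columns are now userId and tag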


@@ -0,0 +1,93 @@
package cn.piflow.bundle.http
import java.io.InputStream
import java.net.{HttpURLConnection, URL}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class FileDownHDFS extends ConfigurableStop{
val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "Download network files to HDFS"
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var url_str: String = _
var savePath: String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val url=new URL(url_str)
val uc:HttpURLConnection=url.openConnection().asInstanceOf[HttpURLConnection]
uc.setDoInput(true)
uc.connect()
val inputStream: InputStream = uc.getInputStream()
// 10 MB copy buffer
val buffer = new Array[Byte](1024 * 1024 * 10)
val configuration: Configuration = new Configuration()
val fs = FileSystem.get(configuration)
val fdos: FSDataOutputStream = fs.create(new Path(savePath))
// In Scala an assignment evaluates to Unit, so read into byteRead before testing it
// rather than inside the loop condition.
var byteRead = inputStream.read(buffer)
while(byteRead != -1){
fdos.write(buffer, 0, byteRead)
byteRead = inputStream.read(buffer)
}
fdos.flush()
inputStream.close()
fdos.close()
var seq:Seq[String]=Seq(savePath)
val row: Row = Row.fromSeq(seq)
val list:List[Row]=List(row)
val rdd: RDD[Row] = spark.sparkContext.makeRDD(list)
val fields: Array[StructField] =Array(StructField("savePath",StringType,nullable = true))
val schema: StructType = StructType(fields)
val df: DataFrame = spark.createDataFrame(rdd,schema)
out.write(df)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map: Map[String, Any]): Unit = {
url_str=MapUtil.get(map,key="url_str").asInstanceOf[String]
savePath=MapUtil.get(map,key="savePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url_str = new PropertyDescriptor().name("url_str").displayName("URL").description("Network address (URL) of the file to download").defaultValue("").required(true)
val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").description("The HDFS path (including file name) to save to, such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(true)
descriptor = url_str :: descriptor
descriptor = savePath :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("http.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.HttpGroup.toString)
}
}
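
Note that FileSystem.get(new Configuration()) resolves the target filesystem from fs.defaultFS on the classpath. Since savePath is documented as a fully qualified hdfs:// URI, an alternative (sketched below, with an assumed host and path) is to derive the filesystem from the URI itself:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Example value taken from the savePath property description; adjust to your cluster.
val savePath = "hdfs://10.0.86.89:9000/a/a.gz"
val conf = new Configuration()
// Resolve the FileSystem from the URI rather than relying on fs.defaultFS being set.
val fs = FileSystem.get(URI.create(savePath), conf)
val out = fs.create(new Path(savePath))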


@@ -0,0 +1,204 @@
package cn.piflow.bundle.http
import java.util.zip.GZIPInputStream
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class UnzipFilesOnHDFS extends ConfigurableStop {
val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "Unzip files on HDFS"
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var filePath: String = _
var fileType: String = _
var unzipPath: String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session: SparkSession = pec.get[SparkSession]()
val configuration: Configuration = new Configuration()
val fs = FileSystem.get(configuration)
val fdis: FSDataInputStream = fs.open(new Path(filePath))
val filePathArr: Array[String] = filePath.split("/")
var fileName: String = filePathArr.last
if(fileName.length == 0){
fileName = filePathArr(filePathArr.size-2)
}
if(fileType.equals("gz")){
val gzip: GZIPInputStream = new GZIPInputStream(fdis)
var n = -1
val buf=new Array[Byte](10*1024*1024)
val savePath = new Path(unzipPath +fileName.replace(".gz",""))
val fdos = fs.create(savePath)
while((n=gzip.read(buf)) != -1 && n != -1){
fdos.write(buf,0,n)
fdos.flush()
}
}/*else if(fileType.equals("tar")){
var entryNum:Int=0
var entryFileName:String=null
var entryFile:File=null
var subEntryFile:File=null
var subEntryFileName:String=null
var tarArchiveEntries:Array[TarArchiveEntry]=null
var fileList:List[String]=List()
var fos:FileOutputStream=null
var entry: TarArchiveEntry = null
val tarIs: TarArchiveInputStream = new TarArchiveInputStream(fdis)
while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
entryFileName= localPath +File.separator+entry.getName()
entryFile=new File(entryFileName)
entryNum += 1
if(entry.isDirectory()){
if(!entryFile.exists()){
entryFile.mkdirs()
}
tarArchiveEntries=entry.getDirectoryEntries()
for(i<-0 until tarArchiveEntries.length){
subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
subEntryFile=new File(subEntryFileName)
fileList=subEntryFileName::fileList
fos=new FileOutputStream(subEntryFile)
var mark = -1
val buf=new Array[Byte](4*1024)
while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark)
}
fos.close()
fos=null
}
}else{
fileList = entryFileName :: fileList
fos=new FileOutputStream(entryFile)
var mark = -1
val buf=new Array[Byte](4*1024)
while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark)
}
fos.close()
fos=null
}
}
if(entryNum==0){
println("there is no file!")
}
}else if(fileType.equals("tar.gz")){
var entryNum:Int=0
var entryFileName:String=null
var entryFile:File=null
var subEntryFile:File=null
var subEntryFileName:String=null
var tarArchiveEntries:Array[TarArchiveEntry]=null
var fileList:List[String]=List()
var fos:FileOutputStream=null
var entry: TarArchiveEntry = null
val gzip:GZIPInputStream=new GZIPInputStream(fdis)
val tarIs: TarArchiveInputStream = new TarArchiveInputStream(gzip)
while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
entryFileName=localPath +File.separator+entry.getName()
entryFile=new File(entryFileName)
entryNum += 1
if(entry.isDirectory()){
if(!entryFile.exists()){
entryFile.mkdirs()
}
tarArchiveEntries=entry.getDirectoryEntries()
for(i<-0 until tarArchiveEntries.length){
subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
subEntryFile=new File(subEntryFileName)
fileList=subEntryFileName::fileList
fos=new FileOutputStream(subEntryFile)
var mark = -1
val buf=new Array[Byte](4*1024)
while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark)
}
fos.close()
fos=null
}
}else{
fileList = entryFileName :: fileList
fos=new FileOutputStream(entryFile)
var mark = -1
val buf=new Array[Byte](4*1024)
while((mark=tarIs.read(buf)) != -1 && mark != -1){
fos.write(buf,0,mark)
}
fos.close()
fos=null
}
}
if(entryNum==0){
println("there is no file!")
}
}*/else{
throw new RuntimeException("Unsupported or incorrectly specified file type; only gz is currently supported.")
}
var seq:Seq[String]=Seq(unzipPath)
val row: Row = Row.fromSeq(seq)
val list:List[Row]=List(row)
val rdd: RDD[Row] = session.sparkContext.makeRDD(list)
val fields: Array[StructField] =Array(StructField("unzipPath",StringType,nullable = true))
val schema: StructType = StructType(fields)
val df: DataFrame = session.createDataFrame(rdd,schema)
out.write(df)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map: Map[String, Any]): Unit = {
filePath=MapUtil.get(map,key="filePath").asInstanceOf[String]
fileType=MapUtil.get(map,key="fileType").asInstanceOf[String]
unzipPath=MapUtil.get(map,key="unzipPath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("The HDFS path of the compressed file, such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(true)
val fileType = new PropertyDescriptor().name("fileType").displayName("fileType").description("The compression type, such as gz (currently the only supported type)").defaultValue("").required(true)
val unzipPath = new PropertyDescriptor().name("unzipPath").displayName("unzipPath").description("The HDFS directory to unzip into, such as hdfs://10.0.86.89:9000/b/").defaultValue("").required(true)
descriptor = filePath :: descriptor
descriptor = fileType :: descriptor
descriptor = unzipPath :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("http.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.HttpGroup.toString)
}
}
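
The gz branch above is essentially a stream copy from a GZIPInputStream into a new HDFS file, which Hadoop's IOUtils can express more compactly. A minimal alternative sketch; the input and output paths are assumptions for illustration only.

import java.util.zip.GZIPInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

val conf = new Configuration()
val fs = FileSystem.get(conf)
// Assumed input (.gz on HDFS) and output (decompressed) paths.
val in = new GZIPInputStream(fs.open(new Path("/a/a.gz")))
val out = fs.create(new Path("/b/a"))
// Copies with a 4 KB buffer and closes both streams when the copy finishes.
IOUtils.copyBytes(in, out, 4096, true)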