forked from opensci/piflow
YangQiDong
This commit is contained in:
parent 4bde8b0f23
commit c2ae0447d7
@@ -8,7 +8,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.monotonically_increasing_id

class CsvStringParser extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
@@ -46,8 +45,6 @@ class CsvStringParser extends ConfigurableStop{
    val fields: Array[StructField] = schema.split(",").map(d=>StructField(d.trim,StringType,nullable = true))
    val NewSchema: StructType = StructType(fields)
    Fdf = session.createDataFrame(rowRDD,NewSchema)

    //Fdf.show(10)
    out.write(Fdf)
  }
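Taken together, the two hunks above show the stop's core pattern: split a delimited string into `Row`s, derive an all-String `StructType` from a comma-separated schema property, and call `createDataFrame`. Below is a minimal, self-contained sketch of that pattern; the input string, delimiter, and schema values are illustrative stand-ins, not the stop's real properties.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object CsvStringSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[*]").appName("csv-string").getOrCreate()

    // Illustrative inputs standing in for the stop's string, delimiter and schema properties.
    val csvString = "1,Alice\n2,Bob"
    val delimiter = ","
    val schema    = "id,name"

    // Same pattern as the hunk: one Row per line, an all-String StructType
    // built from the schema string, then createDataFrame.
    val rowRDD = session.sparkContext
      .parallelize(csvString.split("\n").toSeq)
      .map(line => Row.fromSeq(line.split(delimiter).map(_.trim).toSeq))
    val fields = schema.split(",").map(d => StructField(d.trim, StringType, nullable = true))
    val fdf: DataFrame = session.createDataFrame(rowRDD, StructType(fields))

    fdf.show()
    session.stop()
  }
}
```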
@@ -8,7 +8,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.monotonically_increasing_id

class FolderCsvParser extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
@@ -16,7 +15,6 @@ class FolderCsvParser extends ConfigurableStop{
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val description: String = "Parsing of CSV folder"

  var FolderPath:String=_
  var delimiter: String = _
  var schema: String = _
@@ -50,7 +48,6 @@ class FolderCsvParser extends ConfigurableStop{
      !bool
    }).toDF()

    //Fdf.show(10)
    out.write(Fdf)

  }
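For comparison, Spark's built-in CSV source accepts a directory path directly and applies one schema to every file in it. The sketch below is not how the stop itself is implemented; it assumes header-less files and the same all-String schema the stop builds by hand, and the folder path is a placeholder.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder().master("local[*]").appName("folder-csv").getOrCreate()
val schema  = "id,name"  // stand-in for the stop's schema property
val fields  = schema.split(",").map(d => StructField(d.trim, StringType, nullable = true))

val fdf = session.read
  .option("delimiter", ",")        // stand-in for the stop's delimiter property
  .schema(StructType(fields))
  .csv("/path/to/csv/folder")      // stand-in for FolderPath
fdf.show(10)
```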
@@ -126,7 +126,7 @@ class LoadFromFtpToHDFS extends ConfigurableStop {
    ftp.setFileType(FTP.BINARY_FILE_TYPE)
    ftp.setDataTimeout(600000)
    ftp.setConnectTimeout(600000)
-   ftp.enterLocalPassiveMode()// passive mode
+   ftp.enterLocalPassiveMode()
    ftp.setControlEncoding("UTF-8")
    ftp
  }
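The hunk above only tweaks a comment on the client builder. For context, here is a hedged sketch of how an Apache Commons Net `FTPClient` configured this way is typically used end to end; the host, credentials, and remote path are placeholders, not values from the stop.

```scala
import org.apache.commons.net.ftp.{FTP, FTPClient}

val ftp = new FTPClient()
ftp.setConnectTimeout(600000)          // must be set before connect()
ftp.setControlEncoding("UTF-8")
ftp.connect("ftp.example.com", 21)
ftp.login("user", "secret")
ftp.setFileType(FTP.BINARY_FILE_TYPE)  // binary transfer, as in the hunk
ftp.setDataTimeout(600000)             // 10-minute data-channel timeout
ftp.enterLocalPassiveMode()            // client opens the data connection (firewall-friendly)

val in = ftp.retrieveFileStream("/remote/file.gz")
// ... copy `in` to HDFS here ...
in.close()
ftp.completePendingCommand()           // finalize the transfer on the control channel
ftp.logout()
ftp.disconnect()
```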
@@ -46,9 +46,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
    inDf.collect().foreach(row => {
      filePath = row.get(0).asInstanceOf[String]
      unzipFile(filePath,savePath)
    })
  }

  val rdd: RDD[Row] = session.sparkContext.makeRDD(arr.toList)
@@ -56,11 +54,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
    val schema: StructType = StructType(fields)
    val df: DataFrame = session.createDataFrame(rdd,schema)

    //println("##################################################################################################")
    // println(df.count())
    //df.show(20)
    //println("##################################################################################################")

    out.write(df)
  }
@@ -141,8 +134,6 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
    val gzip = new GZIPInputStream(new BufferedInputStream(fdis))
    val tarIn = new TarInputStream(gzip, 1024 * 2)

    // fs.create(new Path(sp)).close()

    var entry: TarEntry = null

    // In Scala an assignment evaluates to Unit, so `(entry = ...) != null` is
    // always true; test the variable itself after assigning instead.
    while ({ entry = tarIn.getNextEntry; entry != null }) {
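The same traversal can be written with no assignment-in-condition at all. The sketch below uses Apache commons-compress rather than the Ant-style `TarInputStream` the stop imports, so it is an alternative under that stated assumption; the archive path is a placeholder.

```scala
import java.io.{BufferedInputStream, FileInputStream}
import java.util.zip.GZIPInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

// getNextTarEntry returns null at end of archive, which makes the loop condition plain.
val tarIn = new TarArchiveInputStream(
  new GZIPInputStream(new BufferedInputStream(new FileInputStream("/tmp/archive.tar.gz"))))

var entry = tarIn.getNextTarEntry
while (entry != null) {
  if (!entry.isDirectory) {
    println(s"${entry.getName}: ${entry.getSize} bytes")
    // read this entry's bytes from tarIn here, e.g. into an HDFS output stream
  }
  entry = tarIn.getNextTarEntry
}
tarIn.close()
```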
@@ -34,14 +34,8 @@ class FileDownHDFS extends ConfigurableStop{
    val buffer=new Array[Byte](1024*1024*10)
    var byteRead= -1

    val configuration: Configuration = new Configuration()

    // "savePath":"hdfs://10.0.88.70:9000/yqd/hdfstest/dblp.xml.gz"
    // val hdfsUrl = "hdfs://10.0.88.70:9000"
    val pathARR: Array[String] = savePath.split("\\/")
    var hdfsUrl:String=""
    for (x <- (0 until 3)){
@@ -49,18 +43,6 @@ class FileDownHDFS extends ConfigurableStop{
      hdfsUrl+=(pathARR(x) +"/")
    }
    configuration.set("fs.defaultFS",hdfsUrl)
    // configuration.set("dfs.nameservices", "nameservice1")
    // configuration.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
    // configuration.set("dfs.namenode.rpc-address.nameservice1.nn1", "xxx:8020");
    // configuration.set("dfs.namenode.rpc-address.nameservice1.nn2", "xxx:8020");
    // configuration.set("dfs.client.failover.proxy.provider.nameservice1"
    //   ,"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // configuration.addResource("classpath:/hadoop/core-site.xml");
    // configuration.addResource("classpath:/hadoop/hdfs-site.xml");
    // configuration.addResource("classpath:/hadoop/mapred-site.xml");

    val fs = FileSystem.get(configuration)
    val fdos: FSDataOutputStream = fs.create(new Path(savePath))
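The loop above reassembles `hdfs://host:port/` by splitting `savePath` on slashes before setting `fs.defaultFS`. An alternative worth noting, sketched here on the assumption that `savePath` is always a fully qualified `hdfs://` URI, is to let Hadoop parse the scheme and authority out of the path itself:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Derive the filesystem from the path's own scheme/authority instead of
// setting fs.defaultFS from a hand-built prefix.
val savePath = "hdfs://10.0.88.70:9000/yqd/hdfstest/dblp.xml.gz"  // example from the comments above
val fs   = FileSystem.get(new URI(savePath), new Configuration())
val fdos = fs.create(new Path(savePath))
```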
@@ -74,7 +56,6 @@ class FileDownHDFS extends ConfigurableStop{
    inputStream.close()
    fdos.close()

    var seq:Seq[String]=Seq(savePath)
    val row: Row = Row.fromSeq(seq)
    val list:List[Row]=List(row)
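Between the two FileDownHDFS hunks sits the actual byte copy that `buffer` and `byteRead` are declared for. A hedged sketch of that copy follows; the download URL and save path are placeholders, and the stream/filesystem setup is assumed rather than taken from the elided lines.

```scala
import java.io.InputStream
import java.net.{URI, URL}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val savePath = "hdfs://10.0.88.70:9000/yqd/hdfstest/dblp.xml.gz"            // placeholder
val inputStream: InputStream = new URL("https://example.com/dblp.xml.gz").openStream()
val fs   = FileSystem.get(new URI(savePath), new Configuration())
val fdos = fs.create(new Path(savePath))

val buffer   = new Array[Byte](1024 * 1024 * 10)  // 10 MB buffer, as in the hunk
var byteRead = inputStream.read(buffer)
while (byteRead != -1) {
  fdos.write(buffer, 0, byteRead)                 // copy each chunk to HDFS
  byteRead = inputStream.read(buffer)
}
inputStream.close()
fdos.close()
```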