goldData Parse

yanggang 2018-11-22 15:42:14 +08:00
parent 4bb8abba9c
commit 81cfdb8953
13 changed files with 59 additions and 99 deletions

View File

@@ -4,17 +4,17 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL
class FetchEs extends ConfigurableStop {
override val description: String = "fetch data with dataframe from elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "fetch data with dataframe from elasticSearch "
var es_nodes:String = _ // ES nodes; separate multiple nodes with commas
var port:String= _ // ES port number
@@ -71,6 +71,5 @@ class FetchEs extends ConfigurableStop {
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
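For context, here is a minimal sketch of the read path FetchEs wraps, using the EsSparkSQL helper imported above. The node address, port, and the "my_index/doc" resource are placeholders, and the SparkSession overload of esDF assumes an elasticsearch-hadoop 6.x connector.

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

object FetchEsSketch {
  def main(args: Array[String]): Unit = {
    // es.nodes / es.port are placeholder connection settings
    val spark = SparkSession.builder()
      .appName("FetchEsSketch")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .getOrCreate()

    // Load the whole index/type resource into a DataFrame
    val df = EsSparkSQL.esDF(spark, "my_index/doc")
    df.show()
  }
}
```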

View File

@@ -4,17 +4,16 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL
class PutEs extends ConfigurableStop {
override val description: String = "put data with dataframe to elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
var es_nodes:String = _ // ES nodes; separate multiple nodes with commas
var port:String= _ // ES port number
@@ -82,7 +81,4 @@ class PutEs extends ConfigurableStop {
List(StopGroupEnum.ESGroup.toString)
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
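The write side that PutEs wraps can be sketched the same way: EsSparkSQL.saveToEs writes a DataFrame to an index/type resource. The sample rows, node address, and resource name below are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

object PutEsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PutEsSketch")
      .config("es.nodes", "127.0.0.1") // placeholder ES node
      .config("es.port", "9200")       // placeholder ES port
      .getOrCreate()
    import spark.implicits._

    // Illustrative sample data
    val df = Seq(("001", "zhangsan"), ("002", "lisi")).toDF("id", "name")

    // Index the DataFrame into the given index/type resource
    EsSparkSQL.saveToEs(df, "my_index/doc")
  }
}
```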

View File

@@ -8,11 +8,11 @@ import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
class QueryEs extends ConfigurableStop {
override val description: String = "query data with dataframe from elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "query data with dataframe from elasticSearch "
var es_nodes:String = _ // ES nodes; separate multiple nodes with commas
var port:String= _ // ES port number
@@ -116,8 +116,4 @@ class QueryEs extends ConfigurableStop {
override def getGroup(): List[String] = {
List(StopGroupEnum.ESGroup.toString)
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}

View File

@@ -1,21 +1,19 @@
package cn.piflow.bundle.ftp
import java.util
import java.util.zip.GZIPInputStream
import cn.piflow.bundle.util.UnGzUtil
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class UnGz extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val description: String = "Load file from ftp url."
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
val authorEmail: String = "ygang@cnic.cn"
val description: String = "UnZip seq.gz "
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var gzType:String =_
@@ -36,17 +34,12 @@ class UnGz extends ConfigurableStop{
for (i <- 0 until rows.size) {
//row(i) [/ftpUrlDownLoadDIR555/gbbct151.seq.gz]
// file path
val sourceFile = rows(i)(0).toString
// println(sourceFile+"----------------------------------")
if(sourceFile.endsWith("seq.gz")){
if (count == 0){
println(count+"-------------------------------")
// load the file as a byteArray
val byteArray: Array[Byte] = UnGzUtil.unGzStream(sourceFile)
@@ -60,7 +53,6 @@ class UnGz extends ConfigurableStop{
}
}
}
df.schema.printTreeString()
println(df.count())
out.write(df)
@@ -69,14 +61,13 @@ class UnGz extends ConfigurableStop{
def setProperties(map: Map[String, Any]): Unit = {
gzType=MapUtil.get(map,key="gzType").asInstanceOf[String]
// gzType=MapUtil.get(map,key="gzType").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val gzType = new PropertyDescriptor().name("gzType").displayName("gzType").defaultValue("").required(true)
descriptor = gzType :: descriptor
// val gzType = new PropertyDescriptor().name("gzType").displayName("gzType").defaultValue("").required(true)
// descriptor = gzType :: descriptor
descriptor
}
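UnGzUtil.unGzStream is a project helper not shown in this diff. A rough equivalent built on the JDK's GZIPInputStream (the import removed above) might look like the sketch below; the method name and buffer size are illustrative.

```scala
import java.io.{ByteArrayOutputStream, FileInputStream}
import java.util.zip.GZIPInputStream

object UnGzSketch {
  // Decompress a .gz file fully into memory and return the raw bytes
  def unGz(path: String): Array[Byte] = {
    val in = new GZIPInputStream(new FileInputStream(path))
    val out = new ByteArrayOutputStream()
    try {
      val buffer = new Array[Byte](4096)
      var n = in.read(buffer)
      while (n != -1) {
        out.write(buffer, 0, n)
        n = in.read(buffer)
      }
      out.toByteArray
    } finally {
      in.close()
      out.close()
    }
  }
}
```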

View File

@@ -17,18 +17,14 @@ import org.apache.spark.sql.SparkSession
class FetchHbase extends ConfigurableStop {
override val description: String = "put data with dataframe to elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "fetch data from hbase "
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf = in.read()
inDf.show()
val tableName = "person"
val configuration: Configuration = HBaseConfiguration.create()
@@ -84,14 +80,11 @@ class FetchHbase extends ConfigurableStop {
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("es.png")
ImageUtil.getImage("hbase.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.ESGroup.toString)
List(StopGroupEnum.HbaseGroup.toString)
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
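For orientation, a minimal sketch of the kind of full-table scan FetchHbase sets up with HBaseConfiguration.create(): the ZooKeeper quorum and the "person" table name are placeholders, and TableInputFormat/newAPIHadoopRDD are the standard hbase-mapreduce and Spark APIs rather than anything specific to this commit.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object FetchHbaseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FetchHbaseSketch").getOrCreate()
    val sc = spark.sparkContext

    // Placeholder ZooKeeper quorum and table name
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "127.0.0.1")
    conf.set(TableInputFormat.INPUT_TABLE, "person")

    // Full-table scan as an RDD of (row key, Result) pairs
    val hbaseRDD = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Print the first few row keys
    hbaseRDD.map { case (_, result) => Bytes.toString(result.getRow) }
      .take(10)
      .foreach(println)
  }
}
```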

View File

@@ -16,22 +16,18 @@ import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
class GetHbase extends ConfigurableStop {
override val description: String = "put data with dataframe to elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "get data from hbase "
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf = in.read()
inDf.show()
val tableName = "person"
val configuration: Configuration = HBaseConfiguration.create()
@@ -105,14 +101,11 @@ class GetHbase extends ConfigurableStop {
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("es.png")
ImageUtil.getImage("hbase.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.ESGroup.toString)
List(StopGroupEnum.HbaseGroup.toString)
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}

View File

@@ -17,11 +17,11 @@ import org.apache.spark.sql.SparkSession
// Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
class PutHbase extends ConfigurableStop {
override val description: String = "put data with dataframe to elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "put data to hbase "
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -89,14 +89,11 @@ class PutHbase extends ConfigurableStop{
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("es.png")
ImageUtil.getImage("hbase.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.ESGroup.toString)
List(StopGroupEnum.HbaseGroup.toString)
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}

View File

@@ -13,6 +13,9 @@ import org.apache.hadoop.fs.Path
class DeleteHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "delete file or dir from hdfs"
var hdfsUrl :String= _
@@ -20,7 +23,6 @@ class DeleteHdfs extends ConfigurableStop{
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
println(deletePath)
val array = deletePath.split(",")
@@ -74,6 +76,4 @@ class DeleteHdfs extends ConfigurableStop{
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
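DeleteHdfs splits deletePath on commas and removes each entry. A self-contained sketch of that logic with the Hadoop FileSystem API could look like this; the NameNode URL and paths are placeholders.

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DeleteHdfsSketch {
  def main(args: Array[String]): Unit = {
    val hdfsUrl    = "hdfs://127.0.0.1:9000" // placeholder NameNode address
    val deletePath = "/tmp/a.txt,/tmp/out"   // comma-separated paths, as in DeleteHdfs

    val fs = FileSystem.get(new URI(hdfsUrl), new Configuration())
    deletePath.split(",").foreach { p =>
      // recursive = true so directories are removed as well
      fs.delete(new Path(p), true)
    }
    fs.close()
  }
}
```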

View File

@@ -10,6 +10,8 @@ import org.apache.spark.sql.SparkSession
class GetHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val description: String = "write dataframe data from hdfs"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var hdfsUrl : String=_
var hdfsPath :String= _
@@ -104,7 +106,4 @@ class GetHdfs extends ConfigurableStop{
override def initialize(ctx: ProcessContext): Unit = {
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}

View File

@@ -1,9 +1,5 @@
package cn.piflow.bundle.hdfs
import java.util
import java.util.ArrayList
import breeze.collection.mutable.ArrayLike
import cn.piflow._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
@@ -17,17 +13,15 @@ import org.apache.spark.sql.SparkSession
class ListHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "retrieves a listing of files from hdfs "
var hdfsPath :String= _
var hdfsUrl :String= _
var list = List("")
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val path = new Path(hdfsPath)
@@ -95,6 +89,4 @@ class ListHdfs extends ConfigurableStop{
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
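ListHdfs walks the directory given by hdfsPath; a minimal sketch of listing the immediate children of an HDFS directory is below, again with a placeholder URL and path.

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListHdfsSketch {
  def main(args: Array[String]): Unit = {
    val hdfsUrl  = "hdfs://127.0.0.1:9000" // placeholder NameNode address
    val hdfsPath = "/user/piflow"          // placeholder directory to list

    val fs = FileSystem.get(new URI(hdfsUrl), new Configuration())
    // listStatus returns the immediate children of the directory
    fs.listStatus(new Path(hdfsPath)).foreach(status => println(status.getPath.toString))
    fs.close()
  }
}
```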

View File

@@ -12,17 +12,17 @@ import org.apache.spark.sql.SparkSession
class PutHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "from dataframe write data to hdfs"
var hdfsPath :String= _
var hdfsUrl :String= _
var partition :Int=_
var types :String= _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val inDF = in.read()
inDF.show()
inDF.schema.printTreeString()
@@ -35,12 +35,12 @@ class PutHdfs extends ConfigurableStop{
println(hdfsUrl+hdfsPath+"pppppppppppppppppppppppppppppppp--putHdfs")
if (types=="json"){
inDF.repartition(3).write.json(hdfsUrl+hdfsPath)
inDF.repartition(partition).write.json(hdfsUrl+hdfsPath)
} else if (types=="csv"){
inDF.repartition(3).write.csv(hdfsUrl+hdfsPath)
inDF.repartition(partition).write.csv(hdfsUrl+hdfsPath)
} else {
//parquet
inDF.repartition(3).write.save(hdfsUrl+hdfsPath)
inDF.repartition(partition).write.save(hdfsUrl+hdfsPath)
}
}
@@ -48,6 +48,7 @@ class PutHdfs extends ConfigurableStop{
hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
types = MapUtil.get(map,key="types").asInstanceOf[String]
partition = MapUtil.get(map,key="partition").asInstanceOf[Int]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
@@ -55,6 +56,8 @@ class PutHdfs extends ConfigurableStop{
val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath").defaultValue("").required(true)
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true)
val types = new PropertyDescriptor().name("types").displayName("json,csv,parquet").defaultValue("").required(true)
val partition = new PropertyDescriptor().name("partition").displayName("repartition").defaultValue("").required(true)
descriptor = partition :: descriptor
descriptor = types :: descriptor
descriptor = hdfsPath :: descriptor
descriptor = hdfsUrl :: descriptor
@@ -73,6 +76,5 @@ class PutHdfs extends ConfigurableStop{
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
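The change above replaces the hard-coded repartition(3) with the new partition property. A standalone sketch of that write logic, with a placeholder URL, path, and sample rows:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PutHdfsSketch {
  // Repartition before writing so the "partition" property controls the number of output files
  def write(df: DataFrame, hdfsUrl: String, hdfsPath: String, types: String, partition: Int): Unit = {
    val target = hdfsUrl + hdfsPath
    types match {
      case "json" => df.repartition(partition).write.json(target)
      case "csv"  => df.repartition(partition).write.csv(target)
      case _      => df.repartition(partition).write.save(target) // default Spark format (parquet)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PutHdfsSketch").getOrCreate()
    import spark.implicits._

    // Illustrative sample data
    val df = Seq(("001", "zhangsan"), ("002", "lisi")).toDF("id", "name")
    write(df, "hdfs://127.0.0.1:9000", "/tmp/putHdfsSketch", "csv", partition = 2)
  }
}
```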

View File

@@ -19,6 +19,7 @@ case object ESGroup extends StopGroup
case object HdfsGroup extends StopGroup
case object MicroorganismGroup extends StopGroup
case object ExcelGroup extends StopGroup
case object HbaseGroup extends StopGroup

View File

@@ -27,5 +27,6 @@ object StopGroupEnum extends Enumeration {
val Memcache= Value("Memcache")
val GraphX=Value("GraphX")
val ExcelGroup=Value("ExcelGroup")
val HbaseGroup=Value("HbaseGroup")
}