From 81cfdb89533272228586bc98109fc0a573515276 Mon Sep 17 00:00:00 2001
From: yanggang
Date: Thu, 22 Nov 2018 15:42:14 +0800
Subject: [PATCH] goldData Parse

---
 .../scala/cn/piflow/bundle/es/FetchEs.scala   | 15 ++++++-----
 .../scala/cn/piflow/bundle/es/PutEs.scala     | 10 +++-----
 .../scala/cn/piflow/bundle/es/QueryEs.scala   | 12 +++------
 .../scala/cn/piflow/bundle/ftp/UnGz.scala     | 25 ++++++-------------
 .../cn/piflow/bundle/hbase/FetchHbase.scala   | 17 ++++---------
 .../cn/piflow/bundle/hbase/GetHbase.scala     | 19 +++++---------
 .../cn/piflow/bundle/hbase/PutHbase.scala     | 15 +++++------
 .../cn/piflow/bundle/hdfs/DeleteHdfs.scala    |  6 ++---
 .../scala/cn/piflow/bundle/hdfs/GetHdfs.scala |  5 ++--
 .../cn/piflow/bundle/hdfs/ListHdfs.scala      | 12 ++-------
 .../scala/cn/piflow/bundle/hdfs/PutHdfs.scala | 20 ++++++++-------
 .../main/scala/cn/piflow/conf/StopGroup.scala |  1 +
 .../scala/cn/piflow/conf/StopGroupEnum.scala  |  1 +
 13 files changed, 59 insertions(+), 99 deletions(-)

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala
index 77ada14..fed11c5 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala
@@ -4,17 +4,17 @@
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
-import org.elasticsearch.spark.rdd.EsSpark
-import org.elasticsearch.spark.sql.EsSparkSQL
+

 class FetchEs extends ConfigurableStop {

-  override val description: String = "fetch data with dataframe from elasticSearch "
   val authorEmail: String = "ygang@cnic.cn"
-  val inportCount: Int = 0
-  val outportCount: Int = 1
+
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val description: String = "fetch data from Elasticsearch into a DataFrame"
+
   var es_nodes:String = _   // ES nodes; separate multiple nodes with commas
   var port:String= _        // ES port number
@@ -71,6 +71,5 @@ class FetchEs extends ConfigurableStop {
   }

-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
+
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala
index f03879f..51f21ae 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala
@@ -4,17 +4,16 @@
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
-import org.elasticsearch.spark.rdd.EsSpark
 import org.elasticsearch.spark.sql.EsSparkSQL

 class PutEs extends ConfigurableStop {
   override val description: String = "put data with dataframe to elasticSearch "
   val authorEmail: String = "ygang@cnic.cn"
-  val inportCount: Int = 0
-  val outportCount: Int = 1
+
+  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val outportList: List[String] = List(PortEnum.NonePort.toString)

   var es_nodes:String = _   // ES nodes; separate multiple nodes with commas
   var port:String= _        // ES port number
@@ -82,7 +81,4 @@ class PutEs extends ConfigurableStop {
     List(StopGroupEnum.ESGroup.toString)
   }
-
-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala
index 7eef03a..5b6df98 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala
@@ -8,11 +8,11 @@
 import org.apache.spark.sql.SparkSession
 import org.elasticsearch.spark.rdd.EsSpark

 class QueryEs extends ConfigurableStop {
-
-  override val description: String = "query data with dataframe from elasticSearch "
   val authorEmail: String = "ygang@cnic.cn"
-  val inportCount: Int = 0
-  val outportCount: Int = 1
+
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val description: String = "query data from Elasticsearch into a DataFrame"

   var es_nodes:String = _   // ES nodes; separate multiple nodes with commas
   var port:String= _        // ES port number
@@ -116,8 +116,4 @@
   override def getGroup(): List[String] = {
     List(StopGroupEnum.ESGroup.toString)
   }
-
-
-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala
index 6187c77..bed4ff5 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala
@@ -1,21 +1,19 @@
 package cn.piflow.bundle.ftp

-import java.util
-import java.util.zip.GZIPInputStream
 import cn.piflow.bundle.util.UnGzUtil
 import cn.piflow.conf.bean.PropertyDescriptor
-import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.conf.util.ImageUtil
 import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}

 class UnGz extends ConfigurableStop{

-  val authorEmail: String = "xiaoxiao@cnic.cn"
-  val description: String = "Load file from ftp url."
-  val inportList: List[String] = List(PortEnum.NonePort.toString)
-  val outportList: List[String] = List(PortEnum.NonePort.toString)
+  val authorEmail: String = "ygang@cnic.cn"
+  val description: String = "unzip .seq.gz files"
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

   var gzType:String =_

@@ -36,17 +34,12 @@ class UnGz extends ConfigurableStop{
     for (i <- 0 until rows.size) {
       //row(i)  [/ftpUrlDownLoadDIR555/gbbct151.seq.gz]
-      // file path
       val sourceFile = rows(i)(0).toString
       // println(sourceFile+"----------------------------------")
-
       if(sourceFile.endsWith("seq.gz")){
-
-
         if (count == 0){
           println(count+"-------------------------------")
-          // load the file as a byte array
           val byteArray: Array[Byte] = UnGzUtil.unGzStream(sourceFile)
@@ -60,7 +53,6 @@ class UnGz extends ConfigurableStop{
         }
       }
     }
-    df.schema.printTreeString()
     println(df.count())
     out.write(df)
@@ -69,14 +61,13 @@ class UnGz extends ConfigurableStop{
   def setProperties(map: Map[String, Any]): Unit = {
-
-    gzType=MapUtil.get(map,key="gzType").asInstanceOf[String]
+//    gzType=MapUtil.get(map,key="gzType").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val gzType = new PropertyDescriptor().name("gzType").displayName("gzType").defaultValue("").required(true)
-    descriptor = gzType :: descriptor
+//    val gzType = new PropertyDescriptor().name("gzType").displayName("gzType").defaultValue("").required(true)
+//    descriptor = gzType :: descriptor
     descriptor
   }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/FetchHbase.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/FetchHbase.scala
index edfa0c0..3c80dec 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/FetchHbase.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/FetchHbase.scala
@@ -17,18 +17,14 @@
 import org.apache.spark.sql.SparkSession

 class FetchHbase extends ConfigurableStop {

-  override val description: String = "put data with dataframe to elasticSearch "
   val authorEmail: String = "ygang@cnic.cn"
-  val inportCount: Int = 0
-  val outportCount: Int = 1
-
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val description: String = "fetch data from HBase"

   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
     val sc = spark.sparkContext

-    val inDf = in.read()
-    inDf.show()
-
     val tableName = "person"

     val configuration: Configuration = HBaseConfiguration.create()
@@ -84,14 +80,11 @@
   }

   override def getIcon(): Array[Byte] = {
-    ImageUtil.getImage("es.png")
+    ImageUtil.getImage("hbase.png")
   }

   override def getGroup(): List[String] = {
-    List(StopGroupEnum.ESGroup.toString)
+    List(StopGroupEnum.HbaseGroup.toString)
   }
-
-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/GetHbase.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/GetHbase.scala
index 7e126f0..5d6e309 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/GetHbase.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/GetHbase.scala
@@ -16,22 +16,18 @@
 import org.apache.hadoop.hbase.util.{Base64, Bytes}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession

-
-
 class GetHbase extends ConfigurableStop {
-  override val description: String = "put data with dataframe to elasticSearch "
   val authorEmail: String = "ygang@cnic.cn"
-  val inportCount: Int = 0
-  val outportCount: Int = 1
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val description: String = "get data from HBase"
+
   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
     val sc = spark.sparkContext

-    val inDf = in.read()
-    inDf.show()
-
     val tableName = "person"

     val configuration: Configuration = HBaseConfiguration.create()
@@ -105,14 +101,11 @@
   }

   override def getIcon(): Array[Byte] = {
-    ImageUtil.getImage("es.png")
+    ImageUtil.getImage("hbase.png")
   }

   override def getGroup(): List[String] = {
-    List(StopGroupEnum.ESGroup.toString)
+    List(StopGroupEnum.HbaseGroup.toString)
   }
-
-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/PutHbase.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/PutHbase.scala
index e660dde..5257d44 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/PutHbase.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hbase/PutHbase.scala
@@ -17,11 +17,11 @@
 import org.apache.spark.sql.SparkSession

 // Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
 class PutHbase extends ConfigurableStop {
-
-  override val description: String = "put data with dataframe to elasticSearch "
   val authorEmail: String = "ygang@cnic.cn"
-  val inportCount: Int = 0
-  val outportCount: Int = 1
+
+  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val outportList: List[String] = List(PortEnum.NonePort.toString)
+  override val description: String = "put data to HBase"

   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@@ -89,14 +89,11 @@
   }

   override def getIcon(): Array[Byte] = {
-    ImageUtil.getImage("es.png")
+    ImageUtil.getImage("hbase.png")
   }

   override def getGroup(): List[String] = {
-    List(StopGroupEnum.ESGroup.toString)
+    List(StopGroupEnum.HbaseGroup.toString)
   }
-
-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala
index 5ae7c0c..4bdcf1b 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala
@@ -13,6 +13,9 @@
 import org.apache.hadoop.fs.Path

 class DeleteHdfs extends ConfigurableStop{
   override val authorEmail: String = "ygang@cmic.com"
+
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.NonePort.toString)
   override val description: String = "delete file or dir from hdfs"

   var hdfsUrl :String= _
@@ -20,7 +23,6 @@
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
-    println(deletePath)

     val array = deletePath.split(",")
@@ -74,6 +76,4 @@
   }

-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala
index 0953a10..2a128b1 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala
@@ -10,6 +10,8 @@
 class GetHdfs extends ConfigurableStop{
   override val authorEmail: String = "ygang@cmic.com"
   override val description: String = "write dataframe data from hdfs"
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)

   var hdfsUrl : String=_
   var hdfsPath :String= _
@@ -104,7 +106,4 @@
   override def initialize(ctx: ProcessContext): Unit = {

   }
-
-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala
index 7485583..2101c5f 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala
@@ -1,9 +1,5 @@
 package cn.piflow.bundle.hdfs

-import java.util
-import java.util.ArrayList
-
-import breeze.collection.mutable.ArrayLike
 import cn.piflow._
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
@@ -17,17 +13,15 @@
 import org.apache.spark.sql.SparkSession

 class ListHdfs extends ConfigurableStop{
   override val authorEmail: String = "ygang@cmic.com"
-
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
   override val description: String = "retrieves a listing of files from hdfs "

   var hdfsPath :String= _
   var hdfsUrl :String= _
   var list = List("")

   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
-    val spark = pec.get[SparkSession]()
-
-    val sc = spark.sparkContext

     val path = new Path(hdfsPath)
@@ -95,6 +89,4 @@
   }

-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala
index 96d6b02..0d4f6e7 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala
@@ -12,17 +12,17 @@
 import org.apache.spark.sql.SparkSession

 class PutHdfs extends ConfigurableStop{
   override val authorEmail: String = "ygang@cmic.com"
-
-
+  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val outportList: List[String] = List(PortEnum.NonePort.toString)
   override val description: String = "from dataframe write data to hdfs"

   var hdfsPath :String= _
   var hdfsUrl :String= _
+  var partition :Int=_
   var types :String= _
+
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
-
     val spark = pec.get[SparkSession]()
-
     val inDF = in.read()
     inDF.show()
     inDF.schema.printTreeString()
@@ -35,12 +35,12 @@
     println(hdfsUrl+hdfsPath+"pppppppppppppppppppppppppppppppp--putHdfs")

     if (types=="json"){
-      inDF.repartition(3).write.json(hdfsUrl+hdfsPath)
+      inDF.repartition(partition).write.json(hdfsUrl+hdfsPath)
     } else if (types=="csv"){
-      inDF.repartition(3).write.csv(hdfsUrl+hdfsPath)
+      inDF.repartition(partition).write.csv(hdfsUrl+hdfsPath)
     } else {
       //parquet
-      inDF.repartition(3).write.save(hdfsUrl+hdfsPath)
+      inDF.repartition(partition).write.save(hdfsUrl+hdfsPath)
     }
   }
@@ -48,6 +48,7 @@
     hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
     hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
     types = MapUtil.get(map,key="types").asInstanceOf[String]
+    partition = MapUtil.get(map,key="partition").asInstanceOf[Int]
   }
@@ -55,6 +56,8 @@
     val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath").defaultValue("").required(true)
     val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true)
     val types = new PropertyDescriptor().name("types").displayName("json,csv,parquet").defaultValue("").required(true)
+    val partition = new PropertyDescriptor().name("partition").displayName("repartition").defaultValue("").required(true)
+    descriptor = partition :: descriptor
     descriptor = types :: descriptor
     descriptor = hdfsPath :: descriptor
     descriptor = hdfsUrl :: descriptor
@@ -73,6 +76,5 @@
   }

-  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  override val outportList: List[String] = List(PortEnum.NonePort.toString)
+
 }
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala
index 68a992f..46b0680 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala
@@ -19,6 +19,7 @@
 case object ESGroup extends StopGroup
 case object HdfsGroup extends StopGroup
 case object MicroorganismGroup extends StopGroup
 case object ExcelGroup extends StopGroup
+case object HbaseGroup extends StopGroup
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala
index 565ccda..f6ad56a 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala
@@ -27,5 +27,6 @@ object StopGroupEnum extends Enumeration {
   val Memcache= Value("Memcache")
   val GraphX=Value("GraphX")
   val ExcelGroup=Value("ExcelGroup")
+  val HbaseGroup=Value("HbaseGroup")
 }
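Usage note (illustrative, not part of the patch): with this change PutHdfs no longer hard-codes repartition(3); the partition count comes from the new required "partition" property, read via asInstanceOf[Int] in setProperties. A minimal Scala sketch of configuring the stop follows; the property values are hypothetical, only the keys and their expected types come from the PutHdfs code above:

    import cn.piflow.bundle.hdfs.PutHdfs

    val putHdfs = new PutHdfs()
    putHdfs.setProperties(Map(
      "hdfsUrl"   -> "hdfs://namenode:9000",  // hypothetical HDFS address
      "hdfsPath"  -> "/flow/goldData",        // hypothetical output directory
      "types"     -> "csv",                   // one of: json, csv, parquet
      "partition" -> 3                        // Int; replaces the old hard-coded repartition(3)
    ))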