diff --git a/piflow-bundle/src/main/resources/genbank.json b/piflow-bundle/src/main/resources/genbank.json
index 991e22e..95c3e85 100644
--- a/piflow-bundle/src/main/resources/genbank.json
+++ b/piflow-bundle/src/main/resources/genbank.json
@@ -8,10 +8,10 @@
         "name":"LoadFromFtpUrl",
         "bundle":"cn.piflow.bundle.ftp.LoadFromFtpUrl",
         "properties":{
-          "url_str":"https://ftp.ncbi.nih.gov/genbank/docs/",
-          "url_type":"dir",
+          "url_str":"https://ftp.ncbi.nih.gov/genbank/gbbct151.seq.gz",
+          "url_type":"file",
           "localPath":"/ftpUrlDownLoadDIR555",
-          "downType":"all11"
+          "downType":"all"
         }
       },
       {
@@ -19,7 +19,7 @@
         "name":"UnGz",
         "bundle":"cn.piflow.bundle.ftp.UnGz",
         "properties":{
-          "localPath":"/ftpUrlUnGz222"
+          "gzType":"seq.gz"
         }
       },
       {
@@ -30,7 +30,7 @@
           "es_nodes":"10.0.86.239",
           "port":"9200",
           "es_index":"genbank",
-          "es_type":"data1"
+          "es_type":"data6"
         }
       }
     ],
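For context: each stop's "properties" object above reaches the stop as a plain map through setProperties, which reads keys back with MapUtil. A minimal sketch of that wiring for the reworked UnGz stop, with the flow engine taken out of the picture (assumes the piflow-bundle classes are on the classpath):

    // Hand the JSON "properties" above to the stop, as the flow engine would.
    val unGz = new cn.piflow.bundle.ftp.UnGz()
    unGz.setProperties(Map("gzType" -> "seq.gz"))
    // gzType now drives the suffix filter in UnGz.perform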
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala
index d11f454..6187c77 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UnGz.scala
@@ -1,11 +1,11 @@
 package cn.piflow.bundle.ftp
 
 import cn.piflow.bundle.util.UnGzUtil
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
 class UnGz extends ConfigurableStop{
 
@@ -14,9 +14,7 @@ class UnGz extends ConfigurableStop{
   val inportList: List[String] = List(PortEnum.NonePort.toString)
   val outportList: List[String] = List(PortEnum.NonePort.toString)
 
-  var localPath: String = _
-
-  var list = List("")
+  var gzType: String = _
 
   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
@@ -27,30 +25,31 @@ class UnGz extends ConfigurableStop{
     inDf.show()
     inDf.schema.printTreeString()
 
+    // DataFrame => Array[Row]; each row holds one downloaded file path,
+    // e.g. [/ftpUrlDownLoadDIR555/gbbct151.seq.gz]
     val rows: Array[Row] = inDf.collect()
+    var df: DataFrame = null
+    var count = 0
 
     for (i <- 0 until rows.size) {
       val sourceFile = rows(i)(0).toString
 
-      if(sourceFile.endsWith("seq.gz")){
-        println(sourceFile+"----------------------------------")
-        val strings: Array[String] = sourceFile.split("/")
-        val fileName = strings(strings.length-1).split(".gz")(0)
-        val savePath = localPath+sourceFile.split(".seq.gz")(0)
-
-        println(savePath)
-
-        val filePath = UnGzUtil.unGz(sourceFile,savePath,fileName)
-
-        println(filePath)
-        list = filePath::list
+      if (sourceFile.endsWith(gzType)) {
+        // decompress the file into a byte array
+        val byteArray: Array[Byte] = UnGzUtil.unGzStream(sourceFile)
+
+        // the first file starts the DataFrame; later files are unioned onto it
+        if (count == 0) {
+          df = Seq(byteArray).toDF()
+        } else {
+          df = df.union(Seq(byteArray).toDF())
+        }
+        count = count + 1
       }
     }
 
-    val df = sc.parallelize(list).toDF("filePath")
     df.schema.printTreeString()
-    df.show()
+    println(df.count())
 
     out.write(df)
   }
@@ -58,13 +57,13 @@ class UnGz extends ConfigurableStop{
 
   def setProperties(map: Map[String, Any]): Unit = {
-    localPath = MapUtil.get(map,key="localPath").asInstanceOf[String]
+    gzType = MapUtil.get(map,key="gzType").asInstanceOf[String]
   }
 
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").defaultValue("").required(true)
-    descriptor = savePath :: descriptor
+    val gzType = new PropertyDescriptor().name("gzType").displayName("gzType").defaultValue("").required(true)
+    descriptor = gzType :: descriptor
     descriptor
   }
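The perform method above builds a single-column binary DataFrame one file at a time and unions the pieces together. A self-contained sketch of that pattern (a local SparkSession and toy payloads assumed; nothing piflow-specific):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object UnionBytesSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("union-bytes").getOrCreate()
        import spark.implicits._

        val payloads: Seq[Array[Byte]] = Seq("one".getBytes, "two".getBytes)
        var df: DataFrame = null
        for (bytes <- payloads) {
          val part = Seq(bytes).toDF("value")   // one row, binary column
          df = if (df == null) part else df.union(part)
        }
        df.printSchema()      // root |-- value: binary
        println(df.count())   // 2
        spark.stop()
      }
    }

For many files, collecting the byte arrays first and calling payloads.toDF("value") once would avoid the chain of single-row unions.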
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala
index b77e8a7..d82dbb2 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala
@@ -1,29 +1,18 @@
 package cn.piflow.bundle.microorganism
 
 import java.io._
-import java.net.URL
 import java.text.ParseException
-import java.util
 import java.util.ArrayList
-import java.util.zip.GZIPInputStream
 
-import cn.piflow.bundle.microorganism.util.CustomIOTools
-import cn.piflow.bundle.microorganism.util.Process
+import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.biojava.bio.BioException
-import org.elasticsearch.action.bulk.BulkProcessor
-import org.json.JSONObject
-import org.elasticsearch.action.index.IndexRequest
 import org.elasticsearch.spark.sql.EsSparkSQL
-
-import scala.collection.mutable.ArrayBuffer
-
+import org.json.JSONObject
 
 class GenBankParse extends ConfigurableStop{
 
@@ -41,72 +30,55 @@ class GenBankParse extends ConfigurableStop{
   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
     val sc = spark.sparkContext
-    import spark.sqlContext.implicits._
-    import spark.implicits._
 
     val inDf = in.read()
-    inDf.show()
+    //inDf.show()
     println("++++++++++++++++++++++++++++++++++++++++++++++++++001")
+    println(inDf.count())
     inDf.schema.printTreeString()
 
-//    var list2 = new ArrayBuffer[ArrayBuffer[String]]
-    var list222 = new ArrayList[ArrayList[String]]
+    var listSeq = new ArrayList[ArrayList[String]]
     val rows: Array[Row] = inDf.collect()
 
     try {
       for (i <- 0 until rows.size) {
-        val sourceFile = rows(i)(0).toString
-        println("++++++++++++++++++++++++++++++++++++++++++++++++++002" + sourceFile)
+        // each row carries the decompressed bytes of one .seq file;
+        // wrap them back into a stream for the parser
+        val fileBytes = rows(i).get(0).asInstanceOf[Array[Byte]]
+        val bis: ByteArrayInputStream = new ByteArrayInputStream(fileBytes)
+        println("Start processing row ----->" + i)
 
-        if (sourceFile.endsWith("seq")) {
-          println(sourceFile)
-          val fileinputStream = new FileInputStream(sourceFile)
-          println("Start processing file ----->" + sourceFile)
-
-          val br = new BufferedReader(new InputStreamReader(fileinputStream))
+        val br = new BufferedReader(new InputStreamReader(bis))
 
-          val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null)
+        // parse the byte stream of the .seq file
+        val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null)
 
-          var doc: JSONObject = null
-          var count = 0
-
-          while (sequenceIterator.hasNext) {
-            var list001 = new ArrayList[String]
-
-            doc = new JSONObject()
-            try {
-              var seq = sequenceIterator.nextRichSequence()
-
-              if (seq.getAccession.equals("CP010565")) {
-                println(seq.getAccession)
-              }
-
-              Process.processSingleSequence(seq, doc)
-
-              println("++++++++++++++++++++++++++++++++++++++++++++++++++" + sourceFile)
-
-              list001.add(doc.toString())
-              list001.add(seq.getAccession)
-              list222.add(list001)
-            }
-            catch {
-              case e: BioException =>
-                e.getMessage
-              case e: ParseException =>
-                e.printStackTrace()
-            }
-          }
-        }
+        while (sequenceIterator.hasNext) {
+          val listJson = new ArrayList[String]
+          val doc = new JSONObject()
+          try {
+            val seq = sequenceIterator.nextRichSequence()
+
+            Process.processSingleSequence(seq, doc)
+            // the JSON document built from one sequence
+            listJson.add(doc.toString())
+            // its accession number, e.g. CP009630
+            listJson.add(seq.getAccession)
+            listSeq.add(listJson)
+          }
+          catch {
+            case e: BioException =>
+              println(e.getMessage)
+            case e: ParseException =>
+              e.printStackTrace()
+          }
+        }
       }
@@ -117,27 +89,29 @@ class GenBankParse extends ConfigurableStop{
         e.printStackTrace()
     }
 
-    println("#################")
-
     var jsonDF: DataFrame = null
-    for (i <- 0 until list222.size()) {
+    for (i <- 0 until listSeq.size()) {
 
-//      println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" + i)
-//      println(list222.get(i).size())
-//
-//      val esId = list222.get(i).get(1).toString
-//      println(esId)
+      // the JSON document parsed from the .seq file
+      val jsonString = listSeq.get(i).get(0)
+
+      // its accession number, e.g. CP009630, intended as the ES document id
+      val esId = listSeq.get(i).get(1)
+      println(esId)
 
       // load the JSON string as a DataFrame
-      val jsonRDD = spark.sparkContext.makeRDD(list222.get(i).get(0).toString() :: Nil)
+      val jsonRDD = spark.sparkContext.makeRDD(jsonString :: Nil)
       jsonDF = spark.read.json(jsonRDD)
 
       jsonDF.show()
-      jsonDF.schema.printTreeString()
+      //jsonDF.schema.printTreeString()
 
       val options = Map("es.index.auto.create"-> "true",
+        //"es.mapping.id"->"Accession",
         "es.nodes"->es_nodes,"es.port"->port)
 
       // write the DataFrame to ES
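Taken together, the new perform turns each binary row back into a reader and walks GenBank records one at a time. A condensed sketch of that parse path in isolation (a sketch only: it assumes the project's CustomIOTools and Process helpers behave as they are used above, and it skips the per-record error handling):

    import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
    import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
    import org.json.JSONObject

    object SeqBytesToJson {
      // bytes: the decompressed contents of one GenBank .seq file
      def parse(bytes: Array[Byte]): List[String] = {
        val br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)))
        val it = CustomIOTools.IOTools.readGenbankDNA(br, null)
        var docs = List.empty[String]
        while (it.hasNext) {
          val doc = new JSONObject()
          Process.processSingleSequence(it.nextRichSequence(), doc)
          docs = doc.toString :: docs   // one JSON document per sequence
        }
        docs.reverse
      }
    }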
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/UnGzUtil.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/UnGzUtil.scala
index 7e9e466..c6104d6 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/UnGzUtil.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/UnGzUtil.scala
@@ -45,21 +45,23 @@ object UnGzUtil extends Serializable{
 
-  def unGzStream(inputDir:String):GZIPInputStream = {
+  // decompress the entire .gz file at inputDir into a byte array
+  def unGzStream(inputDir: String): Array[Byte] = {
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val buffer = new Array[Byte](1024*1024)
 
-    try {
-      val fileInput = new FileInputStream(inputDir)
-      gzip = new GZIPInputStream(fileInput)
-
-    } catch {
-      case e:FtpProtocolException=>
-        e.printStackTrace()
-      case e: IOException =>
-        e.printStackTrace()
-    }
-    return gzip
+    val fileInput = new FileInputStream(inputDir)
+    val gzip = new GZIPInputStream(fileInput)
+
+    // read until EOF; the Java-style `while ((count = read()) != -1)` idiom
+    // does not work in Scala, where assignment evaluates to Unit
+    var count = gzip.read(buffer)
+    while (count != -1) {
+      byteArrayOutputStream.write(buffer, 0, count)
+      count = gzip.read(buffer)
+    }
+    gzip.close()
+
+    byteArrayOutputStream.toByteArray
   }
 }
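A quick usage sketch for the reworked unGzStream: write a small gzip file, then read it back as a byte array (the temp path and payload are illustrative, not part of the flow):

    import java.io.FileOutputStream
    import java.util.zip.GZIPOutputStream

    object UnGzStreamCheck {
      def main(args: Array[String]): Unit = {
        val path = "/tmp/sample.seq.gz"
        val out = new GZIPOutputStream(new FileOutputStream(path))
        out.write("LOCUS       TEST".getBytes("UTF-8"))
        out.close()

        val bytes = cn.piflow.bundle.util.UnGzUtil.unGzStream(path)
        println(new String(bytes, "UTF-8"))   // prints: LOCUS       TEST
      }
    }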