update genbank

yanggang 2018-11-06 10:01:55 +08:00
parent 32749fa906
commit 1b956b7cf0
4 changed files with 98 additions and 108 deletions

View File

@@ -8,10 +8,10 @@
"name":"LoadFromFtpUrl",
"bundle":"cn.piflow.bundle.ftp.LoadFromFtpUrl",
"properties":{
"url_str":"https://ftp.ncbi.nih.gov/genbank/docs/",
"url_type":"dir",
"url_str":"https://ftp.ncbi.nih.gov/genbank/gbbct151.seq.gz",
"url_type":"file",
"localPath":"/ftpUrlDownLoadDIR555",
"downType":"all11"
"downType":"all"
}
},
{
@@ -19,7 +19,7 @@
"name":"UnGz",
"bundle":"cn.piflow.bundle.ftp.UnGz",
"properties":{
"localPath":"/ftpUrlUnGz222"
"gzType":"seq.gz"
}
},
{
@@ -30,7 +30,7 @@
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"genbank",
"es_type":"data1"
"es_type":"data6"
}
}
],
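This flow chains three stops: LoadFromFtpUrl pulls the GenBank archive, UnGz decompresses it, and an Elasticsearch writer indexes the result, each configured through its "properties" block. A minimal, hedged sketch of inspecting such a config with org.json (the library GenBankParse already uses); the top-level "stops" key is an assumption, since this file only shows the entries inside that array.

import org.json.{JSONArray, JSONObject}

// Hedged sketch: list each stop's bundle and the properties it will receive
// via setProperties. The "stops" key is assumed; only the array entries are
// visible in the config above.
object FlowConfigSketch {
  def describeStops(flowJson: String): Unit = {
    val stops: JSONArray = new JSONObject(flowJson).getJSONArray("stops")
    for (i <- 0 until stops.length()) {
      val stop = stops.getJSONObject(i)
      println(s"${stop.getString("name")} -> ${stop.getString("bundle")}")
      val props = stop.getJSONObject("properties")
      val keys = props.keys()
      while (keys.hasNext) {
        val k = keys.next()
        println(s"  $k = ${props.get(k)}")
      }
    }
  }
}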

View File

@@ -1,11 +1,14 @@
package cn.piflow.bundle.ftp
import java.util
import java.util.zip.GZIPInputStream
import cn.piflow.bundle.util.UnGzUtil
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class UnGz extends ConfigurableStop{
@@ -14,9 +17,7 @@ class UnGz extends ConfigurableStop{
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
var localPath:String =_
var list = List("")
var gzType:String =_
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
@@ -27,30 +28,41 @@ class UnGz extends ConfigurableStop{
inDf.show()
inDf.schema.printTreeString()
// df => Array[Row]
val rows: Array[Row] = inDf.collect()
var df:DataFrame = null
var count = 0
for (i <- 0 until rows.size) {
//row(i) [/ftpUrlDownLoadDIR555/gbbct151.seq.gz]
// file path
val sourceFile = rows(i)(0).toString
// println(sourceFile+"----------------------------------")
if(sourceFile.endsWith("seq.gz")){
for (i <- 0 until rows.size) {
val sourceFile = rows(i)(0).toString
if (count == 0){
println(count+"-------------------------------")
if(sourceFile.endsWith("seq.gz")){
println(sourceFile+"----------------------------------")
val strings: Array[String] = sourceFile.split("/")
val fileName = strings(strings.length-1).split(".gz")(0)
val savePath = localPath+sourceFile.split(".seq.gz")(0)
// load the file into a byte array
val byteArray: Array[Byte] = UnGzUtil.unGzStream(sourceFile)
println(savePath)
df = Seq(byteArray).toDF()
count = count+1
} else {
println(count+"-------------------------------")
val byteArray: Array[Byte] = UnGzUtil.unGzStream(sourceFile)
val filePath = UnGzUtil.unGz(sourceFile,savePath,fileName)
println(filePath)
list = filePath::list
df = df.union(Seq(byteArray).toDF())
}
}
}
val df = sc.parallelize(list).toDF("filePath")
df.schema.printTreeString()
df.show()
println(df.count())
out.write(df)
}
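The reworked perform above collects the input rows, decompresses each *.seq.gz path with UnGzUtil.unGzStream, and unions one-row DataFrames of the resulting byte arrays. A hedged sketch of the same idea without the mutable count/union bookkeeping, assuming unGzStream returns the decompressed bytes of one file (as the UnGzUtil change later in this commit suggests):

import org.apache.spark.sql.{DataFrame, SparkSession}
import cn.piflow.bundle.util.UnGzUtil

// Hedged sketch: build one binary-column DataFrame from a list of .seq.gz
// paths instead of unioning single-row frames inside a counted loop.
object UnGzPerformSketch {
  def gzPathsToDf(spark: SparkSession, paths: Seq[String]): DataFrame = {
    import spark.implicits._
    val payloads: Seq[Array[Byte]] =
      paths.filter(_.endsWith("seq.gz"))      // keep only the GenBank archives
           .map(p => UnGzUtil.unGzStream(p))  // decompress each file to bytes
    payloads.toDF("content")                  // one row per decompressed file
  }
}

Building the frame from one local Seq also keeps the logical plan flat, whereas repeated union calls grow it one node per file.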
@@ -58,13 +70,13 @@ class UnGz extends ConfigurableStop{
def setProperties(map: Map[String, Any]): Unit = {
localPath=MapUtil.get(map,key="localPath").asInstanceOf[String]
gzType=MapUtil.get(map,key="gzType").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val savePath = new PropertyDescriptor().name("savePath").displayName("savePath").defaultValue("").required(true)
descriptor = savePath :: descriptor
val gzType = new PropertyDescriptor().name("gzType").displayName("gzType").defaultValue("").required(true)
descriptor = gzType :: descriptor
descriptor
}
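One thing worth flagging in the hunk above: setProperties now reads gzType rather than localPath, while getPropertyDescriptor appears to still register a savePath descriptor that no field reads. A hedged sketch of a descriptor list that matches what setProperties actually consumes; the builder chain mirrors the calls in the diff, and the "seq.gz" default mirrors the flow config:

import cn.piflow.conf.bean.PropertyDescriptor

// Hedged sketch: only gzType is read in setProperties after this commit,
// so the descriptor list could be reduced to match it.
object UnGzDescriptorSketch {
  def descriptors(): List[PropertyDescriptor] = {
    val gzType = new PropertyDescriptor()
      .name("gzType")
      .displayName("gzType")
      .defaultValue("seq.gz")
      .required(true)
    List(gzType)
  }
}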

View File

@@ -1,29 +1,18 @@
package cn.piflow.bundle.microorganism
import java.io._
import java.net.URL
import java.text.ParseException
import java.util
import java.util.ArrayList
import java.util.zip.GZIPInputStream
import cn.piflow.bundle.microorganism.util.CustomIOTools
import cn.piflow.bundle.microorganism.util.Process
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.biojava.bio.BioException
import org.elasticsearch.action.bulk.BulkProcessor
import org.json.JSONObject
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.spark.sql.EsSparkSQL
import scala.collection.mutable.ArrayBuffer
import org.json.JSONObject
class GenBankParse extends ConfigurableStop{
@@ -41,72 +30,55 @@ class GenBankParse extends ConfigurableStop{
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
import spark.sqlContext.implicits._
import spark.implicits._
val inDf = in.read()
inDf.show()
//inDf.show()
println("++++++++++++++++++++++++++++++++++++++++++++++++++001")
println(inDf.count())
inDf.schema.printTreeString()
// var list2 = new ArrayBuffer[ArrayBuffer[String]]
var list222 = new ArrayList[ArrayList[String]]
var listSeq = new ArrayList[ArrayList[String]]
val rows: Array[Row] = inDf.collect()
try {
for (i <- 0 until rows.size) {
val sourceFile = rows(i)(0).toString
val sourceFile = rows(i)(0)
println("++++++++++++++++++++++++++++++++++++++++++++++++++002" + sourceFile)
// deserialize the byte array with a ByteArrayInputStream
val bis:ByteArrayInputStream=new ByteArrayInputStream(inDf.head().get(0).asInstanceOf[Array[Byte]])
if (sourceFile.endsWith("seq")) {
println(sourceFile)
//val fileinputStream = new FileInputStream(sourceFile)
println("Start processing file ----->" + sourceFile)
val fileinputStream = new FileInputStream(sourceFile)
println("Start processing file ----->" + sourceFile)
val br = new BufferedReader(new InputStreamReader(bis))
val br = new BufferedReader(new InputStreamReader(fileinputStream))
// parse the byte stream of the .seq file
val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null)
val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null)
var doc: JSONObject = null
var count = 0
while (sequenceIterator.hasNext) {
var listJson = new ArrayList[String]
doc = new JSONObject()
try {
var seq = sequenceIterator.nextRichSequence()
var doc: JSONObject = null
var count = 0
Process.processSingleSequence(seq, doc)
// JSON string
listJson.add(doc.toString())
// accession number, e.g. CP009630
listJson.add(seq.getAccession)
listSeq.add(listJson)
while (sequenceIterator.hasNext) {
// var list1 = new ArrayBuffer[String]
var list001 = new ArrayList[String]
doc = new JSONObject()
try {
var seq = sequenceIterator.nextRichSequence()
if (seq.getAccession.equals("CP010565")) {
println(seq.getAccession)
}
Process.processSingleSequence(seq, doc)
println("++++++++++++++++++++++++++++++++++++++++++++++++++" + sourceFile)
//list1+=(sourceFile)
// list1+=(seq.getAccession)
// list1+=(doc.toString())
// list2+=(list1)
// println(list1.size+"-----"+list2.size)
list001.add(doc.toString())
list001.add(seq.getAccession)
list222.add(list001)
}
catch {
case e: BioException =>
e.getMessage
case e: ParseException =>
e.printStackTrace()
}
}
catch {
case e: BioException =>
e.getMessage
case e: ParseException =>
e.printStackTrace()
}
}
}
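The rewritten loop above wraps the decompressed bytes in a ByteArrayInputStream, walks the GenBank records with CustomIOTools.IOTools.readGenbankDNA, and stores one (JSON, accession) pair per sequence in listSeq. A hedged sketch of that parsing step in isolation, assuming the project helpers behave as they are used in the diff; error handling is reduced to skipping records that fail to parse:

import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
import java.util.ArrayList

import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import org.json.JSONObject

// Hedged sketch: parse decompressed GenBank bytes and collect one
// (json, accession) pair per sequence, mirroring the listSeq structure above.
object GenBankParseSketch {
  def parse(bytes: Array[Byte]): ArrayList[ArrayList[String]] = {
    val pairs = new ArrayList[ArrayList[String]]
    val br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)))
    val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null)
    while (sequenceIterator.hasNext) {
      try {
        val seq = sequenceIterator.nextRichSequence()
        val doc = new JSONObject()
        Process.processSingleSequence(seq, doc)  // fill doc from the record
        val pair = new ArrayList[String]
        pair.add(doc.toString)                   // record as a JSON string
        pair.add(seq.getAccession)               // accession, e.g. CP009630
        pairs.add(pair)
      } catch {
        case e: Exception => e.printStackTrace() // skip records that fail to parse
      }
    }
    pairs
  }
}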
@@ -117,27 +89,29 @@ class GenBankParse extends ConfigurableStop{
e.printStackTrace()
}
println("#################")
var jsonDF: DataFrame = null
for (i <- 0 until list222.size()) {
for (i <- 0 until listSeq.size()) {
// println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" + i)
// println(list222.get(i).size())
//
// val esId = list222.get(i).get(1).toString
// println(esId)
println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" + i)
//
println(listSeq.get(i).size())
// the JSON string parsed from the .seq file
val jsonString = listSeq.get(i).get(0)
// accession number, e.g. CP009630
val esId = listSeq.get(i).get(1).toString
println(esId)
// load the JSON string into a DataFrame
val jsonRDD = spark.sparkContext.makeRDD(list222.get(i).get(0).toString() :: Nil)
val jsonRDD = spark.sparkContext.makeRDD(jsonString.toString() :: Nil)
jsonDF = spark.read.json(jsonRDD)
jsonDF.show()
jsonDF.schema.printTreeString()
// jsonDF.schema.printTreeString()
val options = Map("es.index.auto.create"-> "true",
// "es.mapping.id"->"Accession",
"es.nodes"->es_nodes,"es.port"->port)
// write the DataFrame to ES
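Each stored JSON string is loaded as its own small DataFrame with spark.read.json and then written to Elasticsearch using the options built above. A hedged sketch of that write step, using EsSparkSQL from the imports shown earlier; the "genbank/data6" resource string mirrors the es_index and es_type values in the flow config:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

// Hedged sketch: load one JSON record as a DataFrame and save it to ES.
// EsSparkSQL.saveToEs(df, resource, cfg) comes from elasticsearch-spark;
// the index/type resource below is taken from the flow config in this commit.
object EsWriteSketch {
  def writeRecord(spark: SparkSession, json: String, esNodes: String, esPort: String): Unit = {
    // JSON string -> single-row DataFrame, as in the loop above
    val jsonDF = spark.read.json(spark.sparkContext.makeRDD(json :: Nil))
    val cfg = Map(
      "es.index.auto.create" -> "true",
      "es.nodes" -> esNodes,
      "es.port" -> esPort)
    EsSparkSQL.saveToEs(jsonDF, "genbank/data6", cfg)
  }
}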

View File

@@ -45,21 +45,25 @@ object UnGzUtil extends Serializable{
def unGzStream(inputDir:String):GZIPInputStream = {
def unGzStream(inputDir:String):Array[Byte] = {
var gzip:GZIPInputStream = null
var byteArrayOutputStream:ByteArrayOutputStream=new ByteArrayOutputStream()
val buffer=new Array[Byte](1024*1024)
try {
val fileInput = new FileInputStream(inputDir)
gzip = new GZIPInputStream(fileInput)
val fileInput = new FileInputStream(inputDir)
gzip = new GZIPInputStream(fileInput)
var count = -1
while ((count = gzip.read(buffer)) != -1 && (count != -1) ){
byteArrayOutputStream.write(buffer,0,count)
} catch {
case e:FtpProtocolException=>
e.printStackTrace()
case e: IOException =>
e.printStackTrace()
}
return gzip
val byteArray: Array[Byte] = byteArrayOutputStream.toByteArray
return byteArray
}
}
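The new unGzStream buffers the decompressed file into a ByteArrayOutputStream and returns the bytes, but its loop condition compares the Unit result of the assignment (count = gzip.read(buffer)) with -1, so termination actually relies on the second count != -1 clause. A hedged sketch of the same routine with a conventional read loop and explicit cleanup:

import java.io.{ByteArrayOutputStream, FileInputStream}
import java.util.zip.GZIPInputStream

// Hedged sketch: decompress one .gz file into a byte array with a read loop
// that tests the byte count after each read.
object UnGzStreamSketch {
  def unGzToBytes(inputPath: String): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buffer = new Array[Byte](1024 * 1024)
    val gzip = new GZIPInputStream(new FileInputStream(inputPath))
    try {
      var count = gzip.read(buffer)
      while (count != -1) {          // stop at end of stream
        out.write(buffer, 0, count)
        count = gzip.read(buffer)
      }
    } finally {
      gzip.close()                   // release the file handle even on failure
    }
    out.toByteArray
  }
}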