Gene type data parsing

QiDong Yang
yanfqidong0604 2018-12-26 16:56:15 +08:00
parent 501b5fde3a
commit f6e9573018
5 changed files with 229 additions and 71 deletions


@@ -0,0 +1,48 @@
{
  "flow": {
    "name": "test",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "1111",
        "name": "SelectFilesByName",
        "bundle": "cn.piflow.bundle.ftp.SelectFilesByName",
        "properties": {
          "HDFSUrl": "hdfs://10.0.88.70:9000",
          "HDFSPath": "/yqd/weishengwu/gene_w/",
          "selectionConditions": ".*ene_info"
        }
      },
      {
        "uuid": "3333",
        "name": "GeneParser",
        "bundle": "cn.piflow.bundle.microorganism.GeneParser",
        "properties": {
        }
      },
      {
        "uuid": "4444",
        "name": "PutEs",
        "bundle": "cn.piflow.bundle.es.PutEs",
        "properties": {
          "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
          "port": "9200",
          "es_index": "gene_index",
          "es_type": "gene_type"
        }
      }
    ],
    "paths": [
      {
        "from": "SelectFilesByName",
        "outport": "",
        "inport": "",
        "to": "GeneParser"
      },
      {
        "from": "GeneParser",
        "outport": "",
        "inport": "",
        "to": "PutEs"
      }
    ]
  }
}
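
The ".*ene_info" pattern above selects what appear to be NCBI tab-delimited gene_info dumps. As a reference, a minimal sketch of how one such record maps onto the 15 column names GeneParser expects; the sample line and its values are illustrative, not taken from this commit:

// Hypothetical gene_info record: 15 tab-separated columns, "-" marks an empty field.
val names = Array("tax_id", "geneID", "symbol", "locus_tag", "synonyms", "dbxrefs",
  "chromosome", "map_location", "description", "type_of_gene",
  "symbol_from_nomenclature_authority", "full_name_from_nomenclature_authority",
  "nomenclature_status", "other_designations", "modification_date")
val sample = "9606\t672\tBRCA1\t-\tBRCAI|BRCC1\tMIM:113705\t17\t17q21.31\t" +
  "BRCA1 DNA repair associated\tprotein-coding\tBRCA1\tBRCA1 DNA repair associated\tO\t-\t20181216"
val record: Map[String, String] = names.zip(sample.split("\t")).toMap
// GeneParser stores "" for the "-" placeholders, parses the first two columns as integers,
// and rewrites the trailing yyyyMMdd date as yyyy-MM-dd before emitting each JSON document.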

Binary file not shown (new image, 2.6 KiB).


@@ -0,0 +1,158 @@
package cn.piflow.bundle.microorganism
import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader}
import java.text.SimpleDateFormat
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject
class GeneParser extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Parsing gene type data"
override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration()
// Derive the HDFS root (e.g. hdfs://host:port/) from the first input path and use it as fs.defaultFS.
var pathStr: String = inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl: String = ""
for (x <- (0 until 3)){
hdfsUrl += (pathARR(x) + "/")
}
configuration.set("fs.defaultFS", hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json"
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bis: BufferedInputStream =null
var names:Array[String]=Array("tax_id", "geneID", "symbol", "locus_tag", "synonyms", "dbxrefs", "chromosome", "map_location", "description", "type_of_gene",
"symbol_from_nomenclature_authority", "full_name_from_nomenclature_authority",
"nomenclature_status", "other_designations", "modification_date")
val format: java.text.DateFormat = new SimpleDateFormat("yyyyMMdd")
val newFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var n:Int=0
inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String]
println("########################## start parser ^^^" + pathStr)
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var doc:JSONObject=null
// Note: in Scala an assignment evaluates to Unit, so `(line = br.readLine()) != null` is always true;
// read the first line before the loop and advance at the bottom of each iteration instead.
var line: String = br.readLine()
while (line != null /*&& n<1000*/){
if( ! line.startsWith("#")){
n += 1
doc=new JSONObject()
val tokens: Array[String] = line.split("\\\t")
for(i <- (0 until 15)){
if(i < 2){
doc.put(names(i),Integer.parseInt(tokens(i).trim))
}else if(i < 14){
if(tokens(i).equals("-")){
doc.put(names(i),"")
}else{
doc.put(names(i),tokens(i))
}
}else{
doc.put(names(i),newFormat.format(format.parse(tokens(i))))
}
}
jsonStr = doc.toString
println(n+"^^^^^^^^^^^^^^^^^^^^")
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis = null
doc = null
}
line = br.readLine()
}
})
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
fdos.close()
println("start parser HDFSjsonFile")
val df: DataFrame = session.read.json(hdfsPathTemporary)
println("############################################################")
println(df.count())
df.show(20)
println("############################################################")
out.write(df)
}
override def setProperties(map: Map[String, Any]): Unit = {
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("/microorganism/gene.png")
}
override def getGroup(): List[String] = {
List(StopGroup.MicroorganismGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}
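
The 1 MiB buffer-copy loop in perform() recurs in this commit (twice in GeneParser above, once more in RefseqParser below). A minimal sketch of the same pattern factored into a helper; the object and method names are illustrative and not part of the commit:

import java.io.InputStream
import org.apache.hadoop.fs.FSDataOutputStream

object HdfsStreamUtil {
  // Drain `in` into `out` using the same 1 MiB buffer the stops allocate.
  def copyAndFlush(in: InputStream, out: FSDataOutputStream): Unit = {
    val buff = new Array[Byte](1048576)
    var count = in.read(buff)
    while (count != -1) {
      out.write(buff, 0, count)
      count = in.read(buff)
    }
    out.flush()
  }
}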


@@ -46,15 +46,12 @@ class RefseqParser extends ConfigurableStop{
fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
val buff: Array[Byte] = new Array[Byte](1048576)
var jsonStr: String =""
var bis: BufferedInputStream =null
// var df: DataFrame =null
// var d: DataFrame =null
// var jsonRDD: RDD[String] =null
inDf.collect().foreach(row => {
var n : Int =0
@@ -66,13 +63,7 @@ class RefseqParser extends ConfigurableStop{
// if(pathStr.equals("hdfs://10.0.88.70:9000/yqd/weishengwu/refseq/bacteria.1.genomic.gbff")) {
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
// var fdis: FSDataInputStream = fs.open(new Path("hdfs://10.0.88.70:9000/yqd/weishengwu/refseq/bacteria.1.1.genomic.fna.gz"))
// var gzipout: GZIPInputStream = new GZIPInputStream(fdis)
// var br: BufferedReader = new BufferedReader(new InputStreamReader(gzipout))
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
@@ -92,44 +83,30 @@ class RefseqParser extends ConfigurableStop{
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
/* if(n==1){
jsonRDD = session.sparkContext.makeRDD(jsonStr :: Nil)
df = session.read.json(jsonRDD)
}else{
jsonRDD = session.sparkContext.makeRDD(jsonStr :: Nil)
d = session.read.json(jsonRDD)
df = df.union(d.toDF(df.columns:_*))
}*/
fdos.flush()
bis = null
seq = null
doc = null
// jsonRDD = null
// d = null
}
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
// }
})
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close()
fdos.close()
println("start parser HDFSjsonFile")


@@ -5,8 +5,6 @@ import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.jsoup.Jsoup
import org.jsoup.select.Elements
import org.junit.Test
import scala.util.parsing.json.JSON
@@ -20,7 +18,7 @@ class emblTest {
// val file = "src/main/resources/yqd/down.json"
//val file = "src/main/resources/yqd/refseq_genome.json"
//val file = "src/main/resources/yqd/select_unzip.json"
val file = "src/main/resources/yqd/embl_parser.json"
val file = "src/main/resources/microorganism/gene.json"
val flowJsonStr = FileUtil.fileReader(file)
@@ -34,12 +32,17 @@ val file = "src/main/resources/yqd/embl_parser.json"
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.88.70:7077")
.appName("Embl")
.config("spark.driver.memory", "8g")
.config("spark.executor.memory", "16g")
.config("spark.cores.max", "16")
.config("spark.jars","/root/Desktop/weishengwu/out/artifacts/piflow_bundle/piflow_bundle.jar")
.master("yarn")
.appName("test18")
.config("spark.deploy.mode","client")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "4")
.config("hive.metastore.uris","thrift://10.0.88.64:9083")
.config("spark.yarn.am.extraJavaOptions","-Dhdp.version=2.6.5.0-292")
.config("spark.hadoop.yarn.resourcemanager.address","master2.packone:8050")
.config("spark.hadoop.fs.defaultFS","hdfs://master2.packone:8020")
.config("spark.jars","/git_1225/out/artifacts/piflow/piflow.jar")
.enableHiveSupport()
.getOrCreate()
@@ -55,33 +58,5 @@ val file = "src/main/resources/yqd/embl_parser.json"
}
@Test
def testEmblDataParse11(): Unit ={
val url ="http://ftp.ebi.ac.uk/pub/databases/ena/sequence/release/"
val doc = Jsoup.connect(url).timeout(100000000).get()
// Fetch the file names, last-modified dates and sizes listed on the page at the URL
// Name Last modified Size Parent Directory -
// build_gbff_cu.pl 2003-04-25 17:23 21K
val elements: Elements = doc.select("html >body >table >tbody")
// println(elements)
println(elements.first().text())
// Split elements line by line into individual strings
val fileString = elements.first().text().split("\\n")
for (i <- 0 until fileString.size) {
println(fileString(i))
}
println(fileString)
}
}