Parsing of Ensembl data

Yang QiDong
yanfqidong0604 committed 2019-01-08 20:46:47 +08:00
parent 53cb53f713
commit 3332ee704e
4 changed files with 249 additions and 0 deletions

New binary image file (2.0 KiB) not shown.

@@ -0,0 +1,48 @@
{
  "flow": {
    "name": "test",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "1111",
        "name": "SelectFilesByName",
        "bundle": "cn.piflow.bundle.ftp.SelectFilesByName",
        "properties": {
          "HDFSUrl": "hdfs://10.0.88.70:9000",
          "HDFSPath": "/yqd/weishengwu/Ensembl_gff3/",
          "selectionConditions": ".*.gff3"
        }
      },
      {
        "uuid": "3333",
        "name": "Ensembl_gff3Parser",
        "bundle": "cn.piflow.bundle.microorganism.Ensembl_gff3Parser",
        "properties": {}
      },
      {
        "uuid": "4444",
        "name": "PutEs",
        "bundle": "cn.piflow.bundle.es.PutEs",
        "properties": {
          "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
          "port": "9200",
          "es_index": "ensemblgff3",
          "es_type": "ensemblgff3"
        }
      }
    ],
    "paths": [
      {
        "from": "SelectFilesByName",
        "outport": "",
        "inport": "",
        "to": "Ensembl_gff3Parser"
      },
      {
        "from": "Ensembl_gff3Parser",
        "outport": "",
        "inport": "",
        "to": "PutEs"
      }
    ]
  }
}
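For a quick sanity check of the flow definition above, the sketch below loads the JSON and verifies that every entry in "paths" refers to a declared stop. The file name ensembl_gff3_flow.json is an assumption for the example; the org.json library is already a dependency of the parser code in this commit.

import org.json.JSONObject
import scala.io.Source

object FlowConfigCheck {
  def main(args: Array[String]): Unit = {
    // Illustrative path: save the flow JSON above under this name first.
    val text = Source.fromFile("ensembl_gff3_flow.json").mkString
    val flow = new JSONObject(text).getJSONObject("flow")

    // Collect the declared stop names.
    val stops = flow.getJSONArray("stops")
    val names = (0 until stops.length()).map(i => stops.getJSONObject(i).getString("name")).toSet

    // Every path endpoint must be a declared stop.
    val paths = flow.getJSONArray("paths")
    for (i <- 0 until paths.length()) {
      val p = paths.getJSONObject(i)
      val from = p.getString("from")
      val to = p.getString("to")
      require(names(from) && names(to), s"unknown stop in path: $from -> $to")
      println(s"$from -> $to")
    }
  }
}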

@@ -0,0 +1,146 @@
package cn.piflow.bundle.microorganism

import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader}

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.bundle.microorganism.util.ParserGff3Data
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject

class Ensembl_gff3Parser extends ConfigurableStop {

  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "Parse Ensembl GFF3 data"
  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  override def setProperties(map: Map[String, Any]): Unit = {}

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    // This stop has no configurable properties.
    List()
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("/microorganism/Ensembl.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroup.MicroorganismGroup)
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session = pec.get[SparkSession]()
    val inDf: DataFrame = in.read()

    // Derive the HDFS URL (e.g. "hdfs://10.0.88.70:9000/") from the first input path.
    val configuration: Configuration = new Configuration()
    var pathStr: String = ""
    var hdfsUrl: String = ""
    try {
      pathStr = inDf.take(1)(0).get(0).asInstanceOf[String]
      val pathARR: Array[String] = pathStr.split("/")
      for (x <- 0 until 3) {
        hdfsUrl += pathARR(x) + "/"
      }
    } catch {
      case e: Exception => throw new Exception("Path error", e)
    }
    configuration.set("fs.defaultFS", hdfsUrl)
    val fs: FileSystem = FileSystem.get(configuration)

    // All parsed records are written to one temporary JSON-array file on HDFS.
    // hdfsUrl already ends with "/", so no extra separator is needed.
    val hdfsPathTemporary: String = hdfsUrl + "Ensembl_gff3Parser_temporary.json"
    val path: Path = new Path(hdfsPathTemporary)
    if (fs.exists(path)) {
      fs.delete(path, true)
    }
    val fdos: FSDataOutputStream = fs.create(path)

    val buff: Array[Byte] = new Array[Byte](1048576)
    val parser: ParserGff3Data = new ParserGff3Data
    var bis: BufferedInputStream = null
    var n: Int = 0

    inDf.collect().foreach(row => {
      pathStr = row.get(0).asInstanceOf[String]
      println("start parsing: " + pathStr)
      val fdis: FSDataInputStream = fs.open(new Path(pathStr))
      val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
      var eachStr: String = br.readLine()
      while (eachStr != null) {
        val doc: JSONObject = parser.parserGff3(eachStr)
        val jsonStr: String = doc.toString
        // Skip lines that produced an empty object ("{}"), e.g. headers and comments.
        if (jsonStr.length > 2) {
          n += 1
          // Open the JSON array before the first record; separate later records with ",".
          val prefix = if (n == 1) "[" else ","
          bis = new BufferedInputStream(new ByteArrayInputStream((prefix + jsonStr).getBytes()))
          var count: Int = bis.read(buff)
          while (count != -1) {
            fdos.write(buff, 0, count)
            count = bis.read(buff)
          }
          fdos.flush()
          bis.close()
        }
        eachStr = br.readLine()
      }
      br.close()
      fdis.close()
    })

    // Close the JSON array.
    bis = new BufferedInputStream(new ByteArrayInputStream("]".getBytes()))
    var count: Int = bis.read(buff)
    while (count != -1) {
      fdos.write(buff, 0, count)
      count = bis.read(buff)
    }
    fdos.flush()
    bis.close()
    fdos.close()

    // Load the assembled JSON file back into a DataFrame and pass it downstream.
    val df: DataFrame = session.read.json(hdfsPathTemporary)
    df.show(20)
    out.write(df)
  }
}
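For reference, the temporary file assembled by perform() is one top-level JSON array written on a single line, which is why session.read.json can load it directly: Spark's line-delimited JSON reader turns each element of a top-level array into a row. A minimal standalone sketch of that read-back step, assuming a local SparkSession and an illustrative file path:

import org.apache.spark.sql.SparkSession

object ReadParsedGff3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ReadParsedGff3")
      .getOrCreate()

    // Illustrative path; in the stop above it lives on HDFS next to the input.
    val df = spark.read.json("/tmp/Ensembl_gff3Parser_temporary.json")
    df.printSchema()
    df.show(5)

    spark.stop()
  }
}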

@@ -0,0 +1,55 @@
package cn.piflow.bundle.microorganism.util

import java.util.HashMap

import org.json.JSONObject

class ParserGff3Data {

  // Parse the ninth GFF3 column ("k1=v1;k2=v2;...") into a key/value map.
  def parserAttributes(eachFileStr: String): HashMap[String, String] = {
    val map: HashMap[String, String] = new HashMap[String, String]()
    val eachArr = eachFileStr.split(";")
    for (each <- eachArr) {
      try {
        // Split on the first '=' only, so values containing '=' stay intact.
        val kv = each.split("=", 2)
        map.put(kv(0), kv(1))
      } catch {
        case e: Exception => throw new Exception("File format error", e)
      }
    }
    map
  }

  // Parse one tab-separated GFF3 line into a JSON document. Lines that do not
  // have exactly nine columns (headers, comments) yield an empty object.
  def parserGff3(eachLine: String): JSONObject = {
    val doc: JSONObject = new JSONObject()
    val eachArr: Array[String] = eachLine.split("\t")
    if (eachArr.size == 9) {
      doc.put("reference_sequence", eachArr(0))
      doc.put("source", eachArr(1))
      doc.put("type", eachArr(2))
      doc.put("start_position", eachArr(3))
      doc.put("end_position", eachArr(4))
      doc.put("score", eachArr(5))
      doc.put("strand", eachArr(6))
      doc.put("phase", eachArr(7))
      doc.put("attributes", parserAttributes(eachArr(8)))
    }
    doc
  }
}
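A quick way to exercise the parser on its own is to feed it a single line. The sketch below uses an illustrative Ensembl GFF3 record (the column values are made up for the example) and assumes only the org.json dependency already used above:

import cn.piflow.bundle.microorganism.util.ParserGff3Data

object ParserGff3Demo {
  def main(args: Array[String]): Unit = {
    // Nine tab-separated GFF3 columns: seqid, source, type, start, end,
    // score, strand, phase, attributes.
    val line = "1\tensembl\tgene\t11869\t14409\t.\t+\t.\t" +
      "ID=gene:ENSG00000223972;Name=DDX11L1;biotype=transcribed_unprocessed_pseudogene"

    val parser = new ParserGff3Data
    val doc = parser.parserGff3(line)
    println(doc.toString(2))
    // Expected keys: reference_sequence, source, type, start_position,
    // end_position, score, strand, phase, attributes.
  }
}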