Merge remote-tracking branch 'origin/master'

judy0131 2018-12-24 11:28:41 +08:00
commit a509a6d70a
17 changed files with 5198 additions and 138 deletions


@@ -0,0 +1,76 @@
<svg xmlns="http://www.w3.org/2000/svg" id="Layer_1" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 587 181" >
<defs>
<circle id="a" r="7.5" fill="#6DAB49"/>
<circle id="b" r="7.5" fill="#DA0F21"/>
</defs>
<use xlink:href="#a" transform="translate(467 28.1)"/>
<use xlink:href="#a" transform="translate(467 48.7)"/>
<use xlink:href="#a" transform="translate(467 69.2)"/>
<use xlink:href="#a" transform="translate(467 89.7)"/>
<use xlink:href="#a" transform="translate(467 130.9)"/>
<use xlink:href="#a" transform="translate(467 151.5)"/>
<use xlink:href="#a" transform="translate(430 48.6)"/>
<use xlink:href="#a" transform="translate(430 69.3)"/>
<use xlink:href="#a" transform="translate(430 89.7)"/>
<use xlink:href="#a" transform="translate(430 110.3)"/>
<use xlink:href="#a" transform="translate(430 130.8)"/>
<use xlink:href="#a" transform="translate(449 38.4)"/>
<use xlink:href="#a" transform="translate(449 59)"/>
<use xlink:href="#a" transform="translate(449 79.5)"/>
<use xlink:href="#a" transform="translate(449 100)"/>
<use xlink:href="#a" transform="translate(449 120.5)"/>
<use xlink:href="#a" transform="translate(449 141)"/>
<use xlink:href="#a" transform="translate(576.7 48.6)"/>
<use xlink:href="#a" transform="translate(576.7 69)"/>
<use xlink:href="#a" transform="translate(576.7 89.7)"/>
<use xlink:href="#a" transform="translate(576.7 110)"/>
<use xlink:href="#a" transform="translate(576.7 131)"/>
<use xlink:href="#a" transform="translate(558 38.4)"/>
<use xlink:href="#a" transform="translate(558 59)"/>
<use xlink:href="#a" transform="translate(558 79.5)"/>
<use xlink:href="#a" transform="translate(558 100)"/>
<use xlink:href="#a" transform="translate(558 120.5)"/>
<use xlink:href="#a" transform="translate(558 141)"/>
<use xlink:href="#a" transform="translate(540 28)"/>
<use xlink:href="#a" transform="translate(540 48.7)"/>
<use xlink:href="#a" transform="translate(540 69)"/>
<use xlink:href="#a" transform="translate(540 89.7)"/>
<use xlink:href="#a" transform="translate(540 110)"/>
<use xlink:href="#a" transform="translate(540 131)"/>
<use xlink:href="#a" transform="translate(540 151.5)"/>
<use xlink:href="#b" transform="translate(467 110)"/>
<use xlink:href="#a" transform="translate(522 18)"/>
<use xlink:href="#a" transform="translate(522 38)"/>
<use xlink:href="#a" transform="translate(522 59)"/>
<use xlink:href="#a" transform="translate(522 79)"/>
<use xlink:href="#a" transform="translate(522 100)"/>
<use xlink:href="#a" transform="translate(522 120.5)"/>
<use xlink:href="#a" transform="translate(522 141)"/>
<use xlink:href="#a" transform="translate(522 161.6)"/>
<use xlink:href="#a" transform="translate(503.5 7.5)"/>
<use xlink:href="#a" transform="translate(503.5 28)"/>
<use xlink:href="#a" transform="translate(503.5 48.6)"/>
<use xlink:href="#a" transform="translate(503.5 69)"/>
<use xlink:href="#a" transform="translate(503.5 90)"/>
<use xlink:href="#a" transform="translate(503.5 110.3)"/>
<use xlink:href="#a" transform="translate(503.5 131)"/>
<use xlink:href="#a" transform="translate(503.5 151.4)"/>
<use xlink:href="#a" transform="translate(485 17.7)"/>
<use xlink:href="#a" transform="translate(485 38.3)"/>
<use xlink:href="#a" transform="translate(485 58.8)"/>
<use xlink:href="#a" transform="translate(485 79.4)"/>
<use xlink:href="#a" transform="translate(485 100)"/>
<use xlink:href="#a" transform="translate(485 120.5)"/>
<use xlink:href="#a" transform="translate(485 141)"/>
<use xlink:href="#a" transform="translate(485 161.6)"/>
<use xlink:href="#a" transform="translate(503.5 172)"/>
<path id="E" d="M-22.8 31H22v-7.7h-36.7V4.5h34v-7h-34v-21h37.5V-31h-45.6z" transform="matrix(1 0 0 -1 26.2 89.6)" fill="#fff"/>
<path d="M-29.6 31h12L.2-21.4 17.8 31h12v-62h-8v52.3l-17.6-52H-4l-17.8 52v-52h-8z" transform="matrix(1 0 0 -1 86 90)" fill="#fff"/>
<path id="B" d="M-24 30.8H2.3c9.4 0 21.7-4.7 21.7-18 0-11.4-8-14-11.3-15.2C15-3.4 21-6.7 21-15.6 21-28 10-31 2.8-31H-24V31zM-15.7-5v-19h15c3.8 0 13.4.2 13.4 9.4 0 9.6-9.4 9.8-13 9.8h-15.4zM2.5 23.7h-18.2V2H1c3.6 0 14.7.3 14.7 10.4 0 9.7-8 11.3-13.2 11.2z" transform="translate(155 90)" fill="#fff"/>
<path id="I" d="M-4.2 31h8.4v-62h-8.4v62z" transform="translate(193 90)" fill="#fff"/>
<use xlink:href="#I" transform="matrix(0 .87 .64 0 151.5 -50.35)"/>
<use xlink:href="#I" transform="matrix(0 .75 .4 0 210 -58)"/>
<use xlink:href="#E" transform="translate(267.8 0)"/>
<use xlink:href="#B" transform="translate(193.4 0)"/>
<use xlink:href="#I" transform="translate(192 0)"/>
</svg>


@@ -0,0 +1,31 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"LoadFromFtpToHDFS",
"bundle":"cn.piflow.bundle.ftp.LoadFromFtpToHDFS",
"properties":{
"url_str":"ftp.ebi.ac.uk",
"port":"",
"username":"",
"password":"",
"ftpFile":"/pub/databases/ena/sequence/release/con/rel_con_env_07_r138.dat.gz",
"HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/embl/",
"isFile":"true"
}
}
],
"paths":[
{
"from":"",
"outport":"",
"inport":"",
"to":""
}
]
}
}
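This flow JSON is what the piflow runner consumes: the test class at the end of this changeset (emblTest) parses such a file into a FlowBean, constructs the flow, and starts it with Runner. A minimal sketch of that pattern, assuming the same FlowBean/Runner/FileUtil APIs used in emblTest, a hypothetical local Spark master in place of the cluster settings from the test, and that the commented-out path src/main/resources/yqd/down.json in the test refers to a flow definition like this one:

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import scala.util.parsing.json.JSON

object LoadFromFtpFlowSketch {
  def main(args: Array[String]): Unit = {
    // Parse the flow definition (hypothetical path, as referenced in emblTest).
    val flowJsonStr = FileUtil.fileReader("src/main/resources/yqd/down.json")
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    // Build the flow object from the parsed JSON map.
    val flow = FlowBean(map).constructFlow()
    // Local Spark session for illustration only; emblTest targets a cluster instead.
    val spark = SparkSession.builder().master("local[*]").appName("LoadFromFtpToHDFS").getOrCreate()
    // Bind the SparkSession and run the flow, as emblTest does.
    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .start(flow)
    process.awaitTermination()
    spark.close()
  }
}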


@@ -0,0 +1,67 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/embl",
"selectionConditions":".*con_pro_02_r138.dat.gz,.*con_vrl_01_r138.dat.gz,.*pat_phg_01_r138.dat.gz"
}
},{
"uuid":"2222",
"name":"UnzipFilesOnHDFS_1",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1",
"properties":{
"isCustomize":"false",
"filePath":"",
"fileType":"gz",
"unzipPath":""
}
},
{
"uuid":"3333",
"name":"EmblParser",
"bundle":"cn.piflow.bundle.microorganism.EmblParser",
"properties":{
}
},{
"uuid":"4444",
"name":"PutEs",
"bundle":"cn.piflow.bundle.es.PutEs",
"properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "embl",
"es_type": "embl"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS_1"
},
{
"from":"UnzipFilesOnHDFS_1",
"outport":"",
"inport":"",
"to":"EmblParser"
},
{
"from":"EmblParser",
"outport":"",
"inport":"",
"to":"PutEs"
}
]
}
}

Binary file not shown.


@@ -0,0 +1,67 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/refseq/",
"selectionConditions":".*genomic.gbff.gz"
}
},{
"uuid":"2222",
"name":"UnzipFilesOnHDFS_1",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1",
"properties":{
"isCustomize":"false",
"filePath":"",
"fileType":"gz",
"unzipPath":""
}
},
{
"uuid":"3333",
"name":"Refseq_genomeParser",
"bundle":"cn.piflow.bundle.microorganism.Refseq_genomeParser",
"properties":{
}
},{
"uuid":"4444",
"name":"PutEs",
"bundle":"cn.piflow.bundle.es.PutEs",
"properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "genome",
"es_type": "archaea"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS_1"
},
{
"from":"UnzipFilesOnHDFS_1",
"outport":"",
"inport":"",
"to":"Refseq_genomeParser"
},
{
"from":"Refseq_genomeParser",
"outport":"",
"inport":"",
"to":"PutEs"
}
]
}
}


@@ -0,0 +1,37 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/",
"selectionConditions":".*genomic.gbff.gz"
}
},{
"uuid":"1111",
"name":"UnzipFilesOnHDFS_1",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1",
"properties":{
"isCustomize":"true",
"filePath":"hdfs://10.0.88.70:9000/yqd/archaea.1.genomic.gbff.gz",
"fileType":"gz",
"unzipPath":"hdfs://10.0.88.70:9000/yqd/weishengwu/"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS_1"
}
]
}
}


@@ -0,0 +1,141 @@
package cn.piflow.bundle.ftp
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.commons.net.ftp.{FTP, FTPClient, FTPClientConfig, FTPFile}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
class LoadFromFtpToHDFS extends ConfigurableStop {
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Load file from ftp server and save on HDFS"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
var url_str:String =_
var port:String=_
var username:String=_
var password:String=_
var ftpFile:String=_
var HDFSUrl:String=_
var HDFSPath:String=_
var isFile:String=_
var fs: FileSystem=null
var con: FTPClientConfig =null
def downFile(ftp: FTPClient,ftpFilePath:String,HDFSSavePath:String): Unit = {
val changeFlag: Boolean = ftp.changeWorkingDirectory(ftpFilePath)
val files: Array[FTPFile] = ftp.listFiles()
for(x <- files ) {
if (x.isFile) {
println("down start ^^^ "+x.getName)
val hdfsPath: Path = new Path(HDFSSavePath + x.getName)
if(! fs.exists(hdfsPath)){
var fdos: FSDataOutputStream = fs.create(hdfsPath)
ftp.retrieveFile(new String(x.getName.getBytes("GBK"),"ISO-8859-1"), fdos)
fdos.close()
}
} else {
downFile(ftp,ftpFilePath+x.getName+"/",HDFSSavePath+x.getName+"/")
}
}
}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val configuration: Configuration = new Configuration()
configuration.set("fs.defaultFS", HDFSUrl)
fs = FileSystem.get(configuration)
val ftp:FTPClient = openFtpClient()
if(isFile.equals("true")){
val pathArr: Array[String] = ftpFile.split("/")
var dirPath:String=""
for(x <- (0 until pathArr.length-1)){
dirPath += (pathArr(x)+"/")
}
ftp.changeWorkingDirectory(dirPath)
var fdos: FSDataOutputStream = fs.create(new Path(HDFSPath+pathArr.last))
ftp.retrieveFile(new String(pathArr.last.getBytes("GBK"),"ISO-8859-1"), fdos)
fdos.flush()
fdos.close()
}else{
downFile(ftp,ftpFile,HDFSPath)
}
}
def openFtpClient(): FTPClient = {
val ftp = new FTPClient
if(port.length > 0 ){
ftp.connect(url_str,port.toInt)
}else{
ftp.connect(url_str)
}
if(username.length > 0 && password.length > 0){
ftp.login(username,password)
}else{
ftp.login("anonymous", "121@hotmail.com")
}
ftp.setControlEncoding("GBK")
con = new FTPClientConfig(FTPClientConfig.SYST_NT)
con.setServerLanguageCode("zh")
ftp.setFileType(FTP.BINARY_FILE_TYPE)
ftp
}
override def setProperties(map: Map[String, Any]): Unit = {
url_str=MapUtil.get(map,key="url_str").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
username=MapUtil.get(map,key="username").asInstanceOf[String]
password=MapUtil.get(map,key="password").asInstanceOf[String]
ftpFile=MapUtil.get(map,key="ftpFile").asInstanceOf[String]
HDFSUrl=MapUtil.get(map,key="HDFSUrl").asInstanceOf[String]
HDFSPath=MapUtil.get(map,key="HDFSPath").asInstanceOf[String]
isFile=MapUtil.get(map,key="isFile").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url_str = new PropertyDescriptor().name("url_str").displayName("URL").defaultValue("IP of FTP server, such as 128.136.0.1 or ftp.ei.addfc.gak").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("Port of FTP server").required(false)
val username = new PropertyDescriptor().name("username").displayName("USER_NAME").defaultValue("").required(false)
val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("").required(false)
val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("FTP_File").defaultValue("The path of the file to the FTP server, such as /test/Ab/ or /test/Ab/test.txt").required(true)
val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("The URL of the HDFS file system, such as hdfs://10.0.88.70:9000").required(true)
val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("The save path of the HDFS file system, such as /test/Ab/").required(true)
val isFile = new PropertyDescriptor().name("isFile").displayName("isFile").defaultValue("Whether the path is a file or not, if true is filled in, only a single file specified by the path is downloaded. If false is filled in, all files under the folder are downloaded recursively.").required(true)
descriptor = isFile :: descriptor
descriptor = url_str :: descriptor
descriptor = port :: descriptor
descriptor = username :: descriptor
descriptor = password :: descriptor
descriptor = ftpFile :: descriptor
descriptor = HDFSUrl :: descriptor
descriptor = HDFSPath :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("ftp.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.FtpGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}


@@ -0,0 +1,109 @@
package cn.piflow.bundle.ftp
import java.util.regex.Pattern
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable.ArrayBuffer
class SelectFilesByName extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Selecting files by file name"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var HDFSUrl:String=_
var HDFSPath:String=_
var selectionConditions:String =_
var fs: FileSystem=null
var pathARR:ArrayBuffer[String]=ArrayBuffer()
var selectArr:Array[String]=null
def selectFile(path: String): Unit = {
val statusesARR: Array[FileStatus] = fs.listStatus(new Path(path))
for(each <- statusesARR){
val pathStr = each.getPath.toString
if(each.isFile){
val fileName: String = pathStr.split("/").last
selectArr = selectionConditions.split(",")
var b: Boolean =false
for(x <- selectArr){
b = Pattern.matches(x,fileName)
if(b){
pathARR += pathStr
}
}
}else{
selectFile(pathStr)
}
}
}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session: SparkSession = pec.get[SparkSession]()
val configuration: Configuration = new Configuration()
configuration.set("fs.defaultFS", HDFSUrl)
fs = FileSystem.get(configuration)
selectFile(HDFSPath)
val rows: List[Row] = pathARR.map(each => {
var arr:Array[String]=Array(each)
val row: Row = Row.fromSeq(arr)
row
}).toList
val rowRDD: RDD[Row] = session.sparkContext.makeRDD(rows)
val fields: Array[StructField] = "path".split("/").map(d=>StructField(d,StringType,nullable = true))
val schema: StructType = StructType(fields)
val df: DataFrame = session.createDataFrame(rowRDD,schema)
println("#################################################")
df.show(20)
println("#################################################")
out.write(df)
}
override def setProperties(map: Map[String, Any]): Unit = {
HDFSUrl=MapUtil.get(map,key="HDFSUrl").asInstanceOf[String]
HDFSPath=MapUtil.get(map,key="HDFSPath").asInstanceOf[String]
selectionConditions=MapUtil.get(map,key="selectionConditions").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("The URL of the HDFS file system, such as hdfs://10.0.88.70:9000").required(true)
val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("The save path of the HDFS file system, such as /test/Ab").required(true)
val selectionConditions = new PropertyDescriptor().name("selectionConditions").displayName("selectionConditions").defaultValue("To select conditions, you need to fill in regular expressions in java, such as. * abc. *").required(true)
descriptor = HDFSUrl :: descriptor
descriptor = HDFSPath :: descriptor
descriptor = selectionConditions :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("ftp.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.FtpGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}


@@ -12,178 +12,122 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import scala.collection.mutable.ArrayBuffer
class UnzipFilesOnHDFS extends ConfigurableStop {
val authorEmail: String = "yangqidong@cnic.cn"
val description: String = "Unzip files on HDFS"
-val inportList: List[String] = List(PortEnum.NonePort.toString)
+val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+var isCustomize:String=_
var filePath:String=_
var fileType:String=_
var unzipPath:String=_
-def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
-val session: SparkSession = pec.get[SparkSession]()
+var session: SparkSession = null
+def unzipFile(hdfsFilePath: String, zipFileType: String, unzipHdfsPath: String):String = {
+var zft: String = ""
+if(zipFileType.length < 1){
+zft = hdfsFilePath.split("\\.").last
+}else{
+zft = zipFileType
+}
val configuration: Configuration = new Configuration()
-val pathARR: Array[String] = filePath.split("\\/")
+val pathARR: Array[String] = hdfsFilePath.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
-// configuration.set("dfs.nameservices", "nameservice1")
-// configuration.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
-// configuration.set("dfs.namenode.rpc-address.nameservice1.nn1", "xxx:8020");
-// configuration.set("dfs.namenode.rpc-address.nameservice1.nn2", "xxx:8020");
-// configuration.set("dfs.client.failover.proxy.provider.nameservice1"
-// ,"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
-// configuration.addResource("classpath:/hadoop/core-site.xml");
-// configuration.addResource("classpath:/hadoop/hdfs-site.xml");
-// configuration.addResource("classpath:/hadoop/mapred-site.xml");
+var uhp : String=""
+if(unzipHdfsPath.length < 1){
+for (x <- (0 until pathARR.length-1)){
+uhp+=(pathARR(x) +"/")
+}
+}else{
+uhp=unzipHdfsPath
+}
val fs = FileSystem.get(configuration)
-val fdis: FSDataInputStream = fs.open(new Path(filePath))
-val filePathArr: Array[String] = filePath.split("/")
+val fdis: FSDataInputStream = fs.open(new Path(hdfsFilePath))
+val filePathArr: Array[String] = hdfsFilePath.split("/")
var fileName: String = filePathArr.last
if(fileName.length == 0){
fileName = filePathArr(filePathArr.size-2)
}
-if(fileType.equals("gz")){
+var savePath:String=""
+if(zft.equals("gz")){
val gzip: GZIPInputStream = new GZIPInputStream(fdis)
var n = -1
val buf=new Array[Byte](10*1024*1024)
-val savePath = new Path(unzipPath +fileName.replace(".gz",""))
-val fdos = fs.create(savePath)
+savePath = uhp +fileName.replace(".gz","")
+val path = new Path(savePath)
+val fdos = fs.create(path)
while((n=gzip.read(buf)) != -1 && n != -1){
fdos.write(buf,0,n)
fdos.flush()
}
+fdos.close()
+gzip.close()
+fdis.close()
+}else{
-}/*else if(fileType.equals("tar")){
-var entryNum:Int=0
-var entryFileName:String=null
-var entryFile:File=null
-var subEntryFile:File=null
-var subEntryFileName:String=null
-var tarArchiveEntries:Array[TarArchiveEntry]=null
-var fileList:List[String]=List()
-var fos:FileOutputStream=null
-var entry: TarArchiveEntry = null
-val tarIs: TarArchiveInputStream = new TarArchiveInputStream(fdis)
-while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
-entryFileName= localPath +File.separator+entry.getName()
-entryFile=new File(entryFileName)
-entryNum += 1
-if(entry.isDirectory()){
-if(!entryFile.exists()){
-entryFile.mkdirs()
-}
-tarArchiveEntries=entry.getDirectoryEntries()
-for(i<-0 until tarArchiveEntries.length){
-subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
-subEntryFile=new File(subEntryFileName)
-fileList=subEntryFileName::fileList
-fos=new FileOutputStream(subEntryFile)
-var mark = -1
-val buf=new Array[Byte](4*1024)
-while((mark=tarIs.read(buf)) != -1 && mark != -1){
-fos.write(buf,0,mark)
-}
-fos.close()
-fos=null
-}
-}else{
-fileList = entryFileName :: fileList
-fos=new FileOutputStream(entryFile)
-var mark = -1
-val buf=new Array[Byte](4*1024)
-while((mark=tarIs.read(buf)) != -1 && mark != -1){
-fos.write(buf,0,mark)
-}
-fos.close()
-fos=null
-}
-}
-if(entryNum==0){
-println("there is no file!")
-}
-}else if(fileType.equals("tar.gz")){
-var entryNum:Int=0
-var entryFileName:String=null
-var entryFile:File=null
-var subEntryFile:File=null
-var subEntryFileName:String=null
-var tarArchiveEntries:Array[TarArchiveEntry]=null
-var fileList:List[String]=List()
-var fos:FileOutputStream=null
-var entry: TarArchiveEntry = null
-val gzip:GZIPInputStream=new GZIPInputStream(fdis)
-val tarIs: TarArchiveInputStream = new TarArchiveInputStream(gzip)
-while ((entry = tarIs.getNextTarEntry) != null && entry != null) {
-entryFileName=localPath +File.separator+entry.getName()
-entryFile=new File(entryFileName)
-entryNum += 1
-if(entry.isDirectory()){
-if(!entryFile.exists()){
-entryFile.mkdirs()
-}
-tarArchiveEntries=entry.getDirectoryEntries()
-for(i<-0 until tarArchiveEntries.length){
-subEntryFileName=entryFileName+File.separator+tarArchiveEntries(i).getName()
-subEntryFile=new File(subEntryFileName)
-fileList=subEntryFileName::fileList
-fos=new FileOutputStream(subEntryFile)
-var mark = -1
-val buf=new Array[Byte](4*1024)
-while((mark=tarIs.read(buf)) != -1 && mark != -1){
-fos.write(buf,0,mark)
-}
-fos.close()
-fos=null
-}
-}else{
-fileList = entryFileName :: fileList
-fos=new FileOutputStream(entryFile)
-var mark = -1
-val buf=new Array[Byte](4*1024)
-while((mark=tarIs.read(buf)) != -1 && mark != -1){
-fos.write(buf,0,mark)
-}
-fos.close()
-fos=null
-}
-}
-if(entryNum==0){
-println("there is no file!")
-}
-}*/else{
throw new RuntimeException("File type fill in error, or do not support this type.")
}
-var seq:Seq[String]=Seq(unzipPath)
-val row: Row = Row.fromSeq(seq)
-val list:List[Row]=List(row)
-val rdd: RDD[Row] = session.sparkContext.makeRDD(list)
+savePath
+}
+def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+session = pec.get[SparkSession]()
+var savePath: String = ""
+var arr:ArrayBuffer[Row]=ArrayBuffer()
+if(isCustomize.equals("true")){
+println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+savePath = unzipFile(filePath,fileType,unzipPath)
+println("savepath : "+savePath)
+arr += Row.fromSeq(Array(savePath))
+}else if (isCustomize.equals("false")){
+val inDf: DataFrame = in.read()
+inDf.collect().foreach(row => {
+filePath = row.get(0).asInstanceOf[String]
+savePath = unzipFile(filePath,"","")
+arr += Row.fromSeq(Array(savePath))
+savePath = ""
+})
+}
+val rdd: RDD[Row] = session.sparkContext.makeRDD(arr.toList)
val fields: Array[StructField] =Array(StructField("unzipPath",StringType,nullable = true))
val schema: StructType = StructType(fields)
val df: DataFrame = session.createDataFrame(rdd,schema)
+println("##################################################################################################")
+// println(df.count())
+df.show(20)
+println("##################################################################################################")
out.write(df)
}
@@ -193,6 +137,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
}
def setProperties(map : Map[String, Any]) = {
+isCustomize=MapUtil.get(map,key="isCustomize").asInstanceOf[String]
filePath=MapUtil.get(map,key="filePath").asInstanceOf[String]
fileType=MapUtil.get(map,key="fileType").asInstanceOf[String]
unzipPath=MapUtil.get(map,key="unzipPath").asInstanceOf[String]
@@ -201,9 +146,15 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
-val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("file path,such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(true)
+val filePath = new PropertyDescriptor().name("filePath").displayName("filePath").description("file path,such as hdfs://10.0.86.89:9000/a/a.gz").defaultValue("").required(false)
-val fileType = new PropertyDescriptor().name("fileType").displayName("fileType").description("file type,such as gz").defaultValue("").required(true)
+val fileType = new PropertyDescriptor().name("fileType").displayName("fileType").description("file type,such as gz").defaultValue("").required(false)
val unzipPath = new PropertyDescriptor().name("unzipPath").displayName("unzipPath").description("unzip path, such as hdfs://10.0.86.89:9000/b/").defaultValue("").required(true)
+val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " +
+"you must specify the path where the compressed file is located and the saved path after decompression. " +
+"If it is fals, it will automatically find the file path data from the upstream port and " +
+"save it to the original folder after decompression.")
+.defaultValue("").required(false)
+descriptor = isCustomize :: descriptor
descriptor = filePath :: descriptor
descriptor = fileType :: descriptor
descriptor = unzipPath :: descriptor
@@ -216,7 +167,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
}
override def getGroup(): List[String] = {
-List(StopGroup.HttpGroup.toString)
+List(StopGroupEnum.HttpGroup.toString)
}
}


@@ -0,0 +1,168 @@
package cn.piflow.bundle.microorganism
import java.io._
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.biojavax.bio.seq.{RichSequence, RichSequenceIterator}
import org.json.JSONObject
class EmblParser extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Parsing EMBL type data"
override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json"
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bis: BufferedInputStream =null
// var df: DataFrame =null
// var d: DataFrame =null
// var jsonRDD: RDD[String] =null
inDf.collect().foreach(row => {
var n : Int =0
pathStr = row.get(0).asInstanceOf[String]
println("#############################################")
println("start parser ^^^" + pathStr)
println("#############################################")
// if(pathStr.equals("hdfs://10.0.88.70:9000/yqd/weishengwu/refseq/bacteria.1.genomic.gbff")) {
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
// var fdis: FSDataInputStream = fs.open(new Path("hdfs://10.0.88.70:9000/yqd/weishengwu/refseq/bacteria.1.1.genomic.fna.gz"))
// var gzipout: GZIPInputStream = new GZIPInputStream(fdis)
// var br: BufferedReader = new BufferedReader(new InputStreamReader(gzipout))
var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var sequences: RichSequenceIterator = CustomIOTools.IOTools.readEMBLDNA (br, null)
while (sequences.hasNext) {
n += 1
var seq: RichSequence = sequences.nextRichSequence()
var doc: JSONObject = new JSONObject
Process.processEMBL_EnsemblSeq(seq, doc)
jsonStr = doc.toString
println("start " + n)
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
/* if(n==1){
jsonRDD = session.sparkContext.makeRDD(jsonStr :: Nil)
df = session.read.json(jsonRDD)
}else{
jsonRDD = session.sparkContext.makeRDD(jsonStr :: Nil)
d = session.read.json(jsonRDD)
df = df.union(d.toDF(df.columns:_*))
}*/
fdos.flush()
bis = null
seq = null
doc = null
// jsonRDD = null
// d = null
}
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
// }
})
fdos.close()
println("start parser HDFSjsonFile")
val df: DataFrame = session.read.json(hdfsPathTemporary)
println("############################################################")
// println(df.count())
df.show(20)
println("############################################################")
out.write(df)
}
override def setProperties(map: Map[String, Any]): Unit = {
}
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List()
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("/microorganism/EMBL_Logo.svg")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.MicroorganismGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}


@@ -0,0 +1,168 @@
package cn.piflow.bundle.microorganism
import java.io._
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.biojavax.bio.seq.{RichSequence, RichSequenceIterator}
import org.json.JSONObject
class RefseqParser extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Parsing Refseq_genome type data"
override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json"
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bis: BufferedInputStream =null
// var df: DataFrame =null
// var d: DataFrame =null
// var jsonRDD: RDD[String] =null
inDf.collect().foreach(row => {
var n : Int =0
pathStr = row.get(0).asInstanceOf[String]
println("#############################################")
println("start parser ^^^" + pathStr)
println("#############################################")
// if(pathStr.equals("hdfs://10.0.88.70:9000/yqd/weishengwu/refseq/bacteria.1.genomic.gbff")) {
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
// var fdis: FSDataInputStream = fs.open(new Path("hdfs://10.0.88.70:9000/yqd/weishengwu/refseq/bacteria.1.1.genomic.fna.gz"))
// var gzipout: GZIPInputStream = new GZIPInputStream(fdis)
// var br: BufferedReader = new BufferedReader(new InputStreamReader(gzipout))
var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var sequences: RichSequenceIterator = CustomIOTools.IOTools.readGenbankProtein(br, null)
while (sequences.hasNext) {
n += 1
var seq: RichSequence = sequences.nextRichSequence()
var doc: JSONObject = new JSONObject
Process.processSingleSequence(seq, doc)
jsonStr = doc.toString
println("start " + n)
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
/* if(n==1){
jsonRDD = session.sparkContext.makeRDD(jsonStr :: Nil)
df = session.read.json(jsonRDD)
}else{
jsonRDD = session.sparkContext.makeRDD(jsonStr :: Nil)
d = session.read.json(jsonRDD)
df = df.union(d.toDF(df.columns:_*))
}*/
fdos.flush()
bis = null
seq = null
doc = null
// jsonRDD = null
// d = null
}
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
// }
})
fdos.close()
println("start parser HDFSjsonFile")
val df: DataFrame = session.read.json(hdfsPathTemporary)
println("############################################################")
// println(df.count())
df.show(20)
println("############################################################")
out.write(df)
}
override def setProperties(map: Map[String, Any]): Unit = {
}
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List()
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("/microorganism/refseq.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.MicroorganismGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}


@@ -1,6 +1,5 @@
package cn.piflow.bundle.microorganism.util;
import org.biojava.bio.BioError;
import org.biojava.bio.BioException;
import org.biojava.bio.seq.*;
@@ -662,10 +661,19 @@ public interface CustomIOTools {
* @return a <code>RichSequenceIterator</code> over each sequence in the
* fasta file
*/
+public static RichSequenceIterator readEMBLDNA(BufferedReader br,
+Namespace ns) {
+return new RichStreamReader(br, new CustomEMBLFormat(), getDNAParser(),
+factory, ns);
+}
+//parse Ensembl file
+public static RichSequenceIterator readEnsembl(BufferedReader br,
+Namespace ns) {
+return new RichStreamReader(br, new CustomEnsemblFormat(), getDNAParser(),
+factory, ns);
+}
/**
* Iterate over the sequences in an EMBL-format stream of RNA sequences.
@@ -753,7 +761,11 @@ public interface CustomIOTools {
* @return a <code>RichSequenceIterator</code> over each sequence in the
* fasta file
*/
+public static RichSequenceIterator readUniProt(BufferedReader br,
+Namespace ns) {
+return new RichStreamReader(br, new CustomUniProtFormat(),
+getProteinParser(), factory, ns);
+}
/**
* Read a UniProt XML file using a custom type of SymbolList. For


@@ -0,0 +1,571 @@
package cn.piflow.bundle.microorganism.util;
import org.biojava.bio.seq.Feature;
import org.biojavax.*;
import org.biojavax.bio.seq.RichFeature;
import org.biojavax.bio.seq.RichSequence;
import org.biojavax.ontology.SimpleComparableTerm;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by xiujuan on 2016/3/24.
*/
public class ProcessNew {
static final Logger logger = LoggerFactory.getLogger(ProcessNew.class);
static final Pattern dp = Pattern.compile("(\\d{4})");
static final Pattern llp = Pattern.compile("(\\S+)\\s([SN])\\s(\\S+)\\s([WE])");
static final Pattern submitDatep = Pattern.compile("^Submitted\\s+\\((\\S+)\\)\\s+(.*)$");
static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
static final SimpleDateFormat format = new SimpleDateFormat("dd-MMM-yyyy", Locale.ENGLISH);
// static AddressCountryDict dict = AddressCountryDict.getInstance();
public static HashMap<String,Object> processSingleSequence(RichSequence seq) throws ParseException {
//try{
// logger.info("doc: " + seq.getAccession());
HashMap<String,Object> map = new HashMap() ;
map.put("Sequence", seq.seqString());
map.put("Accession", seq.getAccession());
map.put("SequenceLength", seq.getInternalSymbolList().length());
if (seq.getTaxon() != null) {
map.put("TaxonID", seq.getTaxon().getNCBITaxID());
map.put("Organism", seq.getTaxon().getDisplayName());
}
map.put("Description", seq.getDescription().replace('\n', ' '));
map.put("Division", seq.getDivision());
map.put("Identifier", seq.getIdentifier());
map.put("Version", seq.getVersion());
if (seq.getCircular()) {
map.put("Topology", "Circular");
} else {
map.put("Topology", "Linear");
}
for (Note note : seq.getNoteSet()) {
String noteName = note.getTerm().toString().substring(9);
if (noteName.indexOf("moltype") != -1) {
map.put("MoleculeType", note.getValue());
} else if (noteName.indexOf("Organism") != -1) {
String organism = note.getValue();
//doc.put("Organism", organism.substring(0,organism.indexOf("\n")));
map.put("Lineage", organism.substring(organism.indexOf("\n")).replaceAll("\n", ""));
} else if (noteName.indexOf("acc") != -1) {
map.put("AdditionalAccs", note.getValue());
} else if (noteName.indexOf("DBLink") != -1) { //deal with dblinks
JSONArray dbLinks = new JSONArray();
String[] val = note.getValue().split("\\n");
for (String v : val) {
int index = v.indexOf(":");
if (index != -1) {
JSONObject link = new JSONObject();
link.put(v.substring(0, index), v.substring(index + 1).trim());
dbLinks.put(link);
} else { // value splitted into more than one line
JSONObject last = dbLinks.getJSONObject(dbLinks.length() - 1);
String key = last.keys().next();
String value = last.get(key).toString();
String newVal = value + v;
last.put(key, newVal);
}
}
map.put("dbLinks", dbLinks);
} else if (noteName.equals("kw")) {
map.put("KeyWords", note.getValue());
} else if (noteName.equals("udat")) {
map.put("dateUpdated", formatter.format(format.parse(note.getValue())));
} else {
map.put(noteName, note.getValue());
}
}
//features
JSONArray featureArray = new JSONArray();
Iterator<Feature> featureIterator = seq.features();
List<String> isolates = new ArrayList<String>();
while (featureIterator.hasNext()) {
JSONObject featureObject = new JSONObject();
List<String> dbxrefArray = new ArrayList<String>();
RichFeature feature = (RichFeature) featureIterator.next();
for (RankedCrossRef rankedCrossRef : feature.getRankedCrossRefs()) {
dbxrefArray.add(rankedCrossRef.getCrossRef().getDbname() + ":" + rankedCrossRef.getCrossRef().getAccession());
}
featureObject.put("db_xref", dbxrefArray);
featureObject.put("featureType", feature.getType());
Map featureMap = feature.getAnnotation().asMap();
Iterator<SimpleComparableTerm> featureKeyIterator = featureMap.keySet().iterator();
while (featureKeyIterator.hasNext()) {
SimpleComparableTerm term = featureKeyIterator.next();
String name = term.getName();
String nameValue = featureMap.get(term).toString();
//isolate is an array?
if (name.indexOf("altitude") != -1) {
featureObject.put("altitude_value", Float.valueOf(nameValue.substring(0, nameValue.indexOf(" ")))); //number, take care of negative number
} else if (name.indexOf("collection_date") != -1) {
if (getCollectionYear(nameValue) != 0) {
featureObject.put("collection_year", getCollectionYear(nameValue));
}
} else if (name.indexOf("country") != -1) {
if (nameValue.indexOf(":") != -1) {
featureObject.put("CollectionCountry", nameValue.substring(0, nameValue.indexOf(":")));
}
} else if (name.indexOf("culture_collection") != -1) {
int index = nameValue.indexOf(":") != -1 ? nameValue.indexOf(":") : nameValue.indexOf(" ");
if (index != -1) {
featureObject.put("InstitutionCode", nameValue.substring(0, index));
featureObject.put("CultureID", nameValue.substring(index + 1));
}
} else if (name.indexOf("lat_lon") != -1) {
Float[] arr = getLat_Lon(nameValue);
if (arr != null) {
featureObject.put("Latitude", arr[0]);
featureObject.put("Longitude", arr[1]);
}
} else if (name.indexOf("pathovar") != -1) {
} else if (feature.getType().equals("source") && name.equals("isolate")) {
isolates.add(nameValue);
}
featureObject.put(term.getName(), featureMap.get(term));
}
featureArray.put(featureObject);
//for garbage collection
featureObject = null;
dbxrefArray = null;
feature = null;
featureMap = null;
}
map.put("features", featureArray);
if (isolates.size() > 0) {
map.put("isolate_all", isolates);
}
return map;
}
public static int getCollectionYear(String date){
Matcher m = dp.matcher(date);
String year;
if(m.find()){
year = m.group(1);
return Integer.parseInt(year);
}else{
return 0;
}
}
public static Float[] getLat_Lon(String lat_lon){
Matcher m = llp.matcher(lat_lon);
Float[] array = null;
try{
if(m.matches()){
array = new Float[2];
if(m.group(2).equals("N")){
array[0] = Float.valueOf(m.group(1));
}else{
array[0] = Float.valueOf("0")-Float.valueOf(m.group(1));
}
if(m.group(4).equals("E")){
array[1] = Float.valueOf(m.group(3));
}else{
array[1] = Float.valueOf("0")-Float.valueOf(m.group(3));
}
}
}catch (NumberFormatException nfe){
return null;
}
return array;
}
public static void processUniprotSeq(RichSequence seq, JSONObject doc) throws ParseException {
logger.info("doc: " + seq.getAccession());
doc.put("Accession", seq.getAccession());
doc.put("Name", seq.getName());
doc.put("Division", seq.getDivision());
doc.put("Description", seq.getDescription().replace('\n', ' '));
doc.put("Version", seq.getVersion());
doc.put("sequencelength", seq.length());
//Taxon
doc.put("TaxonID", seq.getTaxon().getNCBITaxID());
for(Object name: seq.getTaxon().getNameClasses()){
doc.put("Taxon_"+(String)name, seq.getTaxon().getNames((String)name));
}
//rankedcrossrefs
/*JSONArray rankedCrossRefs = new JSONArray();
for(RankedCrossRef rankedCrossRef : seq.getRankedCrossRefs()){
JSONObject ref = new JSONObject();
String key = rankedCrossRef.getCrossRef().getDbname();
String accessions = rankedCrossRef.getCrossRef().getAccession();
for(Note note : rankedCrossRef.getCrossRef().getRichAnnotation().getNoteSet()){
accessions += ";"+note.getValue();
}
ref.put(key, accessions);
rankedCrossRefs.put(ref);
}
if(rankedCrossRefs.length() > 0){
doc.put("rankedCrossRefs", rankedCrossRefs);
}*/
processRankedCrossRefs(seq, doc);
//comments
JSONArray comments = new JSONArray();
for(Comment comment : seq.getComments()){
JSONObject cmtObj = new JSONObject();
String cmt = comment.getComment().replace('\n', ' ');
cmt = cmt.substring(3);
int index = cmt.indexOf(":");
cmtObj.put(cmt.substring(0,index).trim(),cmt.substring(index+1).trim());
comments.put(cmtObj);
}
if(comments.length() > 0){
doc.put("comments", comments);
}
//features
JSONArray features = new JSONArray();
Iterator<Feature> featureIterator = seq.features();
while(featureIterator.hasNext()){
JSONObject featureObject = new JSONObject();
List<String> dbxrefArray = new ArrayList<String>();
RichFeature feature = (RichFeature)featureIterator.next();
for(RankedCrossRef rankedCrossRef : feature.getRankedCrossRefs()){
dbxrefArray.add(rankedCrossRef.getCrossRef().getDbname() + ":" + rankedCrossRef.getCrossRef().getAccession());
}
if(dbxrefArray.size() > 0){
featureObject.put("rankedCrossRefs", dbxrefArray);
}
featureObject.put("type", feature.getType());
featureObject.put("location_start", feature.getLocation().getMin());
featureObject.put("location_end", feature.getLocation().getMax());
Map featureMap = feature.getAnnotation().asMap();
Iterator<SimpleComparableTerm> featureKeyIterator = featureMap.keySet().iterator();
while(featureKeyIterator.hasNext()){
SimpleComparableTerm term = featureKeyIterator.next();
featureObject.put(term.getName(),featureMap.get(term));
}
features.put(featureObject);
}
if(features.length() > 0){
doc.put("features", features);
}
//sequence
doc.put("sequence", seq.seqString());
JSONArray rankedDocRefs = new JSONArray();
Map<Integer,List<String>> rankedDocRefs_addiInfo = new HashMap<Integer, List<String>>();
//properties from notes: rlistener.addSequenceProperty
List<String> keywords = new ArrayList<String>();
List<String> secondaryAccs = new ArrayList<String>();
JSONArray organismHosts = new JSONArray();
for(Note note : seq.getNoteSet()){
String note_term = note.getTerm().getName();
if(note_term.equals("kw")){
keywords.add(note.getValue());
}else if(note_term.equals("cdat")){
doc.put("dateCreated", formatter.format(format.parse(note.getValue())));
}else if(note_term.equals("udat")){
doc.put("dateUpdated", formatter.format(format.parse(note.getValue())));
}else if(note_term.equals("adat")){
doc.put("dateAnnotated", formatter.format(format.parse(note.getValue())));
}else if(note_term.equals("arel")){
doc.put("relAnnotated", note.getValue());
}else if(note_term.equals("Organism host")){
JSONObject organismHost = new JSONObject();
String sciname;
String comname;
String names = null;
List synonym = new ArrayList();
String[] parts = note.getValue().split(";");
if(parts[0].matches("\\S+=\\S+")){
String[] moreparts = parts[0].split("=");
if(moreparts[0].equals("NCBI_TaxID")){
organismHost.put("NCBI_TaxID",Integer.parseInt(moreparts[1]));
}else{
organismHost.put(moreparts[0],moreparts[1]);
}
}else{
names = parts[0];
}
if(parts.length > 1){
names = parts[1];
}
if(names != null){
if (names.endsWith(".")) names = names.substring(0,names.length()-1); // chomp trailing dot
String[] nameparts = names.split("\\(");
sciname = nameparts[0].trim();
organismHost.put("scientific name", sciname);
if (nameparts.length>1) {
comname = nameparts[1].trim();
if (comname.endsWith(")")) comname = comname.substring(0,comname.length()-1); // chomp trailing bracket
organismHost.put("common name", comname);
if (nameparts.length>2) {
// synonyms
for (int j = 2 ; j < nameparts.length; j++) {
String syn = nameparts[j].trim();
if (syn.endsWith(")")) syn = syn.substring(0,syn.length()-1); // chomp trailing bracket
synonym.add(syn);
}
organismHost.put("synonym", synonym);
}
}
}
organismHosts.put(organismHost);
}else if(note_term.equals("Sequence meta info")){
String seqMetaInfo = note.getValue();
if(seqMetaInfo.startsWith("SEQUENCE")){
seqMetaInfo = seqMetaInfo.substring(8);
}
String[] parts = seqMetaInfo.split(";");
if(parts.length > 1){
doc.put("molecular weight", Integer.parseInt(parts[1].trim().split(" ")[0]));
if(parts.length > 2){
String[] moreparts = parts[2].trim().split(" ");
doc.put(moreparts[1], moreparts[0]);
}
}
}else if(note_term.startsWith("docref")){
int rank = Integer.parseInt(note.getValue().split(":")[0].trim());
String key = note_term.substring(7); //remove the precedding "docref_"
if(key.contains("biojavax:")){
key = key.substring(9); //remove "biojavax:"
}
String value = note.getValue().substring(note.getValue().indexOf(":")+1).trim();
if(rankedDocRefs_addiInfo.containsKey(rank)){
rankedDocRefs_addiInfo.get(rank).add(key+":"+value);
}else{
List<String> tmp = new ArrayList<String>();
tmp.add( key+":"+value);
rankedDocRefs_addiInfo.put(rank,tmp);
}
}else if(note_term.equals("acc")){
secondaryAccs.add(note.getValue());
}else{
doc.put(note_term, note.getValue());
}
}
if(secondaryAccs.size() > 0){
doc.put("secondaryacc",secondaryAccs);
}
if(organismHosts.length() > 0){
doc.put("organismhost", organismHosts);
}
if(keywords.size() > 0){
doc.put("keywords", keywords);
}
//rankeddocref
for(RankedDocRef rankedDocRef : seq.getRankedDocRefs()){
JSONObject rankedDocRefObj = new JSONObject();
DocRef docRef = rankedDocRef.getDocumentReference();
rankedDocRefObj.put("rank", rankedDocRef.getRank());
rankedDocRefObj.put("authors", docRef.getAuthors());
rankedDocRefObj.put("title", docRef.getTitle());
rankedDocRefObj.put("location", docRef.getLocation());
rankedDocRefObj.put("remark", docRef.getRemark());
for(Map.Entry entry : rankedDocRefs_addiInfo.entrySet()){
if((Integer)(entry.getKey()) == rankedDocRef.getRank()){
for(String pair : (List<String>)(entry.getValue())){
int index = pair.indexOf(":");
rankedDocRefObj.put(pair.substring(0, index),pair.substring(index+1));
}
}
}
rankedDocRefs.put(rankedDocRefObj);
}
if(rankedDocRefs.length() > 0){
doc.put("rankedDocRefs", rankedDocRefs);
}
}
public static void processEMBL_EnsemblSeq(RichSequence seq,JSONObject doc) throws ParseException {
logger.info("accession: " + seq.getName());
if(seq.getCircular()){
doc.put("Topology", "Circular");
}else{
doc.put("Topology", "Linear");
}
for(Note note : seq.getNoteSet()){
String noteName = note.getTerm().toString().substring(9);
if(noteName.equals("moltype")){
doc.put("Molecule type", note.getValue());
}else if(noteName.equals("organism")){
doc.put("Classfication", note.getValue().replaceAll("\n", ""));
}else if(noteName.equals("kw")){
doc.put("KeyWords", note.getValue());
}else if(noteName.equals("udat")){
doc.put("dateUpdated", formatter.format(format.parse(note.getValue())));
}else if(noteName.equals("cdat")){
doc.put("dateCreated", formatter.format(format.parse(note.getValue())));
}else{
doc.put(noteName, note.getValue());
}
}
doc.put("SequenceLength", seq.getInternalSymbolList().length());
doc.put("Description", seq.getDescription().replace('\n', ' '));
//System.out.println(seq.getInternalSymbolList().length());
//doc.put("Sequence length", seq.getInternalSymbolList().length());
doc.put("Accession", seq.getName());
doc.put("Organism",seq.getTaxon().getDisplayName());
doc.put("TaxonID", seq.getTaxon().getNCBITaxID());
/*for (RankedDocRef rankDocRef : seq.getRankedDocRefs()){
if(rankDocRef.getDocumentReference().getLocation().indexOf("Submitted") != -1){
int dotindex = rankDocRef.getDocumentReference().getLocation().indexOf(".");
String submitDate = rankDocRef.getDocumentReference().getLocation().substring(11,22);
String submitAddress = rankDocRef.getDocumentReference().getLocation().substring(dotindex+1).trim();
doc.put("SubmitDate", format.parse(submitDate));
doc.put("SubmittedAddress", rankDocRef.getDocumentReference().getLocation().substring(dotindex+1).trim());
}
}*/
//rankedDocRefs
//processRankedDocRefs(seq, doc);
//rankedCrossRef
processRankedCrossRefs(seq, doc);
//comments
processComment(seq, doc);
//features
JSONArray featureArray = new JSONArray();
Iterator<Feature> featureIterator = seq.features();
while (featureIterator.hasNext()){
JSONObject featureObject = new JSONObject();
List<String> dbxrefArray = new ArrayList<String>();
RichFeature feature = (RichFeature)featureIterator.next();
//deal with db_xref in each feature
//db_xref is not required in the requirement
for(RankedCrossRef rankedCrossRef : feature.getRankedCrossRefs()){
dbxrefArray.add(rankedCrossRef.getCrossRef().getDbname() + ":" + rankedCrossRef.getCrossRef().getAccession());
}
featureObject.put("db_xref", dbxrefArray);
featureObject.put("featureType", feature.getType());
Map featureMap = feature.getAnnotation().asMap();
Iterator<SimpleComparableTerm> featureKeyIterator = featureMap.keySet().iterator();
while(featureKeyIterator.hasNext()){
SimpleComparableTerm term = featureKeyIterator.next();
String name = term.getName();
String nameValue = featureMap.get(term).toString();
if(name.equals("altitude")){
featureObject.put("altitude_value", Float.valueOf(nameValue.substring(0,nameValue.indexOf("m")).trim())); //number, take care of negative number
}else if(name.equals("collection_date")){
JSONArray collectionDates = new JSONArray();
for(String singleDate : nameValue.split("/")){
JSONObject collectionDate = new JSONObject();
if(singleDate.endsWith("FT")){
singleDate = singleDate.substring(0, singleDate.length()-2);
}
if(singleDate.matches("\\d{2}-\\w{3}-\\d{4}")){
collectionDate.put("collection_date", formatter.format(format.parse(singleDate)));
}else{
collectionDate.put("collection_date", singleDate);
}
collectionDate.put("collection_year", getCollectionYear(singleDate));
collectionDates.put(collectionDate);
}
featureObject.put("collectionDate", collectionDates);
}
featureObject.put(term.getName(),featureMap.get(term));
}
featureArray.put(featureObject);
}
doc.put("features", featureArray);
}
public static void processRankedCrossRefs(RichSequence seq, JSONObject doc){
JSONArray rankedCrossRefs = new JSONArray();
for(RankedCrossRef rankedCrossRef : seq.getRankedCrossRefs()){
JSONObject ref = new JSONObject();
String key = rankedCrossRef.getCrossRef().getDbname();
String accessions = rankedCrossRef.getCrossRef().getAccession();
for(Note note : rankedCrossRef.getCrossRef().getRichAnnotation().getNoteSet()){
accessions += ";"+note.getValue();
}
ref.put(key, accessions);
rankedCrossRefs.put(ref);
}
if(rankedCrossRefs.length() > 0){
doc.put("rankedCrossRefs", rankedCrossRefs);
}
}
// public static void processRankedDocRefs(RichSequence seq, JSONObject doc) throws ParseException {
// JSONArray rankedDocRefs = new JSONArray();
// for(RankedDocRef rankedDocRef : seq.getRankedDocRefs()){
// DocRef docRef = rankedDocRef.getDocumentReference();
// JSONObject rankedRef = new JSONObject();
// rankedRef.put("authors", docRef.getAuthors());
// rankedRef.put("title", docRef.getTitle());
// if(docRef.getCrossref() != null){
// String dbName = docRef.getCrossref().getDbname();
// if(dbName.equals("PUBMED")){
// rankedRef.put(dbName, Integer.parseInt(docRef.getCrossref().getAccession()));
// }else{
// rankedRef.put(dbName, docRef.getCrossref().getAccession());
// }
// }
// Matcher m = submitDatep.matcher(docRef.getLocation().replaceAll("\n", " "));
// if(m.matches()){
// rankedRef.put("SubmitDate", formatter.format(format.parse(m.group(1))));
// rankedRef.put("SubmitAddress", m.group(2));
// int year = Integer.parseInt(m.group(1).substring(m.group(1).lastIndexOf("-")+1));
// rankedRef.put("SubmitYear", year);
// //submitCountry--extract from SubmitAddress
// String countryName = dict.mappingCountry(m.group(2));
// if(countryName != null){
// rankedRef.put("SubmitCountry", countryName);
// }
// }
// rankedDocRefs.put(rankedRef);
// }
// doc.put("rankedDocRefs", rankedDocRefs);
// }
public static void processComment(RichSequence seq, JSONObject doc){
Map<String, String> commentMetaData = new HashMap<String, String>();
JSONArray comments = new JSONArray();
for(Comment comment: seq.getComments()){
JSONObject commentObj = new JSONObject();
if(comment.getComment().indexOf("::") != -1){
String comm[] = comment.getComment().split("\n");
for(int i = 0; i < comm.length; i++){
if(comm[i].matches("(.*)\\s+::\\s+(.*)")){
String[] metaData = comm[i].split("::");
String key = metaData[0].trim();
String value = metaData[1].trim();
if(key.contains(".")){
key = key.replaceAll("\\.", " ");
}
commentMetaData.put(key, value);
}
}
commentObj.put("commentMeta", commentMetaData);
}else{
commentObj.put("comment", comment.getComment());
}
comments.put(commentObj);
}
doc.put("comments", comments);
}
}


@@ -0,0 +1,87 @@
package cn.piflow.bundle.ftp
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.jsoup.Jsoup
import org.jsoup.select.Elements
import org.junit.Test
import scala.util.parsing.json.JSON
class emblTest {
@Test
def testEmblDataParse(): Unit ={
//parse flow json
// val file = "src/main/resources/yqd/down.json"
//val file = "src/main/resources/yqd/refseq_genome.json"
//val file = "src/main/resources/yqd/select_unzip.json"
val file = "src/main/resources/yqd/embl_parser.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.88.70:7077")
.appName("Embl")
.config("spark.driver.memory", "8g")
.config("spark.executor.memory", "16g")
.config("spark.cores.max", "16")
.config("spark.jars","/root/Desktop/weishengwu/out/artifacts/piflow_bundle/piflow_bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
@Test
def testEmblDataParse11(): Unit ={
val url ="http://ftp.ebi.ac.uk/pub/databases/ena/sequence/release/"
val doc = Jsoup.connect(url).timeout(100000000).get()
// Get the file names, last-modified dates and sizes listed on the page at the url
// Name Last modified Size Parent Directory -
// build_gbff_cu.pl 2003-04-25 17:23 21K
val elements: Elements = doc.select("html >body >table >tbody")
// println(elements)
println(elements.first().text())
// Split the elements text by line into individual strings
val fileString = elements.first().text().split("\\n")
for (i <- 0 until fileString.size) {
println(fileString(i))
}
println(fileString)
}
}