Merge remote-tracking branch 'origin/master'

judy0131 2019-02-27 10:58:55 +08:00
commit 0bd5c9bcf5
19 changed files with 252 additions and 181 deletions


@@ -226,4 +226,13 @@ To Use
   }
 - curl -0 -X POST http://10.0.86.191:8002/flow/start -H "Content-type: application/json" -d 'this is your flow json'
 - piflow web
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-login.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-flowList.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-createflow.png)
 ![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow_web.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-loadflow.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-monitor.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-log.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-processlist.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-templatelist.png)
+![](https://github.com/cas-bigdatalab/piflow/blob/master/doc/piflow-savetemplate.png)
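For reference, the `/flow/start` call documented in the README hunk above can also be issued from code. The following is only a minimal Scala sketch of that curl command using the JDK's HttpURLConnection; the request body is a placeholder, since the actual flow JSON is not spelled out in this diff.

```scala
import java.io.OutputStreamWriter
import java.net.{HttpURLConnection, URL}

import scala.io.Source

object StartFlowSketch {
  def main(args: Array[String]): Unit = {
    // Same endpoint and header as the curl example in the README.
    val conn = new URL("http://10.0.86.191:8002/flow/start")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-type", "application/json")
    conn.setDoOutput(true)

    // Placeholder body: substitute your real flow JSON here.
    val writer = new OutputStreamWriter(conn.getOutputStream)
    writer.write("""{"flow":"this is your flow json"}""")
    writer.close()

    // Print the HTTP status and response returned by the piflow server.
    println(conn.getResponseCode)
    println(Source.fromInputStream(conn.getInputStream).mkString)
  }
}
```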

BIN  doc/piflow-createflow.png (new file, 213 KiB; binary not shown)
BIN  doc/piflow-flowList.png (new file, 233 KiB; binary not shown)
BIN  doc/piflow-loadflow.png (new file, 161 KiB; binary not shown)
BIN  doc/piflow-log.png (new file, 95 KiB; binary not shown)
BIN  doc/piflow-login.png (new file, 17 KiB; binary not shown)
BIN  doc/piflow-monitor.png (new file, 152 KiB; binary not shown)
BIN  doc/piflow-processlist.png (new file, 256 KiB; binary not shown)
BIN  doc/piflow-savetemplate.png (new file, 173 KiB; binary not shown)
BIN  doc/piflow-templatelist.png (new file, 261 KiB; binary not shown)



@@ -231,6 +231,11 @@
             <version>2.4.2</version>
         </dependency>
+        <dependency>
+            <groupId>ftpClient</groupId>
+            <artifactId>edtftp</artifactId>
+            <version>1.0.0</version>
+        </dependency>
     </dependencies>
@@ -291,6 +296,22 @@
                 </configuration>
             </execution>
+            <execution>
+                <id>install-external-3</id>
+                <goals>
+                    <goal>install-file</goal>
+                </goals>
+                <phase>install</phase>
+                <configuration>
+                    <file>${basedir}/lib/edtftpj.jar</file>
+                    <groupId>ftpClient</groupId>
+                    <artifactId>edtftp</artifactId>
+                    <version>1.0.0</version>
+                    <packaging>jar</packaging>
+                    <generatePom>true</generatePom>
+                </configuration>
+            </execution>
         </executions>
     </plugin>


@@ -6,10 +6,10 @@
     {
         "uuid":"1111",
         "name":"SelectFilesByName",
-        "bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
+        "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
         "properties":{
             "HDFSUrl":"hdfs://10.0.88.70:9000",
-            "HDFSPath":"/yqd/weishengwu/NCBI_Microbe_genome/",
+            "HDFSPath":"/yqd/ftp/NCBI_Microbe_genome/",
             "selectionConditions":".*.gbk"
         }
     },{
@@ -26,8 +26,8 @@
         "properties":{
             "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
             "port": "9200",
-            "es_index": "gene_index",
-            "es_type": "gene_type"
+            "es_index": "yqd_ncbi_microbe_genome_index_delet",
+            "es_type": "yqd_ncbi_microbe_genome_type_delet"
         }
     }
 ],


@@ -6,10 +6,10 @@
     {
         "uuid":"1111",
         "name":"SelectFilesByName",
-        "bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
+        "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
        "properties":{
             "HDFSUrl":"hdfs://10.0.88.70:9000",
-            "HDFSPath":"/yqd/weishengwu/Ensembl_gff3/",
+            "HDFSPath":"/yqd/ftp/ensembl_gff3/",
             "selectionConditions":".*.gff3"
         }
     },{
@@ -26,8 +26,8 @@
         "properties":{
             "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
             "port": "9200",
-            "es_index": "ensemblgff3",
-            "es_type": "ensemblgff3"
+            "es_index": "yqd_ensembl_index",
+            "es_type": "yqd_ensembl_type"
         }
     }
 ],
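The es_nodes/port/es_index/es_type properties in the two flow definitions above are the usual elasticsearch-hadoop connection settings. The stop that consumes them is not part of this diff, so the following is only a sketch of how a parsed DataFrame could be written with those settings, assuming the elasticsearch-spark connector is on the classpath; the input path is illustrative.

```scala
import org.apache.spark.sql.SparkSession

object EsWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("es-write-sketch").getOrCreate()

    // Illustrative input: the line-delimited JSON produced by the parser stops.
    val df = spark.read.json("hdfs://10.0.88.70:9000/ensembl_genomeParser_temporary.json")

    // es_index/es_type from the flow JSON map onto the "index/type" resource string.
    df.write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "10.0.88.70,10.0.88.71,10.0.88.72")
      .option("es.port", "9200")
      .mode("append")
      .save("yqd_ensembl_index/yqd_ensembl_type")
  }
}
```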


@@ -1,20 +1,26 @@
 package cn.piflow.bundle.ftp
+import java.text.NumberFormat
+import java.util.regex.Pattern
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.commons.net.ftp.{FTP, FTPClient, FTPClientConfig, FTPFile}
+import com.enterprisedt.net.ftp._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
+import org.apache.log4j.Logger
+import scala.collection.mutable.ArrayBuffer
-class LoadFromFtpToHDFS extends ConfigurableStop {
+class LoadFromFtpToHDFS extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
   override val description: String = "Download files or folders to save to HDFS"
   override val inportList: List[String] = List(PortEnum.NonePort.toString)
   override val outportList: List[String] = List(PortEnum.NonePort.toString)
-  var url_str:String =_
+  var ftp_url:String =_
   var port:String=_
   var username:String=_
   var password:String=_
@@ -22,118 +28,205 @@ class LoadFromFtpToHDFS extends ConfigurableStop {
   var HDFSUrl:String=_
   var HDFSPath:String=_
   var isFile:String=_
+  var filterByName : String = _
+  var ftp:FileTransferClient = null
+  var errorFile : ArrayBuffer[String]=ArrayBuffer()
+  var filters: Array[String]=null
+  var myException : Exception = null
+  var boo : Boolean = true
   var fs: FileSystem=null
-  var con: FTPClientConfig =null
+  var log: Logger =null

-  def downFile(ftp: FTPClient,ftpFilePath:String,HDFSSavePath:String): Unit = {
-    val changeFlag: Boolean = ftp.changeWorkingDirectory(ftpFilePath)
-    if(changeFlag){
-      println("change dir successful "+ftpFilePath)
-      val files: Array[FTPFile] = ftp.listFiles()
-      for(x <- files ) {
-        if (x.isFile) {
-          ftp.changeWorkingDirectory(ftpFilePath)
-          println("down start ^^^ "+x.getName)
-          val hdfsPath: Path = new Path(HDFSSavePath + x.getName)
-          if(! fs.exists(hdfsPath)){
-            var fdos: FSDataOutputStream = fs.create(hdfsPath)
-            val b = ftp.retrieveFile(new String(x.getName.getBytes("GBK"),"ISO-8859-1"), fdos)
-            if(b){
-              println("down successful " + x.getName)
-            }else{
-              throw new Exception("down error")
-            }
-            fdos.close()
-          }
-        } else {
-          downFile(ftp,ftpFilePath+x.getName+"/",HDFSSavePath+x.getName+"/")
-        }
-      }
-    }else{
-      throw new Exception("File path error")
-    }
-  }
+  def ftpConnect(): Unit = {
+    var isConnect : Boolean = false
+    var count : Int = 0
+    while ( (! isConnect) && (count < 5) ){
+      count += 1
+      try{
+        ftp.connect()
+        isConnect = true
+      }catch {
+        case e : Exception => {
+          isConnect = false
+          log.warn("Retry the connection")
+          Thread.sleep(1000*60*3)
+        }
+      }
+    }
+  }
+
+  def openFtpClient(): Unit = {
+    ftp = null
+    ftp = new FileTransferClient
+    ftp.setRemoteHost(ftp_url)
+    if(port.size > 0){
+      ftp.setRemotePort(port.toInt)
+    }
+    if(username.length > 0){
+      ftp.setUserName(username)
+      ftp.setPassword(password)
+    }else{
+      ftp.setUserName("anonymous")
+      ftp.setPassword("anonymous")
+    }
+    ftp.setContentType(FTPTransferType.BINARY)
+    ftp.getAdvancedFTPSettings.setConnectMode(FTPConnectMode.PASV)
+    var isConnect : Boolean = true
+    ftpConnect()
+  }
+
+  def copyFile(fileDir: String, fileNmae: String, HdfsPath: String) : Unit = {
+    openFtpClient()
+    try{
+      var ftis: FileTransferInputStream = ftp.downloadStream(fileDir+fileNmae)
+      var fileSize: Double = 0
+      var whetherShowdownloadprogress : Boolean = true
+      try {
+        fileSize = ftp.getSize(fileDir + fileNmae).asInstanceOf[Double]
+      }catch {
+        case e : Exception => {
+          whetherShowdownloadprogress = false
+          log.warn("File size acquisition failed")
+        }
+      }
+      val path: Path = new Path(HdfsPath + fileNmae)
+      var hdfsFileLen: Long = -100
+      if(fs.exists(path)){
+        hdfsFileLen = fs.getContentSummary(path).getLength
+      }
+      if(hdfsFileLen != fileSize ){
+        var fdos: FSDataOutputStream = fs.create(path)
+        val bytes: Array[Byte] = new Array[Byte](1024*1024*10)
+        var downSize:Double=0
+        var n = -1
+        while (((n = ftis.read(bytes)) != -1) && (n != -1)){
+          fdos.write(bytes,0,n)
+          fdos.flush()
+          downSize += n
+          if(whetherShowdownloadprogress){
+            val percentageOfProgressStr: String = NumberFormat.getPercentInstance.format(downSize/fileSize)
+            log.debug(fileDir+fileNmae+" Download complete "+percentageOfProgressStr)
+          }
+        }
+        ftis.close()
+        fdos.close()
+        ftp.disconnect()
+        boo = true
+        log.debug("Download complete---"+fileDir+fileNmae)
+      }
+    }catch {
+      case e : Exception => {
+        boo = false
+        myException = e
+      }
+    }
+  }
+
+  def getFtpList(ftpDir: String): Array[FTPFile]= {
+    var fileArr: Array[FTPFile] = null
+    try{
+      fileArr = ftp.directoryList(ftpDir)
+    }catch {
+      case e :Exception => {
+        myException = e
+      }
+    }
+    fileArr
+  }
+
+  def downFileDir(ftpDir: String, HdfsDir: String): Unit = {
+    openFtpClient()
+    var fileArr: Array[FTPFile] = getFtpList(ftpDir)
+    var countOfFileList:Int=0
+    while (fileArr == null && countOfFileList < 5){
+      countOfFileList += 1
+      if(fileArr == null){
+        Thread.sleep(1000*60*3)
+        openFtpClient()
+        fileArr = getFtpList(ftpDir)
+        log.warn("Retry the list of files---" +ftpDir )
+      }
+    }
+    if(fileArr == null && countOfFileList == 5){
+      errorFile += ftpDir
+    }else if(fileArr != null){
+      log.debug("Getted list of files---"+ftpDir)
+      try{
+        ftp.disconnect()
+      }catch {
+        case e :Exception => log.warn("Failed to disconnect FTP server")
+      }
+      fileArr.foreach(eachFile => {
+        val fileName: String = eachFile.getName
+        if(eachFile.isDir && ! eachFile.isFile){
+          downFileDir(ftpDir+fileName+"/",HdfsDir+fileName+"/")
+        }else if(! eachFile.isDir && eachFile.isFile){
+          var witherDown = true
+          if(filters.size > 0){
+            witherDown = false
+            filters.foreach(each => {
+              if(! witherDown){
+                witherDown = Pattern.matches(each,fileName)
+              }
+            })
+          }
+          if(witherDown){
+            log.debug("Start downloading---"+ftpDir+fileName)
+            copyFile(ftpDir,fileName,HdfsDir)
+            var count = 0
+            while ((! boo) && (count < 5)){
+              count += 1
+              Thread.sleep(1000*60*3)
+              copyFile(ftpDir,fileName,HdfsDir)
+              log.warn("Try downloading files again---" + ftpDir+fileName)
+            }
+            if((! boo) && (count == 5)){
+              errorFile += (ftpDir+fileName)
+            }
+          }
+        }else{
+          println(ftpDir+fileName+ "---Neither file nor folder")
+          errorFile += (ftpDir+fileName)
+        }
+      })
+    }
+  }

   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    log = Logger.getLogger(classOf[LoadFromFtpToHDFS])
     val configuration: Configuration = new Configuration()
     configuration.set("fs.defaultFS", HDFSUrl)
     fs = FileSystem.get(configuration)
-    val ftp:FTPClient = openFtpClient()
-    if(isFile.equals("true")){
-      val pathArr: Array[String] = ftpFile.split("/")
-      var dirPath:String=""
-      for(x <- (0 until pathArr.length-1)){
-        dirPath += (pathArr(x)+"/")
-      }
-      val boolChange: Boolean = ftp.changeWorkingDirectory(dirPath)
-      if(boolChange){
-        println("change dir successful "+dirPath)
-        var fdos: FSDataOutputStream = fs.create(new Path(HDFSPath+pathArr.last))
-        println("down start "+pathArr.last)
-        val boolDownSeccess: Boolean = ftp.retrieveFile(new String(pathArr.last.getBytes("GBK"),"ISO-8859-1"), fdos)
-        if(boolDownSeccess){
-          println("down successful "+pathArr.last)
-        }else{
-          throw new Exception("down error")
-        }
-        fdos.flush()
-        fdos.close()
-      }else{
-        throw new Exception("File path error")
-      }
-    }else{
-      downFile(ftp,ftpFile,HDFSPath)
-    }
+    if(filterByName.length == 0){
+      filterByName=".*"
+    }
+    filters = filterByName.split(";")
+    if(isFile.equals("true") || isFile.equals("TRUE")){
+      val fileNmae: String = ftpFile.split("/").last
+      val fileDir = ftpFile.replace(fileNmae,"")
+      copyFile(fileDir,fileNmae,HDFSPath)
+    }else if(isFile.equals("false") || isFile.equals("FALSE")){
+      downFileDir(ftpFile,HDFSPath)
+    }else{
+      throw new Exception("Please specify whether it is a file or directory.")
+    }
+    if(errorFile.size > 0){
+      errorFile.foreach(x =>{
+        log.warn("Download failed---"+x)
+      })
+    }
   }

-  def openFtpClient(): FTPClient = {
-    val ftp = new FTPClient
-    try{
-      if(port.length > 0 ){
-        ftp.connect(url_str,port.toInt)
-      }else{
-        ftp.connect(url_str)
-      }
-    }catch {
-      case e:Exception => throw new Exception("Failed to connect FTP server")
-    }
-    try{
-      if(username.length > 0 && password.length > 0){
-        ftp.login(username,password)
-      }else{
-        ftp.login("anonymous", "121@hotmail.com")
-      }
-    }catch {
-      case e:Exception => throw new Exception("Failed to log on to FTP server")
-    }
-    ftp.setControlEncoding("GBK")
-    con = new FTPClientConfig(FTPClientConfig.SYST_NT)
-    con.setServerLanguageCode("zh")
-    ftp.setFileType(FTP.BINARY_FILE_TYPE)
-    ftp.setDataTimeout(600000)
-    ftp.setConnectTimeout(600000)
-    ftp.enterLocalPassiveMode()
-    ftp.setControlEncoding("UTF-8")
-    ftp
-  }

   override def setProperties(map: Map[String, Any]): Unit = {
-    url_str=MapUtil.get(map,key="url_str").asInstanceOf[String]
+    ftp_url=MapUtil.get(map,key="ftp_url").asInstanceOf[String]
     port=MapUtil.get(map,key="port").asInstanceOf[String]
     username=MapUtil.get(map,key="username").asInstanceOf[String]
     password=MapUtil.get(map,key="password").asInstanceOf[String]
@@ -141,25 +234,24 @@ class LoadFromFtpToHDFS extends ConfigurableStop {
     HDFSUrl=MapUtil.get(map,key="HDFSUrl").asInstanceOf[String]
     HDFSPath=MapUtil.get(map,key="HDFSPath").asInstanceOf[String]
     isFile=MapUtil.get(map,key="isFile").asInstanceOf[String]
+    filterByName=MapUtil.get(map,key="filterByName").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
     val url_str = new PropertyDescriptor().name("url_str").displayName("URL").defaultValue("IP of FTP server, such as 128.136.0.1 or ftp.ei.addfc.gak").required(true)
     val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("Port of FTP server").required(false)
-    val username = new PropertyDescriptor().name("username").displayName("USER_NAME").defaultValue("The username needed to log in to the FTP server." +
-      "You can choose not to specify, we will try to login anonymously").required(false)
-    val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("The password corresponding to the user name you specified." +
-      "If you do not specify, we will try to login anonymously").required(false)
-    val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("FTP_File").defaultValue("The path of the file to the FTP server, " +
-      "such as /test/Ab/ or /test/Ab/test.txt").required(true)
-    val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("The URL of the HDFS file system, " +
-      "such as hdfs://10.0.88.70:9000").required(true)
-    val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("The save path of the HDFS file system, " +
-      "such as /test/Ab/").required(true)
-    val isFile = new PropertyDescriptor().name("isFile").displayName("isFile").defaultValue("Whether the path is a file or not, " +
-      "if true is filled in, only a single file specified by the path is downloaded. If false is filled in, all files under the folder are downloaded recursively.").required(true)
+    val username = new PropertyDescriptor().name("username").displayName("USER_NAME").defaultValue("").required(false)
+    val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("").required(false)
+    val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("FTP_File").defaultValue("The path of the file to the FTP server, such as /test/Ab/ or /test/Ab/test.txt").required(true)
+    val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("The URL of the HDFS file system, such as hdfs://10.0.88.70:9000").required(true)
+    val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("The save path of the HDFS file system, such as /test/Ab/").required(true)
+    val isFile = new PropertyDescriptor().name("isFile").displayName("isFile").defaultValue("Whether the path is a file or not, if true is filled in, only a single file specified by the path is downloaded. If false is filled in, all files under the folder are downloaded recursively.").required(true)
+    val filterByName = new PropertyDescriptor().name("filterByName").displayName("filterByName").defaultValue("If you choose to download the entire directory, you can use this parameter to filter the files you need to download. " +
+      "What you need to fill in here is standard Java regular expressions. For example, you need to download all files in the /A/ directory that end in .gz " +
+      "You can fill in .*.gz here. If there are multiple screening conditions, need to use ; segmentation.").required(false)
+    descriptor = filterByName :: descriptor
     descriptor = isFile :: descriptor
     descriptor = url_str :: descriptor
     descriptor = port :: descriptor
@@ -182,6 +274,4 @@ class LoadFromFtpToHDFS extends ConfigurableStop {
   override def initialize(ctx: ProcessContext): Unit = {
   }
 }
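The rewrite above replaces commons-net with edtFTPj's FileTransferClient and streams each remote file straight into HDFS, with retry and filter logic layered on top. Outside of PiFlow the same core pattern looks roughly like the sketch below; it only strings together calls that already appear in the diff, while the host and paths are illustrative placeholders.

```scala
import com.enterprisedt.net.ftp.{FTPConnectMode, FTPTransferType, FileTransferClient}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FtpToHdfsSketch {
  def main(args: Array[String]): Unit = {
    // Connect in passive mode with binary transfer, as the stop does.
    val ftp = new FileTransferClient
    ftp.setRemoteHost("ftp.example.org") // illustrative host
    ftp.setUserName("anonymous")
    ftp.setPassword("anonymous")
    ftp.setContentType(FTPTransferType.BINARY)
    ftp.getAdvancedFTPSettings.setConnectMode(FTPConnectMode.PASV)
    ftp.connect()

    // Open the target HDFS file.
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://10.0.88.70:9000")
    val fs = FileSystem.get(conf)
    val out = fs.create(new Path("/yqd/ftp/example.gbk")) // illustrative path

    // Stream the remote file into HDFS in chunks.
    val in = ftp.downloadStream("/pub/example.gbk") // illustrative remote path
    val buf = new Array[Byte](1024 * 1024)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
    in.close()
    out.close()
    ftp.disconnect()
  }
}
```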


@@ -15,7 +15,7 @@ import org.json.JSONObject
 class Ensembl_gff3Parser extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
-  override val description: String = "Parsing ensembl type data"
+  override val description: String = "ensembl type data"
   override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
   override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
@@ -62,7 +62,7 @@ class Ensembl_gff3Parser extends ConfigurableStop{
     configuration.set("fs.defaultFS",hdfsUrl)
     var fs: FileSystem = FileSystem.get(configuration)
-    val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json"
+    val hdfsPathTemporary:String = hdfsUrl+"/ensembl_genomeParser_temporary.json"
     val path: Path = new Path(hdfsPathTemporary)
     if(fs.exists(path)){
@@ -82,25 +82,18 @@ class Ensembl_gff3Parser extends ConfigurableStop{
     var doc: JSONObject = null
     var seq: RichSequence = null
     var jsonStr: String = ""
-    var n:Int=0
     inDf.collect().foreach(row => {
       pathStr = row.get(0).asInstanceOf[String]
-      println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! start parser ^^^" + pathStr)
+      println("start parser ^^^" + pathStr)
       fdis = fs.open(new Path(pathStr))
       br = new BufferedReader(new InputStreamReader(fdis))
       var eachStr:String=null
-      while((eachStr = br.readLine()) != null && eachStr != null){
+      while((eachStr = br.readLine()) != null && eachStr != null ){
         doc = parser.parserGff3(eachStr)
         jsonStr = doc.toString
         if(jsonStr.length > 2){
-          n +=1
-          println("start " + n + " String")
-          if (n == 1) {
-            bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
-          } else {
-            bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
-          }
+          bis = new BufferedInputStream(new ByteArrayInputStream((jsonStr+"\n").getBytes()))
           var count: Int = bis.read(buff)
           while (count != -1) {
             fdos.write(buff, 0, count)
@@ -109,6 +102,7 @@ class Ensembl_gff3Parser extends ConfigurableStop{
           }
           fdos.flush()
+          bis.close()
           bis = null
           doc = null
           seq = null
@@ -117,24 +111,15 @@ class Ensembl_gff3Parser extends ConfigurableStop{
       }
       sequences = null
+      br.close()
       br = null
+      fdis.close()
       fdis =null
       pathStr = null
     })
-    bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
-    var count: Int = bis.read(buff)
-    while (count != -1) {
-      fdos.write(buff, 0, count)
-      fdos.flush()
-      count = bis.read(buff)
-    }
-    fdos.flush()
-    bis.close()
     fdos.close()
-    val df: DataFrame = session.read.json(hdfsPathTemporary)
-    out.write(df)
+    out.write(session.read.json(hdfsPathTemporary))
   }
 }
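The parser change above switches the temporary HDFS file from a single JSON array (written with "[", ",", "]" separators) to newline-delimited JSON, which is the layout Spark's JSON reader consumes line by line. A minimal, self-contained illustration of that pattern follows; the records and paths are made up for the example.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object NdjsonSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://10.0.88.70:9000")
    val fs = FileSystem.get(conf)

    // Write one JSON object per line instead of wrapping records in "[ ... ]".
    val out = fs.create(new Path("/tmp/ndjson_example.json")) // illustrative path
    Seq("""{"gene":"a"}""", """{"gene":"b"}""").foreach(rec => out.write((rec + "\n").getBytes()))
    out.close()

    // spark.read.json consumes line-delimited JSON directly.
    val spark = SparkSession.builder().appName("ndjson-sketch").getOrCreate()
    spark.read.json("hdfs://10.0.88.70:9000/tmp/ndjson_example.json").show()
  }
}
```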


@@ -41,7 +41,7 @@ class MicrobeGenomeDataParser extends ConfigurableStop{
     configuration.set("fs.defaultFS",hdfsUrl)
     var fs: FileSystem = FileSystem.get(configuration)
-    val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json"
+    val hdfsPathTemporary:String = hdfsUrl+"/NCBI_Microbe_genome_genomeParser_temporary.json"
     val path: Path = new Path(hdfsPathTemporary)
     if(fs.exists(path)){
@@ -63,14 +63,13 @@ class MicrobeGenomeDataParser extends ConfigurableStop{
     inDf.collect().foreach(row => {
       pathStr = row.get(0).asInstanceOf[String]
-      println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! start parser ^^^" + pathStr)
       fdis = fs.open(new Path(pathStr))
       br = new BufferedReader(new InputStreamReader(fdis))
       sequences = CustomIOTools.IOTools.readGenbankProtein(br, null)
       while (sequences.hasNext) {
         n += 1
         doc = new JSONObject()
         seq = sequences.nextRichSequence()
         Process.processSingleSequence(seq,doc)
@@ -90,6 +89,7 @@ class MicrobeGenomeDataParser extends ConfigurableStop{
         }
         fdos.flush()
+        bis.close()
         bis = null
         doc = null
         seq = null
@@ -97,7 +97,9 @@ class MicrobeGenomeDataParser extends ConfigurableStop{
       }
       sequences = null
+      br.close()
       br = null
+      fdis.close()
       fdis =null
       pathStr = null
@@ -114,10 +116,7 @@ class MicrobeGenomeDataParser extends ConfigurableStop{
     bis.close()
     fdos.close()
-    println("start parser HDFSjsonFile")
-    val df: DataFrame = session.read.json(hdfsPathTemporary)
-    out.write(df)
+    out.write(session.read.json(hdfsPathTemporary))
   }


@@ -4,6 +4,7 @@ mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/spark-xml_
 mvn install:install-file -Dfile=/Work/piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
 mvn install:install-file -Dfile=/Work/piflow/piflow-bundle/lib/ojdbc6.jar -DgroupId=jdbc_oracle -DartifactId=ojdbc -Dversion=6.0.0 -Dpackaging=jar
 mvn install:install-file -Dfile=/Work/piflow/piflow-bundle/lib/ojdbc5.jar -DgroupId=jdbc_oracle -DartifactId=ojdbc -Dversion=5.0.0 -Dpackaging=jar
+mvn install:install-file -Dfile=/root/Desktop/myPut/piflow-bundle/lib/edtftpj.jar -DgroupId=ftpClient -DartifactId=edtftp -Dversion=1.0.0 -Dpackaging=jar

 2.packaging


@@ -1,34 +0,0 @@
1.maven error
apt-get install maven
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release_2.6.6 -Dversion=0.0.0 -Dpackaging=jar
2.packaging
clean package -Dmaven.test.skip=true -U
3.set SPARK_HOME in Configurations
Edit Configurations --> Application(HttpService) --> Configurations --> Environment Variable
4. yarn log aggregation
Edit yarn-site.xml, add the following content
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.log-aggregation.debug-enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
<value>3600</value>
</property>