bioProject stop

This commit is contained in:
yanggang 2018-11-26 15:42:48 +08:00
parent cda81eae5c
commit c29f9bcba7
3 changed files with 353 additions and 0 deletions

View File

@@ -0,0 +1,39 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"LoadFromFtpUrl",
"bundle":"cn.piflow.bundle.ftp.LoadFromFtpUrl",
"properties":{
"http_URl":"https://ftp.ncbi.nlm.nih.gov/bioproject/bioproject.xml",
"url_type":"file",
"localPath":"/ftpBioProject",
"downType":"all",
"fileName":"bioproject.xml"
}
},
{
"uuid":"1111",
"name":"BioProjetDataParse",
"bundle":"cn.piflow.bundle.microorganism.BioProjetDataParse",
"properties":{
"es_nodes": "10.0.86.89,10.0.86.90,10.0.86.91",
"port": "9200",
"es_index": "bioproject",
"es_type": "bioprojecttest002"
}
}
],
"paths":[
{
"from":"LoadFromFtpUrl",
"outport":"",
"inport":"",
"to":"BioProjetDataParse"
}
]
}
}
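
The flow wires LoadFromFtpUrl (downloads bioproject.xml to /ftpBioProject) into BioProjetDataParse (splits the file into <Package> documents and indexes them into the bioproject/bioprojecttest002 index on 10.0.86.89-91:9200). Below is a minimal sketch for spot-checking the indexed documents after the flow has run; it assumes the elasticsearch-spark (ES-Hadoop) connector already used by the stop is on the classpath, and the object name is illustrative, not part of this commit.

import org.apache.spark.sql.SparkSession

object BioProjectIndexCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("BioProjectIndexCheck")
      .getOrCreate()
    // index/type and connection settings taken from bioProject.json above
    val df = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "10.0.86.89,10.0.86.90,10.0.86.91")
      .option("es.port", "9200")
      .load("bioproject/bioprojecttest002")
    println(s"indexed documents: ${df.count()}")
    df.show(5)
    spark.close()
  }
}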

View File

@@ -0,0 +1,238 @@
package cn.piflow.bundle.microorganism
import java.io._
import java.net.UnknownHostException
import java.util.regex.Pattern
import cn.piflow.bundle.microorganism.util.BioProject
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.biojava.bio.BioException
import org.elasticsearch.spark.sql.EsSparkSQL
import org.json.{JSONArray, JSONObject, XML}
import scala.util.parsing.json.JSON
class BioProjetDataParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Parse BioProject XML data and store it in Elasticsearch."
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
var es_nodes:String = _ // Elasticsearch nodes; separate multiple nodes with commas
var port:String = _ // Elasticsearch port number
var es_index:String = _ // Elasticsearch index
var es_type:String = _ // Elasticsearch type
var name:String = "Package" // XML element that wraps one BioProject document
var dp = Pattern.compile("((\\d{4})-(\\d{2})-(\\d{2}))(T.*)") // matches dates like 2012-05-21T00:00:00Z; group(1) = date, group(2) = year
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf= in.read()
inDf.show()
inDf.schema.printTreeString()
val rows: Array[Row] = inDf.collect()
var path:String = null
for (i <- 0 until rows.size) {
if (rows(i)(0).toString.endsWith("bioproject.xml")){
path = rows(i)(0).toString
// val path1 = "/ftpBioProject/bioproject.xml"
try {
val br = new BufferedReader(new FileReader(path))
var line: String = null
// skip the first two lines (the XML declaration and the opening <PackageSet> tag)
var skipped = 0
while (skipped < 2) {
br.readLine()
skipped = skipped + 1
}
var count = 0
var xml = new StringBuffer()
// accumulate lines until a closing </Package> tag, then convert that chunk to JSON;
// stop once the closing </PackageSet> tag has been read
var reachedEnd = false
line = br.readLine()
while (line != null && !reachedEnd) {
xml.append(line)
if (line.equals("</PackageSet>")) {
println("----------------------------------break")
reachedEnd = true
}
else if (line.indexOf("</" + name + ">") != -1) { // reached the end of one <Package> document
println("-----------------------------------------"+count)
count = count + 1
val doc = XML.toJSONObject(xml.toString()).getJSONObject(name)
println("#####################################################"+count)
println(doc)
xml = new StringBuffer()
// accession PRJNA31525
val accession = doc.getJSONObject("Project").getJSONObject("Project")
.getJSONObject("ProjectID")
.getJSONObject("ArchiveID")
.getString("accession")
val projectDescr = doc.getJSONObject("Project").getJSONObject("Project")
.getJSONObject("ProjectDescr")
// load the JSON string of one document into a DataFrame
val jsonRDD = spark.sparkContext.makeRDD(doc.toString() :: Nil)
val jsonDF = spark.read.json(jsonRDD)
jsonDF.show()
// jsonDF.schema.printTreeString()
val options = Map("es.index.auto.create"-> "true",
// "es.mapping.id"->accession,
"es.nodes"->es_nodes,"es.port"->port)
// write the DataFrame to Elasticsearch
EsSparkSQL.saveToEs(jsonDF,s"${es_index}/${es_type}",options)
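// NOTE: the numbered blocks below are normalization steps kept from an earlier
// implementation (release/publication date splitting, submission year extraction,
// Grant/Organization/ProjectID flattening via BioProject.convertConcrete2KeyVal);
// they are commented out and are not executed by this version of the stop.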
// val bio = new BioProject
// bio.convertConcrete2KeyVal(projectDescr,"LocusTagPrefix")
// --------------1
// if (projectDescr.opt("ProjectReleaseDate") != null){
// val date = projectDescr.get("ProjectReleaseDate").toString
// val m = dp.matcher(date)
// if (m.matches()){
// // m.group(1) 2017-04-25
// // m.group(2)) 2017
// projectDescr.put("ProjectReleaseDate",m.group(1))
// projectDescr.put("ProjectReleaseDate",Integer.parseInt(m.group(2)))
//
// } else {
// // date 2012-05-21T00:00:00Z
// projectDescr.put("ProjectReleaseDate",date)
// }
// }
// ----------------2
// if (projectDescr.optJSONObject("Publication") !=null){
// val pub = projectDescr.getJSONObject("Publication")
// if (pub.opt("date") !=null){
// val date = projectDescr.getJSONObject("Publication").get("date").toString
// val m = dp.matcher(date)
// if (m.matches()){
// // m.group(1) 2017-04-25
// // m.group(2)) 2017
// projectDescr.put("date",m.group(1))
// projectDescr.put("year",Integer.parseInt(m.group(2)))
// } else {
// // date 2012-05-21T00:00:00Z
// projectDescr.put("date","##############99#")
// }
// }
// }
//
// ----------------3
// if(doc.optJSONObject("Submission").optJSONObject("submitted") != null){
// val submission = doc.optJSONObject("Submission").optJSONObject("submitted");
// if(submission.opt("submitted") != null){
// val date = submission.get("submitted");
// submission.put("submission_year", Integer.parseInt(date.toString().substring(0, 4)));
// }
// }
// ----------------4
// val grant = projectDescr.opt("Grant");
// if(grant != null){
// if(grant isInstanceOf[JSONArray]){
// for(int k = 0 ; k < ((JSONArray)grant).length(); k++){
// JSONObject singleGrant = (JSONObject)((JSONArray)grant).get(k);
// convertConcrete2KeyVal(singleGrant, "Agency");
// }
// }else if(grant instanceof JSONObject){
// convertConcrete2KeyVal((JSONObject)grant, "Agency");
// }
// }
// ----------------5
// val projectID = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectID");
// bio.convertConcrete2KeyVal(projectID, "LocalID");
// Object organization = doc.optJSONObject("Submission").optJSONObject("Submission").optJSONObject("Description").opt("Organization");
// if(organization instanceof JSONArray){
// for(int j = 0; j < ((JSONArray) organization).length(); j++){
// val orgz = ((JSONArray) organization).get(j);
// bio.convertConcrete2KeyVal(((JSONObject)orgz), "Name");
// }
// }else if(organization instanceof JSONObject){
// val orgz = (JSONObject)organization;
// bio.convertConcrete2KeyVal(orgz, "Name");
// }
// ----------------6
// val projTypeSubmission = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectType").optJSONObject("ProjectTypeSubmission");
// if(projTypeSubmission != null){
// val bioSampleSet = projTypeSubmission.getJSONObject("Target").optJSONObject("BioSampleSet");
// if(bioSampleSet != null){
// bio.convertConcrete2KeyVal(bioSampleSet, "ID");
// }
// }
}
line = br.readLine()
}
br.close()
} catch {
case e: UnknownHostException =>
e.printStackTrace()
case e: FileNotFoundException =>
e.printStackTrace()
case e: IOException =>
e.printStackTrace()
}
}
}
}
def setProperties(map: Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("bioProject.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.MicroorganismGroup.toString)
}
def initialize(ctx: ProcessContext): Unit = {
}
}
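
The class above defines the date pattern dp but only uses it inside the commented-out blocks. The following is a minimal, self-contained sketch of the normalization those blocks describe: splitting an ISO timestamp such as 2012-05-21T00:00:00Z into a plain date and a numeric year with the same pattern. The object and method names are illustrative, not part of the commit.

import java.util.regex.Pattern
import org.json.JSONObject

object DateNormalizationSketch {
  // same pattern as in BioProjetDataParse: group(1) = yyyy-MM-dd, group(2) = yyyy
  private val dp = Pattern.compile("((\\d{4})-(\\d{2})-(\\d{2}))(T.*)")

  // rewrite obj(dateKey) as a plain date and store the year under yearKey
  def normalizeDate(obj: JSONObject, dateKey: String, yearKey: String): Unit = {
    if (obj.opt(dateKey) != null) {
      val m = dp.matcher(obj.get(dateKey).toString)
      if (m.matches()) {
        obj.put(dateKey, m.group(1))                   // e.g. "2012-05-21"
        obj.put(yearKey, Integer.parseInt(m.group(2))) // e.g. 2012
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val projectDescr = new JSONObject().put("ProjectReleaseDate", "2012-05-21T00:00:00Z")
    normalizeDate(projectDescr, "ProjectReleaseDate", "ProjectReleaseYear")
    println(projectDescr) // the date becomes "2012-05-21" and the year 2012
  }
}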

View File

@@ -0,0 +1,76 @@
package cn.piflow.bundle
import java.util.ArrayList
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.jsoup.Jsoup
import org.jsoup.select.Elements
import org.junit.Test
import scala.util.parsing.json.JSON
class BioProjectDataTest {
@Test
def testBioProjetDataParse(): Unit ={
//parse flow json
val file = "src/main/resources/bioProject.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("BioProjetDataParse")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/work4/hbase/out/artifacts/piflow_bundle/piflow_bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
@Test
def testgenurl(): Unit ={
val url = "https://ftp.ncbi.nlm.nih.gov/bioproject/"
val url1 = "https://ftp.ncbi.nih.gov/genbank"
var filePathList = new ArrayList[String]
val doc = Jsoup.connect(url).timeout(100000000).get()
// scrape file names, last-modified dates and sizes from the directory-listing page, e.g.:
// Name Last modified Size Parent Directory -
// build_gbff_cu.pl 2003-04-25 17:23 21K
val elements: Elements = doc.select("html >body >pre")
println(elements.first().text())
}
}
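
testgenurl only prints the raw text of the <pre> listing. Below is a small self-contained sketch of turning lines in that "Name  Last modified  Size" layout into (name, lastModified, size) tuples; the regex and the sample input reuse the example line from the comment above and are assumptions about the listing format, not part of the commit.

object FtpListingParseSketch {
  // one listing entry: file name, "yyyy-MM-dd HH:mm" timestamp, size column
  private val entry = """(\S+)\s+(\d{4}-\d{2}-\d{2} \d{2}:\d{2})\s+(\S+)""".r

  def parse(listing: String): Seq[(String, String, String)] =
    listing.split("\\r?\\n").toSeq.collect { case entry(name, modified, size) => (name, modified, size) }

  def main(args: Array[String]): Unit = {
    val sample = "build_gbff_cu.pl 2003-04-25 17:23 21K"
    parse(sample).foreach { case (name, modified, size) => println(s"$name | $modified | $size") }
  }
}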