diff --git a/piflow-bundle/src/main/resources/bioProject.json b/piflow-bundle/src/main/resources/bioProject.json
new file mode 100644
index 0000000..79d7c5e
--- /dev/null
+++ b/piflow-bundle/src/main/resources/bioProject.json
@@ -0,0 +1,39 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"LoadFromFtpUrl",
+        "bundle":"cn.piflow.bundle.ftp.LoadFromFtpUrl",
+        "properties":{
+          "http_URl":"https://ftp.ncbi.nlm.nih.gov/bioproject/bioproject.xml",
+          "url_type":"file",
+          "localPath":"/ftpBioProject",
+          "downType":"all",
+          "fileName":"bioproject.xml"
+        }
+      },
+      {
+        "uuid":"1111",
+        "name":"BioProjetDataParse",
+        "bundle":"cn.piflow.bundle.microorganism.BioProjetDataParse",
+        "properties":{
+          "es_nodes": "10.0.86.89,10.0.86.90,10.0.86.91",
+          "port": "9200",
+          "es_index": "bioproject",
+          "es_type": "bioprojecttest002"
+        }
+      }
+    ],
+    "paths":[
+      {
+        "from":"LoadFromFtpUrl",
+        "outport":"",
+        "inport":"",
+        "to":"BioProjetDataParse"
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetDataParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetDataParse.scala
new file mode 100644
index 0000000..bb9b8ab
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetDataParse.scala
@@ -0,0 +1,238 @@
+package cn.piflow.bundle.microorganism
+
+import java.io._
+import java.net.UnknownHostException
+import java.util.regex.Pattern
+
+import cn.piflow.bundle.microorganism.util.BioProject
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.elasticsearch.spark.sql.EsSparkSQL
+import org.json.{JSONArray, JSONObject, XML}
+
+class BioProjetDataParse extends ConfigurableStop{
+  val authorEmail: String = "ygang@cnic.cn"
+  val description: String = "Parse BioProject XML data and index each record into Elasticsearch."
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.NonePort.toString)
+
+  var es_nodes: String = _   // Elasticsearch nodes, comma-separated
+  var port: String = _       // Elasticsearch port
+  var es_index: String = _   // Elasticsearch index
+  var es_type: String = _    // Elasticsearch type
+
+  // Name of the XML element that wraps one BioProject record.
+  var name: String = "Package"
+  // Matches dates like 2017-04-25T00:00:00Z; group(1) is the date, group(2) the year.
+  var dp = Pattern.compile("((\\d{4})-(\\d{2})-(\\d{2}))(T.*)")
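+
+  // Implementation note: NCBI's bioproject.xml is one very large document of the
+  // form <PackageSet><Package>...</Package>...</PackageSet>, so it is not parsed
+  // in one piece. perform() streams the file line by line, accumulates one
+  // <Package> element at a time, converts it to JSON with org.json's XML helper,
+  // and indexes each record into Elasticsearch as its own document.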
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+
+    val inDf = in.read()
+    inDf.show()
+    inDf.schema.printTreeString()
+
+    val rows: Array[Row] = inDf.collect()
+    for (i <- 0 until rows.size) {
+      if (rows(i)(0).toString.endsWith("bioproject.xml")) {
+        val path = rows(i)(0).toString
+        var br: BufferedReader = null
+        try {
+          br = new BufferedReader(new FileReader(path))
+          // Skip the XML declaration and the opening <PackageSet> line.
+          br.readLine()
+          br.readLine()
+
+          var count = 0
+          var xml = new StringBuffer()
+          var done = false
+          var line = br.readLine()
+          while (line != null && !done) {
+            xml.append(line)
+            if (line.indexOf("</PackageSet>") != -1) {
+              // Reached the end of the whole file.
+              done = true
+            } else if (line.indexOf("</Package>") != -1) {
+              // Reached the end of one record: convert it and index it.
+              count += 1
+              println(s"---------------------------------------- record $count")
+              val doc = XML.toJSONObject(xml.toString()).getJSONObject(name)
+              xml = new StringBuffer()
+
+              // e.g. accession = PRJNA31525; can serve as the ES document id
+              // via "es.mapping.id" (commented in the options below).
+              val accession = doc.getJSONObject("Project").getJSONObject("Project")
+                .getJSONObject("ProjectID")
+                .getJSONObject("ArchiveID")
+                .getString("accession")
+
+              // Used by the (currently disabled) normalization blocks below.
+              val projectDescr = doc.getJSONObject("Project").getJSONObject("Project")
+                .getJSONObject("ProjectDescr")
+
+              // Load the JSON string as a one-row DataFrame.
+              val jsonRDD = spark.sparkContext.makeRDD(doc.toString() :: Nil)
+              val jsonDF = spark.read.json(jsonRDD)
+              jsonDF.show()
+
+              val options = Map("es.index.auto.create" -> "true",
+//                "es.mapping.id" -> accession,
+                "es.nodes" -> es_nodes, "es.port" -> port)
+
+              // Write the DataFrame to Elasticsearch.
+              EsSparkSQL.saveToEs(jsonDF, s"${es_index}/${es_type}", options)
+
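+              // The commented-out blocks below (1-6) appear to be copied from a
+              // Java implementation. They normalize the record before indexing:
+              // flattening release/publication dates with the `dp` pattern, and
+              // converting fields that may be either a JSONObject or a JSONArray
+              // (Grant, Organization, ...) into a uniform key/value shape via
+              // util.BioProject.convertConcrete2KeyVal. They are disabled in
+              // this version and kept for reference.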
doc.optJSONObject("Submission").optJSONObject("submitted"); +// if(submission.opt("submitted") != null){ +// val date = submission.get("submitted"); +// submission.put("submission_year", Integer.parseInt(date.toString().substring(0, 4))); +// } +// } +// ----------------4 +// val grant = projectDescr.opt("Grant"); +// if(grant != null){ +// if(grant isInstanceOf[JSONArray]){ +// for(int k = 0 ; k < ((JSONArray)grant).length(); k++){ +// JSONObject singleGrant = (JSONObject)((JSONArray)grant).get(k); +// convertConcrete2KeyVal(singleGrant, "Agency"); +// } +// }else if(grant instanceof JSONObject){ +// convertConcrete2KeyVal((JSONObject)grant, "Agency"); +// } +// } + + +// ----------------5 +// val projectID = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectID"); +// bio.convertConcrete2KeyVal(projectID, "LocalID"); +// Object organization = doc.optJSONObject("Submission").optJSONObject("Submission").optJSONObject("Description").opt("Organization"); +// if(organization instanceof JSONArray){ +// for(int j = 0; j < ((JSONArray) organization).length(); j++){ +// val orgz = ((JSONArray) organization).get(j); +// bio.convertConcrete2KeyVal(((JSONObject)orgz), "Name"); +// } +// }else if(organization instanceof JSONObject){ +// val orgz = (JSONObject)organization; +// bio.convertConcrete2KeyVal(orgz, "Name"); +// } + +// ----------------6 +// val projTypeSubmission = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectType").optJSONObject("ProjectTypeSubmission"); +// if(projTypeSubmission != null){ +// val bioSampleSet = projTypeSubmission.getJSONObject("Target").optJSONObject("BioSampleSet"); +// if(bioSampleSet != null){ +// bio.convertConcrete2KeyVal(bioSampleSet, "ID"); +// } +// } + + } + } + + } catch { + case e: UnknownHostException => + e.printStackTrace() + case e: FileNotFoundException => + e.printStackTrace() + case e: IOException => + e.printStackTrace() + } + } + } + } + + def setProperties(map: Map[String, Any]): Unit = { + es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] + port=MapUtil.get(map,key="port").asInstanceOf[String] + es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] + es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] + } + + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) + val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) + + + descriptor = es_nodes :: descriptor + descriptor = port :: descriptor + descriptor = es_index :: descriptor + descriptor = es_type :: descriptor + + descriptor + } + + override def getIcon(): Array[Byte] = { + ImageUtil.getImage("bioProject.png") + } + + override def getGroup(): List[String] = { + List(StopGroupEnum.MicroorganismGroup.toString) + } + + def initialize(ctx: ProcessContext): Unit = { + + } + + +} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/BioProjectDataTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/BioProjectDataTest.scala new file mode 100644 index 0000000..6f270c8 --- /dev/null +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/BioProjectDataTest.scala @@ 
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/BioProjectDataTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/BioProjectDataTest.scala
new file mode 100644
index 0000000..6f270c8
--- /dev/null
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/BioProjectDataTest.scala
@@ -0,0 +1,76 @@
+package cn.piflow.bundle
+
+import cn.piflow.Runner
+import cn.piflow.conf.bean.FlowBean
+import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import org.apache.spark.sql.SparkSession
+import org.h2.tools.Server
+import org.jsoup.Jsoup
+import org.jsoup.select.Elements
+import org.junit.Test
+
+import scala.util.parsing.json.JSON
+
+class BioProjectDataTest {
+
+  @Test
+  def testBioProjetDataParse(): Unit = {
+
+    // Parse the flow JSON.
+    val file = "src/main/resources/bioProject.json"
+    val flowJsonStr = FileUtil.fileReader(file)
+    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
+    println(map)
+
+    // Create the flow.
+    val flowBean = FlowBean(map)
+    val flow = flowBean.constructFlow()
+
+    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
+
+    // Execute the flow.
+    val spark = SparkSession.builder()
+      .master("spark://10.0.86.89:7077")
+      .appName("BioProjetDataParse")
+      .config("spark.driver.memory", "1g")
+      .config("spark.executor.memory", "2g")
+      .config("spark.cores.max", "2")
+      .config("spark.jars", "/work4/hbase/out/artifacts/piflow_bundle/piflow_bundle.jar")
+      .enableHiveSupport()
+      .getOrCreate()
+
+    val process = Runner.create()
+      .bind(classOf[SparkSession].getName, spark)
+      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
+      .start(flow)
+
+    process.awaitTermination()
+    val pid = process.pid()
+    println(s"flow process pid: $pid")
+    spark.close()
+  }
+
+  @Test
+  def testgenurl(): Unit = {
+
+    val url = "https://ftp.ncbi.nlm.nih.gov/bioproject/"
+
+    // Fetch the directory listing page; each row carries file name,
+    // last-modified date and size, e.g.:
+    //   build_gbff_cu.pl    2003-04-25 17:23    21K
+    val doc = Jsoup.connect(url).timeout(100000000).get()
+
+    val elements: Elements = doc.select("html > body > pre")
+    println(elements.first().text())
+  }
+}
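
testgenurl only prints the raw text of the <pre> listing, and Jsoup collapses whitespace in text(), so splitting that string back into rows is brittle. Below is a minimal sketch of a sturdier approach, assuming the NCBI page is ordinary auto-index HTML whose entries are plain <a href> links; the two filter patterns are guesses about that layout, not verified behavior.

import org.jsoup.Jsoup
import scala.collection.JavaConverters._

object ListFtpDirDemo extends App {
  val url = "https://ftp.ncbi.nlm.nih.gov/bioproject/"
  val doc = Jsoup.connect(url).timeout(60000).get()

  // Collect the href of every link instead of scraping the <pre> text.
  val files = doc.select("a[href]").asScala
    .map(_.attr("href"))
    .filterNot(_.endsWith("/"))    // assumed: directory links end with "/"
    .filterNot(_.startsWith("?"))  // assumed: sort-order links start with "?"
    .toList

  files.foreach(println)
}
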