diff --git a/piflow-bundle/piflow-bundle.iml b/piflow-bundle/piflow-bundle.iml deleted file mode 100644 index a22887a..0000000 --- a/piflow-bundle/piflow-bundle.iml +++ /dev/null @@ -1,389 +0,0 @@ [389 deleted lines of IntelliJ module XML omitted] \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/bioProject.json b/piflow-bundle/src/main/resources/microorganism/bioProject.json new file mode 100644 index 0000000..d6543e9 --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/bioProject.json @@ -0,0 +1,54 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"SelectFilesByName", + "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "properties":{ + "HDFSUrl":"hdfs://master2.packone:8020", + "HDFSPath":"/microo/biproject/", + "selectionConditions":"bioproject.xml" + } + }, + { + "uuid":"2222", + "name":"BioProjetDataParse", + "bundle":"cn.piflow.bundle.microorganism.BioProjetDataParse", + "properties":{ + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "port": "9200", + "es_index": "bioproject", + "es_type": "bioprojecttest002" + } + }, + { + "uuid": "3333", + "name": "putEs", + "bundle": "cn.piflow.bundle.es.PutEs", + "properties": { + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "port": "9200", + "es_index": "bioproject10", + "es_type": "bioproject10" + } + } + ], + "paths":[ + { + "from":"SelectFilesByName", + "outport":"", + "inport":"", + "to":"BioProjetDataParse" + }, + { + "from":"BioProjetDataParse", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/bioSample.json b/piflow-bundle/src/main/resources/microorganism/bioSample.json new file mode 100644 index 0000000..5474b1c --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/bioSample.json @@ -0,0 +1,70 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + + { + "uuid":"0000", + "name":"SelectFilesByName", + "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "properties":{ + "HDFSUrl":"hdfs://master2.packone:8020", + "HDFSPath":"/microo/", + "selectionConditions":".*ample_set.xml.gz" + } + },{ + "uuid":"2222", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "properties":{ + "isCustomize":"false", + "filePath":"", + "hdfsUrl":"hdfs://master2.packone:8020", + "savePath":"/microo/biosample/biosample/" + + } + }, + { + "uuid":"2222", + "name":"BioSampleParse", + "bundle":"cn.piflow.bundle.microorganism.BioSampleParse", + "properties":{ + + } + }, + { + "uuid":"3333", + "name":"putEs", + "bundle":"cn.piflow.bundle.es.PutEs", + "properties":{ + "es_nodes":"10.0.88.70,10.0.88.71,10.0.88.72", + "port":"9200", + "es_index":"sample0122", + "es_type":"sample0122" + } + + } + ], +
"paths":[ + { + "from":"SelectFilesByName", + "outport":"", + "inport":"", + "to":"UnzipFilesOnHDFS" + }, + { + "from":"UnzipFilesOnHDFS", + "outport":"", + "inport":"", + "to":"BioSampleParse" + }, + { + "from":"BioSampleParse", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} diff --git a/piflow-bundle/src/main/resources/microorganism/genbank.json b/piflow-bundle/src/main/resources/microorganism/genbank.json new file mode 100644 index 0000000..0cca202 --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/genbank.json @@ -0,0 +1,71 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"SelectFilesByName", + "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "properties":{ + "HDFSUrl":"hdfs://master2.packone:8020", + "HDFSPath":"/microo/genbank/", + "selectionConditions":".*.seq.gz" + } + },{ + "uuid":"2222", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "properties":{ + "isCustomize":"false", + "filePath":"", + "hdfsUrl":"hdfs://master2.packone:8020", + "savePath":"/microo/genbank/" + + } + }, + { + "uuid":"3333", + "name":"GenBankParse", + "bundle":"cn.piflow.bundle.microorganism.GenBankParse", + "properties":{ + "es_nodes":"10.0.86.239", + "port":"9200", + "es_index":"genbank", + "es_type":"data6" + } + }, + { + "uuid": "4444", + "name": "putEs", + "bundle": "cn.piflow.bundle.es.PutEs", + "properties": { + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "port": "9200", + "es_index": "genbank", + "es_type": "genbank1" + } + } + ], + "paths":[ + { + "from":"SelectFilesByName", + "outport":"", + "inport":"", + "to":"UnzipFilesOnHDFS" + }, + { + "from":"UnzipFilesOnHDFS", + "outport":"", + "inport":"", + "to":"GenBankParse" + }, + { + "from":"GenBankParse", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/godata.json b/piflow-bundle/src/main/resources/microorganism/godata.json new file mode 100644 index 0000000..47a9c6c --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/godata.json @@ -0,0 +1,50 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"SelectFilesByName", + "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "properties":{ + "HDFSUrl":"hdfs://master2.packone:8020", + "HDFSPath":"/microoAll/", + "selectionConditions":"go.obo" + } + }, + { + "uuid": "3333", + "name": "GoDataParse", + "bundle": "cn.piflow.bundle.microorganism.GoDataParse", + "properties": { + } + }, + { + "uuid": "4444", + "name": "putEs", + "bundle": "cn.piflow.bundle.es.PutEs", + "properties": { + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "port": "9200", + "es_index": "go", + "es_type": "go" + } + } + ], + "paths":[ + { + "from":"SelectFilesByName", + "outport":"", + "inport":"", + "to":"GoDataParse" + }, + { + "from":"GoDataParse", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/goldData.json b/piflow-bundle/src/main/resources/microorganism/goldData.json new file mode 100644 index 0000000..3ad75d2 --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/goldData.json @@ -0,0 +1,64 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"0000", + "name":"FileDownHDFS", + "bundle":"cn.piflow.bundle.http.FileDownHDFS", + "properties":{ + "url_str":"https://gold.jgi.doe.gov/download?mode=site_excel", + 
"savePath":"hdfs://master2.packone:8020/microo/golddata/gold.xlsx" + } + }, + { + "uuid": "1111", + "name": "ExcelParser", + "bundle": "cn.piflow.bundle.excel.ExcelParser", + "properties": { + "jaonSavePath":"hdfs://master2.packone:8020/microo/golddata/golddata.json" + } + }, + { + "uuid": "2222", + "name": "GoldDataParse", + "bundle": "cn.piflow.bundle.microorganism.GoldDataParse", + "properties": { + + } + }, + { + "uuid": "3333", + "name": "putEs", + "bundle": "cn.piflow.bundle.es.PutEs", + "properties": { + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "port": "9200", + "es_index": "golddata1", + "es_type": "golddatadaa" + } + } + ], + "paths":[ + { + "from":"FileDownHDFS", + "outport":"", + "inport":"", + "to":"ExcelParser" + }, + { + "from":"ExcelParser", + "outport":"", + "inport":"", + "to":"GoldDataParse" + }, + { + "from":"GoldDataParse", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/interpro.json b/piflow-bundle/src/main/resources/microorganism/interpro.json new file mode 100644 index 0000000..962a7bb --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/interpro.json @@ -0,0 +1,67 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"SelectFilesByName", + "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "properties":{ + "HDFSUrl":"hdfs://master2.packone:8020", + "HDFSPath":"/microoAll/", + "selectionConditions":"interpro.xml.gz" + } + },{ + "uuid":"2222", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "properties":{ + "isCustomize":"false", + "filePath":"", + "hdfsUrl":"hdfs://master2.packone:8020", + "savePath":"/microoAll/inter/" + + } + }, + { + "uuid": "3333", + "name": "InterprodataParse", + "bundle": "cn.piflow.bundle.microorganism.InterprodataParse", + "properties": { + } + }, + { + "uuid": "4444", + "name": "putEs", + "bundle": "cn.piflow.bundle.es.PutEs", + "properties": { + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "port": "9200", + "es_index": "interpro", + "es_type": "interpro" + } + } + ], + "paths":[ + { + "from":"SelectFilesByName", + "outport":"", + "inport":"", + "to":"UnzipFilesOnHDFS" + }, + { + "from":"UnzipFilesOnHDFS", + "outport":"", + "inport":"", + "to":"InterprodataParse" + }, + { + "from":"InterprodataParse", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/taxonomy.json b/piflow-bundle/src/main/resources/microorganism/taxonomy.json new file mode 100644 index 0000000..d4f74f3 --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/taxonomy.json @@ -0,0 +1,69 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"SelectFilesByName", + "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "properties":{ + "HDFSUrl":"hdfs://master2.packone:8020", + "HDFSPath":"/microo/taxonomy/", + "selectionConditions":"taxdump.tar.gz" + } + },{ + "uuid":"2222", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "properties":{ + "isCustomize":"true", + "filePath":"/microo/taxonomy/taxdump.tar.gz", + "hdfsUrl":"hdfs://master2.packone:8020", + "savePath":"/microo/taxonomy/taxdump/" + + } + }, + { + "uuid":"3333", + "name":"TaxonomyParse", + "bundle":"cn.piflow.bundle.microorganism.TaxonomyParse", + "properties":{ + + } + }, + { + "uuid": "4444", + "name": "putEs", + "bundle": 
"cn.piflow.bundle.es.PutEs", + "properties": { + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "port": "9200", + "es_index": "taxonomy", + "es_type": "taxonomy" + } + } + + ], + "paths":[ + { + "from":"SelectFilesByName", + "outport":"", + "inport":"", + "to":"UnzipFilesOnHDFS" + }, + { + "from":"UnzipFilesOnHDFS", + "outport":"", + "inport":"", + "to":"TaxonomyParse" + }, + { + "from":"TaxonomyParse", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala index 4fc908c..5d9573c 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala @@ -13,11 +13,10 @@ class FetchEs extends ConfigurableStop { override val inportList: List[String] = List(PortEnum.NonePort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override val description: String = "fetch data with dataframe from elasticSearch " - + override val description: String = "Fetch data from Elasticsearch " var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 + var es_port:String= _ //es的端口好 var es_index:String = _ //es的索引 var es_type:String = _ //es的类型 @@ -27,11 +26,10 @@ class FetchEs extends ConfigurableStop { val ssc = spark.sqlContext val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true", - "es.nodes"->es_nodes,"es.port"->port) + "es.nodes"->es_nodes,"es.port"->es_port) //load data with df from es val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}") - //outDf.show() out.write(outDf) } @@ -41,21 +39,21 @@ class FetchEs extends ConfigurableStop { def setProperties(map : Map[String, Any]): Unit = { es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] + es_port=MapUtil.get(map,key="es_port").asInstanceOf[String] es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) + val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) descriptor = es_nodes :: descriptor - descriptor = port :: descriptor + descriptor = es_port :: descriptor descriptor = es_index :: descriptor descriptor = es_type :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala 
b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala index 6514662..fd3e350 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala @@ -9,40 +9,28 @@ import org.elasticsearch.spark.sql.EsSparkSQL class PutEs extends ConfigurableStop { - override val description: String = "put data with dataframe to elasticSearch " + override val description: String = "Put data to Elasticsearch " val authorEmail: String = "ygang@cnic.cn" override val inportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.NonePort.toString) var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 + var es_port:String= _ //es的端口好 var es_index:String = _ //es的索引 var es_type:String = _ //es的类型 def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val inDf = in.read() - //inDf.show() val sc = spark.sparkContext val options = Map("es.index.auto.create"-> "true", - "es.nodes"->es_nodes,"es.port"->port) + "es.nodes"->es_nodes,"es.port"->es_port) //保存 df 到es EsSparkSQL.saveToEs(inDf,s"${es_index}/${es_type}",options) - - - // val json1 = """{"name":"jack", "age":24, "sex":"man"}""" - // val json2 = """{"name":"rose", "age":22, "sex":"woman"}""" - // - // val rddData = sc.makeRDD(Seq(json1, json2)) - // - // EsSpark.saveJsonToEs(rddData, "spark/json2",options) - //自定义id - // EsSpark.saveJsonToEs(rddData, "spark/json1", Map("es.mapping.id"->"name")) - } @@ -52,21 +40,21 @@ class PutEs extends ConfigurableStop { def setProperties(map : Map[String, Any]): Unit = { es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] + es_port=MapUtil.get(map,key="es_port").asInstanceOf[String] es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) + val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) descriptor = es_nodes :: descriptor - descriptor = port :: descriptor + descriptor = es_port :: descriptor descriptor = es_index :: descriptor descriptor = es_type :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala index 685c5f3..ed6b844 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala +++ 
b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala @@ -5,73 +5,32 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.SparkSession -import org.elasticsearch.spark.rdd.EsSpark class QueryEs extends ConfigurableStop { val authorEmail: String = "ygang@cnic.cn" override val inportList: List[String] = List(PortEnum.NonePort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override val description: String = "query data with dataframe from elasticSearch " + override val description: String = "Query data from Elasticsearch " var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 + var es_port:String= _ //es的端口好 var es_index:String = _ //es的索引 var es_type:String = _ //es的类型 - var field_name:String = _ //es的字段类型 - var field_content:String = _ //es的字段内容 + var jsonDSL:String = _ //es的字段类型 def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val ssc = spark.sqlContext - // 查询 语句 字段:内容 - val query = - s""" - |{ - | "query":{ - | "match":{ - | - | "${field_name}":"${field_content}" - | } - | } - |} - """.stripMargin - -// val query2 = -// s""" -// |{ -// | "query":{ -// | "terms":{ -// | -// | "age":[22] -// | } -// | } -// |} -// """.stripMargin -// -// val query3 = -// s""" -// |{ -// | "query":{ -// | "match":{ -// | "name":"rose *" -// | } -// | } -// |} -// """.stripMargin - - val options = Map("es.index.auto.create"-> "true", "es.nodes.wan.only"->"true", - "es.query" -> query, - "es.nodes"->es_nodes,"es.port"->port) - + "es.query" -> jsonDSL, + "es.nodes"->es_nodes,"es.port"->es_port) val outDf = ssc.read.format("org.elasticsearch.spark.sql") .options(options).load(s"${es_index}/${es_type}") - //outDf.show() out.write(outDf) } @@ -82,30 +41,26 @@ class QueryEs extends ConfigurableStop { def setProperties(map : Map[String, Any]): Unit = { es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] + es_port=MapUtil.get(map,key="es_port").asInstanceOf[String] es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] - field_name=MapUtil.get(map,key="field_name").asInstanceOf[String] - field_content=MapUtil.get(map,key="field_content").asInstanceOf[String] + jsonDSL=MapUtil.get(map,key="jsonDSL").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) - val field_name = new PropertyDescriptor().name("field_name").displayName("field_name").defaultValue("").required(true) - val field_content = new PropertyDescriptor().name("field_content").displayName("field_content").defaultValue("").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) + val es_port = new 
PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) descriptor = es_nodes :: descriptor - descriptor = port :: descriptor + descriptor = es_port :: descriptor descriptor = es_index :: descriptor descriptor = es_type :: descriptor - descriptor = field_name :: descriptor - descriptor = field_content :: descriptor + descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/excel/ExcelParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/excel/ExcelParser.scala index 0e0649f..5ef0f77 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/excel/ExcelParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/excel/ExcelParser.scala @@ -1,15 +1,16 @@ package cn.piflow.bundle.excel -import java.io.File +import java.io.{BufferedInputStream, ByteArrayInputStream} import cn.piflow._ import cn.piflow.bundle.util.ExcelToJson import cn.piflow.conf._ import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import net.sf.json.JSONArray -import org.apache.spark.sql.{Row, SparkSession} - +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} class ExcelParser extends ConfigurableStop{ @@ -18,38 +19,89 @@ class ExcelParser extends ConfigurableStop{ val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - var excelPath: String = _ + var jsonSavePath: String = _ + - var list = List("") def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext - import spark.implicits._ val inDf = in.read() - - val rows: Array[Row] = inDf.collect() - for (i <- 0 until rows.length){ - val path1 = rows(i)(0).toString - println("***"+path1+"---") - // "excelPath":"/ftpGoldData/test1.xlsx" - if (path1.endsWith(".xls") || path1.endsWith("xlsx")){ - println(path1) - val f1 = new File(path1) - // 调用 工具类 解析 Excel .xls .xlsx - val array: JSONArray = ExcelToJson.readExcel(f1) - - for (i <- 0 until array.size()){ - list = array.get(i).toString :: list - } - } + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") } - val outDF = sc.parallelize(list).toDF("jsonObject") - //println(outDF.count()) - //outDF.show() - out.write(outDF) + configuration.set("fs.defaultFS",hdfsUrl) + var fs: FileSystem = FileSystem.get(configuration) + + val path: Path = new Path(jsonSavePath) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null + + + var count = 0 ; + inDf.collect().foreach(row=>{ + val pathStr = row.get(0).asInstanceOf[String] + + if (pathStr.endsWith(".xls") || pathStr.endsWith("xlsx")){ + + val array: JSONArray = ExcelToJson.readExcel(pathStr,hdfsUrl) + + println(array.size()) + + for (i <- 0 until 
array.size()){ + jsonStr = array.get(i).toString + + if (count == 0) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) + count+=1 + } else if (count==1){ + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) + } + + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + + fdosOut.flush() + bisIn = null + + } + } + }) + + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + + fdosOut.flush() + fdosOut.close() + + val df: DataFrame = spark.read.json(jsonSavePath) + + out.write(df) + } @@ -58,13 +110,13 @@ class ExcelParser extends ConfigurableStop{ } def setProperties(map : Map[String, Any]): Unit = { - // excelPath = MapUtil.get(map,"excelPath").asInstanceOf[String] + jsonSavePath = MapUtil.get(map,"jsonSavePath").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() -// val excelPath = new PropertyDescriptor().name("excelPath").displayName("excelPath").description("The path of excel file").defaultValue("").required(true) -// descriptor = excelPath :: descriptor + val jsonSavePath = new PropertyDescriptor().name("jsonSavePath").displayName("jsonSavePath").description("save path of json").defaultValue("").required(true) + descriptor = jsonSavePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala index 670fd18..7675992 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala @@ -34,20 +34,8 @@ class DeleteHdfs extends ConfigurableStop{ config.set("fs.defaultFS",hdfsUrl) val fs = FileSystem.get(config) - println(path+"ddddddddddddddddddd--delete") fs.delete(path,true) -// if (fs.isDirectory(path)){ -// println("-------wenjianjia-----") -// fs.delete(path,true) -// } -// -// else if(fs.isFile(path)){ -// println("wenjian -------------------------") -// fs.delete(path,true) -// } else { -// fs.delete(path, true) -// } } } override def setProperties(map: Map[String, Any]): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala index 5be8efa..a377cb1 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession class GetHdfs extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" - override val description: String = "write dataframe data from hdfs" + override val description: String = "get data from hdfs" override val inportList: List[String] = List(PortEnum.NonePort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) @@ -22,22 +22,8 @@ class GetHdfs extends ConfigurableStop{ val sc= spark.sparkContext import spark.implicits._ - println(hdfsPath+"gggggggggggggggggggggggg ---getHdfs---txt") - val path = hdfsUrl+hdfsPath -// val array = hdfsPath.split(",") -// -// val buffer = new 
StringBuffer() -// for (i<- 0 until array.length) { -// if (i== array.length-1){ -// buffer.append(hdfsUrl+array(i)) -// } else { -// buffer.append(hdfsUrl+array(i)+",") -// } -// } - // println(buffer.toString) - if (types == "json") { val rdd = spark.read.json(path) //rdd.show() @@ -56,7 +42,6 @@ class GetHdfs extends ConfigurableStop{ //rdd.show() rdd.schema.printTreeString() out.write(rdd) - } else { val rdd = sc.textFile(path) @@ -66,17 +51,6 @@ class GetHdfs extends ConfigurableStop{ out.write(outDf) } - - - - -// val rdd = spark.read.text("hdfs://10.0.86.89:9000/yg/test/hdf1.txt") -// rdd.show() -// rdd.schema.printTreeString() - // println( rdd.count()) - // val rdd = ssc.read.load("hdfs://10.0.86.89:9000/yg/test/hdf1.txt") - - } override def setProperties(map: Map[String, Any]): Unit = { hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String] diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala index 5a0f5ca..7418a88 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala @@ -30,11 +30,7 @@ class ListHdfs extends ConfigurableStop{ import spark.implicits._ - val outDF = sc.parallelize(list).toDF("path") - - //outDF.show() - outDF.schema.printTreeString() - + val outDF = sc.parallelize(list).toDF() out.write(outDF) } @@ -56,7 +52,6 @@ class ListHdfs extends ConfigurableStop{ iterationFile(fsPath) } else{ - list = f.getPath.toString::list } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala index 19d8f5e..1e50e92 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala @@ -14,25 +14,20 @@ class PutHdfs extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" override val inportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.NonePort.toString) - override val description: String = "from dataframe write data to hdfs" + override val description: String = "Put data to hdfs" var hdfsPath :String= _ var hdfsUrl :String= _ var types :String= _ - var partition :Int= 3 + var partition :Int= _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val inDF = in.read() - //inDF.show() - inDF.schema.printTreeString() - - //val path = new Path(hdfsUrl+hdfsPath) val config = new Configuration() config.set("fs.defaultFS",hdfsUrl) val fs = FileSystem.get(config) - println(hdfsUrl+hdfsPath+"pppppppppppppppppppppppppppppppp--putHdfs") if (types=="json"){ inDF.repartition(partition).write.json(hdfsUrl+hdfsPath) @@ -48,7 +43,7 @@ class PutHdfs extends ConfigurableStop{ hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String] hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String] types = MapUtil.get(map,key="types").asInstanceOf[String] - val partition1 = MapUtil.get(map,key="partition").asInstanceOf[String] + partition = MapUtil.get(map,key="partition").asInstanceOf[Int] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala index 95bc287..efd969e 100644 --- 
a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala @@ -32,7 +32,6 @@ class GetUrl extends ConfigurableStop{ // xml String var label:String=_ var schema: String = _ -// var xmlString :String=_ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { @@ -45,24 +44,17 @@ class GetUrl extends ConfigurableStop{ val response:CloseableHttpResponse = client.execute(getFlowInfo) val entity = response.getEntity val jsonString = EntityUtils.toString(entity,"UTF-8") - println("-------------------------------------------") - - if (types == "json"){ // json to df val jsonRDD = ss.sparkContext.makeRDD(jsonString :: Nil) val jsonDF = ss.read.json(jsonRDD) - //jsonDF.schema.printTreeString() - //jsonDF.show(10) - //jsonDF.select("app.id").show() out.write(jsonDF) } if(types=="xml"){ - println("8888888888888888888888888888888888888888888888888888888") val doc: Document = DocumentHelper.parseText(jsonString) val rootElt: Element = doc.getRootElement var arrbuffer:ArrayBuffer[Element]=ArrayBuffer() @@ -98,8 +90,6 @@ class GetUrl extends ConfigurableStop{ list.+=(text.substring(0,text.length-1)) } - - val listRows: List[Row] = list.toList.map(line => { val seq: Seq[String] = line.split(",").toSeq val row = Row.fromSeq(seq) @@ -115,10 +105,7 @@ class GetUrl extends ConfigurableStop{ val outDf: DataFrame = ss.createDataFrame(rowRDD,structType) - //outDf.show(20) - outDf.schema.printTreeString() out.write(outDf) - } @@ -128,8 +115,6 @@ class GetUrl extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { url = MapUtil.get(map,key="url").asInstanceOf[String] types= MapUtil.get(map,key="types").asInstanceOf[String] - -// xmlString = MapUtil.get(map,"XmlString").asInstanceOf[String] label = MapUtil.get(map,"label").asInstanceOf[String] schema = MapUtil.get(map,"schema").asInstanceOf[String] @@ -139,9 +124,6 @@ class GetUrl extends ConfigurableStop{ var descriptor : List[PropertyDescriptor] = List() val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true) val types = new PropertyDescriptor().name("types").displayName("types").defaultValue("the url content is json or xml)").required(true) - -// val xmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true) -// descriptor = xmlString :: descriptor val label = new PropertyDescriptor().name("label").displayName("label").description("label path for hope,the delimiter is ,").defaultValue("").required(true) descriptor = label :: descriptor val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala index 3a2f8c5..2108a13 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala @@ -28,10 +28,6 @@ class InvokeUrl extends ConfigurableStop{ override val outportList: List[String] = List(PortEnum.NonePort.toString) override val description: String = "invoke http " -// var urlPut :String= _ -// var urlPost :String= _ -// var urlDelete :String= _ -// var urlGet :String= _ var url :String= _ var jsonPath :String =_ @@ -56,8 +52,6 @@ class InvokeUrl extends ConfigurableStop{ 
val entity = response.getEntity val jsonString = EntityUtils.toString(entity, "UTF-8") - println("=====================================================================invoke get") - // json to df if (types == "json") { // json to df @@ -122,12 +116,7 @@ class InvokeUrl extends ConfigurableStop{ val outDf: DataFrame = spark.createDataFrame(rowRDD, structType) - //outDf.show(20) - //outDf.schema.printTreeString() out.write(outDf) - - println("====================================================================") - } @@ -143,7 +132,6 @@ class InvokeUrl extends ConfigurableStop{ buffer.append(lineTxt.mkString) lineTxt = bufferReader.readLine() } - println(buffer) if (method == "putHttp") { val put = new HttpPut(url) @@ -157,8 +145,6 @@ class InvokeUrl extends ConfigurableStop{ if (entity != null) { result = EntityUtils.toString(entity, "utf-8") } - println(response) - println(result) put.releaseConnection() } else { val post = new HttpPost(url) @@ -168,22 +154,17 @@ class InvokeUrl extends ConfigurableStop{ val response = client.execute(post) val entity = response.getEntity val str = EntityUtils.toString(entity, "UTF-8") - println(response) - println("Code is " + str) } } if (method == "deleteHttp") { - println(url) + val inDf = in.read() inDf.createOrReplaceTempView("table") val sqlDF = inDf.sqlContext.sql(s"select $colume from table") - //sqlDF.show() val array = sqlDF.collect() - - for (i <- 0 until array.length) { var url1 = "" val newArray = array(i) @@ -197,10 +178,7 @@ class InvokeUrl extends ConfigurableStop{ builder.append(columns(i) + "=" + newArray(i) + "&") } } - // println(builder) - url1 = url + "?" + builder - println(url1 + "##########################################################") val delete = new HttpDelete(url1) delete.setHeader("content-Type", "application/json") @@ -208,8 +186,6 @@ class InvokeUrl extends ConfigurableStop{ val response = client.execute(delete) val entity = response.getEntity val str = EntityUtils.toString(entity, "UTF-8") - println("Code is " + str) - println(response) } @@ -221,10 +197,6 @@ class InvokeUrl extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { url = MapUtil.get(map,key="url").asInstanceOf[String] -// urlPut = MapUtil.get(map,key="urlPut").asInstanceOf[String] -// urlPost = MapUtil.get(map,key="urlPost").asInstanceOf[String] -// urlDelete = MapUtil.get(map,key="urlDelete").asInstanceOf[String] -// urlGet = MapUtil.get(map,key="urlGet").asInstanceOf[String] jsonPath = MapUtil.get(map,key="jsonPath").asInstanceOf[String] method = MapUtil.get(map,key = "method").asInstanceOf[String] @@ -240,10 +212,6 @@ class InvokeUrl extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() -// val urlPut = new PropertyDescriptor().name("urlPut").displayName("urlPutPost").defaultValue("").required(true) -// val urlPost = new PropertyDescriptor().name("urlPost").displayName("urlPutPost").defaultValue("").required(true) -// val urlDelete = new PropertyDescriptor().name("urlDelete").displayName("urlPutPost").defaultValue("").required(true) -// val urlGet = new PropertyDescriptor().name("urlGet").displayName("urlGet").defaultValue("").required(true) val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true) val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("JSONPATH").defaultValue("").required(true) val method = new PropertyDescriptor().name("method").displayName("the way with 
http").defaultValue("").required(true) @@ -258,10 +226,6 @@ class InvokeUrl extends ConfigurableStop{ val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true) descriptor = schema :: descriptor -// descriptor = urlPut :: descriptor -// descriptor = urlPost :: descriptor -// descriptor = urlDelete :: descriptor -// descriptor = urlGet :: descriptor descriptor = jsonPath :: descriptor descriptor = method :: descriptor descriptor = colume :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala index 602477c..00d283b 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala @@ -41,7 +41,7 @@ class PostUrl extends ConfigurableStop{ buffer.append(lineTxt.mkString) lineTxt=bufferReader.readLine() } - println(buffer) + // post val client = HttpClients.createDefault() @@ -54,7 +54,6 @@ class PostUrl extends ConfigurableStop{ val response = client.execute(post) val entity = response.getEntity val str = EntityUtils.toString(entity,"UTF-8") - println(response) println("Code is " + str) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetDataParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetDataParse.scala index 2298f9f..6007bf6 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetDataParse.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetDataParse.scala @@ -1,30 +1,28 @@ package cn.piflow.bundle.microorganism import java.io._ -import java.net.UnknownHostException -import java.util.regex.Pattern +import java.util.regex.{Matcher, Pattern} +import cn.piflow.bundle.microorganism.util.BioProject import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import org.apache.spark.sql.{Row, SparkSession} -import org.elasticsearch.spark.sql.EsSparkSQL -import org.json.XML +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.json.{JSONArray, JSONObject, XML} + + class BioProjetDataParse extends ConfigurableStop{ val authorEmail: String = "ygang@cnic.cn" - val description: String = "Load file from ftp url." 
+ val description: String = "Parsing BioProjet type data" val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.NonePort.toString) - - var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 - var es_index:String = _ //es的索引 - var es_type:String = _ //es的类型 - + var cachePath:String = _ var name:String = "Package" var dp = Pattern.compile("((\\d{4})-(\\d{2})-(\\d{2}))(T.*)") @@ -33,187 +31,193 @@ class BioProjetDataParse extends ConfigurableStop{ val sc = spark.sparkContext val inDf= in.read() - //inDf.show() - //inDf.schema.printTreeString() - val rows: Array[Row] = inDf.collect() + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") + } + configuration.set("fs.defaultFS",hdfsUrl) + var fs: FileSystem = FileSystem.get(configuration) - var path:String = null - for (i <- 0 until rows.size) { - if (rows(i)(0).toString.endsWith("bioproject.xml")){ - path = rows(i)(0).toString + val hdfsPathTemporary = hdfsUrl+cachePath+"/bioprojectCatch/bioproject.json" -// val path1 = "/ftpBioProject/bioproject.xml" - try { - val br = new BufferedReader(new FileReader(path)) + val path: Path = new Path(hdfsPathTemporary) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null + + + inDf.collect().foreach(row =>{ + val pathStr = row.get(0).asInstanceOf[String] + + var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) var line: String = null - - var i = 0 - while (i < 2) { + var i= 0 + while(i<2){ br.readLine() i = i + 1 } var count = 0 var xml = new StringBuffer() var x = 0 - while ((line = br.readLine()) != null || x ==0) { - + while ((line = br.readLine()) != null && x <1 && line!= null) { xml.append(line) - if (line.equals("")) { + if (line.equals("")){ println("----------------------------------break") x == 1 - return x - } - else if (line.indexOf("") != -1) { //reach the end of a doc - println("-----------------------------------------"+count) + } else if (line.indexOf("") != -1){ //reach the end of a doc count = count + 1 - val doc = XML.toJSONObject(xml.toString()).getJSONObject(name) - println("#####################################################"+count) - println(doc) + val doc: JSONObject = XML.toJSONObject(xml.toString()).getJSONObject(name) - xml = new StringBuffer() - - // accession PRJNA31525 val accession = doc.getJSONObject("Project").getJSONObject("Project") .getJSONObject("ProjectID") .getJSONObject("ArchiveID") .getString("accession") val projectDescr = doc.getJSONObject("Project").getJSONObject("Project") - .getJSONObject("ProjectDescr") + .getJSONObject("ProjectDescr") - // 加载 json 字符串 为 df - val jsonRDD = spark.sparkContext.makeRDD(doc.toString() :: Nil) - val jsonDF = spark.read.json(jsonRDD) - //jsonDF.show() - // jsonDF.schema.printTreeString() + val bio = new BioProject + bio.convertConcrete2KeyVal(projectDescr,"LocusTagPrefix") - val options = Map("es.index.auto.create"-> "true", -// "es.mapping.id"->accession, - "es.nodes"->es_nodes,"es.port"->port) + // --------------1 + if (projectDescr.opt("ProjectReleaseDate") != null){ + val date = projectDescr.get("ProjectReleaseDate").toString + val m: Matcher = 
dp.matcher(date) + if (m.matches()){ + projectDescr.put("ProjectReleaseDate",m.group(1)) + projectDescr.put("ProjectReleaseYear",Integer.parseInt(m.group(2))) - // df 写入 es - EsSparkSQL.saveToEs(jsonDF,s"${es_index}/${es_type}",options) + } else { + projectDescr.put("ProjectReleaseDate",date) + } + } + // ----------------2 + if (projectDescr.optJSONObject("Publication") !=null){ + val pub = projectDescr.getJSONObject("Publication") + if (pub.opt("date") !=null){ + val date = pub.get("date").toString + val m: Matcher = dp.matcher(date) + if (m.matches()){ + pub.put("date",m.group(1)) + pub.put("year",Integer.parseInt(m.group(2))) + } else { + pub.put("date",date) + } + } + } + // ----------------3 + if(doc.getJSONObject("Project").optJSONObject("Submission") != null){ + val submission = doc.getJSONObject("Project").optJSONObject("Submission") -// val bio = new BioProject -// bio.convertConcrete2KeyVal(projectDescr,"LocusTagPrefix") + if(submission.opt("submitted") != null){ -// --------------1 -// if (projectDescr.opt("ProjectReleaseDate") != null){ -// val date = projectDescr.get("ProjectReleaseDate").toString -// val m = dp.matcher(date) -// if (m.matches()){ -// // m.group(1) 2017-04-25 -// // m.group(2)) 2017 -// projectDescr.put("ProjectReleaseDate",m.group(1)) -// projectDescr.put("ProjectReleaseDate",Integer.parseInt(m.group(2))) -// -// } else { -// // date 2012-05-21T00:00:00Z -// projectDescr.put("ProjectReleaseDate",date) -// } -// } + val date = submission.get("submitted") + submission.put("submission_year", Integer.parseInt(date.toString().substring(0, 4))); -// ----------------2 -// if (projectDescr.optJSONObject("Publication") !=null){ -// val pub = projectDescr.getJSONObject("Publication") -// if (pub.opt("date") !=null){ -// val date = projectDescr.getJSONObject("Publication").get("date").toString -// val m = dp.matcher(date) -// if (m.matches()){ -// // m.group(1) 2017-04-25 -// // m.group(2)) 2017 -// projectDescr.put("date",m.group(1)) -// projectDescr.put("year",Integer.parseInt(m.group(2))) -// } else { -// // date 2012-05-21T00:00:00Z -// projectDescr.put("date","##############99#") -// } -// } -// } -// + } + } -// ----------------3 -// if(doc.optJSONObject("Submission").optJSONObject("submitted") != null){ -// val submission = doc.optJSONObject("Submission").optJSONObject("submitted"); -// if(submission.opt("submitted") != null){ -// val date = submission.get("submitted"); -// submission.put("submission_year", Integer.parseInt(date.toString().substring(0, 4))); -// } -// } -// ----------------4 -// val grant = projectDescr.opt("Grant"); -// if(grant != null){ -// if(grant isInstanceOf[JSONArray]){ -// for(int k = 0 ; k < ((JSONArray)grant).length(); k++){ -// JSONObject singleGrant = (JSONObject)((JSONArray)grant).get(k); -// convertConcrete2KeyVal(singleGrant, "Agency"); -// } -// }else if(grant instanceof JSONObject){ -// convertConcrete2KeyVal((JSONObject)grant, "Agency"); -// } -// } + // ----------------4 + val grant: Object = projectDescr.opt("Grant") + if(grant != null){ + if(grant.isInstanceOf[JSONArray]){ + val array: JSONArray = grant.asInstanceOf[JSONArray] + for(k <- 0 until array.length()){ + val singleGrant = array.get(k).asInstanceOf[JSONObject] + bio.convertConcrete2KeyVal(singleGrant, "Agency"); + } + } + else if(grant.isInstanceOf[JSONObject]){ + val array: JSONObject = grant.asInstanceOf[JSONObject] + bio.convertConcrete2KeyVal(array, "Agency"); + } + } + // ----------------5 + val projectID = 
doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectID"); + bio.convertConcrete2KeyVal(projectID, "LocalID"); + val organization = doc.getJSONObject("Project").optJSONObject("Submission").optJSONObject("Description").opt("Organization"); + if(organization.isInstanceOf[JSONArray] ){ + val array: JSONArray = organization.asInstanceOf[JSONArray] + for(k <- 0 until array.length()){ + val orgz = array.get(k).asInstanceOf[JSONObject] + bio.convertConcrete2KeyVal(orgz, "Name"); + } + }else if(organization.isInstanceOf[JSONObject]){ + val orgz: JSONObject = organization.asInstanceOf[JSONObject] + bio.convertConcrete2KeyVal(orgz, "Name"); + } -// ----------------5 -// val projectID = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectID"); -// bio.convertConcrete2KeyVal(projectID, "LocalID"); -// Object organization = doc.optJSONObject("Submission").optJSONObject("Submission").optJSONObject("Description").opt("Organization"); -// if(organization instanceof JSONArray){ -// for(int j = 0; j < ((JSONArray) organization).length(); j++){ -// val orgz = ((JSONArray) organization).get(j); -// bio.convertConcrete2KeyVal(((JSONObject)orgz), "Name"); -// } -// }else if(organization instanceof JSONObject){ -// val orgz = (JSONObject)organization; -// bio.convertConcrete2KeyVal(orgz, "Name"); -// } + // ----------------6 + val projTypeSubmission = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectType").optJSONObject("ProjectTypeSubmission"); -// ----------------6 -// val projTypeSubmission = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectType").optJSONObject("ProjectTypeSubmission"); -// if(projTypeSubmission != null){ -// val bioSampleSet = projTypeSubmission.getJSONObject("Target").optJSONObject("BioSampleSet"); -// if(bioSampleSet != null){ -// bio.convertConcrete2KeyVal(bioSampleSet, "ID"); -// } -// } + if(projTypeSubmission != null){ + val bioSampleSet = projTypeSubmission.getJSONObject("Target").optJSONObject("BioSampleSet"); + if(bioSampleSet != null){ + bio.convertConcrete2KeyVal(bioSampleSet, "ID"); + } + } + if (count ==1 ) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes())) + } + else { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes())) + } + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn = null + xml = new StringBuffer() } } + }) - } catch { - case e: UnknownHostException => - e.printStackTrace() - case e: FileNotFoundException => - e.printStackTrace() - case e: IOException => - e.printStackTrace() - } - } + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) } + + fdosOut.flush() + fdosOut.close() + + println("start parser HDFSjsonFile") + val df: DataFrame = spark.read.json(hdfsPathTemporary) + + df.schema.printTreeString() + + out.write(df) + } def setProperties(map: Map[String, Any]): Unit = { - es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] - es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] - es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] + 
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) - - - descriptor = es_nodes :: descriptor - descriptor = port :: descriptor - descriptor = es_index :: descriptor - descriptor = es_type :: descriptor - + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSampleParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSampleParse.scala index 58c3570..9403b4d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSampleParse.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSampleParse.scala @@ -7,62 +7,67 @@ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import org.apache.spark.sql.{Row, SparkSession} -import org.elasticsearch.spark.sql.EsSparkSQL +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} import org.json.{JSONArray, JSONObject, XML} - class BioSampleParse extends ConfigurableStop{ val authorEmail: String = "ygang@cnic.cn" - val description: String = "Load file from ftp url." 
+ val description: String = "Parsing BioSample type data" val inportList: List[String] = List(PortEnum.DefaultPort.toString) - val outportList: List[String] = List(PortEnum.NonePort.toString) + val outportList: List[String] = List(PortEnum.DefaultPort.toString) - - var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 - var es_index:String = _ //es的索引 - var es_type:String = _ //es的类型 + var cachePath:String = _ var docName = "BioSample" - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext - + val ssc = spark.sqlContext val inDf= in.read() -// inDf.show() -// inDf.schema.printTreeString() + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") + } - val rows: Array[Row] = inDf.collect() - for (i <- 0 until rows.size) { + configuration.set("fs.defaultFS",hdfsUrl) + var fs: FileSystem = FileSystem.get(configuration) - // /ftpBioSample1/biosample.xml - val sourceFile = rows(i)(0).toString - println("++++++++++++++++++++++++++++++++++++++++++++++++++"+sourceFile) + var hdfsPathJsonCache:String = "" + + var fdosOut: FSDataOutputStream = null + var bisIn: BufferedInputStream =null + + var count = 0 + var nameNum = 0 + inDf.collect().foreach(row => { + pathStr = row.get(0).asInstanceOf[String] var line: String = null var xml = "" - val br: BufferedReader = new BufferedReader(new FileReader(sourceFile)) + var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) br.readLine() br.readLine() - var count = 0 - while ((line = br.readLine()) != null) { + + while ((line = br.readLine()) != null && line!= null) { xml = xml + line if (line.indexOf("") != -1) { count = count + 1 - val doc: JSONObject = XML.toJSONObject(xml).getJSONObject(docName) - val accession = doc.optString("accession") + val accession = doc.optString("accession") // Attributes val attrs: String = doc.optString("Attributes") if (attrs.equals("")) { doc.remove("Attributes") } - // Links val links: String = doc.optString("Links") if (links != null) { @@ -70,9 +75,7 @@ class BioSampleParse extends ConfigurableStop{ doc.remove("Links") } } - val bio = new BioProject - // owner.name val owner = doc.optString("Owner") if (owner != null) { @@ -82,57 +85,85 @@ class BioSampleParse extends ConfigurableStop{ bio.convertConcrete2KeyVal(singleOwner, "Name") } } - // Models.Model val models = doc.optJSONObject("Models") if (models != null) { bio.convertConcrete2KeyVal(models, "Models") } + if (count%200000 == 1 ){ + nameNum += 1 + hdfsPathJsonCache = hdfsUrl+cachePath+"/biosampleCache/"+"biosample"+nameNum+".json" + val path: Path = new Path(hdfsPathJsonCache) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + fdosOut = fs.append(path) -// if (count < 20) { - println("#####################################" + count) - // 加载 json 字符串 为 df - val jsonRDD = spark.sparkContext.makeRDD(doc.toString() :: Nil) - val jsonDF = spark.read.json(jsonRDD) - //jsonDF.show() - // jsonDF.schema.printTreeString() - - val options = Map("es.index.auto.create" -> "true", - "es.mapping.id" -> "accession", - "es.nodes" -> es_nodes, "es.port" -> port) - - // df 写入 es - EsSparkSQL.saveToEs(jsonDF, s"${es_index}/${es_type}", options) -// } + bisIn = new BufferedInputStream(new 
ByteArrayInputStream(("[" + doc.toString).getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + bisIn.close() + } else if (count%200000 ==0){ + bisIn = new BufferedInputStream(new ByteArrayInputStream((","+doc.toString + "]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + fdosOut.close() + bisIn.close() + } else { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + bisIn.close() + } xml = "" } } - + }) + if (count%200000 != 0){ + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn.close() } + fdosOut.close() + println("start parser HDFSjsonFile") + val df: DataFrame = ssc.read.json(hdfsUrl+cachePath+"/biosampleCache/") + out.write(df) } def setProperties(map: Map[String, Any]): Unit = { - es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] - es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] - es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) - - - descriptor = es_nodes :: descriptor - descriptor = port :: descriptor - descriptor = es_index :: descriptor - descriptor = es_type :: descriptor - + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala index 5ba09ca..3957e81 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala @@ -1,142 +1,121 @@ package cn.piflow.bundle.microorganism import java.io._ -import java.text.ParseException -import java.util.ArrayList + import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.{ImageUtil, MapUtil} +import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import 
org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.biojava.bio.BioException -import org.elasticsearch.spark.sql.EsSparkSQL +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} import org.json.JSONObject class GenBankParse extends ConfigurableStop{ val authorEmail: String = "ygang@cnic.cn" - val description: String = " Parse genbank date put to elasticSearch" + val description: String = " Parsing GenBank type data" val inportList: List[String] = List(PortEnum.DefaultPort.toString) - val outportList: List[String] = List(PortEnum.NonePort.toString) + val outportList: List[String] = List(PortEnum.DefaultPort.toString) - var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 - var es_index:String = _ //es的索引 - var es_type:String = _ //es的类型 - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext + val inDf= in.read() - val inDf = in.read() - //inDf.show() - println("++++++++++++++++++++++++++++++++++++++++++++++++++001") - println(inDf.count()) - inDf.schema.printTreeString() + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") + } - var listSeq = new ArrayList[ArrayList[String]] + configuration.set("fs.defaultFS",hdfsUrl) + var fs: FileSystem = FileSystem.get(configuration) - val rows: Array[Row] = inDf.collect() - try { - for (i <- 0 until rows.size) { + val hdfsPathTemporary:String = hdfsUrl+"/microoCache/genbank/genbankcach.json" + val path: Path = new Path(hdfsPathTemporary) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() - val sourceFile = rows(i)(0) - println("++++++++++++++++++++++++++++++++++++++++++++++++++002" + sourceFile) - // 字节数组反序列化 为 ByteArrayInputStream - val bis:ByteArrayInputStream=new ByteArrayInputStream(sourceFile.asInstanceOf[Array[Byte]]) - //val fileinputStream = new FileInputStream(sourceFile) - val br = new BufferedReader(new InputStreamReader(bis)) + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null - // 解析seq 文件 的字节流 - val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null) - var doc: JSONObject = null - var count = 0 - while (sequenceIterator.hasNext) { - var listJson = new ArrayList[String] - doc = new JSONObject() - try { - var seq = sequenceIterator.nextRichSequence() + inDf.collect().foreach(row=>{ + pathStr = row.get(0).asInstanceOf[String] + var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) - Process.processSingleSequence(seq, doc) - // json 字符串 - listJson.add(doc.toString()) - // 序列号 CP009630 - listJson.add(seq.getAccession) + val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null) - listSeq.add(listJson) + var doc: JSONObject = null + var count = 0 + while (sequenceIterator.hasNext) { + count += 1 + doc = new JSONObject - } - catch { - case e: BioException => - e.getMessage - case e: ParseException => - e.printStackTrace() - } + val seq = sequenceIterator.nextRichSequence() + + Process.processSingleSequence(seq, doc) + + if (count == 1) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + 
doc.toString).getBytes())) + } else { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes())) } + + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + + fdosOut.flush() + bisIn = null + } - } catch { - case e: FileNotFoundException => - e.printStackTrace() - case e: IOException => - e.printStackTrace() + }) + + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) } - var jsonDF: DataFrame = null - for (i <- 0 until listSeq.size()) { + fdosOut.flush() + fdosOut.close() - println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" + i) - // - println(listSeq.get(i).size()) + println("start parser HDFSjsonFile") + val df: DataFrame = spark.read.json(hdfsPathTemporary) - // seq 文件中的 json 字符串 - val jsonString = listSeq.get(i).get(0) - - // 序列号 CP009630 - val esId = listSeq.get(i).get(1).toString - println(esId) - - // 加载 json 字符串 为 df - val jsonRDD = spark.sparkContext.makeRDD(jsonString.toString() :: Nil) - jsonDF = spark.read.json(jsonRDD) - //jsonDF.show() - // jsonDF.schema.printTreeString() + df.schema.printTreeString() + out.write(df) - val options = Map("es.index.auto.create"-> "true", - // "es.mapping.id"->"Accession", - "es.nodes"->es_nodes,"es.port"->port) - - // df 写入 es - EsSparkSQL.saveToEs(jsonDF,s"${es_index}/${es_type}",options) - - } } def setProperties(map: Map[String, Any]): Unit = { - es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] - es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] - es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) - - descriptor = es_nodes :: descriptor - descriptor = port :: descriptor - descriptor = es_index :: descriptor - descriptor = es_type :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoDataParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoDataParse.scala new file mode 100644 index 0000000..1329d11 --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoDataParse.scala @@ -0,0 +1,156 @@ +package cn.piflow.bundle.microorganism + +import java.io._ +import java.util.Iterator +import java.util.regex.{Matcher, Pattern} + +import cn.piflow.conf.bean.PropertyDescriptor +import cn.piflow.conf.util.{ImageUtil, MapUtil} +import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} +import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} 
+import org.json.JSONObject + + +class GoDataParse extends ConfigurableStop{ + val authorEmail: String = "ygang@cnic.cn" + val description: String = "Parsing Go type data" + val inportList: List[String] = List(PortEnum.DefaultPort.toString) + val outportList: List[String] = List(PortEnum.DefaultPort.toString) + + var cachePath:String = _ + + var tv:Pattern = Pattern.compile("(\\S+):\\s(.+)") + def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val spark = pec.get[SparkSession]() + val sc = spark.sparkContext + val inDf= in.read() + + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") + } + + configuration.set("fs.defaultFS",hdfsUrl) + var fs: FileSystem = FileSystem.get(configuration) + + val hdfsPathJsonCache = hdfsUrl+cachePath+"/godataCache/godata.json" + + val path: Path = new Path(hdfsPathJsonCache) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null + + inDf.collect().foreach(row => { + pathStr = row.get(0).asInstanceOf[String] + var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) + + var line: String = null + var xml = "" + var i =0 + while (i<30){ + br.readLine() + i+=1 + } + var obj = new JSONObject() + var count= 0 + while ((line = br.readLine()) !=null && line !=null ){ + val m: Matcher = tv.matcher(line) + if (line.startsWith("[")){ + if (line .equals("[Term]")){ + obj.append("stanza_name","Term") + } else if (line.equals("[Typedef]")){ + obj.append("stanza_name","Typedef") + } else if (line.equals("[Instance]")){ + obj.append("stanza_name","Instance") + } + } else if (m.matches()){ + obj.append(m.group(1),m.group(2)) + } else if ( line.equals("")){ + val keyIterator: Iterator[String] = obj.keys() + while (keyIterator.hasNext){ + val key = keyIterator.next() + var value = "" + for (i <- 0 until obj.getJSONArray(key).length() ){ + value += (";" + obj.getJSONArray(key).get(i).toString) + } + obj.put(key,value.substring(1)) + } + count += 1 + if (count ==1 ) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + obj.toString).getBytes())) + } + else { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + obj.toString).getBytes())) + } + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn = null + + obj= new JSONObject() + } + } + }) + + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + + fdosOut.flush() + fdosOut.close() + + println("start parser HDFSjsonFile") + val df: DataFrame = spark.read.json(hdfsPathJsonCache) + + df.schema.printTreeString() + out.write(df) + + } + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] + } + + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + val cachePath = new 
PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + descriptor = cachePath :: descriptor + descriptor + } + + override def getIcon(): Array[Byte] = { + ImageUtil.getImage("microorganism/png/Gene_Ontology.png") + } + + override def getGroup(): List[String] = { + List(StopGroup.MicroorganismGroup.toString) + } + + def initialize(ctx: ProcessContext): Unit = { + + } + + +} diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoldDataParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoldDataParse.scala index 0e36263..0e0e1a9 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoldDataParse.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoldDataParse.scala @@ -1,72 +1,37 @@ package cn.piflow.bundle.microorganism + import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.{ImageUtil, MapUtil} +import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import org.apache.spark.sql.{Row, SparkSession} -import org.elasticsearch.spark.sql.EsSparkSQL +import org.apache.spark.sql. SparkSession class GoldDataParse extends ConfigurableStop{ val authorEmail: String = "ygang@cnic.cn" - val description: String = "Parse Gold date put to elasticSearch." + val description: String = "Parsing GoldData type data" val inportList: List[String] = List(PortEnum.DefaultPort.toString) - val outportList: List[String] = List(PortEnum.NonePort.toString) + val outportList: List[String] = List(PortEnum.DefaultPort.toString) - var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 - var es_index:String = _ //es的索引 - var es_type:String = _ //es的类型 - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext val inDf = in.read() -// inDf.show() -// println(inDf.count()) -// inDf.schema.printTreeString() - val rows: Array[Row] = inDf.collect() - for (i<- 0 until rows.length){ - // 加载 json 字符串 为 df - val jsonRDD = spark.sparkContext.makeRDD(rows(i)(0).toString :: Nil) - val jsonDF = spark.read.json(jsonRDD) - //jsonDF.show() + out.write(inDf) - - val options = Map("es.index.auto.create"-> "true", - "es.mapping.id"->"gold_id", - "es.nodes"->es_nodes,"es.port"->port) - - // df 写入 es - EsSparkSQL.saveToEs(jsonDF,s"${es_index}/${es_type}",options) - - } } def setProperties(map: Map[String, Any]): Unit = { - es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] - es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] - es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] + } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) - - - descriptor = es_nodes :: descriptor - descriptor = port :: descriptor - descriptor = es_index :: descriptor - descriptor = es_type :: descriptor descriptor } 
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/InterprodataParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/InterprodataParse.scala new file mode 100644 index 0000000..4117087 --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/InterprodataParse.scala @@ -0,0 +1,145 @@ +package cn.piflow.bundle.microorganism + +import java.io._ + +import cn.piflow.conf.bean.PropertyDescriptor +import cn.piflow.conf.util.{ImageUtil, MapUtil} +import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} +import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.json.{JSONObject, XML} + + +class InterprodataParse extends ConfigurableStop{ + val authorEmail: String = "ygang@cnic.cn" + val description: String = "Parsing Interpro type data" + val inportList: List[String] = List(PortEnum.DefaultPort.toString) + val outportList: List[String] = List(PortEnum.DefaultPort.toString) + var cachePath:String = _ + + def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val spark = pec.get[SparkSession]() + val sc = spark.sparkContext + val inDf= in.read() + + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") + } + configuration.set("fs.defaultFS",hdfsUrl) + var fs: FileSystem = FileSystem.get(configuration) + + val hdfsPathJsonCache = hdfsUrl+cachePath+"/interproDataCatch/interpro.json" + val path: Path = new Path(hdfsPathJsonCache) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null + + inDf.collect().foreach(row => { + pathStr = row.get(0).asInstanceOf[String] + + var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) + var line: String = null + var xml = "" + var i = 0 + while (i<26){ + br.readLine() + i+=1 + } + var count = 0 + var abstraction:String = null + var doc: JSONObject = null + while ((line = br.readLine()) != null && line !=null ){ + xml += line + if (line .indexOf("") != -1){ + count += 1 + doc = XML.toJSONObject(xml).getJSONObject("interpro") + + val id = doc.getString("id") + if (doc.has("abstract")){ + abstraction = doc.get("abstract").toString + doc.put("abstract",abstraction) + } + if (doc.get("pub_list") == ""){ + doc.remove("pub_list") + } + + if (count ==1 ) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes())) + } + else { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes())) + } + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn = null + xml = "" + + } + } + }) + + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + + 
fdosOut.flush() + fdosOut.close() + + println("start parser HDFSjsonFile") + val df: DataFrame = spark.read.json(hdfsPathJsonCache) + + df.schema.printTreeString() + + out.write(df) + + } + + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] + } + + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + descriptor = cachePath :: descriptor + descriptor + } + + override def getIcon(): Array[Byte] = { + ImageUtil.getImage("microorganism/png/Interpro.png") + } + + override def getGroup(): List[String] = { + List(StopGroup.MicroorganismGroup.toString) + } + + def initialize(ctx: ProcessContext): Unit = { + + } + + +} diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/TaxonomyParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/TaxonomyParse.scala index fd92850..5ad9edf 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/TaxonomyParse.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/TaxonomyParse.scala @@ -1,110 +1,77 @@ package cn.piflow.bundle.microorganism import java.io._ +import java.util.HashMap import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} import org.apache.spark.sql.{DataFrame, SparkSession} -import org.elasticsearch.spark.sql.EsSparkSQL import org.json.JSONObject + + class TaxonomyParse extends ConfigurableStop{ val authorEmail: String = "ygang@cnic.cn" - val description: String = "Load file from ftp url." 
+ val description: String = "Parsing Taxonomy type data" val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.NonePort.toString) - - var es_nodes:String = _ //es的节点,多个用逗号隔开 - var port:String= _ //es的端口好 - var es_index:String = _ //es的索引 - var es_type:String = _ //es的类型 + var cachePath:String = _ var filePath:String = _ + var outWriteDF:DataFrame = _ + var nodesDF:DataFrame = _ + var divisionDF:DataFrame = _ + var gencodeDF:DataFrame = _ + var namesDF:DataFrame = _ + var citationsDF:DataFrame = _ def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext val ssc = spark.sqlContext - println("###############################") + val inDf = in.read() -// val inDf = in.read() -// inDf.show() -// inDf.printSchema() -// val rows: Array[Row] = inDf.collect() -// val pathDir = new File(rows(0)(0).toString).getParent + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] - - - - val path = "/ftpTaxonomy1/1/gencode.dmp" - val pathDir: String = new File(path).getParent - filePath = pathDir + File.separator + "nodes.dm" - - // #########################################------004 ---namese.dmp - if (filePath.endsWith("names.dmp")) { - - val optionsFromEs = Map("es.index.auto.create"-> "true", - "es.nodes.wan.only"->"true", - "es.nodes"->es_nodes,"es.port"->port) - //load data with df from es - val esDf = ssc.read.format("org.elasticsearch.spark.sql").options(optionsFromEs).load(s"${es_index}/${es_type}") - - - val br = new BufferedReader(new FileReader(filePath)) - var line: String = null; - var count = 0 - var divDF :DataFrame = null - while ((line = br.readLine) != null && line != null) { - - val tokens: Array[String] = line.split("\\t\\|\\t") - val doc = new JSONObject() - doc.put("genetic_code_id", tokens(0)) - doc.put("genetic_code_name", tokens(2)) - doc.put("genetic_code_translation_table", tokens(3).trim) - doc.put("genetic_code_start_codons", tokens(4).replace("\t|","").trim) - - if (count==0) { - val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil) - divDF = spark.read.json(jsonRDD) - } else { - val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil) - divDF = spark.read.json(jsonRDD).union(divDF) - } - count = count+1 - } - - // divDF.show() - - val outDf = esDf.join(divDF, Seq("genetic_code_id")). 
- filter(esDf.col("genetic_code_id") === divDF.col("genetic_code_id")) - - val optionsToEs = Map("es.index.auto.create" -> "true", - "es.mapping.id" -> "tax_id", - "es.nodes" -> es_nodes, - "es.port" -> port) - // df 写入 es - EsSparkSQL.saveToEs(outDf, s"${es_index}/${es_type}", optionsToEs) - println("nodes.dmp--------------->存储成功") - - - filePath = pathDir + File.separator + "names.dmp" + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") } + configuration.set("fs.defaultFS",hdfsUrl) + var fs: FileSystem = FileSystem.get(configuration) + var pathDir = "" + for (x <- 0 until pathARR.length-1){ + pathDir+=(pathARR(x) +"/") + } + filePath = pathDir + File.separator + "nodes.dmp" - - - - // #########################################------001 ---nodes.dmp if (filePath.endsWith("nodes.dmp")) { - val br = new BufferedReader(new FileReader(filePath)) + val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/nodes.json" + val path: Path = new Path(hdfsPathJsonCache) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null + + + var fdis: FSDataInputStream = fs.open(new Path(filePath.toString)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) var line: String = null; - var count =1 - while ((line = br.readLine) != null &&line != null) { -// println(count) + var count =0 + while ((line = br.readLine) != null && line != null) { + count = count+1 val doc = new JSONObject() val tokens: Array[String] = line.split("\\t\\|\\t") doc.put("tax_id", tokens(0)) @@ -114,154 +81,304 @@ class TaxonomyParse extends ConfigurableStop{ doc.put("division_id", tokens(4)) doc.put("genetic_code_id", tokens(6)) doc.put("mitochondrial_genetic_code_id", tokens(8)) -// println(doc) - count = count+1 - if (tokens(0).equals("2492834") ){ - println(tokens(0)) + + if (count == 1) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes())) + } else { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes())) + } + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) } - // 加载 json 字符串 为 df - val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil) - val jsonDF = spark.read.json(jsonRDD) - - val options = Map("es.index.auto.create" -> "true", - "es.mapping.id" -> "tax_id", - "es.nodes" -> es_nodes, "es.port" -> port) - // df 写入 es - EsSparkSQL.saveToEs(jsonDF, s"${es_index}/${es_type}", options) - println("nodes.dmp--------------->存储成功") - + fdosOut.flush() + bisIn.close() } + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn.close() + fdosOut.close() + + nodesDF = spark.read.json(hdfsPathJsonCache) filePath = pathDir + File.separator + "division.dmp" } - // #########################################------002 ---division.dmp - else if (filePath.endsWith("division.dmp")) { + if (filePath.endsWith("division.dmp")){ + val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/division.json" - val options = Map("es.index.auto.create"-> "true", - "es.nodes.wan.only"->"true", - 
"es.nodes"->es_nodes,"es.port"->port) + val path: Path = new Path(hdfsPathJsonCache) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null - //load data with df from es - val esDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}") - - val br = new BufferedReader(new FileReader(filePath)) + var fdis: FSDataInputStream = fs.open(new Path(filePath.toString)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) var line: String = null; var count = 0 - var divDF :DataFrame = null - while ((line = br.readLine) != null && line != null) { + while ((line = br.readLine) != null && line != null ) { + count=count+1 val tokens: Array[String] = line.split("\\t\\|\\t") val doc = new JSONObject() doc.put("division_id", tokens(0)) doc.put("dive", tokens(1)) doc.put("diname", tokens(2)) - if (count==0) { - val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil) - divDF = spark.read.json(jsonRDD) + + if (count == 1) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes())) } else { - val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil) - divDF = spark.read.json(jsonRDD).union(divDF) + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes())) } - count = count+1 + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn = null } + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) - val outDf = esDf.join(divDF, Seq("division_id")).filter(esDf.col("division_id") === divDF.col("division_id")) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + fdosOut.close() - val options1 = Map("es.index.auto.create" -> "true", - "es.mapping.id" -> "tax_id", - "es.nodes" -> es_nodes, - "es.port" -> port) - // df 写入 es - EsSparkSQL.saveToEs(outDf, s"${es_index}/${es_type}", options1) - println("nodes.dmp--------------->存储成功") + divisionDF = spark.read.json(hdfsPathJsonCache) + outWriteDF=nodesDF.join(divisionDF, Seq("division_id")) filePath = pathDir + File.separator + "gencode.dmp" } - - // #########################################------003 ---gencode.dmp - else if (filePath.endsWith("gencode.dmp")) { - - val optionsFromEs = Map("es.index.auto.create"-> "true", - "es.nodes.wan.only"->"true", - "es.nodes"->es_nodes,"es.port"->port) - //load data with df from es - val esDf = ssc.read.format("org.elasticsearch.spark.sql").options(optionsFromEs).load(s"${es_index}/${es_type}") + if (filePath.endsWith("gencode.dmp")){ - val br = new BufferedReader(new FileReader(filePath)) + val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/gencode.json" + val path: Path = new Path(hdfsPathJsonCache) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null + + var fdis: FSDataInputStream = fs.open(new Path(filePath.toString)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) var line: String = null; var count = 0 - var divDF :DataFrame = null - while ((line = br.readLine) != null && 
line != null) { - + while ((line = br.readLine) != null && line != null ) { + count += 1 val tokens: Array[String] = line.split("\\t\\|\\t") val doc = new JSONObject() doc.put("genetic_code_id", tokens(0)) - doc.put("genetic_code_name", tokens(2)) + doc.put("genetic_code_name", tokens(2).trim) doc.put("genetic_code_translation_table", tokens(3).trim) doc.put("genetic_code_start_codons", tokens(4).replace("\t|","").trim) - - if (count==0) { - val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil) - divDF = spark.read.json(jsonRDD) + if (count == 1) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes())) } else { - val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil) - divDF = spark.read.json(jsonRDD).union(divDF) + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes())) } - count = count+1 + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn = null } + bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) - // divDF.show() + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + fdosOut.close() - val outDf = esDf.join(divDF, Seq("genetic_code_id")). - filter(esDf.col("genetic_code_id") === divDF.col("genetic_code_id")) - - val optionsToEs = Map("es.index.auto.create" -> "true", - "es.mapping.id" -> "tax_id", - "es.nodes" -> es_nodes, - "es.port" -> port) - // df 写入 es - EsSparkSQL.saveToEs(outDf, s"${es_index}/${es_type}", optionsToEs) - println("nodes.dmp--------------->存储成功") + gencodeDF = spark.read.json(hdfsPathJsonCache) + outWriteDF=outWriteDF.join(gencodeDF, Seq("genetic_code_id")) filePath = pathDir + File.separator + "names.dmp" + } + if (filePath.endsWith("names.dmp")){ + val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/names.json" + val path: Path = new Path(hdfsPathJsonCache) + if(fs.exists(path)){ + fs.delete(path) + } + fs.create(path).close() + var fdosOut: FSDataOutputStream = fs.append(path) + var jsonStr: String ="" + var bisIn: BufferedInputStream =null + + var fdis: FSDataInputStream = fs.open(new Path(filePath.toString)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) + var line: String = null + var count = 0 + var pre_tax_id = "1" + var name_key = "" + + var names = new HashMap[String,String]() + + var doc = new JSONObject() + while ((line = br.readLine) != null && line != null ) { + val tokens: Array[String] = line.split("\\t\\|\\t") + name_key = tokens(3).replace("\t|","").trim + + if (tokens(0).equals(pre_tax_id)){ + if (names.containsKey(name_key)){ + names.put(name_key,names.get(name_key).toString+";"+tokens(1)) + } else { + names.put(name_key,tokens(1)) + } + } else { + count += 1 + names.put("tax_id",pre_tax_id) + + doc.put("",names) + val doc1 = doc.toString().substring(0,doc.toString.length-1) + jsonStr = doc1.substring(4,doc1.length) + + pre_tax_id = tokens(0) + names = new HashMap[String,String]() + names.put(name_key,tokens(1)) + + if (count == 1) { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) + } else { + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) + } + val buff: Array[Byte] = new Array[Byte](1048576) + var num: Int = 
bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + bisIn = null + } + } + names.put("tax_id",pre_tax_id) + doc.put("",names) + val doc1 = doc.toString().substring(0,doc.toString.length-1) + jsonStr = doc1.substring(4,doc1.length) + bisIn = new BufferedInputStream(new ByteArrayInputStream(("," +jsonStr+ "]").getBytes())) + val buff: Array[Byte] = new Array[Byte](1048576) + + var num: Int = bisIn.read(buff) + while (num != -1) { + fdosOut.write(buff, 0, num) + fdosOut.flush() + num = bisIn.read(buff) + } + fdosOut.flush() + fdosOut.close() + + namesDF = spark.read.json(hdfsPathJsonCache) + + outWriteDF = outWriteDF.join(namesDF,Seq("tax_id")) + outWriteDF.schema.printTreeString() + + filePath = pathDir + File.separator + "citations.dmp" + } + + if (filePath.endsWith("citations.dmp")){ + var fdis: FSDataInputStream = fs.open(new Path(filePath.toString)) + val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) + var line: String = null + var count = 0 + while ((line = br.readLine) != null && line != null ) { + count += 1 + val tokens: Array[String] = line.split("\\t\\|\\t") + if (tokens.size > 6) { + val pumed_id = tokens(2) + val medline_id = tokens(3) + val tar_ids = tokens(6).replace("\t|", "").trim + + var shouldUpdate_pubmed: Boolean = true + var shouldUpdate_medline: Boolean = true + var pumed_ids = null + var medline_ids = null + + if (!tar_ids.isEmpty) { + if (pumed_id.equals("0") && medline_id.equals("0")) { + + } else if (pumed_id.equals("0")) { + shouldUpdate_medline = true + shouldUpdate_pubmed = false + } else if (medline_id.equals("0")) { + shouldUpdate_pubmed = true + shouldUpdate_medline = false + } else { + shouldUpdate_pubmed = true + shouldUpdate_medline = true + } + + } + + } + } + } + + outWriteDF.schema.printTreeString() + outWriteDF.show() + println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$"+outWriteDF.count()) + + out.write(outWriteDF) } - def processNodes(index:String,types:String)={ } + def setProperties(map: Map[String, Any]): Unit = { - es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - port=MapUtil.get(map,key="port").asInstanceOf[String] - es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] - es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) - - - descriptor = es_nodes :: descriptor - descriptor = port :: descriptor - descriptor = es_index :: descriptor - descriptor = es_type :: descriptor - + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/ExcelToJson.java b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/ExcelToJson.java index c96fdde..5f79d89 100644 --- 
a/piflow-bundle/src/main/scala/cn/piflow/bundle/util/ExcelToJson.java +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/util/ExcelToJson.java @@ -1,6 +1,10 @@ package cn.piflow.bundle.util; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.poi.hssf.usermodel.HSSFDateUtil; import org.apache.poi.hssf.usermodel.HSSFWorkbook; import org.apache.poi.openxml4j.exceptions.InvalidFormatException; @@ -23,72 +27,62 @@ import java.util.Map; public class ExcelToJson { - public static final String XLSX = ".xlsx"; - public static final String XLS=".xls"; +// public static final String XLSX = ".xlsx"; +// public static final String XLS=".xls"; + + public static final Configuration configuration = new Configuration(); /** * 获取Excel文件(.xls和.xlsx都支持) - * @param file + * @param pathStr * @return 解析excle后的Json数据 * @throws IOException - * @throws FileNotFoundException - * @throws InvalidFormatException */ - public static net.sf.json.JSONArray readExcel(File file) throws FileNotFoundException, IOException, InvalidFormatException { - int res = checkFile(file); - if (res == 0) { - throw new NullPointerException("the file is null."); - }else if (res == 1) { - return readXLSX(file); - }else if (res == 2) { - return readXLS(file); + public static net.sf.json.JSONArray readExcel(String pathStr,String hdfsUrl) throws IOException { + + configuration.set("fs.defaultFS",hdfsUrl); + + if (pathStr.endsWith(".xlsx")) { + return readXLSX(pathStr); + }else { + return readXLS(pathStr); } - throw new IllegalAccessError("the file["+file.getName()+"] is not excel file."); - } - /** - * 判断File文件的类型 - * @param file 传入的文件 - * @return 0-文件为空,1-XLSX文件,2-XLS文件,3-其他文件 - */ - public static int checkFile(File file){ - if (file==null) { - System.out.println("0"); - return 0; - } - String flieName = file.getName(); - if (flieName.endsWith(XLSX)) { - System.out.println("1"); - return 1; - } - if (flieName.endsWith(XLS)) { - System.out.println("2"); - return 2; - } - return 3; + +// return new net.sf.json.JSONArray(); } + /** * 读取XLSX文件 - * @param file + * @param pathStr * @return - * @throws IOException - * @throws InvalidFormatException */ - public static net.sf.json.JSONArray readXLSX(File file) throws InvalidFormatException, IOException{ - Workbook book = new XSSFWorkbook(file); + public static net.sf.json.JSONArray readXLSX(String pathStr) throws IOException{ + + FileSystem fs = FileSystem.get(configuration); + FSDataInputStream fdis = fs.open(new Path(pathStr)); + + System.out.println("xlsx"); + + Workbook book = new XSSFWorkbook(fdis); Sheet sheet = book.getSheetAt(0); return read(sheet, book); } /** * 读取XLS文件 - * @param file + * @param pathStr * @return * @throws IOException - * @throws FileNotFoundException */ - public static net.sf.json.JSONArray readXLS(File file) throws FileNotFoundException, IOException{ - POIFSFileSystem poifsFileSystem = new POIFSFileSystem(new FileInputStream(file)); + public static net.sf.json.JSONArray readXLS(String pathStr) throws IOException{ + + FileSystem fs = FileSystem.get(configuration); + FSDataInputStream fdis = fs.open(new Path(pathStr)); + + System.out.println("xls"); + + POIFSFileSystem poifsFileSystem = new POIFSFileSystem(fdis); Workbook book = new HSSFWorkbook(poifsFileSystem); Sheet sheet = book.getSheetAt(0); return read(sheet, book);
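With this change ExcelToJson reads from HDFS (a path string plus the defaultFS URL) instead of a local java.io.File. Below is a rough Scala sketch of calling the new entry point; the object name, path, and URL are placeholders, only readExcel(pathStr, hdfsUrl) comes from the change above, and loading the returned net.sf.json.JSONArray into a DataFrame reuses the makeRDD + spark.read.json idiom used by the parser stops in this patch.

import cn.piflow.bundle.util.ExcelToJson
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative caller only; paths and names are placeholders.
object ExcelOnHdfsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("excel-on-hdfs-sketch").getOrCreate()

    val hdfsUrl = "hdfs://namenode:8020/"                // assumed fs.defaultFS
    val xlsxPath = hdfsUrl + "microo/excel/sample.xlsx"  // assumed file location

    // readExcel now opens the file through FileSystem/FSDataInputStream,
    // so no local File handle is needed.
    val rows: net.sf.json.JSONArray = ExcelToJson.readExcel(xlsxPath, hdfsUrl)

    // One string element wrapped in an RDD, then spark.read.json,
    // as the parser stops do with their per-record JSON documents.
    val jsonRDD = spark.sparkContext.makeRDD(rows.toString :: Nil)
    val df: DataFrame = spark.read.json(jsonRDD)
    df.show()
  }
}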