diff --git a/piflow-bundle/src/main/resources/microorganism/PDB.json b/piflow-bundle/src/main/resources/microorganism/PDB.json index 3600cef..409b072 100644 --- a/piflow-bundle/src/main/resources/microorganism/PDB.json +++ b/piflow-bundle/src/main/resources/microorganism/PDB.json @@ -6,28 +6,41 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/weishengwu/PDB/", + "HDFSPath":"/yqd/ftp/pdb/07/", "selectionConditions":".*.ent.gz" } },{ "uuid":"2222", "name":"UnzipFilesOnHDFS", - "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ "isCustomize":"false", "hdfsUrl":"hdfs://10.0.88.70:9000", - "filePath":"/yqd/weishengwu/PDB/", - "savePath":"" + "filePath":"", + "savePath":"/yqd/test/pdb/pdb1/" } },{ "uuid":"3333", "name":"PDBParser", - "bundle":"cn.piflow.bundle.microorganism.PDBParser", + "bundle":"cn.piflow.bundle.microorganism.PDBData", "properties":{ + "cachePath":"/yqd/test/pdb/" } + }, + { + "uuid":"4444", + "name":"putEs", + "bundle":"cn.piflow.bundle.es.PutEs", + "properties":{ + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "es_port": "9200", + "es_index": "test_17", + "es_type": "test_17" + } + } ], "paths":[ @@ -41,6 +54,11 @@ "outport":"", "inport":"", "to":"PDBParser" + },{ + "from":"PDBParser", + "outport":"", + "inport":"", + "to":"putEs" } ] } diff --git a/piflow-bundle/src/main/resources/microorganism/bioSample.json b/piflow-bundle/src/main/resources/microorganism/bioSample.json index 5474b1c..abfb834 100644 --- a/piflow-bundle/src/main/resources/microorganism/bioSample.json +++ b/piflow-bundle/src/main/resources/microorganism/bioSample.json @@ -7,30 +7,30 @@ { "uuid":"0000", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://master2.packone:8020", - "HDFSPath":"/microo/", + "HDFSPath":"/yg/microo/", "selectionConditions":".*ample_set.xml.gz" } },{ "uuid":"2222", "name":"UnzipFilesOnHDFS", - "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ "isCustomize":"false", "filePath":"", "hdfsUrl":"hdfs://master2.packone:8020", - "savePath":"/microo/biosample/biosample/" + "savePath":"/yg/microo/biosample/" } }, { "uuid":"2222", "name":"BioSampleParse", - "bundle":"cn.piflow.bundle.microorganism.BioSampleParse", + "bundle":"cn.piflow.bundle.microorganism.BioSample", "properties":{ - + "cachePath": "/yg/microoCache/" } }, { @@ -38,10 +38,10 @@ "name":"putEs", "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ - "es_nodes":"10.0.88.70,10.0.88.71,10.0.88.72", - "port":"9200", - "es_index":"sample0122", - "es_type":"sample0122" + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "es_port": "9200", + "es_index": "test_11", + "es_type": "test_11" } } diff --git a/piflow-bundle/src/main/resources/microorganism/bioProject.json b/piflow-bundle/src/main/resources/microorganism/biopro.json similarity index 66% rename from piflow-bundle/src/main/resources/microorganism/bioProject.json rename to piflow-bundle/src/main/resources/microorganism/biopro.json index d6543e9..d0191c2 100644 --- a/piflow-bundle/src/main/resources/microorganism/bioProject.json +++ b/piflow-bundle/src/main/resources/microorganism/biopro.json @@ -6,22 +6,19 @@ { "uuid":"1111", "name":"SelectFilesByName", 
- "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://master2.packone:8020", - "HDFSPath":"/microo/biproject/", + "HDFSPath":"/yg/microo/biproject/", "selectionConditions":"bioproject.xml" } }, { "uuid":"2222", "name":"BioProjetDataParse", - "bundle":"cn.piflow.bundle.microorganism.BioProjetDataParse", + "bundle":"cn.piflow.bundle.microorganism.BioProjetData", "properties":{ - "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "bioproject", - "es_type": "bioprojecttest002" + "cachePath": "/yg/microoCache/" } }, { @@ -30,9 +27,9 @@ "bundle": "cn.piflow.bundle.es.PutEs", "properties": { "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "bioproject10", - "es_type": "bioproject10" + "es_port": "9200", + "es_index": "test_10", + "es_type": "test_10" } } ], diff --git a/piflow-bundle/src/main/resources/microorganism/down.json b/piflow-bundle/src/main/resources/microorganism/down.json deleted file mode 100644 index 8c60db1..0000000 --- a/piflow-bundle/src/main/resources/microorganism/down.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "flow":{ - "name":"test", - "uuid":"1234", - "stops":[ - { - "uuid":"1111", - "name":"LoadFromFtpToHDFS", - "bundle":"cn.piflow.bundle.ftp.LoadFromFtpToHDFS", - "properties":{ - "url_str":"ftp.ebi.ac.uk", - "port":"", - "username":"", - "password":"", - "ftpFile":"/pub/databases/ena/sequence/release/con/rel_con_env_07_r138.dat.gz", - "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/weishengwu/embl/", - "isFile":"true" - } - } - ], - "paths":[ - { - "from":"", - "outport":"", - "inport":"", - "to":"" - } - ] - } -} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/embl_parser.json b/piflow-bundle/src/main/resources/microorganism/embl.json similarity index 61% rename from piflow-bundle/src/main/resources/microorganism/embl_parser.json rename to piflow-bundle/src/main/resources/microorganism/embl.json index dc40445..340bdd6 100644 --- a/piflow-bundle/src/main/resources/microorganism/embl_parser.json +++ b/piflow-bundle/src/main/resources/microorganism/embl.json @@ -7,29 +7,30 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/weishengwu/embl", - "selectionConditions":".*con_pro_02_r138.dat.gz,.*con_vrl_01_r138.dat.gz,.*pat_phg_01_r138.dat.gz" + "HDFSPath":"/yqd/ftp/embl/", + "selectionConditions":"rel_exp_con_pro_26_r138.dat.gz" } },{ "uuid":"2222", - "name":"UnzipFilesOnHDFS_1", - "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ "isCustomize":"false", "filePath":"", - "fileType":"gz", - "unzipPath":"" + "hdfsUrl":"hdfs://master2.packone:8020", + "savePath":"/yqd/test/embl/" } }, { "uuid":"3333", "name":"EmblParser", - "bundle":"cn.piflow.bundle.microorganism.EmblParser", + "bundle":"cn.piflow.bundle.microorganism.EmblData", "properties":{ + "cachePath": "/yqd/test/embl/" } },{ "uuid":"4444", @@ -37,9 +38,9 @@ "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "embl", - "es_type": "embl" + "es_port": "9200", + "es_index": "test_22", + "es_type": "test_22" } } ], @@ -48,10 +49,10 @@ "from":"SelectFilesByName", "outport":"", "inport":"", - 
"to":"UnzipFilesOnHDFS_1" + "to":"UnzipFilesOnHDFS" }, { - "from":"UnzipFilesOnHDFS_1", + "from":"UnzipFilesOnHDFS", "outport":"", "inport":"", "to":"EmblParser" diff --git a/piflow-bundle/src/main/resources/microorganism/ensembl.json b/piflow-bundle/src/main/resources/microorganism/ensembl.json index baf2eee..0de0c94 100644 --- a/piflow-bundle/src/main/resources/microorganism/ensembl.json +++ b/piflow-bundle/src/main/resources/microorganism/ensembl.json @@ -9,15 +9,26 @@ "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/ftp/ensembl_gff3/", - "selectionConditions":".*.gff3" + "HDFSPath":"/yqd/ftp/ensembl_gff3/aspergillus_niger/", + "selectionConditions":".*.gff3.gz" } },{ + "uuid":"2222", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", + "properties":{ + "isCustomize":"false", + "filePath":"", + "hdfsUrl":"hdfs://10.0.88.70:9000", + "savePath":"/yqd/test/ensembl/" + } + }, + { "uuid":"3333", "name":"Ensembl_gff3Parser", - "bundle":"cn.piflow.bundle.microorganism.Ensembl_gff3Parser", + "bundle":"cn.piflow.bundle.microorganism.Ensembl", "properties":{ - + "cachePath":"/yqd/test/ensembl/" } },{ "uuid":"4444", @@ -25,9 +36,9 @@ "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "yqd_ensembl_index", - "es_type": "yqd_ensembl_type" + "es_port": "9200", + "es_index": "test_21", + "es_type": "test_21" } } ], @@ -36,6 +47,11 @@ "from":"SelectFilesByName", "outport":"", "inport":"", + "to":"UnzipFilesOnHDFS" + },{ + "from":"UnzipFilesOnHDFS", + "outport":"", + "inport":"", "to":"Ensembl_gff3Parser" },{ "from":"Ensembl_gff3Parser", diff --git a/piflow-bundle/src/main/resources/microorganism/genbank.json b/piflow-bundle/src/main/resources/microorganism/genbank.json index 0cca202..7f9a69f 100644 --- a/piflow-bundle/src/main/resources/microorganism/genbank.json +++ b/piflow-bundle/src/main/resources/microorganism/genbank.json @@ -6,33 +6,30 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://master2.packone:8020", - "HDFSPath":"/microo/genbank/", - "selectionConditions":".*.seq.gz" + "HDFSPath":"/yg/microo/genbank2/", + "selectionConditions":"gbbct1.seq.gz" } },{ "uuid":"2222", "name":"UnzipFilesOnHDFS", - "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ "isCustomize":"false", "filePath":"", "hdfsUrl":"hdfs://master2.packone:8020", - "savePath":"/microo/genbank/" + "savePath":"/yg/microo/genbank/" } }, { "uuid":"3333", "name":"GenBankParse", - "bundle":"cn.piflow.bundle.microorganism.GenBankParse", + "bundle":"cn.piflow.bundle.microorganism.GenBankData", "properties":{ - "es_nodes":"10.0.86.239", - "port":"9200", - "es_index":"genbank", - "es_type":"data6" + "cachePath": "/yg/microoCache/" } }, { @@ -41,9 +38,9 @@ "bundle": "cn.piflow.bundle.es.PutEs", "properties": { "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "genbank", - "es_type": "genbank1" + "es_port": "9200", + "es_index": "test_12", + "es_type": "test_12" } } ], diff --git a/piflow-bundle/src/main/resources/microorganism/gene.json b/piflow-bundle/src/main/resources/microorganism/gene.json index 975e8b7..0a4e2cb 100644 --- a/piflow-bundle/src/main/resources/microorganism/gene.json +++ 
b/piflow-bundle/src/main/resources/microorganism/gene.json @@ -6,29 +6,43 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/weishengwu/gene_w/", - "selectionConditions":".*ene_info" + "HDFSPath":"/yqd/ftp/gene/", + "selectionConditions":"gene_info.gz" } },{ - "uuid":"3333", - "name":"GeneParser", - "bundle":"cn.piflow.bundle.microorganism.GeneParser", + "uuid":"2222", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ + "isCustomize":"false", + "filePath":"", + "hdfsUrl":"hdfs://10.0.88.70:9000", + "savePath":"/yqd/test/gene/" + } + }, + { + "uuid":"3333", + "name":"Gene", + "bundle":"cn.piflow.bundle.microorganism.Gene", + "properties":{ + "cachePath":"/yqd/test/gene/" } - },{ + }, + { "uuid":"4444", - "name":"PutEs", + "name":"putEs", "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "gene_index", - "es_type": "gene_type" + "es_port": "9200", + "es_index": "test_16", + "es_type": "test_16" } + } ], "paths":[ @@ -36,12 +50,17 @@ "from":"SelectFilesByName", "outport":"", "inport":"", - "to":"GeneParser" + "to":"UnzipFilesOnHDFS" },{ - "from":"GeneParser", + "from":"UnzipFilesOnHDFS", "outport":"", "inport":"", - "to":"PutEs" + "to":"Gene" + },{ + "from":"Gene", + "outport":"", + "inport":"", + "to":"putEs" } ] } diff --git a/piflow-bundle/src/main/resources/microorganism/godata.json b/piflow-bundle/src/main/resources/microorganism/godata.json index 47a9c6c..0fd9840 100644 --- a/piflow-bundle/src/main/resources/microorganism/godata.json +++ b/piflow-bundle/src/main/resources/microorganism/godata.json @@ -6,18 +6,19 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://master2.packone:8020", - "HDFSPath":"/microoAll/", + "HDFSPath":"/yg/microo/go/", "selectionConditions":"go.obo" } }, { "uuid": "3333", "name": "GoDataParse", - "bundle": "cn.piflow.bundle.microorganism.GoDataParse", + "bundle": "cn.piflow.bundle.microorganism.GoData", "properties": { + "cachePath": "/yg/microoCache/" } }, { @@ -26,9 +27,9 @@ "bundle": "cn.piflow.bundle.es.PutEs", "properties": { "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "go", - "es_type": "go" + "es_port": "9200", + "es_index": "test_13", + "es_type": "test_13" } } ], diff --git a/piflow-bundle/src/main/resources/microorganism/goldData.json b/piflow-bundle/src/main/resources/microorganism/goldData.json index 3ad75d2..be495e9 100644 --- a/piflow-bundle/src/main/resources/microorganism/goldData.json +++ b/piflow-bundle/src/main/resources/microorganism/goldData.json @@ -9,7 +9,7 @@ "bundle":"cn.piflow.bundle.http.FileDownHDFS", "properties":{ "url_str":"https://gold.jgi.doe.gov/download?mode=site_excel", - "savePath":"hdfs://master2.packone:8020/microo/golddata/gold.xlsx" + "savePath":"hdfs://master2.packone:8020/yg/microo/golddata/gold.xlsx" } }, { @@ -17,26 +17,7 @@ "name": "ExcelParser", "bundle": "cn.piflow.bundle.excel.ExcelParser", "properties": { - "jaonSavePath":"hdfs://master2.packone:8020/microo/golddata/golddata.json" - } - }, - { - "uuid": "2222", - "name": "GoldDataParse", - "bundle": "cn.piflow.bundle.microorganism.GoldDataParse", - "properties": { - - } - }, - { - "uuid": "3333", 
- "name": "putEs", - "bundle": "cn.piflow.bundle.es.PutEs", - "properties": { - "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "golddata1", - "es_type": "golddatadaa" + "cachePath":"hdfs://master2.packone:8020/yg/microo/golddata/golddata.json" } } ], @@ -46,18 +27,6 @@ "outport":"", "inport":"", "to":"ExcelParser" - }, - { - "from":"ExcelParser", - "outport":"", - "inport":"", - "to":"GoldDataParse" - }, - { - "from":"GoldDataParse", - "outport":"", - "inport":"", - "to":"putEs" } ] } diff --git a/piflow-bundle/src/main/resources/microorganism/interpro.json b/piflow-bundle/src/main/resources/microorganism/interpro.json index 962a7bb..91eb4b9 100644 --- a/piflow-bundle/src/main/resources/microorganism/interpro.json +++ b/piflow-bundle/src/main/resources/microorganism/interpro.json @@ -6,29 +6,30 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://master2.packone:8020", - "HDFSPath":"/microoAll/", + "HDFSPath":"/yg/microo/interpro/", "selectionConditions":"interpro.xml.gz" } },{ "uuid":"2222", "name":"UnzipFilesOnHDFS", - "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ "isCustomize":"false", "filePath":"", "hdfsUrl":"hdfs://master2.packone:8020", - "savePath":"/microoAll/inter/" + "savePath":"/yg/microo/interpro/" } }, { "uuid": "3333", "name": "InterprodataParse", - "bundle": "cn.piflow.bundle.microorganism.InterprodataParse", + "bundle": "cn.piflow.bundle.microorganism.InterproData", "properties": { + "cachePath": "/yg/microoCache/" } }, { @@ -37,9 +38,9 @@ "bundle": "cn.piflow.bundle.es.PutEs", "properties": { "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "interpro", - "es_type": "interpro" + "es_port": "9200", + "es_index": "test_14", + "es_type": "test_14" } } ], diff --git a/piflow-bundle/src/main/resources/microorganism/NCBI_Microbe_genome.json b/piflow-bundle/src/main/resources/microorganism/microbe.json similarity index 86% rename from piflow-bundle/src/main/resources/microorganism/NCBI_Microbe_genome.json rename to piflow-bundle/src/main/resources/microorganism/microbe.json index 7fe3195..0d2c5b3 100644 --- a/piflow-bundle/src/main/resources/microorganism/NCBI_Microbe_genome.json +++ b/piflow-bundle/src/main/resources/microorganism/microbe.json @@ -15,9 +15,9 @@ },{ "uuid":"3333", "name":"MicrobeGenomeDataParser", - "bundle":"cn.piflow.bundle.microorganism.MicrobeGenomeDataParser", + "bundle":"cn.piflow.bundle.microorganism.MicrobeGenomeData", "properties":{ - + "cachePath":"/yqd/test/microbe/" } },{ "uuid":"4444", @@ -25,9 +25,9 @@ "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "yqd_ncbi_microbe_genome_index_delet", - "es_type": "yqd_ncbi_microbe_genome_type_delet" + "es_port": "9200", + "es_index": "test_19", + "es_type": "test_19" } } ], diff --git a/piflow-bundle/src/main/resources/microorganism/pfam.json b/piflow-bundle/src/main/resources/microorganism/pfam.json new file mode 100644 index 0000000..440185a --- /dev/null +++ b/piflow-bundle/src/main/resources/microorganism/pfam.json @@ -0,0 +1,50 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "stops":[ + { + "uuid":"1111", + "name":"SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", + "properties":{ + "HDFSUrl":"hdfs://10.0.88.70:9000", + 
"HDFSPath":"/yqd/test/pfam/", + "selectionConditions":".*full" + } + }, + { + "uuid":"3333", + "name":"PfamData", + "bundle":"cn.piflow.bundle.microorganism.PfamData", + "properties":{ + "cachePath":"/yqd/test/pfam/" + } + }, + { + "uuid":"4444", + "name":"putEs", + "bundle":"cn.piflow.bundle.es.PutEs", + "properties":{ + "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", + "es_port": "9200", + "es_index": "test_15", + "es_type": "test_15" + } + } + ], + "paths":[ + { + "from":"SelectFilesByName", + "outport":"", + "inport":"", + "to":"PfamData" + },{ + "from":"PfamData", + "outport":"", + "inport":"", + "to":"putEs" + } + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/png/BioProject.png b/piflow-bundle/src/main/resources/microorganism/png/BioProject.png deleted file mode 100644 index ad921c6..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/BioProject.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Biosample.png b/piflow-bundle/src/main/resources/microorganism/png/Biosample.png deleted file mode 100644 index 1c79f18..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Biosample.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Ensembl.png b/piflow-bundle/src/main/resources/microorganism/png/Ensembl.png deleted file mode 100644 index 837df81..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Ensembl.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/GOLD.png b/piflow-bundle/src/main/resources/microorganism/png/GOLD.png deleted file mode 100644 index 624d425..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/GOLD.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Genbank.png b/piflow-bundle/src/main/resources/microorganism/png/Genbank.png deleted file mode 100644 index f20ef78..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Genbank.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Gene.png b/piflow-bundle/src/main/resources/microorganism/png/Gene.png deleted file mode 100644 index 5503409..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Gene.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Gene_Ontology.png b/piflow-bundle/src/main/resources/microorganism/png/Gene_Ontology.png deleted file mode 100644 index 63d5d2f..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Gene_Ontology.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Interpro.png b/piflow-bundle/src/main/resources/microorganism/png/Interpro.png deleted file mode 100644 index 99e106f..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Interpro.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/NCBI_Microbe_genome.png b/piflow-bundle/src/main/resources/microorganism/png/NCBI_Microbe_genome.png deleted file mode 100644 index 27afc79..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/NCBI_Microbe_genome.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/PDB.png b/piflow-bundle/src/main/resources/microorganism/png/PDB.png deleted file mode 100644 index 07030af..0000000 Binary files 
a/piflow-bundle/src/main/resources/microorganism/png/PDB.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Pfam.png b/piflow-bundle/src/main/resources/microorganism/png/Pfam.png deleted file mode 100644 index 86d611f..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Pfam.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Refseq.png b/piflow-bundle/src/main/resources/microorganism/png/Refseq.png deleted file mode 100644 index c74bf28..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Refseq.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Swissprot+TrEMBL.png b/piflow-bundle/src/main/resources/microorganism/png/Swissprot+TrEMBL.png deleted file mode 100644 index 80bf023..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Swissprot+TrEMBL.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/Taxonomy.png b/piflow-bundle/src/main/resources/microorganism/png/Taxonomy.png deleted file mode 100644 index c67879f..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/Taxonomy.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/png/embl.png b/piflow-bundle/src/main/resources/microorganism/png/embl.png deleted file mode 100644 index 8a1d533..0000000 Binary files a/piflow-bundle/src/main/resources/microorganism/png/embl.png and /dev/null differ diff --git a/piflow-bundle/src/main/resources/microorganism/refseq_genome.json b/piflow-bundle/src/main/resources/microorganism/refseq.json similarity index 61% rename from piflow-bundle/src/main/resources/microorganism/refseq_genome.json rename to piflow-bundle/src/main/resources/microorganism/refseq.json index 98f997a..7e14579 100644 --- a/piflow-bundle/src/main/resources/microorganism/refseq_genome.json +++ b/piflow-bundle/src/main/resources/microorganism/refseq.json @@ -7,29 +7,29 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/weishengwu/refseq/", - "selectionConditions":".*genomic.gbff.gz" + "HDFSPath":"/yqd/ftp/refseq/", + "selectionConditions":"archaea.4.genomic.gbff.gz" } },{ "uuid":"2222", - "name":"UnzipFilesOnHDFS_1", - "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ "isCustomize":"false", "filePath":"", - "fileType":"gz", - "unzipPath":"" - + "hdfsUrl":"hdfs://10.0.88.70:9000", + "savePath":"/yqd/test/refseq/" } }, { "uuid":"3333", "name":"Refseq_genomeParser", - "bundle":"cn.piflow.bundle.microorganism.Refseq_genomeParser", + "bundle":"cn.piflow.bundle.microorganism.RefseqData", "properties":{ + "cachePath":"/yqd/test/refseq/" } },{ "uuid":"4444", @@ -37,9 +37,9 @@ "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "genome", - "es_type": "archaea" + "es_port": "9200", + "es_index": "test_20", + "es_type": "test_20" } } ], @@ -48,10 +48,10 @@ "from":"SelectFilesByName", "outport":"", "inport":"", - "to":"UnzipFilesOnHDFS_1" + "to":"UnzipFilesOnHDFS" }, { - "from":"UnzipFilesOnHDFS_1", + "from":"UnzipFilesOnHDFS", "outport":"", "inport":"", "to":"Refseq_genomeParser" diff --git 
a/piflow-bundle/src/main/resources/microorganism/select_unzip.json b/piflow-bundle/src/main/resources/microorganism/select_unzip.json deleted file mode 100644 index 29c65d2..0000000 --- a/piflow-bundle/src/main/resources/microorganism/select_unzip.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "flow":{ - "name":"test", - "uuid":"1234", - "stops":[ - { - "uuid":"0000", - "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", - "properties":{ - "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/", - "selectionConditions":".*genomic.gbff.gz" - } - },{ - "uuid":"1111", - "name":"UnzipFilesOnHDFS_1", - "bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1", - "properties":{ - "isCustomize":"true", - "filePath":"hdfs://10.0.88.70:9000/yqd/archaea.1.genomic.gbff.gz", - "fileType":"gz", - "unzipPath":"hdfs://10.0.88.70:9000/yqd/weishengwu/" - - } - } - ], - "paths":[ - { - "from":"SelectFilesByName", - "outport":"", - "inport":"", - "to":"UnzipFilesOnHDFS_1" - } - ] - } -} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/microorganism/swissprot.json b/piflow-bundle/src/main/resources/microorganism/swissprot.json index 093fde2..9147583 100644 --- a/piflow-bundle/src/main/resources/microorganism/swissprot.json +++ b/piflow-bundle/src/main/resources/microorganism/swissprot.json @@ -6,18 +6,29 @@ { "uuid":"1111", "name":"SelectFilesByName", - "bundle":"cn.piflow.bundle.ftp.SelectFilesByName", + "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "properties":{ "HDFSUrl":"hdfs://10.0.88.70:9000", - "HDFSPath":"/yqd/weishengwu/Swissprot_TrEMBL/", - "selectionConditions":"uniprot_sprot.dat" + "HDFSPath":"/yqd/ftp/swiss/", + "selectionConditions":".*trembl.dat.gz" } },{ - "uuid":"3333", - "name":"Swissprot_TrEMBLDataParser", - "bundle":"cn.piflow.bundle.microorganism.Swissprot_TrEMBLDataParser", + "uuid":"2222", + "name":"UnzipFilesOnHDFS", + "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS", "properties":{ - + "isCustomize":"false", + "filePath":"", + "hdfsUrl":"hdfs://10.0.88.70:9000", + "savePath":"/yqd/test/swiss/" + } + }, + { + "uuid":"3333", + "name":"SwissprotData", + "bundle":"cn.piflow.bundle.microorganism.SwissprotData", + "properties":{ + "cachePath":"/yqd/test/swiss/" } },{ "uuid":"4444", @@ -25,9 +36,9 @@ "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", - "port": "9200", - "es_index": "Swissprot_TrEMBL_index", - "es_type": "Swissprot_TrEMBL_type" + "es_port": "9200", + "es_index": "test_18", + "es_type": "test_18" } } ], @@ -36,9 +47,15 @@ "from":"SelectFilesByName", "outport":"", "inport":"", - "to":"Swissprot_TrEMBLDataParser" + "to":"UnzipFilesOnHDFS" + }, + { + "from":"UnzipFilesOnHDFS", + "outport":"", + "inport":"", + "to":"SwissprotData" },{ - "from":"Swissprot_TrEMBLDataParser", + "from":"SwissprotData", "outport":"", "inport":"", "to":"PutEs" diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala index 1656edc..7c5a145 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala @@ -48,10 +48,14 @@ class FetchEs extends ConfigurableStop { override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) - val es_port = new 
PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes") + .description("Node of Elasticsearch").defaultValue("").required(true) + val es_port = new PropertyDescriptor().defaultValue("9200").name("es_port").displayName("es_port") + .description("Port of Elasticsearch").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index") + .description("Index of Elasticsearch").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type") + .description("Type of Elasticsearch").defaultValue("").required(true) descriptor = es_nodes :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala index 99c5989..4cbc578 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala @@ -47,11 +47,14 @@ class PutEs extends ConfigurableStop { override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) - val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) - + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes") + .description("Node of Elasticsearch").defaultValue("").required(true) + val es_port = new PropertyDescriptor().defaultValue("9200").name("es_port").displayName("es_port") + .description("Port of Elasticsearch").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index") + .description("Index of Elasticsearch").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type") + .description("Type of Elasticsearch").defaultValue("").required(true) descriptor = es_nodes :: descriptor descriptor = es_port :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala index cc4d723..3d2c101 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala @@ -50,16 +50,22 @@ class QueryEs extends ConfigurableStop { override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) - val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) - val es_index = new 
PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) - + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes") + .description("Node of Elasticsearch").defaultValue("").required(true) + val es_port = new PropertyDescriptor().defaultValue("9200").name("es_port").displayName("es_port") + .description("Port of Elasticsearch").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index") + .description("Index of Elasticsearch").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type") + .description("Type of Elasticsearch").defaultValue("").required(true) + val jsonDSL = new PropertyDescriptor().name("jsonDSL").displayName("jsonDSL") + .description("DSL of Elasticsearch").defaultValue("").required(true) descriptor = es_nodes :: descriptor descriptor = es_port :: descriptor descriptor = es_index :: descriptor descriptor = es_type :: descriptor + descriptor = jsonDSL :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala index c2532fb..a86e702 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/DeleteHdfs.scala @@ -14,39 +14,67 @@ import org.apache.hadoop.fs.Path class DeleteHdfs extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" - override val inportList: List[String] = List(PortEnum.NonePort.toString) - override val outportList: List[String] = List(PortEnum.NonePort.toString) + override val inportList: List[String] = List(PortEnum.DefaultPort.toString) + override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val description: String = "Delete file or directory on hdfs" var hdfsUrl :String= _ var deletePath :String = _ + var isCustomize:String=_ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() - val array = deletePath.split(",") + if (isCustomize.equals("false")){ + val inDf = in.read() - for (i<- 0 until array.length){ - val hdfsPath = hdfsUrl+array(i) - val path = new Path(array(i)) + val configuration: Configuration = new Configuration() + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") + var hdfsUrl:String="" + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") + } + configuration.set("fs.defaultFS",hdfsUrl) - val config = new Configuration() - config.set("fs.defaultFS",hdfsUrl) - val fs = FileSystem.get(config) + var fs: FileSystem = FileSystem.get(configuration) - fs.delete(path,true) + inDf.collect.foreach(row=>{ + pathStr = row.get(0).asInstanceOf[String] + val path = new Path(pathStr) + fs.delete(path,true) + }) + } else { + val array = deletePath.split(",") + + for (i<- 0 until array.length){ + val hdfsPath = hdfsUrl+"/"+array(i) + val path = new Path(hdfsPath) + + val config = new Configuration() + config.set("fs.defaultFS",hdfsUrl) + val fs = FileSystem.get(config) + + fs.delete(path,true) + } } } override def setProperties(map: Map[String, Any]): Unit = { hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String] deletePath = 
MapUtil.get(map,key="deletePath").asInstanceOf[String] + isCustomize=MapUtil.get(map,key="isCustomize").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true) val deletePath = new PropertyDescriptor().name("deletePath").displayName("deletePath").defaultValue("").required(true) + val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " + + "you must specify the path where the compressed file is located . " + + "If it is false, it will automatically find the file path data from the upstream port ") + .defaultValue("false").allowableValues(Set("true","false")).required(true) + descriptor = isCustomize :: descriptor descriptor = hdfsUrl :: descriptor descriptor = deletePath :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala index d2834bf..be104a0 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/GetHdfs.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.SparkSession class GetHdfs extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" override val description: String = "Get data from hdfs" - override val inportList: List[String] = List(PortEnum.NonePort.toString) + override val inportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) var hdfsUrl : String=_ @@ -45,7 +45,7 @@ class GetHdfs extends ConfigurableStop{ } else { val rdd = sc.textFile(path) - val outDf = rdd.toDF("txt") + val outDf = rdd.toDF() outDf.schema.printTreeString() //outDf.show() out.write(outDf) @@ -60,9 +60,13 @@ class GetHdfs extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath").defaultValue("").required(true) - val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true) - val types = new PropertyDescriptor().name("types").displayName("types").defaultValue("txt,parquet,csv,json").required(true) + val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath") + .defaultValue("").required(true) + val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl") + .defaultValue("").required(true) + val types = new PropertyDescriptor().name("types").displayName("types").description("txt,parquet,csv,json") + .defaultValue("txt").allowableValues(Set("txt","parquet","csv","json")).required(true) + descriptor = types :: descriptor descriptor = hdfsPath :: descriptor descriptor = hdfsUrl :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala index d422a18..05670be 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/ListHdfs.scala @@ -6,38 +6,54 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.spark.sql. SparkSession +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} + +import scala.collection.mutable.ArrayBuffer class ListHdfs extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" - override val inportList: List[String] = List(PortEnum.NonePort.toString) + override val inportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val description: String = "Retrieve a list of files from hdfs" - var hdfsPath :String= _ - var hdfsUrl :String= _ - var list = List("") + var HDFSPath :String= _ + var HDFSUrl :String= _ + var pathARR:ArrayBuffer[String]=ArrayBuffer() override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext - val path = new Path(hdfsPath) - + val path = new Path(HDFSPath) iterationFile(path.toString) import spark.implicits._ - val outDF = sc.parallelize(list).toDF() + val rows: List[Row] = pathARR.map(each => { + var arr:Array[String]=Array(each) + val row: Row = Row.fromSeq(arr) + row + }).toList + + val rowRDD: RDD[Row] = sc.makeRDD(rows) +// val fields: Array[StructField] = "path".split("/").map(d=>StructField(d,StringType,nullable = true)) +// val schema: StructType = StructType(fields) + + val schema: StructType = StructType(Array( + StructField("path",StringType) + )) + val outDF: DataFrame = spark.createDataFrame(rowRDD,schema) out.write(outDF) } // recursively traverse the folder def iterationFile(path: String):Unit = { val config = new Configuration() - config.set("fs.defaultFS",hdfsUrl) + config.set("fs.defaultFS",HDFSUrl) val fs = FileSystem.get(config) val listf = new Path(path) @@ -45,28 +61,25 @@ class ListHdfs extends ConfigurableStop{ for (f <- statuses) { val fsPath = f.getPath().toString - //println(fsPath) - if (f.isDirectory) { - list = fsPath::list +// pathARR += fsPath iterationFile(fsPath) - } else{ - list = f.getPath.toString::list + pathARR += f.getPath.toString } } } override def setProperties(map: Map[String, Any]): Unit = { - hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String] - hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String] + HDFSUrl = MapUtil.get(map,key="HDFSUrl").asInstanceOf[String] + HDFSPath = MapUtil.get(map,key="HDFSPath").asInstanceOf[String] } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath").defaultValue("").required(true) - val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true) + val hdfsPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("").required(true) + val hdfsUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("").required(true) descriptor = hdfsPath :: descriptor descriptor = hdfsUrl :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala index 8476574..92ad841 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/PutHdfs.scala @@ -13,7 +13,7 
@@ import org.apache.spark.sql.SparkSession class PutHdfs extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" override val inportList: List[String] = List(PortEnum.DefaultPort.toString) - override val outportList: List[String] = List(PortEnum.NonePort.toString) + override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val description: String = "Put data to hdfs" var hdfsPath :String= _ diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SelectFilesByName.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SelectFilesByName.scala index dee12ae..adb597f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SelectFilesByName.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/SelectFilesByName.scala @@ -17,7 +17,7 @@ import scala.collection.mutable.ArrayBuffer class SelectFilesByName extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" override val description: String = "Select files by file name" - override val inportList: List[String] = List(PortEnum.NonePort.toString) + override val inportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) var HDFSUrl:String=_ @@ -68,10 +68,11 @@ class SelectFilesByName extends ConfigurableStop{ val schema: StructType = StructType(fields) val df: DataFrame = session.createDataFrame(rowRDD,schema) - println("#################################################") df.show(20) - println("#################################################") + println(df.count+"#################################################") + + out.write(df) } @@ -85,9 +86,12 @@ class SelectFilesByName extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("The URL of the HDFS file system, such as hdfs://10.0.88.70:9000").required(true) - val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("The save path of the HDFS file system, such as /test/Ab").required(true) - val selectionConditions = new PropertyDescriptor().name("selectionConditions").displayName("selectionConditions").defaultValue("To select conditions, you need to fill in regular expressions in java, such as. * abc. *").required(true) + val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").description("The URL of the HDFS file system, such as hdfs://ip:port") + .defaultValue("hdfs://").required(true) + val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").description("The save path of the HDFS file system, such as /test/Ab") + .defaultValue("").required(true) + val selectionConditions = new PropertyDescriptor().name("selectionConditions").displayName("selectionConditions").description("To select conditions, you need to fill in regular expressions in java, such as. * abc. 
*") + .defaultValue("").required(true) descriptor = HDFSUrl :: descriptor descriptor = HDFSPath :: descriptor descriptor = selectionConditions :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala index 7aa95f4..380283a 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hdfs/UnzipFilesOnHDFS.scala @@ -186,7 +186,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop { val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " + "you must specify the path where the compressed file is located . " + "If it is false, it will automatically find the file path data from the upstream port ") - .defaultValue("").required(false) + .defaultValue("false").allowableValues(Set("true","false")).required(true) descriptor = isCustomize :: descriptor descriptor = filePath :: descriptor descriptor = hdfsUrl :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetData.scala index e133f66..6c8e49a 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioProjetData.scala @@ -41,17 +41,15 @@ class BioProjetData extends ConfigurableStop{ configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary = hdfsUrl+cachePath+"/bioprojectCatch/bioprojectCatch.json" + val hdfsPathTemporary = hdfsUrl+cachePath+"/bioprojectCache/bioprojectCache.json" val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } fs.create(path).close() - var fdosOut: FSDataOutputStream = fs.append(path) - var jsonStr: String ="" - var bisIn: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) inDf.collect().foreach(row =>{ val pathStr = row.get(0).asInstanceOf[String] @@ -64,23 +62,20 @@ class BioProjetData extends ConfigurableStop{ br.readLine() i = i + 1 } - var count = 0 + var xml = new StringBuffer() var x = 0 + var count = 0 while ((line = br.readLine()) != null && x <1 && line!= null ) { + xml.append(line) if (line.equals("")){ - println("----------------------------------break") + println("break") x == 1 } else if (line.indexOf("") != -1){ //reach the end of a doc count = count + 1 val doc: JSONObject = XML.toJSONObject(xml.toString()).getJSONObject(name) - val accession = doc.getJSONObject("Project").getJSONObject("Project") - .getJSONObject("ProjectID") - .getJSONObject("ArchiveID") - .getString("accession") - val projectDescr = doc.getJSONObject("Project").getJSONObject("Project") .getJSONObject("ProjectDescr") @@ -115,7 +110,7 @@ class BioProjetData extends ConfigurableStop{ } // ----------------3 - if(doc.getJSONObject("Project").optJSONObject("Submission") != null){ + if(doc.getJSONObject("Project").optJSONObject("Submission") != null){ val submission = doc.getJSONObject("Project").optJSONObject("Submission") if(submission.opt("submitted") != null){ @@ -166,27 +161,19 @@ class BioProjetData extends ConfigurableStop{ } } - bisIn = new BufferedInputStream(new ByteArrayInputStream((doc.toString+"\n").getBytes())) - val buff: Array[Byte] = new Array[Byte](1048576) - - var num: 
Int = bisIn.read(buff) - while (num != -1) { - fdosOut.write(buff, 0, num) - fdosOut.flush() - num = bisIn.read(buff) - } - fdosOut.flush() - bisIn = null + doc.write(hdfsWriter) + hdfsWriter.write("\n") xml = new StringBuffer() } + } + br.close() + fdis.close() }) - - fdosOut.close() - + hdfsWriter.close() println("start parser HDFSjsonFile") val df: DataFrame = spark.read.json(hdfsPathTemporary) out.write(df) @@ -199,7 +186,8 @@ class BioProjetData extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/bioproject").required(true) descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSample.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSample.scala index 793bfd6..70cf9f6 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSample.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/BioSample.scala @@ -25,7 +25,6 @@ class BioSample extends ConfigurableStop{ def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext -// val ssc = spark.sqlContext val inDf= in.read() val configuration: Configuration = new Configuration() @@ -35,27 +34,22 @@ class BioSample extends ConfigurableStop{ for (x <- (0 until 3)){ hdfsUrl+=(pathARR(x) +"/") } - configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - var hdfsPathJsonCache:String = "" - hdfsPathJsonCache = hdfsUrl+cachePath+"/biosampleCache/biosampleCache.json" + val hdfsPathTemporary = hdfsUrl+cachePath+"/biosampleCache/biosampleCache.json" - val path: Path = new Path(hdfsPathJsonCache) + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } fs.create(path).close() + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) - var fdosOut: FSDataOutputStream = null - var bisIn: BufferedInputStream =null - fdosOut = fs.append(path) var count = 0 - var nameNum = 0 inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] @@ -66,8 +60,7 @@ class BioSample extends ConfigurableStop{ br.readLine() br.readLine() - - while ((line = br.readLine()) != null && line!= null) { + while ((line = br.readLine()) != null && line!= null ) { xml = xml + line if (line.indexOf("") != -1) { count = count + 1 @@ -79,7 +72,6 @@ class BioSample extends ConfigurableStop{ if (attrs.equals("")) { doc.remove("Attributes") } - // Links val links: String = doc.optString("Links") if (links != null) { @@ -123,36 +115,24 @@ class BioSample extends ConfigurableStop{ if (jsonDoc.contains("Ids\":{\"Id\":[{")){ } else { -// println(count) -// println(jsonDoc) jsonDoc = jsonDoc.replace("Ids\":{\"Id\":{","Ids\":{\"Id\":[{") } } - bisIn = new BufferedInputStream(new ByteArrayInputStream((jsonDoc+"\n").getBytes())) - - val buff: Array[Byte] = new Array[Byte](1048576) - - var num: Int = bisIn.read(buff) - while (num != -1) { - fdosOut.write(buff, 0, num) - fdosOut.flush() - num = bisIn.read(buff) - } - fdosOut.flush() - bisIn = null + doc.write(hdfsWriter) + hdfsWriter.write("\n") xml = "" } } + br.close() + fdis.close() }) + 
hdfsWriter.close() - // bisIn.close() - fdosOut.close() println("start parser HDFSjsonFile") - val df: DataFrame = spark.read.json(hdfsPathJsonCache) - + val df: DataFrame = spark.read.json(hdfsPathTemporary) out.write(df) } @@ -164,7 +144,8 @@ class BioSample extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/biosample").required(true) descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/EmblData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/EmblData.scala index fc9d7f7..d1df00d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/EmblData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/EmblData.scala @@ -4,7 +4,7 @@ import java.io._ import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -20,13 +20,12 @@ class EmblData extends ConfigurableStop{ override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - + var cachePath:String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session = pec.get[SparkSession]() - val inDf: DataFrame = in.read() - val configuration: Configuration = new Configuration() + val configuration: Configuration = new Configuration() var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" @@ -36,85 +35,56 @@ class EmblData extends ConfigurableStop{ configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/emblDataCache/emblDataCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - var jsonStr: String ="" + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) - var bis: BufferedInputStream =null + var doc: JSONObject = null + var seq: RichSequence = null + var br: BufferedReader = null + var sequences: RichSequenceIterator = null + var count : Int =0 inDf.collect().foreach(row => { - var n : Int =0 - pathStr = row.get(0).asInstanceOf[String] + pathStr = row.get(0).asInstanceOf[String] + var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + br = new BufferedReader(new InputStreamReader(fdis)) - var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + sequences = CustomIOTools.IOTools.readEMBLDNA(br, null) + while (sequences.hasNext) { + count += 1 + seq = sequences.nextRichSequence() + doc = new JSONObject + Process.processEMBL_EnsemblSeq(seq, doc) - var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) - - var sequences: RichSequenceIterator = 
CustomIOTools.IOTools.readEMBLDNA (br, null) - - while (sequences.hasNext) { - n += 1 - var seq: RichSequence = sequences.nextRichSequence() - var doc: JSONObject = new JSONObject - Process.processEMBL_EnsemblSeq(seq, doc) - jsonStr = doc.toString - println("start " + n) - - if (n == 1) { - bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) - } else { - bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) + doc.write(hdfsWriter) + hdfsWriter.write("\n") } + br.close() + fdis.close() - val buff: Array[Byte] = new Array[Byte](1048576) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - - fdos.flush() - bis = null - seq = null - doc = null - } - bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) - val buff: Array[Byte] = new Array[Byte](1048576) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() }) - fdos.close() - val df: DataFrame = session.read.json(hdfsPathTemporary) - out.write(df) } - override def setProperties(map: Map[String, Any]): Unit = { - + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ + override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/emblData").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Ensembl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Ensembl.scala index 6a5a937..b477ac8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Ensembl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Ensembl.scala @@ -1,10 +1,10 @@ package cn.piflow.bundle.microorganism -import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} +import java.io._ import cn.piflow.bundle.microorganism.util.ParserGff3Data import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -19,12 +19,15 @@ class Ensembl extends ConfigurableStop{ override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override def setProperties(map: Map[String, Any]): Unit = { - + var cachePath:String = _ + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ + override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/ensembl").required(true) + descriptor = cachePath :: descriptor descriptor } @@ -43,82 +46,55 @@ class Ensembl extends 
ConfigurableStop{ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session = pec.get[SparkSession]() - val inDf: DataFrame = in.read() + val configuration: Configuration = new Configuration() - var pathStr: String = "" + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" - try{ - pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] - val pathARR: Array[String] = pathStr.split("\\/") - - for (x <- (0 until 3)){ - hdfsUrl+=(pathARR(x) +"/") - } - }catch { - case e:Exception => throw new Exception("Path error") + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") } - configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/ensembl_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/ensemblCache/ensemblCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - val buff: Array[Byte] = new Array[Byte](1048576) + + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) val parser: ParserGff3Data = new ParserGff3Data - - var bis: BufferedInputStream =null var fdis: FSDataInputStream =null var br: BufferedReader = null - var sequences: RichSequenceIterator = null var doc: JSONObject = null - var seq: RichSequence = null - var jsonStr: String = "" + var count:Int = 0 inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] - println("start parser ^^^" + pathStr) + fdis = fs.open(new Path(pathStr)) br = new BufferedReader(new InputStreamReader(fdis)) var eachStr:String=null while((eachStr = br.readLine()) != null && eachStr != null ){ doc = parser.parserGff3(eachStr) - jsonStr = doc.toString - if(jsonStr.length > 2){ - bis = new BufferedInputStream(new ByteArrayInputStream((jsonStr+"\n").getBytes())) - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis.close() - bis = null - doc = null - seq = null - jsonStr = "" + if(doc.toString.length > 2){ + count += 1 + doc.write(hdfsWriter) + hdfsWriter.write("\n") } - } - sequences = null + br.close() - br = null fdis.close() - fdis =null - pathStr = null }) - fdos.close() + hdfsWriter.close() out.write(session.read.json(hdfsPathTemporary)) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankData.scala index 4d370d7..1be1d28 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankData.scala @@ -2,15 +2,14 @@ package cn.piflow.bundle.microorganism import java.io._ - import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} -import org.apache.spark.sql.{DataFrame, SparkSession} +import 
org.apache.spark.sql.{DataFrame, SparkSession} import org.json.JSONObject @@ -21,10 +20,10 @@ class GenBankData extends ConfigurableStop{ val outportList: List[String] = List(PortEnum.DefaultPort.toString) + var cachePath:String = _ def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() val sc = spark.sparkContext - val inDf= in.read() val configuration: Configuration = new Configuration() @@ -34,21 +33,18 @@ class GenBankData extends ConfigurableStop{ for (x <- (0 until 3)){ hdfsUrl+=(pathARR(x) +"/") } - configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/microoCache/genbank/genbankcach.json" + val hdfsPathTemporary = hdfsUrl+cachePath+"/genebankCache/genebankCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } fs.create(path).close() - - var fdosOut: FSDataOutputStream = fs.append(path) - var jsonStr: String ="" - var bisIn: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) inDf.collect().foreach(row=>{ @@ -60,48 +56,35 @@ class GenBankData extends ConfigurableStop{ var doc: JSONObject = null var count = 0 - while (sequenceIterator.hasNext) { - count += 1 - doc = new JSONObject + while (sequenceIterator.hasNext) { + count += 1 + doc = new JSONObject - val seq = sequenceIterator.nextRichSequence() + val seq = sequenceIterator.nextRichSequence() + Process.processSingleSequence(seq, doc) - Process.processSingleSequence(seq, doc) + doc.write(hdfsWriter) + hdfsWriter.write("\n") - - bisIn = new BufferedInputStream(new ByteArrayInputStream((doc.toString+"\n").getBytes())) - - val buff: Array[Byte] = new Array[Byte](1048576) - var num: Int = bisIn.read(buff) - while (num != -1) { - fdosOut.write(buff, 0, num) - fdosOut.flush() - num = bisIn.read(buff) } - - fdosOut.flush() - bisIn = null - - } + br.close() + fdis.close() }) - - fdosOut.close() + hdfsWriter.close() println("start parser HDFSjsonFile") val df: DataFrame = spark.read.json(hdfsPathTemporary) - - df.schema.printTreeString() out.write(df) - - } def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/genbank").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Gene.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Gene.scala index e636e54..bdd4d6d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Gene.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Gene.scala @@ -1,10 +1,10 @@ package cn.piflow.bundle.microorganism -import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} +import java.io._ import java.text.SimpleDateFormat import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -18,13 +18,13 @@ class Gene extends 
ConfigurableStop{ override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - + var cachePath:String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session = pec.get[SparkSession]() - val inDf: DataFrame = in.read() - val configuration: Configuration = new Configuration() + + val configuration: Configuration = new Configuration() var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" @@ -34,19 +34,15 @@ class Gene extends ConfigurableStop{ configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/geneCache/geneCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - var jsonStr: String ="" - - var bis: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) var names:Array[String]=Array("tax_id", "geneID", "symbol", "locus_tag", "synonyms", "dbxrefs", "chromosome", "map_location", "description", "type_of_gene", "symbol_from_nomenclature_authority", "full_name_from_nomenclature_authority", @@ -54,7 +50,7 @@ class Gene extends ConfigurableStop{ val format: java.text.DateFormat = new SimpleDateFormat("yyyyMMdd").asInstanceOf[java.text.DateFormat] val newFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") - var n:Int=0 + var count:Int=0 inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] @@ -66,9 +62,9 @@ class Gene extends ConfigurableStop{ var line:String="" var doc:JSONObject=null - while ((line=br.readLine()) != null){ + while ((line=br.readLine()) != null && count < 10 ){ if( ! 
line.startsWith("#")){ - n += 1 + count += 1 doc=new JSONObject() val tokens: Array[String] = line.split("\\\t") for(i <- (0 until 15)){ @@ -84,54 +80,28 @@ class Gene extends ConfigurableStop{ doc.put(names(i),newFormat.format(format.parse(tokens(i)))) } } - jsonStr = doc.toString - if (n == 1) { - bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) - } else { - bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) - } - - val buff: Array[Byte] = new Array[Byte](1048576) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis = null - doc = null + doc.write(hdfsWriter) + hdfsWriter.write("\n") } } - + br.close() + fdis.close() }) - bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) - val buff: Array[Byte] = new Array[Byte](1048576) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - - fdos.close() - + hdfsWriter.close() val df: DataFrame = session.read.json(hdfsPathTemporary) - out.write(df) } - override def setProperties(map: Map[String, Any]): Unit = { - + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/gene").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoData.scala index e0bfb73..061c0e7 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GoData.scala @@ -35,21 +35,18 @@ class GoData extends ConfigurableStop{ for (x <- (0 until 3)){ hdfsUrl+=(pathARR(x) +"/") } - configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathJsonCache = hdfsUrl+cachePath+"/godataCache/godataCache.json" + val hdfsPathTemporary = hdfsUrl+cachePath+"/godataCache/godataCache.json" - val path: Path = new Path(hdfsPathJsonCache) + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } fs.create(path).close() - var fdosOut: FSDataOutputStream = fs.append(path) - var jsonStr: String ="" - var bisIn: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] @@ -65,7 +62,7 @@ class GoData extends ConfigurableStop{ } var obj = new JSONObject() var count= 0 - while ((line = br.readLine()) !=null && line !=null ){ + while ((line = br.readLine()) !=null && line !=null && count<10){ val m: Matcher = tv.matcher(line) if (line.startsWith("[")){ if (line .equals("[Term]")){ @@ -89,28 +86,20 @@ class GoData extends ConfigurableStop{ } count += 1 - bisIn = new BufferedInputStream(new ByteArrayInputStream((obj.toString+"\n").getBytes())) - - val buff: Array[Byte] = new Array[Byte](1048576) - var num: Int = bisIn.read(buff) - while (num != -1) { - fdosOut.write(buff, 0, num) - fdosOut.flush() - num = bisIn.read(buff) - } - fdosOut.flush() - bisIn = null - + 
obj.write(hdfsWriter) + hdfsWriter.write("\n") obj= new JSONObject() } } + br.close() + fdis.close() }) + hdfsWriter.close() - fdosOut.close() println("start parser HDFSjsonFile") - val df: DataFrame = spark.read.json(hdfsPathJsonCache) + val df: DataFrame = spark.read.json(hdfsPathTemporary) out.write(df) } @@ -120,7 +109,8 @@ class GoData extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/goData").required(true) descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/InterproData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/InterproData.scala index 1e02873..48c7be4 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/InterproData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/InterproData.scala @@ -35,16 +35,15 @@ class InterproData extends ConfigurableStop{ configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathJsonCache = hdfsUrl+cachePath+"/interproDataCatch/interproDataCatch.json" - val path: Path = new Path(hdfsPathJsonCache) + val hdfsPathTemporary = hdfsUrl+cachePath+"/interproCache/interproCache.json" + + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } fs.create(path).close() - var fdosOut: FSDataOutputStream = fs.append(path) - var jsonStr: String ="" - var bisIn: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] @@ -76,28 +75,20 @@ class InterproData extends ConfigurableStop{ doc.remove("pub_list") } - - bisIn = new BufferedInputStream(new ByteArrayInputStream((doc.toString+"\n").getBytes())) - - val buff: Array[Byte] = new Array[Byte](1048576) - var num: Int = bisIn.read(buff) - while (num != -1) { - fdosOut.write(buff, 0, num) - fdosOut.flush() - num = bisIn.read(buff) - } - fdosOut.flush() - bisIn = null + doc.write(hdfsWriter) + hdfsWriter.write("\n") xml = "" } } + br.close() + fdis.close() }) - fdosOut.close() + hdfsWriter.close() println("start parser HDFSjsonFile") - val df: DataFrame = spark.read.json(hdfsPathJsonCache) + val df: DataFrame = spark.read.json(hdfsPathTemporary) out.write(df) @@ -109,7 +100,8 @@ class InterproData extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/interpro").required(true) descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MedlineData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MedlineData.scala index 8c04d3c..ce1a10e 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MedlineData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MedlineData.scala @@ -7,7 
+7,7 @@ import java.util.Locale import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path} import org.apache.spark.sql.{DataFrame, SparkSession} @@ -19,11 +19,12 @@ class MedlineData extends ConfigurableStop{ override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) + var cachePath:String = _ - var docName = "PubmedArticle" - val formatter = new SimpleDateFormat("yyyy-MM-dd") - val format = new SimpleDateFormat("dd-MM-yyyy") - val format_english = new SimpleDateFormat("dd-MMM-yyyy", Locale.ENGLISH) + var docName = "PubmedArticle" + val formatter = new SimpleDateFormat("yyyy-MM-dd") + val format = new SimpleDateFormat("dd-MM-yyyy") + val format_english = new SimpleDateFormat("dd-MMM-yyyy", Locale.ENGLISH) override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val inDf: DataFrame = in.read() @@ -38,7 +39,7 @@ class MedlineData extends ConfigurableStop{ configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/yqd/test/medline/medlineAll.json" + val hdfsPathTemporary = hdfsUrl+cachePath+"/medlineCache/medlineCache.json" val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) @@ -56,27 +57,23 @@ class MedlineData extends ConfigurableStop{ fileNum += 1 pathStr = row.get(0).asInstanceOf[String] - println(fileNum+"-----start parser ^^^" + pathStr) fdis = fs.open(new Path(pathStr)) br = new BufferedReader(new InputStreamReader(fdis)) + var i = 0 var eachLine: String= "" while (i < 3) { eachLine = br.readLine() i += 1 } + var xml = "" while ((eachLine = br.readLine) != null && eachLine != null ) { - xml += eachLine if (eachLine.indexOf("")!= -1){ count += 1 - doc = XML.toJSONObject(xml).getJSONObject(docName).optJSONObject("MedlineCitation") - println(count) -// println(doc.toString) - if (doc.optJSONObject("DateCreated") != null) { val dateCreated = doc.getJSONObject("DateCreated") @@ -130,9 +127,6 @@ class MedlineData extends ConfigurableStop{ if (pubDate.opt("Year") != null) doc.put("PubYear", pubDate.get("Year")) if (pubDate.opt("Year") != null && pubDate.opt("Month") != null && pubDate.opt("Day") != null) { - - - var month = pubDate.get("Month") if (month.toString.contains("01") || month.toString.contains("1")) month = "Jan" if (month.toString.contains("02") || month.toString.contains("2")) month = "Feb" @@ -147,10 +141,7 @@ class MedlineData extends ConfigurableStop{ if (month.toString.contains("11")) month = "Nov" if (month.toString.contains("12")) month = "Dec" - val date = pubDate.get("Day") + "-" +month + "-" + pubDate.get("Year") - -// println(date+"@@@@@@@@@@@") doc.put("PubDate", formatter.format(format_english.parse(date))) } } @@ -169,17 +160,13 @@ class MedlineData extends ConfigurableStop{ hdfsWriter.write("\n") xml = "" - } } br.close() fdis.close() }) hdfsWriter.close() - val df: DataFrame = pec.get[SparkSession]().read.json(hdfsPathTemporary) - df.schema.printTreeString() -// println(df.count()) out.write(df) } @@ -199,9 +186,7 @@ class MedlineData extends ConfigurableStop{ return findJSONObject(objKey, 
obj.getJSONArray(key).getJSONObject(i)) } } - } - } } return null @@ -210,15 +195,17 @@ class MedlineData extends ConfigurableStop{ - override def setProperties(map: Map[String, Any]): Unit = { - + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ - var descriptor = List() + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/medline").required(true) + descriptor = cachePath :: descriptor descriptor } - override def getIcon(): Array[Byte] = { ImageUtil.getImage("icon/microorganism/MedlineData.png") } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MicrobeGenomeData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MicrobeGenomeData.scala index 324c217..631d787 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MicrobeGenomeData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/MicrobeGenomeData.scala @@ -1,10 +1,10 @@ package cn.piflow.bundle.microorganism -import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} +import java.io._ import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -14,118 +14,83 @@ import org.biojavax.bio.seq.{RichSequence, RichSequenceIterator} import org.json.JSONObject + class MicrobeGenomeData extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" override val description: String = "Parse MicrobeGenome data" override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) + var cachePath:String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { - val session = pec.get[SparkSession]() - + val spark = pec.get[SparkSession]() val inDf: DataFrame = in.read() + val configuration: Configuration = new Configuration() - var pathStr: String = "" + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" - try{ - pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] - val pathARR: Array[String] = pathStr.split("\\/") - - for (x <- (0 until 3)){ - hdfsUrl+=(pathARR(x) +"/") - } - }catch { - case e:Exception => throw new Exception("Path error") + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") } - configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/NCBI_Microbe_genome_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/microbeGenomeCache/microbeGenomeCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - val buff: Array[Byte] = new Array[Byte](1048576) - var bis: 
BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) + var fdis: FSDataInputStream =null var br: BufferedReader = null var sequences: RichSequenceIterator = null var doc: JSONObject = null var seq: RichSequence = null - var jsonStr: String = "" - var n:Int=0 + + var count:Int=0 inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] - println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! start parser ^^^" + pathStr) + fdis = fs.open(new Path(pathStr)) br = new BufferedReader(new InputStreamReader(fdis)) sequences = CustomIOTools.IOTools.readGenbankProtein(br, null) while (sequences.hasNext) { - n += 1 + count += 1 doc = new JSONObject() seq = sequences.nextRichSequence() - Process.processSingleSequence(seq,doc) + Process.processSingleSequence(seq, doc) - jsonStr = doc.toString + doc.write(hdfsWriter) + hdfsWriter.write("\n") - if (n == 1) { - bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) - } else { - bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) - } - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - - bis.close() - bis = null - doc = null seq = null - jsonStr = "" - - } - sequences = null - br.close() - br = null - fdis.close() - fdis =null - pathStr = null - + } + br.close() + fdis.close() }) - bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis.close() - fdos.close() - - out.write(session.read.json(hdfsPathTemporary)) + hdfsWriter.close() + println("start parser HDFSjsonFile") + val df: DataFrame = spark.read.json(hdfsPathTemporary) + out.write(df) } - override def setProperties(map: Map[String, Any]): Unit = { + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ + override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/microbeGenome").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PDBData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PDBData.scala index 94c20bb..6f55163 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PDBData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PDBData.scala @@ -4,7 +4,7 @@ import java.io._ import cn.piflow.bundle.microorganism.util.PDB import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -19,105 +19,61 @@ class PDBData extends ConfigurableStop{ override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - + var cachePath:String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: 
JobContext): Unit = { val session = pec.get[SparkSession]() - val inDf: DataFrame = in.read() + val configuration: Configuration = new Configuration() - var pathStr: String = "" + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" - try{ - pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] - val pathARR: Array[String] = pathStr.split("\\/") - - for (x <- (0 until 3)){ - hdfsUrl+=(pathARR(x) +"/") - } - }catch { - case e:Exception => throw new Exception("Path error") + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") } - configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/PDBCache/PDBCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - val buff: Array[Byte] = new Array[Byte](1048576) - var bis: BufferedInputStream =null - var fdis: FSDataInputStream =null - var br: BufferedReader = null - var sequences: RichSequenceIterator = null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) + var doc: JSONObject = null - var seq: RichSequence = null var pdb: PDB = null - var jsonStr: String = "" - var n:Int=0 + var count:Int=0 inDf.collect().foreach(row => { + count += 1 pathStr = row.get(0).asInstanceOf[String] pdb = new PDB(pathStr,fs) doc = pdb.getDoc - jsonStr = doc.toString - n +=1 + doc.write(hdfsWriter) + hdfsWriter.write("\n") - if (n == 1) { - bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) - } else { - bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) - } - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - - bis = null doc = null - seq = null - jsonStr = "" - sequences = null - br = null - fdis =null - pathStr = null - pdb = null }) - bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis.close() - fdos.close() + hdfsWriter.close() val df: DataFrame = session.read.json(hdfsPathTemporary) - out.write(df) } - - override def setProperties(map: Map[String, Any]): Unit = { - + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/PDB").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Pathway.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Pathway.scala index 74cd31c..8d618d7 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Pathway.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/Pathway.scala @@ -5,24 +5,30 @@ import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter} import cn.piflow.{JobContext, JobInputStream, 
JobOutputStream, ProcessContext} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} import org.apache.spark.sql.{DataFrame, SparkSession} import org.json.JSONObject + class Pathway extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" override val description: String = "Parse Pathway data" override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override def setProperties(map: Map[String, Any]): Unit = { + var cachePath:String = _ + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ + override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/pathway").required(true) + descriptor = cachePath :: descriptor descriptor } @@ -51,8 +57,8 @@ class Pathway extends ConfigurableStop{ configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) -// "/biosampleCache/biosampleCache.json" - val hdfsPathTemporary:String = hdfsUrl+"/yqd/test/kegg/Refseq_genomeParser_temporary2.json" + + val hdfsPathTemporary = hdfsUrl+cachePath+"/pathwayCache/pathwayCache.json" val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) @@ -67,17 +73,15 @@ class Pathway extends ConfigurableStop{ inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] - println("start parser ^^^" + pathStr) + fdis = fs.open(new Path(pathStr)) br = new BufferedReader(new InputStreamReader(fdis)) var count = 0 while (hasAnotherSequence) { count += 1 - println(count) doc = new JSONObject hasAnotherSequence = util.KeggPathway.process(br, doc) - doc.write(hdfsWriter) hdfsWriter.write("\n") } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PfamData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PfamData.scala index 8100033..c70ed58 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PfamData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/PfamData.scala @@ -1,10 +1,10 @@ package cn.piflow.bundle.microorganism -import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} +import java.io._ import cn.piflow.bundle.microorganism.util.Pfam import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -20,108 +20,71 @@ class PfamData extends ConfigurableStop{ override val outportList: List[String] = List(PortEnum.DefaultPort.toString) + var cachePath:String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session = pec.get[SparkSession]() - val inDf: DataFrame = in.read() + val configuration: Configuration = new Configuration() - 
var pathStr: String = "" + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" - try{ - pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] - val pathARR: Array[String] = pathStr.split("\\/") - - for (x <- (0 until 3)){ - hdfsUrl+=(pathARR(x) +"/") - } - }catch { - case e:Exception => throw new Exception("Path error") + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") } - configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/pfamCache/pfamCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - val buff: Array[Byte] = new Array[Byte](1048576) - var bis: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) + var fdis: FSDataInputStream =null var br: BufferedReader = null - var sequences: RichSequenceIterator = null + var doc: JSONObject = null var seq: RichSequence = null var hasAnotherSequence : Boolean=true - var jsonStr: String = "" - var n:Int=0 + + var count:Int=0 inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] fdis = fs.open(new Path(pathStr)) br = new BufferedReader(new InputStreamReader(fdis)) - while( hasAnotherSequence && n < 1000 ){ - n += 1 - + while( hasAnotherSequence ){ + count += 1 doc = new JSONObject() hasAnotherSequence = Pfam.process(br,doc) - jsonStr = doc.toString + doc.write(hdfsWriter) + hdfsWriter.write("\n") - if (n == 1) { - bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) - } else { - bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) - } - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - - bis = null - doc = null - seq = null - jsonStr = "" } - - sequences = null - br = null - fdis =null - pathStr = null + br.close() + fdis.close() }) - bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis.close() - fdos.close() + hdfsWriter.close() val df: DataFrame = session.read.json(hdfsPathTemporary) - out.write(df) } - override def setProperties(map: Map[String, Any]): Unit = { - + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ + override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/pfam").required(true) + descriptor = cachePath :: descriptor descriptor } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/RefseqData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/RefseqData.scala index 8ee5b9e..91f5191 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/RefseqData.scala +++ 
b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/RefseqData.scala @@ -4,7 +4,7 @@ import java.io._ import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -19,15 +19,13 @@ class RefseqData extends ConfigurableStop{ override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - - + var cachePath:String = _ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val session = pec.get[SparkSession]() - val inDf: DataFrame = in.read() - val configuration: Configuration = new Configuration() + val configuration: Configuration = new Configuration() var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" @@ -37,88 +35,62 @@ class RefseqData extends ConfigurableStop{ configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/refseqCache/refseqCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - val buff: Array[Byte] = new Array[Byte](1048576) - var jsonStr: String ="" - - var bis: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) + var doc: JSONObject= null + var seq: RichSequence = null + var br: BufferedReader = null + var fdis: FSDataInputStream =null + var sequences: RichSequenceIterator = null + var count : Int = 0 inDf.collect().foreach(row => { - var n : Int =0 pathStr = row.get(0).asInstanceOf[String] + fdis = fs.open(new Path(pathStr)) + br = new BufferedReader(new InputStreamReader(fdis)) + sequences = CustomIOTools.IOTools.readGenbankProtein(br, null) - var fdis: FSDataInputStream = fs.open(new Path(pathStr)) + while (sequences.hasNext) { + count += 1 + seq = sequences.nextRichSequence() + doc = new JSONObject + Process.processSingleSequence(seq, doc) - var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) - - var sequences: RichSequenceIterator = CustomIOTools.IOTools.readGenbankProtein(br, null) - - while (sequences.hasNext) { - n += 1 - var seq: RichSequence = sequences.nextRichSequence() - var doc: JSONObject = new JSONObject - Process.processSingleSequence(seq, doc) - jsonStr = doc.toString - println("start " + n) - - if (n == 1) { - bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) - } else { - bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) + doc.write(hdfsWriter) + hdfsWriter.write("\n") } - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis = null - seq = null - doc = null - } + fdis.close() + br.close() }) - bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - 
fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis.close() - fdos.close() - + hdfsWriter.close() println("start parser HDFSjsonFile") val df: DataFrame = session.read.json(hdfsPathTemporary) out.write(df) - } - override def setProperties(map: Map[String, Any]): Unit = { + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] + } + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/refseqData").required(true) + descriptor = cachePath :: descriptor + descriptor } - - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ - var descriptor : List[PropertyDescriptor] = List() - descriptor -} override def getIcon(): Array[Byte] = { ImageUtil.getImage("icon/microorganism/RefseqData.png") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/SwissprotData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/SwissprotData.scala index 833b3e1..5b88b8d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/SwissprotData.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/SwissprotData.scala @@ -1,10 +1,10 @@ package cn.piflow.bundle.microorganism -import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} +import java.io._ import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.ImageUtil +import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration @@ -19,12 +19,16 @@ class SwissprotData extends ConfigurableStop{ override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override def setProperties(map: Map[String, Any]): Unit = { + var cachePath:String = _ + def setProperties(map: Map[String, Any]): Unit = { + cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String] } - - override def getPropertyDescriptor(): List[PropertyDescriptor] ={ + override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() + val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path") + .defaultValue("/swissprot").required(true) + descriptor = cachePath :: descriptor descriptor } @@ -41,96 +45,58 @@ class SwissprotData extends ConfigurableStop{ } override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { - val session = pec.get[SparkSession]() - val inDf: DataFrame = in.read() + val configuration: Configuration = new Configuration() - var pathStr: String = "" + var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] + val pathARR: Array[String] = pathStr.split("\\/") var hdfsUrl:String="" - try{ - pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] - val pathARR: Array[String] = pathStr.split("\\/") - - for (x <- (0 until 3)){ - hdfsUrl+=(pathARR(x) +"/") - } - }catch { - case e:Exception => throw new Exception("Path error") + for (x <- (0 until 3)){ + hdfsUrl+=(pathARR(x) +"/") } - 
configuration.set("fs.defaultFS",hdfsUrl) var fs: FileSystem = FileSystem.get(configuration) - val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" - val path: Path = new Path(hdfsPathTemporary) + val hdfsPathTemporary = hdfsUrl+cachePath+"/swissprotCache/swissprotCache.json" + val path: Path = new Path(hdfsPathTemporary) if(fs.exists(path)){ fs.delete(path) } - fs.create(path).close() - var fdos: FSDataOutputStream = fs.append(path) - val buff: Array[Byte] = new Array[Byte](1048576) - var bis: BufferedInputStream =null + val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path)) + var fdis: FSDataInputStream =null var br: BufferedReader = null var sequences: RichSequenceIterator = null var doc: JSONObject = null var seq: RichSequence = null - var jsonStr: String = "" - var n:Int=0 + var count:Int=0 inDf.collect().foreach(row => { pathStr = row.get(0).asInstanceOf[String] fdis = fs.open(new Path(pathStr)) br = new BufferedReader(new InputStreamReader(fdis)) sequences = CustomIOTools.IOTools.readUniProt(br,null) - while (sequences.hasNext) { + count += 1 + if (count < 100) { + doc = new JSONObject() + seq = sequences.nextRichSequence() + Process.processUniprotSeq(seq, doc) - n += 1 + doc.write(hdfsWriter) + hdfsWriter.write("\n") - doc = new JSONObject() - seq = sequences.nextRichSequence() - Process.processUniprotSeq(seq,doc) - jsonStr = doc.toString - println("start " + n + "String\\\n" ) - - if (n == 1) { - bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes())) - } else { - bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes())) + seq = null + } } - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() + br.close() + fdis.close() - bis = null - doc = null - seq = null - jsonStr = "" - } - sequences = null - br = null - fdis =null - pathStr = null }) - bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) - - var count: Int = bis.read(buff) - while (count != -1) { - fdos.write(buff, 0, count) - fdos.flush() - count = bis.read(buff) - } - fdos.flush() - bis.close() - fdos.close() + hdfsWriter.close() val df: DataFrame = session.read.json(hdfsPathTemporary) diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala index 9306944..8c9d2ff 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/FlowBean.scala @@ -13,6 +13,7 @@ class FlowBean { var checkpoint : String = _ var checkpointParentProcessId : String = _ var runMode : String = _ + var showData : String = _ var stops : List[StopBean] = List() var paths : List[PathBean] = List() @@ -25,6 +26,7 @@ class FlowBean { this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String] this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String] this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String] + this.showData = flowMap.getOrElse("showData","0").asInstanceOf[String] //construct StopBean List val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]] diff --git a/piflow-core/src/main/scala/cn/piflow/main.scala b/piflow-core/src/main/scala/cn/piflow/main.scala index 984f7c2..069baa7 100644 --- a/piflow-core/src/main/scala/cn/piflow/main.scala +++ 
b/piflow-core/src/main/scala/cn/piflow/main.scala @@ -378,13 +378,12 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging { def getDataFrame(port: String) = mapDataFrame(port); - def showData() = { + def showData(count:Int) = { mapDataFrame.foreach(en => { val portName = if(en._1.equals("")) "default" else en._1 println(portName + " port: ") - - en._2.apply().show(PropertyUtil.getPropertyValue("data.show").toInt) + en._2.apply().show(count) }) } @@ -484,7 +483,10 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc runnerListener.onJobCompleted(pe.getContext()); //show data in log - outputs.showData() + val showDataCount = PropertyUtil.getPropertyValue("data.show").toInt + if(showDataCount > 0) { + outputs.showData(showDataCount) + } } catch { case e: Throwable => @@ -528,7 +530,10 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc } } //show data in log - outputs.showData() + val showDataCount = PropertyUtil.getPropertyValue("data.show").toInt + if(showDataCount > 0) { + outputs.showData(showDataCount) + } //save data in debug mode if(flow.getRunMode() == FlowRunMode.DEBUG) { diff --git a/piflow-server/pom.xml b/piflow-server/pom.xml index d443cb8..dd07743 100644 --- a/piflow-server/pom.xml +++ b/piflow-server/pom.xml @@ -10,6 +10,12 @@ 4.0.0 piflow-server + + + 2.4.14 + 10.0.0 + + piflow @@ -30,40 +36,31 @@ com.typesafe.akka akka-remote_2.11 - 2.3.14 + ${akka.version} com.typesafe.akka akka-actor_2.11 - 2.3.14 + ${akka.version} com.typesafe.akka akka-slf4j_2.11 - 2.3.14 + ${akka.version} com.typesafe.akka - akka-stream-experimental_2.11 - 2.0.4 + akka-http_2.11 + ${akka.http.version} com.typesafe.akka - akka-http-core-experimental_2.11 - 2.0.4 - - - com.typesafe.akka - akka-http-experimental_2.11 - 2.0.4 - - - com.typesafe.akka - akka-http-spray-json-experimental_2.11 - 2.0.4 + akka-http-spray-json_2.11 + ${akka.http.version} + diff --git a/piflow-server/src/main/resources/application.conf b/piflow-server/src/main/resources/application.conf new file mode 100644 index 0000000..12e68bc --- /dev/null +++ b/piflow-server/src/main/resources/application.conf @@ -0,0 +1,10 @@ +PiFlowHTTPService{ + shutdown-timeout:300s +} + +akka { + http { + idle-timeout = 600 s + request-timeout = 200 s + } +} \ No newline at end of file diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index 772414c..2dc084b 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -19,7 +19,8 @@ import spray.json.DefaultJsonProtocol object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{ - implicit val system = ActorSystem("HTTPService", ConfigFactory.load()) + implicit val config = ConfigFactory.load() + implicit val system = ActorSystem("PiFlowHTTPService", config) implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher var processMap = Map[String, SparkAppHandle]()
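Every parser stop touched in this patch converges on the same caching recipe: derive the HDFS URL from the first input row, delete any stale cache under the new cachePath property, stream each parsed record into the cache file as one JSON object per line through an OutputStreamWriter, close the writer, and hand spark.read.json back to the output port. The sketch below is a minimal, self-contained illustration of that pattern rather than code from the patch; JsonLinesCache, writeAndRead and the records iterator are hypothetical names introduced only for this example, and it uses a single fs.create where the stops create the file and then reopen it with fs.append.

import java.io.OutputStreamWriter

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject

object JsonLinesCache {

  // Write each record as one JSON object per line, then load the file back as a DataFrame.
  // hdfsUrl and cachePath mirror the values each stop derives from its input row and its
  // cachePath property; records stands in for whatever the concrete parser produces.
  def writeAndRead(spark: SparkSession, hdfsUrl: String, cachePath: String,
                   records: Iterator[JSONObject]): DataFrame = {
    val configuration = new Configuration()
    configuration.set("fs.defaultFS", hdfsUrl)
    val fs = FileSystem.get(configuration)

    val cacheFile = new Path(hdfsUrl + cachePath + "/cache/cache.json")
    if (fs.exists(cacheFile)) {
      fs.delete(cacheFile, true)          // drop a stale cache left by a previous run
    }

    val hdfsWriter = new OutputStreamWriter(fs.create(cacheFile))
    try {
      records.foreach { doc =>
        doc.write(hdfsWriter)             // org.json writes the object without a trailing newline
        hdfsWriter.write("\n")            // one object per line is Spark's default JSON layout
      }
    } finally {
      hdfsWriter.close()
    }

    spark.read.json(cacheFile.toString)   // each line becomes one row
  }
}

Writing newline-delimited objects instead of accumulating a bracketed, comma-separated JSON array (the code being removed) matches what spark.read.json expects by default, so the cache file needs no post-processing before it is loaded.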
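The other behavioural change sits in piflow-core's main.scala and FlowBean: the number of rows printed for each output port now comes from the data.show property, and printing is skipped entirely when that value is not positive, matching the "0" default that FlowBean assigns to showData. A minimal sketch of that gate, assuming a plain string in place of PropertyUtil.getPropertyValue("data.show") and a hypothetical showRows helper standing in for JobOutputStreamImpl.showData:

import org.apache.spark.sql.DataFrame

// rawShowData plays the role of PropertyUtil.getPropertyValue("data.show");
// a non-positive value means no show() job is triggered for the port at all.
def showRows(portName: String, df: DataFrame, rawShowData: String): Unit = {
  val showDataCount = rawShowData.trim.toInt
  if (showDataCount > 0) {
    println(portName + " port: ")
    df.show(showDataCount)
  }
}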