Merge remote-tracking branch 'origin/master'

huchuan 2019-03-29 11:31:25 +08:00
commit ce3ab90c21
60 changed files with 716 additions and 958 deletions

View File

@ -6,28 +6,41 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000", "HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/PDB/", "HDFSPath":"/yqd/ftp/pdb/07/",
"selectionConditions":".*.ent.gz" "selectionConditions":".*.ent.gz"
} }
},{ },{
"uuid":"2222", "uuid":"2222",
"name":"UnzipFilesOnHDFS", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false", "isCustomize":"false",
"hdfsUrl":"hdfs://10.0.88.70:9000", "hdfsUrl":"hdfs://10.0.88.70:9000",
"filePath":"/yqd/weishengwu/PDB/", "filePath":"",
"savePath":"" "savePath":"/yqd/test/pdb/pdb1/"
} }
},{ },{
"uuid":"3333", "uuid":"3333",
"name":"PDBParser", "name":"PDBParser",
"bundle":"cn.piflow.bundle.microorganism.PDBParser", "bundle":"cn.piflow.bundle.microorganism.PDBData",
"properties":{ "properties":{
"cachePath":"/yqd/test/pdb/"
} }
},
{
"uuid":"4444",
"name":"putEs",
"bundle":"cn.piflow.bundle.es.PutEs",
"properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"es_port": "9200",
"es_index": "test_17",
"es_type": "test_17"
}
} }
], ],
"paths":[ "paths":[
@ -41,6 +54,11 @@
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"PDBParser" "to":"PDBParser"
},{
"from":"PDBParser",
"outport":"",
"inport":"",
"to":"putEs"
} }
] ]
} }

View File

@ -7,30 +7,30 @@
{ {
"uuid":"0000", "uuid":"0000",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://master2.packone:8020", "HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microo/", "HDFSPath":"/yg/microo/",
"selectionConditions":".*ample_set.xml.gz" "selectionConditions":".*ample_set.xml.gz"
} }
},{ },{
"uuid":"2222", "uuid":"2222",
"name":"UnzipFilesOnHDFS", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false", "isCustomize":"false",
"filePath":"", "filePath":"",
"hdfsUrl":"hdfs://master2.packone:8020", "hdfsUrl":"hdfs://master2.packone:8020",
"savePath":"/microo/biosample/biosample/" "savePath":"/yg/microo/biosample/"
} }
}, },
{ {
"uuid":"2222", "uuid":"2222",
"name":"BioSampleParse", "name":"BioSampleParse",
"bundle":"cn.piflow.bundle.microorganism.BioSampleParse", "bundle":"cn.piflow.bundle.microorganism.BioSample",
"properties":{ "properties":{
"cachePath": "/yg/microoCache/"
} }
}, },
{ {
@ -38,10 +38,10 @@
"name":"putEs", "name":"putEs",
"bundle":"cn.piflow.bundle.es.PutEs", "bundle":"cn.piflow.bundle.es.PutEs",
"properties":{ "properties":{
"es_nodes":"10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port":"9200", "es_port": "9200",
"es_index":"sample0122", "es_index": "test_11",
"es_type":"sample0122" "es_type": "test_11"
} }
} }

View File

@ -6,22 +6,19 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://master2.packone:8020", "HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microo/biproject/", "HDFSPath":"/yg/microo/biproject/",
"selectionConditions":"bioproject.xml" "selectionConditions":"bioproject.xml"
} }
}, },
{ {
"uuid":"2222", "uuid":"2222",
"name":"BioProjetDataParse", "name":"BioProjetDataParse",
"bundle":"cn.piflow.bundle.microorganism.BioProjetDataParse", "bundle":"cn.piflow.bundle.microorganism.BioProjetData",
"properties":{ "properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "cachePath": "/yg/microoCache/"
"port": "9200",
"es_index": "bioproject",
"es_type": "bioprojecttest002"
} }
}, },
{ {
@ -30,9 +27,9 @@
"bundle": "cn.piflow.bundle.es.PutEs", "bundle": "cn.piflow.bundle.es.PutEs",
"properties": { "properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "bioproject10", "es_index": "test_10",
"es_type": "bioproject10" "es_type": "test_10"
} }
} }
], ],

View File

@ -1,31 +0,0 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"LoadFromFtpToHDFS",
"bundle":"cn.piflow.bundle.ftp.LoadFromFtpToHDFS",
"properties":{
"url_str":"ftp.ebi.ac.uk",
"port":"",
"username":"",
"password":"",
"ftpFile":"/pub/databases/ena/sequence/release/con/rel_con_env_07_r138.dat.gz",
"HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/embl/",
"isFile":"true"
}
}
],
"paths":[
{
"from":"",
"outport":"",
"inport":"",
"to":""
}
]
}
}

View File

@ -7,29 +7,30 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000", "HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/embl", "HDFSPath":"/yqd/ftp/embl/",
"selectionConditions":".*con_pro_02_r138.dat.gz,.*con_vrl_01_r138.dat.gz,.*pat_phg_01_r138.dat.gz" "selectionConditions":"rel_exp_con_pro_26_r138.dat.gz"
} }
},{ },{
"uuid":"2222", "uuid":"2222",
"name":"UnzipFilesOnHDFS_1", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false", "isCustomize":"false",
"filePath":"", "filePath":"",
"fileType":"gz", "hdfsUrl":"hdfs://master2.packone:8020",
"unzipPath":"" "savePath":"/yqd/test/embl/"
} }
}, },
{ {
"uuid":"3333", "uuid":"3333",
"name":"EmblParser", "name":"EmblParser",
"bundle":"cn.piflow.bundle.microorganism.EmblParser", "bundle":"cn.piflow.bundle.microorganism.EmblData",
"properties":{ "properties":{
"cachePath": "/yqd/test/embl/"
} }
},{ },{
"uuid":"4444", "uuid":"4444",
@ -37,9 +38,9 @@
"bundle":"cn.piflow.bundle.es.PutEs", "bundle":"cn.piflow.bundle.es.PutEs",
"properties":{ "properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "embl", "es_index": "test_22",
"es_type": "embl" "es_type": "test_22"
} }
} }
], ],
@ -48,10 +49,10 @@
"from":"SelectFilesByName", "from":"SelectFilesByName",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"UnzipFilesOnHDFS_1" "to":"UnzipFilesOnHDFS"
}, },
{ {
"from":"UnzipFilesOnHDFS_1", "from":"UnzipFilesOnHDFS",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"EmblParser" "to":"EmblParser"

View File

@ -9,15 +9,26 @@
"bundle":"cn.piflow.bundle.hdfs.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000", "HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/ftp/ensembl_gff3/", "HDFSPath":"/yqd/ftp/ensembl_gff3/aspergillus_niger/",
"selectionConditions":".*.gff3" "selectionConditions":".*.gff3.gz"
} }
},{ },{
"uuid":"2222",
"name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{
"isCustomize":"false",
"filePath":"",
"hdfsUrl":"hdfs://10.0.88.70:9000",
"savePath":"/yqd/test/ensembl/"
}
},
{
"uuid":"3333", "uuid":"3333",
"name":"Ensembl_gff3Parser", "name":"Ensembl_gff3Parser",
"bundle":"cn.piflow.bundle.microorganism.Ensembl_gff3Parser", "bundle":"cn.piflow.bundle.microorganism.Ensembl",
"properties":{ "properties":{
"cachePath":"/yqd/test/ensembl/"
} }
},{ },{
"uuid":"4444", "uuid":"4444",
@ -25,9 +36,9 @@
"bundle":"cn.piflow.bundle.es.PutEs", "bundle":"cn.piflow.bundle.es.PutEs",
"properties":{ "properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "yqd_ensembl_index", "es_index": "test_21",
"es_type": "yqd_ensembl_type" "es_type": "test_21"
} }
} }
], ],
@ -36,6 +47,11 @@
"from":"SelectFilesByName", "from":"SelectFilesByName",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"UnzipFilesOnHDFS"
},{
"from":"UnzipFilesOnHDFS",
"outport":"",
"inport":"",
"to":"Ensembl_gff3Parser" "to":"Ensembl_gff3Parser"
},{ },{
"from":"Ensembl_gff3Parser", "from":"Ensembl_gff3Parser",

View File

@ -6,33 +6,30 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://master2.packone:8020", "HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microo/genbank/", "HDFSPath":"/yg/microo/genbank2/",
"selectionConditions":".*.seq.gz" "selectionConditions":"gbbct1.seq.gz"
} }
},{ },{
"uuid":"2222", "uuid":"2222",
"name":"UnzipFilesOnHDFS", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false", "isCustomize":"false",
"filePath":"", "filePath":"",
"hdfsUrl":"hdfs://master2.packone:8020", "hdfsUrl":"hdfs://master2.packone:8020",
"savePath":"/microo/genbank/" "savePath":"/yg/microo/genbank/"
} }
}, },
{ {
"uuid":"3333", "uuid":"3333",
"name":"GenBankParse", "name":"GenBankParse",
"bundle":"cn.piflow.bundle.microorganism.GenBankParse", "bundle":"cn.piflow.bundle.microorganism.GenBankData",
"properties":{ "properties":{
"es_nodes":"10.0.86.239", "cachePath": "/yg/microoCache/"
"port":"9200",
"es_index":"genbank",
"es_type":"data6"
} }
}, },
{ {
@ -41,9 +38,9 @@
"bundle": "cn.piflow.bundle.es.PutEs", "bundle": "cn.piflow.bundle.es.PutEs",
"properties": { "properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "genbank", "es_index": "test_12",
"es_type": "genbank1" "es_type": "test_12"
} }
} }
], ],

View File

@ -6,29 +6,43 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000", "HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/gene_w/", "HDFSPath":"/yqd/ftp/gene/",
"selectionConditions":".*ene_info" "selectionConditions":"gene_info.gz"
} }
},{ },{
"uuid":"3333", "uuid":"2222",
"name":"GeneParser", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.microorganism.GeneParser", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false",
"filePath":"",
"hdfsUrl":"hdfs://10.0.88.70:9000",
"savePath":"/yqd/test/gene/"
}
},
{
"uuid":"3333",
"name":"Gene",
"bundle":"cn.piflow.bundle.microorganism.Gene",
"properties":{
"cachePath":"/yqd/test/gene/"
} }
},{ },
{
"uuid":"4444", "uuid":"4444",
"name":"PutEs", "name":"putEs",
"bundle":"cn.piflow.bundle.es.PutEs", "bundle":"cn.piflow.bundle.es.PutEs",
"properties":{ "properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "gene_index", "es_index": "test_16",
"es_type": "gene_type" "es_type": "test_16"
} }
} }
], ],
"paths":[ "paths":[
@ -36,12 +50,17 @@
"from":"SelectFilesByName", "from":"SelectFilesByName",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"GeneParser" "to":"UnzipFilesOnHDFS"
},{ },{
"from":"GeneParser", "from":"UnzipFilesOnHDFS",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"PutEs" "to":"Gene"
},{
"from":"Gene",
"outport":"",
"inport":"",
"to":"putEs"
} }
] ]
} }

View File

@ -6,18 +6,19 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://master2.packone:8020", "HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microoAll/", "HDFSPath":"/yg/microo/go/",
"selectionConditions":"go.obo" "selectionConditions":"go.obo"
} }
}, },
{ {
"uuid": "3333", "uuid": "3333",
"name": "GoDataParse", "name": "GoDataParse",
"bundle": "cn.piflow.bundle.microorganism.GoDataParse", "bundle": "cn.piflow.bundle.microorganism.GoData",
"properties": { "properties": {
"cachePath": "/yg/microoCache/"
} }
}, },
{ {
@ -26,9 +27,9 @@
"bundle": "cn.piflow.bundle.es.PutEs", "bundle": "cn.piflow.bundle.es.PutEs",
"properties": { "properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "go", "es_index": "test_13",
"es_type": "go" "es_type": "test_13"
} }
} }
], ],

View File

@ -9,7 +9,7 @@
"bundle":"cn.piflow.bundle.http.FileDownHDFS", "bundle":"cn.piflow.bundle.http.FileDownHDFS",
"properties":{ "properties":{
"url_str":"https://gold.jgi.doe.gov/download?mode=site_excel", "url_str":"https://gold.jgi.doe.gov/download?mode=site_excel",
"savePath":"hdfs://master2.packone:8020/microo/golddata/gold.xlsx" "savePath":"hdfs://master2.packone:8020/yg/microo/golddata/gold.xlsx"
} }
}, },
{ {
@ -17,26 +17,7 @@
"name": "ExcelParser", "name": "ExcelParser",
"bundle": "cn.piflow.bundle.excel.ExcelParser", "bundle": "cn.piflow.bundle.excel.ExcelParser",
"properties": { "properties": {
"jaonSavePath":"hdfs://master2.packone:8020/microo/golddata/golddata.json" "cachePath":"hdfs://master2.packone:8020/yg/microo/golddata/golddata.json"
}
},
{
"uuid": "2222",
"name": "GoldDataParse",
"bundle": "cn.piflow.bundle.microorganism.GoldDataParse",
"properties": {
}
},
{
"uuid": "3333",
"name": "putEs",
"bundle": "cn.piflow.bundle.es.PutEs",
"properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "golddata1",
"es_type": "golddatadaa"
} }
} }
], ],
@ -46,18 +27,6 @@
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"ExcelParser" "to":"ExcelParser"
},
{
"from":"ExcelParser",
"outport":"",
"inport":"",
"to":"GoldDataParse"
},
{
"from":"GoldDataParse",
"outport":"",
"inport":"",
"to":"putEs"
} }
] ]
} }

View File

@ -6,29 +6,30 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://master2.packone:8020", "HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microoAll/", "HDFSPath":"/yg/microo/interpro/",
"selectionConditions":"interpro.xml.gz" "selectionConditions":"interpro.xml.gz"
} }
},{ },{
"uuid":"2222", "uuid":"2222",
"name":"UnzipFilesOnHDFS", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false", "isCustomize":"false",
"filePath":"", "filePath":"",
"hdfsUrl":"hdfs://master2.packone:8020", "hdfsUrl":"hdfs://master2.packone:8020",
"savePath":"/microoAll/inter/" "savePath":"/yg/microo/interpro/"
} }
}, },
{ {
"uuid": "3333", "uuid": "3333",
"name": "InterprodataParse", "name": "InterprodataParse",
"bundle": "cn.piflow.bundle.microorganism.InterprodataParse", "bundle": "cn.piflow.bundle.microorganism.InterproData",
"properties": { "properties": {
"cachePath": "/yg/microoCache/"
} }
}, },
{ {
@ -37,9 +38,9 @@
"bundle": "cn.piflow.bundle.es.PutEs", "bundle": "cn.piflow.bundle.es.PutEs",
"properties": { "properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "interpro", "es_index": "test_14",
"es_type": "interpro" "es_type": "test_14"
} }
} }
], ],

View File

@ -15,9 +15,9 @@
},{ },{
"uuid":"3333", "uuid":"3333",
"name":"MicrobeGenomeDataParser", "name":"MicrobeGenomeDataParser",
"bundle":"cn.piflow.bundle.microorganism.MicrobeGenomeDataParser", "bundle":"cn.piflow.bundle.microorganism.MicrobeGenomeData",
"properties":{ "properties":{
"cachePath":"/yqd/test/microbe/"
} }
},{ },{
"uuid":"4444", "uuid":"4444",
@ -25,9 +25,9 @@
"bundle":"cn.piflow.bundle.es.PutEs", "bundle":"cn.piflow.bundle.es.PutEs",
"properties":{ "properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "yqd_ncbi_microbe_genome_index_delet", "es_index": "test_19",
"es_type": "yqd_ncbi_microbe_genome_type_delet" "es_type": "test_19"
} }
} }
], ],

View File

@ -0,0 +1,50 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/test/pfam/",
"selectionConditions":".*full"
}
},
{
"uuid":"3333",
"name":"PfamData",
"bundle":"cn.piflow.bundle.microorganism.PfamData",
"properties":{
"cachePath":"/yqd/test/pfam/"
}
},
{
"uuid":"4444",
"name":"putEs",
"bundle":"cn.piflow.bundle.es.PutEs",
"properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"es_port": "9200",
"es_index": "test_15",
"es_type": "test_15"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"PfamData"
},{
"from":"PfamData",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}

15 binary image files removed in this commit (contents not shown).

View File

@ -7,29 +7,29 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000", "HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/refseq/", "HDFSPath":"/yqd/ftp/refseq/",
"selectionConditions":".*genomic.gbff.gz" "selectionConditions":"archaea.4.genomic.gbff.gz"
} }
},{ },{
"uuid":"2222", "uuid":"2222",
"name":"UnzipFilesOnHDFS_1", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false", "isCustomize":"false",
"filePath":"", "filePath":"",
"fileType":"gz", "hdfsUrl":"hdfs://10.0.88.70:9000",
"unzipPath":"" "savePath":"/yqd/test/refseq/"
} }
}, },
{ {
"uuid":"3333", "uuid":"3333",
"name":"Refseq_genomeParser", "name":"Refseq_genomeParser",
"bundle":"cn.piflow.bundle.microorganism.Refseq_genomeParser", "bundle":"cn.piflow.bundle.microorganism.RefseqData",
"properties":{ "properties":{
"cachePath":"/yqd/test/refseq/"
} }
},{ },{
"uuid":"4444", "uuid":"4444",
@ -37,9 +37,9 @@
"bundle":"cn.piflow.bundle.es.PutEs", "bundle":"cn.piflow.bundle.es.PutEs",
"properties":{ "properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "genome", "es_index": "test_20",
"es_type": "archaea" "es_type": "test_20"
} }
} }
], ],
@ -48,10 +48,10 @@
"from":"SelectFilesByName", "from":"SelectFilesByName",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"UnzipFilesOnHDFS_1" "to":"UnzipFilesOnHDFS"
}, },
{ {
"from":"UnzipFilesOnHDFS_1", "from":"UnzipFilesOnHDFS",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"Refseq_genomeParser" "to":"Refseq_genomeParser"

View File

@ -1,37 +0,0 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/",
"selectionConditions":".*genomic.gbff.gz"
}
},{
"uuid":"1111",
"name":"UnzipFilesOnHDFS_1",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS_1",
"properties":{
"isCustomize":"true",
"filePath":"hdfs://10.0.88.70:9000/yqd/archaea.1.genomic.gbff.gz",
"fileType":"gz",
"unzipPath":"hdfs://10.0.88.70:9000/yqd/weishengwu/"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS_1"
}
]
}
}

View File

@ -6,18 +6,29 @@
{ {
"uuid":"1111", "uuid":"1111",
"name":"SelectFilesByName", "name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName", "bundle":"cn.piflow.bundle.hdfs.SelectFilesByName",
"properties":{ "properties":{
"HDFSUrl":"hdfs://10.0.88.70:9000", "HDFSUrl":"hdfs://10.0.88.70:9000",
"HDFSPath":"/yqd/weishengwu/Swissprot_TrEMBL/", "HDFSPath":"/yqd/ftp/swiss/",
"selectionConditions":"uniprot_sprot.dat" "selectionConditions":".*trembl.dat.gz"
} }
},{ },{
"uuid":"3333", "uuid":"2222",
"name":"Swissprot_TrEMBLDataParser", "name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.microorganism.Swissprot_TrEMBLDataParser", "bundle":"cn.piflow.bundle.hdfs.UnzipFilesOnHDFS",
"properties":{ "properties":{
"isCustomize":"false",
"filePath":"",
"hdfsUrl":"hdfs://10.0.88.70:9000",
"savePath":"/yqd/test/swiss/"
}
},
{
"uuid":"3333",
"name":"SwissprotData",
"bundle":"cn.piflow.bundle.microorganism.SwissprotData",
"properties":{
"cachePath":"/yqd/test/swiss/"
} }
},{ },{
"uuid":"4444", "uuid":"4444",
@ -25,9 +36,9 @@
"bundle":"cn.piflow.bundle.es.PutEs", "bundle":"cn.piflow.bundle.es.PutEs",
"properties":{ "properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72", "es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200", "es_port": "9200",
"es_index": "Swissprot_TrEMBL_index", "es_index": "test_18",
"es_type": "Swissprot_TrEMBL_type" "es_type": "test_18"
} }
} }
], ],
@ -36,9 +47,15 @@
"from":"SelectFilesByName", "from":"SelectFilesByName",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"Swissprot_TrEMBLDataParser" "to":"UnzipFilesOnHDFS"
},
{
"from":"UnzipFilesOnHDFS",
"outport":"",
"inport":"",
"to":"SwissprotData"
},{ },{
"from":"Swissprot_TrEMBLDataParser", "from":"SwissprotData",
"outport":"", "outport":"",
"inport":"", "inport":"",
"to":"PutEs" "to":"PutEs"

View File

@ -48,10 +48,14 @@ class FetchEs extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes")
val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) .description("Node of Elasticsearch").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) val es_port = new PropertyDescriptor().defaultValue("9200").name("es_port").displayName("es_port")
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) .description("Port of Elasticsearch").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index")
.description("Index of Elasticsearch").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type")
.description("Type of Elasticsearch").defaultValue("").required(true)
descriptor = es_nodes :: descriptor descriptor = es_nodes :: descriptor

View File

@ -47,11 +47,14 @@ class PutEs extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes")
val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) .description("Node of Elasticsearch").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) val es_port = new PropertyDescriptor().defaultValue("9200").name("es_port").displayName("es_port")
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) .description("Port of Elasticsearch").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index")
.description("Index of Elasticsearch").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type")
.description("Type of Elasticsearch").defaultValue("").required(true)
descriptor = es_nodes :: descriptor descriptor = es_nodes :: descriptor
descriptor = es_port :: descriptor descriptor = es_port :: descriptor

View File

@ -50,16 +50,22 @@ class QueryEs extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true) val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes")
val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true) .description("Node of Elasticsearch").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true) val es_port = new PropertyDescriptor().defaultValue("9200").name("es_port").displayName("es_port")
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true) .description("Port of Elasticsearch").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index")
.description("Index of Elasticsearch").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type")
.description("Type of Elasticsearch").defaultValue("").required(true)
val jsonDSL = new PropertyDescriptor().name("jsonDSL").displayName("jsonDSL")
.description("DSL of Elasticsearch").defaultValue("").required(true)
descriptor = es_nodes :: descriptor descriptor = es_nodes :: descriptor
descriptor = es_port :: descriptor descriptor = es_port :: descriptor
descriptor = es_index :: descriptor descriptor = es_index :: descriptor
descriptor = es_type :: descriptor descriptor = es_type :: descriptor
descriptor = jsonDSL :: descriptor
descriptor descriptor
} }
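
The same descriptor cleanup is applied to FetchEs, PutEs and QueryEs above: the help text that used to sit in defaultValue moves into description, es_port gets a real default of "9200", and QueryEs additionally gains a jsonDSL property. A minimal sketch of the resulting builder pattern, using only the fluent calls visible in the diff (PropertyDescriptor is PiFlow's own class):

    val es_port = new PropertyDescriptor()
      .name("es_port").displayName("es_port")
      .description("Port of Elasticsearch")   // guidance text now lives in description
      .defaultValue("9200")                   // a usable default instead of help text
      .required(true)
    descriptor = es_port :: descriptor        // prepend to the descriptor list, as before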

View File

@ -14,39 +14,67 @@ import org.apache.hadoop.fs.Path
class DeleteHdfs extends ConfigurableStop{ class DeleteHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com" override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.NonePort.toString) override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "Delete file or directory on hdfs" override val description: String = "Delete file or directory on hdfs"
var hdfsUrl :String= _ var hdfsUrl :String= _
var deletePath :String = _ var deletePath :String = _
var isCustomize:String=_
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val array = deletePath.split(",") if (isCustomize.equals("false")){
val inDf = in.read()
for (i<- 0 until array.length){ val configuration: Configuration = new Configuration()
val hdfsPath = hdfsUrl+array(i) var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val path = new Path(array(i)) val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
val config = new Configuration() var fs: FileSystem = FileSystem.get(configuration)
config.set("fs.defaultFS",hdfsUrl)
val fs = FileSystem.get(config)
fs.delete(path,true) inDf.collect.foreach(row=>{
pathStr = row.get(0).asInstanceOf[String]
val path = new Path(pathStr)
fs.delete(path,true)
})
} else {
val array = deletePath.split(",")
for (i<- 0 until array.length){
val hdfsPath = hdfsUrl+"/"+array(i)
val path = new Path(hdfsPath)
val config = new Configuration()
config.set("fs.defaultFS",hdfsUrl)
val fs = FileSystem.get(config)
fs.delete(path,true)
}
} }
} }
override def setProperties(map: Map[String, Any]): Unit = { override def setProperties(map: Map[String, Any]): Unit = {
hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String] hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
deletePath = MapUtil.get(map,key="deletePath").asInstanceOf[String] deletePath = MapUtil.get(map,key="deletePath").asInstanceOf[String]
isCustomize=MapUtil.get(map,key="isCustomize").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true) val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true)
val deletePath = new PropertyDescriptor().name("deletePath").displayName("deletePath").defaultValue("").required(true) val deletePath = new PropertyDescriptor().name("deletePath").displayName("deletePath").defaultValue("").required(true)
val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " +
"you must specify the path where the compressed file is located . " +
"If it is false, it will automatically find the file path data from the upstream port ")
.defaultValue("false").allowableValues(Set("true","false")).required(true)
descriptor = isCustomize :: descriptor
descriptor = hdfsUrl :: descriptor descriptor = hdfsUrl :: descriptor
descriptor = deletePath :: descriptor descriptor = deletePath :: descriptor
descriptor descriptor
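
A condensed sketch of the new DeleteHdfs behaviour shown in this hunk, assuming the upstream DataFrame delivers one HDFS path per row in its first column; the mkString shortcut stands in for the diff's explicit loop over the first three path segments:

    if (isCustomize.equals("false")) {
      // derive fs.defaultFS (e.g. hdfs://host:9000/) from the first incoming path
      val firstPath = inDf.take(1)(0).get(0).asInstanceOf[String]
      val defaultFS = firstPath.split("\\/").take(3).mkString("/") + "/"
      val configuration = new Configuration()
      configuration.set("fs.defaultFS", defaultFS)
      val fs = FileSystem.get(configuration)
      // delete every path handed over by the upstream stop
      inDf.collect.foreach(row => fs.delete(new Path(row.get(0).asInstanceOf[String]), true))
    } else {
      // comma-separated deletePath entries are resolved against the configured hdfsUrl
      deletePath.split(",").foreach { p =>
        val config = new Configuration()
        config.set("fs.defaultFS", hdfsUrl)
        FileSystem.get(config).delete(new Path(hdfsUrl + "/" + p), true)
      }
    }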

View File

@ -10,7 +10,7 @@ import org.apache.spark.sql.SparkSession
class GetHdfs extends ConfigurableStop{ class GetHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com" override val authorEmail: String = "ygang@cmic.com"
override val description: String = "Get data from hdfs" override val description: String = "Get data from hdfs"
override val inportList: List[String] = List(PortEnum.NonePort.toString) override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var hdfsUrl : String=_ var hdfsUrl : String=_
@ -45,7 +45,7 @@ class GetHdfs extends ConfigurableStop{
} }
else { else {
val rdd = sc.textFile(path) val rdd = sc.textFile(path)
val outDf = rdd.toDF("txt") val outDf = rdd.toDF()
outDf.schema.printTreeString() outDf.schema.printTreeString()
//outDf.show() //outDf.show()
out.write(outDf) out.write(outDf)
@ -60,9 +60,13 @@ class GetHdfs extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath").defaultValue("").required(true) val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath")
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true) .defaultValue("").required(true)
val types = new PropertyDescriptor().name("types").displayName("types").defaultValue("txt,parquet,csv,json").required(true) val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl")
.defaultValue("").required(true)
val types = new PropertyDescriptor().name("types").displayName("types").description("txt,parquet,csv,json")
.defaultValue("txt").allowableValues(Set("txt","parquet","csv","json")).required(true)
descriptor = types :: descriptor descriptor = types :: descriptor
descriptor = hdfsPath :: descriptor descriptor = hdfsPath :: descriptor
descriptor = hdfsUrl :: descriptor descriptor = hdfsUrl :: descriptor

View File

@ -6,38 +6,54 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql. SparkSession import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable.ArrayBuffer
class ListHdfs extends ConfigurableStop{ class ListHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com" override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.NonePort.toString) override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "Retrieve a list of files from hdfs" override val description: String = "Retrieve a list of files from hdfs"
var hdfsPath :String= _ var HDFSPath :String= _
var hdfsUrl :String= _ var HDFSUrl :String= _
var list = List("") var pathARR:ArrayBuffer[String]=ArrayBuffer()
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val sc = spark.sparkContext val sc = spark.sparkContext
val path = new Path(hdfsPath) val path = new Path(HDFSPath)
iterationFile(path.toString) iterationFile(path.toString)
import spark.implicits._ import spark.implicits._
val outDF = sc.parallelize(list).toDF() val rows: List[Row] = pathARR.map(each => {
var arr:Array[String]=Array(each)
val row: Row = Row.fromSeq(arr)
row
}).toList
val rowRDD: RDD[Row] = sc.makeRDD(rows)
// val fields: Array[StructField] = "path".split("/").map(d=>StructField(d,StringType,nullable = true))
// val schema: StructType = StructType(fields)
val schema: StructType = StructType(Array(
StructField("path",StringType)
))
val outDF: DataFrame = spark.createDataFrame(rowRDD,schema)
out.write(outDF) out.write(outDF)
} }
// recursively traverse the folder // recursively traverse the folder
def iterationFile(path: String):Unit = { def iterationFile(path: String):Unit = {
val config = new Configuration() val config = new Configuration()
config.set("fs.defaultFS",hdfsUrl) config.set("fs.defaultFS",HDFSUrl)
val fs = FileSystem.get(config) val fs = FileSystem.get(config)
val listf = new Path(path) val listf = new Path(path)
@ -45,28 +61,25 @@ class ListHdfs extends ConfigurableStop{
for (f <- statuses) { for (f <- statuses) {
val fsPath = f.getPath().toString val fsPath = f.getPath().toString
//println(fsPath)
if (f.isDirectory) { if (f.isDirectory) {
list = fsPath::list // pathARR += fsPath
iterationFile(fsPath) iterationFile(fsPath)
} else{ } else{
list = f.getPath.toString::list pathARR += f.getPath.toString
} }
} }
} }
override def setProperties(map: Map[String, Any]): Unit = { override def setProperties(map: Map[String, Any]): Unit = {
hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String] HDFSUrl = MapUtil.get(map,key="HDFSUrl").asInstanceOf[String]
hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String] HDFSPath = MapUtil.get(map,key="HDFSPath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val hdfsPath = new PropertyDescriptor().name("hdfsPath").displayName("hdfsPath").defaultValue("").required(true) val hdfsPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("").required(true)
val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("").required(true) val hdfsUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("").required(true)
descriptor = hdfsPath :: descriptor descriptor = hdfsPath :: descriptor
descriptor = hdfsUrl :: descriptor descriptor = hdfsUrl :: descriptor
descriptor descriptor
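
The rewritten ListHdfs gathers paths into an ArrayBuffer and emits them as a proper single-column DataFrame rather than a List[String]. A self-contained sketch of that conversion, with illustrative paths standing in for the values collected by iterationFile:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import scala.collection.mutable.ArrayBuffer

    val spark = SparkSession.builder().master("local[*]").appName("listHdfsSketch").getOrCreate()
    val pathARR: ArrayBuffer[String] = ArrayBuffer("hdfs://host:9000/a.txt", "hdfs://host:9000/b.txt")
    val rowRDD = spark.sparkContext.makeRDD(pathARR.map(Row(_)).toList)
    val schema = StructType(Array(StructField("path", StringType)))
    val outDF = spark.createDataFrame(rowRDD, schema)   // one "path" column, written downstream via out.write(outDF)
    outDF.show()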

View File

@ -13,7 +13,7 @@ import org.apache.spark.sql.SparkSession
class PutHdfs extends ConfigurableStop{ class PutHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com" override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.DefaultPort.toString) override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "Put data to hdfs" override val description: String = "Put data to hdfs"
var hdfsPath :String= _ var hdfsPath :String= _

View File

@ -17,7 +17,7 @@ import scala.collection.mutable.ArrayBuffer
class SelectFilesByName extends ConfigurableStop{ class SelectFilesByName extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn" override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Select files by file name" override val description: String = "Select files by file name"
override val inportList: List[String] = List(PortEnum.NonePort.toString) override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var HDFSUrl:String=_ var HDFSUrl:String=_
@ -68,10 +68,11 @@ class SelectFilesByName extends ConfigurableStop{
val schema: StructType = StructType(fields) val schema: StructType = StructType(fields)
val df: DataFrame = session.createDataFrame(rowRDD,schema) val df: DataFrame = session.createDataFrame(rowRDD,schema)
println("#################################################") println("#################################################")
df.show(20) df.show(20)
println("#################################################") println(df.count+"#################################################")
out.write(df) out.write(df)
} }
@ -85,9 +86,12 @@ class SelectFilesByName extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").defaultValue("The URL of the HDFS file system, such as hdfs://10.0.88.70:9000").required(true) val HDFSUrl = new PropertyDescriptor().name("HDFSUrl").displayName("HDFSUrl").description("The URL of the HDFS file system, such as hdfs://ip:port")
val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").defaultValue("The save path of the HDFS file system, such as /test/Ab").required(true) .defaultValue("hdfs://").required(true)
val selectionConditions = new PropertyDescriptor().name("selectionConditions").displayName("selectionConditions").defaultValue("To select conditions, you need to fill in regular expressions in java, such as. * abc. *").required(true) val HDFSPath = new PropertyDescriptor().name("HDFSPath").displayName("HDFSPath").description("The save path of the HDFS file system, such as /test/Ab")
.defaultValue("").required(true)
val selectionConditions = new PropertyDescriptor().name("selectionConditions").displayName("selectionConditions").description("To select conditions, you need to fill in regular expressions in java, such as. * abc. *")
.defaultValue("").required(true)
descriptor = HDFSUrl :: descriptor descriptor = HDFSUrl :: descriptor
descriptor = HDFSPath :: descriptor descriptor = HDFSPath :: descriptor
descriptor = selectionConditions :: descriptor descriptor = selectionConditions :: descriptor
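
The selection logic itself sits outside this hunk; below is a rough sketch of what the stop's properties describe — listing files under HDFSPath and keeping names that match the selectionConditions Java regex. The helper body is an assumption; only HDFSUrl, HDFSPath and selectionConditions come from the diff:

    val config = new Configuration()
    config.set("fs.defaultFS", HDFSUrl)
    val fs = FileSystem.get(config)
    val selected = ArrayBuffer[String]()
    fs.listStatus(new Path(HDFSPath)).foreach { status =>
      if (status.getPath.getName.matches(selectionConditions))   // e.g. ".*.gff3.gz"
        selected += status.getPath.toString                      // matching paths feed the "path" DataFrame shown above
    }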

View File

@ -186,7 +186,7 @@ class UnzipFilesOnHDFS extends ConfigurableStop {
val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " + val isCustomize = new PropertyDescriptor().name("isCustomize").displayName("isCustomize").description("Whether to customize the compressed file path, if true, " +
"you must specify the path where the compressed file is located . " + "you must specify the path where the compressed file is located . " +
"If it is false, it will automatically find the file path data from the upstream port ") "If it is false, it will automatically find the file path data from the upstream port ")
.defaultValue("").required(false) .defaultValue("false").allowableValues(Set("true","false")).required(true)
descriptor = isCustomize :: descriptor descriptor = isCustomize :: descriptor
descriptor = filePath :: descriptor descriptor = filePath :: descriptor
descriptor = hdfsUrl :: descriptor descriptor = hdfsUrl :: descriptor

View File

@ -41,17 +41,15 @@ class BioProjetData extends ConfigurableStop{
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary = hdfsUrl+cachePath+"/bioprojectCatch/bioprojectCatch.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/bioprojectCache/bioprojectCache.json"
val path: Path = new Path(hdfsPathTemporary) val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
inDf.collect().foreach(row =>{ inDf.collect().foreach(row =>{
val pathStr = row.get(0).asInstanceOf[String] val pathStr = row.get(0).asInstanceOf[String]
@ -64,23 +62,20 @@ class BioProjetData extends ConfigurableStop{
br.readLine() br.readLine()
i = i + 1 i = i + 1
} }
var count = 0
var xml = new StringBuffer() var xml = new StringBuffer()
var x = 0 var x = 0
var count = 0
while ((line = br.readLine()) != null && x <1 && line!= null ) { while ((line = br.readLine()) != null && x <1 && line!= null ) {
xml.append(line) xml.append(line)
if (line.equals("</PackageSet>")){ if (line.equals("</PackageSet>")){
println("----------------------------------break") println("break")
x == 1 x == 1
} else if (line.indexOf("</" + name + ">") != -1){ //reach the end of a doc } else if (line.indexOf("</" + name + ">") != -1){ //reach the end of a doc
count = count + 1 count = count + 1
val doc: JSONObject = XML.toJSONObject(xml.toString()).getJSONObject(name) val doc: JSONObject = XML.toJSONObject(xml.toString()).getJSONObject(name)
val accession = doc.getJSONObject("Project").getJSONObject("Project")
.getJSONObject("ProjectID")
.getJSONObject("ArchiveID")
.getString("accession")
val projectDescr = doc.getJSONObject("Project").getJSONObject("Project") val projectDescr = doc.getJSONObject("Project").getJSONObject("Project")
.getJSONObject("ProjectDescr") .getJSONObject("ProjectDescr")
@ -115,7 +110,7 @@ class BioProjetData extends ConfigurableStop{
} }
// ----------------3 // ----------------3
if(doc.getJSONObject("Project").optJSONObject("Submission") != null){ if(doc.getJSONObject("Project").optJSONObject("Submission") != null){
val submission = doc.getJSONObject("Project").optJSONObject("Submission") val submission = doc.getJSONObject("Project").optJSONObject("Submission")
if(submission.opt("submitted") != null){ if(submission.opt("submitted") != null){
@ -166,27 +161,19 @@ class BioProjetData extends ConfigurableStop{
} }
} }
bisIn = new BufferedInputStream(new ByteArrayInputStream((doc.toString+"\n").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576) doc.write(hdfsWriter)
hdfsWriter.write("\n")
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
xml = new StringBuffer() xml = new StringBuffer()
} }
} }
br.close()
fdis.close()
}) })
hdfsWriter.close()
fdosOut.close()
println("start parser HDFSjsonFile") println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathTemporary) val df: DataFrame = spark.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
@ -199,7 +186,8 @@ class BioProjetData extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/bioproject").required(true)
descriptor = cachePath :: descriptor descriptor = cachePath :: descriptor
descriptor descriptor
} }
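
The parser stops (BioProjetData here, and BioSample, EmblData, Ensembl below) replace the manual byte-buffer copying through FSDataOutputStream with a single OutputStreamWriter and write one JSON document per line, which spark.read.json then ingests directly. A minimal sketch of the pattern, assuming fs, hdfsUrl, cachePath, spark and out are the stop's own context (as in the diff) and docs is any iterator of org.json.JSONObject values produced by the XML parsing loop:

    val path = new Path(hdfsUrl + cachePath + "/bioprojectCache/bioprojectCache.json")
    if (fs.exists(path)) fs.delete(path, true)
    fs.create(path).close()

    val hdfsWriter = new OutputStreamWriter(fs.append(path))
    docs.foreach { doc =>
      doc.write(hdfsWriter)     // serialize the JSONObject
      hdfsWriter.write("\n")    // newline-delimited JSON, one document per line
    }
    hdfsWriter.close()

    val df = spark.read.json(path.toString)   // each line becomes one row
    out.write(df)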

View File

@ -25,7 +25,6 @@ class BioSample extends ConfigurableStop{
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val sc = spark.sparkContext val sc = spark.sparkContext
// val ssc = spark.sqlContext
val inDf= in.read() val inDf= in.read()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
@ -35,27 +34,22 @@ class BioSample extends ConfigurableStop{
for (x <- (0 until 3)){ for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/") hdfsUrl+=(pathARR(x) +"/")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
var hdfsPathJsonCache:String = ""
hdfsPathJsonCache = hdfsUrl+cachePath+"/biosampleCache/biosampleCache.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/biosampleCache/biosampleCache.json"
val path: Path = new Path(hdfsPathJsonCache) val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var fdosOut: FSDataOutputStream = null
var bisIn: BufferedInputStream =null
fdosOut = fs.append(path)
var count = 0 var count = 0
var nameNum = 0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
@ -66,8 +60,7 @@ class BioSample extends ConfigurableStop{
br.readLine() br.readLine()
br.readLine() br.readLine()
while ((line = br.readLine()) != null && line!= null ) {
while ((line = br.readLine()) != null && line!= null) {
xml = xml + line xml = xml + line
if (line.indexOf("</" + docName + ">") != -1) { if (line.indexOf("</" + docName + ">") != -1) {
count = count + 1 count = count + 1
@ -79,7 +72,6 @@ class BioSample extends ConfigurableStop{
if (attrs.equals("")) { if (attrs.equals("")) {
doc.remove("Attributes") doc.remove("Attributes")
} }
// Links // Links
val links: String = doc.optString("Links") val links: String = doc.optString("Links")
if (links != null) { if (links != null) {
@ -123,36 +115,24 @@ class BioSample extends ConfigurableStop{
if (jsonDoc.contains("Ids\":{\"Id\":[{")){ if (jsonDoc.contains("Ids\":{\"Id\":[{")){
} else { } else {
// println(count)
// println(jsonDoc)
jsonDoc = jsonDoc.replace("Ids\":{\"Id\":{","Ids\":{\"Id\":[{") jsonDoc = jsonDoc.replace("Ids\":{\"Id\":{","Ids\":{\"Id\":[{")
} }
} }
bisIn = new BufferedInputStream(new ByteArrayInputStream((jsonDoc+"\n").getBytes())) doc.write(hdfsWriter)
hdfsWriter.write("\n")
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
xml = "" xml = ""
} }
} }
br.close()
fdis.close()
}) })
hdfsWriter.close()
// bisIn.close()
fdosOut.close()
println("start parser HDFSjsonFile") println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathJsonCache) val df: DataFrame = spark.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
} }
@ -164,7 +144,8 @@ class BioSample extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/biosample").required(true)
descriptor = cachePath :: descriptor descriptor = cachePath :: descriptor
descriptor descriptor
} }

View File

@ -4,7 +4,7 @@ import java.io._
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -20,13 +20,12 @@ class EmblData extends ConfigurableStop{
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/") val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
@ -36,85 +35,56 @@ class EmblData extends ConfigurableStop{
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/emblDataCache/emblDataCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
var jsonStr: String ="" val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var bis: BufferedInputStream =null var doc: JSONObject = null
var seq: RichSequence = null
var br: BufferedReader = null
var sequences: RichSequenceIterator = null
var count : Int =0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
var n : Int =0 pathStr = row.get(0).asInstanceOf[String]
pathStr = row.get(0).asInstanceOf[String] var fdis: FSDataInputStream = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis))
var fdis: FSDataInputStream = fs.open(new Path(pathStr)) sequences = CustomIOTools.IOTools.readEMBLDNA(br, null)
while (sequences.hasNext) {
count += 1
seq = sequences.nextRichSequence()
doc = new JSONObject
Process.processEMBL_EnsemblSeq(seq, doc)
var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) doc.write(hdfsWriter)
hdfsWriter.write("\n")
var sequences: RichSequenceIterator = CustomIOTools.IOTools.readEMBLDNA (br, null)
while (sequences.hasNext) {
n += 1
var seq: RichSequence = sequences.nextRichSequence()
var doc: JSONObject = new JSONObject
Process.processEMBL_EnsemblSeq(seq, doc)
jsonStr = doc.toString
println("start " + n)
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
} }
br.close()
fdis.close()
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis = null
seq = null
doc = null
}
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
}) })
fdos.close()
val df: DataFrame = session.read.json(hdfsPathTemporary) val df: DataFrame = session.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
} }
override def setProperties(map: Map[String, Any]): Unit = { def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/emblData").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
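The change above is the template the rest of this commit follows: the old bracket-and-comma byte copying through FSDataOutputStream gives way to a JSON-lines cache under the new cachePath property, written with one OutputStreamWriter and read back by Spark's JSON reader. A minimal, self-contained sketch of that round trip (the HDFS URL, cache location and sample records below are illustrative only, not taken from any flow config):

    import java.io.OutputStreamWriter
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession
    import org.json.JSONObject

    object JsonLinesCacheSketch {
      def main(args: Array[String]): Unit = {
        val hdfsUrl   = "hdfs://10.0.88.70:9000"   // assumption: a reachable HDFS
        val cachePath = "/emblData"                // assumption: value of the cachePath property
        val spark = SparkSession.builder().appName("jsonLinesCache").master("local[*]").getOrCreate()

        val conf = new Configuration()
        conf.set("fs.defaultFS", hdfsUrl)
        val fs = FileSystem.get(conf)

        val cacheFile = new Path(hdfsUrl + cachePath + "/emblDataCache/emblDataCache.json")
        if (fs.exists(cacheFile)) fs.delete(cacheFile, false)

        // One JSON object per line; fs.create is enough for a fresh file
        // (the stops above create-then-append, which also works where append is enabled).
        val writer = new OutputStreamWriter(fs.create(cacheFile), "UTF-8")
        try {
          Seq(("A1", 120), ("A2", 98)).foreach { case (id, len) =>
            val doc = new JSONObject()
            doc.put("id", id)
            doc.put("length", len)
            doc.write(writer)
            writer.write("\n")
          }
        } finally writer.close()

        // spark.read.json understands the one-object-per-line layout directly,
        // which is why the old leading "[" and comma bookkeeping is no longer needed.
        spark.read.json(cacheFile.toString).show()
      }
    }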
View File
@ -1,10 +1,10 @@
package cn.piflow.bundle.microorganism package cn.piflow.bundle.microorganism
import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} import java.io._
import cn.piflow.bundle.microorganism.util.ParserGff3Data import cn.piflow.bundle.microorganism.util.ParserGff3Data
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -19,12 +19,15 @@ class Ensembl extends ConfigurableStop{
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override def setProperties(map: Map[String, Any]): Unit = { var cachePath:String = _
def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/ensembl").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
@ -43,82 +46,55 @@ class Ensembl extends ConfigurableStop{
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
var pathStr: String = "" var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
try{ for (x <- (0 until 3)){
pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] hdfsUrl+=(pathARR(x) +"/")
val pathARR: Array[String] = pathStr.split("\\/")
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
}catch {
case e:Exception => throw new Exception("Path error")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/ensembl_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/ensemblCache/ensemblCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
val buff: Array[Byte] = new Array[Byte](1048576) val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
val parser: ParserGff3Data = new ParserGff3Data val parser: ParserGff3Data = new ParserGff3Data
var bis: BufferedInputStream =null
var fdis: FSDataInputStream =null var fdis: FSDataInputStream =null
var br: BufferedReader = null var br: BufferedReader = null
var sequences: RichSequenceIterator = null
var doc: JSONObject = null var doc: JSONObject = null
var seq: RichSequence = null var count:Int = 0
var jsonStr: String = ""
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
println("start parser ^^^" + pathStr)
fdis = fs.open(new Path(pathStr)) fdis = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis)) br = new BufferedReader(new InputStreamReader(fdis))
var eachStr:String=null var eachStr:String=null
while((eachStr = br.readLine()) != null && eachStr != null ){ while((eachStr = br.readLine()) != null && eachStr != null ){
doc = parser.parserGff3(eachStr) doc = parser.parserGff3(eachStr)
jsonStr = doc.toString
if(jsonStr.length > 2){
bis = new BufferedInputStream(new ByteArrayInputStream((jsonStr+"\n").getBytes()))
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close() if(doc.toString.length > 2){
bis = null count += 1
doc = null doc.write(hdfsWriter)
seq = null hdfsWriter.write("\n")
jsonStr = ""
} }
} }
sequences = null
br.close() br.close()
br = null
fdis.close() fdis.close()
fdis =null
pathStr = null
}) })
fdos.close() hdfsWriter.close()
out.write(session.read.json(hdfsPathTemporary)) out.write(session.read.json(hdfsPathTemporary))
} }
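The rewritten loop keeps the Java-style while ((eachStr = br.readLine()) != null && eachStr != null) condition; in Scala the assignment itself evaluates to Unit, so it is the second eachStr != null test that actually terminates the loop. An equivalent, more idiomatic sketch of the per-line GFF3 pass, with a hypothetical parseLine standing in for ParserGff3Data.parserGff3:

    import java.io.{BufferedReader, InputStreamReader}
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.json.JSONObject

    object Gff3LoopSketch {
      // Hypothetical stand-in for ParserGff3Data.parserGff3: keep the first three columns.
      def parseLine(line: String): JSONObject = {
        val cols = line.split("\t")
        val doc = new JSONObject()
        if (cols.length >= 3) {
          doc.put("seqid", cols(0))
          doc.put("source", cols(1))
          doc.put("type", cols(2))
        }
        doc
      }

      def parseFile(fs: FileSystem, file: Path)(write: JSONObject => Unit): Unit = {
        val br = new BufferedReader(new InputStreamReader(fs.open(file)))
        try {
          Iterator.continually(br.readLine())
            .takeWhile(_ != null)             // replaces the assignment-in-condition loop
            .filterNot(_.startsWith("#"))     // GFF3 comment/header lines
            .map(parseLine)
            .filter(_.length > 0)             // mirrors the doc.toString.length > 2 guard
            .foreach(write)
        } finally br.close()
      }
    }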
View File
@ -2,15 +2,14 @@ package cn.piflow.bundle.microorganism
import java.io._ import java.io._
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject import org.json.JSONObject
@ -21,10 +20,10 @@ class GenBankData extends ConfigurableStop{
val outportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val sc = spark.sparkContext val sc = spark.sparkContext
val inDf= in.read() val inDf= in.read()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
@ -34,21 +33,18 @@ class GenBankData extends ConfigurableStop{
for (x <- (0 until 3)){ for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/") hdfsUrl+=(pathARR(x) +"/")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/microoCache/genbank/genbankcach.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/genebankCache/genebankCache.json"
val path: Path = new Path(hdfsPathTemporary) val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
inDf.collect().foreach(row=>{ inDf.collect().foreach(row=>{
@ -60,48 +56,35 @@ class GenBankData extends ConfigurableStop{
var doc: JSONObject = null var doc: JSONObject = null
var count = 0 var count = 0
while (sequenceIterator.hasNext) { while (sequenceIterator.hasNext) {
count += 1 count += 1
doc = new JSONObject doc = new JSONObject
val seq = sequenceIterator.nextRichSequence() val seq = sequenceIterator.nextRichSequence()
Process.processSingleSequence(seq, doc)
Process.processSingleSequence(seq, doc) doc.write(hdfsWriter)
hdfsWriter.write("\n")
bisIn = new BufferedInputStream(new ByteArrayInputStream((doc.toString+"\n").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
} }
br.close()
fdosOut.flush() fdis.close()
bisIn = null
}
}) })
hdfsWriter.close()
fdosOut.close()
println("start parser HDFSjsonFile") println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathTemporary) val df: DataFrame = spark.read.json(hdfsPathTemporary)
df.schema.printTreeString()
out.write(df) out.write(df)
} }
def setProperties(map: Map[String, Any]): Unit = { def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/genbank").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
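GenBankData, like every stop in this commit, also gains the same property plumbing: a cachePath PropertyDescriptor and a setProperties that reads the value back through MapUtil. Gathered into one place, the wiring looks roughly like this (the class name is illustrative; descriptor calls and default value follow the diff above):

    import cn.piflow.conf.bean.PropertyDescriptor
    import cn.piflow.conf.util.MapUtil

    class CachePathWiringSketch {
      var cachePath: String = _

      def setProperties(map: Map[String, Any]): Unit = {
        cachePath = MapUtil.get(map, key = "cachePath").asInstanceOf[String]
      }

      def getPropertyDescriptor(): List[PropertyDescriptor] = {
        var descriptor: List[PropertyDescriptor] = List()
        val cachePath = new PropertyDescriptor()
          .name("cachePath").displayName("cachePath")
          .description("Temporary Cache File Path")
          .defaultValue("/genbank").required(true)
        descriptor = cachePath :: descriptor
        descriptor
      }
    }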
View File
@ -1,10 +1,10 @@
package cn.piflow.bundle.microorganism package cn.piflow.bundle.microorganism
import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} import java.io._
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -18,13 +18,13 @@ class Gene extends ConfigurableStop{
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/") val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
@ -34,19 +34,15 @@ class Gene extends ConfigurableStop{
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/geneCache/geneCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
var jsonStr: String ="" val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var bis: BufferedInputStream =null
var names:Array[String]=Array("tax_id", "geneID", "symbol", "locus_tag", "synonyms", "dbxrefs", "chromosome", "map_location", "description", "type_of_gene", var names:Array[String]=Array("tax_id", "geneID", "symbol", "locus_tag", "synonyms", "dbxrefs", "chromosome", "map_location", "description", "type_of_gene",
"symbol_from_nomenclature_authority", "full_name_from_nomenclature_authority", "symbol_from_nomenclature_authority", "full_name_from_nomenclature_authority",
@ -54,7 +50,7 @@ class Gene extends ConfigurableStop{
val format: java.text.DateFormat = new SimpleDateFormat("yyyyMMdd").asInstanceOf[java.text.DateFormat] val format: java.text.DateFormat = new SimpleDateFormat("yyyyMMdd").asInstanceOf[java.text.DateFormat]
val newFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") val newFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var n:Int=0 var count:Int=0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
@ -66,9 +62,9 @@ class Gene extends ConfigurableStop{
var line:String="" var line:String=""
var doc:JSONObject=null var doc:JSONObject=null
while ((line=br.readLine()) != null){ while ((line=br.readLine()) != null && count < 10 ){
if( ! line.startsWith("#")){ if( ! line.startsWith("#")){
n += 1 count += 1
doc=new JSONObject() doc=new JSONObject()
val tokens: Array[String] = line.split("\\\t") val tokens: Array[String] = line.split("\\\t")
for(i <- (0 until 15)){ for(i <- (0 until 15)){
@ -84,54 +80,28 @@ class Gene extends ConfigurableStop{
doc.put(names(i),newFormat.format(format.parse(tokens(i)))) doc.put(names(i),newFormat.format(format.parse(tokens(i))))
} }
} }
jsonStr = doc.toString doc.write(hdfsWriter)
if (n == 1) { hdfsWriter.write("\n")
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis = null
doc = null
} }
} }
br.close()
fdis.close()
}) })
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) hdfsWriter.close()
val buff: Array[Byte] = new Array[Byte](1048576)
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
fdos.close()
val df: DataFrame = session.read.json(hdfsPathTemporary) val df: DataFrame = session.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
} }
override def setProperties(map: Map[String, Any]): Unit = { def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/gene").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
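The Gene stop maps each tab-separated gene_info line onto the names array and rewrites the yyyyMMdd modification date to yyyy-MM-dd on the way. The sketch below reproduces that per-line conversion with a shortened, hypothetical column list (the real stop maps the full gene_info header):

    import java.text.SimpleDateFormat
    import org.json.JSONObject

    object GeneLineSketch {
      // Shortened, illustrative column list; gene_info has many more fields.
      private val names  = Array("tax_id", "geneID", "symbol", "modification_date")
      private val inFmt  = new SimpleDateFormat("yyyyMMdd")
      private val outFmt = new SimpleDateFormat("yyyy-MM-dd")

      def parse(line: String): Option[JSONObject] = {
        if (line.startsWith("#")) return None        // header lines are skipped
        val tokens = line.split("\t")
        val doc = new JSONObject()
        for (i <- names.indices if i < tokens.length) {
          if (names(i) == "modification_date")
            doc.put(names(i), outFmt.format(inFmt.parse(tokens(i))))
          else
            doc.put(names(i), tokens(i))
        }
        Some(doc)
      }

      def main(args: Array[String]): Unit =
        println(parse("9606\t1\tA1BG\t20190301"))    // modification_date becomes "2019-03-01"
    }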
View File
@ -35,21 +35,18 @@ class GoData extends ConfigurableStop{
for (x <- (0 until 3)){ for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/") hdfsUrl+=(pathARR(x) +"/")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathJsonCache = hdfsUrl+cachePath+"/godataCache/godataCache.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/godataCache/godataCache.json"
val path: Path = new Path(hdfsPathJsonCache) val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path) val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var jsonStr: String =""
var bisIn: BufferedInputStream =null
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
@ -65,7 +62,7 @@ class GoData extends ConfigurableStop{
} }
var obj = new JSONObject() var obj = new JSONObject()
var count= 0 var count= 0
while ((line = br.readLine()) !=null && line !=null ){ while ((line = br.readLine()) !=null && line !=null && count<10){
val m: Matcher = tv.matcher(line) val m: Matcher = tv.matcher(line)
if (line.startsWith("[")){ if (line.startsWith("[")){
if (line .equals("[Term]")){ if (line .equals("[Term]")){
@ -89,28 +86,20 @@ class GoData extends ConfigurableStop{
} }
count += 1 count += 1
bisIn = new BufferedInputStream(new ByteArrayInputStream((obj.toString+"\n").getBytes())) obj.write(hdfsWriter)
hdfsWriter.write("\n")
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
obj= new JSONObject() obj= new JSONObject()
} }
} }
br.close()
fdis.close()
}) })
hdfsWriter.close()
fdosOut.close()
println("start parser HDFSjsonFile") println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathJsonCache) val df: DataFrame = spark.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
} }
@ -120,7 +109,8 @@ class GoData extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/goData").required(true)
descriptor = cachePath :: descriptor descriptor = cachePath :: descriptor
descriptor descriptor
} }
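GoData walks the GO .obo file, opening a fresh JSONObject at every [Term] header and accumulating the key: value lines that follow until the next stanza (the new loop also carries a count < 10 cap, which reads like a leftover test limit). A rough standalone sketch of that accumulation, simplified to plain key: value handling:

    import org.json.JSONObject

    object OboTermSketch {
      // Turn .obo-style text into one JSONObject per [Term] block.
      def parseTerms(lines: Iterator[String]): List[JSONObject] = {
        var terms: List[JSONObject] = Nil
        var current: JSONObject = null
        lines.map(_.trim).foreach {
          case "[Term]" =>
            if (current != null) terms ::= current
            current = new JSONObject()
          case l if current != null && l.contains(": ") =>
            val Array(k, v) = l.split(": ", 2)
            current.append(k, v)          // repeated keys such as is_a become arrays
          case _ =>                       // other stanzas, blank lines, header fields
        }
        if (current != null) terms ::= current
        terms.reverse
      }

      def main(args: Array[String]): Unit = {
        val sample = Seq("[Term]", "id: GO:0000001", "name: mitochondrion inheritance",
                         "is_a: GO:0048308", "is_a: GO:0048311")
        parseTerms(sample.iterator).foreach(t => println(t.toString))
      }
    }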
View File
@ -35,16 +35,15 @@ class InterproData extends ConfigurableStop{
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathJsonCache = hdfsUrl+cachePath+"/interproDataCatch/interproDataCatch.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/interproCache/interproCache.json"
val path: Path = new Path(hdfsPathJsonCache)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path) val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var jsonStr: String =""
var bisIn: BufferedInputStream =null
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
@ -76,28 +75,20 @@ class InterproData extends ConfigurableStop{
doc.remove("pub_list") doc.remove("pub_list")
} }
doc.write(hdfsWriter)
bisIn = new BufferedInputStream(new ByteArrayInputStream((doc.toString+"\n").getBytes())) hdfsWriter.write("\n")
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
xml = "" xml = ""
} }
} }
br.close()
fdis.close()
}) })
fdosOut.close() hdfsWriter.close()
println("start parser HDFSjsonFile") println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathJsonCache) val df: DataFrame = spark.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
@ -109,7 +100,8 @@ class InterproData extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true) val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/interpro").required(true)
descriptor = cachePath :: descriptor descriptor = cachePath :: descriptor
descriptor descriptor
} }
View File
@ -7,7 +7,7 @@ import java.util.Locale
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path} import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.{DataFrame, SparkSession}
@ -19,11 +19,12 @@ class MedlineData extends ConfigurableStop{
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
var docName = "PubmedArticle" var docName = "PubmedArticle"
val formatter = new SimpleDateFormat("yyyy-MM-dd") val formatter = new SimpleDateFormat("yyyy-MM-dd")
val format = new SimpleDateFormat("dd-MM-yyyy") val format = new SimpleDateFormat("dd-MM-yyyy")
val format_english = new SimpleDateFormat("dd-MMM-yyyy", Locale.ENGLISH) val format_english = new SimpleDateFormat("dd-MMM-yyyy", Locale.ENGLISH)
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
@ -38,7 +39,7 @@ class MedlineData extends ConfigurableStop{
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/yqd/test/medline/medlineAll.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/medlineCache/medlineCache.json"
val path: Path = new Path(hdfsPathTemporary) val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
@ -56,27 +57,23 @@ class MedlineData extends ConfigurableStop{
fileNum += 1 fileNum += 1
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
println(fileNum+"-----start parser ^^^" + pathStr)
fdis = fs.open(new Path(pathStr)) fdis = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis)) br = new BufferedReader(new InputStreamReader(fdis))
var i = 0 var i = 0
var eachLine: String= "" var eachLine: String= ""
while (i < 3) { while (i < 3) {
eachLine = br.readLine() eachLine = br.readLine()
i += 1 i += 1
} }
var xml = "" var xml = ""
while ((eachLine = br.readLine) != null && eachLine != null ) { while ((eachLine = br.readLine) != null && eachLine != null ) {
xml += eachLine xml += eachLine
if (eachLine.indexOf("</"+docName+">")!= -1){ if (eachLine.indexOf("</"+docName+">")!= -1){
count += 1 count += 1
doc = XML.toJSONObject(xml).getJSONObject(docName).optJSONObject("MedlineCitation") doc = XML.toJSONObject(xml).getJSONObject(docName).optJSONObject("MedlineCitation")
println(count)
// println(doc.toString)
if (doc.optJSONObject("DateCreated") != null) { if (doc.optJSONObject("DateCreated") != null) {
val dateCreated = doc.getJSONObject("DateCreated") val dateCreated = doc.getJSONObject("DateCreated")
@ -130,9 +127,6 @@ class MedlineData extends ConfigurableStop{
if (pubDate.opt("Year") != null) doc.put("PubYear", pubDate.get("Year")) if (pubDate.opt("Year") != null) doc.put("PubYear", pubDate.get("Year"))
if (pubDate.opt("Year") != null && pubDate.opt("Month") != null && pubDate.opt("Day") != null) { if (pubDate.opt("Year") != null && pubDate.opt("Month") != null && pubDate.opt("Day") != null) {
var month = pubDate.get("Month") var month = pubDate.get("Month")
if (month.toString.contains("01") || month.toString.contains("1")) month = "Jan" if (month.toString.contains("01") || month.toString.contains("1")) month = "Jan"
if (month.toString.contains("02") || month.toString.contains("2")) month = "Feb" if (month.toString.contains("02") || month.toString.contains("2")) month = "Feb"
@ -147,10 +141,7 @@ class MedlineData extends ConfigurableStop{
if (month.toString.contains("11")) month = "Nov" if (month.toString.contains("11")) month = "Nov"
if (month.toString.contains("12")) month = "Dec" if (month.toString.contains("12")) month = "Dec"
val date = pubDate.get("Day") + "-" +month + "-" + pubDate.get("Year") val date = pubDate.get("Day") + "-" +month + "-" + pubDate.get("Year")
// println(date+"@@@@@@@@@@@")
doc.put("PubDate", formatter.format(format_english.parse(date))) doc.put("PubDate", formatter.format(format_english.parse(date)))
} }
} }
@ -169,17 +160,13 @@ class MedlineData extends ConfigurableStop{
hdfsWriter.write("\n") hdfsWriter.write("\n")
xml = "" xml = ""
} }
} }
br.close() br.close()
fdis.close() fdis.close()
}) })
hdfsWriter.close() hdfsWriter.close()
val df: DataFrame = pec.get[SparkSession]().read.json(hdfsPathTemporary) val df: DataFrame = pec.get[SparkSession]().read.json(hdfsPathTemporary)
df.schema.printTreeString()
// println(df.count())
out.write(df) out.write(df)
} }
@ -199,9 +186,7 @@ class MedlineData extends ConfigurableStop{
return findJSONObject(objKey, obj.getJSONArray(key).getJSONObject(i)) return findJSONObject(objKey, obj.getJSONArray(key).getJSONObject(i))
} }
} }
} }
} }
} }
return null return null
@ -210,15 +195,17 @@ class MedlineData extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = { def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] ={ override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/medline").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
override def getIcon(): Array[Byte] = { override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/microorganism/MedlineData.png") ImageUtil.getImage("icon/microorganism/MedlineData.png")
} }
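MedlineData buffers XML until the closing PubmedArticle tag, converts the chunk with org.json.XML, and then normalises the various date fields to yyyy-MM-dd, using an English-locale formatter when the month arrives as a name. The trimmed sketch below shows just the numeric-date branch on a tiny illustrative record:

    import java.text.SimpleDateFormat
    import org.json.XML

    object MedlineDateSketch {
      private val numericIn = new SimpleDateFormat("dd-MM-yyyy")
      private val isoOut    = new SimpleDateFormat("yyyy-MM-dd")

      def main(args: Array[String]): Unit = {
        val xml = "<PubmedArticle><MedlineCitation>" +
                  "<DateCreated><Year>2018</Year><Month>05</Month><Day>07</Day></DateCreated>" +
                  "</MedlineCitation></PubmedArticle>"

        val doc = XML.toJSONObject(xml)
          .getJSONObject("PubmedArticle")
          .getJSONObject("MedlineCitation")

        val created = doc.getJSONObject("DateCreated")
        val date = created.get("Day").toString + "-" +
                   created.get("Month").toString + "-" +
                   created.get("Year").toString                   // "07-05-2018"
        doc.put("DateCreated", isoOut.format(numericIn.parse(date)))
        println(doc.toString)                                     // {"DateCreated":"2018-05-07"}
      }
    }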
View File
@ -1,10 +1,10 @@
package cn.piflow.bundle.microorganism package cn.piflow.bundle.microorganism
import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} import java.io._
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -14,118 +14,83 @@ import org.biojavax.bio.seq.{RichSequence, RichSequenceIterator}
import org.json.JSONObject import org.json.JSONObject
class MicrobeGenomeData extends ConfigurableStop{ class MicrobeGenomeData extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn" override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Parse MicrobeGenome data" override val description: String = "Parse MicrobeGenome data"
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val spark = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
var pathStr: String = "" var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
try{ for (x <- (0 until 3)){
pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] hdfsUrl+=(pathARR(x) +"/")
val pathARR: Array[String] = pathStr.split("\\/")
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
}catch {
case e:Exception => throw new Exception("Path error")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/NCBI_Microbe_genome_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/microbeGenomeCache/microbeGenomeCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
val buff: Array[Byte] = new Array[Byte](1048576)
var bis: BufferedInputStream =null val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var fdis: FSDataInputStream =null var fdis: FSDataInputStream =null
var br: BufferedReader = null var br: BufferedReader = null
var sequences: RichSequenceIterator = null var sequences: RichSequenceIterator = null
var doc: JSONObject = null var doc: JSONObject = null
var seq: RichSequence = null var seq: RichSequence = null
var jsonStr: String = ""
var n:Int=0 var count:Int=0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! start parser ^^^" + pathStr)
fdis = fs.open(new Path(pathStr)) fdis = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis)) br = new BufferedReader(new InputStreamReader(fdis))
sequences = CustomIOTools.IOTools.readGenbankProtein(br, null) sequences = CustomIOTools.IOTools.readGenbankProtein(br, null)
while (sequences.hasNext) { while (sequences.hasNext) {
n += 1 count += 1
doc = new JSONObject() doc = new JSONObject()
seq = sequences.nextRichSequence() seq = sequences.nextRichSequence()
Process.processSingleSequence(seq,doc) Process.processSingleSequence(seq, doc)
jsonStr = doc.toString doc.write(hdfsWriter)
hdfsWriter.write("\n")
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close()
bis = null
doc = null
seq = null seq = null
jsonStr = "" }
br.close()
} fdis.close()
sequences = null
br.close()
br = null
fdis.close()
fdis =null
pathStr = null
}) })
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) hdfsWriter.close()
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close()
fdos.close()
out.write(session.read.json(hdfsPathTemporary))
println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathTemporary)
out.write(df)
} }
override def setProperties(map: Map[String, Any]): Unit = {
def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/microbeGenome").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
View File
@ -4,7 +4,7 @@ import java.io._
import cn.piflow.bundle.microorganism.util.PDB import cn.piflow.bundle.microorganism.util.PDB
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -19,105 +19,61 @@ class PDBData extends ConfigurableStop{
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
var pathStr: String = "" var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
try{ for (x <- (0 until 3)){
pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] hdfsUrl+=(pathARR(x) +"/")
val pathARR: Array[String] = pathStr.split("\\/")
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
}catch {
case e:Exception => throw new Exception("Path error")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/PDBCache/PDBCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
val buff: Array[Byte] = new Array[Byte](1048576)
var bis: BufferedInputStream =null val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var fdis: FSDataInputStream =null
var br: BufferedReader = null
var sequences: RichSequenceIterator = null
var doc: JSONObject = null var doc: JSONObject = null
var seq: RichSequence = null
var pdb: PDB = null var pdb: PDB = null
var jsonStr: String = "" var count:Int=0
var n:Int=0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
count += 1
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
pdb = new PDB(pathStr,fs) pdb = new PDB(pathStr,fs)
doc = pdb.getDoc doc = pdb.getDoc
jsonStr = doc.toString doc.write(hdfsWriter)
n +=1 hdfsWriter.write("\n")
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis = null
doc = null doc = null
seq = null
jsonStr = ""
sequences = null
br = null
fdis =null
pathStr = null
pdb = null
}) })
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) hdfsWriter.close()
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close()
fdos.close()
val df: DataFrame = session.read.json(hdfsPathTemporary) val df: DataFrame = session.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
} }
def setProperties(map: Map[String, Any]): Unit = {
override def setProperties(map: Map[String, Any]): Unit = { cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = { override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/PDB").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
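As in the other parsers, the PDB work happens on the driver: the incoming DataFrame only carries HDFS file paths, which are collected and handed to the PDB helper one by one. A stripped-down sketch of that driver loop, with a hypothetical parseFile in place of new PDB(pathStr, fs).getDoc:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.DataFrame
    import org.json.JSONObject

    object DriverLoopSketch {
      // Hypothetical stand-in for new PDB(pathStr, fs).getDoc.
      def parseFile(fs: FileSystem, path: String): JSONObject =
        new JSONObject()
          .put("path", path)
          .put("size", fs.getFileStatus(new Path(path)).getLen)

      // The first column of inDf is expected to hold one HDFS path per row.
      def parseAll(inDf: DataFrame, fs: FileSystem)(write: JSONObject => Unit): Unit =
        inDf.collect().foreach { row =>
          val pathStr = row.get(0).asInstanceOf[String]
          write(parseFile(fs, pathStr))
        }
    }

Because everything funnels through collect(), parsing is single-threaded on the driver; the cluster only comes back into play when the JSON cache is re-read as a DataFrame.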
View File
@ -5,24 +5,30 @@ import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject import org.json.JSONObject
class Pathway extends ConfigurableStop{ class Pathway extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn" override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "Parse Pathway data" override val description: String = "Parse Pathway data"
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override def setProperties(map: Map[String, Any]): Unit = {
var cachePath:String = _
def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] ={ override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/pathway").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
@ -51,8 +57,8 @@ class Pathway extends ConfigurableStop{
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
// "/biosampleCache/biosampleCache.json"
val hdfsPathTemporary:String = hdfsUrl+"/yqd/test/kegg/Refseq_genomeParser_temporary2.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/pathwayCache/pathwayCache.json"
val path: Path = new Path(hdfsPathTemporary) val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
@ -67,17 +73,15 @@ class Pathway extends ConfigurableStop{
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
println("start parser ^^^" + pathStr)
fdis = fs.open(new Path(pathStr)) fdis = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis)) br = new BufferedReader(new InputStreamReader(fdis))
var count = 0 var count = 0
while (hasAnotherSequence) { while (hasAnotherSequence) {
count += 1 count += 1
println(count)
doc = new JSONObject doc = new JSONObject
hasAnotherSequence = util.KeggPathway.process(br, doc) hasAnotherSequence = util.KeggPathway.process(br, doc)
doc.write(hdfsWriter) doc.write(hdfsWriter)
hdfsWriter.write("\n") hdfsWriter.write("\n")
} }
View File
@ -1,10 +1,10 @@
package cn.piflow.bundle.microorganism package cn.piflow.bundle.microorganism
import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} import java.io._
import cn.piflow.bundle.microorganism.util.Pfam import cn.piflow.bundle.microorganism.util.Pfam
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -20,108 +20,71 @@ class PfamData extends ConfigurableStop{
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
var pathStr: String = "" var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
try{ for (x <- (0 until 3)){
pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] hdfsUrl+=(pathARR(x) +"/")
val pathARR: Array[String] = pathStr.split("\\/")
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
}catch {
case e:Exception => throw new Exception("Path error")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/pfamCache/pfamCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
val buff: Array[Byte] = new Array[Byte](1048576)
var bis: BufferedInputStream =null val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var fdis: FSDataInputStream =null var fdis: FSDataInputStream =null
var br: BufferedReader = null var br: BufferedReader = null
var sequences: RichSequenceIterator = null
var doc: JSONObject = null var doc: JSONObject = null
var seq: RichSequence = null var seq: RichSequence = null
var hasAnotherSequence : Boolean=true var hasAnotherSequence : Boolean=true
var jsonStr: String = ""
var n:Int=0 var count:Int=0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
fdis = fs.open(new Path(pathStr)) fdis = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis)) br = new BufferedReader(new InputStreamReader(fdis))
while( hasAnotherSequence && n < 1000 ){ while( hasAnotherSequence ){
n += 1 count += 1
doc = new JSONObject() doc = new JSONObject()
hasAnotherSequence = Pfam.process(br,doc) hasAnotherSequence = Pfam.process(br,doc)
jsonStr = doc.toString doc.write(hdfsWriter)
hdfsWriter.write("\n")
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis = null
doc = null
seq = null
jsonStr = ""
} }
br.close()
sequences = null fdis.close()
br = null
fdis =null
pathStr = null
}) })
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) hdfsWriter.close()
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close()
fdos.close()
val df: DataFrame = session.read.json(hdfsPathTemporary) val df: DataFrame = session.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
} }
override def setProperties(map: Map[String, Any]): Unit = { def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/pfam").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
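PfamData (like the KEGG pathway stop above) consumes its flat file through a helper that fills a JSONObject and reports whether another record follows; the new version also drops the old n < 1000 cap and parses the whole file. A generic sketch of that loop shape, with a hypothetical readRecord in place of Pfam.process:

    import java.io.BufferedReader
    import org.json.JSONObject

    object RecordLoopSketch {
      // Hypothetical reader: fills doc from br and says whether more records remain.
      def readRecord(br: BufferedReader, doc: JSONObject): Boolean = {
        val line = br.readLine()
        if (line == null) false
        else { doc.put("line", line); br.ready() }
      }

      def readAll(br: BufferedReader)(write: JSONObject => Unit): Int = {
        var hasAnother = true
        var count = 0
        while (hasAnother) {
          val doc = new JSONObject()
          hasAnother = readRecord(br, doc)
          if (doc.length() > 0) { write(doc); count += 1 }
        }
        count
      }
    }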
View File
@ -4,7 +4,7 @@ import java.io._
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -19,15 +19,13 @@ class RefseqData extends ConfigurableStop{
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String] var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/") val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
@ -37,88 +35,62 @@ class RefseqData extends ConfigurableStop{
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/refseqCache/refseqCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
val buff: Array[Byte] = new Array[Byte](1048576)
var jsonStr: String ="" val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var bis: BufferedInputStream =null
var doc: JSONObject= null
var seq: RichSequence = null
var br: BufferedReader = null
var fdis: FSDataInputStream =null
var sequences: RichSequenceIterator = null
var count : Int = 0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
var n : Int =0
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
fdis = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis))
sequences = CustomIOTools.IOTools.readGenbankProtein(br, null)
var fdis: FSDataInputStream = fs.open(new Path(pathStr)) while (sequences.hasNext) {
count += 1
seq = sequences.nextRichSequence()
doc = new JSONObject
Process.processSingleSequence(seq, doc)
var br: BufferedReader = new BufferedReader(new InputStreamReader(fdis)) doc.write(hdfsWriter)
hdfsWriter.write("\n")
var sequences: RichSequenceIterator = CustomIOTools.IOTools.readGenbankProtein(br, null)
while (sequences.hasNext) {
n += 1
var seq: RichSequence = sequences.nextRichSequence()
var doc: JSONObject = new JSONObject
Process.processSingleSequence(seq, doc)
jsonStr = doc.toString
println("start " + n)
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
} }
fdis.close()
var count: Int = bis.read(buff) br.close()
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis = null
seq = null
doc = null
}
}) })
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) hdfsWriter.close()
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close()
fdos.close()
println("start parser HDFSjsonFile") println("start parser HDFSjsonFile")
val df: DataFrame = session.read.json(hdfsPathTemporary) val df: DataFrame = session.read.json(hdfsPathTemporary)
out.write(df) out.write(df)
} }
override def setProperties(map: Map[String, Any]): Unit = {
def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/refseqData").required(true)
descriptor = cachePath :: descriptor
descriptor
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List()
descriptor
}
override def getIcon(): Array[Byte] = { override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/microorganism/RefseqData.png") ImageUtil.getImage("icon/microorganism/RefseqData.png")
View File
@ -1,10 +1,10 @@
package cn.piflow.bundle.microorganism package cn.piflow.bundle.microorganism
import java.io.{BufferedInputStream, BufferedReader, ByteArrayInputStream, InputStreamReader} import java.io._
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process} import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup} import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
@ -19,12 +19,16 @@ class SwissprotData extends ConfigurableStop{
override val inportList: List[String] =List(PortEnum.DefaultPort.toString) override val inportList: List[String] =List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override def setProperties(map: Map[String, Any]): Unit = { var cachePath:String = _
def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
} }
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
override def getPropertyDescriptor(): List[PropertyDescriptor] ={
var descriptor : List[PropertyDescriptor] = List() var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").description("Temporary Cache File Path")
.defaultValue("/swissprot").required(true)
descriptor = cachePath :: descriptor
descriptor descriptor
} }
@ -41,96 +45,58 @@ class SwissprotData extends ConfigurableStop{
} }
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]() val session = pec.get[SparkSession]()
val inDf: DataFrame = in.read() val inDf: DataFrame = in.read()
val configuration: Configuration = new Configuration() val configuration: Configuration = new Configuration()
var pathStr: String = "" var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String="" var hdfsUrl:String=""
try{ for (x <- (0 until 3)){
pathStr =inDf.take(1)(0).get(0).asInstanceOf[String] hdfsUrl+=(pathARR(x) +"/")
val pathARR: Array[String] = pathStr.split("\\/")
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
}catch {
case e:Exception => throw new Exception("Path error")
} }
configuration.set("fs.defaultFS",hdfsUrl) configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration) var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathTemporary:String = hdfsUrl+"/Refseq_genomeParser_temporary.json" val hdfsPathTemporary = hdfsUrl+cachePath+"/swissprotCache/swissprotCache.json"
val path: Path = new Path(hdfsPathTemporary)
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){ if(fs.exists(path)){
fs.delete(path) fs.delete(path)
} }
fs.create(path).close() fs.create(path).close()
var fdos: FSDataOutputStream = fs.append(path)
val buff: Array[Byte] = new Array[Byte](1048576)
var bis: BufferedInputStream =null val hdfsWriter: OutputStreamWriter = new OutputStreamWriter(fs.append(path))
var fdis: FSDataInputStream =null var fdis: FSDataInputStream =null
var br: BufferedReader = null var br: BufferedReader = null
var sequences: RichSequenceIterator = null var sequences: RichSequenceIterator = null
var doc: JSONObject = null var doc: JSONObject = null
var seq: RichSequence = null var seq: RichSequence = null
var jsonStr: String = "" var count:Int=0
var n:Int=0
inDf.collect().foreach(row => { inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String] pathStr = row.get(0).asInstanceOf[String]
fdis = fs.open(new Path(pathStr)) fdis = fs.open(new Path(pathStr))
br = new BufferedReader(new InputStreamReader(fdis)) br = new BufferedReader(new InputStreamReader(fdis))
sequences = CustomIOTools.IOTools.readUniProt(br,null) sequences = CustomIOTools.IOTools.readUniProt(br,null)
while (sequences.hasNext) { while (sequences.hasNext) {
count += 1
if (count < 100) {
doc = new JSONObject()
seq = sequences.nextRichSequence()
Process.processUniprotSeq(seq, doc)
n += 1 doc.write(hdfsWriter)
hdfsWriter.write("\n")
doc = new JSONObject() seq = null
seq = sequences.nextRichSequence() }
Process.processUniprotSeq(seq,doc)
jsonStr = doc.toString
println("start " + n + "String\\\n" )
if (n == 1) {
bis = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bis = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
} }
var count: Int = bis.read(buff) br.close()
while (count != -1) { fdis.close()
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis = null
doc = null
seq = null
jsonStr = ""
}
sequences = null
br = null
fdis =null
pathStr = null
}) })
bis = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes())) hdfsWriter.close()
var count: Int = bis.read(buff)
while (count != -1) {
fdos.write(buff, 0, count)
fdos.flush()
count = bis.read(buff)
}
fdos.flush()
bis.close()
fdos.close()
val df: DataFrame = session.read.json(hdfsPathTemporary) val df: DataFrame = session.read.json(hdfsPathTemporary)
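
The rewritten perform() above swaps the old byte-buffer copy loop for a JSON Lines cache: every parsed UniProt entry is written as one JSON document per line under cachePath, and Spark then rebuilds a DataFrame straight from that file. A minimal sketch of the same pattern, not the committed code — the Map-based record type, the cache.json name, and the use of org.json's JSONObject are illustrative assumptions:

import java.io.OutputStreamWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject

object JsonCacheSketch {
  // Write one JSON document per line into an HDFS cache file, then let Spark
  // read the JSON Lines file back as a DataFrame.
  def writeAndRead(session: SparkSession, hdfsUrl: String, cachePath: String,
                   records: Iterator[Map[String, String]]): DataFrame = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", hdfsUrl)
    val fs = FileSystem.get(conf)
    val cacheFile = new Path(hdfsUrl + cachePath + "/cache.json")  // illustrative location
    if (fs.exists(cacheFile)) fs.delete(cacheFile, true)           // drop any stale cache
    val writer = new OutputStreamWriter(fs.create(cacheFile))
    try {
      records.foreach { rec =>
        val doc = new JSONObject()
        rec.foreach { case (k, v) => doc.put(k, v) }               // one parsed entry -> one JSON doc
        writer.write(doc.toString)                                 // one document per line
        writer.write("\n")
      }
    } finally {
      writer.close()
    }
    session.read.json(cacheFile.toString)                          // Spark infers the schema from the lines
  }
}

Writing line-delimited JSON avoids the old approach of hand-assembling a single bracketed array with byte buffers, since spark.read.json consumes one document per line directly.
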

View File

@@ -13,6 +13,7 @@ class FlowBean {
var checkpoint : String = _ var checkpoint : String = _
var checkpointParentProcessId : String = _ var checkpointParentProcessId : String = _
var runMode : String = _ var runMode : String = _
var showData : String = _
var stops : List[StopBean] = List() var stops : List[StopBean] = List()
var paths : List[PathBean] = List() var paths : List[PathBean] = List()
@@ -25,6 +26,7 @@ class FlowBean {
this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String] this.checkpoint = flowMap.getOrElse("checkpoint","").asInstanceOf[String]
this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String] this.checkpointParentProcessId = flowMap.getOrElse("checkpointParentProcessId", "").asInstanceOf[String]
this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String] this.runMode = flowMap.getOrElse("runMode","RUN").asInstanceOf[String]
this.showData = flowMap.getOrElse("showData","0").asInstanceOf[String]
//construct StopBean List //construct StopBean List
val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]] val stopsList = MapUtil.get(flowMap,"stops").asInstanceOf[List[Map[String, Any]]]
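
The new showData field defaults to "0" when a flow omits it, so existing flow definitions parse unchanged; how the engine consumes the flag is not part of this hunk. A small sketch of the parsing behaviour — the surrounding map entries are made up for illustration:

// Hypothetical flow map fragment; "showData" is the only field added by this change.
val flowMap: Map[String, Any] = Map("name" -> "exampleFlow", "runMode" -> "RUN", "showData" -> "10")
val showData = flowMap.getOrElse("showData", "0").asInstanceOf[String]  // "0" when the key is absent
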

View File

@@ -378,13 +378,12 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
def getDataFrame(port: String) = mapDataFrame(port); def getDataFrame(port: String) = mapDataFrame(port);
def showData() = { def showData(count:Int) = {
mapDataFrame.foreach(en => { mapDataFrame.foreach(en => {
val portName = if(en._1.equals("")) "default" else en._1 val portName = if(en._1.equals("")) "default" else en._1
println(portName + " port: ") println(portName + " port: ")
en._2.apply().show(count)
en._2.apply().show(PropertyUtil.getPropertyValue("data.show").toInt)
}) })
} }
@@ -484,7 +483,10 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
runnerListener.onJobCompleted(pe.getContext()); runnerListener.onJobCompleted(pe.getContext());
//show data in log //show data in log
outputs.showData() val showDataCount = PropertyUtil.getPropertyValue("data.show").toInt
if(showDataCount > 0) {
outputs.showData(showDataCount)
}
} }
catch { catch {
case e: Throwable => case e: Throwable =>
@@ -528,7 +530,10 @@ class ProcessImpl(flow: Flow, runnerContext: Context, runner: Runner, parentProc
} }
} }
//show data in log //show data in log
outputs.showData() val showDataCount = PropertyUtil.getPropertyValue("data.show").toInt
if(showDataCount > 0) {
outputs.showData(showDataCount)
}
//save data in debug mode //save data in debug mode
if(flow.getRunMode() == FlowRunMode.DEBUG) { if(flow.getRunMode() == FlowRunMode.DEBUG) {
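
Both call sites now read data.show once and skip the console preview entirely unless the value is positive. A defensive sketch of the same gate — readShowCount and previewData are illustrative names; the committed code calls PropertyUtil and JobOutputStreamImpl directly and does not guard against a missing property:

import org.apache.spark.sql.DataFrame

// Parse the "data.show" setting; treat a missing or blank value as 0 (preview disabled).
def readShowCount(raw: String): Int =
  Option(raw).map(_.trim).filter(_.nonEmpty).map(_.toInt).getOrElse(0)

// Print up to `count` rows for every output port, mirroring showData(count) above.
def previewData(ports: Map[String, DataFrame], raw: String): Unit = {
  val count = readShowCount(raw)
  if (count > 0) {
    ports.foreach { case (port, df) =>
      val name = if (port.isEmpty) "default" else port
      println(name + " port: ")
      df.show(count)
    }
  }
}
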

View File

@@ -10,6 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>piflow-server</artifactId> <artifactId>piflow-server</artifactId>
<properties>
<akka.version>2.4.14</akka.version>
<akka.http.version>10.0.0</akka.http.version>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>piflow</groupId> <groupId>piflow</groupId>
@@ -30,40 +36,31 @@
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId> <artifactId>akka-remote_2.11</artifactId>
<version>2.3.14</version> <version>${akka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId> <artifactId>akka-actor_2.11</artifactId>
<version>2.3.14</version> <version>${akka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId> <artifactId>akka-slf4j_2.11</artifactId>
<version>2.3.14</version> <version>${akka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-experimental_2.11</artifactId> <artifactId>akka-http_2.11</artifactId>
<version>2.0.4</version> <version>${akka.http.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core-experimental_2.11</artifactId> <artifactId>akka-http-spray-json_2.11</artifactId>
<version>2.0.4</version> <version>${akka.http.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-spray-json-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@@ -0,0 +1,10 @@
PiFlowHTTPService{
shutdown-timeout:300s
}
akka {
http {
idle-timeout = 600 s
request-timeout = 200 s
}
}
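
The new application.conf carries one application-level key, PiFlowHTTPService.shutdown-timeout, which the service has to read itself, plus an akka.http block presumably meant to relax Akka HTTP's timeouts. A hedged sketch of reading the custom key with Typesafe Config (the variable names are illustrative):

import java.util.concurrent.TimeUnit
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.load()  // merges application.conf with the reference defaults
// "300s" parses as a HOCON duration; ask for it in seconds.
val shutdownTimeoutSec: Long = config.getDuration("PiFlowHTTPService.shutdown-timeout", TimeUnit.SECONDS)
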

View File

@@ -19,7 +19,8 @@ import spray.json.DefaultJsonProtocol
object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{
implicit val system = ActorSystem("HTTPService", ConfigFactory.load()) implicit val config = ConfigFactory.load()
implicit val system = ActorSystem("PiFlowHTTPService", config)
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher implicit val executionContext = system.dispatcher
var processMap = Map[String, SparkAppHandle]() var processMap = Map[String, SparkAppHandle]()
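
The piflow-server pom now pulls akka 2.4.14 and akka-http 10.0.0 in place of the 2.0.x experimental artifacts, and HTTPService loads the new application.conf into its ActorSystem. A minimal, self-contained sketch of a server wired the same way against the stable akka-http API — the /health route, interface, and port 8001 are assumptions, not part of the commit:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory

object MiniHTTPService extends App {
  implicit val config = ConfigFactory.load()             // picks up the application.conf above
  implicit val system = ActorSystem("PiFlowHTTPService", config)
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val route = path("health") { get { complete("OK") } }  // hypothetical endpoint for illustration
  Http().bindAndHandle(route, "0.0.0.0", 8001)           // interface and port are assumptions
}
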