Update description information and commit microorganism stops

yanggang 2019-01-30 15:02:42 +08:00
parent 11a15d6ca7
commit 0d1608f2b3
27 changed files with 1507 additions and 1170 deletions

View File

@@ -1,389 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/main/scala" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/scala" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.biojava:biojava-structure:4.0.0" level="project" />
<orderEntry type="library" name="Maven: org.biojava:biojava-alignment:4.0.0" level="project" />
<orderEntry type="library" name="Maven: org.biojava:biojava-phylo:4.0.0" level="project" />
<orderEntry type="library" name="Maven: org.biojava.thirdparty:forester:1.005" level="project" />
<orderEntry type="library" name="Maven: org.biojava:biojava-core:4.0.0" level="project" />
<orderEntry type="library" name="Maven: java3d:vecmath:1.3.1" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.logging.log4j:log4j-slf4j-impl:2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.1" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.logging.log4j:log4j-core:2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hive:hive-jdbc:1.2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hive:hive-common:1.2.1" level="project" />
<orderEntry type="library" name="Maven: log4j:apache-log4j-extras:1.2.17" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-compress:1.4.1" level="project" />
<orderEntry type="library" name="Maven: org.tukaani:xz:1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.ant:ant:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.ant:ant-launcher:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hive:hive-service:1.2.1" level="project" />
<orderEntry type="library" name="Maven: net.sf.jpam:jpam:1.1" level="project" />
<orderEntry type="library" name="Maven: org.eclipse.jetty.aggregate:jetty-all:7.6.0.v20120127" level="project" />
<orderEntry type="library" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1.1" level="project" />
<orderEntry type="library" name="Maven: javax.mail:mail:1.4.1" level="project" />
<orderEntry type="library" name="Maven: javax.activation:activation:1.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.geronimo.specs:geronimo-jaspic_1.0_spec:1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.geronimo.specs:geronimo-annotation_1.0_spec:1.1.1" level="project" />
<orderEntry type="library" name="Maven: asm:asm-commons:3.1" level="project" />
<orderEntry type="library" name="Maven: asm:asm-tree:3.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hive:hive-serde:1.2.1" level="project" />
<orderEntry type="library" name="Maven: net.sf.opencsv:opencsv:2.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.hive:hive-metastore:1.2.1" level="project" />
<orderEntry type="library" name="Maven: com.jolbox:bonecp:0.8.0.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.apache.derby:derby:10.10.2.0" level="project" />
<orderEntry type="library" name="Maven: org.datanucleus:datanucleus-api-jdo:3.2.6" level="project" />
<orderEntry type="library" name="Maven: org.datanucleus:datanucleus-rdbms:3.2.9" level="project" />
<orderEntry type="library" name="Maven: commons-pool:commons-pool:1.5.4" level="project" />
<orderEntry type="library" name="Maven: javax.jdo:jdo-api:3.0.1" level="project" />
<orderEntry type="library" name="Maven: javax.transaction:jta:1.1" level="project" />
<orderEntry type="library" name="Maven: org.antlr:antlr-runtime:3.4" level="project" />
<orderEntry type="library" name="Maven: org.antlr:stringtemplate:3.2.1" level="project" />
<orderEntry type="library" name="Maven: antlr:antlr:2.7.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.hive:hive-shims:1.2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hive.shims:hive-shims-common:1.2.1" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.hive.shims:hive-shims-0.20S:1.2.1" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.hive.shims:hive-shims-0.23:1.2.1" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.hadoop:hadoop-yarn-server-resourcemanager:2.6.0" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: com.sun.jersey:jersey-json:1.9" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.3-1" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.codehaus.jackson:jackson-xc:1.8.3" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: com.sun.jersey.contribs:jersey-guice:1.9" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: javax.xml.bind:jaxb-api:2.2.2" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.codehaus.jettison:jettison:1.1" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: com.sun.jersey:jersey-client:1.9" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.hadoop:hadoop-yarn-server-applicationhistoryservice:2.6.0" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.hive.shims:hive-shims-scheduler:1.2.1" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.thrift:libthrift:0.9.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.6" level="project" />
<orderEntry type="library" name="Maven: jline:jline:0.9.94" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-framework:2.6.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-client:2.6.0" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.5" level="project" />
<orderEntry type="library" name="Maven: org.mongodb:mongodb-driver:3.4.1" level="project" />
<orderEntry type="library" name="Maven: org.mongodb:mongodb-driver-core:3.4.1" level="project" />
<orderEntry type="library" name="Maven: org.mongodb:bson:3.4.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.solr:solr-solrj:7.2.0" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-math3:3.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpmime:4.5.3" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.4" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.woodstox:woodstox-core-asl:4.4.1" level="project" />
<orderEntry type="library" name="Maven: org.noggit:noggit:0.8" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:jcl-over-slf4j:1.7.7" level="project" />
<orderEntry type="library" name="Maven: dom4j:dom4j:1.6.1" level="project" />
<orderEntry type="library" name="Maven: xml-apis:xml-apis:1.0.b2" level="project" />
<orderEntry type="module" module-name="piflow-core" />
<orderEntry type="library" name="Maven: org.quartz-scheduler:quartz:2.3.0" level="project" />
<orderEntry type="library" name="Maven: com.mchange:c3p0:0.9.5.2" level="project" />
<orderEntry type="library" name="Maven: com.mchange:mchange-commons-java:0.2.11" level="project" />
<orderEntry type="library" name="Maven: com.zaxxer:HikariCP-java6:2.3.13" level="project" />
<orderEntry type="library" name="Maven: org.clapper:classutil_2.11:1.3.0" level="project" />
<orderEntry type="library" name="Maven: org.ow2.asm:asm:6.1.1" level="project" />
<orderEntry type="library" name="Maven: org.ow2.asm:asm-commons:6.1.1" level="project" />
<orderEntry type="library" name="Maven: org.ow2.asm:asm-tree:6.1.1" level="project" />
<orderEntry type="library" name="Maven: org.ow2.asm:asm-analysis:6.1.1" level="project" />
<orderEntry type="library" name="Maven: org.ow2.asm:asm-util:6.1.1" level="project" />
<orderEntry type="library" name="Maven: org.clapper:grizzled-scala_2.11:4.4.2" level="project" />
<orderEntry type="library" name="Maven: org.reflections:reflections:0.9.9" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
<orderEntry type="library" name="Maven: org.javassist:javassist:3.18.2-GA" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:2.0.1" level="project" />
<orderEntry type="library" name="Maven: com.chuusai:shapeless_2.11:2.3.1" level="project" />
<orderEntry type="library" name="Maven: org.typelevel:macro-compat_2.11:1.1.1" level="project" />
<orderEntry type="library" name="Maven: com.sksamuel.scrimage:scrimage-core_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-core:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-jpeg:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-metadata:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.common:common-lang:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.common:common-io:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.common:common-image:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.drewnoakes:metadata-extractor:2.8.1" level="project" />
<orderEntry type="library" name="Maven: com.adobe.xmp:xmpcore:5.1.2" level="project" />
<orderEntry type="library" name="Maven: ar.com.hjg:pngj:2.1.0" level="project" />
<orderEntry type="library" name="Maven: com.sksamuel.scrimage:scrimage-io-extra_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-bmp:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-icns:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-iff:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-pcx:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-pict:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-pdf:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-pnm:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-psd:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-sgi:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-tiff:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-tga:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.twelvemonkeys.imageio:imageio-thumbsdb:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.sksamuel.scrimage:scrimage-filters_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.25" level="project" />
<orderEntry type="library" name="Maven: net.liftweb:lift-json_2.11:2.6.1" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scalap:2.11.1" level="project" />
<orderEntry type="library" name="Maven: com.thoughtworks.paranamer:paranamer:2.4.1" level="project" />
<orderEntry type="library" name="Maven: com.databricks:spark-xml_2.11:0.4.1" level="project" />
<orderEntry type="library" name="Maven: redis.clients:jedis:2.9.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.kafka:kafka-clients:0.11.0.0" level="project" />
<orderEntry type="library" name="Maven: net.jpountz.lz4:lz4:1.3.0" level="project" />
<orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.1.2.6" level="project" />
<orderEntry type="library" name="Maven: org.elasticsearch:elasticsearch-hadoop:6.4.2" level="project" />
<orderEntry type="library" name="Maven: org.elasticsearch:elasticsearch:5.6.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-core:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-analyzers-common:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-backward-codecs:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-grouping:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-highlighter:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-join:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-memory:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-misc:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-queries:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-queryparser:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-sandbox:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-spatial:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-spatial-extras:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-spatial3d:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-suggest:6.6.1" level="project" />
<orderEntry type="library" name="Maven: org.elasticsearch:securesm:1.1" level="project" />
<orderEntry type="library" name="Maven: net.sf.jopt-simple:jopt-simple:5.0.2" level="project" />
<orderEntry type="library" name="Maven: com.carrotsearch:hppc:0.7.1" level="project" />
<orderEntry type="library" name="Maven: joda-time:joda-time:2.9.5" level="project" />
<orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.15" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.8.6" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.8.6" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.6" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.8.6" level="project" />
<orderEntry type="library" name="Maven: com.tdunning:t-digest:3.0" level="project" />
<orderEntry type="library" name="Maven: org.hdrhistogram:HdrHistogram:2.1.9" level="project" />
<orderEntry type="library" name="Maven: org.elasticsearch:jna:4.4.0-1" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-mllib_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-streaming_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-graphx_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: com.github.fommil.netlib:core:1.1.2" level="project" />
<orderEntry type="library" name="Maven: net.sourceforge.f2j:arpack_combined_all:0.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-mllib-local_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.scalanlp:breeze_2.11:0.12" level="project" />
<orderEntry type="library" name="Maven: org.scalanlp:breeze-macros_2.11:0.12" level="project" />
<orderEntry type="library" name="Maven: com.github.rwl:jtransforms:2.4.0" level="project" />
<orderEntry type="library" name="Maven: org.spire-math:spire_2.11:0.7.4" level="project" />
<orderEntry type="library" name="Maven: org.spire-math:spire-macros_2.11:0.7.4" level="project" />
<orderEntry type="library" name="Maven: org.jpmml:pmml-model:1.2.15" level="project" />
<orderEntry type="library" name="Maven: org.jpmml:pmml-schema:1.2.15" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-tags_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.scalatest:scalatest_2.11:2.2.6" level="project" />
<orderEntry type="library" name="Maven: org.spark-project.spark:unused:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.jsoup:jsoup:1.10.3" level="project" />
<orderEntry type="library" name="Maven: org.json:json:20151123" level="project" />
<orderEntry type="library" name="Maven: org.biojava:core:1.9.1" level="project" />
<orderEntry type="library" name="Maven: commons-dbcp:commons-dbcp:1.2.2" level="project" />
<orderEntry type="library" name="Maven: org.biojava:bytecode:1.9.1" level="project" />
<orderEntry type="library" name="Maven: com.memcached:java_memcached-release:2.6.6" level="project" />
<orderEntry type="library" name="Maven: jdbc_oracle:ojdbc:6.0.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flume:flume-ng-core:1.8.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flume:flume-ng-sdk:1.8.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flume:flume-ng-configuration:1.8.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flume:flume-ng-auth:1.8.0" level="project" />
<orderEntry type="library" name="Maven: commons-codec:commons-codec:1.8" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: commons-cli:commons-cli:1.2" level="project" />
<orderEntry type="library" name="Maven: commons-lang:commons-lang:2.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro:1.7.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro-ipc:1.7.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.velocity:velocity:1.7" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty:3.9.4.Final" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:servlet-api:2.5-20110124" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:jetty:6.1.26" level="project" />
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.mina:mina-core:2.0.4" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-all:4.0.42.Final" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-client:1.2.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-annotations:1.2.6" level="project" />
<orderEntry type="library" name="Maven: com.github.stephenc.findbugs:findbugs-annotations:1.3.9-1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-common:1.2.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-protocol:1.2.6" level="project" />
<orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:2.5.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.htrace:htrace-core:3.1.0-incubating" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />
<orderEntry type="library" name="Maven: org.jruby.jcodings:jcodings:1.0.8" level="project" />
<orderEntry type="library" name="Maven: org.jruby.joni:joni:2.1.2" level="project" />
<orderEntry type="library" name="Maven: com.yammer.metrics:metrics-core:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-auth:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.directory.server:apacheds-kerberos-codec:2.0.0-M15" level="project" />
<orderEntry type="library" name="Maven: org.apache.directory.server:apacheds-i18n:2.0.0-M15" level="project" />
<orderEntry type="library" name="Maven: org.apache.directory.api:api-asn1-api:1.0.0-M20" level="project" />
<orderEntry type="library" name="Maven: org.apache.directory.api:api-util:1.0.0-M20" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-common:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-annotations:2.5.1" level="project" />
<orderEntry type="module-library">
<library name="Maven: jdk.tools:jdk.tools:1.6">
<CLASSES>
<root url="jar:///opt/java/jdk1.8.0_181/lib/tools.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
<orderEntry type="library" name="Maven: xmlenc:xmlenc:0.52" level="project" />
<orderEntry type="library" name="Maven: commons-el:commons-el:1.0" level="project" />
<orderEntry type="library" name="Maven: commons-configuration:commons-configuration:1.6" level="project" />
<orderEntry type="library" name="Maven: commons-digester:commons-digester:1.8" level="project" />
<orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils-core:1.8.0" level="project" />
<orderEntry type="library" name="Maven: com.jcraft:jsch:0.1.42" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-core:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-server:1.2.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-procedure:1.2.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-common:tests:1.2.6" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.apache.hbase:hbase-prefix-tree:1.2.6" level="project" />
<orderEntry type="library" name="Maven: commons-httpclient:commons-httpclient:3.1" level="project" />
<orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-hadoop-compat:1.2.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.hbase:hbase-hadoop2-compat:1.2.6" level="project" />
<orderEntry type="library" name="Maven: com.sun.jersey:jersey-core:1.9" level="project" />
<orderEntry type="library" name="Maven: com.sun.jersey:jersey-server:1.9" level="project" />
<orderEntry type="library" name="Maven: asm:asm:3.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-math:2.2" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:jetty-sslengine:6.1.26" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:jsp-2.1:6.1.14" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:jsp-api-2.1:6.1.14" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:servlet-api-2.5:6.1.14" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.13" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-jaxrs:1.9.13" level="project" />
<orderEntry type="library" name="Maven: tomcat:jasper-compiler:5.5.23" level="project" />
<orderEntry type="library" name="Maven: tomcat:jasper-runtime:5.5.23" level="project" />
<orderEntry type="library" name="Maven: org.jamon:jamon-runtime:2.4.1" level="project" />
<orderEntry type="library" name="Maven: com.lmax:disruptor:3.3.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-client:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-app:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-common:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-shuffle:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-jobclient:2.5.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-hdfs:2.5.1" level="project" />
<orderEntry type="library" name="Maven: commons-daemon:commons-daemon:1.0.13" level="project" />
<orderEntry type="library" name="Maven: net.sourceforge.jexcelapi:jxl:2.6.12" level="project" />
<orderEntry type="library" name="Maven: org.apache.poi:poi-ooxml:3.11-beta1" level="project" />
<orderEntry type="library" name="Maven: org.apache.poi:poi:3.11-beta1" level="project" />
<orderEntry type="library" name="Maven: org.apache.poi:poi-ooxml-schemas:3.11-beta1" level="project" />
<orderEntry type="library" name="Maven: org.apache.xmlbeans:xmlbeans:2.3.0" level="project" />
<orderEntry type="library" name="Maven: net.sf.json-lib:json-lib:jdk15:2.4" level="project" />
<orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils:1.8.0" level="project" />
<orderEntry type="library" name="Maven: net.sf.ezmorph:ezmorph:1.0.6" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-pool2:2.4.2" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.11.8" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-reflect:2.11.8" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-compiler:2.11.8" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-xml_2.11:1.0.4" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-core_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro-mapred:hadoop2:1.7.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro-ipc:tests:1.7.7" level="project" />
<orderEntry type="library" name="Maven: com.twitter:chill_2.11:0.8.0" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:kryo-shaded:3.0.3" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:minlog:1.3.0" level="project" />
<orderEntry type="library" name="Maven: org.objenesis:objenesis:2.1" level="project" />
<orderEntry type="library" name="Maven: com.twitter:chill-java:0.8.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.xbean:xbean-asm5-shaded:4.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-launcher_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-network-common_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.fusesource.leveldbjni:leveldbjni-all:1.8" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.6.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-network-shuffle_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-unsafe_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: net.java.dev.jets3t:jets3t:0.7.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-recipes:2.4.0" level="project" />
<orderEntry type="library" name="Maven: javax.servlet:javax.servlet-api:3.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.5" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:jul-to-slf4j:1.7.16" level="project" />
<orderEntry type="library" name="Maven: com.ning:compress-lzf:1.0.3" level="project" />
<orderEntry type="library" name="Maven: org.roaringbitmap:RoaringBitmap:0.5.11" level="project" />
<orderEntry type="library" name="Maven: commons-net:commons-net:2.2" level="project" />
<orderEntry type="library" name="Maven: org.json4s:json4s-jackson_2.11:3.2.11" level="project" />
<orderEntry type="library" name="Maven: org.json4s:json4s-core_2.11:3.2.11" level="project" />
<orderEntry type="library" name="Maven: org.json4s:json4s-ast_2.11:3.2.11" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-client:2.22.2" level="project" />
<orderEntry type="library" name="Maven: javax.ws.rs:javax.ws.rs-api:2.0.1" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-api:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-utils:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2.external:aopalliance-repackaged:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2.external:javax.inject:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-locator:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-common:2.22.2" level="project" />
<orderEntry type="library" name="Maven: javax.annotation:javax.annotation-api:1.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.bundles.repackaged:jersey-guava:2.22.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:osgi-resource-locator:1.0.1" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-server:2.22.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.media:jersey-media-jaxb:2.22.2" level="project" />
<orderEntry type="library" name="Maven: javax.validation:validation-api:1.1.0.Final" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.containers:jersey-container-servlet:2.22.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.containers:jersey-container-servlet-core:2.22.2" level="project" />
<orderEntry type="library" name="Maven: com.clearspring.analytics:stream:2.7.0" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.1.2" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-jvm:3.1.2" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-json:3.1.2" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-graphite:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.6.5" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-scala_2.11:2.6.5" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-paranamer:2.6.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.ivy:ivy:2.4.0" level="project" />
<orderEntry type="library" name="Maven: oro:oro:2.0.8" level="project" />
<orderEntry type="library" name="Maven: net.razorvine:pyrolite:4.13" level="project" />
<orderEntry type="library" name="Maven: net.sf.py4j:py4j:0.10.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-crypto:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-sql_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: com.univocity:univocity-parsers:2.2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-sketch_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-catalyst_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.janino:janino:3.0.0" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.janino:commons-compiler:3.0.0" level="project" />
<orderEntry type="library" name="Maven: org.antlr:antlr4-runtime:4.5.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-column:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-common:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-encoding:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-hadoop:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-format:2.3.0-incubating" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-jackson:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-hive_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: com.twitter:parquet-hadoop-bundle:1.6.0" level="project" />
<orderEntry type="library" name="Maven: org.spark-project.hive:hive-exec:1.2.1.spark2" level="project" />
<orderEntry type="library" name="Maven: javolution:javolution:5.5.1" level="project" />
<orderEntry type="library" name="Maven: org.antlr:ST4:4.0.4" level="project" />
<orderEntry type="library" name="Maven: com.googlecode.javaewah:JavaEWAH:0.3.2" level="project" />
<orderEntry type="library" name="Maven: org.iq80.snappy:snappy:0.2" level="project" />
<orderEntry type="library" name="Maven: stax:stax-api:1.0.1" level="project" />
<orderEntry type="library" name="Maven: org.spark-project.hive:hive-metastore:1.2.1.spark2" level="project" />
<orderEntry type="library" name="Maven: org.apache.calcite:calcite-avatica:1.2.0-incubating" level="project" />
<orderEntry type="library" name="Maven: org.apache.calcite:calcite-core:1.2.0-incubating" level="project" />
<orderEntry type="library" name="Maven: org.apache.calcite:calcite-linq4j:1.2.0-incubating" level="project" />
<orderEntry type="library" name="Maven: net.hydromatic:eigenbase-properties:1.1.5" level="project" />
<orderEntry type="library" name="Maven: org.jodd:jodd-core:3.5.2" level="project" />
<orderEntry type="library" name="Maven: org.datanucleus:datanucleus-core:3.2.10" level="project" />
<orderEntry type="library" name="Maven: org.apache.thrift:libfb303:0.9.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-yarn_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-api:2.2.0" level="project" />
<orderEntry type="library" name="Maven: com.google.inject.extensions:guice-servlet:3.0" level="project" />
<orderEntry type="library" name="Maven: com.google.inject:guice:3.0" level="project" />
<orderEntry type="library" name="Maven: javax.inject:javax.inject:1" level="project" />
<orderEntry type="library" name="Maven: aopalliance:aopalliance:1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-common:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-server-web-proxy:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-server-common:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-client:2.2.0" level="project" />
<orderEntry type="library" name="Maven: com.h2database:h2:1.4.197" level="project" />
</component>
</module>

View File

@@ -0,0 +1,54 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microo/biproject/",
"selectionConditions":"bioproject.xml"
}
},
{
"uuid":"2222",
"name":"BioProjetDataParse",
"bundle":"cn.piflow.bundle.microorganism.BioProjetDataParse",
"properties":{
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "bioproject",
"es_type": "bioprojecttest002"
}
},
{
"uuid": "3333",
"name": "putEs",
"bundle": "cn.piflow.bundle.es.PutEs",
"properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "bioproject10",
"es_type": "bioproject10"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"BioProjetDataParse"
},
{
"from":"BioProjetDataParse",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}
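Note on the flow configs in this commit: each stop is identified by its bundle class, stops are connected by name under paths, and a stop's properties object is handed to that stop's setProperties. A minimal, hypothetical sketch of how a stop could consume such a properties block (the class and field names are illustrative; the real ES stops later in this commit use the same MapUtil.get pattern):

```scala
import cn.piflow.conf.util.MapUtil

// Hypothetical sketch only: shows how a stop like SelectFilesByName might read
// the "properties" keys from the flow JSON above. Not the project's actual code.
class SelectFilesByNameSketch {
  var hdfsUrl: String = _
  var hdfsPath: String = _
  var selectionConditions: String = _

  def setProperties(map: Map[String, Any]): Unit = {
    hdfsUrl = MapUtil.get(map, key = "HDFSUrl").asInstanceOf[String]
    hdfsPath = MapUtil.get(map, key = "HDFSPath").asInstanceOf[String]
    selectionConditions = MapUtil.get(map, key = "selectionConditions").asInstanceOf[String]
  }
}
```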

View File

@@ -0,0 +1,70 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microo/",
"selectionConditions":".*ample_set.xml.gz"
}
},{
"uuid":"2222",
"name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS",
"properties":{
"isCustomize":"false",
"filePath":"",
"hdfsUrl":"hdfs://master2.packone:8020",
"savePath":"/microo/biosample/biosample/"
}
},
{
"uuid":"2222",
"name":"BioSampleParse",
"bundle":"cn.piflow.bundle.microorganism.BioSampleParse",
"properties":{
}
},
{
"uuid":"3333",
"name":"putEs",
"bundle":"cn.piflow.bundle.es.PutEs",
"properties":{
"es_nodes":"10.0.88.70,10.0.88.71,10.0.88.72",
"port":"9200",
"es_index":"sample0122",
"es_type":"sample0122"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS"
},
{
"from":"UnzipFilesOnHDFS",
"outport":"",
"inport":"",
"to":"BioSampleParse"
},
{
"from":"BioSampleParse",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}

View File

@@ -0,0 +1,71 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microo/genbank/",
"selectionConditions":".*.seq.gz"
}
},{
"uuid":"2222",
"name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS",
"properties":{
"isCustomize":"false",
"filePath":"",
"hdfsUrl":"hdfs://master2.packone:8020",
"savePath":"/microo/genbank/"
}
},
{
"uuid":"3333",
"name":"GenBankParse",
"bundle":"cn.piflow.bundle.microorganism.GenBankParse",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"genbank",
"es_type":"data6"
}
},
{
"uuid": "4444",
"name": "putEs",
"bundle": "cn.piflow.bundle.es.PutEs",
"properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "genbank",
"es_type": "genbank1"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS"
},
{
"from":"UnzipFilesOnHDFS",
"outport":"",
"inport":"",
"to":"GenBankParse"
},
{
"from":"GenBankParse",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}

View File

@@ -0,0 +1,50 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microoAll/",
"selectionConditions":"go.obo"
}
},
{
"uuid": "3333",
"name": "GoDataParse",
"bundle": "cn.piflow.bundle.microorganism.GoDataParse",
"properties": {
}
},
{
"uuid": "4444",
"name": "putEs",
"bundle": "cn.piflow.bundle.es.PutEs",
"properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "go",
"es_type": "go"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"GoDataParse"
},
{
"from":"GoDataParse",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}

View File

@@ -0,0 +1,64 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"FileDownHDFS",
"bundle":"cn.piflow.bundle.http.FileDownHDFS",
"properties":{
"url_str":"https://gold.jgi.doe.gov/download?mode=site_excel",
"savePath":"hdfs://master2.packone:8020/microo/golddata/gold.xlsx"
}
},
{
"uuid": "1111",
"name": "ExcelParser",
"bundle": "cn.piflow.bundle.excel.ExcelParser",
"properties": {
"jaonSavePath":"hdfs://master2.packone:8020/microo/golddata/golddata.json"
}
},
{
"uuid": "2222",
"name": "GoldDataParse",
"bundle": "cn.piflow.bundle.microorganism.GoldDataParse",
"properties": {
}
},
{
"uuid": "3333",
"name": "putEs",
"bundle": "cn.piflow.bundle.es.PutEs",
"properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "golddata1",
"es_type": "golddatadaa"
}
}
],
"paths":[
{
"from":"FileDownHDFS",
"outport":"",
"inport":"",
"to":"ExcelParser"
},
{
"from":"ExcelParser",
"outport":"",
"inport":"",
"to":"GoldDataParse"
},
{
"from":"GoldDataParse",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}

View File

@@ -0,0 +1,67 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microoAll/",
"selectionConditions":"interpro.xml.gz"
}
},{
"uuid":"2222",
"name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS",
"properties":{
"isCustomize":"false",
"filePath":"",
"hdfsUrl":"hdfs://master2.packone:8020",
"savePath":"/microoAll/inter/"
}
},
{
"uuid": "3333",
"name": "InterprodataParse",
"bundle": "cn.piflow.bundle.microorganism.InterprodataParse",
"properties": {
}
},
{
"uuid": "4444",
"name": "putEs",
"bundle": "cn.piflow.bundle.es.PutEs",
"properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "interpro",
"es_type": "interpro"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS"
},
{
"from":"UnzipFilesOnHDFS",
"outport":"",
"inport":"",
"to":"InterprodataParse"
},
{
"from":"InterprodataParse",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}

View File

@@ -0,0 +1,69 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"1111",
"name":"SelectFilesByName",
"bundle":"cn.piflow.bundle.ftp.SelectFilesByName",
"properties":{
"HDFSUrl":"hdfs://master2.packone:8020",
"HDFSPath":"/microo/taxonomy/",
"selectionConditions":"taxdump.tar.gz"
}
},{
"uuid":"2222",
"name":"UnzipFilesOnHDFS",
"bundle":"cn.piflow.bundle.http.UnzipFilesOnHDFS",
"properties":{
"isCustomize":"true",
"filePath":"/microo/taxonomy/taxdump.tar.gz",
"hdfsUrl":"hdfs://master2.packone:8020",
"savePath":"/microo/taxonomy/taxdump/"
}
},
{
"uuid":"3333",
"name":"TaxonomyParse",
"bundle":"cn.piflow.bundle.microorganism.TaxonomyParse",
"properties":{
}
},
{
"uuid": "4444",
"name": "putEs",
"bundle": "cn.piflow.bundle.es.PutEs",
"properties": {
"es_nodes": "10.0.88.70,10.0.88.71,10.0.88.72",
"port": "9200",
"es_index": "taxonomy",
"es_type": "taxonomy"
}
}
],
"paths":[
{
"from":"SelectFilesByName",
"outport":"",
"inport":"",
"to":"UnzipFilesOnHDFS"
},
{
"from":"UnzipFilesOnHDFS",
"outport":"",
"inport":"",
"to":"TaxonomyParse"
},
{
"from":"TaxonomyParse",
"outport":"",
"inport":"",
"to":"putEs"
}
]
}
}

View File

@@ -13,11 +13,10 @@ class FetchEs extends ConfigurableStop {
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "fetch data with dataframe from elasticSearch "
override val description: String = "Fetch data from Elasticsearch "
var es_nodes:String = _ // ES nodes; separate multiple nodes with commas
var port:String= _ // ES port number
var es_port:String= _ // ES port number
var es_index:String = _ // ES index
var es_type:String = _ // ES type
@@ -27,11 +26,10 @@ class FetchEs extends ConfigurableStop {
val ssc = spark.sqlContext
val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,"es.port"->port)
"es.nodes"->es_nodes,"es.port"->es_port)
//load data with df from es
val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")
//outDf.show()
out.write(outDf)
}
@@ -41,21 +39,21 @@ class FetchEs extends ConfigurableStop {
def setProperties(map : Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_port=MapUtil.get(map,key="es_port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true)
val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
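For context, FetchEs reads from Elasticsearch through the elasticsearch-spark SQL data source. A minimal standalone sketch of the same read path, assuming the elasticsearch-hadoop dependency listed in the module file above and illustrative node/index values:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the read path FetchEs uses; the node, index and type values
// are illustrative, borrowed from the flow configs earlier in this commit.
val spark = SparkSession.builder().appName("fetch-es-sketch").getOrCreate()

val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only"    -> "true",
  "es.nodes"             -> "10.0.88.70",   // assumed Elasticsearch node
  "es.port"              -> "9200"
)

val outDf = spark.read
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .load("bioproject/bioprojecttest002")     // es_index/es_type
outDf.show(5)
```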

View File

@@ -9,40 +9,28 @@ import org.elasticsearch.spark.sql.EsSparkSQL
class PutEs extends ConfigurableStop {
override val description: String = "put data with dataframe to elasticSearch "
override val description: String = "Put data to Elasticsearch "
val authorEmail: String = "ygang@cnic.cn"
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
var es_nodes:String = _ // ES nodes; separate multiple nodes with commas
var port:String= _ // ES port number
var es_port:String= _ // ES port number
var es_index:String = _ // ES index
var es_type:String = _ // ES type
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val inDf = in.read()
//inDf.show()
val sc = spark.sparkContext
val options = Map("es.index.auto.create"-> "true",
"es.nodes"->es_nodes,"es.port"->port)
"es.nodes"->es_nodes,"es.port"->es_port)
// Save the DataFrame to Elasticsearch
EsSparkSQL.saveToEs(inDf,s"${es_index}/${es_type}",options)
// val json1 = """{"name":"jack", "age":24, "sex":"man"}"""
// val json2 = """{"name":"rose", "age":22, "sex":"woman"}"""
//
// val rddData = sc.makeRDD(Seq(json1, json2))
//
// EsSpark.saveJsonToEs(rddData, "spark/json2",options)
// Custom document id
// EsSpark.saveJsonToEs(rddData, "spark/json1", Map("es.mapping.id"->"name"))
}
@@ -52,21 +40,21 @@ class PutEs extends ConfigurableStop {
def setProperties(map : Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_port=MapUtil.get(map,key="es_port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true)
val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
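PutEs writes the incoming DataFrame with EsSparkSQL.saveToEs. A minimal sketch of that call with sample data; the two-column schema and the target node/index are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

// Minimal sketch of the write path PutEs uses; the schema and all connection
// values are illustrative, not taken from the project.
val spark = SparkSession.builder().appName("put-es-sketch").getOrCreate()
import spark.implicits._

val inDf = Seq(
  ("PRJNA000001", "soil metagenome"),
  ("PRJNA000002", "Escherichia coli")
).toDF("accession", "organism")

val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes" -> "10.0.88.70",   // assumed Elasticsearch node
  "es.port"  -> "9200"
)

EsSparkSQL.saveToEs(inDf, "bioproject10/bioproject10", options)   // es_index/es_type
```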

View File

@@ -5,73 +5,32 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
class QueryEs extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "query data with dataframe from elasticSearch "
override val description: String = "Query data from Elasticsearch "
var es_nodes:String = _ // ES nodes; separate multiple nodes with commas
var port:String= _ // ES port number
var es_port:String= _ // ES port number
var es_index:String = _ // ES index
var es_type:String = _ // ES type
var field_name:String = _ // ES field name
var field_content:String = _ // ES field content
var jsonDSL:String = _ // ES query DSL (JSON)
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val ssc = spark.sqlContext
// Build the query: match field_content against field_name
val query =
s"""
|{
| "query":{
| "match":{
|
| "${field_name}":"${field_content}"
| }
| }
|}
""".stripMargin
// val query2 =
// s"""
// |{
// | "query":{
// | "terms":{
// |
// | "age":[22]
// | }
// | }
// |}
// """.stripMargin
//
// val query3 =
// s"""
// |{
// | "query":{
// | "match":{
// | "name":"rose *"
// | }
// | }
// |}
// """.stripMargin
val options = Map("es.index.auto.create"-> "true",
"es.nodes.wan.only"->"true",
"es.query" -> query,
"es.nodes"->es_nodes,"es.port"->port)
"es.query" -> jsonDSL,
"es.nodes"->es_nodes,"es.port"->es_port)
val outDf = ssc.read.format("org.elasticsearch.spark.sql")
.options(options).load(s"${es_index}/${es_type}")
//outDf.show()
out.write(outDf)
}
@@ -82,30 +41,26 @@ class QueryEs extends ConfigurableStop {
def setProperties(map : Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_port=MapUtil.get(map,key="es_port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
field_name=MapUtil.get(map,key="field_name").asInstanceOf[String]
field_content=MapUtil.get(map,key="field_content").asInstanceOf[String]
jsonDSL=MapUtil.get(map,key="jsonDSL").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
val field_name = new PropertyDescriptor().name("field_name").displayName("field_name").defaultValue("").required(true)
val field_content = new PropertyDescriptor().name("field_content").displayName("field_content").defaultValue("").required(true)
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("Node of Elasticsearch").required(true)
val es_port = new PropertyDescriptor().name("es_port").displayName("es_port").defaultValue("Port of Elasticsearch").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("Index of Elasticsearch").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("Type of Elasticsearch").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor = field_name :: descriptor
descriptor = field_content :: descriptor
descriptor
}
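With this change QueryEs takes a raw Elasticsearch query through the jsonDSL property and passes it to the connector via es.query. A minimal sketch of such a DSL string and the resulting read; the match field and all connection values are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the jsonDSL-driven read QueryEs now performs.
val spark = SparkSession.builder().appName("query-es-sketch").getOrCreate()

val jsonDSL =
  """
    |{
    |  "query": {
    |    "match": { "organism": "Escherichia coli" }
    |  }
    |}
  """.stripMargin

val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only"    -> "true",
  "es.query"             -> jsonDSL,
  "es.nodes"             -> "10.0.88.70",   // assumed Elasticsearch node
  "es.port"              -> "9200"
)

val outDf = spark.read
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .load("genbank/genbank1")                  // es_index/es_type
outDf.show(5)
```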

View File

@@ -1,15 +1,16 @@
package cn.piflow.bundle.excel
import java.io.File
import java.io.{BufferedInputStream, ByteArrayInputStream}
import cn.piflow._
import cn.piflow.bundle.util.ExcelToJson
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import net.sf.json.JSONArray
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
class ExcelParser extends ConfigurableStop{
@@ -18,38 +19,89 @@ class ExcelParser extends ConfigurableStop{
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var excelPath: String = _
var jsonSavePath: String = _
var list = List("")
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
import spark.implicits._
val inDf = in.read()
val rows: Array[Row] = inDf.collect()
for (i <- 0 until rows.length){
val path1 = rows(i)(0).toString
println("***"+path1+"---")
// "excelPath":"/ftpGoldData/test1.xlsx"
if (path1.endsWith(".xls") || path1.endsWith("xlsx")){
println(path1)
val f1 = new File(path1)
// Call the utility class to parse the Excel file (.xls, .xlsx)
val array: JSONArray = ExcelToJson.readExcel(f1)
for (i <- 0 until array.size()){
list = array.get(i).toString :: list
}
}
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
val outDF = sc.parallelize(list).toDF("jsonObject")
//println(outDF.count())
//outDF.show()
out.write(outDF)
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
val path: Path = new Path(jsonSavePath)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
var count = 0 ;
inDf.collect().foreach(row=>{
val pathStr = row.get(0).asInstanceOf[String]
if (pathStr.endsWith(".xls") || pathStr.endsWith("xlsx")){
val array: JSONArray = ExcelToJson.readExcel(pathStr,hdfsUrl)
println(array.size())
for (i <- 0 until array.size()){
jsonStr = array.get(i).toString
if (count == 0) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
count+=1
} else if (count==1){
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
}
}
})
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
val df: DataFrame = spark.read.json(jsonSavePath)
out.write(df)
}
@ -58,13 +110,13 @@ class ExcelParser extends ConfigurableStop{
}
def setProperties(map : Map[String, Any]): Unit = {
// excelPath = MapUtil.get(map,"excelPath").asInstanceOf[String]
jsonSavePath = MapUtil.get(map,"jsonSavePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
// val excelPath = new PropertyDescriptor().name("excelPath").displayName("excelPath").description("The path of excel file").defaultValue("").required(true)
// descriptor = excelPath :: descriptor
val jsonSavePath = new PropertyDescriptor().name("jsonSavePath").displayName("jsonSavePath").description("save path of json").defaultValue("").required(true)
descriptor = jsonSavePath :: descriptor
descriptor
}
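
The rewritten ExcelParser no longer collects rows into an in-memory list; it streams every parsed record into a single HDFS file as one JSON array ("[" before the first record, "," before each following one, "]" at the end) and then reads that file back with spark.read.json. A self-contained sketch of that pattern, with the HDFS URL and cache file as placeholders, error handling omitted, and at least one record assumed:

    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object JsonArrayCacheSketch {
      def writeJsonArrayToHdfs(spark: SparkSession, hdfsUrl: String, cacheFile: String,
                               records: Iterator[String]): DataFrame = {
        val conf = new Configuration()
        conf.set("fs.defaultFS", hdfsUrl)                  // e.g. "hdfs://10.0.86.89:9000/" (placeholder)
        val fs = FileSystem.get(conf)
        val path = new Path(cacheFile)
        if (fs.exists(path)) fs.delete(path, true)
        val out = fs.create(path)                          // FSDataOutputStream
        var first = true
        records.foreach { json =>
          out.write(((if (first) "[" else ",") + json).getBytes(StandardCharsets.UTF_8))
          first = false
        }
        out.write("]".getBytes(StandardCharsets.UTF_8))    // close the JSON array
        out.close()
        spark.read.json(cacheFile)                         // parse the cached array back into a DataFrame
      }
    }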

View File

@ -34,20 +34,8 @@ class DeleteHdfs extends ConfigurableStop{
config.set("fs.defaultFS",hdfsUrl)
val fs = FileSystem.get(config)
println(path+"ddddddddddddddddddd--delete")
fs.delete(path,true)
// if (fs.isDirectory(path)){
// println("-------wenjianjia-----")
// fs.delete(path,true)
// }
//
// else if(fs.isFile(path)){
// println("wenjian -------------------------")
// fs.delete(path,true)
// } else {
// fs.delete(path, true)
// }
}
}
override def setProperties(map: Map[String, Any]): Unit = {

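DeleteHdfs now calls fs.delete(path, true) unconditionally, which covers files and directories alike (the boolean enables recursive deletion), so the commented-out isDirectory/isFile branching above was redundant. A short sketch with hdfsUrl and the target path as placeholders; note that the one-argument fs.delete(path) used in a few of the parsers below is deprecated in current Hadoop releases in favor of this two-argument form:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object DeleteHdfsSketch {
      def delete(hdfsUrl: String, pathStr: String): Boolean = {
        val conf = new Configuration()
        conf.set("fs.defaultFS", hdfsUrl)    // e.g. "hdfs://namenode:9000" (placeholder)
        val fs = FileSystem.get(conf)
        // delete(path, recursive = true) removes files and whole directories;
        // it returns false when the path does not exist.
        fs.delete(new Path(pathStr), true)
      }
    }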
View File

@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession
class GetHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val description: String = "write dataframe data from hdfs"
override val description: String = "get data from hdfs"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
@ -22,22 +22,8 @@ class GetHdfs extends ConfigurableStop{
val sc= spark.sparkContext
import spark.implicits._
println(hdfsPath+"gggggggggggggggggggggggg ---getHdfs---txt")
val path = hdfsUrl+hdfsPath
// val array = hdfsPath.split(",")
//
// val buffer = new StringBuffer()
// for (i<- 0 until array.length) {
// if (i== array.length-1){
// buffer.append(hdfsUrl+array(i))
// } else {
// buffer.append(hdfsUrl+array(i)+",")
// }
// }
// println(buffer.toString)
if (types == "json") {
val rdd = spark.read.json(path)
//rdd.show()
@ -56,7 +42,6 @@ class GetHdfs extends ConfigurableStop{
//rdd.show()
rdd.schema.printTreeString()
out.write(rdd)
}
else {
val rdd = sc.textFile(path)
@ -66,17 +51,6 @@ class GetHdfs extends ConfigurableStop{
out.write(outDf)
}
// val rdd = spark.read.text("hdfs://10.0.86.89:9000/yg/test/hdf1.txt")
// rdd.show()
// rdd.schema.printTreeString()
// println( rdd.count())
// val rdd = ssc.read.load("hdfs://10.0.86.89:9000/yg/test/hdf1.txt")
}
override def setProperties(map: Map[String, Any]): Unit = {
hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]

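GetHdfs now simply concatenates hdfsUrl and hdfsPath and branches on the types property; the branches visible in this diff read JSON and fall back to sc.textFile for plain text. A condensed sketch of that dispatch, where the csv and parquet cases are illustrative additions rather than branches confirmed by this diff:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object GetHdfsSketch {
      def read(spark: SparkSession, hdfsUrl: String, hdfsPath: String, types: String): DataFrame = {
        import spark.implicits._
        val path = hdfsUrl + hdfsPath        // e.g. "hdfs://namenode:9000" + "/yg/test" (placeholders)
        types match {
          case "json"    => spark.read.json(path)
          case "csv"     => spark.read.csv(path)
          case "parquet" => spark.read.parquet(path)
          case _         => spark.sparkContext.textFile(path).toDF("value")
        }
      }
    }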
View File

@ -30,11 +30,7 @@ class ListHdfs extends ConfigurableStop{
import spark.implicits._
val outDF = sc.parallelize(list).toDF("path")
//outDF.show()
outDF.schema.printTreeString()
val outDF = sc.parallelize(list).toDF()
out.write(outDF)
}
@ -56,7 +52,6 @@ class ListHdfs extends ConfigurableStop{
iterationFile(fsPath)
} else{
list = f.getPath.toString::list
}
}
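
ListHdfs walks the directory tree with iterationFile, collecting every file path into list, and the new code builds the output DataFrame without the explicit "path" column name. A self-contained sketch of the same recursive walk, with hdfsUrl and the start directory as placeholders:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.ListBuffer

    object ListHdfsSketch {
      def listFiles(hdfsUrl: String, dir: String): List[String] = {
        val conf = new Configuration()
        conf.set("fs.defaultFS", hdfsUrl)                  // placeholder, e.g. "hdfs://namenode:9000"
        val fs = FileSystem.get(conf)
        val result = ListBuffer[String]()
        def walk(p: Path): Unit =
          fs.listStatus(p).foreach { status =>
            if (status.isDirectory) walk(status.getPath)   // recurse into sub-directories
            else result += status.getPath.toString         // keep file paths only
          }
        walk(new Path(dir))
        result.toList
      }
    }

sc.parallelize(listFiles(hdfsUrl, dir)).toDF("path") then reproduces the DataFrame the stop used to emit.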

View File

@ -14,25 +14,20 @@ class PutHdfs extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "from dataframe write data to hdfs"
override val description: String = "Put data to hdfs"
var hdfsPath :String= _
var hdfsUrl :String= _
var types :String= _
var partition :Int= 3
var partition :Int= _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val inDF = in.read()
//inDF.show()
inDF.schema.printTreeString()
//val path = new Path(hdfsUrl+hdfsPath)
val config = new Configuration()
config.set("fs.defaultFS",hdfsUrl)
val fs = FileSystem.get(config)
println(hdfsUrl+hdfsPath+"pppppppppppppppppppppppppppppppp--putHdfs")
if (types=="json"){
inDF.repartition(partition).write.json(hdfsUrl+hdfsPath)
@ -48,7 +43,7 @@ class PutHdfs extends ConfigurableStop{
hdfsUrl = MapUtil.get(map,key="hdfsUrl").asInstanceOf[String]
hdfsPath = MapUtil.get(map,key="hdfsPath").asInstanceOf[String]
types = MapUtil.get(map,key="types").asInstanceOf[String]
val partition1 = MapUtil.get(map,key="partition").asInstanceOf[String]
partition = MapUtil.get(map,key="partition").asInstanceOf[Int]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {

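In PutHdfs the partition field loses its default of 3 and is now cast straight to Int instead of being parsed from a String. If the flow configuration still delivers the value as a string, asInstanceOf[Int] throws a ClassCastException; a defensive sketch that accepts either form (the helper name readPartition is illustrative):

    // Sketch only: tolerate both Int and String property values, fall back to a default.
    def readPartition(map: Map[String, Any], default: Int = 3): Int =
      map.get("partition") match {
        case Some(i: Int)    => i
        case Some(s: String) => scala.util.Try(s.trim.toInt).getOrElse(default)
        case _               => default
      }

    // usage inside setProperties:  partition = readPartition(map)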
View File

@ -32,7 +32,6 @@ class GetUrl extends ConfigurableStop{
// xml String
var label:String=_
var schema: String = _
// var xmlString :String=_
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@ -45,24 +44,17 @@ class GetUrl extends ConfigurableStop{
val response:CloseableHttpResponse = client.execute(getFlowInfo)
val entity = response.getEntity
val jsonString = EntityUtils.toString(entity,"UTF-8")
println("-------------------------------------------")
if (types == "json"){
// json to df
val jsonRDD = ss.sparkContext.makeRDD(jsonString :: Nil)
val jsonDF = ss.read.json(jsonRDD)
//jsonDF.schema.printTreeString()
//jsonDF.show(10)
//jsonDF.select("app.id").show()
out.write(jsonDF)
}
if(types=="xml"){
println("8888888888888888888888888888888888888888888888888888888")
val doc: Document = DocumentHelper.parseText(jsonString)
val rootElt: Element = doc.getRootElement
var arrbuffer:ArrayBuffer[Element]=ArrayBuffer()
@ -98,8 +90,6 @@ class GetUrl extends ConfigurableStop{
list.+=(text.substring(0,text.length-1))
}
val listRows: List[Row] = list.toList.map(line => {
val seq: Seq[String] = line.split(",").toSeq
val row = Row.fromSeq(seq)
@ -115,10 +105,7 @@ class GetUrl extends ConfigurableStop{
val outDf: DataFrame = ss.createDataFrame(rowRDD,structType)
//outDf.show(20)
outDf.schema.printTreeString()
out.write(outDf)
}
@ -128,8 +115,6 @@ class GetUrl extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,key="url").asInstanceOf[String]
types= MapUtil.get(map,key="types").asInstanceOf[String]
// xmlString = MapUtil.get(map,"XmlString").asInstanceOf[String]
label = MapUtil.get(map,"label").asInstanceOf[String]
schema = MapUtil.get(map,"schema").asInstanceOf[String]
@ -139,9 +124,6 @@ class GetUrl extends ConfigurableStop{
var descriptor : List[PropertyDescriptor] = List()
val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true)
val types = new PropertyDescriptor().name("types").displayName("types").defaultValue("the url content is json or xml)").required(true)
// val xmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true)
// descriptor = xmlString :: descriptor
val label = new PropertyDescriptor().name("label").displayName("label").description("label path for hope,the delimiter is ,").defaultValue("").required(true)
descriptor = label :: descriptor
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true)

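For the json branch, GetUrl turns the HTTP response body into a one-element RDD and reads it with spark.read.json. A compact sketch of that conversion with the URL as a placeholder; on Spark 2.2+ the Dataset[String] overload shown here avoids the deprecated RDD variant:

    import org.apache.http.client.methods.HttpGet
    import org.apache.http.impl.client.HttpClients
    import org.apache.http.util.EntityUtils
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object GetUrlJsonSketch {
      def fetchJsonAsDf(spark: SparkSession, url: String): DataFrame = {
        import spark.implicits._
        val client = HttpClients.createDefault()
        val response = client.execute(new HttpGet(url))        // url is a placeholder
        val body = EntityUtils.toString(response.getEntity, "UTF-8")
        response.close()
        client.close()
        spark.read.json(Seq(body).toDS())                      // Dataset[String] overload (Spark 2.2+)
      }
    }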
View File

@ -28,10 +28,6 @@ class InvokeUrl extends ConfigurableStop{
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "invoke http "
// var urlPut :String= _
// var urlPost :String= _
// var urlDelete :String= _
// var urlGet :String= _
var url :String= _
var jsonPath :String =_
@ -56,8 +52,6 @@ class InvokeUrl extends ConfigurableStop{
val entity = response.getEntity
val jsonString = EntityUtils.toString(entity, "UTF-8")
println("=====================================================================invoke get")
// json to df
if (types == "json") {
// json to df
@ -122,12 +116,7 @@ class InvokeUrl extends ConfigurableStop{
val outDf: DataFrame = spark.createDataFrame(rowRDD, structType)
//outDf.show(20)
//outDf.schema.printTreeString()
out.write(outDf)
println("====================================================================")
}
@ -143,7 +132,6 @@ class InvokeUrl extends ConfigurableStop{
buffer.append(lineTxt.mkString)
lineTxt = bufferReader.readLine()
}
println(buffer)
if (method == "putHttp") {
val put = new HttpPut(url)
@ -157,8 +145,6 @@ class InvokeUrl extends ConfigurableStop{
if (entity != null) {
result = EntityUtils.toString(entity, "utf-8")
}
println(response)
println(result)
put.releaseConnection()
} else {
val post = new HttpPost(url)
@ -168,22 +154,17 @@ class InvokeUrl extends ConfigurableStop{
val response = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity, "UTF-8")
println(response)
println("Code is " + str)
}
}
if (method == "deleteHttp") {
println(url)
val inDf = in.read()
inDf.createOrReplaceTempView("table")
val sqlDF = inDf.sqlContext.sql(s"select $colume from table")
//sqlDF.show()
val array = sqlDF.collect()
for (i <- 0 until array.length) {
var url1 = ""
val newArray = array(i)
@ -197,10 +178,7 @@ class InvokeUrl extends ConfigurableStop{
builder.append(columns(i) + "=" + newArray(i) + "&")
}
}
// println(builder)
url1 = url + "?" + builder
println(url1 + "##########################################################")
val delete = new HttpDelete(url1)
delete.setHeader("content-Type", "application/json")
@ -208,8 +186,6 @@ class InvokeUrl extends ConfigurableStop{
val response = client.execute(delete)
val entity = response.getEntity
val str = EntityUtils.toString(entity, "UTF-8")
println("Code is " + str)
println(response)
}
@ -221,10 +197,6 @@ class InvokeUrl extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,key="url").asInstanceOf[String]
// urlPut = MapUtil.get(map,key="urlPut").asInstanceOf[String]
// urlPost = MapUtil.get(map,key="urlPost").asInstanceOf[String]
// urlDelete = MapUtil.get(map,key="urlDelete").asInstanceOf[String]
// urlGet = MapUtil.get(map,key="urlGet").asInstanceOf[String]
jsonPath = MapUtil.get(map,key="jsonPath").asInstanceOf[String]
method = MapUtil.get(map,key = "method").asInstanceOf[String]
@ -240,10 +212,6 @@ class InvokeUrl extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
// val urlPut = new PropertyDescriptor().name("urlPut").displayName("urlPutPost").defaultValue("").required(true)
// val urlPost = new PropertyDescriptor().name("urlPost").displayName("urlPutPost").defaultValue("").required(true)
// val urlDelete = new PropertyDescriptor().name("urlDelete").displayName("urlPutPost").defaultValue("").required(true)
// val urlGet = new PropertyDescriptor().name("urlGet").displayName("urlGet").defaultValue("").required(true)
val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true)
val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("JSONPATH").defaultValue("").required(true)
val method = new PropertyDescriptor().name("method").displayName("the way with http").defaultValue("").required(true)
@ -258,10 +226,6 @@ class InvokeUrl extends ConfigurableStop{
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true)
descriptor = schema :: descriptor
// descriptor = urlPut :: descriptor
// descriptor = urlPost :: descriptor
// descriptor = urlDelete :: descriptor
// descriptor = urlGet :: descriptor
descriptor = jsonPath :: descriptor
descriptor = method :: descriptor
descriptor = colume :: descriptor

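In the deleteHttp branch the query string is assembled by appending column=value pairs, each followed by "&", which leaves a trailing "&" on the final URL. Most servers tolerate that, but a mkString-based sketch avoids it; the column/value arguments and the sample call are illustrative, and URL-encoding is added as a precaution:

    import java.net.URLEncoder

    // Sketch only: build "?k1=v1&k2=v2" from parallel column/value sequences.
    def buildQueryUrl(url: String, columns: Seq[String], values: Seq[Any]): String = {
      val query = columns.zip(values).map { case (k, v) =>
        URLEncoder.encode(k, "UTF-8") + "=" + URLEncoder.encode(String.valueOf(v), "UTF-8")
      }.mkString("&")
      url + "?" + query
    }

    // e.g. buildQueryUrl("http://host/flow", Seq("id", "state"), Seq(12, "stopped"))
    //      returns "http://host/flow?id=12&state=stopped"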
View File

@ -41,7 +41,7 @@ class PostUrl extends ConfigurableStop{
buffer.append(lineTxt.mkString)
lineTxt=bufferReader.readLine()
}
println(buffer)
// post
val client = HttpClients.createDefault()
@ -54,7 +54,6 @@ class PostUrl extends ConfigurableStop{
val response = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println(response)
println("Code is " + str)
}
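
PostUrl reads the request body file line by line into a StringBuffer before posting it. An equivalent, shorter sketch using scala.io.Source, with the file path and URL as placeholders; the HttpClient calls mirror the ones already used above:

    import org.apache.http.client.methods.HttpPost
    import org.apache.http.entity.StringEntity
    import org.apache.http.impl.client.HttpClients
    import org.apache.http.util.EntityUtils
    import scala.io.Source

    object PostUrlSketch {
      def postJsonFile(url: String, jsonPath: String): String = {
        val source = Source.fromFile(jsonPath, "UTF-8")
        val body = try source.mkString finally source.close()  // whole file as one string
        val post = new HttpPost(url)
        post.setHeader("content-Type", "application/json")
        post.setEntity(new StringEntity(body, "UTF-8"))
        val client = HttpClients.createDefault()
        val response = client.execute(post)
        try EntityUtils.toString(response.getEntity, "UTF-8")   // server reply
        finally { response.close(); client.close() }
      }
    }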

View File

@ -1,30 +1,28 @@
package cn.piflow.bundle.microorganism
import java.io._
import java.net.UnknownHostException
import java.util.regex.Pattern
import java.util.regex.{Matcher, Pattern}
import cn.piflow.bundle.microorganism.util.BioProject
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL
import org.json.XML
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.{JSONArray, JSONObject, XML}
class BioProjetDataParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Load file from ftp url."
val description: String = "Parsing BioProject type data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:String= _ //ES port number
var es_index:String = _ //ES index
var es_type:String = _ //ES type
var cachePath:String = _
var name:String = "Package"
var dp = Pattern.compile("((\\d{4})-(\\d{2})-(\\d{2}))(T.*)")
@ -33,187 +31,193 @@ class BioProjetDataParse extends ConfigurableStop{
val sc = spark.sparkContext
val inDf= in.read()
//inDf.show()
//inDf.schema.printTreeString()
val rows: Array[Row] = inDf.collect()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
var path:String = null
for (i <- 0 until rows.size) {
if (rows(i)(0).toString.endsWith("bioproject.xml")){
path = rows(i)(0).toString
val hdfsPathTemporary = hdfsUrl+cachePath+"/bioprojectCatch/bioproject.json"
// val path1 = "/ftpBioProject/bioproject.xml"
try {
val br = new BufferedReader(new FileReader(path))
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
inDf.collect().foreach(row =>{
val pathStr = row.get(0).asInstanceOf[String]
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null
var i = 0
while (i < 2) {
var i= 0
while(i<2){
br.readLine()
i = i + 1
}
var count = 0
var xml = new StringBuffer()
var x = 0
while ((line = br.readLine()) != null || x ==0) {
while ((line = br.readLine()) != null && x <1 && line!= null) {
xml.append(line)
if (line.equals("</PackageSet>")) {
if (line.equals("</PackageSet>")){
println("----------------------------------break")
x == 1
return x
}
else if (line.indexOf("</" + name + ">") != -1) { //reach the end of a doc
println("-----------------------------------------"+count)
} else if (line.indexOf("</" + name + ">") != -1){ //reach the end of a doc
count = count + 1
val doc = XML.toJSONObject(xml.toString()).getJSONObject(name)
println("#####################################################"+count)
println(doc)
val doc: JSONObject = XML.toJSONObject(xml.toString()).getJSONObject(name)
xml = new StringBuffer()
// accession PRJNA31525
val accession = doc.getJSONObject("Project").getJSONObject("Project")
.getJSONObject("ProjectID")
.getJSONObject("ArchiveID")
.getString("accession")
val projectDescr = doc.getJSONObject("Project").getJSONObject("Project")
.getJSONObject("ProjectDescr")
.getJSONObject("ProjectDescr")
// load the JSON string into a DataFrame
val jsonRDD = spark.sparkContext.makeRDD(doc.toString() :: Nil)
val jsonDF = spark.read.json(jsonRDD)
//jsonDF.show()
// jsonDF.schema.printTreeString()
val bio = new BioProject
bio.convertConcrete2KeyVal(projectDescr,"LocusTagPrefix")
val options = Map("es.index.auto.create"-> "true",
// "es.mapping.id"->accession,
"es.nodes"->es_nodes,"es.port"->port)
// --------------1
if (projectDescr.opt("ProjectReleaseDate") != null){
val date = projectDescr.get("ProjectReleaseDate").toString
val m: Matcher = dp.matcher(date)
if (m.matches()){
projectDescr.put("ProjectReleaseDate",m.group(1))
projectDescr.put("ProjectReleaseYear",Integer.parseInt(m.group(2)))
// write the DataFrame to ES
EsSparkSQL.saveToEs(jsonDF,s"${es_index}/${es_type}",options)
} else {
projectDescr.put("ProjectReleaseDate",date)
}
}
// ----------------2
if (projectDescr.optJSONObject("Publication") !=null){
val pub = projectDescr.getJSONObject("Publication")
if (pub.opt("date") !=null){
val date = pub.get("date").toString
val m: Matcher = dp.matcher(date)
if (m.matches()){
pub.put("date",m.group(1))
pub.put("year",Integer.parseInt(m.group(2)))
} else {
pub.put("date",date)
}
}
}
// ----------------3
if(doc.getJSONObject("Project").optJSONObject("Submission") != null){
val submission = doc.getJSONObject("Project").optJSONObject("Submission")
// val bio = new BioProject
// bio.convertConcrete2KeyVal(projectDescr,"LocusTagPrefix")
if(submission.opt("submitted") != null){
// --------------1
// if (projectDescr.opt("ProjectReleaseDate") != null){
// val date = projectDescr.get("ProjectReleaseDate").toString
// val m = dp.matcher(date)
// if (m.matches()){
// // m.group(1) 2017-04-25
// // m.group(2)) 2017
// projectDescr.put("ProjectReleaseDate",m.group(1))
// projectDescr.put("ProjectReleaseDate",Integer.parseInt(m.group(2)))
//
// } else {
// // date 2012-05-21T00:00:00Z
// projectDescr.put("ProjectReleaseDate",date)
// }
// }
val date = submission.get("submitted")
submission.put("submission_year", Integer.parseInt(date.toString().substring(0, 4)));
// ----------------2
// if (projectDescr.optJSONObject("Publication") !=null){
// val pub = projectDescr.getJSONObject("Publication")
// if (pub.opt("date") !=null){
// val date = projectDescr.getJSONObject("Publication").get("date").toString
// val m = dp.matcher(date)
// if (m.matches()){
// // m.group(1) 2017-04-25
// // m.group(2)) 2017
// projectDescr.put("date",m.group(1))
// projectDescr.put("year",Integer.parseInt(m.group(2)))
// } else {
// // date 2012-05-21T00:00:00Z
// projectDescr.put("date","##############99#")
// }
// }
// }
//
}
}
// ----------------3
// if(doc.optJSONObject("Submission").optJSONObject("submitted") != null){
// val submission = doc.optJSONObject("Submission").optJSONObject("submitted");
// if(submission.opt("submitted") != null){
// val date = submission.get("submitted");
// submission.put("submission_year", Integer.parseInt(date.toString().substring(0, 4)));
// }
// }
// ----------------4
// val grant = projectDescr.opt("Grant");
// if(grant != null){
// if(grant isInstanceOf[JSONArray]){
// for(int k = 0 ; k < ((JSONArray)grant).length(); k++){
// JSONObject singleGrant = (JSONObject)((JSONArray)grant).get(k);
// convertConcrete2KeyVal(singleGrant, "Agency");
// }
// }else if(grant instanceof JSONObject){
// convertConcrete2KeyVal((JSONObject)grant, "Agency");
// }
// }
// ----------------4
val grant: Object = projectDescr.opt("Grant")
if(grant != null){
if(grant.isInstanceOf[JSONArray]){
val array: JSONArray = grant.asInstanceOf[JSONArray]
for(k <- 0 until array.length()){
val singleGrant = array.get(k).asInstanceOf[JSONObject]
bio.convertConcrete2KeyVal(singleGrant, "Agency");
}
}
else if(grant.isInstanceOf[JSONObject]){
val array: JSONObject = grant.asInstanceOf[JSONObject]
bio.convertConcrete2KeyVal(array, "Agency");
}
}
// ----------------5
val projectID = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectID");
bio.convertConcrete2KeyVal(projectID, "LocalID");
val organization = doc.getJSONObject("Project").optJSONObject("Submission").optJSONObject("Description").opt("Organization");
if(organization.isInstanceOf[JSONArray] ){
val array: JSONArray = organization.asInstanceOf[JSONArray]
for(k <- 0 until array.length()){
val orgz = array.get(k).asInstanceOf[JSONObject]
bio.convertConcrete2KeyVal(orgz, "Name");
}
}else if(organization.isInstanceOf[JSONObject]){
val orgz: JSONObject = organization.asInstanceOf[JSONObject]
bio.convertConcrete2KeyVal(orgz, "Name");
}
// ----------------5
// val projectID = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectID");
// bio.convertConcrete2KeyVal(projectID, "LocalID");
// Object organization = doc.optJSONObject("Submission").optJSONObject("Submission").optJSONObject("Description").opt("Organization");
// if(organization instanceof JSONArray){
// for(int j = 0; j < ((JSONArray) organization).length(); j++){
// val orgz = ((JSONArray) organization).get(j);
// bio.convertConcrete2KeyVal(((JSONObject)orgz), "Name");
// }
// }else if(organization instanceof JSONObject){
// val orgz = (JSONObject)organization;
// bio.convertConcrete2KeyVal(orgz, "Name");
// }
// ----------------6
val projTypeSubmission = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectType").optJSONObject("ProjectTypeSubmission");
// ----------------6
// val projTypeSubmission = doc.getJSONObject("Project").getJSONObject("Project").getJSONObject("ProjectType").optJSONObject("ProjectTypeSubmission");
// if(projTypeSubmission != null){
// val bioSampleSet = projTypeSubmission.getJSONObject("Target").optJSONObject("BioSampleSet");
// if(bioSampleSet != null){
// bio.convertConcrete2KeyVal(bioSampleSet, "ID");
// }
// }
if(projTypeSubmission != null){
val bioSampleSet = projTypeSubmission.getJSONObject("Target").optJSONObject("BioSampleSet");
if(bioSampleSet != null){
bio.convertConcrete2KeyVal(bioSampleSet, "ID");
}
}
if (count ==1 ) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes()))
}
else {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
xml = new StringBuffer()
}
}
})
} catch {
case e: UnknownHostException =>
e.printStackTrace()
case e: FileNotFoundException =>
e.printStackTrace()
case e: IOException =>
e.printStackTrace()
}
}
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathTemporary)
df.schema.printTreeString()
out.write(df)
}
def setProperties(map: Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true)
descriptor = cachePath :: descriptor
descriptor
}
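
Several of the rewritten parsers (this one, BioSampleParse, GenBankParse, GoDataParse, InterprodataParse, TaxonomyParse) derive the HDFS URL by splitting the first input path on "/" and re-joining the first three tokens. An equivalent sketch based on java.net.URI, which reads more directly and hands the scheme and authority straight to FileSystem.get; the sample path is a placeholder:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object HdfsUrlSketch {
      def fsFor(firstPath: String): (String, FileSystem) = {
        // e.g. firstPath = "hdfs://10.0.86.89:9000/ftpBioProject/bioproject.xml" (placeholder)
        val uri = new URI(firstPath)
        val hdfsUrl = s"${uri.getScheme}://${uri.getAuthority}/"   // "hdfs://10.0.86.89:9000/"
        val fs = FileSystem.get(uri, new Configuration())          // scheme + authority select the filesystem
        (hdfsUrl, fs)
      }
    }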

View File

@ -7,62 +7,67 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.{JSONArray, JSONObject, XML}
class BioSampleParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Load file from ftp url."
val description: String = "Parsing BioSample type data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:String= _ //ES port number
var es_index:String = _ //ES index
var es_type:String = _ //ES type
var cachePath:String = _
var docName = "BioSample"
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val ssc = spark.sqlContext
val inDf= in.read()
// inDf.show()
// inDf.schema.printTreeString()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
val rows: Array[Row] = inDf.collect()
for (i <- 0 until rows.size) {
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
// /ftpBioSample1/biosample.xml
val sourceFile = rows(i)(0).toString
println("++++++++++++++++++++++++++++++++++++++++++++++++++"+sourceFile)
var hdfsPathJsonCache:String = ""
var fdosOut: FSDataOutputStream = null
var bisIn: BufferedInputStream =null
var count = 0
var nameNum = 0
inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String]
var line: String = null
var xml = ""
val br: BufferedReader = new BufferedReader(new FileReader(sourceFile))
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
br.readLine()
br.readLine()
var count = 0
while ((line = br.readLine()) != null) {
while ((line = br.readLine()) != null && line!= null) {
xml = xml + line
if (line.indexOf("</" + docName + ">") != -1) {
count = count + 1
val doc: JSONObject = XML.toJSONObject(xml).getJSONObject(docName)
val accession = doc.optString("accession")
val accession = doc.optString("accession")
// Attributes
val attrs: String = doc.optString("Attributes")
if (attrs.equals("")) {
doc.remove("Attributes")
}
// Links
val links: String = doc.optString("Links")
if (links != null) {
@ -70,9 +75,7 @@ class BioSampleParse extends ConfigurableStop{
doc.remove("Links")
}
}
val bio = new BioProject
// owner.name
val owner = doc.optString("Owner")
if (owner != null) {
@ -82,57 +85,85 @@ class BioSampleParse extends ConfigurableStop{
bio.convertConcrete2KeyVal(singleOwner, "Name")
}
}
// Models.Model
val models = doc.optJSONObject("Models")
if (models != null) {
bio.convertConcrete2KeyVal(models, "Models")
}
if (count%200000 == 1 ){
nameNum += 1
hdfsPathJsonCache = hdfsUrl+cachePath+"/biosampleCache/"+"biosample"+nameNum+".json"
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
fdosOut = fs.append(path)
// if (count < 20) {
println("#####################################" + count)
// load the JSON string into a DataFrame
val jsonRDD = spark.sparkContext.makeRDD(doc.toString() :: Nil)
val jsonDF = spark.read.json(jsonRDD)
//jsonDF.show()
// jsonDF.schema.printTreeString()
val options = Map("es.index.auto.create" -> "true",
"es.mapping.id" -> "accession",
"es.nodes" -> es_nodes, "es.port" -> port)
// write the DataFrame to ES
EsSparkSQL.saveToEs(jsonDF, s"${es_index}/${es_type}", options)
// }
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
bisIn.close()
} else if (count%200000 ==0){
bisIn = new BufferedInputStream(new ByteArrayInputStream((","+doc.toString + "]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
bisIn.close()
} else {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
bisIn.close()
}
xml = ""
}
}
})
if (count%200000 != 0){
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn.close()
}
fdosOut.close()
println("start parser HDFSjsonFile")
val df: DataFrame = ssc.read.json(hdfsUrl+cachePath+"/biosampleCache/")
out.write(df)
}
def setProperties(map: Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true)
descriptor = cachePath :: descriptor
descriptor
}
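
BioSampleParse splits its cache across several files, starting a new biosample<N>.json every 200000 records, closing each one as a valid JSON array, and finally reading the whole cache directory back with a single spark.read.json call. A condensed sketch of that rollover logic, with the HDFS URL and cache directory as placeholders and a records iterator of already-serialized JSON objects (at least one record assumed):

    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object RollingJsonCacheSketch {
      def write(spark: SparkSession, hdfsUrl: String, cacheDir: String,
                records: Iterator[String], recordsPerFile: Int = 200000): DataFrame = {
        val conf = new Configuration()
        conf.set("fs.defaultFS", hdfsUrl)                         // placeholder
        val fs = FileSystem.get(conf)
        var out: FSDataOutputStream = null
        var countInFile = 0
        var fileNo = 0
        records.foreach { json =>
          if (countInFile == 0) {                                 // start a new array file
            fileNo += 1
            out = fs.create(new Path(s"$cacheDir/biosample$fileNo.json"), true)
            out.write("[".getBytes(StandardCharsets.UTF_8))
          } else {
            out.write(",".getBytes(StandardCharsets.UTF_8))
          }
          out.write(json.getBytes(StandardCharsets.UTF_8))
          countInFile += 1
          if (countInFile == recordsPerFile) {                    // close the current array file
            out.write("]".getBytes(StandardCharsets.UTF_8))
            out.close()
            countInFile = 0
          }
        }
        if (countInFile > 0) { out.write("]".getBytes(StandardCharsets.UTF_8)); out.close() }
        spark.read.json(s"$cacheDir/*.json")                      // read every cache file back at once
      }
    }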

View File

@ -1,142 +1,121 @@
package cn.piflow.bundle.microorganism
import java.io._
import java.text.ParseException
import java.util.ArrayList
import cn.piflow.bundle.microorganism.util.{CustomIOTools, Process}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.biojava.bio.BioException
import org.elasticsearch.spark.sql.EsSparkSQL
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject
class GenBankParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = " Parse genbank date put to elasticSearch"
val description: String = " Parsing GenBank type data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:String= _ //ES port number
var es_index:String = _ //ES index
var es_type:String = _ //ES type
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf= in.read()
val inDf = in.read()
//inDf.show()
println("++++++++++++++++++++++++++++++++++++++++++++++++++001")
println(inDf.count())
inDf.schema.printTreeString()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
var listSeq = new ArrayList[ArrayList[String]]
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
val rows: Array[Row] = inDf.collect()
try {
for (i <- 0 until rows.size) {
val hdfsPathTemporary:String = hdfsUrl+"/microoCache/genbank/genbankcach.json"
val path: Path = new Path(hdfsPathTemporary)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
val sourceFile = rows(i)(0)
println("++++++++++++++++++++++++++++++++++++++++++++++++++002" + sourceFile)
// deserialize the byte array with a ByteArrayInputStream
val bis:ByteArrayInputStream=new ByteArrayInputStream(sourceFile.asInstanceOf[Array[Byte]])
//val fileinputStream = new FileInputStream(sourceFile)
val br = new BufferedReader(new InputStreamReader(bis))
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
// parse the byte stream of the seq file
val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null)
var doc: JSONObject = null
var count = 0
while (sequenceIterator.hasNext) {
var listJson = new ArrayList[String]
doc = new JSONObject()
try {
var seq = sequenceIterator.nextRichSequence()
inDf.collect().foreach(row=>{
pathStr = row.get(0).asInstanceOf[String]
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
Process.processSingleSequence(seq, doc)
// the JSON string
listJson.add(doc.toString())
// accession number, e.g. CP009630
listJson.add(seq.getAccession)
val sequenceIterator = CustomIOTools.IOTools.readGenbankDNA(br, null)
listSeq.add(listJson)
var doc: JSONObject = null
var count = 0
while (sequenceIterator.hasNext) {
count += 1
doc = new JSONObject
}
catch {
case e: BioException =>
e.getMessage
case e: ParseException =>
e.printStackTrace()
}
val seq = sequenceIterator.nextRichSequence()
Process.processSingleSequence(seq, doc)
if (count == 1) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes()))
} else {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
}
} catch {
case e: FileNotFoundException =>
e.printStackTrace()
case e: IOException =>
e.printStackTrace()
})
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
var jsonDF: DataFrame = null
for (i <- 0 until listSeq.size()) {
fdosOut.flush()
fdosOut.close()
println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" + i)
//
println(listSeq.get(i).size())
println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathTemporary)
// JSON string parsed from the seq file
val jsonString = listSeq.get(i).get(0)
// accession number, e.g. CP009630
val esId = listSeq.get(i).get(1).toString
println(esId)
// load the JSON string into a DataFrame
val jsonRDD = spark.sparkContext.makeRDD(jsonString.toString() :: Nil)
jsonDF = spark.read.json(jsonRDD)
//jsonDF.show()
// jsonDF.schema.printTreeString()
df.schema.printTreeString()
out.write(df)
val options = Map("es.index.auto.create"-> "true",
// "es.mapping.id"->"Accession",
"es.nodes"->es_nodes,"es.port"->port)
// write the DataFrame to ES
EsSparkSQL.saveToEs(jsonDF,s"${es_index}/${es_type}",options)
}
}
def setProperties(map: Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor
}

View File

@ -0,0 +1,156 @@
package cn.piflow.bundle.microorganism
import java.io._
import java.util.Iterator
import java.util.regex.{Matcher, Pattern}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject
class GoDataParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Parsing Go type data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
var tv:Pattern = Pattern.compile("(\\S+):\\s(.+)")
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf= in.read()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathJsonCache = hdfsUrl+cachePath+"/godataCache/godata.json"
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String]
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null
var xml = ""
var i =0
while (i<30){
br.readLine()
i+=1
}
var obj = new JSONObject()
var count= 0
while ((line = br.readLine()) !=null && line !=null ){
val m: Matcher = tv.matcher(line)
if (line.startsWith("[")){
if (line .equals("[Term]")){
obj.append("stanza_name","Term")
} else if (line.equals("[Typedef]")){
obj.append("stanza_name","Typedef")
} else if (line.equals("[Instance]")){
obj.append("stanza_name","Instance")
}
} else if (m.matches()){
obj.append(m.group(1),m.group(2))
} else if ( line.equals("")){
val keyIterator: Iterator[String] = obj.keys()
while (keyIterator.hasNext){
val key = keyIterator.next()
var value = ""
for (i <- 0 until obj.getJSONArray(key).length() ){
value += (";" + obj.getJSONArray(key).get(i).toString)
}
obj.put(key,value.substring(1))
}
count += 1
if (count ==1 ) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + obj.toString).getBytes()))
}
else {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + obj.toString).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
obj= new JSONObject()
}
}
})
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathJsonCache)
df.schema.printTreeString()
out.write(df)
}
def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true)
descriptor = cachePath :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("microorganism/png/Gene_Ontology.png")
}
override def getGroup(): List[String] = {
List(StopGroup.MicroorganismGroup.toString)
}
def initialize(ctx: ProcessContext): Unit = {
}
}
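
GoDataParse reads the GO .obo file stanza by stanza: a line such as [Term] opens a stanza, every key: value line is matched by the pattern (\S+):\s(.+), repeated keys are joined with ";", and a blank line closes the stanza. A self-contained sketch that parses one stanza into a JSONObject under the same rules; the sample lines in the usage comment are illustrative:

    import java.util.regex.Pattern
    import org.json.JSONObject
    import scala.collection.mutable

    object OboStanzaSketch {
      private val tv = Pattern.compile("(\\S+):\\s(.+)")

      // Collapse one stanza (header line plus "key: value" lines) into a JSONObject,
      // joining repeated keys with ";" just like the stop above does.
      def parseStanza(lines: Seq[String]): JSONObject = {
        val values = mutable.LinkedHashMap[String, mutable.ListBuffer[String]]()
        lines.foreach { line =>
          val m = tv.matcher(line)
          if (line.startsWith("[")) {
            values.getOrElseUpdate("stanza_name", mutable.ListBuffer[String]()) +=
              line.stripPrefix("[").stripSuffix("]")
          } else if (m.matches()) {
            values.getOrElseUpdate(m.group(1), mutable.ListBuffer[String]()) += m.group(2)
          }
        }
        val obj = new JSONObject()
        values.foreach { case (k, vs) => obj.put(k, vs.mkString(";")) }
        obj
      }
    }

    // e.g. parseStanza(Seq("[Term]", "id: GO:0000001", "synonym: a", "synonym: b"))
    //      yields {"stanza_name":"Term","id":"GO:0000001","synonym":"a;b"} (key order may vary)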

View File

@ -1,72 +1,37 @@
package cn.piflow.bundle.microorganism
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL
import org.apache.spark.sql.SparkSession
class GoldDataParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Parse Gold date put to elasticSearch."
val description: String = "Parsing GoldData type data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:String= _ //ES port number
var es_index:String = _ //ES index
var es_type:String = _ //ES type
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf = in.read()
// inDf.show()
// println(inDf.count())
// inDf.schema.printTreeString()
val rows: Array[Row] = inDf.collect()
for (i<- 0 until rows.length){
// load the JSON string into a DataFrame
val jsonRDD = spark.sparkContext.makeRDD(rows(i)(0).toString :: Nil)
val jsonDF = spark.read.json(jsonRDD)
//jsonDF.show()
out.write(inDf)
val options = Map("es.index.auto.create"-> "true",
"es.mapping.id"->"gold_id",
"es.nodes"->es_nodes,"es.port"->port)
// write the DataFrame to ES
EsSparkSQL.saveToEs(jsonDF,s"${es_index}/${es_type}",options)
}
}
def setProperties(map: Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor
}

View File

@ -0,0 +1,145 @@
package cn.piflow.bundle.microorganism
import java.io._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.{JSONObject, XML}
class InterprodataParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Parsing Interpro type data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var cachePath:String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val inDf= in.read()
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
val hdfsPathJsonCache = hdfsUrl+cachePath+"/interproDataCatch/interpro.json"
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
inDf.collect().foreach(row => {
pathStr = row.get(0).asInstanceOf[String]
var fdis: FSDataInputStream = fs.open(new Path(pathStr))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null
var xml = ""
var i = 0
while (i<26){
br.readLine()
i+=1
}
var count = 0
var abstraction:String = null
var doc: JSONObject = null
while ((line = br.readLine()) != null && line !=null ){
xml += line
if (line .indexOf("</interpro>") != -1){
count += 1
doc = XML.toJSONObject(xml).getJSONObject("interpro")
val id = doc.getString("id")
if (doc.has("abstract")){
abstraction = doc.get("abstract").toString
doc.put("abstract",abstraction)
}
if (doc.get("pub_list") == ""){
doc.remove("pub_list")
}
if (count ==1 ) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes()))
}
else {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
xml = ""
}
}
})
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
println("start parser HDFSjsonFile")
val df: DataFrame = spark.read.json(hdfsPathJsonCache)
df.schema.printTreeString()
out.write(df)
}
def setProperties(map: Map[String, Any]): Unit = {
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true)
descriptor = cachePath :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("microorganism/png/Interpro.png")
}
override def getGroup(): List[String] = {
List(StopGroup.MicroorganismGroup.toString)
}
def initialize(ctx: ProcessContext): Unit = {
}
}
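
The parser accumulates lines until it sees </interpro> and then converts that XML fragment with org.json's XML.toJSONObject. A tiny illustration of that call; the fragment below is made up:

    import org.json.{JSONObject, XML}

    object InterproXmlSketch {
      def main(args: Array[String]): Unit = {
        val fragment =
          """<interpro id="IPR000001" type="Domain">
            |  <name>Kringle</name>
            |  <abstract>Kringle domains are autonomous structural domains.</abstract>
            |</interpro>""".stripMargin
        val doc: JSONObject = XML.toJSONObject(fragment).getJSONObject("interpro")
        println(doc.getString("id"))        // IPR000001
        println(doc.get("abstract"))        // the abstract text
      }
    }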

View File

@ -1,110 +1,77 @@
package cn.piflow.bundle.microorganism
import java.io._
import java.util.HashMap
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL
import org.json.JSONObject
class TaxonomyParse extends ConfigurableStop{
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Load file from ftp url."
val description: String = "Parsing Taxonomy type data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:String= _ //ES port number
var es_index:String = _ //ES index
var es_type:String = _ //ES type
var cachePath:String = _
var filePath:String = _
var outWriteDF:DataFrame = _
var nodesDF:DataFrame = _
var divisionDF:DataFrame = _
var gencodeDF:DataFrame = _
var namesDF:DataFrame = _
var citationsDF:DataFrame = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sparkContext
val ssc = spark.sqlContext
println("###############################")
val inDf = in.read()
// val inDf = in.read()
// inDf.show()
// inDf.printSchema()
// val rows: Array[Row] = inDf.collect()
// val pathDir = new File(rows(0)(0).toString).getParent
val configuration: Configuration = new Configuration()
var pathStr: String =inDf.take(1)(0).get(0).asInstanceOf[String]
val path = "/ftpTaxonomy1/1/gencode.dmp"
val pathDir: String = new File(path).getParent
filePath = pathDir + File.separator + "nodes.dm"
// #########################################------004 ---names.dmp
if (filePath.endsWith("names.dmp")) {
val optionsFromEs = Map("es.index.auto.create"-> "true",
"es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,"es.port"->port)
//load data with df from es
val esDf = ssc.read.format("org.elasticsearch.spark.sql").options(optionsFromEs).load(s"${es_index}/${es_type}")
val br = new BufferedReader(new FileReader(filePath))
var line: String = null;
var count = 0
var divDF :DataFrame = null
while ((line = br.readLine) != null && line != null) {
val tokens: Array[String] = line.split("\\t\\|\\t")
val doc = new JSONObject()
doc.put("genetic_code_id", tokens(0))
doc.put("genetic_code_name", tokens(2))
doc.put("genetic_code_translation_table", tokens(3).trim)
doc.put("genetic_code_start_codons", tokens(4).replace("\t|","").trim)
if (count==0) {
val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil)
divDF = spark.read.json(jsonRDD)
} else {
val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil)
divDF = spark.read.json(jsonRDD).union(divDF)
}
count = count+1
}
// divDF.show()
val outDf = esDf.join(divDF, Seq("genetic_code_id")).
filter(esDf.col("genetic_code_id") === divDF.col("genetic_code_id"))
val optionsToEs = Map("es.index.auto.create" -> "true",
"es.mapping.id" -> "tax_id",
"es.nodes" -> es_nodes,
"es.port" -> port)
// write the DataFrame to ES
EsSparkSQL.saveToEs(outDf, s"${es_index}/${es_type}", optionsToEs)
println("nodes.dmp--------------->stored successfully")
filePath = pathDir + File.separator + "names.dmp"
val pathARR: Array[String] = pathStr.split("\\/")
var hdfsUrl:String=""
for (x <- (0 until 3)){
hdfsUrl+=(pathARR(x) +"/")
}
configuration.set("fs.defaultFS",hdfsUrl)
var fs: FileSystem = FileSystem.get(configuration)
var pathDir = ""
for (x <- 0 until pathARR.length-1){
pathDir+=(pathARR(x) +"/")
}
filePath = pathDir + File.separator + "nodes.dmp"
// #########################################------001 ---nodes.dmp
if (filePath.endsWith("nodes.dmp")) {
val br = new BufferedReader(new FileReader(filePath))
val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/nodes.json"
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
var fdis: FSDataInputStream = fs.open(new Path(filePath.toString))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null;
var count =1
while ((line = br.readLine) != null &&line != null) {
// println(count)
var count =0
while ((line = br.readLine) != null && line != null) {
count = count+1
val doc = new JSONObject()
val tokens: Array[String] = line.split("\\t\\|\\t")
doc.put("tax_id", tokens(0))
@ -114,154 +81,304 @@ class TaxonomyParse extends ConfigurableStop{
doc.put("division_id", tokens(4))
doc.put("genetic_code_id", tokens(6))
doc.put("mitochondrial_genetic_code_id", tokens(8))
// println(doc)
count = count+1
if (tokens(0).equals("2492834") ){
println(tokens(0))
if (count == 1) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes()))
} else {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
// load the JSON string into a DataFrame
val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil)
val jsonDF = spark.read.json(jsonRDD)
val options = Map("es.index.auto.create" -> "true",
"es.mapping.id" -> "tax_id",
"es.nodes" -> es_nodes, "es.port" -> port)
// write the DataFrame to ES
EsSparkSQL.saveToEs(jsonDF, s"${es_index}/${es_type}", options)
println("nodes.dmp--------------->stored successfully")
fdosOut.flush()
bisIn.close()
}
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn.close()
fdosOut.close()
nodesDF = spark.read.json(hdfsPathJsonCache)
filePath = pathDir + File.separator + "division.dmp"
}
// #########################################------002 ---division.dmp
else if (filePath.endsWith("division.dmp")) {
if (filePath.endsWith("division.dmp")){
val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/division.json"
val options = Map("es.index.auto.create"-> "true",
"es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,"es.port"->port)
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
//load data with df from es
val esDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")
val br = new BufferedReader(new FileReader(filePath))
var fdis: FSDataInputStream = fs.open(new Path(filePath.toString))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null;
var count = 0
var divDF :DataFrame = null
while ((line = br.readLine) != null && line != null) {
while ((line = br.readLine) != null && line != null ) {
count=count+1
val tokens: Array[String] = line.split("\\t\\|\\t")
val doc = new JSONObject()
doc.put("division_id", tokens(0))
doc.put("dive", tokens(1))
doc.put("diname", tokens(2))
if (count==0) {
val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil)
divDF = spark.read.json(jsonRDD)
if (count == 1) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes()))
} else {
val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil)
divDF = spark.read.json(jsonRDD).union(divDF)
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes()))
}
count = count+1
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
}
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
val outDf = esDf.join(divDF, Seq("division_id")).filter(esDf.col("division_id") === divDF.col("division_id"))
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
val options1 = Map("es.index.auto.create" -> "true",
"es.mapping.id" -> "tax_id",
"es.nodes" -> es_nodes,
"es.port" -> port)
// write the DataFrame to Elasticsearch
EsSparkSQL.saveToEs(outDf, s"${es_index}/${es_type}", options1)
println("division.dmp---------------> stored successfully")
divisionDF = spark.read.json(hdfsPathJsonCache)
outWriteDF=nodesDF.join(divisionDF, Seq("division_id"))
filePath = pathDir + File.separator + "gencode.dmp"
}
// #########################################------003 ---gencode.dmp
else if (filePath.endsWith("gencode.dmp")) {
val optionsFromEs = Map("es.index.auto.create"-> "true",
"es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,"es.port"->port)
// load the existing index data from ES as a DataFrame
val esDf = ssc.read.format("org.elasticsearch.spark.sql").options(optionsFromEs).load(s"${es_index}/${es_type}")
if (filePath.endsWith("gencode.dmp")){
val br = new BufferedReader(new FileReader(filePath))
val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/gencode.json"
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
var fdis: FSDataInputStream = fs.open(new Path(filePath.toString))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null;
var count = 0
var divDF :DataFrame = null
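// Read gencode.dmp line by line: each row becomes a JSON doc that is unioned into divDF and appended to the JSON-array cache file on HDFS.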
while ((line = br.readLine) != null && line != null) {
count += 1
val tokens: Array[String] = line.split("\\t\\|\\t")
val doc = new JSONObject()
doc.put("genetic_code_id", tokens(0))
doc.put("genetic_code_name", tokens(2))
doc.put("genetic_code_name", tokens(2).trim)
doc.put("genetic_code_translation_table", tokens(3).trim)
doc.put("genetic_code_start_codons", tokens(4).replace("\t|","").trim)
if (count == 1) {
val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil)
divDF = spark.read.json(jsonRDD)
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + doc.toString).getBytes()))
} else {
val jsonRDD = spark.sparkContext.makeRDD(doc.toString :: Nil)
divDF = spark.read.json(jsonRDD).union(divDF)
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + doc.toString).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
}
bisIn = new BufferedInputStream(new ByteArrayInputStream(("]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
// divDF.show()
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
val outDf = esDf.join(divDF, Seq("genetic_code_id")).
filter(esDf.col("genetic_code_id") === divDF.col("genetic_code_id"))
val optionsToEs = Map("es.index.auto.create" -> "true",
"es.mapping.id" -> "tax_id",
"es.nodes" -> es_nodes,
"es.port" -> port)
// write the DataFrame to Elasticsearch
EsSparkSQL.saveToEs(outDf, s"${es_index}/${es_type}", optionsToEs)
println("gencode.dmp---------------> stored successfully")
gencodeDF = spark.read.json(hdfsPathJsonCache)
outWriteDF=outWriteDF.join(gencodeDF, Seq("genetic_code_id"))
filePath = pathDir + File.separator + "names.dmp"
}
if (filePath.endsWith("names.dmp")){
val hdfsPathJsonCache = hdfsUrl+cachePath+"/taxonomyCache/names.json"
val path: Path = new Path(hdfsPathJsonCache)
if(fs.exists(path)){
fs.delete(path)
}
fs.create(path).close()
var fdosOut: FSDataOutputStream = fs.append(path)
var jsonStr: String =""
var bisIn: BufferedInputStream =null
var fdis: FSDataInputStream = fs.open(new Path(filePath.toString))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null
var count = 0
var pre_tax_id = "1"
var name_key = ""
var names = new HashMap[String,String]()
var doc = new JSONObject()
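// names.dmp holds several name rows per tax_id: rows are grouped by tax_id and values with the same name class are joined with ";" before one doc per tax_id is emitted.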
while ((line = br.readLine) != null && line != null ) {
val tokens: Array[String] = line.split("\\t\\|\\t")
name_key = tokens(3).replace("\t|","").trim
if (tokens(0).equals(pre_tax_id)){
if (names.containsKey(name_key)){
names.put(name_key,names.get(name_key).toString+";"+tokens(1))
} else {
names.put(name_key,tokens(1))
}
} else {
count += 1
names.put("tax_id",pre_tax_id)
doc.put("",names)
val doc1 = doc.toString().substring(0,doc.toString.length-1)
jsonStr = doc1.substring(4,doc1.length)
pre_tax_id = tokens(0)
names = new HashMap[String,String]()
names.put(name_key,tokens(1))
if (count == 1) {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("[" + jsonStr).getBytes()))
} else {
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," + jsonStr).getBytes()))
}
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
bisIn = null
}
}
names.put("tax_id",pre_tax_id)
doc.put("",names)
val doc1 = doc.toString().substring(0,doc.toString.length-1)
jsonStr = doc1.substring(4,doc1.length)
bisIn = new BufferedInputStream(new ByteArrayInputStream(("," +jsonStr+ "]").getBytes()))
val buff: Array[Byte] = new Array[Byte](1048576)
var num: Int = bisIn.read(buff)
while (num != -1) {
fdosOut.write(buff, 0, num)
fdosOut.flush()
num = bisIn.read(buff)
}
fdosOut.flush()
fdosOut.close()
namesDF = spark.read.json(hdfsPathJsonCache)
outWriteDF = outWriteDF.join(namesDF,Seq("tax_id"))
outWriteDF.schema.printTreeString()
filePath = pathDir + File.separator + "citations.dmp"
}
if (filePath.endsWith("citations.dmp")){
var fdis: FSDataInputStream = fs.open(new Path(filePath.toString))
val br: BufferedReader = new BufferedReader(new InputStreamReader(fdis))
var line: String = null
var count = 0
while ((line = br.readLine) != null && line != null ) {
count += 1
val tokens: Array[String] = line.split("\\t\\|\\t")
if (tokens.size > 6) {
val pumed_id = tokens(2)
val medline_id = tokens(3)
val tar_ids = tokens(6).replace("\t|", "").trim
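// citations.dmp: tokens(2)/tokens(3) are the PubMed and Medline ids ("0" means absent) and tokens(6) lists the affected tax_ids; the flags below only mark which id would need updating, no update is performed in this branch yet.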
var shouldUpdate_pubmed: Boolean = true
var shouldUpdate_medline: Boolean = true
var pumed_ids: String = null
var medline_ids: String = null
if (!tar_ids.isEmpty) {
if (pumed_id.equals("0") && medline_id.equals("0")) {
} else if (pumed_id.equals("0")) {
shouldUpdate_medline = true
shouldUpdate_pubmed = false
} else if (medline_id.equals("0")) {
shouldUpdate_pubmed = true
shouldUpdate_medline = false
} else {
shouldUpdate_pubmed = true
shouldUpdate_medline = true
}
}
}
}
}
outWriteDF.schema.printTreeString()
outWriteDF.show()
println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$"+outWriteDF.count())
out.write(outWriteDF)
}
def processNodes(index:String,types:String)={
}
def setProperties(map: Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
cachePath=MapUtil.get(map,key="cachePath").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
val cachePath = new PropertyDescriptor().name("cachePath").displayName("cachePath").defaultValue("").required(true)
descriptor = cachePath :: descriptor
descriptor
}
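For reference, the following is a minimal, self-contained sketch of the JSON-array cache pattern used repeatedly above ("[" before the first record, "," before every later record, "]" at the end, after which the file is reloaded with spark.read.json). The object and method names (JsonArrayCacheSketch, writeJsonArray), the explicit UTF-8 charset and the non-empty docs assumption are illustrative only and not part of the original code:

import org.apache.hadoop.fs.{FileSystem, Path}

object JsonArrayCacheSketch {
  // Writes the given JSON documents to pathStr as a single JSON array,
  // mirroring the "[doc,doc,...]" layout the processing code builds incrementally.
  // Assumes docs is non-empty, as the .dmp files above always contain rows.
  def writeJsonArray(fs: FileSystem, pathStr: String, docs: Seq[String]): Unit = {
    val path = new Path(pathStr)
    if (fs.exists(path)) fs.delete(path, true)
    fs.create(path).close()
    val out = fs.append(path) // relies on HDFS append support, as the original code does
    try {
      docs.zipWithIndex.foreach { case (doc, i) =>
        val prefix = if (i == 0) "[" else ","
        out.write((prefix + doc).getBytes("UTF-8"))
      }
      out.write("]".getBytes("UTF-8"))
      out.flush()
    } finally {
      out.close()
    }
  }
}

A caller would obtain fs the same way the surrounding code does (FileSystem.get on a Configuration whose fs.defaultFS points at hdfsUrl) and pass each row's doc.toString as the documents.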
View File
@ -1,6 +1,10 @@
package cn.piflow.bundle.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.poi.hssf.usermodel.HSSFDateUtil;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
@ -23,72 +27,62 @@ import java.util.Map;
public class ExcelToJson {
public static final String XLSX = ".xlsx";
public static final String XLS=".xls";
public static final Configuration configuration = new Configuration();
/**
 * Read an Excel file from HDFS; both .xls and .xlsx are supported.
 * @param pathStr path of the Excel file on HDFS
 * @param hdfsUrl HDFS URL used as fs.defaultFS
 * @return the JSON data parsed from the Excel file
 * @throws IOException
 */
public static net.sf.json.JSONArray readExcel(String pathStr,String hdfsUrl) throws IOException {
configuration.set("fs.defaultFS",hdfsUrl);
if (pathStr.endsWith(".xlsx")) {
return readXLSX(pathStr);
}else {
return readXLS(pathStr);
}
}
/**
 * Determine the type of the given File.
 * @param file the file to check
 * @return 0 - the file is null; 1 - an XLSX file; 2 - an XLS file; 3 - any other file
 */
public static int checkFile(File file){
if (file==null) {
System.out.println("0");
return 0;
}
String flieName = file.getName();
if (flieName.endsWith(XLSX)) {
System.out.println("1");
return 1;
}
if (flieName.endsWith(XLS)) {
System.out.println("2");
return 2;
}
return 3;
// return new net.sf.json.JSONArray();
}
/**
 * Read an XLSX file from HDFS.
 * @param pathStr path of the XLSX file on HDFS
 * @return
 * @throws IOException
 */
public static net.sf.json.JSONArray readXLSX(String pathStr) throws IOException{
FileSystem fs = FileSystem.get(configuration);
FSDataInputStream fdis = fs.open(new Path(pathStr));
System.out.println("xlsx");
Workbook book = new XSSFWorkbook(fdis);
Sheet sheet = book.getSheetAt(0);
return read(sheet, book);
}
/**
 * Read an XLS file from HDFS.
 * @param pathStr path of the XLS file on HDFS
 * @return
 * @throws IOException
 */
public static net.sf.json.JSONArray readXLS(String pathStr) throws IOException{
FileSystem fs = FileSystem.get(configuration);
FSDataInputStream fdis = fs.open(new Path(pathStr));
System.out.println("xls");
POIFSFileSystem poifsFileSystem = new POIFSFileSystem(fdis);
Workbook book = new HSSFWorkbook(poifsFileSystem);
Sheet sheet = book.getSheetAt(0);
return read(sheet, book);