diff --git a/classpath/piflow-external.jar b/classpath/piflow-external.jar index 445ae83..5633af9 100644 Binary files a/classpath/piflow-external.jar and b/classpath/piflow-external.jar differ diff --git a/conf/config.properties b/conf/config.properties index 0ffd173..3becfa6 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -4,8 +4,14 @@ server.port=8001 #spark.master=spark://10.0.86.89:7077 #spark.master=spark://10.0.86.191:7077 spark.master=yarn -spark.deploy.mode=cluster +spark.deploy.mode=client +yarn.resourcemanager.hostname=10.0.86.191 +yarn.resourcemanager.address=10.0.86.191:8032 +yarn.access.namenode=hdfs://10.0.86.191:9000 +yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/ +yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar +hive.metastore.uris=thrift://10.0.86.191:9083 piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar diff --git a/piflow-bundle/pom.xml b/piflow-bundle/pom.xml index ff10e6c..8c6112f 100644 --- a/piflow-bundle/pom.xml +++ b/piflow-bundle/pom.xml @@ -16,16 +16,22 @@ piflow-core 0.9 - org.clapper classutil_2.11 1.3.0 + + + + org.reflections + reflections + 0.9.11 + + + com.chuusai + shapeless_2.11 + 2.3.1 com.sksamuel.scrimage diff --git a/piflow-bundle/src/main/resources/core-site.xml b/piflow-bundle/src/main/resources/core-site.xml deleted file mode 100644 index 5501561..0000000 --- a/piflow-bundle/src/main/resources/core-site.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - fs.defaultFS - hdfs://10.0.86.191:9000 - - - - io.file.buffer.size - 131072 - - - - hadoop.tmp.dir - /opt/hadoop-2.6.0/tmp - - diff --git a/piflow-bundle/src/main/resources/hdfs-site.xml b/piflow-bundle/src/main/resources/hdfs-site.xml deleted file mode 100644 index 3de1e21..0000000 --- a/piflow-bundle/src/main/resources/hdfs-site.xml +++ /dev/null @@ -1,44 +0,0 @@ - - - - dfs.namenode.http-address - 10.0.86.191:50070 - - - - dfs.namenode.secondary.http-address - 10.0.86.191:50090 - - - dfs.replication - 1 - - - - dfs.namenode.name.dir - file:/opt/hadoop-2.6.0/dfs/name - - - - dfs.datanode.data.dir - file:/opt/hadoop-2.6.0/dfs/data - - - dfs.client.read.shortcircuit - true - - - dfs.domain.socket.path - /var/run/hadoop-hdfs/dn._PORT - - - - dfs.client.file-block-storage-locations.timeout - 10000 - - - dfs.datanode.hdfs-blocks-metadata.enabled - true - - - diff --git a/piflow-bundle/src/main/resources/hive-site.xml b/piflow-bundle/src/main/resources/hive-site.xml deleted file mode 100644 index cbba649..0000000 --- a/piflow-bundle/src/main/resources/hive-site.xml +++ /dev/null @@ -1,90 +0,0 @@ - - - hive.metastore.warehouse.dir - - /user/hive/warehouse - location of default database for the warehouse - - - - hive.metastore.uris - thrift://10.0.86.89:9083 - Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. - - - - javax.jdo.option.ConnectionURL - jdbc:mysql://10.0.86.90:3306/hive?createDatabaseIfNotExist=true - JDBC connect string for a JDBC metastore - - - javax.jdo.option.ConnectionDriverName - com.mysql.jdbc.Driver - Driver class name for a JDBC metastore - - - - javax.jdo.option.ConnectionUserName - root - username to use against metastore database - - - javax.jdo.option.ConnectionPassword - root - password to use against metastore database - - - - - - hive.support.concurrency - true - - - - - - hive.enforce.bucketing - - true - - - - - - hive.exec.dynamic.partition.mode - - nonstrict - - - - - - hive.txn.manager - - org.apache.hadoop.hive.ql.lockmgr.DbTxnManager - - - - - - hive.compactor.initiator.on - - true - - - - - - hive.compactor.worker.threads - - 1 - - - - - - diff --git a/piflow-bundle/src/main/resources/yarn-site.xml b/piflow-bundle/src/main/resources/yarn-site.xml deleted file mode 100644 index 8a142db..0000000 --- a/piflow-bundle/src/main/resources/yarn-site.xml +++ /dev/null @@ -1,52 +0,0 @@ - - - yarn.resourcemanager.hostname - 10.0.86.191 - - - The address of the applications manager interface in the RM. - yarn.resourcemanager.address - ${yarn.resourcemanager.hostname}:8032 - - - The address of the scheduler interface. - yarn.resourcemanager.scheduler.address - ${yarn.resourcemanager.hostname}:8030 - - - The http address of the RM web application. - yarn.resourcemanager.webapp.address - ${yarn.resourcemanager.hostname}:8088 - - - The https adddress of the RM web application. - yarn.resourcemanager.webapp.https.address - ${yarn.resourcemanager.hostname}:8090 - - - yarn.resourcemanager.resource-tracker.address - ${yarn.resourcemanager.hostname}:8031 - - - The address of the RM admin interface. - yarn.resourcemanager.admin.address - ${yarn.resourcemanager.hostname}:8033 - - - yarn.nodemanager.aux-services - mapreduce_shuffle - - - yarn.scheduler.maximum-allocation-mb - 12288 - 每个节点可用内存,单位MB,默认8182MB - - - yarn.nodemanager.pmem-check-enabled - false - - - yarn.nodemanager.vmem-check-enabled - false - - diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala index 25e611c..c2adf1e 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/EmailClean.scala @@ -4,12 +4,13 @@ import java.beans.Transient import cn.piflow.bundle.util.CleanUtil import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{CleanGroup,ConfigurableStop, StopGroup} +import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession class EmailClean extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var columnName:String=_ @@ -46,8 +47,8 @@ class EmailClean extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - CleanGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CleanGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala index 888db4e..2ae4560 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/IdentityNumberClean.scala @@ -6,7 +6,7 @@ import java.util.{Calendar, Date} import cn.piflow.bundle.util.CleanUtil import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{CleanGroup, ConfigurableStop, FileGroup, StopGroup} +import cn.piflow.conf._ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession @@ -14,6 +14,7 @@ import org.apache.spark.sql.SparkSession import scala.reflect.macros.ParseException class IdentityNumberClean extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 //var regex:String =_ @@ -51,8 +52,7 @@ class IdentityNumberClean extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - CleanGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CleanGroup.toString) } - } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala index 584f958..5864f01 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/PhoneNumberClean.scala @@ -2,12 +2,13 @@ package cn.piflow.bundle.clean import cn.piflow.bundle.util.CleanUtil import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup} +import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession class PhoneNumberClean extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var columnName:String=_ @@ -45,8 +46,8 @@ class PhoneNumberClean extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - CleanGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CleanGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala index 5a71533..fa413b6 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/clean/TitleClean.scala @@ -2,12 +2,13 @@ package cn.piflow.bundle.clean import cn.piflow.bundle.util.CleanUtil import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup} +import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession class TitleClean extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var columnName:String=_ @@ -43,8 +44,8 @@ class TitleClean extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - CleanGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CleanGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala index bf406f1..94d5fbf 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Fork.scala @@ -1,6 +1,6 @@ package cn.piflow.bundle.common -import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup} +import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} @@ -9,6 +9,7 @@ import scala.beans.BeanProperty class Fork extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = -1 @@ -30,8 +31,8 @@ class Fork extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - CommonGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CommonGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala index 95a86e1..a042581 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/Merge.scala @@ -1,6 +1,6 @@ package cn.piflow.bundle.common -import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup} +import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} @@ -8,6 +8,7 @@ import scala.beans.BeanProperty class Merge extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = -1 val outportCount: Int = 1 @@ -29,8 +30,8 @@ class Merge extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - CommonGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CommonGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala index 380d7f7..e9deaac 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/SelectField.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.common import cn.piflow._ -import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup} +import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.{Column, DataFrame} @@ -11,6 +11,7 @@ import scala.beans.BeanProperty class SelectField extends ConfigurableStop { + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 1 @@ -44,8 +45,8 @@ class SelectField extends ConfigurableStop { override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - CommonGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CommonGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala index b84014a..02bb4de 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.csv import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, CsvGroup, HiveGroup, StopGroup} +import cn.piflow.conf._ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -10,6 +10,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} class CsvParser extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 @@ -68,8 +69,8 @@ class CsvParser extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - def getGroup() : StopGroup = { - CsvGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CsvGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala index 52cad43..4f01cb2 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvSave.scala @@ -3,12 +3,13 @@ package cn.piflow.bundle.csv import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup, StopGroupEnum} import org.apache.spark.sql.SaveMode class CsvSave extends ConfigurableStop{ - override val inportCount: Int = 1 - override val outportCount: Int = 0 + val authorEmail: String = "xjzhu@cnic.cn" + val inportCount: Int = 1 + val outportCount: Int = 0 var csvSavePath: String = _ var header: Boolean = _ @@ -42,8 +43,8 @@ class CsvSave extends ConfigurableStop{ ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg") } - override def getGroup(): StopGroup = { - CsvGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.CsvGroup.toString) } override def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/FetchFile.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/FetchFile.scala index 5415d4d..95cc489 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/FetchFile.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/FetchFile.scala @@ -3,13 +3,14 @@ package cn.piflow.bundle.file import java.net.URI import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} class FetchFile extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var hdfs_path:String =_ @@ -53,8 +54,8 @@ class FetchFile extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - FileGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.FileGroup.toString) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/PutFile.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/PutFile.scala index 59f49d1..b4460a7 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/PutFile.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/PutFile.scala @@ -6,13 +6,14 @@ import java.net.{HttpURLConnection, URI, URL} import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession class PutFile extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var hdfs_path:String =_ @@ -56,8 +57,8 @@ class PutFile extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - FileGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.FileGroup.toString) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala index fef8dde..00fb7d8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/file/RegexTextProcess.scala @@ -3,12 +3,13 @@ package cn.piflow.bundle.file import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup, StopGroupEnum} import org.apache.spark.sql.SparkSession class RegexTextProcess extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var regex:String =_ @@ -55,7 +56,7 @@ class RegexTextProcess extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - FileGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.FileGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtp.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtp.scala index 0ac1425..8e60bb3 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtp.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtp.scala @@ -5,7 +5,7 @@ import java.net.{HttpURLConnection, URL} import java.util import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, FtpGroup, HttpGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, FtpGroup, HttpGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import sun.net.ftp.{FtpClient, FtpDirEntry} @@ -13,6 +13,7 @@ import sun.net.ftp.{FtpClient, FtpDirEntry} import scala.reflect.io.Directory class LoadFromFtp extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var url_str:String =_ @@ -155,8 +156,8 @@ class LoadFromFtp extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - FtpGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.FtpGroup.toString) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UploadToFtp.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UploadToFtp.scala index e926d85..ce7e89f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UploadToFtp.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/UploadToFtp.scala @@ -4,7 +4,7 @@ import java.io.{DataOutputStream, File, InputStream, OutputStream} import java.util import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, FtpGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, FtpGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import sun.net.TelnetOutputStream @@ -13,6 +13,7 @@ import sun.net.ftp.{FtpClient, FtpDirEntry} import scala.reflect.io.Directory class UploadToFtp extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var url_str:String =_ @@ -169,8 +170,7 @@ class UploadToFtp extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - FtpGroup - } - + override def getGroup(): List[String] = { + List(StopGroupEnum.FtpGroup.toString) + } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala index ca82554..a7d5e5a 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveQL.scala @@ -2,12 +2,13 @@ package cn.piflow.bundle.hive import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil -import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup, StopGroupEnum} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.SparkSession class PutHiveQL extends ConfigurableStop { + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 @@ -56,8 +57,8 @@ class PutHiveQL extends ConfigurableStop { override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - HiveGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.HiveGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala index 3b98f85..449aec8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveStreaming.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.hive import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession @@ -10,6 +10,7 @@ import scala.beans.BeanProperty class PutHiveStreaming extends ConfigurableStop { + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 0 @@ -40,8 +41,9 @@ class PutHiveStreaming extends ConfigurableStop { override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - HiveGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.HiveGroup.toString) } + } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala index 62f1dee..4235ef6 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/SelectHiveQL.scala @@ -2,7 +2,7 @@ package cn.piflow.bundle.hive import cn.piflow._ import cn.piflow.conf.util.ImageUtil -import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import org.apache.spark.sql.SparkSession @@ -13,6 +13,7 @@ import scala.beans.BeanProperty class SelectHiveQL extends ConfigurableStop { + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 @@ -38,7 +39,7 @@ class SelectHiveQL extends ConfigurableStop { override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val hiveQL = new PropertyDescriptor().name("hiveQL").displayName("HiveQL").defaultValue("").required(true) + val hiveQL = new PropertyDescriptor().name("hiveQL").displayName("HiveQL").defaultValue("").allowableValues(Set("")).required(true) descriptor = hiveQL :: descriptor descriptor } @@ -47,10 +48,11 @@ class SelectHiveQL extends ConfigurableStop { ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg") } - override def getGroup(): StopGroup = { - HiveGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.HiveGroup.toString) } + } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala index 038a10a..25c1906 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala @@ -3,13 +3,14 @@ package cn.piflow.bundle.http import java.io._ import java.net.{HttpURLConnection, URL} -import cn.piflow.conf.{ConfigurableStop, HttpGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, HttpGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.SparkSession class LoadZipFromUrl extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 var url_str:String =_ @@ -81,8 +82,9 @@ class LoadZipFromUrl extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - HttpGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.HttpGroup.toString) } + } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnGZip.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnGZip.scala index b254c74..57864b9 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnGZip.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/UnGZip.scala @@ -4,13 +4,14 @@ import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream} import java.lang.Exception import java.util.zip.GZIPInputStream -import cn.piflow.conf.{ConfigurableStop, HiveGroup, HttpGroup, StopGroup} +import cn.piflow.conf._ import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream} import org.apache.spark.sql.{DataFrame, SparkSession} class UnGZip extends ConfigurableStop { + val authorEmail: String = "xiaoxiao@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 val fileTypes:List[String]=List("tar.gz","tar","gz") @@ -207,8 +208,8 @@ class UnGZip extends ConfigurableStop { override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - HttpGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.HttpGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcRead.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcRead.scala index 7493e33..7e0a52c 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcRead.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcRead.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.jdbc import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession @@ -9,6 +9,7 @@ import org.apache.spark.sql.SparkSession import scala.beans.BeanProperty class JdbcRead extends ConfigurableStop { + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 @@ -50,8 +51,8 @@ class JdbcRead extends ConfigurableStop { override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - JdbcGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.JdbcGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWrite.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWrite.scala index 3faed33..081a936 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWrite.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcWrite.scala @@ -3,7 +3,7 @@ package cn.piflow.bundle.jdbc import java.util.Properties import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.{SaveMode, SparkSession} @@ -12,6 +12,7 @@ import scala.beans.BeanProperty class JdbcWrite extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 0 @@ -46,8 +47,9 @@ class JdbcWrite extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - JdbcGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.JdbcGroup.toString) } + } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonParser.scala index 281d9c6..cce204d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonParser.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.json import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession @@ -10,8 +10,9 @@ import scala.beans.BeanProperty class JsonPathParser extends ConfigurableStop{ - @BeanProperty val inportCount: Int = 1 - @BeanProperty val outportCount: Int = 1 + val authorEmail: String = "xjzhu@cnic.cn" + val inportCount: Int = 1 + val outportCount: Int = 1 var jsonPath: String = _ var tag : String = _ @@ -40,14 +41,15 @@ class JsonPathParser extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - JsonGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.JsonGroup.toString) } + } class JsonStringParser extends ConfigurableStop{ - + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 1 @@ -75,8 +77,9 @@ class JsonStringParser extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - JsonGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.JsonGroup.toString) } + } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonSave.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonSave.scala index f5a63b4..0f42dab 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonSave.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/json/JsonSave.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.json import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SaveMode @@ -10,6 +10,7 @@ import scala.beans.BeanProperty class JsonSave extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 0 @@ -35,8 +36,9 @@ class JsonSave extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - JsonGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.JsonGroup.toString) } + } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/DataFrameRowParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/DataFrameRowParser.scala index 48d3010..c54b222 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/DataFrameRowParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/DataFrameRowParser.scala @@ -2,7 +2,7 @@ package cn.piflow.bundle.script import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} -import cn.piflow.conf.{CommonGroup, ConfigurableStop, ScriptGroup, StopGroup} +import cn.piflow.conf._ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} @@ -11,6 +11,7 @@ import scala.beans.BeanProperty class DataFrameRowParser extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 1 @@ -28,10 +29,11 @@ class DataFrameRowParser extends ConfigurableStop{ ImageUtil.getImage("./src/main/resources/DataFrameParse.jpg") } - override def getGroup(): StopGroup = { - ScriptGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.ScriptGroup.toString) } + override def initialize(ctx: ProcessContext): Unit = {} override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ShellExecutor.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ShellExecutor.scala index 5131387..b3353f6 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ShellExecutor.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ShellExecutor.scala @@ -3,7 +3,7 @@ package cn.piflow.bundle.script import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.{ConfigurableStop, ScriptGroup, StopGroup} +import cn.piflow.conf.{ConfigurableStop, ScriptGroup, StopGroup, StopGroupEnum} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -12,6 +12,7 @@ import sys.process._ class ShellExecutor extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 0 val outportCount: Int = 1 @@ -32,8 +33,8 @@ class ShellExecutor extends ConfigurableStop{ ImageUtil.getImage("./src/main/resources/ShellExecutor.jpg") } - override def getGroup(): StopGroup = { - ScriptGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.ScriptGroup.toString) } override def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FolderXmlParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FolderXmlParser.scala index 3257f2a..53de666 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FolderXmlParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/FolderXmlParser.scala @@ -4,7 +4,7 @@ import java.net.URI import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil -import cn.piflow.conf.{ConfigurableStop, StopGroup, XmlGroup} +import cn.piflow.conf.{ConfigurableStop, StopGroup, StopGroupEnum, XmlGroup} import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -17,8 +17,9 @@ import scala.util.control.Breaks._ * Created by admin on 2018/8/27. */ class FolderXmlParser extends ConfigurableStop{ - override val inportCount: Int = -1 - override val outportCount: Int = 1 + val authorEmail: String = "lijie" + val inportCount: Int = -1 + val outportCount: Int = 1 var rowTag:String = _ var xmlpath:String = _ @@ -44,8 +45,8 @@ class FolderXmlParser extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - XmlGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.XmlGroup.toString) } override def initialize(ctx: ProcessContext): Unit ={} diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlParser.scala index cf47385..586e84a 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlParser.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.xml import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, StopGroup, XmlGroup} +import cn.piflow.conf.{ConfigurableStop, StopGroup, StopGroupEnum, XmlGroup} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.apache.spark.sql.SparkSession @@ -11,6 +11,7 @@ import scala.beans.BeanProperty class XmlParser extends ConfigurableStop { + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 1 @@ -49,8 +50,8 @@ class XmlParser extends ConfigurableStop { override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - XmlGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.XmlGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlSave.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlSave.scala index f44c071..b8c64fa 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlSave.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/xml/XmlSave.scala @@ -1,7 +1,7 @@ package cn.piflow.bundle.xml import cn.piflow._ -import cn.piflow.conf.{ConfigurableStop, StopGroup, XmlGroup} +import cn.piflow.conf.{ConfigurableStop, StopGroup, StopGroupEnum, XmlGroup} import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.MapUtil import org.codehaus.jackson.map.ext.CoreXMLSerializers.XMLGregorianCalendarSerializer @@ -10,6 +10,7 @@ import scala.beans.BeanProperty class XmlSave extends ConfigurableStop{ + val authorEmail: String = "xjzhu@cnic.cn" val inportCount: Int = 1 val outportCount: Int = 0 @@ -34,8 +35,8 @@ class XmlSave extends ConfigurableStop{ override def getIcon(): Array[Byte] = ??? - override def getGroup(): StopGroup = { - XmlGroup + override def getGroup(): List[String] = { + List(StopGroupEnum.XmlGroup.toString) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala index eb02d9b..f02fa27 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableStop.scala @@ -4,17 +4,19 @@ import cn.piflow.Stop import cn.piflow.conf.bean.PropertyDescriptor -trait ConfigurableStop extends Stop{ +abstract class ConfigurableStop extends Stop{ + val authorEmail : String val inportCount : Int val outportCount : Int + def setProperties(map: Map[String, Any]) def getPropertyDescriptor() : List[PropertyDescriptor] def getIcon() : Array[Byte] - def getGroup() : StopGroup + def getGroup() : List[String] } diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala new file mode 100644 index 0000000..8761aad --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala @@ -0,0 +1,18 @@ +package cn.piflow.conf + +object StopGroupEnum extends Enumeration { + + type StopGroup = Value + val CommonGroup = Value("CommonGroup") + val CsvGroup = Value("CsvGroup") + val HiveGroup = Value("HiveGroup") + val JdbcGroup = Value("JdbcGroup") + val JsonGroup = Value("JsonGroup") + val XmlGroup = Value("XmlGroup") + val HttpGroup = Value("HttpGroup") + val FtpGroup = Value("FtpGroup") + val ScriptGroup = Value("ScriptGroup") + val FileGroup = Value("FileGroup") + val CleanGroup = Value("CleanGroup") + +} diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/PropertyDescriptor.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/PropertyDescriptor.scala index c4de27b..d6fc7a5 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/PropertyDescriptor.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/PropertyDescriptor.scala @@ -1,5 +1,8 @@ package cn.piflow.conf.bean +import net.liftweb.json.JsonDSL._ +import net.liftweb.json._ + class PropertyDescriptor { var name : String = _ var displayName : String = _ @@ -38,6 +41,22 @@ class PropertyDescriptor { this.sensitive = sensitive this } + def toJson():String = { + val allowableValueStr = if(this.allowableValues == null) "" else this.allowableValues.mkString(",") + val json = + ("property" -> + ("name" -> this.name) ~ + ("displayName" -> this.displayName) ~ + ("description" -> this.description) ~ + ("defaultValue" -> this.defaultValue) ~ + ("allowableValues" -> allowableValueStr) ~ + ("required" -> this.required.toString) ~ + ("sensitive" -> this.sensitive.toString)) + + + val jsonString = compactRender(json) + jsonString + } } object PropertyDescriptor{ diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala index 373e09a..0825d04 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/bean/StopBean.scala @@ -19,33 +19,14 @@ class StopBean { } def constructStop() : ConfigurableStop = { - //val stop = Class.forName(this.bundle).getConstructor(classOf[Map[String, String]]).newInstance(this.properties) - /*val stop = Class.forName(this.bundle).newInstance() - stop.asInstanceOf[ConfigurableStop].setProperties(this.properties) - stop.asInstanceOf[ConfigurableStop]*/ + try{ - val stop = Class.forName(this.bundle).newInstance() - stop.asInstanceOf[ConfigurableStop].setProperties(this.properties) - stop.asInstanceOf[ConfigurableStop] - }catch{ - - case classNotFoundException:ClassNotFoundException =>{ - val stop : Option[ConfigurableStop] = ClassUtil.findConfigurableStop(this.bundle) - stop match { - case Some(s) => { - s.asInstanceOf[ConfigurableStop].setProperties(this.properties) - s.asInstanceOf[ConfigurableStop] - } - case _ => throw new ClassNotFoundException(this.bundle + " is not found!!!") - } - } - case _ => throw new ClassNotFoundException(this.bundle + " is not found!!!") - - + val stop = ClassUtil.findConfigurableStop(this.bundle) + stop.setProperties(this.properties) + stop + }catch { + case ex : Exception => throw ex } - - - } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/util/ClassUtil.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/util/ClassUtil.scala index 9127af1..043e49b 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/util/ClassUtil.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/util/ClassUtil.scala @@ -3,7 +3,12 @@ package cn.piflow.conf.util import java.io.File import cn.piflow.conf.ConfigurableStop +import cn.piflow.conf.bean.PropertyDescriptor +import net.liftweb.json.compactRender import org.clapper.classutil.ClassFinder +import org.reflections.Reflections +import net.liftweb.json.JsonDSL._ +import net.liftweb.json._ object ClassUtil { @@ -11,11 +16,9 @@ object ClassUtil { val configurableStopClass:String = "cn.piflow.conf.ConfigurableStop" //val classpath:String = "/opt/project/piflow/classpath" - val classpath = System.getProperty("user.dir")+ "/classpath/" - - - def findAllConfigurableStop() : List[String] = { + /*def findAllConfigurableStopByClassFinder() : List[String] = { + val classpath = System.getProperty("user.dir") var stopList : List[String] = List() val classpathFile = new File(classpath) @@ -33,39 +36,177 @@ object ClassUtil { } stopList + }*/ + + def findAllConfigurableStop() : List[ConfigurableStop] = { + var stopList:List[ConfigurableStop] = List() + + //find internal stop + val reflections = new Reflections("") + val allClasses = reflections.getSubTypesOf(classOf[ConfigurableStop]) + val it = allClasses.iterator(); + while(it.hasNext) { + val plugin = Class.forName(it.next().getName).newInstance() + val stop = plugin.asInstanceOf[ConfigurableStop] + stopList = stop +: stopList + } + + //find external stop + stopList = stopList ::: findAllConfigurableStopInClasspath() + stopList } - def findConfigurableStop(bundle : String) : Option[ConfigurableStop] = { - var stop:Option[ConfigurableStop] = None + private def findAllConfigurableStopInClasspath() : List[ConfigurableStop] = { + + val classpath = System.getProperty("user.dir")+ "/classpath/" + var stopList:List[ConfigurableStop] = List() val classpathFile = new File(classpath) - println("classpath is " + classpath) + //println("classpath is " + classpath) val finder = ClassFinder(getJarFile(classpathFile)) val classes = finder.getClasses - val classMap = ClassFinder.classInfoMap(classes) + val it = classes.iterator + + while(it.hasNext) { + val externalClass = it.next() + if(externalClass.superClassName.equals(configurableStopClass)){ + + val stopIntance = Class.forName(externalClass.name).newInstance() + stopList = stopIntance.asInstanceOf[ConfigurableStop] +: stopList + } + } + stopList + + /*val classMap = ClassFinder.classInfoMap(classes) + val plugins = ClassFinder.concreteSubclasses(configurableStopClass,classMap) + plugins.foreach{ + pluginClassInfo => + val plugin = Class.forName(pluginClassInfo.name).newInstance() + stopList = plugin.asInstanceOf[ConfigurableStop] +: stopList + } + stopList*/ + } + + def findAllGroups() : List[String] = { + + val stoplist = findAllConfigurableStop(); + val groupList = stoplist.flatMap(_.getGroup()).distinct + groupList + } + + private def findConfigurableStopInClasspath(bundle : String) : Option[ConfigurableStop] = { + + val classpath = System.getProperty("user.dir")+ "/classpath/" + var stop:Option[ConfigurableStop] = None + + val classpathFile = new File(classpath) + //println("classpath is " + classpath) + val finder = ClassFinder(getJarFile(classpathFile)) + val classes = finder.getClasses + val it = classes.iterator + + while(it.hasNext) { + val externalClass = it.next() + if(externalClass.superClassName.equals(configurableStopClass)){ + if (externalClass.name.equals(bundle)){ + val stopIntance = Class.forName(externalClass.name).newInstance() + stop = Some(stopIntance.asInstanceOf[ConfigurableStop]) + return stop + } + } + } + /*val classMap = ClassFinder.classInfoMap(classes) val plugins = ClassFinder.concreteSubclasses(configurableStopClass,classMap) plugins.foreach{ pluginClassInfo => - //println(pluginString.name) if(pluginClassInfo.name.equals(bundle)){ val plugin = Class.forName(pluginClassInfo.name).newInstance() stop = Some(plugin.asInstanceOf[ConfigurableStop]) return stop } - } + }*/ stop } - def getJarFile(dir : File) : Seq[File] = { - val files = dir.listFiles.filter(! _.isDirectory).filter( _.toString.endsWith(".jar")) + private def getJarFile(dir : File) : Seq[File] = { + val files = dir.listFiles.filter(! _.isDirectory).filter( _.toString.endsWith(".jar")).filter(_.toString.contains("piflow")) files ++ dir.listFiles().filter(_.isDirectory).flatMap(getJarFile) } - /*def main(args: Array[String]): Unit = { + def findConfigurableStop(bundle : String) : ConfigurableStop = { + try{ + println("find ConfigurableStop by Class.forName: " + bundle) + val stop = Class.forName(bundle).newInstance() + stop.asInstanceOf[ConfigurableStop] + }catch{ + + case classNotFoundException:ClassNotFoundException =>{ + println("find ConfigurableStop in Classpath: " + bundle) + val stop : Option[ConfigurableStop] = ClassUtil.findConfigurableStopInClasspath(bundle) + stop match { + case Some(s) => s.asInstanceOf[ConfigurableStop] + case _ => throw new ClassNotFoundException(bundle + " is not found!!!") + } + } + case ex : Exception => { + println("Can not find Configurable: " + bundle) + throw ex + } + } + } + + def findConfigurableStopPropertyDescriptor(bundle : String) : List[PropertyDescriptor] = { + val stopPropertyDesc = ClassUtil.findConfigurableStop(bundle) + stopPropertyDesc.getPropertyDescriptor() + } + + def findConfigurableStopInfo(bundle : String) : String = { + val stop = ClassUtil.findConfigurableStop(bundle) + val propertyDescriptorList:List[PropertyDescriptor] = stop.getPropertyDescriptor() + propertyDescriptorList.foreach(p=> if (p.allowableValues == null) p.allowableValues = List("")) + + val json = + ("StopInfo" -> + ("bundle" -> bundle) ~ + ("owner" -> stop.authorEmail) ~ + ("inportCount" -> stop.inportCount) ~ + ("outportCount" -> stop.outportCount) ~ + ("groups" -> stop.getGroup().mkString(",")) ~ + ("properties" -> + propertyDescriptorList.map { property =>( + ("name" -> property.name) ~ + ("displayName" -> property.displayName) ~ + ("description" -> property.description) ~ + ("defaultValue" -> property.defaultValue) ~ + ("allowableValues" -> property.allowableValues) ~ + ("required" -> property.required.toString) ~ + ("sensitive" -> property.sensitive.toString)) }) ) + val jsonString = compactRender(json) + jsonString + + } + + def main(args: Array[String]): Unit = { //val stop = findConfigurableStop("cn.piflow.bundle.Class1") - val allConfigurableStopList = findAllConfigurableStop() - println("\n\n\n" + allConfigurableStopList) - }*/ + //val allConfigurableStopList = findAllConfigurableStop() + /*val propertyDescriptorList = findConfigurableStopPropertyDescriptor("cn.piflow.bundle.Xjzhu") + var propertyJsonList = List[String]() + propertyDescriptorList.foreach( p => propertyJsonList = p.toJson() +: propertyJsonList ) + val start ="""{"properties":[""" + val end = """]}""" + val str = propertyJsonList.mkString(start, ",", end) + println(str)*/ + + /*val stop = findAllConfigurableStop() + stop.foreach(s => println(s.getClass.getName)) + val temp = 1*/ + + + val stop = findConfigurableStop("cn.piflow.bundle.Xjzhu") + println(stop.getClass.getName) + + + } } diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/ClassFindTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/ClassFindTest.scala index 361c078..add38cf 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/ClassFindTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/ClassFindTest.scala @@ -44,14 +44,22 @@ class ClassFindTest { def testFindConfigurableStop() = { val bundle = "cn.piflow.bundle.hive.SelectHiveQL" val stop = findConfigurableStop(bundle) - stop match { - case Some(x) => { - println("Find " + x.getClass.toString + "!!!!!!!!") - //val propertiesDescList = x.getPropertyDescriptor() - //propertiesDescList.foreach(println(_)) - } - case _ => println("Can not find : " + bundle) - } + println("Find " + stop.getClass.toString + "!!!!!!!!") + + } + @Test + def testFindConfigurableStopClassPath() = { + val bundle = "cn.piflow.bundle.Class1" + val stop = findConfigurableStop(bundle) + println("Find " + stop.getClass.toString + "!!!!!!!!") + + } + @Test + def testFindConfigurableStopNotExist() = { + val bundle = "cn.piflow.bundle.Class5" + val stop = findConfigurableStop(bundle) + println("Find " + stop.getClass.toString + "!!!!!!!!") + } @Test diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 00e267b..e26dbd3 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -1,14 +1,15 @@ package cn.piflow.api import cn.piflow.Runner -import cn.piflow.conf.bean.FlowBean +import cn.piflow.conf.bean.{FlowBean, PropertyDescriptor} import org.apache.spark.sql.SparkSession -import cn.piflow.conf.util.{FileUtil, OptionUtil} +import cn.piflow.conf.util.{ClassUtil, OptionUtil} import cn.piflow.Process -import cn.piflow.api.util.PropertyUtil +import cn.piflow.api.util.{PropertyUtil} import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost} import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils +import org.apache.spark.launcher.SparkLauncher import scala.util.parsing.json.JSON @@ -28,10 +29,16 @@ object API { .master(PropertyUtil.getPropertyValue("spark.master")) .appName(flowBean.name) .config("spark.deploy.mode",PropertyUtil.getPropertyValue("spark.deploy.mode")) + .config("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) + .config("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")) + .config("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode")) + .config("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir")) + .config("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars")) //.config("spark.driver.memory", "1g") //.config("spark.executor.memory", "1g") //.config("spark.cores.max", "2") .config("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle")) + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() @@ -41,9 +48,24 @@ object API { .bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path")) .start(flow); val applicationId = spark.sparkContext.applicationId - process.awaitTermination(); - spark.close(); + //process.awaitTermination(); + //spark.close(); + new Thread( new WaitProcessTerminateRunnable(spark, process)).start() (applicationId,process) + + /*val launcher = new SparkLauncher + launcher.setMaster(PropertyUtil.getPropertyValue("spark.master")) + .setAppName("test") + .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode")) + .setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")) + .setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")).setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode")) + .setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir")) + .setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars")) + .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle")) + .setConf("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .setMainClass("lalla") + .addAppArgs(flowJson)*/ + } def stopFlow(process : Process): String = { @@ -64,4 +86,27 @@ object API { str } + def getStopInfo(bundle : String) : String = { + try{ + + val str = ClassUtil.findConfigurableStopInfo(bundle) + str + }catch{ + case ex : Exception => println(ex);throw ex + } + + } + + def getAllGroups() = { + val groups = ClassUtil.findAllGroups().mkString(",") + """{"groups":"""" + groups + """"}""" + } + +} + +class WaitProcessTerminateRunnable(spark : SparkSession, process: Process) extends Runnable { + override def run(): Unit = { + process.awaitTermination() + //spark.close() + } } diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGet.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetFlowInfo.scala similarity index 85% rename from piflow-server/src/main/scala/cn/piflow/api/HTTPClientGet.scala rename to piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetFlowInfo.scala index 0c3ea2e..9a3cf70 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGet.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetFlowInfo.scala @@ -9,11 +9,11 @@ import org.apache.http.message.BasicNameValuePair import org.apache.http.util.EntityUtils import org.omg.CORBA.NameValuePair -object HTTPClientGet { +object HTTPClientGetFlowInfo { def main(args: Array[String]): Unit = { - val url = "http://10.0.86.98:8002/flow/info?appID=application_1536562610670_0001" + val url = "http://10.0.86.98:8001/flow/info?appID=application_1536562610670_0005" val client = HttpClients.createDefault() val getFlowInfo:HttpGet = new HttpGet(url) diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetGroups.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetGroups.scala new file mode 100644 index 0000000..c554eb1 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientGetGroups.scala @@ -0,0 +1,21 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet} +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HTTPClientGetGroups { + + def main(args: Array[String]): Unit = { + + val url = "http://10.0.86.98:8001/stop/groups" + val client = HttpClients.createDefault() + val getGroups:HttpGet = new HttpGet(url) + + val response:CloseableHttpResponse = client.execute(getGroups) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println("Groups is: " + str) + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClient.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala similarity index 96% rename from piflow-server/src/main/scala/cn/piflow/api/HTTPClient.scala rename to piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala index f16ce6c..3c37be0 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPClient.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartFlow.scala @@ -6,11 +6,11 @@ import org.apache.http.entity.StringEntity import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils -object HTTPClient { +object HTTPClientStartFlow { def main(args: Array[String]): Unit = { val json = """{"flow":{"name":"test","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}""" - val url = "http://10.0.86.98:8002/flow/start" + val url = "http://10.0.86.98:8001/flow/start" val client = HttpClients.createDefault() val post:HttpPost = new HttpPost(url) diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlow.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlow.scala new file mode 100644 index 0000000..87f5788 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStopFlow.scala @@ -0,0 +1,24 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost} +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HTTPClientStopFlow { + def main(args: Array[String]): Unit = { + val json = """{"appID":"application_1536718350536_0023"}""" + val url = "http://10.0.86.98:8001/flow/stop" + val client = HttpClients.createDefault() + val post:HttpPost = new HttpPost(url) + + post.addHeader("Content-Type", "application/json") + post.setEntity(new StringEntity(json)) + + val response:CloseableHttpResponse = client.execute(post) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println(str) + } + +} diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index b8b6458..408fd8e 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -5,7 +5,6 @@ import java.util.concurrent.CompletionStage import akka.actor.ActorSystem import akka.http.scaladsl.Http - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.model._ import akka.http.scaladsl.model.HttpMethods._ @@ -60,7 +59,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpEntity.Strict(_, data) =>{ val flowJson = data.utf8String val (appId,process) = API.startFlow(flowJson) - processMap += (process.pid() -> process) + processMap += (appId -> process) Future.successful(HttpResponse(entity = appId)) } @@ -69,18 +68,48 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup } - case HttpRequest(POST, Uri.Path("/flow/stop"), headers, entity, protocol) =>{ + case HttpRequest(POST, Uri.Path("/flow/stop"), headers, entity, protocol) =>{ val data = toJson(entity) - val processId = data.get("processId").getOrElse("").asInstanceOf[String] - if(processId.equals("") || !processMap.contains(processId)){ + val appId = data.get("appID").getOrElse("").asInstanceOf[String] + if(appId.equals("") || !processMap.contains(appId)){ Future.failed(new Exception("Can not found process Error!")) }else{ - val result = API.stopFlow(processMap.get(processId).asInstanceOf[Process]) - Future.successful(HttpResponse(entity = result)) + processMap.get(appId) match { + case Some(process) => + val result = API.stopFlow(process.asInstanceOf[Process]) + Future.successful(HttpResponse(entity = result)) + case _ => + Future.successful(HttpResponse(entity = "Can not found process Error!")) + } + } } + case HttpRequest(GET, Uri.Path("/stop/info"), headers, entity, protocol) =>{ + val bundle = req.getUri().query().getOrElse("bundle","") + if(bundle.equals("")){ + Future.failed(new Exception("Can not found bundle Error!")) + }else{ + try{ + val stopInfo = API.getStopInfo(bundle) + Future.successful(HttpResponse(entity = stopInfo)) + }catch { + case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!")) + } + } + } + case HttpRequest(GET, Uri.Path("/stop/groups"), headers, entity, protocol) =>{ + + try{ + val stopGroups = API.getAllGroups() + Future.successful(HttpResponse(entity = stopGroups)) + }catch { + case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!")) + } + + } + case _: HttpRequest => Future.successful(HttpResponse(404, entity = "Unknown resource!")) } diff --git a/piflow-server/src/main/scala/cn/piflow/api/HttpClientGetStopInfo.scala b/piflow-server/src/main/scala/cn/piflow/api/HttpClientGetStopInfo.scala new file mode 100644 index 0000000..748fb71 --- /dev/null +++ b/piflow-server/src/main/scala/cn/piflow/api/HttpClientGetStopInfo.scala @@ -0,0 +1,21 @@ +package cn.piflow.api + +import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet} +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils + +object HttpClientGetStopInfo { + + def main(args: Array[String]): Unit = { + + val url = "http://10.0.86.98:8001/stop/info?bundle=cn.piflow.bundle.hive.SelectHiveQL" + val client = HttpClients.createDefault() + val getFlowInfo:HttpGet = new HttpGet(url) + + val response:CloseableHttpResponse = client.execute(getFlowInfo) + val entity = response.getEntity + val str = EntityUtils.toString(entity,"UTF-8") + println("Property Desc is " + str) + } + +} diff --git a/pom.xml b/pom.xml index 69bde91..ad7d777 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,7 @@ spark-yarn_2.11 ${spark.version} - + diff --git a/readMe.txt b/readMe.txt index fa30ac1..72c4597 100644 --- a/readMe.txt +++ b/readMe.txt @@ -6,3 +6,6 @@ mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/spark-xml_ clean package -Dmaven.test.skip=true -U +3.set SPARK_HOME in Configurations + Edit Configurations --> Application(HttpService) --> Configurations --> Environment Variable +