xiaoxiao 2018-09-27 12:23:48 +08:00
commit 54b7a8509b
50 changed files with 530 additions and 375 deletions

Binary file not shown.

View File

@ -4,8 +4,14 @@ server.port=8001
#spark.master=spark://10.0.86.89:7077
#spark.master=spark://10.0.86.191:7077
spark.master=yarn
spark.deploy.mode=cluster
spark.deploy.mode=client
yarn.resourcemanager.hostname=10.0.86.191
yarn.resourcemanager.address=10.0.86.191:8032
yarn.access.namenode=hdfs://10.0.86.191:9000
yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
hive.metastore.uris=thrift://10.0.86.191:9083
piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar

View File

@ -16,16 +16,22 @@
<artifactId>piflow-core</artifactId>
<version>0.9</version>
</dependency>
<!--<dependency>
<groupId>piflow</groupId>
<artifactId>piflow-conf</artifactId>
<version>0.9</version>
</dependency>-->
<dependency>
<groupId>org.clapper</groupId>
<artifactId>classutil_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.11</version>
</dependency>
<dependency>
<groupId>com.chuusai</groupId>
<artifactId>shapeless_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.sksamuel.scrimage</groupId>

View File

@ -1,17 +0,0 @@
<configuration>
<!-- Set the HDFS nameservice to ns1 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://10.0.86.191:9000</value>
</property>
<!-- Size of read/write buffer used in SequenceFiles. -->
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<!-- Specify the Hadoop temp directory (create it manually) -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.6.0/tmp</value>
</property>
</configuration>

View File

@ -1,44 +0,0 @@
<configuration>
<!-- Set the NameNode HTTP address -->
<property>
<name>dfs.namenode.http-address</name>
<value>10.0.86.191:50070</value>
</property>
<!-- Set the SecondaryNameNode HTTP address -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>10.0.86.191:50090</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- Set the NameNode storage path -->
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/opt/hadoop-2.6.0/dfs/name</value>
</property>
<!-- Set the DataNode storage path -->
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/opt/hadoop-2.6.0/dfs/data</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hadoop-hdfs/dn._PORT</value>
</property>
<property>
<name>dfs.client.file-block-storage-locations.timeout</name>
<value>10000</value>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,90 +0,0 @@
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<!-- The warehouse directory previously created in HDFS -->
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://10.0.86.89:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://10.0.86.90:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
<!--start for trancaction -->
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
<!-- property>
<name>hive.in.test</name>
<value>true</value>
</property -->
</configuration>

View File

@ -1,52 +0,0 @@
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>10.0.86.191</value>
</property>
<property>
<description>The address of the applications manager interface in the RM.</description>
<name>yarn.resourcemanager.address</name>
<value>${yarn.resourcemanager.hostname}:8032</value>
</property>
<property>
<description>The address of the scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.address</name>
<value>${yarn.resourcemanager.hostname}:8030</value>
</property>
<property>
<description>The http address of the RM web application.</description>
<name>yarn.resourcemanager.webapp.address</name>
<value>${yarn.resourcemanager.hostname}:8088</value>
</property>
<property>
<description>The https address of the RM web application.</description>
<name>yarn.resourcemanager.webapp.https.address</name>
<value>${yarn.resourcemanager.hostname}:8090</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>${yarn.resourcemanager.hostname}:8031</value>
</property>
<property>
<description>The address of the RM admin interface.</description>
<name>yarn.resourcemanager.admin.address</name>
<value>${yarn.resourcemanager.hostname}:8033</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>12288</value>
<description>Memory available per node, in MB; default 8192 MB</description>
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
</configuration>

View File

@ -4,12 +4,13 @@ import java.beans.Transient
import cn.piflow.bundle.util.CleanUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{CleanGroup,ConfigurableStop, StopGroup}
import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
class EmailClean extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var columnName:String=_
@ -46,8 +47,8 @@ class EmailClean extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
CleanGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CleanGroup.toString)
}
}

View File

@ -6,7 +6,7 @@ import java.util.{Calendar, Date}
import cn.piflow.bundle.util.CleanUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{CleanGroup, ConfigurableStop, FileGroup, StopGroup}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
@ -14,6 +14,7 @@ import org.apache.spark.sql.SparkSession
import scala.reflect.macros.ParseException
class IdentityNumberClean extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
//var regex:String =_
@ -51,8 +52,7 @@ class IdentityNumberClean extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
CleanGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CleanGroup.toString)
}
}

View File

@ -2,12 +2,13 @@ package cn.piflow.bundle.clean
import cn.piflow.bundle.util.CleanUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup}
import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
class PhoneNumberClean extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var columnName:String=_
@ -45,8 +46,8 @@ class PhoneNumberClean extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
CleanGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CleanGroup.toString)
}
}

View File

@ -2,12 +2,13 @@ package cn.piflow.bundle.clean
import cn.piflow.bundle.util.CleanUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup}
import cn.piflow.conf.{CleanGroup, ConfigurableStop, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
class TitleClean extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var columnName:String=_
@ -43,8 +44,8 @@ class TitleClean extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
CleanGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CleanGroup.toString)
}
}

View File

@ -1,6 +1,6 @@
package cn.piflow.bundle.common
import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup}
import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
@ -9,6 +9,7 @@ import scala.beans.BeanProperty
class Fork extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = -1
@ -30,8 +31,8 @@ class Fork extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
CommonGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CommonGroup.toString)
}
}

View File

@ -1,6 +1,6 @@
package cn.piflow.bundle.common
import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup}
import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
@ -8,6 +8,7 @@ import scala.beans.BeanProperty
class Merge extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = -1
val outportCount: Int = 1
@ -29,8 +30,8 @@ class Merge extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
CommonGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CommonGroup.toString)
}
}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup}
import cn.piflow.conf.{CommonGroup, ConfigurableStop, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.{Column, DataFrame}
@ -11,6 +11,7 @@ import scala.beans.BeanProperty
class SelectField extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 1
@ -44,8 +45,8 @@ class SelectField extends ConfigurableStop {
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
CommonGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CommonGroup.toString)
}
}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.csv
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, CsvGroup, HiveGroup, StopGroup}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@ -10,6 +10,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
class CsvParser extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
@ -68,8 +69,8 @@ class CsvParser extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
def getGroup() : StopGroup = {
CsvGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CsvGroup.toString)
}
}

View File

@ -3,12 +3,13 @@ package cn.piflow.bundle.csv
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup, StopGroupEnum}
import org.apache.spark.sql.SaveMode
class CsvSave extends ConfigurableStop{
override val inportCount: Int = 1
override val outportCount: Int = 0
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 0
var csvSavePath: String = _
var header: Boolean = _
@ -42,8 +43,8 @@ class CsvSave extends ConfigurableStop{
ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg")
}
override def getGroup(): StopGroup = {
CsvGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.CsvGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {

View File

@ -3,13 +3,14 @@ package cn.piflow.bundle.file
import java.net.URI
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
class FetchFile extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var hdfs_path:String =_
@ -53,8 +54,8 @@ class FetchFile extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
FileGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.FileGroup.toString)
}

View File

@ -6,13 +6,14 @@ import java.net.{HttpURLConnection, URI, URL}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
class PutFile extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var hdfs_path:String =_
@ -56,8 +57,8 @@ class PutFile extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
FileGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.FileGroup.toString)
}

View File

@ -3,12 +3,13 @@ package cn.piflow.bundle.file
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, FileGroup, StopGroup, StopGroupEnum}
import org.apache.spark.sql.SparkSession
class RegexTextProcess extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var regex:String =_
@ -55,7 +56,7 @@ class RegexTextProcess extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
FileGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.FileGroup.toString)
}
}

View File

@ -5,7 +5,7 @@ import java.net.{HttpURLConnection, URL}
import java.util
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, FtpGroup, HttpGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, FtpGroup, HttpGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import sun.net.ftp.{FtpClient, FtpDirEntry}
@ -13,6 +13,7 @@ import sun.net.ftp.{FtpClient, FtpDirEntry}
import scala.reflect.io.Directory
class LoadFromFtp extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var url_str:String =_
@ -155,8 +156,8 @@ class LoadFromFtp extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
FtpGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.FtpGroup.toString)
}

View File

@ -4,7 +4,7 @@ import java.io.{DataOutputStream, File, InputStream, OutputStream}
import java.util
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, FtpGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, FtpGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import sun.net.TelnetOutputStream
@ -13,6 +13,7 @@ import sun.net.ftp.{FtpClient, FtpDirEntry}
import scala.reflect.io.Directory
class UploadToFtp extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var url_str:String =_
@ -169,8 +170,7 @@ class UploadToFtp extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
FtpGroup
}
override def getGroup(): List[String] = {
List(StopGroupEnum.FtpGroup.toString)
}
}

View File

@ -2,12 +2,13 @@ package cn.piflow.bundle.hive
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
class PutHiveQL extends ConfigurableStop {
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
@ -56,8 +57,8 @@ class PutHiveQL extends ConfigurableStop {
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
HiveGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.HiveGroup.toString)
}
}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.hive
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
@ -10,6 +10,7 @@ import scala.beans.BeanProperty
class PutHiveStreaming extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 0
@ -40,8 +41,9 @@ class PutHiveStreaming extends ConfigurableStop {
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
HiveGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.HiveGroup.toString)
}
}

View File

@ -2,7 +2,7 @@ package cn.piflow.bundle.hive
import cn.piflow._
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, HiveGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
@ -13,6 +13,7 @@ import scala.beans.BeanProperty
class SelectHiveQL extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
@ -38,7 +39,7 @@ class SelectHiveQL extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val hiveQL = new PropertyDescriptor().name("hiveQL").displayName("HiveQL").defaultValue("").required(true)
val hiveQL = new PropertyDescriptor().name("hiveQL").displayName("HiveQL").defaultValue("").allowableValues(Set("")).required(true)
descriptor = hiveQL :: descriptor
descriptor
}
@ -47,10 +48,11 @@ class SelectHiveQL extends ConfigurableStop {
ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg")
}
override def getGroup(): StopGroup = {
HiveGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.HiveGroup.toString)
}
}

View File

@ -3,13 +3,14 @@ package cn.piflow.bundle.http
import java.io._
import java.net.{HttpURLConnection, URL}
import cn.piflow.conf.{ConfigurableStop, HttpGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, HttpGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
class LoadZipFromUrl extends ConfigurableStop{
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var url_str:String =_
@ -81,8 +82,9 @@ class LoadZipFromUrl extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
HttpGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.HttpGroup.toString)
}
}

View File

@ -4,13 +4,14 @@ import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.lang.Exception
import java.util.zip.GZIPInputStream
import cn.piflow.conf.{ConfigurableStop, HiveGroup, HttpGroup, StopGroup}
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
import org.apache.spark.sql.{DataFrame, SparkSession}
class UnGZip extends ConfigurableStop {
val authorEmail: String = "xiaoxiao@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
val fileTypes:List[String]=List("tar.gz","tar","gz")
@ -207,8 +208,8 @@ class UnGZip extends ConfigurableStop {
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
HttpGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.HttpGroup.toString)
}
}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.jdbc
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
@ -9,6 +9,7 @@ import org.apache.spark.sql.SparkSession
import scala.beans.BeanProperty
class JdbcRead extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
@ -50,8 +51,8 @@ class JdbcRead extends ConfigurableStop {
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
JdbcGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.JdbcGroup.toString)
}
}

View File

@ -3,7 +3,7 @@ package cn.piflow.bundle.jdbc
import java.util.Properties
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, JdbcGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.{SaveMode, SparkSession}
@ -12,6 +12,7 @@ import scala.beans.BeanProperty
class JdbcWrite extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 0
@ -46,8 +47,9 @@ class JdbcWrite extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
JdbcGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.JdbcGroup.toString)
}
}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.json
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
@ -10,8 +10,9 @@ import scala.beans.BeanProperty
class JsonPathParser extends ConfigurableStop{
@BeanProperty val inportCount: Int = 1
@BeanProperty val outportCount: Int = 1
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 1
var jsonPath: String = _
var tag : String = _
@ -40,14 +41,15 @@ class JsonPathParser extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
JsonGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.JsonGroup.toString)
}
}
class JsonStringParser extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 1
@ -75,8 +77,9 @@ class JsonStringParser extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
JsonGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.JsonGroup.toString)
}
}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.json
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, JsonGroup, StopGroup, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SaveMode
@ -10,6 +10,7 @@ import scala.beans.BeanProperty
class JsonSave extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 0
@ -35,8 +36,9 @@ class JsonSave extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
JsonGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.JsonGroup.toString)
}
}

View File

@ -2,7 +2,7 @@ package cn.piflow.bundle.script
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{CommonGroup, ConfigurableStop, ScriptGroup, StopGroup}
import cn.piflow.conf._
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
@ -11,6 +11,7 @@ import scala.beans.BeanProperty
class DataFrameRowParser extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 1
@ -28,10 +29,11 @@ class DataFrameRowParser extends ConfigurableStop{
ImageUtil.getImage("./src/main/resources/DataFrameParse.jpg")
}
override def getGroup(): StopGroup = {
ScriptGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.ScriptGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {

View File

@ -3,7 +3,7 @@ package cn.piflow.bundle.script
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, ScriptGroup, StopGroup}
import cn.piflow.conf.{ConfigurableStop, ScriptGroup, StopGroup, StopGroupEnum}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@ -12,6 +12,7 @@ import sys.process._
class ShellExecutor extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
@ -32,8 +33,8 @@ class ShellExecutor extends ConfigurableStop{
ImageUtil.getImage("./src/main/resources/ShellExecutor.jpg")
}
override def getGroup(): StopGroup = {
ScriptGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.ScriptGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {

View File

@ -4,7 +4,7 @@ import java.net.URI
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import cn.piflow.conf.{ConfigurableStop, StopGroup, XmlGroup}
import cn.piflow.conf.{ConfigurableStop, StopGroup, StopGroupEnum, XmlGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@ -17,8 +17,9 @@ import scala.util.control.Breaks._
* Created by admin on 2018/8/27.
*/
class FolderXmlParser extends ConfigurableStop{
override val inportCount: Int = -1
override val outportCount: Int = 1
val authorEmail: String = "lijie"
val inportCount: Int = -1
val outportCount: Int = 1
var rowTag:String = _
var xmlpath:String = _
@ -44,8 +45,8 @@ class FolderXmlParser extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
XmlGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.XmlGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit ={}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.xml
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, StopGroup, XmlGroup}
import cn.piflow.conf.{ConfigurableStop, StopGroup, StopGroupEnum, XmlGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.SparkSession
@ -11,6 +11,7 @@ import scala.beans.BeanProperty
class XmlParser extends ConfigurableStop {
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 1
@ -49,8 +50,8 @@ class XmlParser extends ConfigurableStop {
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
XmlGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.XmlGroup.toString)
}
}

View File

@ -1,7 +1,7 @@
package cn.piflow.bundle.xml
import cn.piflow._
import cn.piflow.conf.{ConfigurableStop, StopGroup, XmlGroup}
import cn.piflow.conf.{ConfigurableStop, StopGroup, StopGroupEnum, XmlGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.codehaus.jackson.map.ext.CoreXMLSerializers.XMLGregorianCalendarSerializer
@ -10,6 +10,7 @@ import scala.beans.BeanProperty
class XmlSave extends ConfigurableStop{
val authorEmail: String = "xjzhu@cnic.cn"
val inportCount: Int = 1
val outportCount: Int = 0
@ -34,8 +35,8 @@ class XmlSave extends ConfigurableStop{
override def getIcon(): Array[Byte] = ???
override def getGroup(): StopGroup = {
XmlGroup
override def getGroup(): List[String] = {
List(StopGroupEnum.XmlGroup.toString)
}
}

View File

@ -4,17 +4,19 @@ import cn.piflow.Stop
import cn.piflow.conf.bean.PropertyDescriptor
trait ConfigurableStop extends Stop{
abstract class ConfigurableStop extends Stop{
val authorEmail : String
val inportCount : Int
val outportCount : Int
def setProperties(map: Map[String, Any])
def getPropertyDescriptor() : List[PropertyDescriptor]
def getIcon() : Array[Byte]
def getGroup() : StopGroup
def getGroup() : List[String]
}

View File

@ -0,0 +1,18 @@
package cn.piflow.conf
object StopGroupEnum extends Enumeration {
type StopGroup = Value
val CommonGroup = Value("CommonGroup")
val CsvGroup = Value("CsvGroup")
val HiveGroup = Value("HiveGroup")
val JdbcGroup = Value("JdbcGroup")
val JsonGroup = Value("JsonGroup")
val XmlGroup = Value("XmlGroup")
val HttpGroup = Value("HttpGroup")
val FtpGroup = Value("FtpGroup")
val ScriptGroup = Value("ScriptGroup")
val FileGroup = Value("FileGroup")
val CleanGroup = Value("CleanGroup")
}
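Taken together with the ConfigurableStop change above (trait changed to abstract class, authorEmail/inportCount/outportCount as abstract vals, getGroup() now returning List[String]), a stop written against the new contract looks roughly like the sketch below. The stop name, property, and perform body are hypothetical and only illustrate the pattern applied throughout this commit.

package cn.piflow.bundle.demo

import cn.piflow.conf.{ConfigurableStop, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}

// Hypothetical stop, shown only to illustrate the new abstract-class contract.
class DemoStop extends ConfigurableStop {
  val authorEmail: String = "someone@example.com"
  val inportCount: Int = 1
  val outportCount: Int = 1

  var columnName: String = _

  def setProperties(map: Map[String, Any]): Unit = {
    columnName = map.getOrElse("columnName", "").toString
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    List(new PropertyDescriptor().name("columnName").displayName("ColumnName").defaultValue("").required(true))
  }

  override def getIcon(): Array[Byte] = ???

  // Groups are now plain strings taken from StopGroupEnum.
  override def getGroup(): List[String] = List(StopGroupEnum.CommonGroup.toString)

  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    // a real stop reads a DataFrame from `in`, transforms it, and writes it to `out`
  }
}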

View File

@ -1,5 +1,8 @@
package cn.piflow.conf.bean
import net.liftweb.json.JsonDSL._
import net.liftweb.json._
class PropertyDescriptor {
var name : String = _
var displayName : String = _
@ -38,6 +41,22 @@ class PropertyDescriptor {
this.sensitive = sensitive
this
}
def toJson():String = {
val allowableValueStr = if(this.allowableValues == null) "" else this.allowableValues.mkString(",")
val json =
("property" ->
("name" -> this.name) ~
("displayName" -> this.displayName) ~
("description" -> this.description) ~
("defaultValue" -> this.defaultValue) ~
("allowableValues" -> allowableValueStr) ~
("required" -> this.required.toString) ~
("sensitive" -> this.sensitive.toString))
val jsonString = compactRender(json)
jsonString
}
}
object PropertyDescriptor{
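As a hedged usage sketch (not part of the commit), the new toJson() serializes a descriptor built with the same fluent calls used elsewhere in this diff; the exact rendering of unset fields depends on lift-json, so the output shown in the comment is approximate.

val prop = new PropertyDescriptor()
  .name("hiveQL").displayName("HiveQL").defaultValue("").allowableValues(Set("")).required(true)

// prop.toJson() yields roughly:
// {"property":{"name":"hiveQL","displayName":"HiveQL","description":...,
//  "defaultValue":"","allowableValues":"","required":"true","sensitive":"false"}}
println(prop.toJson())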

View File

@ -19,33 +19,14 @@ class StopBean {
}
def constructStop() : ConfigurableStop = {
//val stop = Class.forName(this.bundle).getConstructor(classOf[Map[String, String]]).newInstance(this.properties)
/*val stop = Class.forName(this.bundle).newInstance()
stop.asInstanceOf[ConfigurableStop].setProperties(this.properties)
stop.asInstanceOf[ConfigurableStop]*/
try{
val stop = Class.forName(this.bundle).newInstance()
stop.asInstanceOf[ConfigurableStop].setProperties(this.properties)
stop.asInstanceOf[ConfigurableStop]
}catch{
case classNotFoundException:ClassNotFoundException =>{
val stop : Option[ConfigurableStop] = ClassUtil.findConfigurableStop(this.bundle)
stop match {
case Some(s) => {
s.asInstanceOf[ConfigurableStop].setProperties(this.properties)
s.asInstanceOf[ConfigurableStop]
}
case _ => throw new ClassNotFoundException(this.bundle + " is not found!!!")
}
}
case _ => throw new ClassNotFoundException(this.bundle + " is not found!!!")
val stop = ClassUtil.findConfigurableStop(this.bundle)
stop.setProperties(this.properties)
stop
}catch {
case ex : Exception => throw ex
}
}
}

View File

@ -3,7 +3,12 @@ package cn.piflow.conf.util
import java.io.File
import cn.piflow.conf.ConfigurableStop
import cn.piflow.conf.bean.PropertyDescriptor
import net.liftweb.json.compactRender
import org.clapper.classutil.ClassFinder
import org.reflections.Reflections
import net.liftweb.json.JsonDSL._
import net.liftweb.json._
object ClassUtil {
@ -11,11 +16,9 @@ object ClassUtil {
val configurableStopClass:String = "cn.piflow.conf.ConfigurableStop"
//val classpath:String = "/opt/project/piflow/classpath"
val classpath = System.getProperty("user.dir")+ "/classpath/"
def findAllConfigurableStop() : List[String] = {
/*def findAllConfigurableStopByClassFinder() : List[String] = {
val classpath = System.getProperty("user.dir")
var stopList : List[String] = List()
val classpathFile = new File(classpath)
@ -33,39 +36,177 @@ object ClassUtil {
}
stopList
}*/
def findAllConfigurableStop() : List[ConfigurableStop] = {
var stopList:List[ConfigurableStop] = List()
//find internal stop
val reflections = new Reflections("")
val allClasses = reflections.getSubTypesOf(classOf[ConfigurableStop])
val it = allClasses.iterator();
while(it.hasNext) {
val plugin = Class.forName(it.next().getName).newInstance()
val stop = plugin.asInstanceOf[ConfigurableStop]
stopList = stop +: stopList
}
//find external stop
stopList = stopList ::: findAllConfigurableStopInClasspath()
stopList
}
def findConfigurableStop(bundle : String) : Option[ConfigurableStop] = {
var stop:Option[ConfigurableStop] = None
private def findAllConfigurableStopInClasspath() : List[ConfigurableStop] = {
val classpath = System.getProperty("user.dir")+ "/classpath/"
var stopList:List[ConfigurableStop] = List()
val classpathFile = new File(classpath)
println("classpath is " + classpath)
//println("classpath is " + classpath)
val finder = ClassFinder(getJarFile(classpathFile))
val classes = finder.getClasses
val classMap = ClassFinder.classInfoMap(classes)
val it = classes.iterator
while(it.hasNext) {
val externalClass = it.next()
if(externalClass.superClassName.equals(configurableStopClass)){
val stopIntance = Class.forName(externalClass.name).newInstance()
stopList = stopIntance.asInstanceOf[ConfigurableStop] +: stopList
}
}
stopList
/*val classMap = ClassFinder.classInfoMap(classes)
val plugins = ClassFinder.concreteSubclasses(configurableStopClass,classMap)
plugins.foreach{
pluginClassInfo =>
val plugin = Class.forName(pluginClassInfo.name).newInstance()
stopList = plugin.asInstanceOf[ConfigurableStop] +: stopList
}
stopList*/
}
def findAllGroups() : List[String] = {
val stoplist = findAllConfigurableStop();
val groupList = stoplist.flatMap(_.getGroup()).distinct
groupList
}
private def findConfigurableStopInClasspath(bundle : String) : Option[ConfigurableStop] = {
val classpath = System.getProperty("user.dir")+ "/classpath/"
var stop:Option[ConfigurableStop] = None
val classpathFile = new File(classpath)
//println("classpath is " + classpath)
val finder = ClassFinder(getJarFile(classpathFile))
val classes = finder.getClasses
val it = classes.iterator
while(it.hasNext) {
val externalClass = it.next()
if(externalClass.superClassName.equals(configurableStopClass)){
if (externalClass.name.equals(bundle)){
val stopIntance = Class.forName(externalClass.name).newInstance()
stop = Some(stopIntance.asInstanceOf[ConfigurableStop])
return stop
}
}
}
/*val classMap = ClassFinder.classInfoMap(classes)
val plugins = ClassFinder.concreteSubclasses(configurableStopClass,classMap)
plugins.foreach{
pluginClassInfo =>
//println(pluginString.name)
if(pluginClassInfo.name.equals(bundle)){
val plugin = Class.forName(pluginClassInfo.name).newInstance()
stop = Some(plugin.asInstanceOf[ConfigurableStop])
return stop
}
}
}*/
stop
}
def getJarFile(dir : File) : Seq[File] = {
val files = dir.listFiles.filter(! _.isDirectory).filter( _.toString.endsWith(".jar"))
private def getJarFile(dir : File) : Seq[File] = {
val files = dir.listFiles.filter(! _.isDirectory).filter( _.toString.endsWith(".jar")).filter(_.toString.contains("piflow"))
files ++ dir.listFiles().filter(_.isDirectory).flatMap(getJarFile)
}
/*def main(args: Array[String]): Unit = {
def findConfigurableStop(bundle : String) : ConfigurableStop = {
try{
println("find ConfigurableStop by Class.forName: " + bundle)
val stop = Class.forName(bundle).newInstance()
stop.asInstanceOf[ConfigurableStop]
}catch{
case classNotFoundException:ClassNotFoundException =>{
println("find ConfigurableStop in Classpath: " + bundle)
val stop : Option[ConfigurableStop] = ClassUtil.findConfigurableStopInClasspath(bundle)
stop match {
case Some(s) => s.asInstanceOf[ConfigurableStop]
case _ => throw new ClassNotFoundException(bundle + " is not found!!!")
}
}
case ex : Exception => {
println("Can not find Configurable: " + bundle)
throw ex
}
}
}
def findConfigurableStopPropertyDescriptor(bundle : String) : List[PropertyDescriptor] = {
val stopPropertyDesc = ClassUtil.findConfigurableStop(bundle)
stopPropertyDesc.getPropertyDescriptor()
}
def findConfigurableStopInfo(bundle : String) : String = {
val stop = ClassUtil.findConfigurableStop(bundle)
val propertyDescriptorList:List[PropertyDescriptor] = stop.getPropertyDescriptor()
propertyDescriptorList.foreach(p=> if (p.allowableValues == null) p.allowableValues = List(""))
val json =
("StopInfo" ->
("bundle" -> bundle) ~
("owner" -> stop.authorEmail) ~
("inportCount" -> stop.inportCount) ~
("outportCount" -> stop.outportCount) ~
("groups" -> stop.getGroup().mkString(",")) ~
("properties" ->
propertyDescriptorList.map { property =>(
("name" -> property.name) ~
("displayName" -> property.displayName) ~
("description" -> property.description) ~
("defaultValue" -> property.defaultValue) ~
("allowableValues" -> property.allowableValues) ~
("required" -> property.required.toString) ~
("sensitive" -> property.sensitive.toString)) }) )
val jsonString = compactRender(json)
jsonString
}
def main(args: Array[String]): Unit = {
//val stop = findConfigurableStop("cn.piflow.bundle.Class1")
val allConfigurableStopList = findAllConfigurableStop()
println("\n\n\n" + allConfigurableStopList)
}*/
//val allConfigurableStopList = findAllConfigurableStop()
/*val propertyDescriptorList = findConfigurableStopPropertyDescriptor("cn.piflow.bundle.Xjzhu")
var propertyJsonList = List[String]()
propertyDescriptorList.foreach( p => propertyJsonList = p.toJson() +: propertyJsonList )
val start ="""{"properties":["""
val end = """]}"""
val str = propertyJsonList.mkString(start, ",", end)
println(str)*/
/*val stop = findAllConfigurableStop()
stop.foreach(s => println(s.getClass.getName))
val temp = 1*/
val stop = findConfigurableStop("cn.piflow.bundle.Xjzhu")
println(stop.getClass.getName)
}
}
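For reference, findConfigurableStopInfo() is what backs the new /stop/info endpoint added later in this commit; for a bundle such as cn.piflow.bundle.hive.SelectHiveQL it returns a JSON string of roughly the following shape (a sketch, field values illustrative).

// Illustrative only; actual values come from the stop's fields and property descriptors.
// {"StopInfo":{"bundle":"cn.piflow.bundle.hive.SelectHiveQL","owner":"xjzhu@cnic.cn",
//   "inportCount":0,"outportCount":1,"groups":"HiveGroup",
//   "properties":[{"name":"hiveQL","displayName":"HiveQL","description":"",
//     "defaultValue":"","allowableValues":[""],"required":"true","sensitive":"false"}]}}
val info = ClassUtil.findConfigurableStopInfo("cn.piflow.bundle.hive.SelectHiveQL")
println(info)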

View File

@ -44,14 +44,22 @@ class ClassFindTest {
def testFindConfigurableStop() = {
val bundle = "cn.piflow.bundle.hive.SelectHiveQL"
val stop = findConfigurableStop(bundle)
stop match {
case Some(x) => {
println("Find " + x.getClass.toString + "!!!!!!!!")
//val propertiesDescList = x.getPropertyDescriptor()
//propertiesDescList.foreach(println(_))
}
case _ => println("Can not find : " + bundle)
}
println("Find " + stop.getClass.toString + "!!!!!!!!")
}
@Test
def testFindConfigurableStopClassPath() = {
val bundle = "cn.piflow.bundle.Class1"
val stop = findConfigurableStop(bundle)
println("Find " + stop.getClass.toString + "!!!!!!!!")
}
@Test
def testFindConfigurableStopNotExist() = {
val bundle = "cn.piflow.bundle.Class5"
val stop = findConfigurableStop(bundle)
println("Find " + stop.getClass.toString + "!!!!!!!!")
}
@Test

View File

@ -1,14 +1,15 @@
package cn.piflow.api
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.bean.{FlowBean, PropertyDescriptor}
import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.conf.util.{ClassUtil, OptionUtil}
import cn.piflow.Process
import cn.piflow.api.util.PropertyUtil
import cn.piflow.api.util.{PropertyUtil}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.launcher.SparkLauncher
import scala.util.parsing.json.JSON
@ -28,10 +29,16 @@ object API {
.master(PropertyUtil.getPropertyValue("spark.master"))
.appName(flowBean.name)
.config("spark.deploy.mode",PropertyUtil.getPropertyValue("spark.deploy.mode"))
.config("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
.config("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
.config("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
.config("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
.config("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
//.config("spark.driver.memory", "1g")
//.config("spark.executor.memory", "1g")
//.config("spark.cores.max", "2")
.config("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
@ -41,9 +48,24 @@ object API {
.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
.start(flow);
val applicationId = spark.sparkContext.applicationId
process.awaitTermination();
spark.close();
//process.awaitTermination();
//spark.close();
new Thread( new WaitProcessTerminateRunnable(spark, process)).start()
(applicationId,process)
/*val launcher = new SparkLauncher
launcher.setMaster(PropertyUtil.getPropertyValue("spark.master"))
.setAppName("test")
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address")).setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
.setConf("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setMainClass("lalla")
.addAppArgs(flowJson)*/
}
def stopFlow(process : Process): String = {
@ -64,4 +86,27 @@ object API {
str
}
def getStopInfo(bundle : String) : String = {
try{
val str = ClassUtil.findConfigurableStopInfo(bundle)
str
}catch{
case ex : Exception => println(ex);throw ex
}
}
def getAllGroups() = {
val groups = ClassUtil.findAllGroups().mkString(",")
"""{"groups":"""" + groups + """"}"""
}
}
class WaitProcessTerminateRunnable(spark : SparkSession, process: Process) extends Runnable {
override def run(): Unit = {
process.awaitTermination()
//spark.close()
}
}

View File

@ -9,11 +9,11 @@ import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.EntityUtils
import org.omg.CORBA.NameValuePair
object HTTPClientGet {
object HTTPClientGetFlowInfo {
def main(args: Array[String]): Unit = {
val url = "http://10.0.86.98:8002/flow/info?appID=application_1536562610670_0001"
val url = "http://10.0.86.98:8001/flow/info?appID=application_1536562610670_0005"
val client = HttpClients.createDefault()
val getFlowInfo:HttpGet = new HttpGet(url)

View File

@ -0,0 +1,21 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientGetGroups {
def main(args: Array[String]): Unit = {
val url = "http://10.0.86.98:8001/stop/groups"
val client = HttpClients.createDefault()
val getGroups:HttpGet = new HttpGet(url)
val response:CloseableHttpResponse = client.execute(getGroups)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Groups is: " + str)
}
}

View File

@ -6,11 +6,11 @@ import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClient {
object HTTPClientStartFlow {
def main(args: Array[String]): Unit = {
val json = """{"flow":{"name":"test","uuid":"1234","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
val url = "http://10.0.86.98:8002/flow/start"
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)

View File

@ -0,0 +1,24 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClientStopFlow {
def main(args: Array[String]): Unit = {
val json = """{"appID":"application_1536718350536_0023"}"""
val url = "http://10.0.86.98:8001/flow/stop"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println(str)
}
}

View File

@ -5,7 +5,6 @@ import java.util.concurrent.CompletionStage
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
@ -60,7 +59,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpEntity.Strict(_, data) =>{
val flowJson = data.utf8String
val (appId,process) = API.startFlow(flowJson)
processMap += (process.pid() -> process)
processMap += (appId -> process)
Future.successful(HttpResponse(entity = appId))
}
@ -69,18 +68,48 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
}
case HttpRequest(POST, Uri.Path("/flow/stop"), headers, entity, protocol) =>{
case HttpRequest(POST, Uri.Path("/flow/stop"), headers, entity, protocol) =>{
val data = toJson(entity)
val processId = data.get("processId").getOrElse("").asInstanceOf[String]
if(processId.equals("") || !processMap.contains(processId)){
val appId = data.get("appID").getOrElse("").asInstanceOf[String]
if(appId.equals("") || !processMap.contains(appId)){
Future.failed(new Exception("Can not found process Error!"))
}else{
val result = API.stopFlow(processMap.get(processId).asInstanceOf[Process])
Future.successful(HttpResponse(entity = result))
processMap.get(appId) match {
case Some(process) =>
val result = API.stopFlow(process.asInstanceOf[Process])
Future.successful(HttpResponse(entity = result))
case _ =>
Future.successful(HttpResponse(entity = "Can not found process Error!"))
}
}
}
case HttpRequest(GET, Uri.Path("/stop/info"), headers, entity, protocol) =>{
val bundle = req.getUri().query().getOrElse("bundle","")
if(bundle.equals("")){
Future.failed(new Exception("Can not found bundle Error!"))
}else{
try{
val stopInfo = API.getStopInfo(bundle)
Future.successful(HttpResponse(entity = stopInfo))
}catch {
case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!"))
}
}
}
case HttpRequest(GET, Uri.Path("/stop/groups"), headers, entity, protocol) =>{
try{
val stopGroups = API.getAllGroups()
Future.successful(HttpResponse(entity = stopGroups))
}catch {
case _ => Future.successful(HttpResponse(entity = "Can not found stop properties Error!"))
}
}
case _: HttpRequest =>
Future.successful(HttpResponse(404, entity = "Unknown resource!"))
}

View File

@ -0,0 +1,21 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HttpClientGetStopInfo {
def main(args: Array[String]): Unit = {
val url = "http://10.0.86.98:8001/stop/info?bundle=cn.piflow.bundle.hive.SelectHiveQL"
val client = HttpClients.createDefault()
val getFlowInfo:HttpGet = new HttpGet(url)
val response:CloseableHttpResponse = client.execute(getFlowInfo)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Property Desc is " + str)
}
}

View File

@ -54,7 +54,7 @@
<artifactId>spark-yarn_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<!--<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
@ -83,7 +83,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>2.6.4</version>
</dependency>
</dependency>-->
</dependencies>
<build>

View File

@ -6,3 +6,6 @@ mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/spark-xml_
clean package -Dmaven.test.skip=true -U
3. Set SPARK_HOME in Configurations
Edit Configurations --> Application(HttpService) --> Configurations --> Environment Variable