Merge remote-tracking branch 'origin/master'

This commit is contained in:
SongDz 2019-08-27 16:03:53 +08:00
commit b9ae336cda
8 changed files with 163 additions and 29 deletions

Binary file not shown.

Binary file not shown.

View File

@ -181,13 +181,6 @@
</dependency>
<dependency>
<groupId>jdbc_oracle</groupId>
<artifactId>ojdbc</artifactId>
<version>6.0.0</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core-->
<dependency>
<groupId>org.apache.flume</groupId>
@ -266,6 +259,13 @@
<version>4.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/oracle/ojdbc6 -->
<dependency>
<groupId>oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
</dependencies>
<build>
@ -301,25 +301,9 @@
<phase>install</phase>
<configuration>
<file>${basedir}/lib/ojdbc6-11.2.0.3.jar</file>
<groupId>jdbc_oracle</groupId>
<artifactId>ojdbc</artifactId>
<version>6.0.0</version>
<packaging>jar</packaging>
<generatePom>true</generatePom>
</configuration>
</execution>
<execution>
<id>install-external-3</id>
<goals>
<goal>install-file</goal>
</goals>
<phase>install</phase>
<configuration>
<file>${basedir}/lib/ojdbc5.jar</file>
<groupId>jdbc_oracle</groupId>
<artifactId>ojdbc</artifactId>
<version>5.0.0</version>
<groupId>oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
<packaging>jar</packaging>
<generatePom>true</generatePom>
</configuration>

View File

@ -0,0 +1,55 @@
package cn.piflow.bundle.hive
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * Stop that persists the incoming DataFrame into the table `database.table`
 * using the parquet format and `SaveMode.Overwrite`.
 *
 * It has one default input port and no output port.
 */
class PutHiveOverwrite extends ConfigurableStop {

  val authorEmail: String = "xjzhu@cnic.cn"
  val description: String = "Save data to hive by overwrite mode"
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.NonePort.toString)

  // Target database and table; populated by setProperties before perform runs.
  var database: String = _
  var table: String = _

  /**
   * Reads the DataFrame from the input stream and saves it as a table.
   *
   * @param in  stream the DataFrame is read from
   * @param out unused; this stop produces no output
   * @param pec job context the SparkSession is looked up in
   */
  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    // The session is looked up but not referenced afterwards; the lookup itself is retained.
    val session = pec.get[SparkSession]()

    val incoming = in.read()
    val qualifiedTable = s"${database}.${table}"

    incoming
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .saveAsTable(qualifiedTable)
  }

  /** Nothing to prepare for this stop. */
  def initialize(ctx: ProcessContext): Unit = ()

  /**
   * Copies the "database" and "table" entries of the configuration map into this stop.
   *
   * @param map configuration values keyed by property name
   */
  def setProperties(map: Map[String, Any]): Unit = {
    database = MapUtil.get(map, "database").asInstanceOf[String]
    table = MapUtil.get(map, "table").asInstanceOf[String]
  }

  /**
   * Describes the two required properties of this stop.
   *
   * @return the descriptors in the order table, database
   */
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    val databaseProperty = new PropertyDescriptor()
      .name("database")
      .displayName("DataBase")
      .description("The database name")
      .defaultValue("")
      .required(true)

    val tableProperty = new PropertyDescriptor()
      .name("table")
      .displayName("Table")
      .description("The table name")
      .defaultValue("")
      .required(true)

    List(tableProperty, databaseProperty)
  }

  /** @return the icon bytes loaded from "icon/hive/PutHiveStreaming.png" */
  override def getIcon(): Array[Byte] =
    ImageUtil.getImage("icon/hive/PutHiveStreaming.png")

  /** @return the single group this stop belongs to (the Hive group) */
  override def getGroup(): List[String] =
    List(StopGroup.HiveGroup.toString)
}

View File

@ -0,0 +1,95 @@
package cn.piflow.bundle.jdbc
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
/**
 * Created by xjzhu@cnic.cn on 7/23/19
 *
 * Stop that runs a SQL query against an Oracle database through Spark's "jdbc" data source
 * and writes the resulting DataFrame to the output stream.
 *
 * The query is wrapped as a sub-select and the read is configured with the
 * `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` JDBC options.
 * NOTE(review): per the Spark JDBC documentation the bounds only determine the partition
 * stride and do not filter rows -- confirm this matches what users of this stop expect.
 */
class OracleReadByPartition extends ConfigurableStop {

  override val authorEmail: String = "xjzhu@cnic.cn"
  override val description: String = "Read data From oracle"
  // `.toString` keeps the port declarations consistent with the other stops in the bundle.
  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  // Connection settings and query; populated by setProperties before perform runs.
  var url: String = _
  var user: String = _
  var password: String = _
  var sql: String = _

  // Partitioning settings handed to the Spark JDBC reader.
  var partitionColumn: String = _
  var lowerBound: Long = _
  var upperBound: Long = _
  var numPartitions: Int = _

  /**
   * Copies the configuration values into this stop.
   *
   * Every value is expected to arrive as a String; the bounds and the partition count are
   * parsed from their textual form.
   *
   * @param map configuration values keyed by property name
   * @throws NumberFormatException if lowerBound, upperBound or numPartitions is not numeric
   */
  override def setProperties(map: Map[String, Any]): Unit = {
    url = MapUtil.get(map, "url").asInstanceOf[String]
    user = MapUtil.get(map, "user").asInstanceOf[String]
    password = MapUtil.get(map, "password").asInstanceOf[String]
    sql = MapUtil.get(map, "sql").asInstanceOf[String]
    partitionColumn = MapUtil.get(map, "partitionColumn").asInstanceOf[String]
    lowerBound = MapUtil.get(map, "lowerBound").asInstanceOf[String].toLong
    upperBound = MapUtil.get(map, "upperBound").asInstanceOf[String].toLong
    numPartitions = MapUtil.get(map, "numPartitions").asInstanceOf[String].toInt
  }

  /** Builds a required property whose display name equals its name and whose default is empty. */
  private def requiredProperty(name: String, description: String): PropertyDescriptor =
    new PropertyDescriptor()
      .name(name)
      .displayName(name)
      .description(description)
      .defaultValue("")
      .required(true)

  /**
   * Describes the eight required properties of this stop.
   *
   * @return the descriptors, last declared first (numPartitions ... url)
   */
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    val declared = List(
      // The example is an Oracle thin-driver URL, matching the driver used in perform.
      requiredProperty("url", "The Url, for example jdbc:oracle:thin:@//127.0.0.1:1521/dbname"),
      requiredProperty("user", "The user name of database"),
      requiredProperty("password", "The password of database"),
      requiredProperty("sql", "The sql sentence you want to execute"),
      requiredProperty("partitionColumn", "The partitionby column"),
      requiredProperty("lowerBound", "The lowerBound of partitioned column"),
      requiredProperty("upperBound", "The upperBound of partitioned column"),
      requiredProperty("numPartitions", "The number of partitions ")
    )
    // Reversed so the returned order stays the same as when each descriptor was prepended.
    declared.reverse
  }

  /** @return the icon bytes loaded from "icon/jdbc/jdbcRead.png" */
  override def getIcon(): Array[Byte] =
    ImageUtil.getImage("icon/jdbc/jdbcRead.png")

  /** @return the single group this stop belongs to (the JDBC group) */
  override def getGroup(): List[String] =
    List(StopGroup.JdbcGroup.toString)

  /** Nothing to prepare for this stop. */
  override def initialize(ctx: ProcessContext): Unit = ()

  /**
   * Loads the query result through the Oracle JDBC driver and writes it to `out`.
   *
   * @param in  unused; the data comes from the database
   * @param out stream the loaded DataFrame is written to
   * @param pec job context the SparkSession is looked up in
   */
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()

    // The "dbtable" option takes a table expression, so the query is wrapped as an aliased sub-select.
    val dbtable = s"( ${sql})temp"

    val jdbcDF = spark.read.format("jdbc")
      .option("url", url)
      .option("driver", "oracle.jdbc.OracleDriver")
      .option("dbtable", dbtable)
      .option("user", user)
      .option("password", password)
      .option("partitionColumn", partitionColumn)
      .option("lowerBound", lowerBound)
      .option("upperBound", upperBound)
      .option("numPartitions", numPartitions)
      .load()

    out.write(jdbcDF)
  }
}

View File

@ -34,7 +34,7 @@ object FlowLauncher {
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setConf("spark.driver.memory", flow.getDriverMemory())
.setConf("spark.num.executors", flow.getExecutorNum())
.setConf("spark.executor.instances", flow.getExecutorNum())
.setConf("spark.executor.memory", flow.getExecutorMem())
.setConf("spark.executor.cores",flow.getExecutorCores())
.addFile(PropertyUtil.getConfigureFile())

View File

@ -113,7 +113,7 @@ object API {
.setVerbose(true)
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setConf("spark.driver.memory", dirverMem)
.setConf("spark.num.executors",executorNum)
.setConf("spark.executor.instances",executorNum)
.setConf("spark.executor.memory", executorMem)
.setConf("spark.executor.cores",executorCores)
.addFile(PropertyUtil.getConfigureFile())

View File

@ -2,7 +2,7 @@
apt-get install maven
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/spark-xml_2.11-0.4.2.jar -DgroupId=com.databricks -DartifactId=spark-xml_2.11 -Dversion=0.4.2 -Dpackaging=jar
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/ojdbc6.jar -DgroupId=jdbc_oracle -DartifactId=ojdbc -Dversion=6.0.0 -Dpackaging=jar
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/ojdbc6-11.2.0.3.jar -DgroupId=oracle -DartifactId=ojdbc6 -Dversion=11.2.0.3 -Dpackaging=jar
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/edtftpj.jar -DgroupId=ftpClient -DartifactId=edtftp -Dversion=1.0.0 -Dpackaging=jar
2.packaging