Merge remote-tracking branch 'origin/master'
commit 350f74a741
Binary file not shown.
Binary file not shown.
@@ -19,6 +19,14 @@
    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
@ -156,6 +164,19 @@
|
|||
<version>2.6.6</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>jdbc_oracle</groupId>
|
||||
<artifactId>ojdbc</artifactId>
|
||||
<version>6.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>jdbc_oracle</groupId>
|
||||
<artifactId>ojdbc</artifactId>
|
||||
<version>5.0.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!--https://mvnrepository.com/artifact/io.netty/netty-all-->
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
|
@@ -202,6 +223,41 @@
                            <generatePom>true</generatePom>
                        </configuration>
                    </execution>

                    <execution>
                        <id>install-external-2</id>
                        <goals>
                            <goal>install-file</goal>
                        </goals>
                        <phase>install</phase>
                        <configuration>
                            <file>${basedir}/lib/ojdbc6.jar</file>
                            <groupId>jdbc_oracle</groupId>
                            <artifactId>ojdbc</artifactId>
                            <version>6.0.0</version>
                            <packaging>jar</packaging>
                            <generatePom>true</generatePom>
                        </configuration>
                    </execution>

                    <!-- execution ids within a plugin must be unique -->
                    <execution>
                        <id>install-external-3</id>
                        <goals>
                            <goal>install-file</goal>
                        </goals>
                        <phase>install</phase>
                        <configuration>
                            <file>${basedir}/lib/ojdbc5.jar</file>
                            <groupId>jdbc_oracle</groupId>
                            <artifactId>ojdbc</artifactId>
                            <version>5.0.0</version>
                            <packaging>jar</packaging>
                            <generatePom>true</generatePom>
                        </configuration>
                    </execution>

                </executions>
            </plugin>
@@ -5,8 +5,8 @@
    "stops":[
      {
        "uuid":"0000",
        "name":"NewXmlParser",
        "bundle":"cn.piflow.bundle.xml.NewXmlParser",
        "name":"FlattenXmlParser",
        "bundle":"cn.piflow.bundle.xml.FlattenXmlParser",
        "properties":{
          "xmlpath":"hdfs://10.0.86.89:9000/cscd.xml",
          "tagPath":"papers,paper",

@@ -27,7 +27,7 @@
    ],
    "paths":[
      {
        "from":"NewXmlParser",
        "from":"FlattenXmlParser",
        "outport":"",
        "inport":"",
        "to":"JsonSave"
@@ -0,0 +1,45 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "checkpoint":"Merge",
    "stops":[
      {
        "uuid":"0000",
        "name":"JdbcReadFromOracle",
        "bundle":"cn.piflow.bundle.jdbc.JdbcReadFromOracle",
        "properties":{
          "url":"jdbc:oracle:thin:@10.0.86.237:1521/newdb",
          "user":"my",
          "password":"bigdata",
          "sql":"select * from typetype",
          "fileNamesString":"mynum.number,mychar.varchar2,myblob.blob,myclob.clob,myxml.xmltype,mylong.long,mydate.date,mynclob.nclob"
        }
      },
      {
        "uuid":"1111",
        "name":"CsvParser",
        "bundle":"cn.piflow.bundle.csv.CsvParser",
        "properties":{
          "csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
          "header":"false",
          "delimiter":",",
          "schema":"title,author,pages"
        }
      }
    ],
    "paths":[
      {
        "from":"JdbcReadFromOracle",
        "outport":"",
        "inport":"",
        "to":"CsvParser"
      }
    ]
  }
}
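The fileNamesString property above lists each queried column as a name.type pair, which JdbcReadFromOracle (added later in this commit) splits to decide how every Oracle column is mapped into the Spark schema. A minimal sketch of that split, assuming the same comma- and dot-separated format:

    // Sketch only: break "name.type" pairs apart the way the fileNamesString property is laid out.
    val fileNamesString = "mynum.number,mychar.varchar2,myblob.blob,mydate.date"
    val columns: Array[(String, String)] = fileNamesString.split(",").map { field =>
      val Array(name, typeStr) = field.split("\\.")   // e.g. ("mynum", "number")
      (name, typeStr.toUpperCase)
    }
    columns.foreach { case (name, typeStr) => println(s"$name -> $typeStr") }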
Binary image file added (3.5 KiB), not shown.
@@ -0,0 +1,48 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "checkpoint":"Merge",
    "stops":[
      {
        "uuid":"0000",
        "name":"GetMongo",
        "bundle":"cn.piflow.bundle.mongodb.GetMongo",
        "properties":{
          "addresses":"10.0.86.89,27017",
          "credentials":"",
          "dataBase":"test01",
          "collection":"aaaaa",
          "sql":"select id,name,age from aaaaa"
        }
      },
      {
        "uuid":"1111",
        "name":"JdbcWriteToOracle",
        "bundle":"cn.piflow.bundle.jdbc.JdbcWriteToOracle",
        "properties":{
          "url":"jdbc:oracle:thin:@10.0.86.237:1521/newdb",
          "user":"my",
          "password":"bigdata",
          "table":"a111"
        }
      }
    ],
    "paths":[
      {
        "from":"GetMongo",
        "outport":"",
        "inport":"",
        "to":"JdbcWriteToOracle"
      }
    ]
  }
}
@@ -0,0 +1,46 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "checkpoint":"Merge",
    "stops":[
      {
        "uuid":"0000",
        "name":"SelectImpala",
        "bundle":"cn.piflow.bundle.impala.SelectImpala",
        "properties":{
          "url":"10.0.82.165:21050",
          "user":"",
          "password":"test1",
          "sql":"select * from kylin.test1",
          "schameString":"pid,name"
        }
      },
      {
        "uuid":"1111",
        "name":"putHdfs",
        "bundle":"cn.piflow.bundle.hdfs.PutHdfs",
        "properties":{
          "hdfsUrl":"hdfs://10.0.86.89:9000",
          "hdfsPath":"/yg/0",
          "types":"csv"
        }
      }
    ],
    "paths":[
      {
        "from":"SelectImpala",
        "outport":"",
        "inport":"",
        "to":"putHdfs"
      }
    ]
  }
}
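In this flow the url property is only the Impala daemon's host and port; SelectImpala (below) turns it into a Hive-protocol JDBC URL with SASL disabled. A minimal sketch of that connection setup, assuming the same noSasl configuration and the hive-jdbc dependency added in the pom above:

    import java.sql.{Connection, DriverManager}

    // Sketch: derive the hive2 JDBC URL from the flow's "url" property and open a connection.
    val url = "10.0.82.165:21050"
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con: Connection = DriverManager.getConnection("jdbc:hive2://" + url + "/;auth=noSasl", "", "test1")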
Binary image file added (25 KiB), not shown.
@@ -0,0 +1,108 @@
package cn.piflow.bundle.impala

import java.sql.{Connection, DriverManager, ResultSet, Statement}

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable.ArrayBuffer


class SelectImpala extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "get data from impala"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var url:String=_
  var user:String=_
  var password:String=_
  var sql:String=_
  var schameString : String=_

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session: SparkSession = pec.get[SparkSession]()

    // Impala is reached through the Hive JDBC driver, e.g. jdbc:hive2://10.0.82.165:21050/;auth=noSasl
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val con: Connection = DriverManager.getConnection("jdbc:hive2://"+url+"/;auth=noSasl",user,password)
    val stmt: Statement = con.createStatement()
    // val rs: ResultSet = stmt.executeQuery("select * from kylin.test1 full join kylin.morg on kylin.test1.pid=kylin.morg.belongtocode")
    val rs: ResultSet = stmt.executeQuery(sql)

    val filedNames: Array[String] = schameString.split(",")
    var rowsArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer()
    while (rs.next()){
      var rowArr:ArrayBuffer[String]=ArrayBuffer()
      for(fileName <- filedNames){
        rowArr+=rs.getString(fileName)
      }
      rowsArr+=rowArr
    }

    val fields: Array[StructField] = filedNames.map(d=>StructField(d,StringType,nullable = true))
    val schema: StructType = StructType(fields)

    val rows: List[Row] = rowsArr.toList.map(arr => {
      val row: Row = Row.fromSeq(arr)
      row
    })
    val rdd: RDD[Row] = session.sparkContext.makeRDD(rows)
    val df: DataFrame = session.createDataFrame(rdd,schema)

    println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
    df.show(20)
    println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")

    out.write(df)
  }

  override def setProperties(map: Map[String, Any]): Unit = {
    url = MapUtil.get(map,"url").asInstanceOf[String]
    user = MapUtil.get(map,"user").asInstanceOf[String]
    password = MapUtil.get(map,"password").asInstanceOf[String]
    sql = MapUtil.get(map,"sql").asInstanceOf[String]
    schameString = MapUtil.get(map,"schameString").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor : List[PropertyDescriptor] = List()

    val url=new PropertyDescriptor().name("url").displayName("url").description("IP and port number, written as ip:port").defaultValue("").required(true)
    descriptor = url :: descriptor
    val user=new PropertyDescriptor().name("user").displayName("user").description("").defaultValue("").required(false)
    descriptor = user :: descriptor
    val password=new PropertyDescriptor().name("password").displayName("password").description("").defaultValue("").required(false)
    descriptor = password :: descriptor
    val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The SQL to execute; table names must be qualified with the database, such as database.table").defaultValue("").required(true)
    descriptor = sql :: descriptor
    val schameString=new PropertyDescriptor().name("schameString").displayName("schameString").description("The fields of the SQL query result, separated by commas").defaultValue("").required(true)
    descriptor = schameString :: descriptor

    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("impala/impala-logo.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.Mongodb.toString)
  }

  override def initialize(ctx: ProcessContext): Unit = {
  }

}
@@ -214,7 +214,8 @@ class spider extends ConfigurableStop{
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("spider.jpg")
    ImageUtil.getImage("spider.jpeg")
  }

  override def getGroup(): List[String] = {
@@ -0,0 +1,194 @@
package cn.piflow.bundle.jdbc

import java.io._
import java.sql.{Blob, Clob, Connection, Date, DriverManager, NClob, PreparedStatement, ResultSet, SQLXML}

import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer

class JdbcReadFromOracle extends ConfigurableStop{

  val authorEmail: String = "yangqidong@cnic.cn"
  val description: String = "read from oracle."
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var url:String = _
  var user:String = _
  var password:String = _
  var sql:String = _
  var schame:String=_

  // Copy a JDBC LOB stream into a byte array, 1 MB at a time.
  def toByteArray(in: InputStream): Array[Byte] = {
    val byteArray: Array[Byte] = new Array[Byte](1024*1024)
    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
    var n: Int = in.read(byteArray)
    while (n != -1){
      out.write(byteArray,0,n)
      n = in.read(byteArray)
    }
    val arr: Array[Byte] = out.toByteArray
    out.close()
    arr
  }

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session = pec.get[SparkSession]()

    Class.forName("oracle.jdbc.driver.OracleDriver")
    val con: Connection = DriverManager.getConnection(url,user,password)
    val pre: PreparedStatement = con.prepareStatement(sql)
    val rs: ResultSet = pre.executeQuery()

    // Each entry of the fileNamesString property is "columnName.oracleType".
    val filedNames: Array[String] = schame.split(",")
    var rowsArr:ArrayBuffer[ArrayBuffer[Any]]=ArrayBuffer()
    while (rs.next()){
      var rowArr:ArrayBuffer[Any]=ArrayBuffer()
      for(fileName <- filedNames){
        val name_type: Array[String] = fileName.split("\\.")
        val name: String = name_type(0)
        val typestr: String = name_type(1)
        if(typestr.toUpperCase.equals("BLOB")){
          val blob: Blob = rs.getBlob(name)
          var byteArr : Array[Byte] =Array()
          if(blob != null){
            val stream: InputStream = blob.getBinaryStream
            byteArr = toByteArray(stream)
            stream.close()
          }
          rowArr+=byteArr
        }else if(typestr.toUpperCase.equals("CLOB") || typestr.toUpperCase.equals("XMLTYPE")){
          val clob: Clob = rs.getClob(name)
          var byteArr : Array[Byte] =Array()
          if(clob != null){
            val stream: InputStream = clob.getAsciiStream
            byteArr = toByteArray(stream)
            stream.close()
          }
          rowArr+=byteArr
        }else if(typestr.toUpperCase.equals("NCLOB")){
          val nclob: NClob = rs.getNClob(name)
          var byteArr : Array[Byte] =Array()
          if(nclob != null){
            val stream: InputStream = nclob.getAsciiStream
            byteArr = toByteArray(stream)
            stream.close()
          }
          rowArr+=byteArr
        }else if(typestr.toUpperCase.equals("DATE")){
          val date: Date = rs.getDate(name)
          rowArr+=date
        }else if(typestr.toUpperCase.equals("NUMBER")){
          val int: Int = rs.getInt(name)
          rowArr+=int
        }else{
          rowArr+=rs.getString(name)
        }
      }
      rowsArr+=rowArr
    }

    var nameArrBuff:ArrayBuffer[String]=ArrayBuffer()
    var typeArrBuff:ArrayBuffer[String]=ArrayBuffer()
    filedNames.foreach(x => {
      nameArrBuff+=x.split("\\.")(0)
      typeArrBuff+=x.split("\\.")(1)
    })
    var num:Int=0
    val fields: ArrayBuffer[StructField] = nameArrBuff.map(x => {
      var sf: StructField = null
      val typeName: String = typeArrBuff(num)
      if (typeName.toUpperCase.equals("BLOB") || typeName.toUpperCase.equals("CLOB") || typeName.toUpperCase.equals("NCLOB") || typeName.toUpperCase.equals("XMLTYPE")) {
        sf = StructField(x, DataTypes.createArrayType(ByteType), nullable = true)
      }else if( typeName.toUpperCase.equals("DATE")) {
        sf = StructField(x, DateType, nullable = true)
      }else if( typeName.toUpperCase.equals("NUMBER")) {
        sf = StructField(x, IntegerType, nullable = true)
      }else {
        sf = StructField(x, StringType, nullable = true)
      }
      num+=1
      sf
    })

    val schema: StructType = StructType(fields)
    val rows: List[Row] = rowsArr.toList.map(arr => {
      val row: Row = Row.fromSeq(arr)
      row
    })
    val rdd: RDD[Row] = session.sparkContext.makeRDD(rows)
    val df: DataFrame = session.createDataFrame(rdd,schema)

    println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
    df.show(20)
    println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")

    /*
    val v: Any = df.collect()(0).get(2)
    println("3333333333333333333333"+v.getClass)
    val ab: Array[Byte] = v.asInstanceOf[Seq[Byte]].toArray

    val fos: FileOutputStream = new FileOutputStream(new File("/aa.txt"))
    fos.write(ab)
    fos.close()
    */

    out.write(df)
  }

  def initialize(ctx: ProcessContext): Unit = {

  }

  override def setProperties(map: Map[String, Any]): Unit = {
    url = MapUtil.get(map,"url").asInstanceOf[String]
    user = MapUtil.get(map,"user").asInstanceOf[String]
    password = MapUtil.get(map,"password").asInstanceOf[String]
    sql = MapUtil.get(map,"sql").asInstanceOf[String]
    schame = MapUtil.get(map,"fileNamesString").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor : List[PropertyDescriptor] = List()

    val url=new PropertyDescriptor().name("url").displayName("url").description("The URL, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb").defaultValue("").required(true)
    descriptor = url :: descriptor

    val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of the database").defaultValue("").required(true)
    descriptor = user :: descriptor

    val password=new PropertyDescriptor().name("password").displayName("password").description("The password of the database").defaultValue("").required(true)
    descriptor = password :: descriptor

    val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The SQL to execute").defaultValue("").required(true)
    descriptor = sql :: descriptor

    val schame=new PropertyDescriptor().name("fileNamesString").displayName("fileNamesString").description("The fields of the SQL query result as name.type pairs, such as: ID.number, name.varchar").defaultValue("").required(true)
    descriptor = schame :: descriptor

    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("JDBC/oracle.jpeg")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.JdbcGroup.toString)
  }

}
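JdbcReadFromOracle maps BLOB/CLOB/NCLOB/XMLTYPE columns to Array[Byte] fields in the output DataFrame, so a downstream consumer has to decode them itself. A minimal sketch of reading one such column back as text, assuming df is the DataFrame written by this stop and that it contains the myclob column from the getOracle.json flow above:

    // Sketch: decode a byte-array column (a CLOB read by JdbcReadFromOracle) back into a String.
    val firstRow = df.collect()(0)
    val clobBytes: Array[Byte] = firstRow.getAs[Seq[Byte]]("myclob").toArray
    val clobText = new String(clobBytes, "UTF-8")
    println(clobText)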
@@ -0,0 +1,97 @@
package cn.piflow.bundle.jdbc

import java.sql.{Connection, DriverManager, Statement}

import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql._

class JdbcWriteToOracle extends ConfigurableStop{

  val authorEmail: String = "yangqidong@cnic.cn"
  val description: String = "write to oracle."
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var url:String = _
  var user:String = _
  var password:String = _
  var table:String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session = pec.get[SparkSession]()
    val inDF: DataFrame = in.read()

    Class.forName("oracle.jdbc.driver.OracleDriver")
    val con: Connection = DriverManager.getConnection(url,user,password)
    val star: Statement = con.createStatement()

    // Create the target table with one varchar2(100) column per incoming DataFrame column.
    val fileNames: Array[String] = inDF.columns
    var fileNameStr:String=""
    var createSQL:String="create table "+table+"("
    fileNames.foreach(name => {
      fileNameStr+=(","+name)
      createSQL+=(name+" varchar2(100),")
    })

    star.executeUpdate(createSQL.substring(0,createSQL.length-1)+")")

    // Insert rows by splitting Row.toString on commas; every value is quoted as a plain string.
    inDF.collect().foreach(r => {
      var insertSQL:String="insert into "+table+"("+fileNameStr.substring(1)+") Values("
      var rowStr:String=""
      val rs: Array[String] = r.toString().substring(1, r.toString().length - 1).split(",")
      for(x <- rs){
        rowStr+=(",'"+x+"'")
      }
      insertSQL+=(rowStr.substring(1)+")")

      star.executeUpdate(insertSQL)
    })

  }

  def initialize(ctx: ProcessContext): Unit = {

  }

  override def setProperties(map: Map[String, Any]): Unit = {
    url = MapUtil.get(map,"url").asInstanceOf[String]
    user = MapUtil.get(map,"user").asInstanceOf[String]
    password = MapUtil.get(map,"password").asInstanceOf[String]
    table = MapUtil.get(map,"table").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor : List[PropertyDescriptor] = List()

    val url=new PropertyDescriptor().name("url").displayName("url").description("The URL, for example jdbc:oracle:thin:@192.168.0.1:1521/newdb").defaultValue("").required(true)
    descriptor = url :: descriptor

    val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of the database").defaultValue("").required(true)
    descriptor = user :: descriptor

    val password=new PropertyDescriptor().name("password").displayName("password").description("The password of the database").defaultValue("").required(true)
    descriptor = password :: descriptor

    val table=new PropertyDescriptor().name("table").displayName("table").description("The table to write to").defaultValue("").required(true)
    descriptor = table :: descriptor

    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("JDBC/oracle.jpeg")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.JdbcGroup.toString)
  }

}
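The insert statement above is assembled by splitting Row.toString() on commas and quoting each piece, so values containing commas or single quotes will produce broken SQL. A parameterized alternative is sketched here under the same all-varchar2 assumption, reusing con, table, fileNames and inDF from the code above:

    // Sketch: write each Row through a PreparedStatement instead of concatenating SQL strings.
    val placeholders = fileNames.map(_ => "?").mkString(",")
    val ps = con.prepareStatement(
      "insert into " + table + "(" + fileNames.mkString(",") + ") values(" + placeholders + ")")
    inDF.collect().foreach { r =>
      for (i <- fileNames.indices) {
        // Everything is bound as a string, matching the varchar2(100) columns created above.
        ps.setString(i + 1, if (r.get(i) == null) null else r.get(i).toString)
      }
      ps.executeUpdate()
    }
    ps.close()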
@@ -14,7 +14,7 @@ import scala.collection.mutable.ArrayBuffer

class ComplementByMemcache extends ConfigurableStop {
  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "get data from mongodb"
  override val description: String = "complement data with values queried from memcache"
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
@@ -15,7 +15,7 @@ import scala.collection.mutable.ArrayBuffer

class GetMemcache extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "get data from mongodb"
  override val description: String = "get data from memcache"
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
@@ -9,7 +9,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}

class PutMemcache extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "get data from mongodb"
  override val description: String = "put data to memcache"
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.NonePort.toString)
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}


class NewXmlParser extends ConfigurableStop{
class FlattenXmlParser extends ConfigurableStop{
  override val authorEmail: String = "yangqidong@cnic.cn"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
@@ -8,13 +8,13 @@ import org.junit.Test

import scala.util.parsing.json.JSON

class NewXmlParserTest {
class FlattenXmlParserTest {

  @Test
  def testFlow(): Unit ={

    //parse flow json
    val file = "src/main/resources/NewXmlParser.json"
    val file = "src/main/resources/FlattenXmlParser.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)
@@ -0,0 +1,65 @@
package cn.piflow.bundle.JDBC

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test

import scala.util.parsing.json.JSON

class getOracleTest {

  @Test
  def testFlow(): Unit ={

    //parse flow json
    val file = "src/main/resources/JDBC/getOracle.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    //create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    val h2Server = Server.createTcpServer("-tcp","-tcpAllowOthers","-tcpPort","50001").start()

    //execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars","/root/Desktop/gitWORK/out/artifacts/piflow_bundle/piflow_bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
      .start(flow);

    process.awaitTermination();
    val pid = process.pid();
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close();
  }

  @Test
  def testFlow2json() = {

    //parse flow json
    val file = "src/main/resources/flow.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    //create flow
    val flowBean = FlowBean(map)
    val flowJson = flowBean.toJson()
    println(flowJson)
  }

}
@@ -0,0 +1,65 @@
package cn.piflow.bundle.JDBC

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test

import scala.util.parsing.json.JSON

class writeOracleTest {

  @Test
  def testFlow(): Unit ={

    //parse flow json
    val file = "src/main/resources/JDBC/putOracle.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    //create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    val h2Server = Server.createTcpServer("-tcp","-tcpAllowOthers","-tcpPort","50001").start()

    //execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars","/root/Desktop/gitWORK/out/artifacts/piflow_bundle/piflow_bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
      .start(flow);

    process.awaitTermination();
    val pid = process.pid();
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close();
  }

  @Test
  def testFlow2json() = {

    //parse flow json
    val file = "src/main/resources/flow.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    //create flow
    val flowBean = FlowBean(map)
    val flowJson = flowBean.toJson()
    println(flowJson)
  }

}
@@ -0,0 +1,65 @@
package cn.piflow.bundle.impala

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test

import scala.util.parsing.json.JSON

class SelectImpalaTest {

  @Test
  def testFlow(): Unit ={

    //parse flow json
    val file = "src/main/resources/impala/SelectImpala.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    //create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    val h2Server = Server.createTcpServer("-tcp","-tcpAllowOthers","-tcpPort","50001").start()

    //execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars","/root/Desktop/gitWORK/out/artifacts/piflow_bundle/piflow_bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
      .start(flow);

    process.awaitTermination();
    val pid = process.pid();
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close();
  }

  @Test
  def testFlow2json() = {

    //parse flow json
    val file = "src/main/resources/flow.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    //create flow
    val flowBean = FlowBean(map)
    val flowJson = flowBean.toJson()
    println(flowJson)
  }

}
@@ -1,7 +1,9 @@
1.maven error
    apt-get install maven
    mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/spark-xml_2.11-0.4.2.jar -DgroupId=com.databricks -DartifactId=spark-xml_2.11 -Dversion=0.4.2 -Dpackaging=jar
    mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
    mvn install:install-file -Dfile=/Work/piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
    mvn install:install-file -Dfile=/Work/piflow/piflow-bundle/lib/ojdbc6.jar -DgroupId=jdbc_oracle -DartifactId=ojdbc -Dversion=6.0.0 -Dpackaging=jar
    mvn install:install-file -Dfile=/Work/piflow/piflow-bundle/lib/ojdbc5.jar -DgroupId=jdbc_oracle -DartifactId=ojdbc -Dversion=5.0.0 -Dpackaging=jar

2.packaging