Modification of special types of Oracle

yang qidong
This commit is contained in:
yanfqidong0604 2018-11-19 09:43:45 +08:00
parent 7bbfd7d3ee
commit becfca6b91
2 changed files with 94 additions and 12 deletions

View File

@ -13,8 +13,8 @@
"url":"jdbc:oracle:thin:@10.0.86.237:1521/newdb",
"user":"my",
"password":"bigdata",
"sql":"select * from a",
"fileNamesString":"COL1,COL2,COL3,COL4"
"sql":"select * from typetype",
"fileNamesString":"mynum.number,mychar.varchar2,myblob.blob,myclob.clob,myxml.xmltype,mylong.long,mydate.date,mynclob.nclob"
}
},
@ -30,7 +30,6 @@
}
}
],
"paths":[
{

View File

@ -1,6 +1,7 @@
package cn.piflow.bundle.jdbc
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.io._
import java.sql.{Blob, Clob, Connection, Date, DriverManager, NClob, PreparedStatement, ResultSet, SQLXML}
import cn.piflow._
import cn.piflow.conf._
@ -8,7 +9,7 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer
@ -26,6 +27,18 @@ class JdbcReadFromOracle extends ConfigurableStop{
var fileNamesString:String=_
def toByteArray(in: InputStream): Array[Byte] = {
var byteArray:Array[Byte]=new Array[Byte](1024*1024)
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
var n:Int=0
while ((n=in.read(byteArray)) != -1 && (n != -1)){
out.write(byteArray,0,n)
}
val arr: Array[Byte] = out.toByteArray
out.close()
arr
}
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val session = pec.get[SparkSession]()
@ -36,19 +49,81 @@ class JdbcReadFromOracle extends ConfigurableStop{
val filedNames: Array[String] = fileNamesString.split(",")
var rowsArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer()
var rowsArr:ArrayBuffer[ArrayBuffer[Any]]=ArrayBuffer()
while (rs.next()){
var rowArr:ArrayBuffer[String]=ArrayBuffer()
var rowArr:ArrayBuffer[Any]=ArrayBuffer()
for(fileName <- filedNames){
rowArr+=rs.getString(fileName)
val name_type: Array[String] = fileName.split("\\.")
val name: String = name_type(0)
val typestr: String = name_type(1)
if(typestr.toUpperCase.equals("BLOB")){
val blob: Blob = rs.getBlob(name)
var byteArr : Array[Byte] =Array()
if(blob != null){
val stream: InputStream = blob.getBinaryStream
byteArr = toByteArray(stream)
stream.close()
}
rowArr+=byteArr
}else if(typestr.toUpperCase.equals("CLOB") || typestr.toUpperCase.equals("XMLTYPE")){
val clob: Clob = rs.getClob(name)
var byteArr : Array[Byte] =Array()
if(clob != null){
val stream: InputStream = clob.getAsciiStream
byteArr = toByteArray(stream)
stream.close()
}
rowArr+=byteArr
}else if(typestr.toUpperCase.equals("NCLOB")){
val nclob: NClob = rs.getNClob(name)
var byteArr : Array[Byte] =Array()
if(nclob != null){
val stream: InputStream = nclob.getAsciiStream
byteArr = toByteArray(stream)
stream.close()
}
rowArr+=byteArr
}else if(typestr.toUpperCase.equals("DATE")){
val date: Date = rs.getDate(name)
rowArr+=date
}else if(typestr.toUpperCase.equals("NUMBER")){
val int: Int = rs.getInt(name)
rowArr+=int
}else{
rowArr+=rs.getString(name)
}
}
rowsArr+=rowArr
}
val fields: Array[StructField] = filedNames.map(d=>StructField(d,StringType,nullable = true))
val schema: StructType = StructType(fields)
var nameArrBuff:ArrayBuffer[String]=ArrayBuffer()
var typeArrBuff:ArrayBuffer[String]=ArrayBuffer()
filedNames.foreach(x => {
nameArrBuff+=x.split("\\.")(0)
typeArrBuff+=x.split("\\.")(1)
})
var num:Int=0
val fields: ArrayBuffer[StructField] = nameArrBuff.map(x => {
var sf: StructField = null
val typeName: String = typeArrBuff(num)
if (typeName.toUpperCase.equals("BLOB") || typeName.toUpperCase.equals("CLOB") || typeName.toUpperCase.equals("NCLOB") || typeName.toUpperCase.equals("XMLTYPE")) {
sf = StructField(x, DataTypes.createArrayType(ByteType), nullable = true)
}else if( typeName.toUpperCase.equals("DATE")) {
sf = StructField(x, DateType, nullable = true)
}else if( typeName.toUpperCase.equals("NUMBER")) {
sf = StructField(x, IntegerType, nullable = true)
}else if( typeName.toUpperCase.equals("XMLTYPE")) {
sf = StructField(x, IntegerType, nullable = true)
}else {
sf = StructField(x, StringType, nullable = true)
}
num+=1
sf
})
val schema: StructType = StructType(fields)
val rows: List[Row] = rowsArr.toList.map(arr => {
val row: Row = Row.fromSeq(arr)
row
})
@ -59,11 +134,19 @@ class JdbcReadFromOracle extends ConfigurableStop{
df.show(20)
println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
/*
val v: Any = df.collect()(0).get(2)
println("3333333333333333333333"+v.getClass)
val ab: Array[Byte] = v.asInstanceOf[Seq[Byte]].toArray
val fos: FileOutputStream = new FileOutputStream(new File("/aa.txt"))
fos.write(ab)
fos.close()
*/
out.write(df)
}
def initialize(ctx: ProcessContext): Unit = {