diff --git a/piflow-bundle/src/main/resources/JDBC/getOracle.json b/piflow-bundle/src/main/resources/JDBC/getOracle.json
index afafa06..4215e9b 100644
--- a/piflow-bundle/src/main/resources/JDBC/getOracle.json
+++ b/piflow-bundle/src/main/resources/JDBC/getOracle.json
@@ -13,8 +13,8 @@
         "url":"jdbc:oracle:thin:@10.0.86.237:1521/newdb",
         "user":"my",
         "password":"bigdata",
-        "sql":"select * from a",
-        "fileNamesString":"COL1,COL2,COL3,COL4"
+        "sql":"select * from typetype",
+        "fileNamesString":"mynum.number,mychar.varchar2,myblob.blob,myclob.clob,myxml.xmltype,mylong.long,mydate.date,mynclob.nclob"
       }
     },
 
@@ -30,7 +30,6 @@
 
       }
     }
-
   ],
   "paths":[
     {
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala
index 091b439..3bb271d 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/JdbcReadFromOracle.scala
@@ -1,6 +1,7 @@
 package cn.piflow.bundle.jdbc
 
-import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
+import java.io._
+import java.sql.{Blob, Clob, Connection, Date, DriverManager, NClob, PreparedStatement, ResultSet, SQLXML}
 
 import cn.piflow._
 import cn.piflow.conf._
@@ -8,7 +9,7 @@ import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -26,6 +27,20 @@ class JdbcReadFromOracle extends ConfigurableStop{
 
   var fileNamesString:String=_
 
+  // Drain an InputStream into a byte array (LOB columns are carried as bytes).
+  // Scala has no C-style "while ((n = read()) != -1)" idiom, since assignment
+  // returns Unit, so read once before the loop and again after each write.
+  def toByteArray(in: InputStream): Array[Byte] = {
+    val byteArray: Array[Byte] = new Array[Byte](1024*1024)
+    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
+    var n: Int = in.read(byteArray)
+    while (n != -1){
+      out.write(byteArray, 0, n)
+      n = in.read(byteArray)
+    }
+    val arr: Array[Byte] = out.toByteArray
+    out.close()
+    arr
+  }
+
   def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val session = pec.get[SparkSession]()
 
@@ -36,19 +51,81 @@
     val filedNames: Array[String] = fileNamesString.split(",")
 
-    var rowsArr:ArrayBuffer[ArrayBuffer[String]]=ArrayBuffer()
+    var rowsArr:ArrayBuffer[ArrayBuffer[Any]]=ArrayBuffer()
     while (rs.next()){
-      var rowArr:ArrayBuffer[String]=ArrayBuffer()
+      var rowArr:ArrayBuffer[Any]=ArrayBuffer()
       for(fileName <- filedNames){
-        rowArr+=rs.getString(fileName)
+        // fileNamesString entries have the form "columnName.oracleType"
+        val name_type: Array[String] = fileName.split("\\.")
+        val name: String = name_type(0)
+        val typestr: String = name_type(1)
+        if(typestr.toUpperCase.equals("BLOB")){
+          val blob: Blob = rs.getBlob(name)
+          var byteArr : Array[Byte] =Array()
+          if(blob != null){
+            val stream: InputStream = blob.getBinaryStream
+            byteArr = toByteArray(stream)
+            stream.close()
+          }
+          rowArr+=byteArr
+        }else if(typestr.toUpperCase.equals("CLOB") || typestr.toUpperCase.equals("XMLTYPE")){
+          val clob: Clob = rs.getClob(name)
+          var byteArr : Array[Byte] =Array()
+          if(clob != null){
+            val stream: InputStream = clob.getAsciiStream
+            byteArr = toByteArray(stream)
+            stream.close()
+          }
+          rowArr+=byteArr
+        }else if(typestr.toUpperCase.equals("NCLOB")){
+          val nclob: NClob = rs.getNClob(name)
+          var byteArr : Array[Byte] =Array()
+          if(nclob != null){
+            val stream: InputStream = nclob.getAsciiStream
+            byteArr = toByteArray(stream)
+            stream.close()
+          }
+          rowArr+=byteArr
+        }else if(typestr.toUpperCase.equals("DATE")){
+          val date: Date = rs.getDate(name)
+          rowArr+=date
+        }else if(typestr.toUpperCase.equals("NUMBER")){
+          val int: Int = rs.getInt(name)
+          rowArr+=int
+        }else{
+          rowArr+=rs.getString(name)
+        }
       }
       rowsArr+=rowArr
     }
 
 
-    val fields: Array[StructField] = filedNames.map(d=>StructField(d,StringType,nullable = true))
-    val schema: StructType = StructType(fields)
+    // Build the Spark schema from the declared Oracle types.
+    var nameArrBuff:ArrayBuffer[String]=ArrayBuffer()
+    var typeArrBuff:ArrayBuffer[String]=ArrayBuffer()
+    filedNames.foreach(x => {
+      nameArrBuff+=x.split("\\.")(0)
+      typeArrBuff+=x.split("\\.")(1)
+    })
+    var num:Int=0
+    val fields: ArrayBuffer[StructField] = nameArrBuff.map(x => {
+      var sf: StructField = null
+      val typeName: String = typeArrBuff(num)
+      if (typeName.toUpperCase.equals("BLOB") || typeName.toUpperCase.equals("CLOB") || typeName.toUpperCase.equals("NCLOB") || typeName.toUpperCase.equals("XMLTYPE")) {
+        sf = StructField(x, DataTypes.createArrayType(ByteType), nullable = true)
+      }else if( typeName.toUpperCase.equals("DATE")) {
+        sf = StructField(x, DateType, nullable = true)
+      }else if( typeName.toUpperCase.equals("NUMBER")) {
+        sf = StructField(x, IntegerType, nullable = true)
+      }else {
+        sf = StructField(x, StringType, nullable = true)
+      }
+      num+=1
+      sf
+    })
+    val schema: StructType = StructType(fields)
 
     val rows: List[Row] = rowsArr.toList.map(arr => {
+      val row: Row = Row.fromSeq(arr)
       row
     })
 
@@ -59,11 +136,19 @@ class JdbcReadFromOracle extends ConfigurableStop{
 
     df.show(20)
     println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
 
 
+/*
+    val v: Any = df.collect()(0).get(2)
+    println("3333333333333333333333"+v.getClass)
+    val ab: Array[Byte] = v.asInstanceOf[Seq[Byte]].toArray
+
+    val fos: FileOutputStream = new FileOutputStream(new File("/aa.txt"))
+    fos.write(ab)
+    fos.close()
+*/
+
     out.write(df)
-
-
   }
 
   def initialize(ctx: ProcessContext): Unit = {
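Reviewer note: the schema-building block above walks two parallel ArrayBuffers with a
mutable num counter. A minimal counter-free sketch of the same mapping follows; the
helper names oracleTypeToSparkType and schemaFor are hypothetical, and it assumes
Spark's BinaryType for the LOB columns instead of ArrayType(ByteType), since the
values produced by toByteArray are already Array[Byte]. It keeps the patch's choice
of IntegerType for NUMBER, though fractional NUMBER columns would need DecimalType
and rs.getBigDecimal.

    import org.apache.spark.sql.types._

    object OracleSchemaSketch {

      // Hypothetical helper: map the type token from a "name.type" pair in
      // fileNamesString to a Spark SQL DataType.
      def oracleTypeToSparkType(typeName: String): DataType =
        typeName.toUpperCase match {
          case "BLOB" | "CLOB" | "NCLOB" | "XMLTYPE" => BinaryType  // Array[Byte] cells
          case "DATE"                                => DateType
          case "NUMBER"                              => IntegerType // as in the patch
          case _                                     => StringType
        }

      // Build the schema directly from the comma-separated "name.type" pairs,
      // with no mutable counter and no parallel buffers.
      def schemaFor(fileNamesString: String): StructType =
        StructType(fileNamesString.split(",").map { pair =>
          val Array(name, typeName) = pair.split("\\.")
          StructField(name, oracleTypeToSparkType(typeName), nullable = true)
        })
    }

For example, OracleSchemaSketch.schemaFor("mynum.number,myblob.blob") yields a schema
with StructField(mynum, IntegerType) and StructField(myblob, BinaryType).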
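Reviewer note: the commented-out verification block collects a row and casts the cell
to Seq[Byte], which matches the ArrayType(ByteType) schema the patch builds. Below is
a small sketch of that check that handles both representations; the function name,
column argument, and output path are placeholders for illustration, not part of the
patch.

    import java.io.{File, FileOutputStream}
    import org.apache.spark.sql.DataFrame

    object LobDumpSketch {
      // Write the first row's LOB column to a local file for manual inspection.
      def dumpFirstLob(df: DataFrame, column: String, path: String): Unit = {
        val cell = df.select(column).head.get(0)
        val bytes: Array[Byte] = cell match {
          case arr: Array[Byte] => arr                                  // BinaryType cell
          case seq: Seq[_]      => seq.asInstanceOf[Seq[Byte]].toArray  // ArrayType(ByteType) cell
        }
        val fos = new FileOutputStream(new File(path))
        try fos.write(bytes) finally fos.close()
      }
    }

The try/finally ensures the stream is closed even if the write fails, which the
original debug block did not guarantee.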