judy0131 2020-03-27 22:12:18 +08:00
parent 259da94bbe
commit 8d747c789f
2 changed files with 7 additions and 10 deletions

File: MockData.scala

@@ -65,8 +65,10 @@ class MockData extends ConfigurableStop{
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val field = this.schema.split(",")
val spark = pec.get[SparkSession]()
import spark.implicits._
val field = this.schema.split(",")
val structFieldArray : Array[StructField] = new Array[StructField](field.size)
for(i <- 0 to field.size - 1){
val columnInfo = field(i).split(":")
@@ -86,17 +88,11 @@ class MockData extends ConfigurableStop{
//case "Date"=>structFieldArray(i) = new StructField(column, DateType, nullable = true)
//case "Timestamp"=>structFieldArray(i) = new StructField(column, TimestampType, nullable = true)
}
}
val schemaStructType = StructType(structFieldArray)
val spark = pec.get[SparkSession]()
import spark.implicits._
val rnd : Random = new Random()
val i = randomJson(rnd,schemaStructType)
val df = spark.read.schema(schemaStructType).json((0 to count).map{ _ => compact(randomJson(rnd,schemaStructType))}.toDS())
out.write(df)
}
private def randomJson( rnd: Random, dataType : DataType): JValue ={
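Note: the hunks above move the SparkSession lookup and the `spark.implicits._` import to the top of `perform`, drop the unused `randomJson` result, and build the mock DataFrame by generating `count` random JSON documents and parsing them with the declared schema. The following is a minimal, self-contained sketch of that pattern outside piflow; the object name, schema string, and row count are illustrative assumptions, not the stop's actual configuration.

```scala
import scala.util.Random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods.compact

object MockDataSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mock-data-sketch").getOrCreate()
    import spark.implicits._

    val schema = "id:Int,name:String,score:Double" // assumed value of the schema property
    val count  = 10                                // assumed value of the count property
    val rnd    = new Random()

    // "name:type" pairs -> StructFields, mirroring the loop in MockData.perform
    val structFieldArray = schema.split(",").map { entry =>
      val Array(column, colType) = entry.split(":")
      val dataType = colType match {
        case "Int"    => IntegerType
        case "Double" => DoubleType
        case _        => StringType
      }
      StructField(column, dataType, nullable = true)
    }
    val schemaStructType = StructType(structFieldArray)

    // One random JSON object per row, with values typed to match the schema
    def randomJson(rnd: Random, structType: StructType): JValue =
      JObject(structType.fields.toList.map { f =>
        val value: JValue = f.dataType match {
          case IntegerType => JInt(rnd.nextInt(100))
          case DoubleType  => JDouble(rnd.nextDouble())
          case _           => JString(rnd.alphanumeric.take(8).mkString)
        }
        JField(f.name, value)
      })

    // Same trick as the new perform body: generate `count` JSON strings and let
    // Spark parse them against the mock schema
    val df = spark.read
      .schema(schemaStructType)
      .json((0 until count).map(_ => compact(randomJson(rnd, schemaStructType))).toDS())

    df.show()
    spark.stop()
  }
}
```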

File: JDBCTest.scala

@@ -1,6 +1,7 @@
package cn.piflow.bundle.test
import cn.piflow.bundle.jdbc.{JdbcRead, JdbcWrite}
import cn.piflow.bundle.jdbc.{MysqlRead, MysqlWrite}
import cn.piflow.{FlowImpl, Path, Runner}
import org.apache.spark.sql.SparkSession
import org.junit.Test
@@ -20,10 +21,10 @@ class JDBCTest {
val jdbcWriteParameters = Map("writeDBtable" -> "student_full")
val jDBCReadStop = new JdbcRead()
val jDBCReadStop = new MysqlRead()
jDBCReadStop.setProperties(jdbcReadParameters)
val jDBCWriteStop = new JdbcWrite()
val jDBCWriteStop = new MysqlWrite()
jDBCWriteStop.setProperties(jdbcWriteParameters)
val flow = new FlowImpl();
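Note: the test now drives the MySQL-specific stops instead of the generic JDBC ones. Stops of this kind typically delegate to Spark's JDBC data source; the sketch below shows that read/write round trip directly in Spark, not piflow's actual MysqlRead/MysqlWrite implementation. The connection URL, credentials, and the source table name `student` are assumptions; only `student_full` appears in the test's write parameters above.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object MysqlRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("jdbc-sketch").getOrCreate()

    val url  = "jdbc:mysql://localhost:3306/test" // assumed connection settings
    val user = "root"
    val pass = "password"

    // Roughly what a MysqlRead-style stop does with its read-table property (table name assumed)
    val students = spark.read.format("jdbc")
      .option("url", url)
      .option("dbtable", "student")
      .option("user", user)
      .option("password", pass)
      .load()

    // Roughly what a MysqlWrite-style stop does with writeDBtable = "student_full"
    students.write.format("jdbc")
      .option("url", url)
      .option("dbtable", "student_full")
      .option("user", user)
      .option("password", pass)
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}
```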