From 8d747c789fe07e3207a78afdfad6f7c8744019cc Mon Sep 17 00:00:00 2001
From: judy0131
Date: Fri, 27 Mar 2020 22:12:18 +0800
Subject: [PATCH] fix bug

MockData: get the SparkSession and import spark.implicits._ at the top
of perform() instead of after the schema is built, and drop the unused
`val i = randomJson(rnd,schemaStructType)` call and stray blank lines.
JDBCTest: use the MysqlRead/MysqlWrite stops in place of
JdbcRead/JdbcWrite.
---
 .../main/scala/cn/piflow/bundle/common/MockData.scala | 10 +++-------
 .../test/scala/cn/piflow/bundle/test/JDBCTest.scala   |  7 ++++---
 2 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/MockData.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/MockData.scala
index 7dabc02..8f78274 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/MockData.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/MockData.scala
@@ -65,8 +65,10 @@ class MockData extends ConfigurableStop{
   override def initialize(ctx: ProcessContext): Unit = {}
 
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
-    val field = this.schema.split(",")
+    val spark = pec.get[SparkSession]()
+    import spark.implicits._
 
+    val field = this.schema.split(",")
     val structFieldArray : Array[StructField] = new Array[StructField](field.size)
     for(i <- 0 to field.size - 1){
       val columnInfo = field(i).split(":")
@@ -86,17 +88,11 @@ class MockData extends ConfigurableStop{
         //case "Date"=>structFieldArray(i) = new StructField(column, DateType, nullable = true)
         //case "Timestamp"=>structFieldArray(i) = new StructField(column, TimestampType, nullable = true)
       }
-
     }
     val schemaStructType = StructType(structFieldArray)
-
-    val spark = pec.get[SparkSession]()
-    import spark.implicits._
     val rnd : Random = new Random()
-    val i = randomJson(rnd,schemaStructType)
     val df = spark.read.schema(schemaStructType).json((0 to count).map{ _ => compact(randomJson(rnd,schemaStructType))}.toDS())
     out.write(df)
-
   }
 
   private def randomJson( rnd: Random, dataType : DataType): JValue ={

diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/test/JDBCTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/test/JDBCTest.scala
index 2f4fed5..9b9d0f3 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/test/JDBCTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/test/JDBCTest.scala
@@ -1,6 +1,7 @@
 package cn.piflow.bundle.test
 
-import cn.piflow.bundle.jdbc.{JdbcRead, JdbcWrite}
+
+import cn.piflow.bundle.jdbc.{MysqlRead, MysqlWrite}
 import cn.piflow.{FlowImpl, Path, Runner}
 import org.apache.spark.sql.SparkSession
 import org.junit.Test
@@ -20,10 +21,10 @@ class JDBCTest {
 
     val jdbcWriteParameters = Map("writeDBtable" -> "student_full")
 
-    val jDBCReadStop = new JdbcRead()
+    val jDBCReadStop = new MysqlRead()
     jDBCReadStop.setProperties(jdbcReadParameters)
 
-    val jDBCWriteStop = new JdbcWrite()
+    val jDBCWriteStop = new MysqlWrite()
     jDBCWriteStop.setProperties(jdbcWriteParameters)
 
     val flow = new FlowImpl();
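
A runnable sketch of the generation path MockData.perform follows after
this patch: parse the "name:type,name:type" schema string into a
StructType, render count+1 random JSON documents, and read them back
through spark.read.json. json4s is assumed (the compact(...) call above
implies it), and randomJson's body lies outside this hunk, so the one
below is a hypothetical stand-in that fills each field with a random
value of the matching type:

    import scala.util.Random

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    object MockDataSketch {
      // Hypothetical stand-in for the private randomJson helper:
      // recurse through the schema and emit a random value per field.
      def randomJson(rnd: Random, dataType: DataType): JValue = dataType match {
        case StructType(fields) =>
          JObject(fields.map(f => JField(f.name, randomJson(rnd, f.dataType))).toList)
        case StringType  => JString(rnd.alphanumeric.take(8).mkString)
        case IntegerType => JInt(rnd.nextInt(100))
        case DoubleType  => JDouble(rnd.nextDouble())
        case BooleanType => JBool(rnd.nextBoolean())
        case _           => JNull
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("mock").getOrCreate()
        import spark.implicits._  // needed before .toDS(), as the patch now orders it

        // Same "name:type" format the stop's schema property uses.
        val field = "name:String,age:Int,score:Double".split(",")
        val structFieldArray = field.map { col =>
          val columnInfo = col.split(":")
          val dt: DataType = columnInfo(1) match {
            case "String" => StringType
            case "Int"    => IntegerType
            case "Double" => DoubleType
            case _        => StringType
          }
          StructField(columnInfo(0).trim, dt, nullable = true)
        }
        val schemaStructType = StructType(structFieldArray)

        val rnd = new Random()
        val count = 10  // the stop's count property; (0 to count) yields count+1 rows
        val df = spark.read.schema(schemaStructType)
          .json((0 to count).map(_ => compact(randomJson(rnd, schemaStructType))).toDS())
        df.show()
        spark.stop()
      }
    }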
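
And a sketch of how the rewired JDBCTest plumbs the two stops into a
flow. FlowImpl, Path, and Runner come from the imports already in the
test; the addStop/addPath/Runner.create call pattern follows other
piflow tests but is an assumption here, as are the read parameters
(only "writeDBtable" is visible in this hunk):

    import cn.piflow.bundle.jdbc.{MysqlRead, MysqlWrite}
    import cn.piflow.{FlowImpl, Path, Runner}
    import org.apache.spark.sql.SparkSession

    // Assumed MysqlRead properties; the hunk only shows the write side.
    val jdbcReadParameters = Map(
      "url" -> "jdbc:mysql://127.0.0.1:3306/test",
      "user" -> "root",
      "password" -> "root",
      "sql" -> "select * from student")
    val jdbcWriteParameters = Map("writeDBtable" -> "student_full")

    val jDBCReadStop = new MysqlRead()
    jDBCReadStop.setProperties(jdbcReadParameters)

    val jDBCWriteStop = new MysqlWrite()
    jDBCWriteStop.setProperties(jdbcWriteParameters)

    // Wire read -> write and run the flow on a local SparkSession.
    val flow = new FlowImpl()
    flow.addStop("MysqlRead", jDBCReadStop)
    flow.addStop("MysqlWrite", jDBCWriteStop)
    flow.addPath(Path.from("MysqlRead").to("MysqlWrite"))

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("JDBCTest")
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .start(flow)
    process.awaitTermination()
    spark.close()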