From f599d6e36d387bda074688050c5693e9fbfa900f Mon Sep 17 00:00:00 2001
From: judy_0131
Date: Tue, 27 Aug 2019 15:38:03 +0800
Subject: [PATCH 1/2] fix bug: SparkLauncher launches the wrong number of executors

---
 piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala | 2 +-
 piflow-server/src/main/scala/cn/piflow/api/API.scala         | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala b/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala
index 7366385..e02315e 100644
--- a/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala
+++ b/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala
@@ -34,7 +34,7 @@ object FlowLauncher {
       .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
       .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .setConf("spark.driver.memory", flow.getDriverMemory())
-      .setConf("spark.num.executors", flow.getExecutorNum())
+      .setConf("spark.executor.instances", flow.getExecutorNum())
       .setConf("spark.executor.memory", flow.getExecutorMem())
       .setConf("spark.executor.cores",flow.getExecutorCores())
       .addFile(PropertyUtil.getConfigureFile())
diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala
index 45168d6..f45a4b7 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/API.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala
@@ -113,7 +113,7 @@ object API {
       .setVerbose(true)
       .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .setConf("spark.driver.memory", dirverMem)
-      .setConf("spark.num.executors",executorNum)
+      .setConf("spark.executor.instances",executorNum)
       .setConf("spark.executor.memory", executorMem)
       .setConf("spark.executor.cores",executorCores)
       .addFile(PropertyUtil.getConfigureFile())

From db5a7200d20a9fe35cabaa789d36b71bea79c8f6 Mon Sep 17 00:00:00 2001
From: judy_0131
Date: Tue, 27 Aug 2019 15:42:41 +0800
Subject: [PATCH 2/2] add stops to read from Oracle and write to Hive

---
 .../piflow/bundle/hive/PutHiveOverwrite.scala | 55 +++++++++++
 .../bundle/jdbc/OracleReadByPartition.scala   | 99 ++++++++++++++++++++
 2 files changed, 154 insertions(+)
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveOverwrite.scala
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveOverwrite.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveOverwrite.scala
new file mode 100644
index 0000000..ef91a2e
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/PutHiveOverwrite.scala
@@ -0,0 +1,55 @@
+package cn.piflow.bundle.hive
+
+import cn.piflow._
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+class PutHiveOverwrite extends ConfigurableStop {
+
+  val authorEmail: String = "xjzhu@cnic.cn"
+  val description: String = "Save data to Hive in overwrite mode"
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.NonePort.toString)
+
+  var database:String = _
+  var table:String = _
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+    val inDF = in.read()
+
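+    // SaveMode.Overwrite drops and recreates the target table, so any data
+    // previously stored in database.table is replaced by the incoming DataFrame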
+    inDF.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(database + "." + table)
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map : Map[String, Any]) = {
+    database = MapUtil.get(map,"database").asInstanceOf[String]
+    table = MapUtil.get(map,"table").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val database=new PropertyDescriptor().name("database").displayName("DataBase").description("The database name").defaultValue("").required(true)
+    val table = new PropertyDescriptor().name("table").displayName("Table").description("The table name").defaultValue("").required(true)
+    descriptor = database :: descriptor
+    descriptor = table :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("icon/hive/PutHiveStreaming.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroup.HiveGroup.toString)
+  }
+
+
+}
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala
new file mode 100644
index 0000000..4de4e06
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/jdbc/OracleReadByPartition.scala
@@ -0,0 +1,99 @@
+package cn.piflow.bundle.jdbc
+
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.spark.sql.SparkSession
+
+/**
+  * Created by xjzhu@cnic.cn on 7/23/19
+  */
+class OracleReadByPartition extends ConfigurableStop{
+  override val authorEmail: String = "xjzhu@cnic.cn"
+  override val description: String = "Read data from Oracle"
+  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+
+  var url:String = _
+  var user:String = _
+  var password:String = _
+  var sql:String = _
+  var partitionColumn:String= _
+  var lowerBound:Long= _
+  var upperBound:Long = _
+  var numPartitions:Int = _
+
+  override def setProperties(map: Map[String, Any]): Unit = {
+    url = MapUtil.get(map,"url").asInstanceOf[String]
+    user = MapUtil.get(map,"user").asInstanceOf[String]
+    password = MapUtil.get(map,"password").asInstanceOf[String]
+    sql = MapUtil.get(map,"sql").asInstanceOf[String]
+    partitionColumn = MapUtil.get(map,"partitionColumn").asInstanceOf[String]
+    lowerBound = MapUtil.get(map,"lowerBound").asInstanceOf[String].toLong
+    upperBound = MapUtil.get(map,"upperBound").asInstanceOf[String].toLong
+    numPartitions = MapUtil.get(map,"numPartitions").asInstanceOf[String].toInt
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+
+    val url=new PropertyDescriptor().name("url").displayName("url").description("The JDBC URL, for example jdbc:oracle:thin:@127.0.0.1:1521:dbname").defaultValue("").required(true)
+    descriptor = url :: descriptor
+
+    val user=new PropertyDescriptor().name("user").displayName("user").description("The user name of the database").defaultValue("").required(true)
+    descriptor = user :: descriptor
+
+    val password=new PropertyDescriptor().name("password").displayName("password").description("The password of the database").defaultValue("").required(true)
+    descriptor = password :: descriptor
+
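+    // Note: the sql property below is wrapped as a subquery in perform(), so it must be a runnable SELECT statement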
+    val sql=new PropertyDescriptor().name("sql").displayName("sql").description("The SQL query to execute").defaultValue("").required(true)
+    descriptor = sql :: descriptor
+
+    val partitionColumn=new PropertyDescriptor().name("partitionColumn").displayName("partitionColumn").description("The numeric column to partition the query by").defaultValue("").required(true)
+    descriptor = partitionColumn :: descriptor
+
+    val lowerBound=new PropertyDescriptor().name("lowerBound").displayName("lowerBound").description("The lower bound of the partition column").defaultValue("").required(true)
+    descriptor = lowerBound :: descriptor
+
+    val upperBound=new PropertyDescriptor().name("upperBound").displayName("upperBound").description("The upper bound of the partition column").defaultValue("").required(true)
+    descriptor = upperBound :: descriptor
+
+    val numPartitions=new PropertyDescriptor().name("numPartitions").displayName("numPartitions").description("The number of partitions").defaultValue("").required(true)
+    descriptor = numPartitions :: descriptor
+
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("icon/jdbc/jdbcRead.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroup.JdbcGroup.toString)
+  }
+
+  override def initialize(ctx: ProcessContext): Unit = {}
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+    // Wrap the user SQL as a subquery so it can be passed as the jdbc "dbtable" option
+    val dbtable = "(" + sql + ") temp"
+    val jdbcDF = spark.read.format("jdbc")
+      .option("url", url)
+      .option("driver", "oracle.jdbc.OracleDriver")
+      .option("dbtable", dbtable)
+      .option("user", user)
+      .option("password",password)
+      // Spark runs numPartitions queries in parallel, each covering one stride
+      // of [lowerBound, upperBound] on partitionColumn
+      .option("partitionColumn",partitionColumn)
+      .option("lowerBound",lowerBound)
+      .option("upperBound",upperBound)
+      .option("numPartitions",numPartitions)
+      .load()
+
+    out.write(jdbcDF)
+  }
+}
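
A minimal sketch of how the partitioned read above behaves end to end. The
URL, credentials, table and column names, and bounds below are made up for
illustration; only the option names and the overwrite write come from the
patches:

    import org.apache.spark.sql.SparkSession

    object PartitionedReadSketch extends App {
      val spark = SparkSession.builder()
        .appName("sketch")
        .master("local[*]")
        .enableHiveSupport()
        .getOrCreate()

      // The same read that OracleReadByPartition.perform builds, spelled out
      val df = spark.read.format("jdbc")
        .option("url", "jdbc:oracle:thin:@127.0.0.1:1521:orcl")   // hypothetical
        .option("driver", "oracle.jdbc.OracleDriver")
        .option("dbtable", "(select ID, NAME from STUDENT) temp") // wrapped user SQL
        .option("user", "scott")                                  // hypothetical
        .option("password", "tiger")                              // hypothetical
        .option("partitionColumn", "ID")
        .option("lowerBound", 0L)
        .option("upperBound", 1000L)
        .option("numPartitions", 4)
        .load()

      // With these bounds Spark issues four queries in parallel, roughly:
      //   WHERE ID < 250 or ID is null
      //   WHERE ID >= 250 AND ID < 500
      //   WHERE ID >= 500 AND ID < 750
      //   WHERE ID >= 750
      // Rows outside [0, 1000] are still read; they fall into the first or
      // last partition, so the bounds control parallelism, not filtering.

      // Overwrite the target Hive table, as PutHiveOverwrite does
      df.write.format("parquet").mode("overwrite").saveAsTable("test.student")

      spark.stop()
    }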