From 429c3e56f869473f47ca0333e252828cd3877ff0 Mon Sep 17 00:00:00 2001
From: bao319 <51151344+bao319@users.noreply.github.com>
Date: Mon, 30 Mar 2020 18:25:21 +0800
Subject: [PATCH] Delete OptionalSelectHiveQL.scala

---
 .../bundle/hive/OptionalSelectHiveQL.scala    | 130 ------------------
 1 file changed, 130 deletions(-)
 delete mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala
deleted file mode 100644
index 03fc26a..0000000
--- a/piflow-bundle/src/main/scala/cn/piflow/bundle/hive/OptionalSelectHiveQL.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-package cn.piflow.bundle.hive
-
-import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
-import cn.piflow.conf.bean.PropertyDescriptor
-import cn.piflow.conf.util.{ImageUtil, MapUtil}
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
-
-/**
-  * HIVE JDBC DRIVER DESIGN FOR HIVE 1.2.1
-  */
-class OptionalSelectHiveQL extends ConfigurableStop {
-  override val authorEmail: String = "xiaomeng7890@gmail.com"
-  override val description: String = "some hive can only achieve by jdbc, this stop is designed for this"
-  override val inportList: List[String] = List(Port.NonePort)
-  override val outportList: List[String] = List(Port.DefaultPort)
-
-  private val driverName = "org.apache.hive.jdbc.HiveDriver"
-  var hiveUser : String = _
-  var hivePassword : String = _
-  var jdbcUrl : String = _
-  var sql : String = _
-
-  override def setProperties(map: Map[String, Any]): Unit = {
-
-    hiveUser = MapUtil.get(map,"hiveUser").asInstanceOf[String]
-    hivePassword = MapUtil.get(map,"hivePassword").asInstanceOf[String]
-    jdbcUrl = MapUtil.get(map,"jdbcUrl").asInstanceOf[String]
-    sql = MapUtil.get(map,"sql").asInstanceOf[String]
-  }
-  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
-    var descriptor : List[PropertyDescriptor] = List()
-    val hiveUser = new PropertyDescriptor()
-      .name("hive user")
-      .displayName("Hive User")
-      .description("Users connected to hive")
-      .defaultValue("root")
-      .required(true)
-      .example("root")
-
-    descriptor = hiveUser :: descriptor
-
-    val hivePassword = new PropertyDescriptor().
-      name("hive password")
-      .displayName("Hive Password")
-      .description("Password to connect to hive")
-      .defaultValue("123456")
-      .required(true)
-      .example("123456")
-    descriptor = hivePassword :: descriptor
-
-    val jdbcUrl = new PropertyDescriptor().
-      name("jdbcUrl")
-      .displayName("JdbcUrl")
-      .description("URL for hive to connect to JDBC")
-      .defaultValue("jdbc:hive2://packone12:2181,packone13:2181,packone11:2181/middle;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2")
-      .required(true)
-      .example("jdbc:hive2://192.168.3.140:10000/default")
-    descriptor = jdbcUrl :: descriptor
-
-    val sql = new PropertyDescriptor().
-      name("query")
-      .displayName("Hive Query")
-      .description("SQL query statement of hive")
-      .defaultValue("select * from middle.m_person")
-      .required(true)
-      .example("select * from test.user1")
-    descriptor = sql :: descriptor
-
-    descriptor
-  }
-
-
-  override def getIcon(): Array[Byte] = {
-    ImageUtil.getImage("icon/hive/OptionalPutHiveQL.png")
-  }
-
-  override def getGroup(): List[String] = {
-    List(StopGroup.HiveGroup)
-  }
-
-
-  override def initialize(ctx: ProcessContext): Unit = {}
-
-  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
-    val sc = pec.get[SparkSession]()
-    val df = getDF (sc.sqlContext, sc.sparkContext, sql)
-    out.write(df)
-  }
-
-  def getDF(sqlContext : SQLContext, sc : SparkContext, tableName : String) : DataFrame = {
-    var df = sqlContext.sql(sql)
-    val count = df.count()
-    if (count == 0) {
-      println("Cant read by normal read, using JDBC <== this will cost a lot of time")
-      df = getJDBCDF(sqlContext, sc, tableName)
-    }
-    df
-  }
-
-  def getJDBCDF(sqlContext : SQLContext, sc : SparkContext, tableName : String) : DataFrame = {
-    import java.sql.DriverManager
-    try
-      Class.forName(driverName)
-    catch {
-      case e: ClassNotFoundException =>
-        e.printStackTrace()
-        System.exit(1)
-    }
-    val conn = DriverManager.getConnection(jdbcUrl, hiveUser, hivePassword)
-    val ptsm = conn.prepareStatement(sql)
-    println(ptsm)
-    val rs = ptsm.executeQuery()
-    var rows = Seq[Row]()
-    val meta = rs.getMetaData
-    for (i <- 1 to meta.getColumnCount) {
-      println(meta.getColumnName(i))
-    }
-    while (rs.next) {
-      var row = Seq[String]()
-      for (i <- 1 to meta.getColumnCount) {
-        row = row.:+(rs.getString(i))
-      }
-      rows = rows.:+(Row.fromSeq(row))
-    }
-    val organizationRDD = sc.makeRDD(rows)
-    sqlContext.createDataFrame(organizationRDD, sqlContext.read.table(tableName).schema)
-  }
-}