diff --git a/piflow-bundle/pom.xml b/piflow-bundle/pom.xml
index 4e7a6bc..5691622 100644
--- a/piflow-bundle/pom.xml
+++ b/piflow-bundle/pom.xml
@@ -19,6 +19,14 @@
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>1.2.1</version>
+        </dependency>
+
         <dependency>
             <groupId>org.mongodb</groupId>
             <artifactId>mongodb-driver</artifactId>
diff --git a/piflow-bundle/src/main/resources/impala/SelectImpala.json b/piflow-bundle/src/main/resources/impala/SelectImpala.json
new file mode 100644
index 0000000..7e358d3
--- /dev/null
+++ b/piflow-bundle/src/main/resources/impala/SelectImpala.json
@@ -0,0 +1,46 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "checkpoint":"Merge",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"SelectImpala",
+        "bundle":"cn.piflow.bundle.impala.SelectImpala",
+        "properties":{
+          "url":"10.0.82.165:21050",
+          "user":"",
+          "password":"test1",
+          "sql":"select * from kylin.test1",
+          "schameString":"pid,name"
+        }
+      },
+      {
+        "uuid":"1111",
+        "name":"putHdfs",
+        "bundle":"cn.piflow.bundle.hdfs.PutHdfs",
+        "properties":{
+          "hdfsUrl":"hdfs://10.0.86.89:9000",
+          "hdfsPath":"/yg/0",
+          "types":"csv"
+        }
+      }
+    ],
+    "paths":[
+      {
+        "from":"SelectImpala",
+        "outport":"",
+        "inport":"",
+        "to":"putHdfs"
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/piflow-bundle/src/main/resources/impala/impala-logo.png b/piflow-bundle/src/main/resources/impala/impala-logo.png
new file mode 100644
index 0000000..d414518
Binary files /dev/null and b/piflow-bundle/src/main/resources/impala/impala-logo.png differ
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/impala/SelectImpala.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/impala/SelectImpala.scala
new file mode 100644
index 0000000..1109b80
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/impala/SelectImpala.scala
@@ -0,0 +1,108 @@
+package cn.piflow.bundle.impala
+
+import java.sql.{Connection, DriverManager, ResultSet, Statement}
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+import scala.collection.mutable.ArrayBuffer
+
+
+class SelectImpala extends ConfigurableStop{
+  override val authorEmail: String = "yangqidong@cnic.cn"
+  override val description: String = "get data from impala"
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+
+  var url:String=_
+  var user:String=_
+  var password:String=_
+  var sql:String=_
+  var schameString : String=_
+
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val session: SparkSession = pec.get[SparkSession]()
+
+    //jdbc:hive2://10.0.82.165:21050/;auth=noSasl
+
+    Class.forName("org.apache.hive.jdbc.HiveDriver")
+
+    val con: Connection = DriverManager.getConnection("jdbc:hive2://"+url+"/;auth=noSasl",user,password)
+    val stmt: Statement = con.createStatement()
+    // val rs: ResultSet = stmt.executeQuery("select * from kylin.test1 full join kylin.morg on kylin.test1.pid=kylin.morg.belongtocode")
+    val rs: ResultSet = stmt.executeQuery(sql)
+
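+    // Read every result row into a driver-side buffer, looking columns up by the
+    // names given in the comma-separated "schameString" property; this assumes the
+    // query result is small enough to fit in driver memory.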
+    val fieldNames: Array[String] = schameString.split(",")
+    var rowsArr: ArrayBuffer[ArrayBuffer[String]] = ArrayBuffer()
+    while (rs.next()){
+      var rowArr: ArrayBuffer[String] = ArrayBuffer()
+      for(fieldName <- fieldNames){
+        rowArr += rs.getString(fieldName)
+      }
+      rowsArr += rowArr
+    }
+
+    val fields: Array[StructField] = fieldNames.map(d => StructField(d, StringType, nullable = true))
+    val schema: StructType = StructType(fields)
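+    // Note: every column is exposed as a nullable StringType; downstream stops are
+    // expected to cast values to more specific types if needed.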
String = "Supplement to Memcache query data" val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala index 4e5dd1f..a547670 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/GetMemcache.scala @@ -15,7 +15,7 @@ import scala.collection.mutable.ArrayBuffer class GetMemcache extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - override val description: String = "get data from mongodb" + override val description: String = "get data from memache" val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala index 2fb519c..4d69773 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/memcache/PutMemcache.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession} class PutMemcache extends ConfigurableStop{ override val authorEmail: String = "yangqidong@cnic.cn" - override val description: String = "get data from mongodb" + override val description: String = "get data from memcache" val inportList: List[String] = List(PortEnum.DefaultPort.toString) val outportList: List[String] = List(PortEnum.NonePort.toString) diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/impala/SelectImpalaTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/impala/SelectImpalaTest.scala new file mode 100644 index 0000000..b8924d5 --- /dev/null +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/impala/SelectImpalaTest.scala @@ -0,0 +1,65 @@ +package cn.piflow.bundle.impala + +import cn.piflow.Runner +import cn.piflow.conf.bean.FlowBean +import cn.piflow.conf.util.{FileUtil, OptionUtil} +import org.apache.spark.sql.SparkSession +import org.h2.tools.Server +import org.junit.Test + +import scala.util.parsing.json.JSON + +class SelectImpalaTest { + + @Test + def testFlow(): Unit ={ + + //parse flow json + val file = "src/main/resources/impala/SelectImpala.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + println(map) + + //create flow + val flowBean = FlowBean(map) + val flow = flowBean.constructFlow() + + val h2Server = Server.createTcpServer("-tcp","-tcpAllowOthers","-tcpPort","50001").start() + + + //execute flow + val spark = SparkSession.builder() + .master("spark://10.0.86.89:7077") + .appName("piflow-hive-bundle") + .config("spark.driver.memory", "1g") + .config("spark.executor.memory", "2g") + .config("spark.cores.max", "2") + .config("spark.jars","/root/Desktop/gitWORK/out/artifacts/piflow_bundle/piflow_bundle.jar") + .enableHiveSupport() + .getOrCreate() + + val process = Runner.create() + .bind(classOf[SparkSession].getName, spark) + .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/") + .start(flow); + + process.awaitTermination(); + val pid = process.pid(); + println(pid + "!!!!!!!!!!!!!!!!!!!!!") + spark.close(); + } + @Test + def testFlow2json() = { + + //parse flow json + val file = 
"src/main/resources/flow.json" + val flowJsonStr = FileUtil.fileReader(file) + val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] + + //create flow + val flowBean = FlowBean(map) + val flowJson = flowBean.toJson() + println(flowJson) + } + +}