Add a new stop for reading from Impala, fix the descriptions of the existing Memcache stops, and update the spider logo

yang qidong
yanfqidong0604 2018-11-13 12:35:52 +08:00
parent 6994550803
commit 32925d3071
9 changed files with 232 additions and 4 deletions

View File

@ -19,6 +19,14 @@
<dependencies>
<!--
https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc
-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>

View File

@ -0,0 +1,46 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "checkpoint":"Merge",
    "stops":[
      {
        "uuid":"0000",
        "name":"SelectImpala",
        "bundle":"cn.piflow.bundle.impala.SelectImpala",
        "properties":{
          "url":"10.0.82.165:21050",
          "user":"",
          "password":"test1",
          "sql":"select * from kylin.test1",
          "schameString":"pid,name"
        }
      },
      {
        "uuid":"1111",
        "name":"putHdfs",
        "bundle":"cn.piflow.bundle.hdfs.PutHdfs",
        "properties":{
          "hdfsUrl":"hdfs://10.0.86.89:9000",
          "hdfsPath":"/yg/0",
          "types":"csv"
        }
      }
    ],
    "paths":[
      {
        "from":"SelectImpala",
        "outport":"",
        "inport":"",
        "to":"putHdfs"
      }
    ]
  }
}

Binary file not shown.

New image: 25 KiB

View File

@ -0,0 +1,108 @@
package cn.piflow.bundle.impala
import java.sql.{Connection, DriverManager, ResultSet, Statement}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable.ArrayBuffer
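
/**
  * SelectImpala: reads rows from Impala through the Hive JDBC driver
  * (jdbc:hive2://<url>/;auth=noSasl), builds a DataFrame whose columns are the
  * comma-separated field names given in schameString (all typed as String),
  * and writes it to the default output port.
  */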
class SelectImpala extends ConfigurableStop{

  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "get data from impala"
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var url: String = _
  var user: String = _
  var password: String = _
  var sql: String = _
  var schameString: String = _
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val session: SparkSession = pec.get[SparkSession]()

    // Impala exposes a HiveServer2-compatible endpoint, e.g. jdbc:hive2://10.0.82.165:21050/;auth=noSasl
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con: Connection = DriverManager.getConnection("jdbc:hive2://" + url + "/;auth=noSasl", user, password)
    val stmt: Statement = con.createStatement()

    // val rs: ResultSet = stmt.executeQuery("select * from kylin.test1 full join kylin.morg on kylin.test1.pid=kylin.morg.belongtocode")
    val rs: ResultSet = stmt.executeQuery(sql)

    // Collect the requested fields of every result row as strings
    val fieldNames: Array[String] = schameString.split(",")
    var rowsArr: ArrayBuffer[ArrayBuffer[String]] = ArrayBuffer()
    while (rs.next()) {
      var rowArr: ArrayBuffer[String] = ArrayBuffer()
      for (fieldName <- fieldNames) {
        rowArr += rs.getString(fieldName)
      }
      rowsArr += rowArr
    }

    // Build a DataFrame with one StringType column per requested field
    val fields: Array[StructField] = fieldNames.map(d => StructField(d, StringType, nullable = true))
    val schema: StructType = StructType(fields)
    val rows: List[Row] = rowsArr.toList.map(arr => Row.fromSeq(arr))
    val rdd: RDD[Row] = session.sparkContext.makeRDD(rows)
    val df: DataFrame = session.createDataFrame(rdd, schema)

    println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
    df.show(20)
    println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")

    out.write(df)
  }
  override def setProperties(map: Map[String, Any]): Unit = {
    url = MapUtil.get(map,"url").asInstanceOf[String]
    user = MapUtil.get(map,"user").asInstanceOf[String]
    password = MapUtil.get(map,"password").asInstanceOf[String]
    sql = MapUtil.get(map,"sql").asInstanceOf[String]
    schameString = MapUtil.get(map,"schameString").asInstanceOf[String]
  }
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor : List[PropertyDescriptor] = List()
    val url = new PropertyDescriptor().name("url").displayName("url").description("IP and port of the Impala server, written as ip:port").defaultValue("").required(true)
    descriptor = url :: descriptor
    val user = new PropertyDescriptor().name("user").displayName("user").description("").defaultValue("").required(false)
    descriptor = user :: descriptor
    val password = new PropertyDescriptor().name("password").displayName("password").description("").defaultValue("").required(false)
    descriptor = password :: descriptor
    val sql = new PropertyDescriptor().name("sql").displayName("sql").description("The SQL statement to execute; table names must be qualified with their database, e.g. database.table").defaultValue("").required(true)
    descriptor = sql :: descriptor
    val schameString = new PropertyDescriptor().name("schameString").displayName("schameString").description("Comma-separated field names of the SQL query result").defaultValue("").required(true)
    descriptor = schameString :: descriptor
    descriptor
  }
  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("impala/impala-logo.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.Mongodb.toString)
  }

  override def initialize(ctx: ProcessContext): Unit = {
  }

}
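
For reference, a minimal standalone sketch (not part of this commit) of the JDBC access pattern the stop relies on; the host, port, table, and field names are copied from the example flow above and are only illustrative:

object ImpalaJdbcSketch {
  def main(args: Array[String]): Unit = {
    // Impala answers on its HiveServer2-compatible port (21050 by default);
    // ";auth=noSasl" matches the connection string built by SelectImpala.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con = java.sql.DriverManager.getConnection("jdbc:hive2://10.0.82.165:21050/;auth=noSasl", "", "test1")
    val stmt = con.createStatement()
    val rs = stmt.executeQuery("select * from kylin.test1")   // table qualified as database.table
    while (rs.next()) {
      // fields listed in schameString ("pid,name") are read back as strings
      println(rs.getString("pid") + "," + rs.getString("name"))
    }
    rs.close(); stmt.close(); con.close()
  }
}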

View File

@ -214,7 +214,8 @@ class spider extends ConfigurableStop{
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("spider.jpg")
ImageUtil.getImage("spider.jpeg")
}
override def getGroup(): List[String] = {

View File

@ -14,7 +14,7 @@ import scala.collection.mutable.ArrayBuffer
class ComplementByMemcache extends ConfigurableStop {
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "get data from mongodb"
override val description: String = "Supplement to Memcache query data"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)

View File

@ -15,7 +15,7 @@ import scala.collection.mutable.ArrayBuffer
class GetMemcache extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "get data from mongodb"
override val description: String = "get data from memache"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)

View File

@ -9,7 +9,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
class PutMemcache extends ConfigurableStop{
override val authorEmail: String = "yangqidong@cnic.cn"
override val description: String = "get data from mongodb"
override val description: String = "get data from memcache"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)

View File

@ -0,0 +1,65 @@
package cn.piflow.bundle.impala
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
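
// Runs the flow defined in src/main/resources/impala/SelectImpala.json
// (SelectImpala -> PutHdfs) on a standalone Spark master; testFlow2json
// only parses a flow file and prints it back as JSON.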
class SelectImpalaTest {

  @Test
  def testFlow(): Unit = {

    // parse flow json
    val file = "src/main/resources/impala/SelectImpala.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    // create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()

    // execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars", "/root/Desktop/gitWORK/out/artifacts/piflow_bundle/piflow_bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
      .start(flow)

    process.awaitTermination()
    val pid = process.pid()
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close()
  }

  @Test
  def testFlow2json() = {

    // parse flow json
    val file = "src/main/resources/flow.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    // create flow
    val flowBean = FlowBean(map)
    val flowJson = flowBean.toJson()
    println(flowJson)
  }
}