Merge remote-tracking branch 'origin/master'

bao319 2020-04-02 16:49:34 +08:00
commit 03edd124c3
9 changed files with 57 additions and 242 deletions

View File

@@ -1,36 +0,0 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"2222",
"name":"fetchEs",
"bundle":"cn.piflow.bundle.es.FetchEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"spark",
"es_type":"json"
}
},
{
"uuid":"0000",
"name":"PutHiveStreaming",
"bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
"properties":{
"hiveQL":"select * from sparktest.student"
}
}
],
"paths":[
{
"from":"fetchEs",
"outport":"",
"inport":"",
"to":"PutHiveStreaming"
}
]
}
}

View File

@@ -8,18 +8,19 @@
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from sparktest.student"
"hiveQL":"select * from test.paper"
}
},
{
"uuid":"1111",
"name":"putEs",
"bundle":"cn.piflow.bundle.es.PutEs",
"bundle":"cn.piflow.bundle.elasticsearch.PutElasticsearch",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_nodes":"10.0.88.70",
"es_port":"9200",
"es_index":"spark",
"es_type":"testStudent1"
"es_type":"test_paper",
"configuration_item":""
}
}
],

View File

@@ -6,32 +6,23 @@
{
"uuid":"3333",
"name":"queryEs",
"bundle":"cn.piflow.bundle.es.QueryEs",
"bundle":"cn.piflow.bundle.elasticsearch.QueryElasticsearch",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"spark",
"es_type":"json",
"jsonDSL":"GET _search \n {\\\"query\\\":{\\\"match_all:{}\\\"}}"
}
},
{
"uuid":"0000",
"name":"PutHiveStreaming",
"bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
"properties":{
"hiveQL":"select * from sparktest.student"
"es_nodes":"10.0.88.70",
"es_port":"9200",
"es_index":"taxonomy",
"es_type":"taxonomy",
"jsonDSL":"{\"query\":{\"match_all\":{}}}"
}
}
],
"paths":[
{
"from":"queryEs",
"from":"",
"outport":"",
"inport":"",
"to":"PutHiveStreaming"
"to":""
}
]
}

View File

@@ -1,4 +1,4 @@
package cn.piflow.bundle.es
package cn.piflow.bundle.elasticsearch
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
@@ -60,7 +60,7 @@ class PutElasticsearch extends ConfigurableStop {
.description("Node of Elasticsearch")
.defaultValue("")
.required(true)
.example("10.0.86.239")
.example("10.0.88.70")
descriptor = es_nodes :: descriptor
val es_port = new PropertyDescriptor()
@@ -68,6 +68,7 @@ class PutElasticsearch extends ConfigurableStop {
.name("es_port")
.displayName("Es_Port")
.description("Port of Elasticsearch")
.defaultValue("9200")
.required(true)
.example("9200")
descriptor = es_port :: descriptor
@@ -96,7 +97,7 @@ class PutElasticsearch extends ConfigurableStop {
.description("Configuration Item of Es.such as:es.mapping.parent->id_1,es.mapping.parent->id_2")
.defaultValue("")
.required(false)
.example("es.mapping.parent->id_1")
.example("es.mapping.parent->id_1,es.mapping.parent->id_2")
descriptor = configuration_item :: descriptor
descriptor
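
Note: the hunk above only touches the property descriptors, not the stop's perform method. As a minimal sketch of how a stop configured with these properties could write a DataFrame through the elasticsearch-spark connector — the org.elasticsearch.spark.sql source and the es.nodes/es.port option names are the same ones used by the deleted FetchElasticsearch stop further down, while splitting configuration_item on "," and "->" is an assumption taken from the description text:

// Minimal write sketch (assumed shape, not the actual PutElasticsearch.perform).
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeToEs(df: DataFrame, esNodes: String, esPort: String,
              esIndex: String, esType: String, configurationItem: String): Unit = {
  // Base connector options, matching the es_nodes/es_port properties above.
  var options = Map(
    "es.index.auto.create" -> "true",
    "es.nodes.wan.only" -> "true",
    "es.nodes" -> esNodes,      // e.g. "10.0.88.70"
    "es.port" -> esPort)        // e.g. "9200"
  // Assumption: configuration_item is a comma-separated list of "key->value" pairs,
  // e.g. "es.mapping.parent->id_1", merged into the connector options.
  if (configurationItem.nonEmpty) {
    configurationItem.split(",").foreach { item =>
      val kv = item.split("->")
      if (kv.length == 2) options += (kv(0).trim -> kv(1).trim)
    }
  }
  df.write.format("org.elasticsearch.spark.sql")
    .options(options)
    .mode(SaveMode.Append)
    .save(s"$esIndex/$esType")
}

With the descriptors above, es_port already defaults to "9200" and configuration_item can stay empty unless extra es.* settings such as es.mapping.parent are needed.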

View File

@@ -1,4 +1,4 @@
package cn.piflow.bundle.es
package cn.piflow.bundle.elasticsearch
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
@@ -31,6 +31,8 @@ class QueryElasticsearch extends ConfigurableStop {
val outDf = ssc.read.format("org.elasticsearch.spark.sql")
.options(options).load(s"${es_index}/${es_type}")
outDf.printSchema()
out.write(outDf)
}
@@ -55,7 +57,7 @@ class QueryElasticsearch extends ConfigurableStop {
.description("Node of Elasticsearch")
.defaultValue("")
.required(true)
.example("")
.example("10.0.88.70")
descriptor = es_nodes :: descriptor
val es_port = new PropertyDescriptor()
@@ -64,7 +66,7 @@ class QueryElasticsearch extends ConfigurableStop {
.description("Port of Elasticsearch")
.defaultValue("9200")
.required(true)
.example("")
.example("9200")
descriptor = es_port :: descriptor
val es_index = new PropertyDescriptor()
@@ -73,7 +75,7 @@ class QueryElasticsearch extends ConfigurableStop {
.description("Index of Elasticsearch")
.defaultValue("")
.required(true)
.example("")
.example("test")
descriptor = es_index :: descriptor
val es_type = new PropertyDescriptor()
@@ -82,16 +84,16 @@ class QueryElasticsearch extends ConfigurableStop {
.description("Type of Elasticsearch")
.defaultValue("")
.required(true)
.example("")
.example("test")
descriptor = es_type :: descriptor
val jsonDSL = new PropertyDescriptor()
.name("jsonDSL")
.displayName("JsonDSL")
.description("DSL of Elasticsearch")
.defaultValue("")
.defaultValue("{\"query\":{\"match_all\":{}}}")
.required(true)
.example("GET _search \n {\\\"query\\\":{\\\"match_all:{}\\\"}}")
.example("{\"query\":{\"match_all\":{}}}")
descriptor = jsonDSL :: descriptor
descriptor
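
Note: for orientation, a self-contained sketch of the read path these hunks touch, assembled from the option map visible in the deleted FetchElasticsearch stop further down. Routing jsonDSL through the connector's es.query option is an assumption; the hunk does not show where the DSL is applied.

// Read sketch (assumed shape of the query path, not the actual QueryElasticsearch.perform).
import org.apache.spark.sql.{DataFrame, SparkSession}

def queryEs(spark: SparkSession, esNodes: String, esPort: String,
            esIndex: String, esType: String, jsonDSL: String): DataFrame = {
  val options = Map(
    "es.index.auto.create" -> "true",
    "es.nodes.wan.only" -> "true",
    "es.nodes" -> esNodes,   // e.g. "10.0.88.70"
    "es.port" -> esPort,     // e.g. "9200"
    "es.query" -> jsonDSL)   // assumption: DSL passed as the connector's es.query option
  val outDf = spark.read.format("org.elasticsearch.spark.sql")
    .options(options)
    .load(s"$esIndex/$esType")
  outDf.printSchema()        // the schema check added by this hunk
  outDf
}

Called with the QueryEs.json values above (index and type "taxonomy", a match_all DSL), this loads the whole index as a DataFrame before out.write hands it to the next stop.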

View File

@@ -1,98 +0,0 @@
package cn.piflow.bundle.es
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
class FetchElasticsearch extends ConfigurableStop {
val authorEmail: String = "ygang@cnic.cn"
val description: String = "Fetch data from Elasticsearch"
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var es_nodes : String = _
var es_port : String = _
var es_index : String = _
var es_type : String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val ssc = spark.sqlContext
val options = Map("es.index.auto.create"-> "true",
"es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,
"es.port"->es_port)
val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")
out.write(outDf)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
es_port=MapUtil.get(map,key="es_port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor()
.name("es_nodes")
.displayName("Es_Nodes")
.description("Node of Elasticsearch")
.defaultValue("")
.required(true)
.example("10.0.86.239")
descriptor = es_nodes :: descriptor
val es_port = new PropertyDescriptor()
.name("es_port")
.displayName("Es_Port")
.description("Port of Elasticsearch")
.defaultValue("9200")
.required(true)
.example("9200")
descriptor = es_port :: descriptor
val es_index = new PropertyDescriptor()
.name("es_index")
.displayName("Es_Index")
.description("Index of Elasticsearch")
.defaultValue("")
.required(true)
.example("spark")
descriptor = es_index :: descriptor
val es_type = new PropertyDescriptor()
.name("es_type")
.displayName("Es_Type")
.description("Type of Elasticsearch")
.defaultValue("")
.required(true)
.example("json")
descriptor = es_type :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/elasticsearch/FetchEs.png")
}
override def getGroup(): List[String] = {
List(StopGroup.ESGroup)
}
}

View File

@@ -1,19 +1,21 @@
package cn.piflow.bundle.es
package cn.piflow.bundle.elasticsearch
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.PropertyUtil
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class PutElasticsearchTest {
class PutEsTest {
@Test
def testEs(): Unit ={
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/es/PutEs.json"
@@ -25,16 +27,18 @@ class PutElasticsearchTest {
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("CsvParserTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()

View File

@@ -1,19 +1,21 @@
package cn.piflow.bundle.es
package cn.piflow.bundle.elasticsearch
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.PropertyUtil
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class QueryElasticsearchTest {
class QueryEsTest {
@Test
def testEs(): Unit ={
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/es/QueryEs.json"
@@ -25,16 +27,18 @@ class QueryElasticsearchTest {
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("CsvParserTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
// .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()

View File

@@ -1,54 +0,0 @@
package cn.piflow.bundle.es
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.PropertyUtil
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class FetchElasticsearchTest {
@Test
def testEs(): Unit ={
//parse flow json
val file = "src/main/resources/flow/es/FetchEs.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[*]")
.appName("CsvParserTest")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}