New put to ES, fetch from ES, query from ES

This commit is contained in:
yanggang 2018-10-31 10:40:11 +08:00
parent 7c65a7510a
commit 7ef5206305
11 changed files with 270 additions and 381 deletions

View File

@@ -107,8 +107,8 @@
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.6.3</version>
<artifactId>elasticsearch-hadoop</artifactId>
<version>6.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
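The dependency change above swaps elasticsearch-spark-20_2.11 5.6.3 for elasticsearch-hadoop 6.4.2, which supplies the org.elasticsearch.spark.sql data source and the EsSparkSQL helper used by the new stops. A minimal round-trip sketch, assuming the connector is on the classpath; the node address 127.0.0.1 and the resource test/doc are placeholders, not values from this commit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("es-sketch").master("local[*]").getOrCreate()
val esOptions = Map(
  "es.nodes" -> "127.0.0.1",            // placeholder node
  "es.port" -> "9200",
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true")

// write a small DataFrame to the ES resource "index/type"
val df = spark.createDataFrame(Seq(("rose", 22), ("jack", 24))).toDF("name", "age")
df.write.format("org.elasticsearch.spark.sql").options(esOptions).mode("append").save("test/doc")

// read it back as a DataFrame
val back = spark.read.format("org.elasticsearch.spark.sql").options(esOptions).load("test/doc")
back.show()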

View File

@@ -8,20 +8,44 @@
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from sparktest.dblp_phdthesis"
"hiveQL":"select * from sparktest.student"
}
},
{
"uuid":"1111",
"name":"putEs",
"bundle":"cn.piflow.bundle.es.NewPutEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"spark",
"es_type":"testStudent1"
}
},
{
"uuid":"1111",
"name":"esPut",
"bundle":"cn.piflow.bundle.es.PutEs",
"uuid":"2222",
"name":"fetchEs",
"bundle":"cn.piflow.bundle.es.NewFetchEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"json000",
"es_type":"json_spark000",
"schema":"author,pages"
"es_index":"spark",
"es_type":"json"
}
},
{
"uuid":"3333",
"name":"queryEs",
"bundle":"cn.piflow.bundle.es.NewQueryEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"spark",
"es_type":"json",
"field_name":"name",
"field_content":"rose"
}
}
@@ -32,7 +56,7 @@
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"esPut"
"to":"putEs"
}
]
}
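Note that the paths section above only wires SelectHiveQL to putEs; fetchEs and queryEs are declared in stops but not connected by any path. A sketch of how this flow file is parsed and started, mirroring the ESTest added later in this commit (the file location src/main/resources/es.json is assumed):

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import scala.util.parsing.json.JSON

// parse the flow json into a Map and build the flow
val flowJsonStr = FileUtil.fileReader("src/main/resources/es.json")
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
val flow = FlowBean(map).constructFlow()

// bind a SparkSession and run the flow
val spark = SparkSession.builder().appName("es").enableHiveSupport().getOrCreate()
val process = Runner.create().bind(classOf[SparkSession].getName, spark).start(flow)
process.awaitTermination()
spark.close()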

View File

@@ -1,39 +0,0 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"JsonSave",
"bundle":"cn.piflow.bundle.json.JsonSave",
"properties":{
"jsonSavePath":"hdfs://10.0.86.89:9000/yg/EsFetch.json"
}
},
{
"uuid":"1111",
"name":"esGet",
"bundle":"cn.piflow.bundle.es.FetchEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"sparkToEs",
"es_type":"toEs",
"schema":"author,pages"
}
}
],
"paths":[
{
"from":"esGet",
"outport":"",
"inport":"",
"to":"JsonSave"
}
]
}
}

View File

@@ -1,39 +0,0 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"JsonSave",
"bundle":"cn.piflow.bundle.json.JsonSave",
"properties":{
"jsonSavePath":"hdfs://10.0.86.89:9000/yg/EsFetch.json"
}
},
{
"uuid":"1111",
"name":"esQurry",
"bundle":"cn.piflow.bundle.es.QurryEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
"es_index":"sparkToEs",
"es_type":"toEs",
"schema":"author,pages"
}
}
],
"paths":[
{
"from":"esQurry",
"outport":"",
"inport":"",
"to":"JsonSave"
}
]
}
}

View File

@@ -1,88 +0,0 @@
package cn.piflow.bundle.es
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
class FetchEs extends ConfigurableStop{
val description: String = "Fetch data from Es."
override val authorEmail: String = "xiaoxiao@cnic.cn"
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:Int= _ //ES port
var es_index:String = _ //ES index
var es_type:String = _ //ES type
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val sc = spark.sqlContext
val options = Map("es.index.auto.create"-> "true",
"es.nodes"->"10.0.86.239","es.port"->"9200")
val sdf = sc.read.format("org.elasticsearch.spark.sql").options(options).load("test/test")
// sdf.select("id").collect().foreach(println(_))
//connect to ES
// val df = EsSparkSQL.esDF(sc,"test/test")
// println(sdf.schema)
sdf.show()
out.write(sdf)
}
def setProperties(map: Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=Integer.parseInt(MapUtil.get(map,key="port").toString)
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
println(es_index)
println(port)
println(es_nodes)
println(es_type)
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("es.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.ESGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}

View File

@@ -4,63 +4,46 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL
class PutEs extends ConfigurableStop{
val description: String = "Put data to Es."
class NewFetchEs extends ConfigurableStop {
override val authorEmail: String = "xiaoxiao@cnic.cn"
val inportList: List[String] = List(PortEnum.DefaultPort.toString)
val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "fetch data with dataframe from elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:Int= _ //ES port
var port:String= _ //ES port
var es_index:String = _ //ES index
var es_type:String = _ //ES type
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val ssc = spark.sqlContext
// val options = Map("es.index.auto.create"-> "true",
// "es.nodes"->"10.0.86.239","es.port"->"9200")
//
// val conf = new SparkConf()
// .set("spark.driver.allowMultipleContexts", "true")
//
// conf.set("es.nodes", "10.0.86.239")
// .set("es.port", "9200")
// .set("es.index.auto.create", "true")
val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,"es.port"->port)
val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")
outDf.show()
out.write(outDf)
val sc = spark.sqlContext
val inDF = in.read()
inDF.show()
println(inDF.schema)
//connect to ES
EsSparkSQL.saveToEs(inDF,"/test/test5")
println("cunchuchenggong")
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map: Map[String, Any]): Unit = {
def setProperties(map : Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=Integer.parseInt(MapUtil.get(map,key="port").toString)
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
@@ -69,27 +52,25 @@ class PutEs extends ConfigurableStop{
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("es.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.ESGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
List(StopGroupEnum.HiveGroup.toString)
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
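A sketch of the read path the new NewFetchEs stop implements, assembled from the added lines above: the stop builds the connector options from its properties and loads the whole es_index/es_type resource into a DataFrame for the downstream stop.

// sketch of NewFetchEs.perform(), reconstructed from the added lines above
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
  val spark = pec.get[SparkSession]()
  val ssc = spark.sqlContext
  val options = Map(
    "es.index.auto.create" -> "true",
    "es.nodes.wan.only" -> "true",
    "es.nodes" -> es_nodes,
    "es.port" -> port)
  // load the configured index/type into a DataFrame and hand it downstream
  val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")
  outDf.show()
  out.write(outDf)
}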

View File

@@ -4,63 +4,57 @@ import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL
class QurryEs extends ConfigurableStop{
val description: String = "Qurry data from Es."
class NewPutEs extends ConfigurableStop {
override val authorEmail: String = "xiaoxiao@cnic.cn"
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "put data with dataframe to elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:Int= _ //ES port
var port:String= _ //ES port
var es_index:String = _ //ES index
var es_type:String = _ //ES type
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val inDf = in.read()
inDf.show()
val sc = spark.sqlContext
//connect to ES
val qurry =
"""
|{
| "query":{
| "match":{
| "id":2
| }
| }
|}
""".stripMargin
val sc = spark.sparkContext
val options = Map("es.index.auto.create"-> "true",
"es.nodes"->"10.0.86.239","es.port"->"9200")
"es.nodes"->es_nodes,"es.port"->port)
// val sdf = sc.read.format("org.elasticsearch.spark.sql").options(options).load("test/test",qurry)
EsSparkSQL.saveToEs(inDf,s"${es_index}/${es_type}",options)
val df = EsSparkSQL.esDF(sc,"customer/doc",qurry)
println(df.schema)
out.write(df)
// val json1 = """{"name":"jack", "age":24, "sex":"man"}"""
// val json2 = """{"name":"rose", "age":22, "sex":"woman"}"""
//
// val rddData = sc.makeRDD(Seq(json1, json2))
//
// EsSpark.saveJsonToEs(rddData, "spark/json2",options)
//自定义id
// EsSpark.saveJsonToEs(rddData, "spark/json1", Map("es.mapping.id"->"name"))
}
def initialize(ctx: ProcessContext): Unit = {
def setProperties(map: Map[String, Any]): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=Integer.parseInt(MapUtil.get(map,key="port").toString)
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
println(es_index)
println(port)
println(es_nodes)
println(es_type)
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
@@ -69,26 +63,25 @@ class QurryEs extends ConfigurableStop{
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("es.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.ESGroup.toString)
List(StopGroupEnum.HiveGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
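The write path of the new NewPutEs stop, assembled from the added lines above: it reads the DataFrame produced by the upstream stop (SelectHiveQL in es.json) and saves it to the configured es_index/es_type with EsSparkSQL.saveToEs.

// sketch of NewPutEs.perform(), assembled from the added lines above
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
  val spark = pec.get[SparkSession]()
  val inDf = in.read()   // DataFrame from the upstream stop
  inDf.show()
  val options = Map(
    "es.index.auto.create" -> "true",
    "es.nodes" -> es_nodes,
    "es.port" -> port)
  // write the DataFrame to the configured index/type
  EsSparkSQL.saveToEs(inDf, s"${es_index}/${es_type}", options)
}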

View File

@@ -0,0 +1,125 @@
package cn.piflow.bundle.es
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
class NewQueryEs extends ConfigurableStop {
override val description: String = "query data with dataframe from elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
val inportCount: Int = 0
val outportCount: Int = 1
var es_nodes:String = _ //ES nodes, separate multiple nodes with commas
var port:String= _ //ES port
var es_index:String = _ //ES index
var es_type:String = _ //ES type
var field_name:String = _ //field to match in the query
var field_content:String = _ //value the field must match
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val ssc = spark.sqlContext
val query =
s"""
|{
| "query":{
| "match":{
|
| "${field_name}":"${field_content}"
| }
| }
|}
""".stripMargin
// val query2 =
// s"""
// |{
// | "query":{
// | "terms":{
// |
// | "age":[22]
// | }
// | }
// |}
// """.stripMargin
//
// val query3 =
// s"""
// |{
// | "query":{
// | "match":{
// | "name":"rose *"
// | }
// | }
// |}
// """.stripMargin
val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true",
"es.query" -> query,
"es.nodes"->es_nodes,"es.port"->port)
val outDf = ssc.read.format("org.elasticsearch.spark.sql")
.options(options).load(s"${es_index}/${es_type}")
outDf.show()
println(")))))))))))))))))))))))))))))))))))))))))))))))))))))))))))))_________________")
out.write(outDf)
}
def initialize(ctx: ProcessContext): Unit = {
}
def setProperties(map : Map[String, Any]): Unit = {
es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String]
port=MapUtil.get(map,key="port").asInstanceOf[String]
es_index=MapUtil.get(map,key="es_index").asInstanceOf[String]
es_type=MapUtil.get(map,key="es_type").asInstanceOf[String]
field_name=MapUtil.get(map,key="field_name").asInstanceOf[String]
field_content=MapUtil.get(map,key="field_content").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
val field_name = new PropertyDescriptor().name("field_name").displayName("field_name").defaultValue("").required(true)
val field_content = new PropertyDescriptor().name("field_content").displayName("field_content").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor
descriptor = es_index :: descriptor
descriptor = es_type :: descriptor
descriptor = field_name :: descriptor
descriptor = field_content :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("es.png")
}
override def getGroup(): List[String] = {
List(StopGroupEnum.HiveGroup.toString)
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}
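NewQueryEs passes a full query-DSL document through the es.query option; the commented-out query2 and query3 above show the same mechanism with a terms query and a wildcard-style match. Swapping in a different query only changes the string handed to es.query, as in this sketch (field name and values are placeholders):

// sketch: a terms query variant; only the query document changes, the read stays the same
val termsQuery =
  """
    |{
    |  "query": {
    |    "terms": {
    |      "age": [22, 24]
    |    }
    |  }
    |}
  """.stripMargin

val options = Map(
  "es.nodes" -> es_nodes,
  "es.port" -> port,
  "es.nodes.wan.only" -> "true",
  "es.query" -> termsQuery)

val hits = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")
hits.show()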

View File

@@ -0,0 +1,51 @@
package cn.piflow.bundle
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class ESTest {
@Test
def testEs(): Unit ={
//parse flow json
val file = "src/main/resources/es.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("es")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars","/opt/project/piflow-master/out/artifacts/piflow_bundle/piflow_bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@@ -1,122 +0,0 @@
package cn.piflow.bundle
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.junit.Test
import scala.util.parsing.json.JSON
class EsTest {
@Test
def testFetchEs():Unit ={
val file = "src/main/resources/esGet.json"
//parse json
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars", "/opt/project/piflow-master/out/artifacts/piflow_bundle/piflow_bundle.jar")
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow)
process.awaitTermination()
spark.close()
}
@Test
def testPutEs(): Unit = {
val file = "src/main/resources/es.json"
//parse json
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars", "/opt/project/piflow-master/out/artifacts/piflow_bundle/piflow_bundle.jar")
//.config("spark.jars","/root/.m2/repository/org/elasticsearch/elasticsearch-spark-20_2.11/5.6.3/elasticsearch-spark-20_2.11-5.6.3.jar")
.config("es.index.auto.create", "true") //开启自动创建索引
.config("es.nodes","10.0.86.239") //es的节点
.config("es.port","9200") //端口号
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow)
process.awaitTermination()
spark.close()
}
@Test
def queryEs(): Unit = {
val file = "src/main/resources/esQurry.json"
//parse json
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
//execute flow
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle")
.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
.config("spark.jars", "/opt/project/piflow-master/out/artifacts/piflow_bundle/piflow_bundle.jar")
.config("es.index.auto.create", "true") //开启自动创建索引
.config("es.nodes","10.0.86.239") //es的节点
.config("es.port","9200") //端口号
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow)
process.awaitTermination()
spark.close()
}
}

View File

@@ -4,6 +4,7 @@ import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
@@ -22,6 +23,8 @@ class HdfsTest {
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077")
.appName("DblpParserTest")