Rename Elasticsearch stops NewPutEs/NewFetchEs/NewQueryEs to PutEs/FetchEs/QueryEs, move them from HiveGroup to ESGroup, and update the flow config bundle paths accordingly

This commit is contained in:
yanggang 2018-11-02 10:10:08 +08:00
parent f8dd478f4d
commit d14b8ebc1f
4 changed files with 14 additions and 15 deletions

View File

@ -14,7 +14,7 @@
{
"uuid":"1111",
"name":"putEs",
"bundle":"cn.piflow.bundle.es.NewPutEs",
"bundle":"cn.piflow.bundle.es.PutEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
@ -26,7 +26,7 @@
{
"uuid":"2222",
"name":"fetchEs",
"bundle":"cn.piflow.bundle.es.NewFetchEs",
"bundle":"cn.piflow.bundle.es.FetchEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",
@ -38,7 +38,7 @@
{
"uuid":"3333",
"name":"queryEs",
"bundle":"cn.piflow.bundle.es.NewQueryEs",
"bundle":"cn.piflow.bundle.es.QueryEs",
"properties":{
"es_nodes":"10.0.86.239",
"port":"9200",

View File

@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL
class NewFetchEs extends ConfigurableStop {
class FetchEs extends ConfigurableStop {
override val description: String = "fetch data with dataframe from elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
@ -24,12 +24,12 @@ class NewFetchEs extends ConfigurableStop {
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val ssc = spark.sqlContext
val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,"es.port"->port)
//load data with df from es
val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")
outDf.show()
out.write(outDf)
@ -67,7 +67,7 @@ class NewFetchEs extends ConfigurableStop {
}
override def getGroup(): List[String] = {
List(StopGroupEnum.HiveGroup.toString)
List(StopGroupEnum.ESGroup.toString)
}

View File

@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL
class NewPutEs extends ConfigurableStop {
class PutEs extends ConfigurableStop {
override val description: String = "put data with dataframe to elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
@ -30,6 +30,7 @@ class NewPutEs extends ConfigurableStop {
val options = Map("es.index.auto.create"-> "true",
"es.nodes"->es_nodes,"es.port"->port)
//保存 df 到es
EsSparkSQL.saveToEs(inDf,s"${es_index}/${es_type}",options)
@ -78,7 +79,7 @@ class NewPutEs extends ConfigurableStop {
}
override def getGroup(): List[String] = {
List(StopGroupEnum.HiveGroup.toString)
List(StopGroupEnum.ESGroup.toString)
}

View File

@ -7,7 +7,7 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
class NewQueryEs extends ConfigurableStop {
class QueryEs extends ConfigurableStop {
override val description: String = "query data with dataframe from elasticSearch "
val authorEmail: String = "ygang@cnic.cn"
@ -25,6 +25,7 @@ class NewQueryEs extends ConfigurableStop {
val spark = pec.get[SparkSession]()
val ssc = spark.sqlContext
// 查询 语句 字段内容
val query =
s"""
|{
@ -61,7 +62,8 @@ class NewQueryEs extends ConfigurableStop {
// """.stripMargin
val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true",
val options = Map("es.index.auto.create"-> "true",
"es.nodes.wan.only"->"true",
"es.query" -> query,
"es.nodes"->es_nodes,"es.port"->port)
@ -70,11 +72,7 @@ class NewQueryEs extends ConfigurableStop {
.options(options).load(s"${es_index}/${es_type}")
outDf.show()
println(")))))))))))))))))))))))))))))))))))))))))))))))))))))))))))))_________________")
out.write(outDf)
}
def initialize(ctx: ProcessContext): Unit = {
@ -116,7 +114,7 @@ class NewQueryEs extends ConfigurableStop {
}
override def getGroup(): List[String] = {
List(StopGroupEnum.HiveGroup.toString)
List(StopGroupEnum.ESGroup.toString)
}