From d14b8ebc1f8389c0df0326fdd9cd6e921ed2e51b Mon Sep 17 00:00:00 2001 From: yanggang Date: Fri, 2 Nov 2018 10:10:08 +0800 Subject: [PATCH] update Es --- piflow-bundle/src/main/resources/es.json | 6 +++--- .../bundle/es/{NewFetchEs.scala => FetchEs.scala} | 6 +++--- .../piflow/bundle/es/{NewPutEs.scala => PutEs.scala} | 5 +++-- .../bundle/es/{NewQueryEs.scala => QueryEs.scala} | 12 +++++------- 4 files changed, 14 insertions(+), 15 deletions(-) rename piflow-bundle/src/main/scala/cn/piflow/bundle/es/{NewFetchEs.scala => FetchEs.scala} (95%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/es/{NewPutEs.scala => PutEs.scala} (96%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/es/{NewQueryEs.scala => QueryEs.scala} (93%) diff --git a/piflow-bundle/src/main/resources/es.json b/piflow-bundle/src/main/resources/es.json index 69236bd..e9600d8 100644 --- a/piflow-bundle/src/main/resources/es.json +++ b/piflow-bundle/src/main/resources/es.json @@ -14,7 +14,7 @@ { "uuid":"1111", "name":"putEs", - "bundle":"cn.piflow.bundle.es.NewPutEs", + "bundle":"cn.piflow.bundle.es.PutEs", "properties":{ "es_nodes":"10.0.86.239", "port":"9200", @@ -26,7 +26,7 @@ { "uuid":"2222", "name":"fetchEs", - "bundle":"cn.piflow.bundle.es.NewFetchEs", + "bundle":"cn.piflow.bundle.es.FetchEs", "properties":{ "es_nodes":"10.0.86.239", "port":"9200", @@ -38,7 +38,7 @@ { "uuid":"3333", "name":"queryEs", - "bundle":"cn.piflow.bundle.es.NewQueryEs", + "bundle":"cn.piflow.bundle.es.QueryEs", "properties":{ "es_nodes":"10.0.86.239", "port":"9200", diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewFetchEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala similarity index 95% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewFetchEs.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala index f9bd0b5..77ada14 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewFetchEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession import org.elasticsearch.spark.rdd.EsSpark import org.elasticsearch.spark.sql.EsSparkSQL -class NewFetchEs extends ConfigurableStop { +class FetchEs extends ConfigurableStop { override val description: String = "fetch data with dataframe from elasticSearch " val authorEmail: String = "ygang@cnic.cn" @@ -24,12 +24,12 @@ class NewFetchEs extends ConfigurableStop { def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() - val ssc = spark.sqlContext val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true", "es.nodes"->es_nodes,"es.port"->port) + //load data with df from es val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}") outDf.show() out.write(outDf) @@ -67,7 +67,7 @@ class NewFetchEs extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroupEnum.HiveGroup.toString) + List(StopGroupEnum.ESGroup.toString) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewPutEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala similarity index 96% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewPutEs.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala index abb2067..f03879f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewPutEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession import org.elasticsearch.spark.rdd.EsSpark import org.elasticsearch.spark.sql.EsSparkSQL -class NewPutEs extends ConfigurableStop { +class PutEs extends ConfigurableStop { override val description: String = "put data with dataframe to elasticSearch " val authorEmail: String = "ygang@cnic.cn" @@ -30,6 +30,7 @@ class NewPutEs extends ConfigurableStop { val options = Map("es.index.auto.create"-> "true", "es.nodes"->es_nodes,"es.port"->port) + //保存 df 到es EsSparkSQL.saveToEs(inDf,s"${es_index}/${es_type}",options) @@ -78,7 +79,7 @@ class NewPutEs extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroupEnum.HiveGroup.toString) + List(StopGroupEnum.ESGroup.toString) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewQueryEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala similarity index 93% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewQueryEs.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala index c48c875..7eef03a 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/NewQueryEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala @@ -7,7 +7,7 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.spark.sql.SparkSession import org.elasticsearch.spark.rdd.EsSpark -class NewQueryEs extends ConfigurableStop { +class QueryEs extends ConfigurableStop { override val description: String = "query data with dataframe from elasticSearch " val authorEmail: String = "ygang@cnic.cn" @@ -25,6 +25,7 @@ class NewQueryEs extends ConfigurableStop { val spark = pec.get[SparkSession]() val ssc = spark.sqlContext + // 查询 语句 字段:内容 val query = s""" |{ @@ -61,7 +62,8 @@ class NewQueryEs extends ConfigurableStop { // """.stripMargin - val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true", + val options = Map("es.index.auto.create"-> "true", + "es.nodes.wan.only"->"true", "es.query" -> query, "es.nodes"->es_nodes,"es.port"->port) @@ -70,11 +72,7 @@ class NewQueryEs extends ConfigurableStop { .options(options).load(s"${es_index}/${es_type}") outDf.show() - println(")))))))))))))))))))))))))))))))))))))))))))))))))))))))))))))_________________") - out.write(outDf) - - } def initialize(ctx: ProcessContext): Unit = { @@ -116,7 +114,7 @@ class NewQueryEs extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroupEnum.HiveGroup.toString) + List(StopGroupEnum.ESGroup.toString) }