updata describe

This commit is contained in:
yanggang 2019-03-12 11:04:29 +08:00
parent b7b93b9a1f
commit 84a7e75b92
3 changed files with 19 additions and 17 deletions

View File

@ -15,18 +15,20 @@ class FetchEs extends ConfigurableStop {
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "Fetch data from Elasticsearch "
var es_nodes:String = _ //es的节点多个用逗号隔开
var es_port:String= _ //es的端口好
var es_index:String = _ //es的索引
var es_type:String = _ //es的类型
var es_nodes : String = _
var es_port : String = _
var es_index : String = _
var es_type : String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val ssc = spark.sqlContext
val options = Map("es.index.auto.create"-> "true","es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,"es.port"->es_port)
val options = Map("es.index.auto.create"-> "true",
"es.nodes.wan.only"->"true",
"es.nodes"->es_nodes,
"es.port"->es_port)
//load data with df from es
val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}")

View File

@ -15,10 +15,10 @@ class PutEs extends ConfigurableStop {
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
var es_nodes:String = _ //es的节点多个用逗号隔开
var es_port:String= _ //es的端口好
var es_index:String = _ //es的索引
var es_type:String = _ //es的类型
var es_nodes : String = _
var es_port : String = _
var es_index : String = _
var es_type : String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
@ -26,9 +26,9 @@ class PutEs extends ConfigurableStop {
val sc = spark.sparkContext
val options = Map("es.index.auto.create"-> "true",
"es.nodes"->es_nodes,"es.port"->es_port)
"es.nodes"->es_nodes,
"es.port"->es_port)
//保存 df 到es
EsSparkSQL.saveToEs(inDf,s"${es_index}/${es_type}",options)
}

View File

@ -13,11 +13,11 @@ class QueryEs extends ConfigurableStop {
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "Query data from Elasticsearch "
var es_nodes:String = _ //es的节点多个用逗号隔开
var es_port:String= _ //es的端口好
var es_index:String = _ //es的索引
var es_type:String = _ //es的类型
var jsonDSL:String = _ //es的字段类型
var es_nodes : String = _
var es_port : String = _
var es_index : String = _
var es_type : String = _
var jsonDSL : String = _
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()