From 5bdcf2a403ec0f7f86f2ffa6526738d11394f18a Mon Sep 17 00:00:00 2001 From: yanggang Date: Wed, 1 Apr 2020 17:06:01 +0800 Subject: [PATCH] update stop --- .../src/main/resources/flow/es/FetchEs.json | 36 ------- .../src/main/resources/flow/es/PutEs.json | 11 ++- .../src/main/resources/flow/es/QueryEs.json | 25 ++--- .../PutElasticsearch.scala | 7 +- .../QueryElasticsearch.scala | 16 +-- .../piflow/bundle/es/FetchElasticsearch.scala | 98 ------------------- .../PutEsTest.scala} | 26 ++--- .../QueryEsTest.scala} | 26 ++--- .../bundle/es/FetchElasticsearchTest.scala | 54 ---------- 9 files changed, 57 insertions(+), 242 deletions(-) delete mode 100644 piflow-bundle/src/main/resources/flow/es/FetchEs.json rename piflow-bundle/src/main/scala/cn/piflow/bundle/{es => elasticsearch}/PutElasticsearch.scala (95%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/{es => elasticsearch}/QueryElasticsearch.scala (91%) delete mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchElasticsearch.scala rename piflow-bundle/src/test/scala/cn/piflow/bundle/{es/PutElasticsearchTest.scala => elasticsearch/PutEsTest.scala} (63%) rename piflow-bundle/src/test/scala/cn/piflow/bundle/{es/QueryElasticsearchTest.scala => elasticsearch/QueryEsTest.scala} (63%) delete mode 100644 piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala diff --git a/piflow-bundle/src/main/resources/flow/es/FetchEs.json b/piflow-bundle/src/main/resources/flow/es/FetchEs.json deleted file mode 100644 index 43eba67..0000000 --- a/piflow-bundle/src/main/resources/flow/es/FetchEs.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "flow":{ - "name":"test", - "uuid":"1234", - "stops":[ - { - "uuid":"2222", - "name":"fetchEs", - "bundle":"cn.piflow.bundle.es.FetchEs", - "properties":{ - "es_nodes":"10.0.86.239", - "port":"9200", - "es_index":"spark", - "es_type":"json" - } - - }, - { - "uuid":"0000", - "name":"PutHiveStreaming", - "bundle":"cn.piflow.bundle.hive.PutHiveStreaming", - "properties":{ - "hiveQL":"select * from sparktest.student" - } - } - ], - "paths":[ - { - "from":"fetchEs", - "outport":"", - "inport":"", - "to":"PutHiveStreaming" - } - ] - } -} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/flow/es/PutEs.json b/piflow-bundle/src/main/resources/flow/es/PutEs.json index 5619077..0b6cefc 100644 --- a/piflow-bundle/src/main/resources/flow/es/PutEs.json +++ b/piflow-bundle/src/main/resources/flow/es/PutEs.json @@ -8,18 +8,19 @@ "name":"SelectHiveQL", "bundle":"cn.piflow.bundle.hive.SelectHiveQL", "properties":{ - "hiveQL":"select * from sparktest.student" + "hiveQL":"select * from test.paper" } }, { "uuid":"1111", "name":"putEs", - "bundle":"cn.piflow.bundle.es.PutEs", + "bundle":"cn.piflow.bundle.elasticsearch.PutElasticsearch", "properties":{ - "es_nodes":"10.0.86.239", - "port":"9200", + "es_nodes":"10.0.88.70", + "es_port":"9200", "es_index":"spark", - "es_type":"testStudent1" + "es_type":"test_paper", + "configuration_item":"" } } ], diff --git a/piflow-bundle/src/main/resources/flow/es/QueryEs.json b/piflow-bundle/src/main/resources/flow/es/QueryEs.json index 540f5e0..46a813f 100644 --- a/piflow-bundle/src/main/resources/flow/es/QueryEs.json +++ b/piflow-bundle/src/main/resources/flow/es/QueryEs.json @@ -6,32 +6,23 @@ { "uuid":"3333", "name":"queryEs", - "bundle":"cn.piflow.bundle.es.QueryEs", + "bundle":"cn.piflow.bundle.elasticsearch.QueryElasticsearch", "properties":{ - "es_nodes":"10.0.86.239", - "port":"9200", - "es_index":"spark", - "es_type":"json", - "jsonDSL":"GET _search \n {\\\"query\\\":{\\\"match_all:{}\\\"}}" - } - - }, - { - "uuid":"0000", - "name":"PutHiveStreaming", - "bundle":"cn.piflow.bundle.hive.PutHiveStreaming", - "properties":{ - "hiveQL":"select * from sparktest.student" + "es_nodes":"10.0.88.70", + "es_port":"9200", + "es_index":"taxonomy", + "es_type":"taxonomy", + "jsonDSL":"{\"query\":{\"match_all\":{}}}" } } ], "paths":[ { - "from":"queryEs", + "from":"", "outport":"", "inport":"", - "to":"PutHiveStreaming" + "to":"" } ] } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutElasticsearch.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/elasticsearch/PutElasticsearch.scala similarity index 95% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutElasticsearch.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/elasticsearch/PutElasticsearch.scala index 849a8b7..338a249 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutElasticsearch.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/elasticsearch/PutElasticsearch.scala @@ -1,4 +1,4 @@ -package cn.piflow.bundle.es +package cn.piflow.bundle.elasticsearch import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} @@ -60,7 +60,7 @@ class PutElasticsearch extends ConfigurableStop { .description("Node of Elasticsearch") .defaultValue("") .required(true) - .example("10.0.86.239") + .example("10.0.88.70") descriptor = es_nodes :: descriptor val es_port = new PropertyDescriptor() @@ -68,6 +68,7 @@ class PutElasticsearch extends ConfigurableStop { .name("es_port") .displayName("Es_Port") .description("Port of Elasticsearch") + .defaultValue("9200") .required(true) .example("9200") descriptor = es_port :: descriptor @@ -96,7 +97,7 @@ class PutElasticsearch extends ConfigurableStop { .description("Configuration Item of Es.such as:es.mapping.parent->id_1,es.mapping.parent->id_2") .defaultValue("") .required(false) - .example("es.mapping.parent->id_1") + .example("es.mapping.parent->id_1,es.mapping.parent->id_2") descriptor = configuration_item :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/elasticsearch/QueryElasticsearch.scala similarity index 91% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/elasticsearch/QueryElasticsearch.scala index 96626d3..b0eafa9 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryElasticsearch.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/elasticsearch/QueryElasticsearch.scala @@ -1,4 +1,4 @@ -package cn.piflow.bundle.es +package cn.piflow.bundle.elasticsearch import cn.piflow.conf.bean.PropertyDescriptor import cn.piflow.conf.util.{ImageUtil, MapUtil} @@ -31,6 +31,8 @@ class QueryElasticsearch extends ConfigurableStop { val outDf = ssc.read.format("org.elasticsearch.spark.sql") .options(options).load(s"${es_index}/${es_type}") + outDf.printSchema() + out.write(outDf) } @@ -55,7 +57,7 @@ class QueryElasticsearch extends ConfigurableStop { .description("Node of Elasticsearch") .defaultValue("") .required(true) - .example("") + .example("10.0.88.70") descriptor = es_nodes :: descriptor val es_port = new PropertyDescriptor() @@ -64,7 +66,7 @@ class QueryElasticsearch extends ConfigurableStop { .description("Port of Elasticsearch") .defaultValue("9200") .required(true) - .example("") + .example("9200") descriptor = es_port :: descriptor val es_index = new PropertyDescriptor() @@ -73,7 +75,7 @@ class QueryElasticsearch extends ConfigurableStop { .description("Index of Elasticsearch") .defaultValue("") .required(true) - .example("") + .example("test") descriptor = es_index :: descriptor val es_type = new PropertyDescriptor() @@ -82,16 +84,16 @@ class QueryElasticsearch extends ConfigurableStop { .description("Type of Elasticsearch") .defaultValue("") .required(true) - .example("") + .example("test") descriptor = es_type :: descriptor val jsonDSL = new PropertyDescriptor() .name("jsonDSL") .displayName("JsonDSL") .description("DSL of Elasticsearch") - .defaultValue("") + .defaultValue("{\"query\":{\"match_all\":{}}}") .required(true) - .example("GET _search \n {\\\"query\\\":{\\\"match_all:{}\\\"}}") + .example("{\"query\":{\"match_all\":{}}}") descriptor = jsonDSL :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchElasticsearch.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchElasticsearch.scala deleted file mode 100644 index 7b535e0..0000000 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchElasticsearch.scala +++ /dev/null @@ -1,98 +0,0 @@ -package cn.piflow.bundle.es - -import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.{ImageUtil, MapUtil} -import cn.piflow.conf.{ConfigurableStop, Port, StopGroup} -import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import org.apache.spark.sql.SparkSession - - -class FetchElasticsearch extends ConfigurableStop { - - val authorEmail: String = "ygang@cnic.cn" - val description: String = "Fetch data from Elasticsearch" - val inportList: List[String] = List(Port.DefaultPort) - val outportList: List[String] = List(Port.DefaultPort) - - var es_nodes : String = _ - var es_port : String = _ - var es_index : String = _ - var es_type : String = _ - - def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { - val spark = pec.get[SparkSession]() - - val ssc = spark.sqlContext - - val options = Map("es.index.auto.create"-> "true", - "es.nodes.wan.only"->"true", - "es.nodes"->es_nodes, - "es.port"->es_port) - - val outDf = ssc.read.format("org.elasticsearch.spark.sql").options(options).load(s"${es_index}/${es_type}") - out.write(outDf) - - } - def initialize(ctx: ProcessContext): Unit = { - - } - - def setProperties(map : Map[String, Any]): Unit = { - es_nodes=MapUtil.get(map,key="es_nodes").asInstanceOf[String] - es_port=MapUtil.get(map,key="es_port").asInstanceOf[String] - es_index=MapUtil.get(map,key="es_index").asInstanceOf[String] - es_type=MapUtil.get(map,key="es_type").asInstanceOf[String] - } - - override def getPropertyDescriptor(): List[PropertyDescriptor] = { - var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor() - .name("es_nodes") - .displayName("Es_Nodes") - .description("Node of Elasticsearch") - .defaultValue("") - .required(true) - .example("10.0.86.239") - descriptor = es_nodes :: descriptor - - val es_port = new PropertyDescriptor() - .name("es_port") - .displayName("Es_Port") - .description("Port of Elasticsearch") - .defaultValue("9200") - .required(true) - .example("9200") - descriptor = es_port :: descriptor - - val es_index = new PropertyDescriptor() - .name("es_index") - .displayName("Es_Index") - .description("Index of Elasticsearch") - .defaultValue("") - .required(true) - .example("spark") - descriptor = es_index :: descriptor - - val es_type = new PropertyDescriptor() - .name("es_type") - .displayName("Es_Type") - .description("Type of Elasticsearch") - .defaultValue("") - .required(true) - .example("json") - descriptor = es_type :: descriptor - - descriptor - } - - override def getIcon(): Array[Byte] = { - ImageUtil.getImage("icon/elasticsearch/FetchEs.png") - } - - override def getGroup(): List[String] = { - List(StopGroup.ESGroup) - } - - - -} diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/PutElasticsearchTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/elasticsearch/PutEsTest.scala similarity index 63% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/es/PutElasticsearchTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/elasticsearch/PutEsTest.scala index 77865c3..c2d735c 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/PutElasticsearchTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/elasticsearch/PutEsTest.scala @@ -1,19 +1,21 @@ -package cn.piflow.bundle.es +package cn.piflow.bundle.elasticsearch + +import java.net.InetAddress import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} -import cn.piflow.util.PropertyUtil +import cn.piflow.util.{PropertyUtil, ServerIpUtil} import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class PutElasticsearchTest { +class PutEsTest { @Test - def testEs(): Unit ={ + def testFlow(): Unit ={ //parse flow json val file = "src/main/resources/flow/es/PutEs.json" @@ -25,16 +27,18 @@ class PutElasticsearchTest { val flowBean = FlowBean(map) val flow = flowBean.constructFlow() - val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + val ip = InetAddress.getLocalHost.getHostAddress + cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile()) + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start() //execute flow val spark = SparkSession.builder() - .master("local[*]") - .appName("CsvParserTest") - .config("spark.driver.memory", "1g") - .config("spark.executor.memory", "2g") - .config("spark.cores.max", "2") - .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .master("local[12]") + .appName("hive") + .config("spark.driver.memory", "4g") + .config("spark.executor.memory", "8g") + .config("spark.cores.max", "8") + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/QueryElasticsearchTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/elasticsearch/QueryEsTest.scala similarity index 63% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/es/QueryElasticsearchTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/elasticsearch/QueryEsTest.scala index e6d27db..31f6cb9 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/QueryElasticsearchTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/elasticsearch/QueryEsTest.scala @@ -1,19 +1,21 @@ -package cn.piflow.bundle.es +package cn.piflow.bundle.elasticsearch + +import java.net.InetAddress import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} -import cn.piflow.util.PropertyUtil +import cn.piflow.util.{PropertyUtil, ServerIpUtil} import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class QueryElasticsearchTest { +class QueryEsTest { @Test - def testEs(): Unit ={ + def testFlow(): Unit ={ //parse flow json val file = "src/main/resources/flow/es/QueryEs.json" @@ -25,16 +27,18 @@ class QueryElasticsearchTest { val flowBean = FlowBean(map) val flow = flowBean.constructFlow() - val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() + val ip = InetAddress.getLocalHost.getHostAddress + cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile()) + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start() //execute flow val spark = SparkSession.builder() - .master("local[*]") - .appName("CsvParserTest") - .config("spark.driver.memory", "1g") - .config("spark.executor.memory", "2g") - .config("spark.cores.max", "2") - .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .master("local[12]") + .appName("hive") + .config("spark.driver.memory", "4g") + .config("spark.executor.memory", "8g") + .config("spark.cores.max", "8") +// .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala deleted file mode 100644 index 4a64502..0000000 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/es/FetchElasticsearchTest.scala +++ /dev/null @@ -1,54 +0,0 @@ -package cn.piflow.bundle.es - -import cn.piflow.Runner -import cn.piflow.conf.bean.FlowBean -import cn.piflow.conf.util.{FileUtil, OptionUtil} -import cn.piflow.util.PropertyUtil -import org.apache.spark.sql.SparkSession -import org.h2.tools.Server -import org.junit.Test - -import scala.util.parsing.json.JSON - -class FetchElasticsearchTest { - - @Test - def testEs(): Unit ={ - - //parse flow json - val file = "src/main/resources/flow/es/FetchEs.json" - val flowJsonStr = FileUtil.fileReader(file) - val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] - println(map) - - //create flow - val flowBean = FlowBean(map) - val flow = flowBean.constructFlow() - - val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() - - //execute flow - val spark = SparkSession.builder() - .master("local[*]") - .appName("CsvParserTest") - .config("spark.driver.memory", "1g") - .config("spark.executor.memory", "2g") - .config("spark.cores.max", "2") - .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) - .enableHiveSupport() - .getOrCreate() - - val process = Runner.create() - .bind(classOf[SparkSession].getName, spark) - .bind("checkpoint.path", "") - .bind("debug.path","") - .start(flow); - - process.awaitTermination(); - val pid = process.pid(); - println(pid + "!!!!!!!!!!!!!!!!!!!!!") - spark.close(); - } - - -}