diff --git a/piflow-bundle/src/main/resources/hdfs.json b/piflow-bundle/src/main/resources/hdfs.json index b5e3442..6545410 100644 --- a/piflow-bundle/src/main/resources/hdfs.json +++ b/piflow-bundle/src/main/resources/hdfs.json @@ -20,6 +20,7 @@ "properties":{ "hdfsUrl":"hdfs://10.0.86.89:9000", "hdfsPath":"/yg/0", + "partition":"3", "types":"csv" } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala index fed11c5..447383c 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/FetchEs.scala @@ -48,10 +48,10 @@ class FetchEs extends ConfigurableStop { override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) + val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) descriptor = es_nodes :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala index 51f21ae..7acd838 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/PutEs.scala @@ -59,10 +59,10 @@ class PutEs extends ConfigurableStop { override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) + val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) descriptor = es_nodes :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala index 5b6df98..cf0b080 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/es/QueryEs.scala @@ -18,8 +18,8 @@ class QueryEs extends ConfigurableStop { var port:String= _ //es的端口好 var es_index:String = _ //es的索引 var es_type:String = _ //es的类型 - var field_name:String = _ //es的类型 - var field_content:String = _ //es的类型 + var field_name:String = _ //es的字段类型 + var field_content:String = _ //es的字段内容 def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { val spark = pec.get[SparkSession]() @@ -92,10 +92,10 @@ class QueryEs extends ConfigurableStop { override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) + val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) val field_name = new PropertyDescriptor().name("field_name").displayName("field_name").defaultValue("").required(true) val field_content = new PropertyDescriptor().name("field_content").displayName("field_content").defaultValue("").required(true) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpUrl.scala index ec1ee7b..9543f5f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/LoadFromFtpUrl.scala @@ -22,7 +22,7 @@ class LoadFromFtpUrl extends ConfigurableStop{ val inportList: List[String] = List(PortEnum.NonePort.toString) val outportList: List[String] = List(PortEnum.DefaultPort.toString) - var http_URl:String =_ //url 地址 + var http_url:String =_ //url 地址 var url_type:String =_ // url 指向文件 类型,文件 or 文件夹 var localPath:String =_ // 保存的本地路径 var downType:String=_ @@ -47,9 +47,9 @@ class LoadFromFtpUrl extends ConfigurableStop{ fileLocalPath = localPath+"/"+fileName list = fileLocalPath::list // 下载 文件 - downFileFromFtpUrl(http_URl,localPath,fileName) + downFileFromFtpUrl(http_url,localPath,fileName) } else { - var arrayList:ArrayList[String]=getFilePathList(http_URl) + var arrayList:ArrayList[String]=getFilePathList(http_url) // 遍历 文件路径 所在的集合 for (i <- 0 until arrayList.size()) { // https://ftp.ncbi.nih.gov/genbank/docs/--1234567890987654321--Current_version_is_10.7--1234567890987654321--20180423 @@ -59,7 +59,7 @@ class LoadFromFtpUrl extends ConfigurableStop{ val arrayString = array.split("--1234567890987654321--") - val fileUrlDir = array.replace(s"$http_URl", "/").split("--1234567890987654321--")(0) + val fileUrlDir = array.replace(s"$http_url", "/").split("--1234567890987654321--")(0) // 单个文件url 指向的 路径 val urlPath = arrayString(0)+arrayString(1) // 下载 保存 文件夹的目录 @@ -88,7 +88,7 @@ class LoadFromFtpUrl extends ConfigurableStop{ println("##################################_____-------------------------------") val arrayString = array.split("--1234567890987654321--") - val fileUrlDir = array.replace(s"$http_URl", "/").split("--1234567890987654321--")(0) + val fileUrlDir = array.replace(s"$http_url", "/").split("--1234567890987654321--")(0) // 单个文件url 指向的 路径 val urlPath = arrayString(0)+arrayString(1) // 下载 保存 文件夹的目录 @@ -217,7 +217,7 @@ class LoadFromFtpUrl extends ConfigurableStop{ def setProperties(map: Map[String, Any]): Unit = { - http_URl=MapUtil.get(map,key="http_URl").asInstanceOf[String] + http_url=MapUtil.get(map,key="http_url").asInstanceOf[String] url_type=MapUtil.get(map,key="url_type").asInstanceOf[String] localPath=MapUtil.get(map,key="localPath").asInstanceOf[String] downType=MapUtil.get(map,key="downType").asInstanceOf[String] @@ -226,14 +226,14 @@ class LoadFromFtpUrl extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val http_URl = new PropertyDescriptor().name("http_URl").displayName("http_URl").defaultValue("").required(true) + val http_url = new PropertyDescriptor().name("http_url").displayName("http_url").defaultValue("").required(true) val localPath = new PropertyDescriptor().name("localPath").displayName("Local_Path").defaultValue("").required(true) val url_type= new PropertyDescriptor().name("url_type").displayName("url_type").defaultValue("").required(true) val downType= new PropertyDescriptor().name("downType").displayName("downType").defaultValue("all,day").required(true) val fileName= new PropertyDescriptor().name("fileName").displayName("fileName").defaultValue("fileName").required(false) - descriptor = http_URl :: descriptor + descriptor = http_url :: descriptor descriptor = url_type :: descriptor descriptor = localPath :: descriptor descriptor = downType :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/NewLoadFromFtp.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/NewLoadFromFtp.scala index aa77262..21f2228 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/NewLoadFromFtp.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ftp/NewLoadFromFtp.scala @@ -24,12 +24,10 @@ class NewLoadFromFtp extends ConfigurableStop{ var password:String=_ var ftpFile:String=_ var localPath:String=_ - var ftpClient:FtpClient=null + var ftpClient:FtpClient=null def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { - val inDf = in.read() - inDf.show() ftpClient=connectFTP(url_str,port,username,password) var arrayList:util.ArrayList[String]=getFileList(ftpFile) @@ -164,12 +162,12 @@ class NewLoadFromFtp extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url_str = new PropertyDescriptor().name("url_str").displayName("URL").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true) - val username = new PropertyDescriptor().name("username").displayName("USER_NAME").defaultValue("").required(true) - val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("").required(true) - val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("FTP_File").defaultValue("").required(true) - val localPath = new PropertyDescriptor().name("localPath").displayName("Local_Path").defaultValue("").required(true) + val url_str = new PropertyDescriptor().name("url_str").displayName("url_str").defaultValue("").required(true) + val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) + val username = new PropertyDescriptor().name("username").displayName("username").defaultValue("").required(true) + val password = new PropertyDescriptor().name("password").displayName("password").defaultValue("").required(true) + val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("ftpFile").defaultValue("").required(true) + val localPath = new PropertyDescriptor().name("localPath").displayName("localPath").defaultValue("").required(true) descriptor = url_str :: descriptor descriptor = port :: descriptor descriptor = username :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala index 381aa7d..2565d88 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala @@ -32,7 +32,7 @@ class GetUrl extends ConfigurableStop{ // xml String var label:String=_ var schema: String = _ - var xmlString :String=_ +// var xmlString :String=_ override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { @@ -63,7 +63,7 @@ class GetUrl extends ConfigurableStop{ if(types=="xml"){ println("8888888888888888888888888888888888888888888888888888888") - val doc: Document = DocumentHelper.parseText(xmlString) + val doc: Document = DocumentHelper.parseText(jsonString) val rootElt: Element = doc.getRootElement var arrbuffer:ArrayBuffer[Element]=ArrayBuffer() arrbuffer+=rootElt @@ -130,7 +130,7 @@ class GetUrl extends ConfigurableStop{ url = MapUtil.get(map,key="url").asInstanceOf[String] types= MapUtil.get(map,key="types").asInstanceOf[String] - xmlString = MapUtil.get(map,"XmlString").asInstanceOf[String] +// xmlString = MapUtil.get(map,"XmlString").asInstanceOf[String] label = MapUtil.get(map,"label").asInstanceOf[String] schema = MapUtil.get(map,"schema").asInstanceOf[String] @@ -138,11 +138,11 @@ class GetUrl extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url = new PropertyDescriptor().name("url").displayName("URL").defaultValue("").required(true) + val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true) val types = new PropertyDescriptor().name("types").displayName("types").defaultValue("the url content is json or xml)").required(true) - val xmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true) - descriptor = xmlString :: descriptor +// val xmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true) +// descriptor = xmlString :: descriptor val label = new PropertyDescriptor().name("label").displayName("label").description("label path for hope,the delimiter is ,").defaultValue("").required(true) descriptor = label :: descriptor val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala index af809f9..d9aeea6 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala @@ -25,13 +25,15 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer} class InvokeUrl extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" override val inportList: List[String] = List(PortEnum.NonePort.toString) - override val outportList: List[String] = List(PortEnum.DefaultPort.toString) + override val outportList: List[String] = List(PortEnum.NonePort.toString) override val description: String = "invoke http " - var urlPut :String= _ - var urlPost :String= _ - var urlDelete :String= _ - var urlGet :String= _ +// var urlPut :String= _ +// var urlPost :String= _ +// var urlDelete :String= _ +// var urlGet :String= _ + + var url :String= _ var jsonPath :String =_ var method :String = _ var colume : String = _ @@ -47,8 +49,8 @@ class InvokeUrl extends ConfigurableStop{ val client = HttpClients.createDefault() - if (method == "GetHttp") { - val getFlowInfo: HttpGet = new HttpGet(urlGet) + if (method == "getHttp") { + val getFlowInfo: HttpGet = new HttpGet(url) val response: CloseableHttpResponse = client.execute(getFlowInfo) val entity = response.getEntity @@ -129,7 +131,7 @@ class InvokeUrl extends ConfigurableStop{ } - if (method == "PutHttp" || method == "PostHttp") { + if (method == "putHttp" || method == "postHttp") { //read json from hdfs val conf = new Configuration() val fs = FileSystem.get(URI.create(jsonPath), conf) @@ -143,8 +145,8 @@ class InvokeUrl extends ConfigurableStop{ } println(buffer) - if (method == "PutHttp") { - val put = new HttpPut(urlPut) + if (method == "putHttp") { + val put = new HttpPut(url) put.setHeader("content-Type", "application/json") //put.setHeader("Accept","application/json") put.setEntity(new StringEntity(buffer.toString, "utf-8")) @@ -159,7 +161,7 @@ class InvokeUrl extends ConfigurableStop{ println(result) put.releaseConnection() } else { - val post = new HttpPost(urlPost) + val post = new HttpPost(url) post.setHeader("content-Type", "application/json") post.setEntity(new StringEntity(buffer.toString)) @@ -171,8 +173,8 @@ class InvokeUrl extends ConfigurableStop{ } } - if (method == "DeleteHttp") { - println(urlDelete) + if (method == "deleteHttp") { + println(url) val inDf = in.read() inDf.createOrReplaceTempView("table") @@ -183,7 +185,7 @@ class InvokeUrl extends ConfigurableStop{ for (i <- 0 until array.length) { - var url = "" + var url1 = "" val newArray = array(i) var builder = new StringBuilder @@ -197,10 +199,10 @@ class InvokeUrl extends ConfigurableStop{ } // println(builder) - url = urlDelete + "?" + builder - println(url + "##########################################################") + url1 = url + "?" + builder + println(url1 + "##########################################################") - val delete = new HttpDelete(url) + val delete = new HttpDelete(url1) delete.setHeader("content-Type", "application/json") val response = client.execute(delete) @@ -218,12 +220,15 @@ class InvokeUrl extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { - urlPut = MapUtil.get(map,key="urlPut").asInstanceOf[String] - urlPost = MapUtil.get(map,key="urlPost").asInstanceOf[String] - urlDelete = MapUtil.get(map,key="urlDelete").asInstanceOf[String] - urlGet = MapUtil.get(map,key="urlGet").asInstanceOf[String] + url = MapUtil.get(map,key="url").asInstanceOf[String] +// urlPut = MapUtil.get(map,key="urlPut").asInstanceOf[String] +// urlPost = MapUtil.get(map,key="urlPost").asInstanceOf[String] +// urlDelete = MapUtil.get(map,key="urlDelete").asInstanceOf[String] +// urlGet = MapUtil.get(map,key="urlGet").asInstanceOf[String] jsonPath = MapUtil.get(map,key="jsonPath").asInstanceOf[String] method = MapUtil.get(map,key = "method").asInstanceOf[String] + + //delete colume = MapUtil.get(map,key = "colume").asInstanceOf[String] // get xml @@ -235,10 +240,11 @@ class InvokeUrl extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val urlPut = new PropertyDescriptor().name("urlPut").displayName("urlPutPost").defaultValue("").required(true) - val urlPost = new PropertyDescriptor().name("urlPost").displayName("urlPutPost").defaultValue("").required(true) - val urlDelete = new PropertyDescriptor().name("urlDelete").displayName("urlPutPost").defaultValue("").required(true) - val urlGet = new PropertyDescriptor().name("urlGet").displayName("urlGet").defaultValue("").required(true) +// val urlPut = new PropertyDescriptor().name("urlPut").displayName("urlPutPost").defaultValue("").required(true) +// val urlPost = new PropertyDescriptor().name("urlPost").displayName("urlPutPost").defaultValue("").required(true) +// val urlDelete = new PropertyDescriptor().name("urlDelete").displayName("urlPutPost").defaultValue("").required(true) +// val urlGet = new PropertyDescriptor().name("urlGet").displayName("urlGet").defaultValue("").required(true) + val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true) val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("JSONPATH").defaultValue("").required(true) val method = new PropertyDescriptor().name("method").displayName("the way with http").defaultValue("").required(true) val colume = new PropertyDescriptor().name("colume").displayName("colume").defaultValue("").required(true) @@ -252,10 +258,10 @@ class InvokeUrl extends ConfigurableStop{ val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true) descriptor = schema :: descriptor - descriptor = urlPut :: descriptor - descriptor = urlPost :: descriptor - descriptor = urlDelete :: descriptor - descriptor = urlGet :: descriptor +// descriptor = urlPut :: descriptor +// descriptor = urlPost :: descriptor +// descriptor = urlDelete :: descriptor +// descriptor = urlGet :: descriptor descriptor = jsonPath :: descriptor descriptor = method :: descriptor descriptor = colume :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala index 3d4dce2..53dddb6 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession class PostUrl extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" override val inportList: List[String] = List(PortEnum.NonePort.toString) - override val outportList: List[String] = List(PortEnum.DefaultPort.toString) + override val outportList: List[String] = List(PortEnum.NonePort.toString) override val description: String = "peforms an HTTP Post with " var url : String= _ @@ -63,13 +63,12 @@ class PostUrl extends ConfigurableStop{ override def setProperties(map: Map[String, Any]): Unit = { url = MapUtil.get(map,key="url").asInstanceOf[String] jsonPath = MapUtil.get(map,key="jsonPath").asInstanceOf[String] - println(url) } override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url = new PropertyDescriptor().name("url").displayName("URL").defaultValue("").required(true) - val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("JSONPATH").defaultValue("").required(true) + val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true) + val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("jsonPath").defaultValue("").required(true) descriptor = url :: descriptor descriptor = jsonPath :: descriptor descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala index e26920b..cb05f51 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/microorganism/GenBankParse.scala @@ -128,10 +128,10 @@ class GenBankParse extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true) - val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true) - val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true) - val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true) + val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true) + val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true) + val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true) + val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true) descriptor = es_nodes :: descriptor descriptor = port :: descriptor