update describe

This commit is contained in:
yanggang 2018-11-22 19:27:04 +08:00
parent 2e26f7cbe3
commit f8f34a4cee
10 changed files with 78 additions and 74 deletions

View File

@ -20,6 +20,7 @@
"properties":{
"hdfsUrl":"hdfs://10.0.86.89:9000",
"hdfsPath":"/yg/0",
"partition":"3",
"types":"csv"
}

View File

@ -48,10 +48,10 @@ class FetchEs extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor

View File

@ -59,10 +59,10 @@ class PutEs extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor

View File

@ -18,8 +18,8 @@ class QueryEs extends ConfigurableStop {
var port:String= _ //es的端口好
var es_index:String = _ //es的索引
var es_type:String = _ //es的类型
var field_name:String = _ //es的类型
var field_content:String = _ //es的类型
var field_name:String = _ //es的字段类型
var field_content:String = _ //es的字段内容
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
@ -92,10 +92,10 @@ class QueryEs extends ConfigurableStop {
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
val field_name = new PropertyDescriptor().name("field_name").displayName("field_name").defaultValue("").required(true)
val field_content = new PropertyDescriptor().name("field_content").displayName("field_content").defaultValue("").required(true)

View File

@ -22,7 +22,7 @@ class LoadFromFtpUrl extends ConfigurableStop{
val inportList: List[String] = List(PortEnum.NonePort.toString)
val outportList: List[String] = List(PortEnum.DefaultPort.toString)
var http_URl:String =_ //url 地址
var http_url:String =_ //url 地址
var url_type:String =_ // url 指向文件 类型文件 or 文件夹
var localPath:String =_ // 保存的本地路径
var downType:String=_
@ -47,9 +47,9 @@ class LoadFromFtpUrl extends ConfigurableStop{
fileLocalPath = localPath+"/"+fileName
list = fileLocalPath::list
// 下载 文件
downFileFromFtpUrl(http_URl,localPath,fileName)
downFileFromFtpUrl(http_url,localPath,fileName)
} else {
var arrayList:ArrayList[String]=getFilePathList(http_URl)
var arrayList:ArrayList[String]=getFilePathList(http_url)
// 遍历 文件路径 所在的集合
for (i <- 0 until arrayList.size()) {
// https://ftp.ncbi.nih.gov/genbank/docs/--1234567890987654321--Current_version_is_10.7--1234567890987654321--20180423
@ -59,7 +59,7 @@ class LoadFromFtpUrl extends ConfigurableStop{
val arrayString = array.split("--1234567890987654321--")
val fileUrlDir = array.replace(s"$http_URl", "/").split("--1234567890987654321--")(0)
val fileUrlDir = array.replace(s"$http_url", "/").split("--1234567890987654321--")(0)
// 单个文件url 指向的 路径
val urlPath = arrayString(0)+arrayString(1)
// 下载 保存 文件夹的目录
@ -88,7 +88,7 @@ class LoadFromFtpUrl extends ConfigurableStop{
println("##################################_____-------------------------------")
val arrayString = array.split("--1234567890987654321--")
val fileUrlDir = array.replace(s"$http_URl", "/").split("--1234567890987654321--")(0)
val fileUrlDir = array.replace(s"$http_url", "/").split("--1234567890987654321--")(0)
// 单个文件url 指向的 路径
val urlPath = arrayString(0)+arrayString(1)
// 下载 保存 文件夹的目录
@ -217,7 +217,7 @@ class LoadFromFtpUrl extends ConfigurableStop{
def setProperties(map: Map[String, Any]): Unit = {
http_URl=MapUtil.get(map,key="http_URl").asInstanceOf[String]
http_url=MapUtil.get(map,key="http_url").asInstanceOf[String]
url_type=MapUtil.get(map,key="url_type").asInstanceOf[String]
localPath=MapUtil.get(map,key="localPath").asInstanceOf[String]
downType=MapUtil.get(map,key="downType").asInstanceOf[String]
@ -226,14 +226,14 @@ class LoadFromFtpUrl extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val http_URl = new PropertyDescriptor().name("http_URl").displayName("http_URl").defaultValue("").required(true)
val http_url = new PropertyDescriptor().name("http_url").displayName("http_url").defaultValue("").required(true)
val localPath = new PropertyDescriptor().name("localPath").displayName("Local_Path").defaultValue("").required(true)
val url_type= new PropertyDescriptor().name("url_type").displayName("url_type").defaultValue("").required(true)
val downType= new PropertyDescriptor().name("downType").displayName("downType").defaultValue("all,day").required(true)
val fileName= new PropertyDescriptor().name("fileName").displayName("fileName").defaultValue("fileName").required(false)
descriptor = http_URl :: descriptor
descriptor = http_url :: descriptor
descriptor = url_type :: descriptor
descriptor = localPath :: descriptor
descriptor = downType :: descriptor

View File

@ -24,12 +24,10 @@ class NewLoadFromFtp extends ConfigurableStop{
var password:String=_
var ftpFile:String=_
var localPath:String=_
var ftpClient:FtpClient=null
var ftpClient:FtpClient=null
def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val inDf = in.read()
inDf.show()
ftpClient=connectFTP(url_str,port,username,password)
var arrayList:util.ArrayList[String]=getFileList(ftpFile)
@ -164,12 +162,12 @@ class NewLoadFromFtp extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url_str = new PropertyDescriptor().name("url_str").displayName("URL").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val username = new PropertyDescriptor().name("username").displayName("USER_NAME").defaultValue("").required(true)
val password = new PropertyDescriptor().name("password").displayName("PASSWORD").defaultValue("").required(true)
val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("FTP_File").defaultValue("").required(true)
val localPath = new PropertyDescriptor().name("localPath").displayName("Local_Path").defaultValue("").required(true)
val url_str = new PropertyDescriptor().name("url_str").displayName("url_str").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val username = new PropertyDescriptor().name("username").displayName("username").defaultValue("").required(true)
val password = new PropertyDescriptor().name("password").displayName("password").defaultValue("").required(true)
val ftpFile = new PropertyDescriptor().name("ftpFile").displayName("ftpFile").defaultValue("").required(true)
val localPath = new PropertyDescriptor().name("localPath").displayName("localPath").defaultValue("").required(true)
descriptor = url_str :: descriptor
descriptor = port :: descriptor
descriptor = username :: descriptor

View File

@ -32,7 +32,7 @@ class GetUrl extends ConfigurableStop{
// xml String
var label:String=_
var schema: String = _
var xmlString :String=_
// var xmlString :String=_
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
@ -63,7 +63,7 @@ class GetUrl extends ConfigurableStop{
if(types=="xml"){
println("8888888888888888888888888888888888888888888888888888888")
val doc: Document = DocumentHelper.parseText(xmlString)
val doc: Document = DocumentHelper.parseText(jsonString)
val rootElt: Element = doc.getRootElement
var arrbuffer:ArrayBuffer[Element]=ArrayBuffer()
arrbuffer+=rootElt
@ -130,7 +130,7 @@ class GetUrl extends ConfigurableStop{
url = MapUtil.get(map,key="url").asInstanceOf[String]
types= MapUtil.get(map,key="types").asInstanceOf[String]
xmlString = MapUtil.get(map,"XmlString").asInstanceOf[String]
// xmlString = MapUtil.get(map,"XmlString").asInstanceOf[String]
label = MapUtil.get(map,"label").asInstanceOf[String]
schema = MapUtil.get(map,"schema").asInstanceOf[String]
@ -138,11 +138,11 @@ class GetUrl extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url = new PropertyDescriptor().name("url").displayName("URL").defaultValue("").required(true)
val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true)
val types = new PropertyDescriptor().name("types").displayName("types").defaultValue("the url content is json or xml)").required(true)
val xmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true)
descriptor = xmlString :: descriptor
// val xmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true)
// descriptor = xmlString :: descriptor
val label = new PropertyDescriptor().name("label").displayName("label").description("label path for hope,the delimiter is ,").defaultValue("").required(true)
descriptor = label :: descriptor
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true)

View File

@ -25,13 +25,15 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class InvokeUrl extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "invoke http "
var urlPut :String= _
var urlPost :String= _
var urlDelete :String= _
var urlGet :String= _
// var urlPut :String= _
// var urlPost :String= _
// var urlDelete :String= _
// var urlGet :String= _
var url :String= _
var jsonPath :String =_
var method :String = _
var colume : String = _
@ -47,8 +49,8 @@ class InvokeUrl extends ConfigurableStop{
val client = HttpClients.createDefault()
if (method == "GetHttp") {
val getFlowInfo: HttpGet = new HttpGet(urlGet)
if (method == "getHttp") {
val getFlowInfo: HttpGet = new HttpGet(url)
val response: CloseableHttpResponse = client.execute(getFlowInfo)
val entity = response.getEntity
@ -129,7 +131,7 @@ class InvokeUrl extends ConfigurableStop{
}
if (method == "PutHttp" || method == "PostHttp") {
if (method == "putHttp" || method == "postHttp") {
//read json from hdfs
val conf = new Configuration()
val fs = FileSystem.get(URI.create(jsonPath), conf)
@ -143,8 +145,8 @@ class InvokeUrl extends ConfigurableStop{
}
println(buffer)
if (method == "PutHttp") {
val put = new HttpPut(urlPut)
if (method == "putHttp") {
val put = new HttpPut(url)
put.setHeader("content-Type", "application/json")
//put.setHeader("Accept","application/json")
put.setEntity(new StringEntity(buffer.toString, "utf-8"))
@ -159,7 +161,7 @@ class InvokeUrl extends ConfigurableStop{
println(result)
put.releaseConnection()
} else {
val post = new HttpPost(urlPost)
val post = new HttpPost(url)
post.setHeader("content-Type", "application/json")
post.setEntity(new StringEntity(buffer.toString))
@ -171,8 +173,8 @@ class InvokeUrl extends ConfigurableStop{
}
}
if (method == "DeleteHttp") {
println(urlDelete)
if (method == "deleteHttp") {
println(url)
val inDf = in.read()
inDf.createOrReplaceTempView("table")
@ -183,7 +185,7 @@ class InvokeUrl extends ConfigurableStop{
for (i <- 0 until array.length) {
var url = ""
var url1 = ""
val newArray = array(i)
var builder = new StringBuilder
@ -197,10 +199,10 @@ class InvokeUrl extends ConfigurableStop{
}
// println(builder)
url = urlDelete + "?" + builder
println(url + "##########################################################")
url1 = url + "?" + builder
println(url1 + "##########################################################")
val delete = new HttpDelete(url)
val delete = new HttpDelete(url1)
delete.setHeader("content-Type", "application/json")
val response = client.execute(delete)
@ -218,12 +220,15 @@ class InvokeUrl extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
urlPut = MapUtil.get(map,key="urlPut").asInstanceOf[String]
urlPost = MapUtil.get(map,key="urlPost").asInstanceOf[String]
urlDelete = MapUtil.get(map,key="urlDelete").asInstanceOf[String]
urlGet = MapUtil.get(map,key="urlGet").asInstanceOf[String]
url = MapUtil.get(map,key="url").asInstanceOf[String]
// urlPut = MapUtil.get(map,key="urlPut").asInstanceOf[String]
// urlPost = MapUtil.get(map,key="urlPost").asInstanceOf[String]
// urlDelete = MapUtil.get(map,key="urlDelete").asInstanceOf[String]
// urlGet = MapUtil.get(map,key="urlGet").asInstanceOf[String]
jsonPath = MapUtil.get(map,key="jsonPath").asInstanceOf[String]
method = MapUtil.get(map,key = "method").asInstanceOf[String]
//delete
colume = MapUtil.get(map,key = "colume").asInstanceOf[String]
// get xml
@ -235,10 +240,11 @@ class InvokeUrl extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val urlPut = new PropertyDescriptor().name("urlPut").displayName("urlPutPost").defaultValue("").required(true)
val urlPost = new PropertyDescriptor().name("urlPost").displayName("urlPutPost").defaultValue("").required(true)
val urlDelete = new PropertyDescriptor().name("urlDelete").displayName("urlPutPost").defaultValue("").required(true)
val urlGet = new PropertyDescriptor().name("urlGet").displayName("urlGet").defaultValue("").required(true)
// val urlPut = new PropertyDescriptor().name("urlPut").displayName("urlPutPost").defaultValue("").required(true)
// val urlPost = new PropertyDescriptor().name("urlPost").displayName("urlPutPost").defaultValue("").required(true)
// val urlDelete = new PropertyDescriptor().name("urlDelete").displayName("urlPutPost").defaultValue("").required(true)
// val urlGet = new PropertyDescriptor().name("urlGet").displayName("urlGet").defaultValue("").required(true)
val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true)
val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("JSONPATH").defaultValue("").required(true)
val method = new PropertyDescriptor().name("method").displayName("the way with http").defaultValue("").required(true)
val colume = new PropertyDescriptor().name("colume").displayName("colume").defaultValue("").required(true)
@ -252,10 +258,10 @@ class InvokeUrl extends ConfigurableStop{
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true)
descriptor = schema :: descriptor
descriptor = urlPut :: descriptor
descriptor = urlPost :: descriptor
descriptor = urlDelete :: descriptor
descriptor = urlGet :: descriptor
// descriptor = urlPut :: descriptor
// descriptor = urlPost :: descriptor
// descriptor = urlDelete :: descriptor
// descriptor = urlGet :: descriptor
descriptor = jsonPath :: descriptor
descriptor = method :: descriptor
descriptor = colume :: descriptor

View File

@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession
class PostUrl extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val description: String = "peforms an HTTP Post with "
var url : String= _
@ -63,13 +63,12 @@ class PostUrl extends ConfigurableStop{
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,key="url").asInstanceOf[String]
jsonPath = MapUtil.get(map,key="jsonPath").asInstanceOf[String]
println(url)
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url = new PropertyDescriptor().name("url").displayName("URL").defaultValue("").required(true)
val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("JSONPATH").defaultValue("").required(true)
val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true)
val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("jsonPath").defaultValue("").required(true)
descriptor = url :: descriptor
descriptor = jsonPath :: descriptor
descriptor

View File

@ -128,10 +128,10 @@ class GenBankParse extends ConfigurableStop{
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("REDIS_HOST").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("PORT").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("ES_INDEX").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("ES_TYPE").defaultValue("").required(true)
val es_nodes = new PropertyDescriptor().name("es_nodes").displayName("es_nodes").defaultValue("").required(true)
val port = new PropertyDescriptor().name("port").displayName("port").defaultValue("").required(true)
val es_index = new PropertyDescriptor().name("es_index").displayName("es_index").defaultValue("").required(true)
val es_type = new PropertyDescriptor().name("es_type").displayName("es_type").defaultValue("").required(true)
descriptor = es_nodes :: descriptor
descriptor = port :: descriptor