From 4bb8abba9ce73f3abefc2780f78bcb95ad21e909 Mon Sep 17 00:00:00 2001 From: yanggang Date: Thu, 22 Nov 2018 14:46:04 +0800 Subject: [PATCH] goldData Parse --- piflow-bundle/src/main/resources/url.json | 6 ++-- .../piflow/bundle/{url => http}/GetUrl.scala | 31 ++++++------------- .../bundle/{url => http}/InvokeUrl.scala | 13 +++----- .../piflow/bundle/{url => http}/PostUrl.scala | 8 ++--- .../main/scala/cn/piflow/conf/StopGroup.scala | 1 - .../scala/cn/piflow/conf/StopGroupEnum.scala | 1 - 6 files changed, 22 insertions(+), 38 deletions(-) rename piflow-bundle/src/main/scala/cn/piflow/bundle/{url => http}/GetUrl.scala (88%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/{url => http}/InvokeUrl.scala (97%) rename piflow-bundle/src/main/scala/cn/piflow/bundle/{url => http}/PostUrl.scala (92%) diff --git a/piflow-bundle/src/main/resources/url.json b/piflow-bundle/src/main/resources/url.json index ba64589..4fbcdfc 100644 --- a/piflow-bundle/src/main/resources/url.json +++ b/piflow-bundle/src/main/resources/url.json @@ -7,7 +7,7 @@ { "uuid":"0000", "name":"getUrl", - "bundle":"cn.piflow.bundle.url.GetUrl", + "bundle":"cn.piflow.bundle.http.GetUrl", "properties":{ "url":"http://10.0.86.98:8002/flow/info?appID=application_1539850523117_0136", "types":"json", @@ -20,7 +20,7 @@ { "uuid":"1111", "name":"postUrl", - "bundle":"cn.piflow.bundle.url.PostUrl", + "bundle":"cn.piflow.bundle.http.PostUrl", "properties":{ "url":"http://10.0.86.98:8002/flow/start", "jsonPath":"hdfs://10.0.86.89:9000/yg/flow.json" @@ -30,7 +30,7 @@ { "uuid":"2222", "name":"invokeUrl", - "bundle":"cn.piflow.bundle.url.InvokeUrl", + "bundle":"cn.piflow.bundle.http.InvokeUrl", "properties": { "urlPut": "http://coolaf.com/tool/params", "urlDelete": "http://coolaf.com/tool/params", diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/url/GetUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala similarity index 88% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/url/GetUrl.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala index 14d6eda..381aa7d 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/url/GetUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/GetUrl.scala @@ -1,36 +1,30 @@ -package cn.piflow.bundle.url +package cn.piflow.bundle.http -import java.net.URI import java.util -import cn.piflow.bundle.util.JsonUtil -import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} -import cn.piflow.conf.util.ImageUtil -import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum} import cn.piflow.conf.bean.PropertyDescriptor -import cn.piflow.conf.util.MapUtil -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path} +import cn.piflow.conf.util.{ImageUtil, MapUtil} +import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum} +import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet} import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.dom4j.{Document, DocumentHelper, Element} -import org.dom4j.io.SAXReader -import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.JavaConverters._ - - +import scala.collection.mutable.{ArrayBuffer, ListBuffer} class GetUrl extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" + override val description: String = "GetUrl to dataframe" + override val inportList: List[String] = List(PortEnum.NonePort.toString) + override val outportList: List[String] = List(PortEnum.DefaultPort.toString) - override val description: String = "Get data from http/url to dataframe" var url :String= _ var types :String = _ @@ -44,9 +38,6 @@ class GetUrl extends ConfigurableStop{ val ss = pec.get[SparkSession]() - println(url) - println(types) - // get from url val client = HttpClients.createDefault() val getFlowInfo:HttpGet = new HttpGet(url) @@ -167,13 +158,11 @@ class GetUrl extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroupEnum.UrlGroup.toString) + List(StopGroupEnum.HttpGroup.toString) } override def initialize(ctx: ProcessContext): Unit = { } - override val inportList: List[String] = List(PortEnum.DefaultPort.toString) - override val outportList: List[String] = List(PortEnum.NonePort.toString) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/url/InvokeUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala similarity index 97% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/url/InvokeUrl.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala index 69778c0..af809f9 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/url/InvokeUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/InvokeUrl.scala @@ -1,4 +1,4 @@ -package cn.piflow.bundle.url +package cn.piflow.bundle.http import java.io.{BufferedReader, InputStreamReader} import java.net.URI @@ -14,21 +14,18 @@ import org.apache.http.client.methods._ import org.apache.http.entity.StringEntity import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils -import org.apache.spark.SparkConf -import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.dom4j.{Document, DocumentHelper, Element} - -import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, ListBuffer} class InvokeUrl extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" - override val inportList: List[String] = List(PortEnum.DefaultPort.toString) - override val outportList: List[String] = List(PortEnum.NonePort.toString) + override val inportList: List[String] = List(PortEnum.NonePort.toString) + override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val description: String = "invoke http " var urlPut :String= _ @@ -270,7 +267,7 @@ class InvokeUrl extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroupEnum.UrlGroup.toString) + List(StopGroupEnum.HttpGroup.toString) } override def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/url/PostUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala similarity index 92% rename from piflow-bundle/src/main/scala/cn/piflow/bundle/url/PostUrl.scala rename to piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala index 7a54a7b..3d4dce2 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/url/PostUrl.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/PostUrl.scala @@ -1,4 +1,4 @@ -package cn.piflow.bundle.url +package cn.piflow.bundle.http import java.io.{BufferedReader, InputStreamReader} import java.net.URI @@ -19,8 +19,8 @@ import org.apache.spark.sql.SparkSession class PostUrl extends ConfigurableStop{ override val authorEmail: String = "ygang@cmic.com" - override val inportList: List[String] = List(PortEnum.DefaultPort.toString) - override val outportList: List[String] = List(PortEnum.NonePort.toString) + override val inportList: List[String] = List(PortEnum.NonePort.toString) + override val outportList: List[String] = List(PortEnum.DefaultPort.toString) override val description: String = "peforms an HTTP Post with " var url : String= _ @@ -80,7 +80,7 @@ class PostUrl extends ConfigurableStop{ } override def getGroup(): List[String] = { - List(StopGroupEnum.UrlGroup.toString) + List(StopGroupEnum.HttpGroup.toString) } override def initialize(ctx: ProcessContext): Unit = { diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala index 4b36612..68a992f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroup.scala @@ -16,7 +16,6 @@ case object CleanGroup extends StopGroup case object RedisGroup extends StopGroup case object KafkaGroup extends StopGroup case object ESGroup extends StopGroup -case object UrlGroup extends StopGroup case object HdfsGroup extends StopGroup case object MicroorganismGroup extends StopGroup case object ExcelGroup extends StopGroup diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala index 3a1492c..565ccda 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala @@ -20,7 +20,6 @@ object StopGroupEnum extends Enumeration { val ESGroup = Value("ESGroup") val MLGroup = Value("MLGroup") val RDFGroup = Value("RDFGroup") - val UrlGroup= Value("UrlGroup") val HdfsGroup= Value("HdfsGroup") val MicroorganismGroup= Value("MicroorganismGroup") val Spider= Value("Spider")