goldData Parse

This commit is contained in:
yanggang 2018-11-22 14:46:04 +08:00
parent 5099851fe7
commit 4bb8abba9c
6 changed files with 22 additions and 38 deletions

View File

@ -7,7 +7,7 @@
{
"uuid":"0000",
"name":"getUrl",
"bundle":"cn.piflow.bundle.url.GetUrl",
"bundle":"cn.piflow.bundle.http.GetUrl",
"properties":{
"url":"http://10.0.86.98:8002/flow/info?appID=application_1539850523117_0136",
"types":"json",
@ -20,7 +20,7 @@
{
"uuid":"1111",
"name":"postUrl",
"bundle":"cn.piflow.bundle.url.PostUrl",
"bundle":"cn.piflow.bundle.http.PostUrl",
"properties":{
"url":"http://10.0.86.98:8002/flow/start",
"jsonPath":"hdfs://10.0.86.89:9000/yg/flow.json"
@ -30,7 +30,7 @@
{
"uuid":"2222",
"name":"invokeUrl",
"bundle":"cn.piflow.bundle.url.InvokeUrl",
"bundle":"cn.piflow.bundle.http.InvokeUrl",
"properties": {
"urlPut": "http://coolaf.com/tool/params",
"urlDelete": "http://coolaf.com/tool/params",

View File

@ -1,36 +1,30 @@
package cn.piflow.bundle.url
package cn.piflow.bundle.http
import java.net.URI
import java.util
import cn.piflow.bundle.util.JsonUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.util.ImageUtil
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.MapUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.dom4j.{Document, DocumentHelper, Element}
import org.dom4j.io.SAXReader
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class GetUrl extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val description: String = "GetUrl to dataframe"
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "Get data from http/url to dataframe"
var url :String= _
var types :String = _
@ -44,9 +38,6 @@ class GetUrl extends ConfigurableStop{
val ss = pec.get[SparkSession]()
println(url)
println(types)
// get from url
val client = HttpClients.createDefault()
val getFlowInfo:HttpGet = new HttpGet(url)
@ -167,13 +158,11 @@ class GetUrl extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroupEnum.UrlGroup.toString)
List(StopGroupEnum.HttpGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
}

View File

@ -1,4 +1,4 @@
package cn.piflow.bundle.url
package cn.piflow.bundle.http
import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
@ -14,21 +14,18 @@ import org.apache.http.client.methods._
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.dom4j.{Document, DocumentHelper, Element}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class InvokeUrl extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "invoke http "
var urlPut :String= _
@ -270,7 +267,7 @@ class InvokeUrl extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroupEnum.UrlGroup.toString)
List(StopGroupEnum.HttpGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {

View File

@ -1,4 +1,4 @@
package cn.piflow.bundle.url
package cn.piflow.bundle.http
import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
@ -19,8 +19,8 @@ import org.apache.spark.sql.SparkSession
class PostUrl extends ConfigurableStop{
override val authorEmail: String = "ygang@cmic.com"
override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
override val outportList: List[String] = List(PortEnum.NonePort.toString)
override val inportList: List[String] = List(PortEnum.NonePort.toString)
override val outportList: List[String] = List(PortEnum.DefaultPort.toString)
override val description: String = "peforms an HTTP Post with "
var url : String= _
@ -80,7 +80,7 @@ class PostUrl extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroupEnum.UrlGroup.toString)
List(StopGroupEnum.HttpGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {

View File

@ -16,7 +16,6 @@ case object CleanGroup extends StopGroup
case object RedisGroup extends StopGroup
case object KafkaGroup extends StopGroup
case object ESGroup extends StopGroup
case object UrlGroup extends StopGroup
case object HdfsGroup extends StopGroup
case object MicroorganismGroup extends StopGroup
case object ExcelGroup extends StopGroup

View File

@ -20,7 +20,6 @@ object StopGroupEnum extends Enumeration {
val ESGroup = Value("ESGroup")
val MLGroup = Value("MLGroup")
val RDFGroup = Value("RDFGroup")
val UrlGroup= Value("UrlGroup")
val HdfsGroup= Value("HdfsGroup")
val MicroorganismGroup= Value("MicroorganismGroup")
val Spider= Value("Spider")