update stop

This commit is contained in:
or 2020-03-26 15:29:11 +08:00
parent 6d792531a2
commit c27ab1f914
7 changed files with 131 additions and 444 deletions

View File

@@ -2,13 +2,12 @@ spark.master=yarn
spark.deploy.mode=cluster
#hdfs default file system
fs.defaultFS=hdfs://10.0.88.13:9000
fs.defaultFS=hdfs://192.168.3.138:8020
#yarn resourcemanager hostname
yarn.resourcemanager.hostname=10.0.88.13
yarn.resourcemanager.hostname=192.168.3.139
#if you want to use hive, set hive metastore uris
hive.metastore.uris=thrift://10.0.88.13:9083
hive.metastore.uris=thrift://192.168.3.140:9083
#show data in log, set 0 if you do not want to show the logs
data.show=10
@@ -20,4 +19,4 @@ monitor.throughput=true
server.port=8001
#h2db port
h2.port=50001
h2.port=50001
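
These properties are read back at runtime through PropertyUtil (the new ExecuteShellTest further down does exactly this for hive.metastore.uris). A minimal sketch of such a lookup, assuming only that PropertyUtil.getPropertyValue returns the raw string stored in this properties file; ConfigLookupDemo is an illustrative wrapper, not part of the commit:

import cn.piflow.util.PropertyUtil

object ConfigLookupDemo {
  def main(args: Array[String]): Unit = {
    // Values are expected to come from the properties file edited above.
    val defaultFS     = PropertyUtil.getPropertyValue("fs.defaultFS")        // hdfs://192.168.3.138:8020
    val metastoreUris = PropertyUtil.getPropertyValue("hive.metastore.uris") // thrift://192.168.3.140:9083
    println(s"fs.defaultFS=$defaultFS, hive.metastore.uris=$metastoreUris")
  }
}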

View File

@@ -0,0 +1,35 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"stops":[
{
"uuid":"0000",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
}, {
"uuid":"1111",
"name":"ExecuteShell",
"bundle":"cn.piflow.bundle.script.ExecuteShell",
"properties":{
"IP": "192.168.3.140",
"User": "root",
"PassWord":"123456",
"shellString": "mkdir /work/###cp /opt/1.29.3.tar.gz /work/"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"ExecuteShell"
}
]
}
}
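
The shellString property packs several commands into one value, with ### as the separator; as the ExecuteShell diff further down shows, the stop splits on that marker and runs each piece in order over SSH via RemoteShellExecutor. A minimal sketch of that splitting step, with a hypothetical runOverSsh callback standing in for RemoteShellExecutor.exec:

object ShellStringDemo {
  // Split a piflow shellString on "###" and hand each command to the runner.
  def runShellString(shellString: String)(runOverSsh: String => Unit): Unit =
    shellString
      .split("###")
      .map(_.trim)
      .filter(_.nonEmpty)
      .foreach(runOverSsh)

  def main(args: Array[String]): Unit = {
    // Same value as the "shellString" property in the flow JSON above.
    runShellString("mkdir /work/###cp /opt/1.29.3.tar.gz /work/") { cmd =>
      println(s"would execute remotely: $cmd")
    }
  }
}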

View File

@@ -1,247 +0,0 @@
package cn.piflow.bundle.http
import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
import java.util
import cn.piflow._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.http.client.methods._
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.dom4j.{Document, DocumentHelper, Element}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class InvokeUrl extends ConfigurableStop{
override val authorEmail: String = "ygang@cnic.com"
override val inportList: List[String] = List(Port.NonePort.toString)
override val outportList: List[String] = List(Port.NonePort.toString)
override val description: String = "Http Invoke"
var url :String= _
var jsonPath :String =_
var method :String = _
var colume : String = _
// xml get
var label:String=_
var schema: String = _
var xmlString :String=_
var types :String = _
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark = pec.get[SparkSession]()
val client = HttpClients.createDefault()
if (method == "getHttp") {
val getFlowInfo: HttpGet = new HttpGet(url)
val response: CloseableHttpResponse = client.execute(getFlowInfo)
val entity = response.getEntity
val jsonString = EntityUtils.toString(entity, "UTF-8")
// json to df
if (types == "json") {
// json to df
val jsonRDD = spark.sparkContext.makeRDD(jsonString :: Nil)
val jsonDF = spark.read.json(jsonRDD)
//jsonDF.schema.printTreeString()
//jsonDF.show(10)
//jsonDF.select("app.id").show()
out.write(jsonDF)
}
if (types == "xml") {
val doc: Document = DocumentHelper.parseText(xmlString)
val rootElt: Element = doc.getRootElement
var arrbuffer: ArrayBuffer[Element] = ArrayBuffer()
arrbuffer += rootElt
val arrLabel: Array[String] = label.split(",")
for (x <- (1 until arrLabel.length)) {
var ele: Element = null
if (x == 1) {
ele = rootElt.element(arrLabel(x).toString)
} else {
ele = arrbuffer(x - 2).element(arrLabel(x).toString)
}
arrbuffer += ele
}
val FatherElement: Element = arrbuffer(arrbuffer.size - 2)
val arrSchame: Array[String] = schema.split(",")
var list: ListBuffer[String] = ListBuffer()
val FatherInterator: util.Iterator[_] = FatherElement.elementIterator(arrbuffer.last.getName)
val scalaIterator: Iterator[Element] = FatherInterator.asInstanceOf[util.Iterator[Element]].asScala
while (scalaIterator.hasNext) {
val value: Element = scalaIterator.next()
var text: String = ""
for (each <- arrSchame) {
text += value.element(each).getText + ","
}
list.+=(text.substring(0, text.length - 1))
}
val listRows: List[Row] = list.toList.map(line => {
val seq: Seq[String] = line.split(",").toSeq
val row = Row.fromSeq(seq)
row
})
val rowRDD: RDD[Row] = spark.sparkContext.makeRDD(listRows)
val fields: Array[StructField] = arrSchame.map(p => {
StructField(p, StringType, nullable = true)
})
val structType: StructType = StructType(fields)
val outDf: DataFrame = spark.createDataFrame(rowRDD, structType)
out.write(outDf)
}
if (method == "putHttp" || method == "postHttp") {
//read json from hdfs
val conf = new Configuration()
val fs = FileSystem.get(URI.create(jsonPath), conf)
val stream: FSDataInputStream = fs.open(new Path(jsonPath))
val bufferReader = new BufferedReader(new InputStreamReader(stream))
var lineTxt = bufferReader.readLine()
val buffer = new StringBuffer()
while (lineTxt != null) {
buffer.append(lineTxt.mkString)
lineTxt = bufferReader.readLine()
}
if (method == "putHttp") {
val put = new HttpPut(url)
put.setHeader("content-Type", "application/json")
//put.setHeader("Accept","application/json")
put.setEntity(new StringEntity(buffer.toString, "utf-8"))
val response = client.execute(put)
val entity = response.getEntity
var result = ""
if (entity != null) {
result = EntityUtils.toString(entity, "utf-8")
}
put.releaseConnection()
} else {
val post = new HttpPost(url)
post.setHeader("content-Type", "application/json")
post.setEntity(new StringEntity(buffer.toString))
val response = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity, "UTF-8")
}
}
if (method == "deleteHttp") {
val inDf = in.read()
inDf.createOrReplaceTempView("table")
val sqlDF = inDf.sqlContext.sql(s"select $colume from table")
val array = sqlDF.collect()
for (i <- 0 until array.length) {
var url1 = ""
val newArray = array(i)
var builder = new StringBuilder
for (i <- 0 until newArray.length) {
val columns = colume.split(",")
if (i == newArray.length - 1) {
builder.append(columns(i) + "=" + newArray(i))
} else {
builder.append(columns(i) + "=" + newArray(i) + "&")
}
}
url1 = url + "?" + builder
val delete = new HttpDelete(url1)
delete.setHeader("content-Type", "application/json")
val response = client.execute(delete)
val entity = response.getEntity
val str = EntityUtils.toString(entity, "UTF-8")
}
}
}
}
override def setProperties(map: Map[String, Any]): Unit = {
url = MapUtil.get(map,key="url").asInstanceOf[String]
jsonPath = MapUtil.get(map,key="jsonPath").asInstanceOf[String]
method = MapUtil.get(map,key = "method").asInstanceOf[String]
//delete
colume = MapUtil.get(map,key = "colume").asInstanceOf[String]
// get xml
xmlString = MapUtil.get(map,"XmlString").asInstanceOf[String]
label = MapUtil.get(map,"label").asInstanceOf[String]
schema = MapUtil.get(map,"schema").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val url = new PropertyDescriptor().name("url").displayName("url").defaultValue("").required(true)
val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("JSONPATH").defaultValue("").required(true)
val method = new PropertyDescriptor().name("method").displayName("the way with http").defaultValue("").required(true)
val colume = new PropertyDescriptor().name("colume").displayName("colume").defaultValue("").required(true)
val types = new PropertyDescriptor().name("types").displayName("types").defaultValue("the url content is json or xml)").required(true)
descriptor = types :: descriptor
val xmlString = new PropertyDescriptor().name("XmlString").displayName("XmlString").description("the xml String").defaultValue("").required(true)
descriptor = xmlString :: descriptor
val label = new PropertyDescriptor().name("label").displayName("label").description("label path for hope,the delimiter is ,").defaultValue("").required(true)
descriptor = label :: descriptor
val schema = new PropertyDescriptor().name("schema").displayName("schema").description("name of field in label,the delimiter is ,").defaultValue("").required(true)
descriptor = schema :: descriptor
descriptor = jsonPath :: descriptor
descriptor = method :: descriptor
descriptor = colume :: descriptor
descriptor
}
override def getIcon(): Array[Byte] = {
ImageUtil.getImage("icon/http/InvokeUrl.png")
}
override def getGroup(): List[String] = {
List(StopGroup.HttpGroup.toString)
}
override def initialize(ctx: ProcessContext): Unit = {
}
}
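
The removed getHttp/json branch above built its DataFrame with the RDD overload of spark.read.json, which has been deprecated since Spark 2.2. For reference only (this sketch is not part of the commit, and JsonToDataFrameDemo is an illustrative name), the Dataset[String]-based equivalent looks like this:

import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonToDataFrameDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("json-to-df").getOrCreate()
    import spark.implicits._

    // A stand-in payload; the deleted stop fetched this string over HTTP.
    val jsonString = """{"app":{"id":"42","name":"demo"}}"""

    // Dataset[String] replaces the deprecated RDD[String] overload used above.
    val jsonDF: DataFrame = spark.read.json(Seq(jsonString).toDS())
    jsonDF.printSchema()
    jsonDF.select("app.id").show()

    spark.stop()
  }
}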

View File

@@ -11,21 +11,18 @@ import org.apache.spark.sql.SparkSession
class ExecuteShell extends ConfigurableStop{
override val authorEmail: String = "ygang@cnic.cn"
override val description: String = "Execute shell script"
val inportList: List[String] = List(Port.DefaultPort.toString)
val outportList: List[String] = List(Port.DefaultPort.toString)
val inportList: List[String] = List(Port.DefaultPort)
val outportList: List[String] = List(Port.DefaultPort)
var IP :String= _
var User :String= _
var PassWord :String = _
var shellString:String =_
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
val spark: SparkSession = pec.get[SparkSession]()
val spark = pec.get[SparkSession]()
val executor: RemoteShellExecutor = new RemoteShellExecutor(IP,User,PassWord)
val strings: Array[String] = shellString.split("###")
strings.foreach(x=>{
executor.exec(x.toString)
@@ -37,19 +34,43 @@ class ExecuteShell extends ConfigurableStop{
IP = MapUtil.get(map,key="IP").asInstanceOf[String]
User = MapUtil.get(map,key="User").asInstanceOf[String]
PassWord = MapUtil.get(map,key="PassWord").asInstanceOf[String]
shellString = MapUtil.get(map,key="shellString").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val IP = new PropertyDescriptor().name("IP").displayName("IP").defaultValue("").required(true)
val User = new PropertyDescriptor().name("User").displayName("User").defaultValue("root").required(true)
val PassWord = new PropertyDescriptor().name("PassWord").displayName("PassWord").defaultValue("").required(true)
val IP = new PropertyDescriptor()
.name("IP")
.displayName("IP")
.description("Server IP where the local file is located")
.defaultValue("")
.required(true)
.description("192.168.3.139")
val shellString = new PropertyDescriptor().name("shellString").displayName("shellString").description("split by ### ").defaultValue("").required(true)
val User = new PropertyDescriptor()
.name("User")
.displayName("User")
.description("Server User where the local file is located")
.defaultValue("root")
.required(true)
.example("root")
val PassWord = new PropertyDescriptor()
.name("PassWord")
.displayName("PassWord")
.description("Password of the server where the local file is located")
.defaultValue("")
.required(true)
.example("123456")
val shellString = new PropertyDescriptor()
.name("shellString")
.displayName("ShellString")
.description("shell script, multiple sentences separated by ###")
.defaultValue("")
.required(true)
.example("mkdir /work/###cp /opt/1.29.3.tar.gz /work/")
descriptor = IP :: descriptor
@@ -64,7 +85,7 @@ class ExecuteShell extends ConfigurableStop{
}
override def getGroup(): List[String] = {
List(StopGroup.FileGroup.toString)
List(StopGroup.FileGroup)
}
override def initialize(ctx: ProcessContext): Unit = { }
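
The property keys the stop reads in setProperties match the flow JSON exactly (IP, User, PassWord, shellString). A hedged sketch of configuring the stop programmatically, assuming the bundle is on the classpath; ExecuteShellConfigDemo is an illustrative name, and in a real flow perform() is driven by the piflow Runner (see the new ExecuteShellTest below), not called by hand:

import cn.piflow.bundle.script.ExecuteShell

object ExecuteShellConfigDemo {
  def main(args: Array[String]): Unit = {
    val stop = new ExecuteShell
    // Keys must match what setProperties looks up via MapUtil.get.
    stop.setProperties(Map(
      "IP"          -> "192.168.3.140",
      "User"        -> "root",
      "PassWord"    -> "123456",
      "shellString" -> "mkdir /work/###cp /opt/1.29.3.tar.gz /work/"
    ))
  }
}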

View File

@@ -1,69 +0,0 @@
//package cn.piflow.bundle.script
//
//import cn.piflow.conf.bean.PropertyDescriptor
//import cn.piflow.conf.util.{ImageUtil, MapUtil}
//import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
//import cn.piflow.conf._
//import org.apache.spark.sql.{Row, SparkSession}
//import org.apache.spark.sql.types.{StringType, StructField, StructType}
//
//import scala.beans.BeanProperty
//import sys.process._
//
//class ShellExecutor extends ConfigurableStop{
//
// val authorEmail: String = "xjzhu@cnic.cn"
// val description: String = "Execute shell script"
// val inportList: List[String] = List(PortEnum.NonePort.toString)
// val outportList: List[String] = List(PortEnum.DefaultPort.toString)
//
// var shellPath: String = _
// var args: String = _
// var outputSchema: String = _
//
//
// override def setProperties(map: Map[String, Any]): Unit = {
//
// shellPath = MapUtil.get(map,"shellPath").asInstanceOf[String]
// args = MapUtil.get(map,"args").asInstanceOf[String]
// }
//
// override def getPropertyDescriptor(): List[PropertyDescriptor] = {
// var descriptor : List[PropertyDescriptor] = List()
// val shellPath = new PropertyDescriptor().name("shellPath").displayName("shellPath").description("The path of shell script").defaultValue("").required(true)
// val args = new PropertyDescriptor().name("args").displayName("args").description("The arguments of the shell script").defaultValue("").required(true)
// descriptor = shellPath :: descriptor
// descriptor = args :: descriptor
// descriptor
// }
//
// override def getIcon(): Array[Byte] = {
// ImageUtil.getImage("icon/script/ShellExecutor.png")
// }
//
// override def getGroup(): List[String] = {
// List(StopGroup.ScriptGroup.toString)
// }
//
// override def initialize(ctx: ProcessContext): Unit = {
//
// }
//
// override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
// val command = shellPath + " " + args
// val result = command!!
//
// var rowList : List[Row] = List()
// val rawData = result.split("\n").toList
// rawData.foreach( s => rowList = Row(s) :: rowList )
//
// val spark = pec.get[SparkSession]()
//
// val rowsRdd = spark.sparkContext.parallelize(rowList)
// val schema = StructType(List(new StructField("row", StringType, nullable = true)))
// val df = spark.createDataFrame(rowsRdd,schema)
// //df.show()
//
// out.write(df)
// }
//}

View File

@@ -0,0 +1,58 @@
package cn.piflow.bundle.script
import java.net.InetAddress
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
import scala.util.parsing.json.JSON
class ExecuteShellTest {
@Test
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/flow/script/shell.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
//create flow
val flowBean = FlowBean(map)
val flow = flowBean.constructFlow()
val ip = InetAddress.getLocalHost.getHostAddress
cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder()
.master("local[12]")
.appName("hive")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "8g")
.config("spark.cores.max", "8")
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.enableHiveSupport()
.getOrCreate()
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "")
.bind("debug.path","")
.start(flow);
process.awaitTermination();
val pid = process.pid();
println(pid + "!!!!!!!!!!!!!!!!!!!!!")
spark.close();
}
}

View File

@@ -1,110 +0,0 @@
package cn.piflow.bundle.test
import java.nio.charset.Charset
import cn.piflow.bundle.common.ExecuteSQLStop
import cn.piflow.lib._
import cn.piflow.lib.io.{FileFormat, TextFile}
import cn.piflow.{FlowAsStop, FlowImpl, Path, Runner}
import org.apache.flume.api.{RpcClient, RpcClientFactory}
import org.apache.flume.event.EventBuilder
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
class DoMapTest {
@Test
def testFlowA() {
val flow = new FlowImpl();
flow.addStop("CountWords",createProcessCountWords);
val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
//execute flow
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.start(flow);
process.awaitTermination();
spark.close();
}
val SCRIPT_1 =
"""
function (row) {
return $.Row(row.get(0).replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""));
}
""";
val SCRIPT_2 =
"""
function (row) {
var arr = $.Array();
var str = row.get(0);
var len = str.length;
for (var i = 0; i < len - 1; i++) {
arr.add($.Row(str.substring(i, i + 2)));
}
return arr;
}
""";
val selectSQLParameters : Map[String, String] = Map("sql" -> "select value, count(*) count from table1 group by value order by count desc"
,"bundle2TableName" -> "->table1,->table1")
val fun1: Map[String, String] = Map("SCRIPT_1" -> SCRIPT_1)
val fun2: Map[String, String] = Map("SCRIPT_2" -> SCRIPT_2)
//var bundle2TableName: (String, String) = "" -> "table1"
def createProcessCountWords() = {
val doMap = new DoMapStop
doMap.setProperties(fun1)
val doFlat = new DoFlatMapStop
doFlat.setProperties(fun2)
val executeSQLStop = new ExecuteSQLStop
executeSQLStop.setProperties(selectSQLParameters)
val processCountWords = new FlowImpl();
//SparkProcess = loadStream + transform... + writeStream
processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/xjzhu/honglou.txt", FileFormat.TEXT)));
processCountWords.addStop("DoMap", doMap);
processCountWords.addStop("DoFlatMap", doFlat);
processCountWords.addStop("ExecuteSQL", executeSQLStop);
processCountWords.addPath(Path.from("LoadStream").to("DoMap").to("DoFlatMap").to("ExecuteSQL"));
new FlowAsStop(processCountWords);
}
@Test
def flume(): Unit ={
val client = RpcClientFactory.getDefaultInstance(HOST_NAME,8888)
while(true) {
for (i <- 0 to 100) {
sendDateToFlume(client, "msg" + i)
}
}
}
val HOST_NAME="master"
val POST = 8888
def sendDateToFlume(client:RpcClient,msg:String)={
val event= EventBuilder.withBody(msg,Charset.forName("utf-8"))
client.append(event)
}
}