From a07cca1f8b32983fa8f9cd8c73c8aa70b732d064 Mon Sep 17 00:00:00 2001
From: yanggang
Date: Mon, 19 Nov 2018 15:15:06 +0800
Subject: [PATCH] Add DoMapStop, DoFlatMapStop and ExecuteSQLStop common stops

---
 .../piflow/bundle/common/DoFlatMapStop.scala  |  52 ++++++++
 .../cn/piflow/bundle/common/DoMapStop.scala   |  66 +++++++++
 .../piflow/bundle/common/ExecuteSQLStop.scala |  74 ++++++++++
 .../scala/cn/piflow/bundle/DoMapTest.scala    | 113 ++++++++++++++
 4 files changed, 305 insertions(+)
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoFlatMapStop.scala
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoMapStop.scala
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala
 create mode 100644 piflow-bundle/src/test/scala/cn/piflow/bundle/DoMapTest.scala

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoFlatMapStop.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoFlatMapStop.scala
new file mode 100644
index 0000000..a3b5c66
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoFlatMapStop.scala
@@ -0,0 +1,52 @@
+package cn.piflow.bundle.common
+
+import cn.piflow._
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.lib._
+import cn.piflow.util.ScriptEngine
+
+
+class DoFlatMapStop extends ConfigurableStop {
+
+  val authorEmail: String = "ygang@cnic.cn"
+  val description: String = "DoFlatMap stop: applies a user-defined flat-map script to each input row."
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.AnyPort.toString)
+
+  var SCRIPT: String = _
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
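+    // SCRIPT is a JavaScript function from one row to an array of rows; see SCRIPT_2
+    // in DoMapTest, which expands each line of text into its two-character substrings.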
+    in.read().show()  // debug: print a sample of the incoming rows
+
+    val doFlatMap = new DoFlatMap(ScriptEngine.logic(SCRIPT))
+    doFlatMap.perform(in, out, pec)
+  }
+
+  override def setProperties(map: Map[String, Any]): Unit = {
+    SCRIPT = MapUtil.get(map, "SCRIPT").asInstanceOf[String]
+  }
+
+  override def initialize(ctx: ProcessContext): Unit = {
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor: List[PropertyDescriptor] = List()
+    val SCRIPT = new PropertyDescriptor().name("SCRIPT").displayName("SCRIPT")
+      .description("A script that maps one row to an array of rows.").defaultValue("").required(true)
+    descriptor = SCRIPT :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("fork.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.CommonGroup.toString)
+  }
+
+}
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoMapStop.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoMapStop.scala
new file mode 100644
index 0000000..2546687
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/DoMapStop.scala
@@ -0,0 +1,66 @@
+package cn.piflow.bundle.common
+
+import cn.piflow._
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.lib._
+import cn.piflow.lib.io.{FileFormat, TextFile}
+import cn.piflow.util.ScriptEngine
+import org.apache.spark.sql.types.StructType
+
+
+class DoMapStop extends ConfigurableStop {
+
+  val authorEmail: String = "ygang@cnic.cn"
+  val description: String = "DoMap stop: applies a user-defined map script to each input row."
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.AnyPort.toString)
+
+  var targetSchema: StructType = null
+  var SCRIPT: String = _
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    // SCRIPT is a JavaScript function from one row to one row; see SCRIPT_1 in
+    // DoMapTest, which strips punctuation and whitespace from each line.
+    in.read().show()  // debug: print a sample of the incoming rows
+
+    val doMap = new DoMap(ScriptEngine.logic(SCRIPT))
+    doMap.perform(in, out, pec)
+  }
+
+  // Sample flow kept for reference: reads a text file from HDFS and pipes it through this stop.
+  def createCountWords() = {
+    val processCountWords = new FlowImpl();
+    processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/yg/2", FileFormat.TEXT)));
+    processCountWords.addStop("DoMap", new DoMapStop);
+
+    processCountWords.addPath(Path.from("LoadStream").to("DoMap"));
+
+    new FlowAsStop(processCountWords);
+  }
+
+  override def setProperties(map: Map[String, Any]): Unit = {
+    SCRIPT = MapUtil.get(map, "SCRIPT").asInstanceOf[String]
+  }
+
+  override def initialize(ctx: ProcessContext): Unit = {
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor: List[PropertyDescriptor] = List()
+    val SCRIPT = new PropertyDescriptor().name("SCRIPT").displayName("SCRIPT")
+      .description("A script that maps one row to another row.").defaultValue("").required(true)
+    descriptor = SCRIPT :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("fork.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.CommonGroup.toString)
+  }
+
+}
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala
new file mode 100644
index 0000000..543d642
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/common/ExecuteSQLStop.scala
@@ -0,0 +1,74 @@
+package cn.piflow.bundle.common
+
+import cn.piflow._
+import cn.piflow.conf._
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.lib._
+import cn.piflow.lib.io.{FileFormat, TextFile}
+
+
+class ExecuteSQLStop extends ConfigurableStop {
+
+  val authorEmail: String = "ygang@cnic.cn"
+  val description: String = "ExecuteSQL stop: registers input bundles as tables and runs a SQL statement over them."
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.AnyPort.toString)
+
+  var sql: String = _
+  var bundle2TableNames: String = _
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
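+    // bundle2TableNames is a comma-separated list of "bundle->tableName" pairs, e.g.
+    // "->table1" registers the default input bundle as table1 before the SQL runs.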
+    val tableNames = bundle2TableNames.split(",")
+    for (i <- 0 until tableNames.length) {
+      // Both branches of the original if/else were identical, so one body suffices.
+      val bundle = tableNames(i).split("->")(0)
+      val tableName = tableNames(i).split("->")(1)
+      val bundle2: (String, String) = bundle -> tableName
+
+      val executeSQL = new ExecuteSQL(sql, bundle2)
+      executeSQL.perform(in, out, pec)
+    }
+  }
+
+  // Sample flow kept for reference: loadStream + transform... + writeStream.
+  def createCountWords() = {
+    val processCountWords = new FlowImpl();
+    processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/yg/2", FileFormat.TEXT)));
+    processCountWords.addStop("ExecuteSQL", new ExecuteSQLStop);
+
+    processCountWords.addPath(Path.from("LoadStream").to("ExecuteSQL"));
+
+    new FlowAsStop(processCountWords);
+  }
+
+  override def setProperties(map: Map[String, Any]): Unit = {
+    sql = MapUtil.get(map, "sql").asInstanceOf[String]
+    bundle2TableNames = MapUtil.get(map, "bundle2TableName").asInstanceOf[String]
+  }
+
+  override def initialize(ctx: ProcessContext): Unit = {
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor: List[PropertyDescriptor] = List()
+    val sql = new PropertyDescriptor().name("sql").displayName("sql")
+      .description("The SQL statement to execute.").defaultValue("").required(true)
+    val bundle2TableNames = new PropertyDescriptor().name("bundle2TableName").displayName("bundle2TableName")
+      .description("Comma-separated bundle->tableName mappings, e.g. \"->table1\".").defaultValue("").required(true)
+    descriptor = sql :: descriptor
+    descriptor = bundle2TableNames :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("fork.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.CommonGroup.toString)
+  }
+
+}
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/DoMapTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/DoMapTest.scala
new file mode 100644
index 0000000..2f8e165
--- /dev/null
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/DoMapTest.scala
@@ -0,0 +1,113 @@
+package cn.piflow.bundle
+
+import java.nio.charset.Charset
+
+import cn.piflow.bundle.common.{DoFlatMapStop, DoMapStop, ExecuteSQLStop}
+import cn.piflow.lib._
+import cn.piflow.lib.io.{FileFormat, TextFile}
+import cn.piflow.{FlowAsStop, FlowImpl, Path, Runner}
+import org.apache.flume.api.{RpcClient, RpcClientFactory}
+import org.apache.flume.event.EventBuilder
+import org.apache.spark.sql.SparkSession
+import org.h2.tools.Server
+import org.junit.Test
+
+class DoMapTest {
+
+  @Test
+  def testFlowA() {
+    val flow = new FlowImpl();
+    flow.addStop("CountWords", createProcessCountWords);
+
+    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
+
+    // execute flow
+    val spark = SparkSession.builder.master("local[4]")
+      .getOrCreate();
+
+    val process = Runner.create()
+      .bind(classOf[SparkSession].getName, spark)
+      .start(flow);
+
+    process.awaitTermination();
+    spark.close();
+  }
+
+  // Strips punctuation and whitespace from each line.
+  val SCRIPT_1 =
+    """
+    function (row) {
+       return $.Row(row.get(0).replaceAll("[\\x00-\\xff]|,|。|:|.|“|”|?|!| ", ""));
+    }
+    """;
+
+  // Expands each line into all of its two-character substrings.
+  val SCRIPT_2 =
+    """
+    function (row) {
+       var arr = $.Array();
+       var str = row.get(0);
+       var len = str.length;
+       for (var i = 0; i < len - 1; i++) {
+         arr.add($.Row(str.substring(i, i + 2)));
+       }
+       return arr;
+    }
+    """;
+
+  val selectSQLParameters: Map[String, String] = Map(
+    "sql" -> "select value, count(*) count from table1 group by value order by count desc",
+    "bundle2TableName" -> "->table1,->table1")
+  val fun1: Map[String, String] = Map("SCRIPT" -> SCRIPT_1)
+  val fun2: Map[String, String] = Map("SCRIPT" -> SCRIPT_2)
+
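+  // The flow below counts two-character "words": LoadStream reads a text file from
+  // HDFS, DoMap cleans each line with SCRIPT_1, DoFlatMap splits the cleaned lines
+  // into bigrams with SCRIPT_2, and ExecuteSQL groups and counts them.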
+  def createProcessCountWords() = {
+
+    val doMap = new DoMapStop
+    doMap.setProperties(fun1)
+
+    val doFlat = new DoFlatMapStop
+    doFlat.setProperties(fun2)
+
+    val executeSQLStop = new ExecuteSQLStop
+    executeSQLStop.setProperties(selectSQLParameters)
+
+    val processCountWords = new FlowImpl();
+    // SparkProcess = loadStream + transform... + writeStream
+    processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/xjzhu/honglou.txt", FileFormat.TEXT)));
+    processCountWords.addStop("DoMap", doMap);
+    processCountWords.addStop("DoFlatMap", doFlat);
+    processCountWords.addStop("ExecuteSQL", executeSQLStop);
+
+    processCountWords.addPath(Path.from("LoadStream").to("DoMap").to("DoFlatMap").to("ExecuteSQL"));
+
+    new FlowAsStop(processCountWords);
+  }
+
+  val HOST_NAME = "master"
+  val PORT = 8888
+
+  @Test
+  def flume(): Unit = {
+    val client = RpcClientFactory.getDefaultInstance(HOST_NAME, PORT)
+    // Send batches of test messages until the process is killed.
+    while (true) {
+      for (i <- 0 to 100) {
+        sendDataToFlume(client, "msg" + i)
+      }
+    }
+  }
+
+  def sendDataToFlume(client: RpcClient, msg: String) = {
+    val event = EventBuilder.withBody(msg, Charset.forName("utf-8"))
+    client.append(event)
+  }
+
+}
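+
+// Note: testFlowA expects the sample text file to exist at the hdfs:// URL above and
+// port 50001 to be free for the H2 TCP server; the flume test needs a Flume agent
+// listening on master:8888.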