doMap flatMap executeSql

This commit is contained in:
yanggang 2018-11-19 15:15:06 +08:00
parent 7f53e25275
commit a07cca1f8b
4 changed files with 356 additions and 0 deletions

cn/piflow/bundle/common/DoFlatMapStop.scala

@@ -0,0 +1,56 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.lib._
import cn.piflow.util.ScriptEngine
class DoFlatMapStop extends ConfigurableStop {

  val authorEmail: String = "ygang@cnic.cn"
  val description: String = "Transform each row into multiple rows with a custom JavaScript function."
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.AnyPort.toString)

  var SCRIPT: String = _

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    in.read().show()  // print the incoming dataset for debugging

    // delegate to the library DoFlatMap stop, compiling the script once per job
    val doFlatMap = new DoFlatMap(ScriptEngine.logic(SCRIPT))
    doFlatMap.perform(in, out, pec)
  }

  override def setProperties(map: Map[String, Any]): Unit = {
    // read the same key that getPropertyDescriptor() declares
    SCRIPT = MapUtil.get(map, "SCRIPT").asInstanceOf[String]
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val SCRIPT = new PropertyDescriptor().name("SCRIPT").displayName("SCRIPT")
      .description("JavaScript function applied to each row; returns an array of rows").defaultValue("").required(true)
    descriptor = SCRIPT :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("fork.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.CommonGroup.toString)
  }
}
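
A minimal configuration sketch (not part of this commit; names are hypothetical): the SCRIPT property holds a JavaScript function that ScriptEngine.logic compiles. For a flat map it receives one row and returns an array of rows built with the $.Row and $.Array helpers, as SCRIPT_2 in the test below demonstrates.

import cn.piflow.bundle.common.DoFlatMapStop

val flatMapStop = new DoFlatMapStop
flatMapStop.setProperties(Map(
  "SCRIPT" ->
    """
    function (row) {
      var arr = $.Array();          // output row collection
      arr.add($.Row(row.get(0)));   // keep the original value
      arr.add($.Row(row.get(0)));   // and emit it a second time
      return arr;
    }
    """))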

cn/piflow/bundle/common/DoMapStop.scala

@@ -0,0 +1,73 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.lib._
import cn.piflow.lib.io.{FileFormat, TextFile}
import cn.piflow.util.ScriptEngine
import org.apache.spark.sql.types.StructType
class DoMapStop extends ConfigurableStop {

  val authorEmail: String = "ygang@cnic.cn"
  val description: String = "Transform each row into a new row with a custom JavaScript function."
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.AnyPort.toString)

  var targetSchema: StructType = null  // reserved for a user-defined output schema, currently unused
  var SCRIPT: String = _

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    in.read().show()  // print the incoming dataset for debugging

    val doMap = new DoMap(ScriptEngine.logic(SCRIPT))
    doMap.perform(in, out, pec)
  }

  // sample wiring: load a text file from HDFS and pipe it through this stop
  def createCountWords() = {
    val processCountWords = new FlowImpl()
    processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/yg/2", FileFormat.TEXT)))
    processCountWords.addStop("DoMap", new DoMapStop)
    processCountWords.addPath(Path.from("LoadStream").to("DoMap"))
    new FlowAsStop(processCountWords)
  }

  override def setProperties(map: Map[String, Any]): Unit = {
    // read the same key that getPropertyDescriptor() declares
    SCRIPT = MapUtil.get(map, "SCRIPT").asInstanceOf[String]
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val SCRIPT = new PropertyDescriptor().name("SCRIPT").displayName("SCRIPT")
      .description("JavaScript function applied to each row; returns one row").defaultValue("").required(true)
    descriptor = SCRIPT :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("fork.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.CommonGroup.toString)
  }
}
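
The same pattern works for DoMapStop, where the script maps one row to one row (a sketch under the same assumptions; the trim-based script is hypothetical):

import cn.piflow.bundle.common.DoMapStop

val mapStop = new DoMapStop
mapStop.setProperties(Map(
  "SCRIPT" ->
    """
    function (row) {
      return $.Row(row.get(0).trim());   // one output row per input row
    }
    """))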

cn/piflow/bundle/common/ExecuteSQLStop.scala

@@ -0,0 +1,99 @@
package cn.piflow.bundle.common
import cn.piflow._
import cn.piflow.conf._
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.lib._
import cn.piflow.lib.io.{FileFormat, TextFile}
class ExecuteSQLStop extends ConfigurableStop {

  val authorEmail: String = "ygang@cnic.cn"
  val description: String = "Execute a SQL statement over incoming datasets registered as tables."
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.AnyPort.toString)

  var sql: String = _
  var bundle2TableNames: String = _

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    // property format: "inport1->table1,inport2->table2,..."; an empty inport means the default port
    val tableNames = bundle2TableNames.split(",")
    for (i <- 0 until tableNames.length) {
      val inport = tableNames(i).split("->")(0)
      val tableName = tableNames(i).split("->")(1)
      val bundle2: (String, String) = inport -> tableName
      // ExecuteSQL registers the dataset under tableName and runs the statement;
      // with several mappings the statement executes once per mapping
      val executeSQL = new ExecuteSQL(sql, bundle2)
      executeSQL.perform(in, out, pec)
    }
  }

  // sample wiring: load a text file from HDFS and pipe it through this stop
  def createCountWords() = {
    val processCountWords = new FlowImpl()
    processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/yg/2", FileFormat.TEXT)))
    processCountWords.addStop("ExecuteSQL", new ExecuteSQLStop)
    processCountWords.addPath(Path.from("LoadStream").to("ExecuteSQL"))
    new FlowAsStop(processCountWords)
  }

  override def setProperties(map: Map[String, Any]): Unit = {
    sql = MapUtil.get(map, "sql").asInstanceOf[String]
    bundle2TableNames = MapUtil.get(map, "bundle2TableName").asInstanceOf[String]
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val sql = new PropertyDescriptor().name("sql").displayName("sql")
      .description("SQL statement to execute").defaultValue("").required(true)
    val bundle2TableNames = new PropertyDescriptor().name("bundle2TableName").displayName("bundle2TableName")
      .description("comma-separated inport->tableName mappings, e.g. \"->table1\"").defaultValue("").required(true)
    descriptor = sql :: descriptor
    descriptor = bundle2TableNames :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("fork.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.CommonGroup.toString)
  }
}
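
A configuration sketch for this stop (hypothetical values; the keys match setProperties above, and an empty inport before "->" selects the default port, as in the test below):

import cn.piflow.bundle.common.ExecuteSQLStop

val sqlStop = new ExecuteSQLStop
sqlStop.setProperties(Map(
  "sql" -> "select value, count(*) count from table1 group by value",
  "bundle2TableName" -> "->table1"))   // default inport registered as table1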

cn/piflow/bundle/DoMapTest.scala

@@ -0,0 +1,128 @@
package cn.piflow.bundle
import java.nio.charset.Charset

import cn.piflow.bundle.common.{DoFlatMapStop, DoMapStop, ExecuteSQLStop}
import cn.piflow.lib._
import cn.piflow.lib.io.{FileFormat, TextFile}
import cn.piflow.{FlowAsStop, FlowImpl, Path, Runner}
import org.apache.flume.api.{RpcClient, RpcClientFactory}
import org.apache.flume.event.EventBuilder
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test
class DoMapTest {

  @Test
  def testFlowA() {
    val flow = new FlowImpl()
    flow.addStop("CountWords", createProcessCountWords())

    // start the H2 TCP server used to store flow state (port 50001)
    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()

    // execute flow
    val spark = SparkSession.builder.master("local[4]")
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .start(flow)

    process.awaitTermination()
    spark.close()
  }
  // strips ASCII characters and Chinese punctuation, keeping only CJK text
  val SCRIPT_1 =
    """
       function (row) {
          return $.Row(row.get(0).replaceAll("[\\x00-\\xff]|，|。|：|、|“|”|？|！| ", ""));
       }
    """
  // splits each line into overlapping two-character pairs, one output row per pair
  val SCRIPT_2 =
    """
       function (row) {
          var arr = $.Array();
          var str = row.get(0);
          var len = str.length;
          for (var i = 0; i < len - 1; i++) {
             arr.add($.Row(str.substring(i, i + 2)));
          }
          return arr;
       }
    """
  val selectSQLParameters: Map[String, String] = Map(
    "sql" -> "select value, count(*) count from table1 group by value order by count desc",
    "bundle2TableName" -> "->table1")  // default inport registered as table1

  val fun1: Map[String, String] = Map("SCRIPT" -> SCRIPT_1)
  val fun2: Map[String, String] = Map("SCRIPT" -> SCRIPT_2)
  // flow: load honglou.txt, clean each line (DoMap), split into character pairs (DoFlatMap), count pairs (ExecuteSQL)
  def createProcessCountWords() = {
    val doMap = new DoMapStop
    doMap.setProperties(fun1)

    val doFlat = new DoFlatMapStop
    doFlat.setProperties(fun2)

    val executeSQLStop = new ExecuteSQLStop
    executeSQLStop.setProperties(selectSQLParameters)

    val processCountWords = new FlowImpl()
    processCountWords.addStop("LoadStream", new LoadStream(TextFile("hdfs://10.0.86.89:9000/xjzhu/honglou.txt", FileFormat.TEXT)))
    processCountWords.addStop("DoMap", doMap)
    processCountWords.addStop("DoFlatMap", doFlat)
    processCountWords.addStop("ExecuteSQL", executeSQLStop)
    processCountWords.addPath(Path.from("LoadStream").to("DoMap").to("DoFlatMap").to("ExecuteSQL"))

    new FlowAsStop(processCountWords)
  }
  val HOST_NAME = "master"
  val PORT = 8888

  @Test
  def flume(): Unit = {
    val client = RpcClientFactory.getDefaultInstance(HOST_NAME, PORT)
    // send events in an endless loop; stop the test manually
    while (true) {
      for (i <- 0 to 100) {
        sendDataToFlume(client, "msg" + i)
      }
    }
  }

  def sendDataToFlume(client: RpcClient, msg: String) = {
    val event = EventBuilder.withBody(msg, Charset.forName("utf-8"))
    client.append(event)
  }
}