add ExecuteScala stop

judy0131 2020-04-07 17:53:49 +08:00
parent ca5da28573
commit 3c9c621433
7 changed files with 131 additions and 5 deletions

config.properties

@@ -2,12 +2,12 @@ spark.master=yarn
 spark.deploy.mode=cluster
 #hdfs default file system
-fs.defaultFS=hdfs://192.168.3.138:8020
+fs.defaultFS=hdfs://10.0.88.13:9000
 #yarn resourcemanager hostname
-yarn.resourcemanager.hostname=192.168.3.139
+yarn.resourcemanager.hostname=10.0.88.13
 #if you want to use hive, set hive metastore uris
-hive.metastore.uris=thrift://192.168.3.140:9083
+hive.metastore.uris=thrift://10.0.88.13:9083
 #show data in log, set 0 if you do not show the logs
 data.show=10

piflow-bundle/server.ip (new file)

@@ -0,0 +1 @@
+server.ip=10.0.85.83

piflow-bundle/src/main/resources/flow/script/scala.json

@@ -19,8 +19,7 @@
       "name":"ExecuteScala",
       "bundle":"cn.piflow.bundle.script.ExecuteScala",
       "properties":{
-        "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n return dictInfo",
-        "execFunction": "listFunction"
+        "script":"val t = 3 + 5 \n println(\" result = \" + t)"
       }
     }
   ],

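For reference, the escaped `script` value above unescapes to the two-line Scala snippet below. Running it should print ` result = 8` (a sketch of the expected behavior, not output recorded in this commit):

val t = 3 + 5
println(" result = " + t)
// expected console output:  result = 8
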
icon/script/scala.jpg (new binary image, 34 KiB) — binary file not shown

piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala (new file)

@@ -0,0 +1,67 @@
package cn.piflow.bundle.script

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.reflect.runtime.universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
import scala.reflect.macros.whitebox.Context
import scala.language.experimental.macros

class ExecuteScala extends ConfigurableStop {

  override val authorEmail: String = "xjzhu@cnic.cn"
  override val description: String = "Execute scala script"
  override val inportList: List[String] = List(Port.DefaultPort)
  override val outportList: List[String] = List(Port.DefaultPort)

  // Raw Scala source taken from the stop's "script" property.
  var script: String = _

  override def setProperties(map: Map[String, Any]): Unit = {
    script = MapUtil.get(map, "script").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val script = new PropertyDescriptor()
      .name("script")
      .displayName("script")
      .description("The code of scala")
      .defaultValue("")
      .required(true)
      .example("val t = 3 + 5 \n println(t)")
    //val execFunction = new PropertyDescriptor().name("execFunction").displayName("execFunction").description("The function of python script to be executed.").defaultValue("").required(true)
    descriptor = script :: descriptor
    //descriptor = execFunction :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("icon/script/scala.jpg")
  }

  override def getGroup(): List[String] = {
    List(StopGroup.ScriptGroup)
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    import spark.implicits._

    // Compile and run the user-supplied script in-process; its result is discarded.
    evalCode[Unit](script)
  }

  // Parse a Scala source string into an AST and evaluate it with a runtime-reflection ToolBox.
  private def evalCode[T](code: String): T = {
    val toolBox = currentMirror.mkToolBox()
    toolBox.eval(toolBox.parse(code)).asInstanceOf[T]
  }
}
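
The core mechanism here is runtime compilation: `evalCode` asks the runtime mirror for a `ToolBox`, parses the script string into an AST, then compiles and executes it in-process. A minimal, self-contained sketch of that pattern (the object name is illustrative and not part of this commit; it assumes scala-compiler and scala-reflect are on the classpath, which the ToolBox requires):

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

object ToolBoxEvalSketch {
  def main(args: Array[String]): Unit = {
    // mkToolBox() is added to the runtime mirror by the scala.tools.reflect.ToolBox import
    val toolBox = currentMirror.mkToolBox()
    // parse() builds a Tree from source; eval() compiles and runs it, returning Any
    val tree = toolBox.parse("val t = 3 + 5; t * 2")
    val result = toolBox.eval(tree).asInstanceOf[Int]
    println(result) // prints 16
  }
}

Because `eval` returns `Any`, the `asInstanceOf[T]` cast in `evalCode` is unchecked; calling it as `evalCode[Unit]` in `perform` simply discards whatever the script evaluates to.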

cn/piflow/conf/util/MapUtil.scala

@@ -6,6 +6,7 @@ object MapUtil {
     map.get(key) match {
       case None => None
       case Some(x:String) => x
+      case Some(x:Integer) => x
       case Some(x:List[String]) => x
       case Some(x:List[Map[String, String]]) => x
       case Some(x:Map[String, Any]) => x
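
The added `Integer` case matters because Scala boxes an `Int` stored in a `Map[String, Any]` to `java.lang.Integer`, which none of the existing `String`/`List`/`Map` cases match, so such values previously fell through to whatever default the match ends with (not shown in this hunk). A small sketch of the effect, assuming (as the surrounding lines suggest) a signature like `def get(map: Map[String, Any], key: String): Any`; the map contents are illustrative:

val props: Map[String, Any] = Map("script" -> "val t = 1", "port" -> 50001)
// 50001 is boxed to java.lang.Integer at runtime, so the new case returns it as-is
val port = MapUtil.get(props, "port").asInstanceOf[Integer]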

piflow-bundle/src/test/scala/cn/piflow/bundle/script/ExecuteScalaTest.scala (new file)

@@ -0,0 +1,58 @@
package cn.piflow.bundle.script

import java.net.InetAddress

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.{PropertyUtil, ServerIpUtil}
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test

import scala.util.parsing.json.JSON

class ExecuteScalaTest {

  @Test
  def testFlow(): Unit = {

    //parse flow json
    val file = "src/main/resources/flow/script/scala.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    //create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    //record the local IP and start an H2 TCP server so flow state can be inspected
    val ip = InetAddress.getLocalHost.getHostAddress
    cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()

    //execute flow
    val spark = SparkSession.builder()
      .master("local[12]")
      .appName("hive")
      .config("spark.driver.memory", "4g")
      .config("spark.executor.memory", "8g")
      .config("spark.cores.max", "8")
      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "")
      .bind("debug.path", "")
      .start(flow)

    process.awaitTermination()
    val pid = process.pid()
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close()
  }
}