diff --git a/piflow-bundle/config.properties b/piflow-bundle/config.properties
index 7dfb143..4c1da0c 100644
--- a/piflow-bundle/config.properties
+++ b/piflow-bundle/config.properties
@@ -2,12 +2,12 @@ spark.master=yarn
 spark.deploy.mode=cluster
 
 #hdfs default file system
-fs.defaultFS=hdfs://192.168.3.138:8020
+fs.defaultFS=hdfs://10.0.88.13:9000
 
 #yarn resourcemanager hostname
-yarn.resourcemanager.hostname=192.168.3.139
+yarn.resourcemanager.hostname=10.0.88.13
 #if you want to use hive, set hive metastore uris
-hive.metastore.uris=thrift://192.168.3.140:9083
+hive.metastore.uris=thrift://10.0.88.13:9083
 
 #show data in log, set 0 if you do not show the logs
 data.show=10
diff --git a/piflow-bundle/server.ip b/piflow-bundle/server.ip
new file mode 100644
index 0000000..39633ba
--- /dev/null
+++ b/piflow-bundle/server.ip
@@ -0,0 +1 @@
+server.ip=10.0.85.83
\ No newline at end of file
diff --git a/piflow-bundle/src/main/resources/scala.json b/piflow-bundle/src/main/resources/flow/script/scala.json
similarity index 72%
rename from piflow-bundle/src/main/resources/scala.json
rename to piflow-bundle/src/main/resources/flow/script/scala.json
index 38ce2c0..05b3624 100644
--- a/piflow-bundle/src/main/resources/scala.json
+++ b/piflow-bundle/src/main/resources/flow/script/scala.json
@@ -19,8 +19,7 @@
         "name":"ExecuteScala",
         "bundle":"cn.piflow.bundle.script.ExecuteScala",
         "properties":{
-          "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n return dictInfo",
-          "execFunction": "listFunction"
+          "script":"val t = 3 + 5 \n println(\" result = \" + t)"
         }
       }
     ],
diff --git a/piflow-bundle/src/main/resources/icon/script/scala.jpg b/piflow-bundle/src/main/resources/icon/script/scala.jpg
new file mode 100644
index 0000000..f7ade05
Binary files /dev/null and b/piflow-bundle/src/main/resources/icon/script/scala.jpg differ
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala
new file mode 100644
index 0000000..dcd080a
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala
@@ -0,0 +1,96 @@
+package cn.piflow.bundle.script
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.reflect.runtime.universe._
+import scala.reflect.runtime.currentMirror
+import scala.tools.reflect.ToolBox
+
+import scala.reflect.macros.whitebox.Context
+import scala.language.experimental.macros
+
+/**
+ * Stop that evaluates a user-supplied Scala snippet at runtime.
+ *
+ * The snippet is taken from the "script" property and run through a
+ * runtime-reflection ToolBox when the stop is performed.
+ */
+class ExecuteScala extends ConfigurableStop{
+  override val authorEmail: String = "xjzhu@cnic.cn"
+  override val description: String = "Execute scala script"
+  override val inportList: List[String] = List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
+
+  /** Scala source text to evaluate; assigned by setProperties. */
+  var script : String = _
+
+  /**
+   * Reads the "script" property from the supplied property map.
+   *
+   * @throws IllegalArgumentException if "script" is absent or not a String
+   */
+  override def setProperties(map: Map[String, Any]): Unit = {
+    script = MapUtil.get(map, "script") match {
+      case code: String => code
+      case other =>
+        // MapUtil.get yields None for an absent key; report that clearly
+        // instead of failing with the ClassCastException of a blind cast.
+        throw new IllegalArgumentException(
+          s"ExecuteScala requires a String 'script' property but got: $other")
+    }
+  }
+
+  /** Describes the single required "script" property. */
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    val scriptDescriptor = new PropertyDescriptor()
+      .name("script")
+      .displayName("script")
+      .description("The code of scala")
+      .defaultValue("")
+      .required(true)
+      .example("val t = 3 + 5 \n println(t)")
+    List(scriptDescriptor)
+  }
+
+  /** Returns the image loaded from icon/script/scala.jpg. */
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("icon/script/scala.jpg")
+  }
+
+  /** Places this stop in the script group. */
+  override def getGroup(): List[String] = {
+    List(StopGroup.ScriptGroup)
+  }
+
+  /** Nothing to set up before the stop runs. */
+  override def initialize(ctx: ProcessContext): Unit = {}
+
+  /**
+   * Evaluates the configured script and discards its result.
+   *
+   * Nothing from `in`, `out` or `pec` is handed to the snippet.
+   */
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    require(script != null, "ExecuteScala: the 'script' property has not been set")
+    evalCode[Unit](script)
+  }
+
+  /**
+   * Parses and evaluates `code` with a runtime-reflection ToolBox.
+   *
+   * NOTE(review): this executes arbitrary code taken from the "script" property
+   * inside the hosting JVM; flows should only come from trusted sources.
+   *
+   * @param code Scala source text to run
+   * @tparam T type the result is cast to; the cast is unchecked
+   * @return the value produced by the evaluated code
+   */
+  private def evalCode[T](code : String) : T = {
+    val toolBox = currentMirror.mkToolBox()
+    toolBox.eval(toolBox.parse(code)).asInstanceOf[T]
+  }
+}
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/util/MapUtil.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/util/MapUtil.scala
index 3640da1..6a3ba7b 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/util/MapUtil.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/util/MapUtil.scala
@@ -6,6 +6,7 @@ object MapUtil {
     map.get(key) match {
       case None => None
       case Some(x:String) => x
+      case Some(x:Integer) => x
       case Some(x:List[String]) => x
       case Some(x:List[Map[String, String]]) => x
       case Some(x:Map[String, Any]) => x
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/script/ExecuteScalaTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/script/ExecuteScalaTest.scala
new file mode 100644
index 0000000..896e52f
--- /dev/null
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/script/ExecuteScalaTest.scala
@@ -0,0 +1,67 @@
+package cn.piflow.bundle.script
+
+import java.net.InetAddress
+
+import cn.piflow.Runner
+import cn.piflow.conf.bean.FlowBean
+import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.{PropertyUtil, ServerIpUtil}
+import org.apache.spark.sql.SparkSession
+import org.h2.tools.Server
+import org.junit.Test
+
+import scala.util.parsing.json.JSON
+
+/** Runs the example flow in flow/script/scala.json on a local Spark session. */
+class ExecuteScalaTest {
+
+  @Test
+  def testFlow(): Unit ={
+
+    //parse flow json
+    val file = "src/main/resources/flow/script/scala.json"
+    val flowJsonStr = FileUtil.fileReader(file)
+    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
+    println(map)
+
+    //create flow
+    val flowBean = FlowBean(map)
+    val flow = flowBean.constructFlow()
+
+    //write this host's address to the file named by ServerIpUtil.getServerIpFile()
+    val ip = InetAddress.getLocalHost.getHostAddress
+    cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
+
+    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
+    try {
+      //execute flow
+      val spark = SparkSession.builder()
+        .master("local[12]")
+        .appName("hive")
+        .config("spark.driver.memory", "4g")
+        .config("spark.executor.memory", "8g")
+        .config("spark.cores.max", "8")
+        .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
+        .enableHiveSupport()
+        .getOrCreate()
+      try {
+        val process = Runner.create()
+          .bind(classOf[SparkSession].getName, spark)
+          .bind("checkpoint.path", "")
+          .bind("debug.path","")
+          .start(flow)
+
+        process.awaitTermination()
+        val pid = process.pid()
+        println(pid + "!!!!!!!!!!!!!!!!!!!!!")
+      } finally {
+        //release the Spark session even if the flow fails
+        spark.close()
+      }
+    } finally {
+      //shut the H2 TCP server down so port 50001 is released
+      h2Server.stop()
+    }
+  }
+
+}