diff --git a/piflow-bundle/src/main/resources/flow/script/python.json b/piflow-bundle/src/main/resources/flow/script/python.json
index d11c52d..b6ddd6b 100644
--- a/piflow-bundle/src/main/resources/flow/script/python.json
+++ b/piflow-bundle/src/main/resources/flow/script/python.json
@@ -17,10 +17,9 @@
       {
         "uuid":"2222",
-        "name":"PythonExecutor",
-        "bundle":"cn.piflow.bundle.script.ExecutePythonWithDataFrame",
+        "name":"ExecutePython",
+        "bundle":"cn.piflow.bundle.script.ExecutePython",
         "properties":{
-          "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n    return dictInfo",
-          "execFunction": "listFunction"
+          "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\nimport matplotlib.pyplot as plt\n\nimport seaborn as sns\n\nimport timeit\nimport numpy.random as np_random\nfrom numpy.linalg import inv, qr\nfrom random import normalvariate\n\nimport pylab\n\nif __name__ == \"__main__\":\n    print(\"Hello PiFlow\")\n    try:\n        print(\"\\n mock data：\")\n        nsteps = 1000\n        draws = np.random.randint(0,2,size=nsteps)\n        print(\"\\n \" + str(draws))\n        steps = np.where(draws > 0, 1, -1)\n        walk = steps.cumsum()\n        print(\"Draw picture\")\n        plt.title('Random Walk')\n        limit = max(abs(min(walk)), abs(max(walk)))\n        plt.axis([0, nsteps, -limit, limit])\n        x = np.linspace(0,nsteps, nsteps)\n        plt.plot(x, walk, 'g-')\n        plt.savefig('/opt/python.png')\n    except Exception as e:\n        print(e)"
         }
       }
     ],
@@ -29,7 +28,7 @@
         "from":"CsvParser",
         "outport":"",
         "inport":"",
-        "to":"PythonExecutor"
+        "to":"ExecutePython"
       }
     ]
   }
diff --git a/piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json b/piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json
new file mode 100644
index 0000000..0616f87
--- /dev/null
+++ b/piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json
@@ -0,0 +1,36 @@
+{
+  "flow":{
+    "name":"pythonTest",
+    "uuid":"1234567890",
+    "stops":[
+      {
+        "uuid":"1111",
+        "name":"CsvParser",
+        "bundle":"cn.piflow.bundle.csv.CsvParser",
+        "properties":{
+          "csvPath":"hdfs://10.0.88.13:9000/xjzhu/test.csv",
+          "header":"false",
+          "delimiter":",",
+          "schema":"title,author"
+        }
+      },
+      {
+        "uuid":"2222",
+        "name":"ExecutePythonWithDataFrame",
+        "bundle":"cn.piflow.bundle.script.ExecutePythonWithDataFrame",
+        "properties":{
+          "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n    return dictInfo",
+          "execFunction": "listFunction"
+        }
+      }
+    ],
+    "paths":[
+      {
+        "from":"CsvParser",
+        "outport":"",
+        "inport":"",
+        "to":"ExecutePythonWithDataFrame"
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePython.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePython.scala
new file mode 100644
index 0000000..0a7abab
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePython.scala
@@ -0,0 +1,74 @@
+package cn.piflow.bundle.script
+
+import java.util
+import java.util.UUID
+
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
+import cn.piflow.util.FileUtil
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import jep.Jep
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Stop that runs the configured Python script through an embedded Jep interpreter.
+  *
+  * Created by xjzhu@cnic.cn on 2/24/20
+  */
+class ExecutePython extends ConfigurableStop{
+  override val authorEmail: String = "xjzhu@cnic.cn"
+  override val description: String = "Execute python script"
+  override val inportList: List[String] = List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
+
+  // Python source code to execute, taken from the "script" property.
+  var script : String = _
+
+  override def setProperties(map: Map[String, Any]): Unit = {
+    script = MapUtil.get(map,"script").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val script = new PropertyDescriptor()
+      .name("script")
+      .displayName("script")
+      .description("The code of python")
+      .defaultValue("")
+      .required(true)
+
+    descriptor = script :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("icon/script/python.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroup.ScriptGroup)
+  }
+  override def initialize(ctx: ProcessContext): Unit = {}
+
+  /**
+    * Writes the script to a uniquely named file under /tmp and runs it with Jep.
+    * The in/out streams and the job context are not used.
+    */
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+
+    val scriptPath = "/tmp/pythonExcutor-"+ UUID.randomUUID() +".py"
+    FileUtil.writeFile(script,scriptPath)
+
+    val jep = new Jep()
+    try {
+      jep.runScript(scriptPath)
+    } finally {
+      // Always release the interpreter, even when the script fails.
+      jep.close()
+    }
+  }
+}
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/script/PythonTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/script/PythonTest.scala
index ed22978..38b5771 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/script/PythonTest.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/script/PythonTest.scala
@@ -33,7 +33,7 @@ class PythonTest {
     val spark = SparkSession.builder()
       .master("local")
       //. master("spark://10.0.86.89:7077")t
-        .config("spark.driver.memory", "1g")
+      .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
       //.config("spark.yarn.appMasterEnv.PYSPARK_PYTHON","/usr/bin/python3")
@@ -47,12 +47,13 @@ class PythonTest {
 
     val process = Runner.create()
       .bind(classOf[SparkSession].getName, spark)
-      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
-      .bind("debug.path","hdfs://10.0.86.89:9000/xjzhu/piflow/debug/")
+      .bind("checkpoint.path", "")
+      .bind("debug.path","")
       .start(flow);
 
     process.awaitTermination();
     spark.close();
+    h2Server.stop()
   }
 
 }
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/script/PythonWithDataFrameTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/script/PythonWithDataFrameTest.scala
new file mode 100644
index 0000000..a7fa040
--- /dev/null
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/script/PythonWithDataFrameTest.scala
@@ -0,0 +1,61 @@
+package cn.piflow.bundle.script
+
+import java.net.InetAddress
+
+import cn.piflow.Runner
+import cn.piflow.conf.bean.FlowBean
+import cn.piflow.conf.util.{FileUtil, OptionUtil}
+import cn.piflow.util.ServerIpUtil
+import org.apache.spark.sql.SparkSession
+import org.h2.tools.Server
+import org.junit.Test
+
+import scala.util.parsing.json.JSON
+
+/**
+  * Created by xjzhu@cnic.cn on 2/24/20
+  */
+class PythonWithDataFrameTest {
+
+
+  @Test
+  def testPythonWithDataFrame() : Unit = {
+    //parse flow json
+    val file = "src/main/resources/flow/script/pythonWithDataFrame.json"
+    val flowJsonStr = FileUtil.fileReader(file)
+    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
+    println(map)
+
+    //create flow
+    val flowBean = FlowBean(map)
+    val flow = flowBean.constructFlow()
+
+    //execute flow
+    val spark = SparkSession.builder()
+      .master("local")
+      //. master("spark://10.0.86.89:7077")t
+      .config("spark.driver.memory", "1g")
+      .config("spark.executor.memory", "2g")
+      .config("spark.cores.max", "2")
+      //.config("spark.yarn.appMasterEnv.PYSPARK_PYTHON","/usr/bin/python3")
+      //.config("spark.jars","/opt/project/piflow/piflow-bundle/lib/jython-standalone-2.7.1.jar")
+      .enableHiveSupport()
+      .getOrCreate()
+
+    val ip = InetAddress.getLocalHost.getHostAddress
+    cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
+    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort","50001").start()
+
+    val process = Runner.create()
+      .bind(classOf[SparkSession].getName, spark)
+      .bind("checkpoint.path", "")
+      .bind("debug.path","")
+      .start(flow);
+
+    process.awaitTermination();
+    spark.close();
+    h2Server.stop()
+  }
+
+
+}