add ExecutePython Stop

judy0131 2020-04-07 19:52:19 +08:00
parent bcc0068439
commit b0854bbad0
5 changed files with 166 additions and 8 deletions

View File

@@ -17,10 +17,9 @@
       {
         "uuid":"2222",
-        "name":"PythonExecutor",
-        "bundle":"cn.piflow.bundle.script.ExecutePythonWithDataFrame",
+        "name":"ExecutePython",
+        "bundle":"cn.piflow.bundle.script.ExecutePython",
         "properties":{
-          "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n    return dictInfo",
-          "execFunction": "listFunction"
+          "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\nimport matplotlib.pyplot as plt\n\nimport seaborn as sns\n\nimport timeit\nimport numpy.random as np_random\nfrom numpy.linalg import inv, qr\nfrom random import normalvariate\n\nimport pylab\n\nif __name__ == \"__main__\":\n    print(\"Hello PiFlow\")\n    try:\n        print(\"\\n mock data\")\n        nsteps = 1000\n        draws = np.random.randint(0,2,size=nsteps)\n        print(\"\\n \" + str(draws))\n        steps = np.where(draws > 0, 1, -1)\n        walk = steps.cumsum()\n        print(\"Draw picture\")\n        plt.title('Random Walk')\n        limit = max(abs(min(walk)), abs(max(walk)))\n        plt.axis([0, nsteps, -limit, limit])\n        x = np.linspace(0,nsteps, nsteps)\n        plt.plot(x, walk, 'g-')\n        plt.savefig('/opt/python.png')\n    except Exception as e:\n        print(e)"
         }
       }
     ],
@@ -29,7 +28,7 @@
       "from":"CsvParser",
       "outport":"",
       "inport":"",
-      "to":"PythonExecutor"
+      "to":"ExecutePython"
     }
   ]
 }
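Decoded for readability, the new script property above is the following Python. It mocks a random-walk series and saves the plot to /opt/python.png; the indentation is reconstructed here, since the JSON escaping collapses it.

import sys
import os

import numpy as np
from scipy import linalg
import pandas as pd

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

import seaborn as sns

import timeit
import numpy.random as np_random
from numpy.linalg import inv, qr
from random import normalvariate

import pylab

if __name__ == "__main__":
    print("Hello PiFlow")
    try:
        # mock a random walk of 1000 coin-flip steps
        print("\n mock data")
        nsteps = 1000
        draws = np.random.randint(0, 2, size=nsteps)
        print("\n " + str(draws))
        steps = np.where(draws > 0, 1, -1)
        walk = steps.cumsum()
        # draw and save the picture
        print("Draw picture")
        plt.title('Random Walk')
        limit = max(abs(min(walk)), abs(max(walk)))
        plt.axis([0, nsteps, -limit, limit])
        x = np.linspace(0, nsteps, nsteps)
        plt.plot(x, walk, 'g-')
        plt.savefig('/opt/python.png')
    except Exception as e:
        print(e)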

View File

@@ -0,0 +1,36 @@
{
  "flow":{
    "name":"pythonTest",
    "uuid":"1234567890",
    "stops":[
      {
        "uuid":"1111",
        "name":"CsvParser",
        "bundle":"cn.piflow.bundle.csv.CsvParser",
        "properties":{
          "csvPath":"hdfs://10.0.88.13:9000/xjzhu/test.csv",
          "header":"false",
          "delimiter":",",
          "schema":"title,author"
        }
      },
      {
        "uuid":"2222",
        "name":"ExecutePythonWithDataFrame",
        "bundle":"cn.piflow.bundle.script.ExecutePythonWithDataFrame",
        "properties":{
          "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n    return dictInfo",
          "execFunction": "listFunction"
        }
      }
    ],
    "paths":[
      {
        "from":"CsvParser",
        "outport":"",
        "inport":"",
        "to":"ExecutePythonWithDataFrame"
      }
    ]
  }
}
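Decoded, the script property of this stop is the short function below. The execFunction property names the entry point that ExecutePythonWithDataFrame calls (here listFunction); judging by the signature, it receives the incoming data and its return value becomes the stop's output.

import sys
import os

import numpy as np
from scipy import linalg
import pandas as pd

import matplotlib
matplotlib.use('Agg')


def listFunction(dictInfo):
    # entry point named by "execFunction": returns its input unchanged
    return dictInfo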

View File

@@ -0,0 +1,61 @@
package cn.piflow.bundle.script

import java.util.UUID

import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, Port, StopGroup}
import cn.piflow.util.FileUtil
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import jep.Jep

/**
  * Created by xjzhu@cnic.cn on 2/24/20
  */
class ExecutePython extends ConfigurableStop {

  override val authorEmail: String = "xjzhu@cnic.cn"
  override val description: String = "Execute a python script"
  override val inportList: List[String] = List(Port.DefaultPort)
  override val outportList: List[String] = List(Port.DefaultPort)

  var script: String = _

  override def setProperties(map: Map[String, Any]): Unit = {
    script = MapUtil.get(map, "script").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val script = new PropertyDescriptor()
      .name("script")
      .displayName("script")
      .description("The python code to execute")
      .defaultValue("")
      .required(true)
    descriptor = script :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("icon/script/python.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroup.ScriptGroup)
  }

  override def initialize(ctx: ProcessContext): Unit = {}

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    // Write the script property to a temp file and run it with Jep's
    // embedded interpreter; close the interpreter when done.
    val jep = new Jep()
    try {
      val scriptPath = "/tmp/pythonExecutor-" + UUID.randomUUID() + ".py"
      FileUtil.writeFile(script, scriptPath)
      jep.runScript(scriptPath)
    } finally {
      jep.close()
    }
  }
}
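Because perform simply writes the script property to a temp file and runs it with Jep, any top-level Python works. A minimal sketch follows; the message and the platform module are illustrative, not part of the commit.

# Runs top to bottom when ExecutePython executes the generated
# /tmp/pythonExecutor-<uuid>.py file through Jep; unlike
# ExecutePythonWithDataFrame, no execFunction entry point is involved.
import platform

print("Hello PiFlow, running on Python " + platform.python_version())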

View File

@@ -47,12 +47,13 @@ class PythonTest {
     val process = Runner.create()
       .bind(classOf[SparkSession].getName, spark)
-      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
-      .bind("debug.path","hdfs://10.0.86.89:9000/xjzhu/piflow/debug/")
+      .bind("checkpoint.path", "")
+      .bind("debug.path","")
       .start(flow);

     process.awaitTermination();
     spark.close();
+    h2Server.stop()
   }
 }

View File

@@ -0,0 +1,61 @@
package cn.piflow.bundle.script

import java.net.InetAddress

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.util.ServerIpUtil
import org.apache.spark.sql.SparkSession
import org.h2.tools.Server
import org.junit.Test

import scala.util.parsing.json.JSON

/**
  * Created by xjzhu@cnic.cn on 2/24/20
  */
class PythonWithDataFrameTest {

  @Test
  def testPythonWithDataFrame(): Unit = {

    // parse the flow json
    val file = "src/main/resources/flow/script/pythonWithDataFrame.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    // create the flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    // execute the flow on a local Spark session
    val spark = SparkSession.builder()
      .master("local")
      //.master("spark://10.0.86.89:7077")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      //.config("spark.yarn.appMasterEnv.PYSPARK_PYTHON","/usr/bin/python3")
      //.config("spark.jars","/opt/project/piflow/piflow-bundle/lib/jython-standalone-2.7.1.jar")
      .enableHiveSupport()
      .getOrCreate()

    // publish the server ip and start an H2 TCP server for debugging
    val ip = InetAddress.getLocalHost.getHostAddress
    cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "")
      .bind("debug.path", "")
      .start(flow)

    process.awaitTermination()
    spark.close()
    h2Server.stop()
  }
}