1. fix CsvParser bug

2. support python script
judy_0131 2020-03-04 16:01:11 +08:00
parent ce9daf60a3
commit 3b751243e8
6 changed files with 93 additions and 63 deletions


@@ -131,6 +131,22 @@
<version>0.4.1</version>
</dependency>
+<!--<dependency>
+<groupId>org.python</groupId>
+<artifactId>jython</artifactId>
+<version>2.7.0</version>
+</dependency>-->
+<!--<dependency>
+<groupId>org.python</groupId>
+<artifactId>jython-standalone</artifactId>
+<version>2.7.1</version>
+</dependency>-->
+<dependency>
+<groupId>black.ninia</groupId>
+<artifactId>jep</artifactId>
+<version>3.9.0</version>
+</dependency>
<dependency>
<groupId>redis.clients</groupId>
@@ -237,6 +253,11 @@
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
+<dependency>
+<groupId>org.apache.commons</groupId>
+<artifactId>commons-lang3</artifactId>
+<version>3.5</version>
+</dependency>
<dependency>
<groupId>ftpClient</groupId>
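
The new black.ninia:jep dependency embeds CPython inside the JVM and replaces the commented-out Jython dependencies above. Below is a minimal sketch of the eval/getValue round trip that the new PythonExecutor relies on, assuming jep 3.9.0 and a local CPython with the jep module installed (pip install jep):

import jep.Jep

object JepRoundTrip {
  def main(args: Array[String]): Unit = {
    val jep = new Jep()
    try {
      // Define a Python function as a single statement, call it, and read the result back.
      jep.eval("double = lambda xs: [x * 2 for x in xs]")
      jep.eval("result = double([1, 2, 3])")
      println(jep.getValue("result")) // [2, 4, 6]
    } finally {
      jep.close() // each Jep instance owns a native sub-interpreter
    }
  }
}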


@@ -5,15 +5,32 @@
"stops":[
+{
+"uuid":"1111",
+"name":"CsvParser",
+"bundle":"cn.piflow.bundle.csv.CsvParser",
+"properties":{
+"csvPath":"hdfs://10.0.88.13:9000/xjzhu/test.csv",
+"header":"false",
+"delimiter":",",
+"schema":"title,author"
+}
+},
{
"uuid":"2222",
"name":"PythonExecutor",
"bundle":"cn.piflow.bundle.python.PythonExecutor",
"properties":{
-"script":"python.py"
+"script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n return dictInfo",
+"execFunction": "listFunction"
}
}
],
"paths":[
{
+"from":"CsvParser",
+"outport":"",
+"inport":"",
+"to":"PythonExecutor"
+}
]
}
}
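
The script property now carries the Python source inline (newlines escaped) instead of a file name, and execFunction names the entry point. PythonExecutor hands that function the upstream rows serialized by df.toJSON; the sketch below uses hypothetical rows to show the payload shape:

import org.apache.spark.sql.SparkSession

object PayloadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("payloadSketch").getOrCreate()
    import spark.implicits._
    // Hypothetical rows standing in for the parsed CSV.
    val df = Seq(("title1", "author1")).toDF("title", "author")
    // Each row becomes one JSON string, e.g. {"title":"title1","author":"author1"}
    println(df.toJSON.collectAsList())
    spark.stop()
  }
}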


@@ -46,6 +46,7 @@ class CsvParser extends ConfigurableStop{
.option("header",header)
.option("inferSchema","false")
.option("delimiter",delimiter)
+.option("timestampFormat","yyyy/MM/dd HH:mm:ss ZZ")
.schema(schemaStructType)
.csv(csvPath)
}
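
The added timestampFormat option is the CsvParser fix from the commit message: when a supplied schema contains timestamp columns, Spark's CSV reader falls back to its default timestamp pattern, so values such as 2020/03/04 16:01:11 +08:00 can fail to parse. A standalone sketch of the patched reader options, with a hypothetical path and schema:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object CsvTimestampSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("csvSketch").getOrCreate()
    val schema = StructType(List(
      StructField("title", StringType, true),
      StructField("updated", TimestampType, true)))
    val df = spark.read
      .option("header", "false")
      .option("inferSchema", "false")
      .option("delimiter", ",")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") // the option this commit adds
      .schema(schema)
      .csv("/tmp/test.csv") // hypothetical path
    df.show()
    spark.stop()
  }
}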


@@ -1,11 +1,19 @@
package cn.piflow.bundle.python
+import java.util
+import java.util.UUID
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroup}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
-import org.python.core.{PyFunction, PyInteger, PyObject}
-import org.python.util.PythonInterpreter
+import cn.piflow.util.FileUtil
+import jep.Jep
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
/**
* Created by xjzhu@cnic.cn on 2/24/20
@@ -17,15 +25,19 @@ class PythonExecutor extends ConfigurableStop{
override val outportList: List[String] = List(PortEnum.DefaultPort)
var script : String = _
+var execFunction : String = _
override def setProperties(map: Map[String, Any]): Unit = {
script = MapUtil.get(map,"script").asInstanceOf[String]
+execFunction = MapUtil.get(map,"execFunction").asInstanceOf[String]
}
override def getPropertyDescriptor(): List[PropertyDescriptor] = {
var descriptor : List[PropertyDescriptor] = List()
val script = new PropertyDescriptor().name("script").displayName("script").description("The code of python").defaultValue("").required(true)
+val execFunction = new PropertyDescriptor().name("execFunction").displayName("execFunction").description("The function of python script to be executed.").defaultValue("").required(true)
descriptor = script :: descriptor
+descriptor = execFunction :: descriptor
descriptor
}
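
The two descriptors map one-to-one onto the stop's properties block in the flow JSON above. A hypothetical sketch of the call PiFlow makes when it loads that flow:

import cn.piflow.bundle.python.PythonExecutor

object StopConfigSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical sketch: the flow JSON's properties object arrives as a plain Map.
    val stop = new PythonExecutor()
    stop.setProperties(Map(
      "script" -> "def listFunction(dictInfo):\n return dictInfo",
      "execFunction" -> "listFunction"))
  }
}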
@@ -39,61 +51,40 @@ class PythonExecutor extends ConfigurableStop{
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
-val script =
-"""
-|import sys
-|import os
-|
-|import numpy as np
-|from scipy import linalg
-|import pandas as pd
-|
-|import matplotlib
-|matplotlib.use('Agg')
-|import matplotlib.pyplot as plt
-|
-|import seaborn as sns
-|
-|import timeit
-|import numpy.random as np_random
-|from numpy.linalg import inv, qr
-|from random import normalvariate
-|
-|import pylab
-|
-|if __name__ == "__main__":
-| print("Hello PiFlow")
-| try:
-| print("\n mock data")
-| nsteps = 1000
-| draws = np.random.randint(0,2,size=nsteps)
-| print("\n " + str(draws))
-| steps = np.where(draws > 0, 1, -1)
-| walk = steps.cumsum()
-| print("Draw picture")
-| plt.title('Random Walk')
-| limit = max(abs(min(walk)), abs(max(walk)))
-| plt.axis([0, nsteps, -limit, limit])
-| x = np.linspace(0,nsteps, nsteps)
-| plt.plot(x, walk, 'g-')
-| plt.savefig('/opt/python.png')
-| except Exception as e:
-| print(e)
-|
-|
-|
-""".stripMargin
/*val script =
"""
|import sys
|import os
|
|if __name__ == "__main__":
| print("Hello PiFlow")
""".stripMargin*/
-val interpreter = new PythonInterpreter()
-interpreter.exec(script)
/*val proc1 = Runtime.getRuntime().exec("python " + script)
proc1.waitFor()*/
+val spark = pec.get[SparkSession]()
+import spark.implicits._
+val df = in.read()
+val jep = new Jep()
+val scriptPath = "/tmp/pythonExcutor-"+ UUID.randomUUID() +".py"
+FileUtil.writeFile(script,scriptPath)
+jep.runScript(scriptPath)
+val listInfo = df.toJSON.collectAsList()
+jep.eval(s"result = $execFunction($listInfo)")
+val resultArrayList = jep.getValue("result",new util.ArrayList().getClass)
+println(resultArrayList)
+var resultList = List[Map[String, Any]]()
+val it = resultArrayList.iterator()
+while(it.hasNext){
+val i = it.next().asInstanceOf[java.util.HashMap[String, Any]]
+val item = mapAsScalaMap(i).toMap[String, Any]
+resultList = item +: resultList
+}
+val rows = resultList.map( m => Row(m.values.toSeq:_*))
+val header = resultList.head.keys.toList
+val schema = StructType(header.map(fieldName => new StructField(fieldName, StringType, true)))
+val rdd = spark.sparkContext.parallelize(rows)
+val resultDF = spark.createDataFrame(rdd, schema)
+out.write(resultDF)
}
}
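
Note that perform() never closes the Jep instance, so each run leaks a native sub-interpreter, and interpolating listInfo into the eval string can break when the JSON contains quotes. A hedged sketch of the same round trip with explicit cleanup and a bound variable in place of string interpolation (names are illustrative):

import java.util
import java.util.UUID
import java.nio.file.{Files, Paths}
import jep.Jep

object JepPerformSketch {
  // scriptSource and execFunction stand in for the stop's configured properties.
  def runPython(scriptSource: String, execFunction: String, jsonRows: util.List[String]): Any = {
    val jep = new Jep()
    try {
      val scriptPath = s"/tmp/pythonExecutor-${UUID.randomUUID()}.py"
      Files.write(Paths.get(scriptPath), scriptSource.getBytes("UTF-8"))
      jep.runScript(scriptPath)
      jep.set("rows", jsonRows) // bind the Java list as a Python variable
      jep.eval(s"result = $execFunction(rows)")
      jep.getValue("result")
    } finally {
      jep.close() // release the native sub-interpreter
    }
  }
}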


@@ -32,11 +32,11 @@ class PythonTest {
//execute flow
val spark = SparkSession.builder()
.master("local")
-//.master("spark://10.0.86.89:7077")
.appName("pythonTest")
-.config("spark.driver.memory", "1g")
+//. master("spark://10.0.86.89:7077")t
+.config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "2")
//.config("spark.yarn.appMasterEnv.PYSPARK_PYTHON","/usr/bin/python3")
//.config("spark.jars","/opt/project/piflow/piflow-bundle/lib/jython-standalone-2.7.1.jar")
.enableHiveSupport()
.getOrCreate()