From 5bb7d64efd04128b84b3448ef17b5df91b9b15a3 Mon Sep 17 00:00:00 2001 From: judy0131 Date: Mon, 30 Nov 2020 03:50:55 -0500 Subject: [PATCH] fix bug: Execute Python Stop --- .../flow/script/pythonWithDataFrame.json | 17 ++++---- .../script/ExecutePythonWithDataFrame.scala | 17 ++++++-- .../piflow/bundle/script/ExecuteScala.scala | 2 +- .../scala/cn/piflow/api/HTTPService.scala | 41 ++++++++++--------- 4 files changed, 46 insertions(+), 31 deletions(-) diff --git a/piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json b/piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json index 0616f87..c44505c 100644 --- a/piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json +++ b/piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json @@ -5,13 +5,14 @@ "stops":[ { "uuid":"1111", - "name":"CsvParser", - "bundle":"cn.piflow.bundle.csv.CsvParser", + "name":"MysqlRead", + "bundle":"cn.piflow.bundle.jdbc.MysqlRead", "properties":{ - "csvPath":"hdfs://10.0.88.13:9000/xjzhu/test.csv", - "header":"false", - "delimiter":",", - "schema":"title,author" + "sql":"select id,name from student", + "url":"jdbc:mysql://10.0.88.24:3306/visualization?useUnicode=true&characterEncoding=utf-8", + "driver":"com.mysql.jdbc.Driver", + "user":"root", + "password":"123456" } }, { @@ -19,14 +20,14 @@ "name":"ExecutePythonWithDataFrame", "bundle":"cn.piflow.bundle.script.ExecutePythonWithDataFrame", "properties":{ - "script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n return dictInfo", + "script":"import sys\nimport os\nimport numpy as np\ndef listFunction(dictInfo): \n newDict = {\"name\":\"hello new user!\", \"id\":11}\n secondDict = {\"name\":\"hello second user!\", \"id\":12}\n listInfo=[newDict, secondDict]\n return dictInfo + listInfo\n", "execFunction": "listFunction" } } ], "paths":[ { - "from":"CsvParser", + "from":"MysqlRead", "outport":"", "inport":"", "to":"ExecutePythonWithDataFrame" diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePythonWithDataFrame.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePythonWithDataFrame.scala index 942925a..acf7fa8 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePythonWithDataFrame.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePythonWithDataFrame.scala @@ -75,24 +75,35 @@ class ExecutePythonWithDataFrame extends ConfigurableStop{ val listInfo = df.toJSON.collectAsList() jep.eval(s"result = $execFunction($listInfo)") val resultArrayList = jep.getValue("result",new util.ArrayList().getClass) - println(resultArrayList) + println("Execute Python Result : " + resultArrayList + "!!!!!!!!!!!!!!!!!!!!!") - var resultList = List[Map[String, Any]]() + var resultList = List[Map[String, String]]() val it = resultArrayList.iterator() while(it.hasNext){ val i = it.next().asInstanceOf[java.util.HashMap[String, Any]] val item = mapAsScalaMap(i).toMap[String, Any] - resultList = item +: resultList + var new_item = Map[String, String]() + item.foreach(m => { + val key = m._1 + val value = m._2 + new_item += (key -> String.valueOf(value)) + }) + resultList = new_item +: resultList } + println("Convert Python Result to Scala List: " + resultList + "!!!!!!!!!!!!!!!!!!!!!") val rows = resultList.map( m => Row(m.values.toSeq:_*)) + //println("rows: " + rows + "!!!!!!!!!!!!!!!!!!!!!") val header = resultList.head.keys.toList + //println("header: " + header + "!!!!!!!!!!!!!!!!!!!!!") val schema = StructType(header.map(fieldName => new StructField(fieldName, StringType, true))) + println("schema: " + schema + "!!!!!!!!!!!!!!!!!!!!!") val rdd = spark.sparkContext.parallelize(rows) val resultDF = spark.createDataFrame(rdd, schema) + resultDF.show() out.write(resultDF) } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala index 22ce89b..59e62b9 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala @@ -34,7 +34,7 @@ class ExecuteScala extends ConfigurableStop{ val pluginName = new PropertyDescriptor() .name("plugin") .displayName("Plugin") - .description("The class name of scala code. This field is generated automaticly.") + .description("The class name of scala code.") .defaultValue("") .required(true) descriptor = pluginName :: descriptor diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala index 91ce917..6e76a95 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala @@ -31,6 +31,8 @@ import org.flywaydb.core.api.FlywayException import org.h2.tools.Server import spray.json.DefaultJsonProtocol +import scala.io.Source + object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{ implicit val config = ConfigFactory.load() @@ -295,28 +297,28 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup case HttpRequest(POST, Uri.Path("/group/start"), headers, entity, protocol) =>{ - try{ - /*entity match { - case HttpEntity.Strict(_, data) =>{ - var flowGroupJson = data.utf8String - val flowGroupExecution = API.startGroup(flowGroupJson) - flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution) - val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}" - Future.successful(HttpResponse(SUCCESS_CODE, entity = result)) - } - case otherType => { - println(otherType) + /*try{ + + val bodyFeature = Unmarshal(entity.withoutSizeLimit()) + val flowGroupJson = ""//Await.result(bodyFeature,scala.concurrent.duration.Duration(5,"second")) + + val flowGroupExecution = API.startGroup(flowGroupJson) + flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution) + val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}" + Future.successful(HttpResponse(SUCCESS_CODE, entity = result)) + }catch { + case ex => { + println(ex) + Future.successful(HttpResponse(FAIL_CODE, entity = "Can not start group!")) + } + }*/ + try{ - val bodyFeature = Unmarshal(entity).to [String] - val flowGroupJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second")) - val flowGroupExecution = API.startGroup(flowGroupJson) - flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution) - val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}" - Future.successful(HttpResponse(SUCCESS_CODE, entity = result)) - } - }*/ val bodyFeature = Unmarshal(entity).to [String] val flowGroupJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second")) + //use file to run large group + //val flowGroupJsonPath = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second")) + //val flowGroupJson = Source.fromFile(flowGroupJsonPath).getLines().toArray.mkString("\n")*/ val flowGroupExecution = API.startGroup(flowGroupJson) flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution) val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}" @@ -328,6 +330,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup } } + }