add parameters for shell script

judy0131 2018-07-18 10:13:51 +08:00
parent 2a2b6bd351
commit 576fbd039e
3 changed files with 15 additions and 18 deletions


@@ -8,8 +8,8 @@
 "name":"ShellExecutor",
 "bundle":"cn.piflow.bundle.script.ShellExecutor",
 "properties":{
-"shellPath":"/opt/data/test.sh",
-"args":{},
+"shellPath":"src/main/resources/shellflow.sh",
+"args":"666,bing,M,32 777,ming,F,33",
 "outputSchema":"id,name,gender,age"
 }
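The args property changes shape here: the old empty JSON object becomes a single space-separated string of comma-separated records. As the executor change below shows, this string is appended verbatim to the shell command, so the shell tokenizes it on whitespace into positional parameters. A minimal Scala sketch of the resulting command line, assuming the values from this config:

// Sketch only: mirrors the command construction added to perform() below.
val shellPath = "src/main/resources/shellflow.sh"
val args = "666,bing,M,32 777,ming,F,33"
val command = shellPath + " " + args
// command == "src/main/resources/shellflow.sh 666,bing,M,32 777,ming,F,33"
// so the script receives $1 = "666,bing,M,32" and $2 = "777,ming,F,33"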


@@ -0,0 +1,8 @@
+#! /bin/bash
+echo "111,Tom,M,88"
+echo "222,Marry,F,77"
+echo "333,Lucy,F,66"
+echo "444,Lily,F,66"
+echo "555,Peter,F,66"
+echo "$1"
+echo "$2"


@@ -11,16 +11,14 @@ import sys.process._

 class ShellExecutor extends ConfigurableStop{

   var shellPath: String = _
-  var args: Array[String] = _
+  var args: String = _
   var outputSchema: String = _

   override def setProperties(map: Map[String, Any]): Unit = {
     shellPath = MapUtil.get(map,"shellPath").asInstanceOf[String]
-    //args = MapUtil.get(map,"args").asInstanceOf[Array[String]]
     outputSchema = MapUtil.get(map,"outputSchema").asInstanceOf[String]
+    args = MapUtil.get(map,"args").asInstanceOf[String]
   }

   override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
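setProperties now reads args as a plain String rather than the old Array[String], matching the JSON change above. A minimal sketch of the property map this stop expects, assuming MapUtil.get behaves as a plain key lookup (values taken from the example flow config):

// Hypothetical property map for this stop.
val props: Map[String, Any] = Map(
  "shellPath"    -> "src/main/resources/shellflow.sh",
  "args"         -> "666,bing,M,32 777,ming,F,33",
  "outputSchema" -> "id,name,gender,age"
)
// new ShellExecutor().setProperties(props)  // args is cast via asInstanceOf[String]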
@@ -38,26 +36,17 @@ class ShellExecutor extends ConfigurableStop{
   }

   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
-    val result = shellPath !!
+    val command = shellPath + " " + args
+    val result = command!!
     val rawData = result.split("\n").toList
-    var rowList = List[Row]()
+    var rowList : List[Row] = List()
     rawData.foreach( s => rowList = Row(s) :: rowList )

     val spark = pec.get[SparkSession]()
     val rdd = spark.sparkContext.parallelize(rowList)

     //Construct StructType
     /*val field = outputSchema.split(",")
     val structFieldArray : Array[StructField] = new Array[StructField](field.size)
     for(i <- 0 to field.size - 1){
       structFieldArray(i) = new StructField(field(i), StringType, nullable = true)
     }
     val schema : StructType = StructType(structFieldArray)*/
     val schema = StructType(List(new StructField("row", StringType, nullable = true)))
     println("")
     val df = spark.createDataFrame(rdd, schema)
     out.write(df)
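Two things worth noting about perform(): the outputSchema property is still unused, since the multi-column StructType construction stays commented out and every stdout line lands whole in a single StringType column named "row"; and because each Row is prepended with ::, rowList ends up in reverse line order relative to the script output. A hypothetical sketch of re-enabling the commented-out block, building one StringType field per name in outputSchema:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical: one column per name in outputSchema instead of a single "row".
val outputSchema = "id,name,gender,age"
val fields = outputSchema.split(",").map(name => StructField(name, StringType, nullable = true))
val multiSchema = StructType(fields)
// Each line would then also need splitting before it becomes a Row,
// e.g. Row(s.split(","): _*) in place of Row(s).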