add ShellExecutor Stop

judy0131 2018-07-17 17:47:46 +08:00
parent a26569e15e
commit 2a2b6bd351
3 changed files with 145 additions and 0 deletions

src/main/resources/shellflow.json

@@ -0,0 +1,36 @@
{
  "flow":{
    "name":"shellTest",
    "uuid":"5678",
    "stops":[
      {
        "uuid":"1111",
        "name":"ShellExecutor",
        "bundle":"cn.piflow.bundle.script.ShellExecutor",
        "properties":{
          "shellPath":"/opt/data/test.sh",
          "args":{},
          "outputSchema":"id,name,gender,age"
        }
      },
      {
        "uuid":"2222",
        "name":"PutHiveStreaming",
        "bundle":"cn.piflow.bundle.hive.PutHiveStreaming",
        "properties":{
          "database":"sparktest",
          "table":"student"
        }
      }
    ],
    "paths":[
      {
        "from":"ShellExecutor",
        "outport":"",
        "inport":"",
        "to":"PutHiveStreaming"
      }
    ]
  }
}
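
Taken together, the flow runs /opt/data/test.sh, turns each line of the script's stdout into a Spark Row, and streams the resulting DataFrame into the Hive table sparktest.student. A minimal sketch of that line-to-Row step, using made-up sample output (the real records come from test.sh):

import org.apache.spark.sql.Row

object LineToRowSketch extends App {
  // Simulated shell stdout: one record per line (sample data, not from the commit).
  val stdout = "1,zhang,male,20\n2,li,female,21"
  // Each whole line becomes a single-column Row, mirroring ShellExecutor.perform below.
  val rows: List[Row] = stdout.split("\n").toList.map(Row(_))
  rows.foreach(println)
}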

src/main/scala/cn/piflow/bundle/script/ShellExecutor.scala

@@ -0,0 +1,65 @@
package cn.piflow.bundle.script

import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, ScriptGroup, StopGroup}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import sys.process._
class ShellExecutor extends ConfigurableStop {

  var shellPath: String = _
  var args: Array[String] = _
  var outputSchema: String = _

  override def setProperties(map: Map[String, Any]): Unit = {
    shellPath = MapUtil.get(map, "shellPath").asInstanceOf[String]
    //args = MapUtil.get(map,"args").asInstanceOf[Array[String]]
    outputSchema = MapUtil.get(map, "outputSchema").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = ??? // not implemented in this commit

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("./src/main/resources/ShellExecutor.jpg")
  }

  override def getGroup(): StopGroup = {
    ScriptGroup
  }

  override def initialize(ctx: ProcessContext): Unit = {
  }

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    // Run the shell script and capture its stdout as one String;
    // `.!!` throws an exception if the script exits with a non-zero code.
    val result = shellPath.!!

    // One Row per line of shell output.
    val rawData = result.split("\n").toList
    var rowList = List[Row]()
    rawData.foreach(s => rowList = Row(s) :: rowList)

    val spark = pec.get[SparkSession]()
    val rdd = spark.sparkContext.parallelize(rowList)

    //Construct StructType
    /*val field = outputSchema.split(",")
    val structFieldArray : Array[StructField] = new Array[StructField](field.size)
    for(i <- 0 to field.size - 1){
      structFieldArray(i) = new StructField(field(i), StringType, nullable = true)
    }
    val schema : StructType = StructType(structFieldArray)*/

    // For now every output line lands in a single "row" column.
    val schema = StructType(List(StructField("row", StringType, nullable = true)))
    val df = spark.createDataFrame(rdd, schema)

    out.write(df)
  }
}
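
Two notes on the implementation above: shellPath.!! comes from scala.sys.process and returns the command's stdout as a single String, throwing on a non-zero exit code; and the outputSchema property ("id,name,gender,age" in shellflow.json) is not applied yet — the commented-out block shows the intended expansion into a multi-column StructType. A minimal sketch of that expansion, assuming one nullable StringType column per comma-separated name:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SchemaSketch extends App {
  val outputSchema = "id,name,gender,age"
  // One nullable StringType column per comma-separated field name.
  val schema = StructType(
    outputSchema.split(",").map(f => StructField(f.trim, StringType, nullable = true))
  )
  println(schema.treeString)
}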

src/test/scala/cn/piflow/bundle/ShellFlowTest.scala

@@ -0,0 +1,44 @@
package cn.piflow.bundle

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.junit.Test

import scala.util.parsing.json.JSON

class ShellFlowTest {

  @Test
  def testFlow(): Unit = {

    //parse flow json
    val file = "src/main/resources/shellflow.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    //create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    //execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars", "/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .start(flow)

    process.awaitTermination()
    spark.close()
  }
}
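
Note: the test binds to a Spark standalone master at spark://10.0.86.89:7077 and ships /opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar via spark.jars; both values are environment-specific and need adjusting outside that setup.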