xiaoxiao 2018-08-23 13:36:17 +08:00
commit bf7b5d9ea4
6 changed files with 195 additions and 3 deletions

piflow-api/pom.xml Normal file

@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>piflow-project</artifactId>
        <groupId>piflow</groupId>
        <version>0.9</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>piflow-api</artifactId>

</project>


@@ -0,0 +1,99 @@
package cn.cnic.bigdatalab.server

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives
import cn.cnic.bigdatalab.utils.PropertyUtil
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.util.parsing.json.JSON

/**
  * Created by Flora on 2016/8/8.
  */
object HTTPService2 extends DefaultJsonProtocol with Directives with SprayJsonSupport {

  implicit val system = ActorSystem("HTTPService2", ConfigFactory.load())
  implicit val materializer = ActorMaterializer()

  // Parse a strict request body as a JSON object; anything else yields an empty map.
  def toJson(entity: RequestEntity): Map[String, Any] = {
    entity match {
      case HttpEntity.Strict(_, data) =>
        JSON.parseFull(data.utf8String) match {
          case Some(map: Map[String, Any] @unchecked) => map
          case _ => Map()
        }
      case _ => Map()
    }
  }

  def route(req: HttpRequest): Future[HttpResponse] = req match {

    case HttpRequest(GET, Uri.Path("/"), headers, entity, protocol) =>
      Future.successful(HttpResponse(entity = "Get OK!"))

    // agentId + taskId runs a real-time task; taskId alone runs an offline task.
    case HttpRequest(POST, Uri.Path("/task/v2/create"), headers, entity, protocol) =>
      val data = toJson(entity)
      if (!data.get("agentId").isEmpty && !data.get("taskId").isEmpty) {
        val result = API.runRealTimeTask(data.get("agentId").get.asInstanceOf[String], data.get("taskId").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      } else if (!data.get("taskId").isEmpty) {
        val result = API.runOfflineTask(data.get("taskId").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      } else {
        Future.successful(HttpResponse(entity = "Param Error!"))
      }

    case HttpRequest(DELETE, Uri.Path("/task/v2/delete"), headers, entity, protocol) =>
      val data = toJson(entity)
      if (data.get("name").isEmpty) {
        Future.successful(HttpResponse(entity = "Param Error!"))
      } else {
        val result = API.deleteTask(data.get("name").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      }

    case HttpRequest(PUT, Uri.Path("/task/v2/stop"), headers, entity, protocol) =>
      val data = toJson(entity)
      if (data.get("name").isEmpty) {
        Future.successful(HttpResponse(entity = "Param Error!"))
      } else {
        val result = API.stopTask(data.get("name").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      }

    case HttpRequest(PUT, Uri.Path("/task/v2/start"), headers, entity, protocol) =>
      val data = toJson(entity)
      if (data.get("name").isEmpty) {
        Future.successful(HttpResponse(entity = "Param Error!"))
      } else {
        val result = API.startTask(data.get("name").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      }

    case _: HttpRequest =>
      Future.successful(HttpResponse(404, entity = "Unknown resource!"))
  }

  def run = {
    val ip = PropertyUtil.getPropertyValue("server_ip")
    val port = PropertyUtil.getIntPropertyValue("server_port")
    Http().bindAndHandleAsync(route, ip, port)
    println("Server:" + ip + ":" + port + " Started!!!")
  }
}

object Main {
  def main(argv: Array[String]): Unit = {
    HTTPService2.run
  }
}
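For reference, a minimal client sketch for the POST /task/v2/create route above, using the same akka-http stack the service already depends on. The host, port, and taskId values are illustrative assumptions; the real address comes from the server_ip and server_port properties read in run:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer

object CreateTaskClientSketch {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("client")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the Future callback

    // Illustrative address and taskId; substitute the configured values.
    val request = HttpRequest(
      method = HttpMethods.POST,
      uri = "http://127.0.0.1:8001/task/v2/create",
      entity = HttpEntity(ContentTypes.`application/json`, """{"taskId":"task-001"}""")
    )

    Http().singleRequest(request).foreach { resp =>
      println("status: " + resp.status)
      system.terminate()
    }
  }
}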


@@ -54,7 +54,7 @@
   "name":"Fork",
   "bundle":"cn.piflow.bundle.common.Fork",
   "properties":{
-    "outports":["out1","out2"]
+    "outports":["out1","out2","out3"]
   }
 },
 {
@@ -64,6 +64,16 @@
   "properties":{
     "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
   }
 },
+{
+  "uuid":"888",
+  "name":"CsvSave",
+  "bundle":"cn.piflow.bundle.csv.CsvSave",
+  "properties":{
+    "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv",
+    "header":"true",
+    "delimiter":","
+  }
+}
 ],
 "paths":[
@@ -102,6 +112,12 @@
   "outport":"out2",
   "inport":"",
   "to":"JsonSave"
+},
+{
+  "from":"Fork",
+  "outport":"out3",
+  "inport":"",
+  "to":"CsvSave"
 }
 ]
 }


@@ -7,8 +7,6 @@ import cn.piflow.conf.util.MapUtil
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.beans.BeanProperty
class CsvParser extends ConfigurableStop{


@@ -0,0 +1,63 @@
package cn.piflow.bundle.csv

import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup}
import org.apache.spark.sql.SaveMode

class CsvSave extends ConfigurableStop {

  override val inportCount: Int = 1
  override val outportCount: Int = 0

  var csvSavePath: String = _
  var header: Boolean = _
  var delimiter: String = _

  override def setProperties(map: Map[String, Any]): Unit = {
    csvSavePath = MapUtil.get(map, "csvSavePath").asInstanceOf[String]
    header = MapUtil.get(map, "header").asInstanceOf[String].toBoolean
    delimiter = MapUtil.get(map, "delimiter").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()

    val csvSavePath = new PropertyDescriptor().name("csvSavePath").displayName("csvSavePath").defaultValue("").required(true)
    descriptor = csvSavePath :: descriptor

    val header = new PropertyDescriptor().name("header").displayName("header").defaultValue("true").required(true)
    descriptor = header :: descriptor

    val delimiter = new PropertyDescriptor().name("delimiter").displayName("delimiter").defaultValue(",").required(true)
    descriptor = delimiter :: descriptor

    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg")
  }

  override def getGroup(): StopGroup = {
    CsvGroup
  }

  override def initialize(ctx: ProcessContext): Unit = {
  }

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val df = in.read()
    df.show() // debug preview of the incoming data

    // Write the incoming DataFrame as CSV, honoring the configured header
    // flag and delimiter.
    df.write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("header", header)
      .option("delimiter", delimiter)
      .save(csvSavePath)
  }
}
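For context, perform above reduces to a plain Spark DataFrame CSV write. A standalone sketch of the same write, with assumed local paths standing in for the HDFS paths used in the example flow:

import org.apache.spark.sql.{SaveMode, SparkSession}

object CsvSaveSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CsvSaveSketch")
      .master("local[*]")
      .getOrCreate()

    // Any DataFrame works; reading JSON mirrors the JsonSave/CsvSave pairing
    // in the example flow (the input path is an assumption).
    val df = spark.read.json("/tmp/phdthesis.json")

    df.write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("header", true)
      .option("delimiter", ",")
      .save("/tmp/phdthesis.csv")

    spark.stop()
  }
}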


@@ -86,5 +86,6 @@
         <module>piflow-core</module>
         <module>piflow-conf</module>
         <module>piflow-bundle</module>
+        <module>piflow-api</module>
     </modules>
 </project>