Merge branch 'master' of https://github.com/cas-bigdatalab/piflow
commit 590cba100e

@@ -0,0 +1,2 @@
server.ip=10.0.86.98
server.port=8001

@@ -10,6 +10,63 @@
    <modelVersion>4.0.0</modelVersion>

    <artifactId>piflow-api</artifactId>
    <dependencies>
        <dependency>
            <groupId>piflow</groupId>
            <artifactId>piflow-core</artifactId>
            <version>0.9</version>
        </dependency>
        <dependency>
            <groupId>piflow</groupId>
            <artifactId>piflow-conf</artifactId>
            <version>0.9</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-slf4j_2.11</artifactId>
            <version>2.3.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-stream-experimental -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream-experimental_2.11</artifactId>
            <version>2.0.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-core-experimental -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-core-experimental_2.11</artifactId>
            <version>2.0.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-experimental -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-experimental_2.11</artifactId>
            <version>2.0.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-spray-json-experimental -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-spray-json-experimental_2.11</artifactId>
            <version>2.0.4</version>
        </dependency>

    </dependencies>

</project>

@@ -0,0 +1,47 @@
package cn.piflow.api

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.OptionUtil
import cn.piflow.Process

import scala.util.parsing.json.JSON

object API {

  def startFlow(flowJson: String): Process = {
    //parse flow json
    val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
    println(map)

    //create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    //execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars", "/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
      .start(flow);

    process.awaitTermination();
    spark.close();
    process
  }

  def stopFlow(process: Process): String = {
    process.stop()
    "ok"
  }
}

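The hunk above adds the new API object (package cn.piflow.api, so presumably API.scala in piflow-api): startFlow parses the flow JSON, builds a Flow via FlowBean, runs it with Runner against a standalone Spark master with Hive support, and returns the Process only after awaitTermination and spark.close have run. A minimal driver sketch of how it might be called directly, not part of the commit; the path conf/flow.json is a hypothetical example and the JSON schema is whatever FlowBean.constructFlow expects:

package cn.piflow.api

import scala.io.Source

object StartFlowExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical flow definition file; its contents must match what FlowBean expects.
    val flowJson = Source.fromFile("conf/flow.json").mkString
    // Blocks until the flow finishes, because startFlow calls process.awaitTermination().
    val process = API.startFlow(flowJson)
    println("flow finished, pid = " + process.pid())
  }
}
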
@@ -0,0 +1,81 @@
package cn.piflow.api

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import cn.piflow.api.util.PropertyUtil
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.util.parsing.json.JSON
import cn.piflow.Process

object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport {
  implicit val system = ActorSystem("HTTPService", ConfigFactory.load())
  implicit val materializer = ActorMaterializer()
  var processMap = Map[String, Process]()

  def toJson(entity: RequestEntity): Map[String, Any] = {
    entity match {
      case HttpEntity.Strict(_, data) => {
        val temp = JSON.parseFull(data.utf8String)
        temp.get.asInstanceOf[Map[String, Any]]
      }
      case _ => Map()
    }
  }

  def route(req: HttpRequest): Future[HttpResponse] = req match {
    case HttpRequest(GET, Uri.Path("/"), headers, entity, protocol) => {
      Future.successful(HttpResponse(entity = "Get OK!"))
    }

    case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) => {
      //val flowJson = toJson(entity)
      entity match {
        case HttpEntity.Strict(_, data) => {
          val process = API.startFlow(data.utf8String)
          processMap += (process.pid() -> process)
          Future.successful(HttpResponse(entity = process.pid()))
        }
        case _ => Future.failed(new Exception("Can not start flow!"))
      }
    }

    case HttpRequest(POST, Uri.Path("/flow/stop"), headers, entity, protocol) => {
      val data = toJson(entity)
      val processId = data.get("processId").getOrElse("").asInstanceOf[String]
      if (processId.equals("") || !processMap.contains(processId)) {
        Future.failed(new Exception("Can not found process Error!"))
      } else {
        //look up the running process by id (Map.get would return an Option, not a Process)
        val result = API.stopFlow(processMap(processId))
        Future.successful(HttpResponse(entity = result))
      }
    }

    case _: HttpRequest =>
      Future.successful(HttpResponse(404, entity = "Unknown resource!"))
  }

  def run = {
    val ip = PropertyUtil.getPropertyValue("server.ip")
    val port = PropertyUtil.getIntPropertyValue("server.port")
    Http().bindAndHandleAsync(route, ip, port)
    println("Server:" + ip + ":" + port + " Started!!!")
  }

}

object Main {
  def main(argv: Array[String]): Unit = {
    HTTPService.run
  }
}

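HTTPService (package cn.piflow.api) exposes the flow API over HTTP: POST /flow/start takes the flow JSON as the raw request body and answers with the process id, POST /flow/stop expects a JSON body with a processId field, and GET / serves as a simple liveness check. A rough client sketch using only the JDK, assuming the server runs at the address from config.properties (10.0.86.98:8001) and that conf/flow.json is a hypothetical flow definition; note that, as implemented, /flow/start responds only after the flow has finished running:

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

import scala.io.Source

object FlowClientExample {
  // POST a raw body and return the response body as a string.
  private def post(url: String, body: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    out.write(body.getBytes(StandardCharsets.UTF_8))
    out.close()
    val resp = Source.fromInputStream(conn.getInputStream).mkString
    conn.disconnect()
    resp
  }

  def main(args: Array[String]): Unit = {
    val base = "http://10.0.86.98:8001"                        // server.ip / server.port from config.properties
    val flowJson = Source.fromFile("conf/flow.json").mkString  // hypothetical flow definition
    val pid = post(base + "/flow/start", flowJson)             // response body is the process id
    println("flow finished, pid = " + pid)
    // Illustrates the expected /flow/stop payload; with the blocking start handler above,
    // the flow has already completed by this point.
    val result = post(base + "/flow/stop", s"""{"processId":"$pid"}""")
    println("stop result: " + result)
  }
}
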
@@ -1,99 +0,0 @@
package cn.cnic.bigdatalab.server

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives
import cn.cnic.bigdatalab.utils.PropertyUtil
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.util.parsing.json.JSON

/**
  * Created by Flora on 2016/8/8.
  */
object HTTPService2 extends DefaultJsonProtocol with Directives with SprayJsonSupport {
  implicit val system = ActorSystem("HTTPService2", ConfigFactory.load())
  implicit val materializer = ActorMaterializer()

  def toJson(entity: RequestEntity): Map[String, Any] = {
    entity match {
      case HttpEntity.Strict(_, data) => {
        val temp = JSON.parseFull(data.utf8String)
        temp.get.asInstanceOf[Map[String, Any]]
      }
      case _ => Map()
    }
  }

  def route(req: HttpRequest): Future[HttpResponse] = req match {
    case HttpRequest(GET, Uri.Path("/"), headers, entity, protocol) => {
      Future.successful(HttpResponse(entity = "Get OK!"))
    }

    case HttpRequest(POST, Uri.Path("/task/v2/create"), headers, entity, protocol) => {
      val data = toJson(entity)
      if (!data.get("agentId").isEmpty && !data.get("taskId").isEmpty) {
        val result = API.runRealTimeTask(data.get("agentId").get.asInstanceOf[String], data.get("taskId").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      } else if (!data.get("taskId").isEmpty) {
        val result = API.runOfflineTask(data.get("taskId").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      } else {
        Future.successful(HttpResponse(entity = "Param Error!"))
      }
    }

    case HttpRequest(DELETE, Uri.Path("/task/v2/delete"), headers, entity, protocol) => {
      val data = toJson(entity)
      if (data.get("name").isEmpty) {
        Future.successful(HttpResponse(entity = "Param Error!"))
      } else {
        val result = API.deleteTask(data.get("name").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      }
    }

    case HttpRequest(PUT, Uri.Path("/task/v2/stop"), headers, entity, protocol) => {
      val data = toJson(entity)
      if (data.get("name").isEmpty) {
        Future.successful(HttpResponse(entity = "Param Error!"))
      } else {
        val result = API.stopTask(data.get("name").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      }
    }

    case HttpRequest(PUT, Uri.Path("/task/v2/start"), headers, entity, protocol) => {
      val data = toJson(entity)
      if (data.get("name").isEmpty) {
        Future.successful(HttpResponse(entity = "Param Error!"))
      } else {
        val result = API.startTask(data.get("name").get.asInstanceOf[String])
        Future.successful(HttpResponse(entity = result))
      }
    }

    case _: HttpRequest =>
      Future.successful(HttpResponse(404, entity = "Unknown resource!"))
  }

  def run = {
    val ip = PropertyUtil.getPropertyValue("server_ip")
    val port = PropertyUtil.getIntPropertyValue("server_port")
    Http().bindAndHandleAsync(route, ip, port)
    println("Server:" + ip + ":" + port + " Started!!!")
  }

}

object Main {
  def main(argv: Array[String]): Unit = {
    HTTPService2.run
  }
}

@@ -0,0 +1,35 @@
package cn.piflow.api.util

import java.io.{FileInputStream, InputStream}
import java.util.Properties

object PropertyUtil {
  private val prop: Properties = new Properties()
  var fis: InputStream = null
  try {
    //val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath
    //fis = this.getClass.getResourceAsStream("")
    val userDir = System.getProperty("user.dir")
    val path = userDir + "/conf/" + "config.properties"
    prop.load(new FileInputStream(path))
  } catch {
    case ex: Exception => ex.printStackTrace()
  }

  def getPropertyValue(propertyKey: String): String = {
    val obj = prop.get(propertyKey)
    if (obj != null) {
      return obj.toString
    }
    null
  }

  def getIntPropertyValue(propertyKey: String): Int = {
    val obj = prop.getProperty(propertyKey)
    if (obj != null) {
      return obj.toInt
    }
    throw new NullPointerException
  }

}

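PropertyUtil loads config.properties from <working directory>/conf/config.properties, built from the user.dir system property, so the server has to be launched from a directory with that layout. A tiny, purely illustrative sketch that checks the same path the loader constructs:

import java.io.File

object ConfigPathCheck {
  def main(args: Array[String]): Unit = {
    // Mirrors the path PropertyUtil builds: <user.dir>/conf/config.properties
    val expected = new File(System.getProperty("user.dir"), "conf/config.properties")
    println("looking for " + expected.getAbsolutePath + ", exists = " + expected.exists())
  }
}
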
@@ -40,6 +40,8 @@ class FlowTest {
      .start(flow);

    process.awaitTermination();
    val pid = process.pid();
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close();
  }
