rename piflow-api to piflow-server;add getFlowInfo Api

This commit is contained in:
judy0131 2018-09-04 09:53:19 +08:00
parent c347710084
commit 508f0639a9
6 changed files with 159 additions and 127 deletions

View File

@ -1,4 +1,9 @@
server.ip=10.0.86.98 server.ip=10.0.86.98
server.port=8001 server.port=8001
spark.version=2.1.0
spark.master=10.0.86.89
spark.port=6066
yarn.url=http://192.168.6.22:8032/ws/v1/cluster/apps/
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/" checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/"

View File

@ -9,7 +9,7 @@
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>piflow-api</artifactId> <artifactId>piflow-server</artifactId>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>piflow</groupId> <groupId>piflow</groupId>

View File

@ -6,7 +6,13 @@ import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.{FileUtil, OptionUtil} import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.Process import cn.piflow.Process
import cn.piflow.api.util.PropertyUtil import cn.piflow.api.util.PropertyUtil
import com.github.ywilkof.sparkrestclient.{JobStatusResponse, SparkRestClient}
import com.github.ywilkof.sparkrestclient.SparkRestClient.SparkRestClientBuilder
import jodd.util.PropertiesUtil import jodd.util.PropertiesUtil
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import scala.util.parsing.json.JSON import scala.util.parsing.json.JSON
@ -24,7 +30,7 @@ object API {
//execute flow //execute flow
val spark = SparkSession.builder() val spark = SparkSession.builder()
.master("spark://10.0.86.89:7077") .master("spark://10.0.86.89:7077")
.appName("piflow-hive-bundle") .appName(flowBean.name)
.config("spark.driver.memory", "1g") .config("spark.driver.memory", "1g")
.config("spark.executor.memory", "2g") .config("spark.executor.memory", "2g")
.config("spark.cores.max", "2") .config("spark.cores.max", "2")
@ -32,6 +38,7 @@ object API {
.enableHiveSupport() .enableHiveSupport()
.getOrCreate() .getOrCreate()
val process = Runner.create() val process = Runner.create()
.bind(classOf[SparkSession].getName, spark) .bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path")) .bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
@ -46,4 +53,17 @@ object API {
process.stop() process.stop()
"ok" "ok"
} }
def getFlowInfo(appID : String) : String = {
val url = PropertyUtil.getPropertyValue("yarn.url") + appID
val client = HttpClients.createDefault()
val get:HttpGet = new HttpGet(url)
val response:CloseableHttpResponse = client.execute(get)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
println("Code is " + str)
str
}
} }

View File

@ -1,6 +1,7 @@
package cn.piflow.api package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost} import cn.piflow.api.util.PropertyUtil
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.entity.StringEntity import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils import org.apache.http.util.EntityUtils

View File

@ -1,89 +1,95 @@
package cn.piflow.api package cn.piflow.api
import java.io.File import java.io.File
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.http.scaladsl.Http import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.server.Directives import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import cn.piflow.api.util.PropertyUtil import cn.piflow.api.util.PropertyUtil
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.parsing.json.JSON import scala.util.parsing.json.JSON
import cn.piflow.Process import cn.piflow.Process
import spray.json.DefaultJsonProtocol import spray.json.DefaultJsonProtocol
object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{
implicit val system = ActorSystem("HTTPService", ConfigFactory.load()) implicit val system = ActorSystem("HTTPService", ConfigFactory.load())
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher implicit val executionContext = system.dispatcher
var processMap = Map[String, Process]() var processMap = Map[String, Process]()
def toJson(entity: RequestEntity): Map[String, Any] = { def toJson(entity: RequestEntity): Map[String, Any] = {
entity match { entity match {
case HttpEntity.Strict(_, data) =>{ case HttpEntity.Strict(_, data) =>{
val temp = JSON.parseFull(data.utf8String) val temp = JSON.parseFull(data.utf8String)
temp.get.asInstanceOf[Map[String, Any]] temp.get.asInstanceOf[Map[String, Any]]
} }
case _ => Map() case _ => Map()
} }
} }
def route(req: HttpRequest): Future[HttpResponse] = req match { def route(req: HttpRequest): Future[HttpResponse] = req match {
case HttpRequest(GET, Uri.Path("/"), headers, entity, protocol) => { case HttpRequest(GET, Uri.Path("/"), headers, entity, protocol) => {
Future.successful(HttpResponse(entity = "Get OK!")) Future.successful(HttpResponse(entity = "Get OK!"))
} }
case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{ case HttpRequest(GET, Uri.Path("/flow/info"), headers, entity, protocol) => {
val appID = ""
entity match { val result = API.getFlowInfo(appID)
case HttpEntity.Strict(_, data) =>{ Future.successful(HttpResponse(entity = "Get OK!"))
val flowJson = data.utf8String }
val process = API.startFlow(flowJson)
processMap += (process.pid() -> process) case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{
Future.successful(HttpResponse(entity = process.pid()))
} entity match {
case HttpEntity.Strict(_, data) =>{
case _ => Future.failed(new Exception("Can not start flow!")) val flowJson = data.utf8String
} val process = API.startFlow(flowJson)
processMap += (process.pid() -> process)
} Future.successful(HttpResponse(entity = process.pid()))
}
case HttpRequest(POST, Uri.Path("/flow/stop"), headers, entity, protocol) =>{
val data = toJson(entity) case _ => Future.failed(new Exception("Can not start flow!"))
val processId = data.get("processId").getOrElse("").asInstanceOf[String] }
if(processId.equals("") || !processMap.contains(processId)){
Future.failed(new Exception("Can not found process Error!")) }
}else{
case HttpRequest(POST, Uri.Path("/flow/stop"), headers, entity, protocol) =>{
val result = API.stopFlow(processMap.get(processId).asInstanceOf[Process]) val data = toJson(entity)
Future.successful(HttpResponse(entity = result)) val processId = data.get("processId").getOrElse("").asInstanceOf[String]
} if(processId.equals("") || !processMap.contains(processId)){
} Future.failed(new Exception("Can not found process Error!"))
}else{
case _: HttpRequest =>
Future.successful(HttpResponse(404, entity = "Unknown resource!")) val result = API.stopFlow(processMap.get(processId).asInstanceOf[Process])
} Future.successful(HttpResponse(entity = result))
}
def run = { }
val ip = PropertyUtil.getPropertyValue("server.ip")
val port = PropertyUtil.getIntPropertyValue("server.port") case _: HttpRequest =>
Http().bindAndHandleAsync(route, ip, port) Future.successful(HttpResponse(404, entity = "Unknown resource!"))
println("Server:" + ip + ":" + port + " Started!!!") }
}
def run = {
} val ip = PropertyUtil.getPropertyValue("server.ip")
val port = PropertyUtil.getIntPropertyValue("server.port")
object Main { Http().bindAndHandleAsync(route, ip, port)
def main(argv: Array[String]):Unit = { println("Server:" + ip + ":" + port + " Started!!!")
HTTPService.run }
}
} }
object Main {
def main(argv: Array[String]):Unit = {
HTTPService.run
}
}

View File

@ -1,35 +1,35 @@
package cn.piflow.api.util package cn.piflow.api.util
import java.io.{FileInputStream, InputStream} import java.io.{FileInputStream, InputStream}
import java.util.Properties import java.util.Properties
object PropertyUtil { object PropertyUtil {
private val prop: Properties = new Properties() private val prop: Properties = new Properties()
var fis: InputStream = null var fis: InputStream = null
try{ try{
//val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath
//fis = this.getClass.getResourceAsStream("") //fis = this.getClass.getResourceAsStream("")
val userDir = System.getProperty("user.dir") val userDir = System.getProperty("user.dir")
val path = userDir + "/conf/" + "config.properties" val path = userDir + "/conf/" + "config.properties"
prop.load(new FileInputStream(path)) prop.load(new FileInputStream(path))
} catch{ } catch{
case ex: Exception => ex.printStackTrace() case ex: Exception => ex.printStackTrace()
} }
def getPropertyValue(propertyKey: String): String ={ def getPropertyValue(propertyKey: String): String ={
val obj = prop.get(propertyKey) val obj = prop.get(propertyKey)
if(obj != null){ if(obj != null){
return obj.toString return obj.toString
} }
null null
} }
def getIntPropertyValue(propertyKey: String): Int ={ def getIntPropertyValue(propertyKey: String): Int ={
val obj = prop.getProperty(propertyKey) val obj = prop.getProperty(propertyKey)
if(obj != null){ if(obj != null){
return obj.toInt return obj.toInt
} }
throw new NullPointerException throw new NullPointerException
} }
} }