This commit is contained in:
xiaoxiao 2018-08-30 15:21:34 +08:00
commit 3285bf602b
4 changed files with 95 additions and 18 deletions

View File

@ -1,2 +1,4 @@
server.ip=10.0.86.98
server.port=8001
server.port=8001
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/"

View File

@ -22,50 +22,98 @@
<version>0.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->
<dependency>
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.12</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.15</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.12</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.12</version>
<version>2.5.15</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j -->
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>2.3.12</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>2.3.12</version>
<version>2.5.15</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-stream-experimental -->
<dependency>
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-stream -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.5.15</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-core-experimental -->
<dependency>
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-experimental -->
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-core -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core_2.11</artifactId>
<version>10.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-experimental -->
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-spray-json-experimental -->
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http_2.11</artifactId>
<version>10.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-spray-json-experimental -->
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-spray-json-experimental_2.11</artifactId>
<version>2.0.4</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-spray-json -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-spray-json_2.11</artifactId>
<version>10.1.4</version>
</dependency>
</dependencies>

View File

@ -3,15 +3,17 @@ package cn.piflow.api
import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import org.apache.spark.sql.SparkSession
import cn.piflow.conf.util.OptionUtil
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import cn.piflow.Process
import cn.piflow.api.util.PropertyUtil
import jodd.util.PropertiesUtil
import scala.util.parsing.json.JSON
object API {
def startFlow(flowJson : String):Process = {
//parse flow json
val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
println(map)
@ -32,7 +34,7 @@ object API {
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
.bind("checkpoint.path", PropertyUtil.getPropertyValue("checkpoint.path"))
.start(flow);
process.awaitTermination();

View File

@ -1,24 +1,33 @@
package cn.piflow.api
import java.io.File
import java.util.concurrent.CompletionStage
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString
import cn.piflow.api.util.PropertyUtil
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.util.parsing.json.JSON
import cn.piflow.Process
import org.apache.http.util.EntityUtils
import spray.json.DefaultJsonProtocol
object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{
implicit val system = ActorSystem("HTTPService", ConfigFactory.load())
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
implicit val jsonStreamingSupport : JsonEntityStreamingSupport = EntityStreamingSupport.json
var processMap = Map[String, Process]()
def toJson(entity: RequestEntity): Map[String, Any] = {
@ -32,18 +41,34 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
}
def route(req: HttpRequest): Future[HttpResponse] = req match {
case HttpRequest(GET, Uri.Path("/"), headers, entity, protocol) => {
Future.successful(HttpResponse(entity = "Get OK!"))
}
case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{
//val flowJson = toJson(entity)
/*entity.getDataBytes().runFold(ByteString.empty, ByteString.materializer).thenCompose(r => {
val jsonString = r.utf8String
})*/
entity match {
case HttpEntity.Strict(_, data) =>{
val process = API.startFlow(data.utf8String)
processMap += (process.pid() -> process)
Future.successful(HttpResponse(entity = process.pid()))
}
case HttpEntity.Default(_,contentLength,source)=>{
//source.runFoldAsync(ByteString.empty,materializer)
//entity.dataBytes.runWith(FileIO.toPath(new File("/opt/flow.json").toPath))
//val temp = entity.dataBytes.runWith(Sink.head).map(_.utf8String)
//entity.toStrict(3000,materializer).whenComplete((strict, th) => {println(strict.getData.utf8String)})
val process = API.startFlow("")
processMap += (process.pid() -> process)
Future.successful(HttpResponse(entity = process.pid()))
}
case _ => Future.failed(new Exception("Can not start flow!"))
}