add httpClient

This commit is contained in:
judy0131 2018-08-31 13:07:44 +08:00
parent 3285bf602b
commit 6db6d54e56
4 changed files with 44 additions and 27 deletions

View File

@ -21,6 +21,11 @@
<artifactId>piflow-conf</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>piflow</groupId>
<artifactId>piflow-bundle</artifactId>
<version>0.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
@ -28,11 +33,11 @@
<version>2.3.12</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->
<dependency>
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.15</version>
</dependency>
<version>2.4.7</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
<!--<dependency>
@ -44,7 +49,7 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.15</version>
<version>2.5.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j -->
@ -57,7 +62,7 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>2.5.15</version>
<version>2.5.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-stream-experimental -->
@ -70,7 +75,7 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.5.15</version>
<version>2.5.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-core-experimental -->
<!--<dependency>
@ -82,7 +87,7 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core_2.11</artifactId>
<version>10.1.4</version>
<version>10.1.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-experimental -->
@ -95,7 +100,7 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http_2.11</artifactId>
<version>10.1.4</version>
<version>10.1.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-http-spray-json-experimental -->
@ -108,7 +113,7 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-spray-json_2.11</artifactId>
<version>10.1.4</version>
<version>10.1.3</version>
</dependency>

View File

@ -0,0 +1,24 @@
package cn.piflow.api
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
object HTTPClient {
def main(args: Array[String]): Unit = {
val json = """{"flow":{"name":"test","uuid":"1234","checkpoint":"Merge","stops":[{"uuid":"1111","name":"XmlParser","bundle":"cn.piflow.bundle.xml.XmlParser","properties":{"xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml","rowTag":"phdthesis"}},{"uuid":"2222","name":"SelectField","bundle":"cn.piflow.bundle.common.SelectField","properties":{"schema":"title,author,pages"}},{"uuid":"3333","name":"PutHiveStreaming","bundle":"cn.piflow.bundle.hive.PutHiveStreaming","properties":{"database":"sparktest","table":"dblp_phdthesis"}},{"uuid":"4444","name":"CsvParser","bundle":"cn.piflow.bundle.csv.CsvParser","properties":{"csvPath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"false","delimiter":",","schema":"title,author,pages"}},{"uuid":"555","name":"Merge","bundle":"cn.piflow.bundle.common.Merge","properties":{}},{"uuid":"666","name":"Fork","bundle":"cn.piflow.bundle.common.Fork","properties":{"outports":["out1","out2","out3"]}},{"uuid":"777","name":"JsonSave","bundle":"cn.piflow.bundle.json.JsonSave","properties":{"jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"}},{"uuid":"888","name":"CsvSave","bundle":"cn.piflow.bundle.csv.CsvSave","properties":{"csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.csv","header":"true","delimiter":","}}],"paths":[{"from":"XmlParser","outport":"","inport":"","to":"SelectField"},{"from":"SelectField","outport":"","inport":"data1","to":"Merge"},{"from":"CsvParser","outport":"","inport":"data2","to":"Merge"},{"from":"Merge","outport":"","inport":"","to":"Fork"},{"from":"Fork","outport":"out1","inport":"","to":"PutHiveStreaming"},{"from":"Fork","outport":"out2","inport":"","to":"JsonSave"},{"from":"Fork","outport":"out3","inport":"","to":"CsvSave"}]}}"""
val url = "http://10.0.86.98:8001/flow/start"
val client = HttpClients.createDefault()
val post:HttpPost = new HttpPost(url)
post.addHeader("Content-Type", "application/json")
post.setEntity(new StringEntity(json))
val response:CloseableHttpResponse = client.execute(post)
val entity = response.getEntity
val str = EntityUtils.toString(entity,"UTF-8")
}
}

View File

@ -3,6 +3,7 @@ package cn.piflow.api
import java.io.File
import java.util.concurrent.CompletionStage
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport}
@ -10,8 +11,9 @@ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.server.Directives
import akka.http.impl.util.StreamUtils
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import cn.piflow.api.util.PropertyUtil
import com.typesafe.config.ConfigFactory
@ -19,7 +21,6 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.util.parsing.json.JSON
import cn.piflow.Process
import org.apache.http.util.EntityUtils
import spray.json.DefaultJsonProtocol
@ -48,27 +49,14 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
case HttpRequest(POST, Uri.Path("/flow/start"), headers, entity, protocol) =>{
/*entity.getDataBytes().runFold(ByteString.empty, ByteString.materializer).thenCompose(r => {
val jsonString = r.utf8String
})*/
entity match {
case HttpEntity.Strict(_, data) =>{
val process = API.startFlow(data.utf8String)
val flowJson = data.utf8String
val process = API.startFlow(flowJson)
processMap += (process.pid() -> process)
Future.successful(HttpResponse(entity = process.pid()))
}
case HttpEntity.Default(_,contentLength,source)=>{
//source.runFoldAsync(ByteString.empty,materializer)
//entity.dataBytes.runWith(FileIO.toPath(new File("/opt/flow.json").toPath))
//val temp = entity.dataBytes.runWith(Sink.head).map(_.utf8String)
//entity.toStrict(3000,materializer).whenComplete((strict, th) => {println(strict.getData.utf8String)})
val process = API.startFlow("")
processMap += (process.pid() -> process)
Future.successful(HttpResponse(entity = process.pid()))
}
case _ => Future.failed(new Exception("Can not start flow!"))
}