forked from opensci/piflow
add getResourceInfo API
This commit is contained in:
parent
ab29f7ac06
commit
b3e1f9b0d1
|
@ -97,8 +97,22 @@ object ConfigureUtil {
|
|||
piflowBundleJar
|
||||
}
|
||||
|
||||
def getYarnResourceManagerAPI() : String = {
|
||||
var yarnURL = PropertyUtil.getPropertyValue("yarn.url")
|
||||
if(yarnURL == null){
|
||||
var port = "8088"
|
||||
val yarnHostName = PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")
|
||||
if(PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port") != null){
|
||||
port = PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port")
|
||||
}
|
||||
yarnURL = "http://" + yarnHostName + ":" + port
|
||||
}
|
||||
val yarnAPI = yarnURL + "/ws/v1/cluster/"
|
||||
yarnAPI
|
||||
}
|
||||
|
||||
def getYarnResourceManagerWebAppAddress() : String = {
|
||||
var yarnResourceManagerWebAppAddress = PropertyUtil.getPropertyValue("yarn.url")
|
||||
/*var yarnResourceManagerWebAppAddress = PropertyUtil.getPropertyValue("yarn.url")
|
||||
if(yarnResourceManagerWebAppAddress == null){
|
||||
var port = "8088"
|
||||
val yarnHostName = PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")
|
||||
|
@ -106,8 +120,25 @@ object ConfigureUtil {
|
|||
port = PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port")
|
||||
}
|
||||
yarnResourceManagerWebAppAddress = "http://" + yarnHostName + ":" + port + "/ws/v1/cluster/apps/"
|
||||
}
|
||||
yarnResourceManagerWebAppAddress
|
||||
}*/
|
||||
val yarnAPI = getYarnResourceManagerAPI()
|
||||
val webAppAddress = yarnAPI + "apps" + "/"
|
||||
webAppAddress
|
||||
}
|
||||
|
||||
def getYarnResourceMatrics(): String = {
|
||||
/*var yarnResourceManagerWebAppAddress = PropertyUtil.getPropertyValue("yarn.url")
|
||||
if(yarnResourceManagerWebAppAddress == null){
|
||||
var port = "8088"
|
||||
val yarnHostName = PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname")
|
||||
if(PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port") != null){
|
||||
port = PropertyUtil.getPropertyValue("yarn.resourcemanager.webapp.address.port")
|
||||
}
|
||||
yarnResourceManagerWebAppAddress = "http://" + yarnHostName + ":" + port + "/ws/v1/cluster/matrics/"
|
||||
}*/
|
||||
val yarnAPI = getYarnResourceManagerAPI()
|
||||
val matrics = yarnAPI + "metrics"
|
||||
matrics
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
|
|
@ -214,4 +214,16 @@ object HdfsUtil {
|
|||
result
|
||||
}
|
||||
|
||||
def getCapacity() : Map[String, Any] = {
|
||||
val hdfsURL = PropertyUtil.getPropertyValue("fs.defaultFS")
|
||||
val conf = new Configuration()
|
||||
val fileSystem = FileSystem.get(new URI(hdfsURL),conf)
|
||||
val fsStatus = fileSystem.getStatus
|
||||
val capacity = fsStatus.getCapacity
|
||||
val remaining = fsStatus.getRemaining
|
||||
val used = fsStatus.getUsed
|
||||
val map = Map("capacity" -> capacity, "remaining" -> remaining, "used" -> used)
|
||||
map
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,42 @@ import scala.util.parsing.json.JSON
|
|||
|
||||
object API {
|
||||
|
||||
def getResourceInfo() : String = {
|
||||
|
||||
try{
|
||||
val matricsURL = ConfigureUtil.getYarnResourceMatrics()
|
||||
val client = HttpClients.createDefault()
|
||||
val get:HttpGet = new HttpGet(matricsURL)
|
||||
|
||||
val response:CloseableHttpResponse = client.execute(get)
|
||||
val entity = response.getEntity
|
||||
val str = EntityUtils.toString(entity,"UTF-8")
|
||||
val yarnInfo = OptionUtil.getAny(JSON.parseFull(str)).asInstanceOf[Map[String, Any]]
|
||||
val matricInfo = MapUtil.get(yarnInfo, "clusterMetrics").asInstanceOf[Map[String, Any]]
|
||||
|
||||
|
||||
val cpuInfo = Map(
|
||||
"totalVirtualCores" -> matricInfo.getOrElse("totalVirtualCores",""),
|
||||
"allocatedVirtualCores" -> matricInfo.getOrElse("allocatedVirtualCores",""),
|
||||
"reservedVirtualCores" -> matricInfo.getOrElse("reservedVirtualCores","")
|
||||
)
|
||||
val memoryInfo = Map(
|
||||
"totalMB" -> matricInfo.getOrElse("totalMB",""),
|
||||
"allocatedMB" -> matricInfo.getOrElse("allocatedMB",""),
|
||||
"reservedMB" -> matricInfo.getOrElse("reservedMB","")
|
||||
)
|
||||
val hdfsInfo = HdfsUtil.getCapacity()
|
||||
|
||||
val map = Map("cpu" -> cpuInfo, "memory" -> memoryInfo, "hdfs" -> hdfsInfo)
|
||||
val resultMap = Map("resource" -> map)
|
||||
|
||||
JsonUtil.format(JsonUtil.toJson(resultMap))
|
||||
}catch{
|
||||
case ex:Exception => ""
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def getScheduleInfo(scheduleId : String) : String = {
|
||||
|
||||
val scheduleInfo = H2Util.getScheduleInfo(scheduleId)
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
import org.apache.http.util.EntityUtils
|
||||
|
||||
object HTTPClientGetResourceInfo {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
val url = "http://10.0.85.83:8001/resource/info"
|
||||
val client = HttpClients.createDefault()
|
||||
val getFlowDebugData:HttpGet = new HttpGet(url)
|
||||
|
||||
val response:CloseableHttpResponse = client.execute(getFlowDebugData)
|
||||
val entity = response.getEntity
|
||||
val str = EntityUtils.toString(entity,"UTF-8")
|
||||
println("Code is " + str)
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -388,6 +388,17 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
}
|
||||
}
|
||||
|
||||
case HttpRequest(GET, Uri.Path("/resource/info"), headers, entity, protocol) =>{
|
||||
|
||||
val resourceInfo = API.getResourceInfo()
|
||||
if (resourceInfo != ""){
|
||||
Future.successful(HttpResponse(SUCCESS_CODE, entity = resourceInfo))
|
||||
}else{
|
||||
Future.successful(HttpResponse(FAIL_CODE, entity = "get resource info error!"))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
case _: HttpRequest =>
|
||||
Future.successful(HttpResponse(UNKNOWN_CODE, entity = "Unknown resource!"))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue