perf(系统管理): 修改资源池列表获取剩余并发数量方法
This commit is contained in:
parent
9ac9ff57cd
commit
c03a7f181b
|
@ -17,6 +17,7 @@ import io.metersphere.system.log.dto.LogDTO;
|
||||||
import io.metersphere.system.mapper.*;
|
import io.metersphere.system.mapper.*;
|
||||||
import io.metersphere.system.uid.IDGenerator;
|
import io.metersphere.system.uid.IDGenerator;
|
||||||
import io.metersphere.system.utils.TaskRunnerClient;
|
import io.metersphere.system.utils.TaskRunnerClient;
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.lang3.BooleanUtils;
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
|
@ -28,8 +29,12 @@ import org.mybatis.spring.SqlSessionUtils;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Transactional
|
@Transactional
|
||||||
|
@ -48,6 +53,8 @@ public class TestResourcePoolService {
|
||||||
@Resource
|
@Resource
|
||||||
private ExtResourcePoolMapper extResourcePoolMapper;
|
private ExtResourcePoolMapper extResourcePoolMapper;
|
||||||
|
|
||||||
|
private final ExecutorService executor = Executors.newFixedThreadPool(5);
|
||||||
|
|
||||||
private final static String poolControllerUrl = "http://%s:%s/metric";
|
private final static String poolControllerUrl = "http://%s:%s/metric";
|
||||||
|
|
||||||
|
|
||||||
|
@ -131,6 +138,9 @@ public class TestResourcePoolService {
|
||||||
public List<TestResourcePoolDTO> listResourcePools(QueryResourcePoolRequest request) {
|
public List<TestResourcePoolDTO> listResourcePools(QueryResourcePoolRequest request) {
|
||||||
List<TestResourcePool> testResourcePools = extResourcePoolMapper.getResourcePoolList(request);
|
List<TestResourcePool> testResourcePools = extResourcePoolMapper.getResourcePoolList(request);
|
||||||
List<TestResourcePoolDTO> testResourcePoolDTOS = new ArrayList<>();
|
List<TestResourcePoolDTO> testResourcePoolDTOS = new ArrayList<>();
|
||||||
|
Map<String, List<TestResourceNodeDTO>> nodeMap = new HashMap<>();
|
||||||
|
Map<String, TestResourcePoolDTO> poolDTOMap = new HashMap<>();
|
||||||
|
Set<String> nodeSets = new HashSet<>();
|
||||||
testResourcePools.forEach(pool -> {
|
testResourcePools.forEach(pool -> {
|
||||||
TestResourcePoolBlob testResourcePoolBlob = testResourcePoolBlobMapper.selectByPrimaryKey(pool.getId());
|
TestResourcePoolBlob testResourcePoolBlob = testResourcePoolBlobMapper.selectByPrimaryKey(pool.getId());
|
||||||
byte[] configuration = testResourcePoolBlob.getConfiguration();
|
byte[] configuration = testResourcePoolBlob.getConfiguration();
|
||||||
|
@ -158,26 +168,57 @@ public class TestResourcePoolService {
|
||||||
}
|
}
|
||||||
//获取最大并发
|
//获取最大并发
|
||||||
if (StringUtils.equalsIgnoreCase(pool.getType(), ResourcePoolTypeEnum.NODE.getName())) {
|
if (StringUtils.equalsIgnoreCase(pool.getType(), ResourcePoolTypeEnum.NODE.getName())) {
|
||||||
int maxConcurrentNumber = 0;
|
nodeMap.put(pool.getId(), testResourceDTO.getNodesList());
|
||||||
int concurrentNumber = 0;
|
poolDTOMap.put(pool.getId(), testResourcePoolDTO);
|
||||||
int occupiedConcurrentNumber = 0;
|
Set<String> nodeSet = testResourceDTO.getNodesList().stream()
|
||||||
for (TestResourceNodeDTO testResourceNodeDTO : testResourceDTO.getNodesList()) {
|
.map(node -> node.getIp() + ":" + node.getPort())
|
||||||
maxConcurrentNumber = maxConcurrentNumber + testResourceNodeDTO.getConcurrentNumber();
|
.collect(Collectors.toSet());
|
||||||
//TODO: 调接口获取剩余并发
|
nodeSets.addAll(nodeSet);
|
||||||
ResourcePoolNodeMetric nodeMetric = getNodeMetric(testResourceNodeDTO.getIp(), testResourceNodeDTO.getPort());
|
|
||||||
if (nodeMetric != null) {
|
|
||||||
concurrentNumber = concurrentNumber + (nodeMetric.getConcurrentNumber() == null ? 0 :nodeMetric.getConcurrentNumber());
|
|
||||||
occupiedConcurrentNumber = occupiedConcurrentNumber +(nodeMetric.getOccupiedConcurrentNumber() == null ? 0 :nodeMetric.getOccupiedConcurrentNumber());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
testResourcePoolDTO.setLastConcurrentNumber(concurrentNumber-occupiedConcurrentNumber);
|
|
||||||
testResourcePoolDTO.setMaxConcurrentNumber(maxConcurrentNumber);
|
|
||||||
} else {
|
} else {
|
||||||
|
//处理k8s资源池
|
||||||
testResourcePoolDTO.setMaxConcurrentNumber(testResourceDTO.getConcurrentNumber());
|
testResourcePoolDTO.setMaxConcurrentNumber(testResourceDTO.getConcurrentNumber());
|
||||||
|
testResourcePoolDTOS.add(testResourcePoolDTO);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
//处理node资源池
|
||||||
|
Map<String,Integer>lastConcurrentNumberMap = new HashMap<>();
|
||||||
|
|
||||||
|
List<Future<Map<String, Integer>>> futures = new ArrayList<>();
|
||||||
|
for (String nodeSet : nodeSets) {
|
||||||
|
futures.add(executor.submit(() -> {
|
||||||
|
String[] split = nodeSet.split(":");
|
||||||
|
ResourcePoolNodeMetric nodeMetric = getNodeMetric(split[0], split[1]);
|
||||||
|
Map<String, Integer> resultMap = new HashMap<>();
|
||||||
|
if (nodeMetric!=null) {
|
||||||
|
resultMap.put(nodeSet, nodeMetric.getConcurrentNumber() - nodeMetric.getOccupiedConcurrentNumber());
|
||||||
|
}
|
||||||
|
return resultMap;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Future<Map<String, Integer>> future : futures) {
|
||||||
|
try {
|
||||||
|
lastConcurrentNumberMap.putAll(future.get());
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
// 处理异常
|
||||||
|
LogUtils.error("获取剩余并发数失败:"+ e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodeMap.forEach((poolId,nodeList)->{
|
||||||
|
int lastConcurrentNumber = 0;
|
||||||
|
int maxConcurrentNumber = 0;
|
||||||
|
for (TestResourceNodeDTO testResourceNodeDTO : nodeList) {
|
||||||
|
if (lastConcurrentNumberMap.get(testResourceNodeDTO.getIp() + ":" + testResourceNodeDTO.getPort())!=null) {
|
||||||
|
lastConcurrentNumber = lastConcurrentNumber+lastConcurrentNumberMap.get(testResourceNodeDTO.getIp() + ":" + testResourceNodeDTO.getPort());
|
||||||
|
}
|
||||||
|
maxConcurrentNumber = maxConcurrentNumber + testResourceNodeDTO.getConcurrentNumber();
|
||||||
|
}
|
||||||
|
TestResourcePoolDTO testResourcePoolDTO = poolDTOMap.get(poolId);
|
||||||
|
testResourcePoolDTO.setLastConcurrentNumber(lastConcurrentNumber);
|
||||||
|
testResourcePoolDTO.setMaxConcurrentNumber(maxConcurrentNumber);
|
||||||
testResourcePoolDTOS.add(testResourcePoolDTO);
|
testResourcePoolDTOS.add(testResourcePoolDTO);
|
||||||
});
|
});
|
||||||
|
|
||||||
return testResourcePoolDTOS;
|
return testResourcePoolDTOS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,4 +351,9 @@ public class TestResourcePoolService {
|
||||||
public ResourcePoolNodeMetric getTestResourcePoolCapacityDetail(TestResourcePoolCapacityRequest request) {
|
public ResourcePoolNodeMetric getTestResourcePoolCapacityDetail(TestResourcePoolCapacityRequest request) {
|
||||||
return getNodeMetric(request.getIp(), request.getPort());
|
return getNodeMetric(request.getIp(), request.getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
private void shutdownExecutor() {
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue