refactor: 优化资源节点轮询算法

Signed-off-by: fit2-zhao <yong.zhao@fit2cloud.com>
This commit is contained in:
fit2-zhao 2024-09-02 18:26:01 +08:00 committed by Craftsman
parent 8ba6d426de
commit 4034f48213
1 changed files with 63 additions and 53 deletions

View File

@ -1,6 +1,5 @@
package io.metersphere.api.jmeter.utils; package io.metersphere.api.jmeter.utils;
import io.metersphere.base.domain.TestResource; import io.metersphere.base.domain.TestResource;
import io.metersphere.commons.utils.GenerateHashTreeUtil; import io.metersphere.commons.utils.GenerateHashTreeUtil;
import io.metersphere.commons.utils.JSON; import io.metersphere.commons.utils.JSON;
@ -12,102 +11,115 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
/** /**
* 平滑加权轮询算法 * 平滑加权轮询算法
*/ */
public class SmoothWeighted { public class SmoothWeighted {
private static final String BASE_URL = "http://%s:%d"; public static final String BASE_URL = "http://%s:%d";
public static final String CONFIG = "CONFIG_"; public static final String CONFIG = "CONFIG_";
public static final String EXEC_INDEX = "EXEC_INDEX_"; public static final String EXEC_INDEX = "EXEC_INDEX_";
public static void setServerConfig(String poolId, RedisTemplate client) { public static void setServerConfig(String poolId, RedisTemplate<String, Object> client) {
if (StringUtils.isEmpty(poolId)) { if (StringUtils.isEmpty(poolId)) {
return; return;
} }
List<TestResource> resources = new ArrayList<>(); List<TestResource> resources = new ArrayList<>();
BooleanPool pool = GenerateHashTreeUtil.isResourcePool(poolId); BooleanPool pool = GenerateHashTreeUtil.isResourcePool(poolId);
if (pool.isPool()) { if (pool.isPool()) {
resources = GenerateHashTreeUtil.setPoolResource(poolId); resources = GenerateHashTreeUtil.setPoolResource(poolId);
} }
if (CollectionUtils.isEmpty(resources)) { if (CollectionUtils.isEmpty(resources)) {
client.delete(CONFIG + poolId); client.delete(CONFIG + poolId);
client.delete(EXEC_INDEX + poolId); client.delete(EXEC_INDEX + poolId);
return; return;
} }
Map<String, ServerConfig> configs = new HashMap<>(); Map<String, ServerConfig> configs = resources.stream()
for (TestResource testResource : resources) { .map(testResource -> {
String configuration = testResource.getConfiguration(); String configuration = testResource.getConfiguration();
if (StringUtils.isNotEmpty(configuration)) { if (StringUtils.isNotEmpty(configuration)) {
NodeDTO node = JSON.parseObject(configuration, NodeDTO.class); NodeDTO node = JSON.parseObject(configuration, NodeDTO.class);
String uri = String.format(BASE_URL + "/jmeter/api/start", node.getIp(), node.getPort()); String uri = String.format(BASE_URL + "/jmeter/api/start", node.getIp(), node.getPort());
configs.put(uri, new ServerConfig(uri, 1, 1, node.getMaxConcurrency(), node.isEnable())); return new ServerConfig(uri, 1, 1, node.getMaxConcurrency(), node.isEnable());
} }
} return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toMap(ServerConfig::getUrl, serverConfig -> serverConfig));
if (client.opsForValue().get(CONFIG + poolId) == null) { List<ServerConfig> existingConfigs = (List<ServerConfig>) client.opsForValue().get(CONFIG + poolId);
if (existingConfigs == null) {
client.opsForValue().set(CONFIG + poolId, new ArrayList<>(configs.values())); client.opsForValue().set(CONFIG + poolId, new ArrayList<>(configs.values()));
} else { } else {
// 合并相同节点信息更改权重 Map<String, ServerConfig> existingConfigMap = existingConfigs.stream()
List<ServerConfig> serverConfigs = (List<ServerConfig>) client.opsForValue().get(CONFIG + poolId); .collect(Collectors.toMap(ServerConfig::getUrl, serverConfig -> serverConfig));
serverConfigs.forEach(item -> {
if (configs.containsKey(item.getUrl())) {
item.setCorePoolSize(configs.get(item.getUrl()).getCorePoolSize());
item.setEnable(configs.get(item.getUrl()).isEnable());
item.setWeight(configs.get(item.getUrl()).getWeight());
}
});
// 添加新增节点
configs.forEach((k, v) -> {
if (!serverConfigs.contains(v)) {
serverConfigs.add(v);
}
});
// 异常已经废弃的节点
serverConfigs.removeIf(item -> !configs.containsKey(item.getUrl()));
client.opsForValue().set(CONFIG + poolId, serverConfigs); // 合并相同节点信息更改权重
existingConfigMap.forEach((url, existingConfig) -> {
if (configs.containsKey(url)) {
ServerConfig newConfig = configs.get(url);
existingConfig.setCorePoolSize(newConfig.getCorePoolSize());
existingConfig.setEnable(newConfig.isEnable());
existingConfig.setWeight(newConfig.getWeight());
}
});
// 添加新增节点
existingConfigs.addAll(configs.values().stream()
.filter(config -> !existingConfigMap.containsKey(config.getUrl()))
.toList());
// 移除废弃的节点
existingConfigs.removeIf(config -> !configs.containsKey(config.getUrl()));
client.opsForValue().set(CONFIG + poolId, existingConfigs);
} }
if (client.opsForValue().get(EXEC_INDEX + poolId) == null) { if (client.opsForValue().get(EXEC_INDEX + poolId) == null) {
client.opsForValue().set(EXEC_INDEX + poolId, 1); client.opsForValue().set(EXEC_INDEX + poolId, 1);
} }
} }
public static ServerConfig calculate(String poolId, RedisTemplate client) { public static ServerConfig calculate(String poolId, RedisTemplate<String, Object> client) {
if (client.opsForValue().get(EXEC_INDEX + poolId) == null) { if (client.opsForValue().get(EXEC_INDEX + poolId) == null) {
client.opsForValue().set(EXEC_INDEX + poolId, 1); client.opsForValue().set(EXEC_INDEX + poolId, 1);
} }
List<ServerConfig> serverList = new ArrayList<>();
if (client.opsForValue().get(CONFIG + poolId) != null) {
serverList = (List<ServerConfig>) client.opsForValue().get(CONFIG + poolId);
}
// 最大权重
int weightSum = serverList.stream().map(ServerConfig::getWeight).reduce((x, y) -> x += y).get();
long execIndex = Long.parseLong(client.opsForValue().get(EXEC_INDEX + poolId).toString()); List<ServerConfig> serverList = Optional.ofNullable((List<ServerConfig>) client.opsForValue().get(CONFIG + poolId))
// 选出当前有效权重最大的实例将当前有效权重currentWeight减去所有实例的"权重和"weightSum且变量tmpSv指向此位置 .orElse(new ArrayList<>());
ServerConfig max = Collections.max(serverList, Comparator.comparingLong(ServerConfig::getCurrentWeight));
// 最大权重
int weightSum = serverList.stream()
.mapToInt(ServerConfig::getWeight)
.sum();
long execIndex = Long.parseLong(Objects.requireNonNull(client.opsForValue().get(EXEC_INDEX + poolId)).toString());
// 选出当前有效权重最大的实例
ServerConfig max = serverList.stream()
.max(Comparator.comparingLong(ServerConfig::getCurrentWeight))
.orElse(null);
// 选中的实例 // 选中的实例
ServerConfig tmpConfig = null; ServerConfig tmpConfig = null;
for (ServerConfig serverConfig : serverList) { for (ServerConfig serverConfig : serverList) {
if (max.equals(serverConfig)) { if (serverConfig.equals(max)) {
serverConfig.setCurrentWeight(serverConfig.getCurrentWeight() - weightSum); serverConfig.setCurrentWeight(serverConfig.getCurrentWeight() - weightSum);
if (tmpConfig == null || serverConfig.getCurrentWeight() > tmpConfig.getCurrentWeight()) { if (tmpConfig == null || serverConfig.getCurrentWeight() > tmpConfig.getCurrentWeight()) {
tmpConfig = serverConfig; tmpConfig = serverConfig;
} }
} }
//将每个实例的当前有效权重currentWeight都加上配置权重weight
serverConfig.setCurrentWeight(serverConfig.getCurrentWeight() + serverConfig.getWeight()); serverConfig.setCurrentWeight(serverConfig.getCurrentWeight() + serverConfig.getWeight());
} }
// 选中前的当前权重 LoggerUtil.info(String.format("第%d次选中前的当前权重:%s", execIndex, JSON.toJSONString(serverList)));
LoggerUtil.info("" + (execIndex) + "次选中前的当前权重:" + JSON.toJSONString(serverList));
if (client.opsForValue().get(CONFIG + poolId) != null) { client.opsForValue().set(CONFIG + poolId, serverList);
client.opsForValue().set(CONFIG + poolId, serverList);
}
return tmpConfig; return tmpConfig;
} }
@ -116,15 +128,13 @@ public class SmoothWeighted {
if (pool.isPool()) { if (pool.isPool()) {
List<TestResource> resources = GenerateHashTreeUtil.setPoolResource(poolId); List<TestResource> resources = GenerateHashTreeUtil.setPoolResource(poolId);
if (CollectionUtils.isNotEmpty(resources)) { if (CollectionUtils.isNotEmpty(resources)) {
int index = (int) (Math.random() * resources.size()); int index = new Random().nextInt(resources.size());
TestResource testResource = resources.get(index); TestResource testResource = resources.get(index);
NodeDTO node = JSON.parseObject(testResource.getConfiguration(), NodeDTO.class); NodeDTO node = JSON.parseObject(testResource.getConfiguration(), NodeDTO.class);
String nodeIp = node.getIp(); String uri = String.format(BASE_URL + "/jmeter/api/start", node.getIp(), node.getPort());
Integer port = node.getPort();
String uri = String.format(BASE_URL + "/jmeter/api/start", nodeIp, port);
return new ServerConfig(uri, 1, 1, node.getMaxConcurrency(), node.isEnable()); return new ServerConfig(uri, 1, 1, node.getMaxConcurrency(), node.isEnable());
} }
} }
return null; return null;
} }
} }