refactor(系统设置): 修改资源池验证方法

This commit is contained in:
fit2-zhao 2024-10-17 17:41:14 +08:00 committed by Craftsman
parent 16203cfe1d
commit e1d6ec26f8
3 changed files with 2 additions and 340 deletions

View File

@ -1,95 +0,0 @@
package io.metersphere.system.engine;
import io.metersphere.engine.ApiEngine;
import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.exception.TaskRunnerResultCode;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.system.controller.handler.ResultHolder;
import io.metersphere.system.dto.pool.TestResourceDTO;
import org.springframework.web.client.HttpServerErrorException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
public class KubernetesExecEngine implements ApiEngine {
/**
* 任务请求参数 @LINK TaskRequestDTO or TaskBatchRequestDTO or List<String>
*/
private final Object request;
private final TestResourceDTO resource;
/**
* 单调执行构造函数
*
* @param request 任务请求参数
* @param resource 资源池
*/
public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource) {
this.request = request;
this.resource = resource;
}
/**
* 批量执行构造函数
*
* @param batchRequestDTO 批量任务请求参数
* @param resource 资源池
*/
public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource) {
this.resource = resource;
this.request = batchRequestDTO;
}
/**
* 停止执行构造函数
*
* @param reportIds 任务ID列表
* @param resource 资源池
*/
public KubernetesExecEngine(List<String> reportIds, TestResourceDTO resource) {
this.resource = resource;
this.request = reportIds;
}
@Override
public void execute(String path) {
// 初始化任务
LogUtils.info("CURL 执行方法:【 " + path + "");
this.runApi(path, request);
}
private void runApi(String command, Object request) {
try {
KubernetesProvider.exec(resource, request, command);
} catch (HttpServerErrorException e) {
handleHttpServerError(e);
} catch (Exception e) {
handleGeneralError(e);
}
}
private void handleHttpServerError(HttpServerErrorException e) {
LogUtils.error("K8S 执行异常:", e);
// 获取错误代码并处理
int errorCode = Optional.ofNullable(e.getResponseBodyAs(ResultHolder.class))
.map(ResultHolder::getCode)
.orElseThrow(() -> new MSException("Unknown error code"));
// 匹配资源池的错误代码并抛出相应异常
TaskRunnerResultCode resultCode = Arrays.stream(TaskRunnerResultCode.values())
.filter(code -> code.getCode() == errorCode)
.findFirst()
.orElseThrow(() -> new MSException(e.getMessage()));
throw new MSException(resultCode, e.getMessage());
}
private void handleGeneralError(Exception e) {
LogUtils.error("K8S 执行异常:", e);
throw new MSException(e.getMessage());
}
}

View File

@ -1,243 +0,0 @@
package io.metersphere.system.engine;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.metersphere.sdk.constants.KafkaTopicConstants;
import io.metersphere.sdk.constants.MsgType;
import io.metersphere.sdk.constants.ResultStatus;
import io.metersphere.sdk.dto.SocketMsgDTO;
import io.metersphere.sdk.dto.api.result.ProcessResultDTO;
import io.metersphere.sdk.dto.api.result.TaskResultDTO;
import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.util.*;
import io.metersphere.system.dto.pool.TestResourceDTO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.CollectionUtils;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class KubernetesProvider {
private static final String RUNNING_PHASE = "Running";
private static final String SHELL_COMMAND = "sh";
private static final int TIMEOUT = 120000;
public static KubernetesClient getKubernetesClient(TestResourceDTO credential) {
ConfigBuilder configBuilder = new ConfigBuilder()
.withMasterUrl(credential.getIp())
.withOauthToken(credential.getToken())
.withTrustCerts(true)
.withConnectionTimeout(TIMEOUT) // 120秒连接超时
.withRequestTimeout(TIMEOUT) // 120秒请求超时
.withNamespace(credential.getNamespace());
return new KubernetesClientBuilder()
.withConfig(configBuilder.build())
.build();
}
public static Pod getExecPod(KubernetesClient client, TestResourceDTO credential) {
List<Pod> nodePods = getPods(client, credential);
if (CollectionUtils.isEmpty(nodePods)) {
throw new MSException("Execution node not found");
}
return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size()));
}
public static List<Pod> getPods(KubernetesClient client, TestResourceDTO credential) {
return client.pods()
.inNamespace(credential.getNamespace())
.list().getItems()
.stream()
.filter(s -> RUNNING_PHASE.equals(s.getStatus().getPhase()) &&
StringUtils.isNotBlank(credential.getDeployName()) &&
StringUtils.isNotBlank(s.getMetadata().getGenerateName()) &&
StringUtils.startsWith(s.getMetadata().getGenerateName(), credential.getDeployName()))
.toList();
}
/**
* 执行命令
*
* @param resource 资源
* @param command 命令
*/
protected static void exec(TestResourceDTO resource, Object runRequest, String command) {
KubernetesClient client = getKubernetesClient(resource);
try {
if (runRequest instanceof TaskBatchRequestDTO request) {
// 均分给每一个 Pod
List<Pod> pods = getPods(client, resource);
if (pods.isEmpty()) {
throw new MSException("No available pods found for execution.");
}
// Distribute tasks across nodes
List<TaskBatchRequestDTO> distributedTasks = distributeTasksAmongNodes(request, pods.size(), resource);
// Execute distributed tasks on each pod
for (int i = 0; i < pods.size(); i++) {
Pod pod = pods.get(i);
TaskBatchRequestDTO subTaskRequest = distributedTasks.get(i);
List<String> taskKeys = subTaskRequest.getTaskItems().stream()
.map(taskItem -> taskItem.getReportId() + "_" + taskItem.getResourceId())
.toList();
LogUtils.info("Sending batch tasks to pod {} for execution:\n{}", pod.getMetadata().getName(), taskKeys);
executeCommandOnPod(client, pod, subTaskRequest, command);
}
} else if (runRequest instanceof TaskRequestDTO) {
// 随机一个 Pod 执行
Pod pod = getExecPod(client, resource);
LogUtils.info("Executing task on pod: {}", pod.getMetadata().getName());
executeCommandOnPod(client, pod, runRequest, command);
} else {
// 发送给每一个 Pod
LogUtils.info("Stop tasks [{}] on Pods", runRequest);
List<Pod> nodesList = getPods(client, resource);
for (Pod pod : nodesList) {
executeCommandOnPod(client, pod, runRequest, command);
}
}
} catch (Exception e) {
LogUtils.error("Failed to execute command", e);
client.close();
throw new MSException("Failed to execute command", e);
}
}
/**
* Distributes tasks across nodes for parallel execution.
*/
private static List<TaskBatchRequestDTO> distributeTasksAmongNodes(TaskBatchRequestDTO request, int podCount, TestResourceDTO resource) {
List<TaskBatchRequestDTO> distributedTasks = new ArrayList<>(podCount);
for (int i = 0; i < request.getTaskItems().size(); i++) {
int nodeIndex = i % podCount;
TaskBatchRequestDTO distributeTask;
if (distributedTasks.size() < podCount) {
distributeTask = BeanUtils.copyBean(new TaskBatchRequestDTO(), request);
distributeTask.setTaskItems(new ArrayList<>());
distributedTasks.add(distributeTask);
} else {
distributeTask = distributedTasks.get(nodeIndex);
}
distributeTask.getTaskInfo().setPoolSize(resource.getConcurrentNumber());
distributeTask.getTaskItems().add(request.getTaskItems().get(i));
}
return distributedTasks;
}
/**
* Executes the curl command on a given Kubernetes pod.
*/
private static void executeCommandOnPod(KubernetesClient client, Pod pod, Object runRequest, String command) {
try {
LogUtils.info("Executing command on pod {}: 【{}】", pod.getMetadata().getName(), command);
// Execute the command on the pod
client.pods().inNamespace(client.getNamespace())
.withName(pod.getMetadata().getName())
.redirectingInput()
.writingOutput(System.out)
.writingError(System.err)
.usingListener(new SimpleListener(runRequest, client))
.exec(SHELL_COMMAND, "-c", command);
} catch (Exception e) {
LogUtils.error("Failed to execute command on pod {} ", pod.getMetadata().getName(), e);
}
}
private record SimpleListener(Object runRequest, KubernetesClient client) implements ExecListener {
@Override
public void onOpen() {
LogUtils.info("K8s 开启监听");
}
@Override
public void onFailure(Throwable t, Response response) {
LogUtils.error("K8s 监听失败", t);
if (runRequest != null) {
handleGeneralError(runRequest, t);
}
}
@Override
public void onClose(int code, String reason) {
LogUtils.info("K8s 监听关闭code=" + code + ", reason=" + reason);
// 关闭客户端
client.close();
}
}
private static void handleGeneralError(Object requestObj, Throwable e) {
// 检查请求对象是否为 TaskRequestDTO 类型
if (requestObj instanceof TaskRequestDTO request) {
// 发送结果到 WebSocket如果报告 ID 存在
String reportId = request.getTaskItem().getReportId();
if (WebSocketUtils.has(reportId)) {
SocketMsgDTO socketMsgDTO = new SocketMsgDTO(
reportId,
request.getTaskInfo().getRunMode(),
MsgType.EXEC_END.name(),
e.getMessage()
);
WebSocketUtils.sendMessageSingle(socketMsgDTO);
}
// 尝试获取 KafkaTemplate 并发送任务结果
KafkaTemplate<String, String> kafkaTemplate = CommonBeanFactory.getBean(KafkaTemplate.class);
if (kafkaTemplate != null) {
TaskResultDTO result = buildTaskResult(request, e);
kafkaTemplate.send(KafkaTopicConstants.API_REPORT_TOPIC, JSON.toJSONString(result));
}
}
}
private static TaskResultDTO buildTaskResult(TaskRequestDTO request, Throwable e) {
// 创建并配置 TaskResultDTO
TaskResultDTO result = new TaskResultDTO();
result.setRequest(request);
result.setRequestResults(List.of()); // 空的请求结果列表
result.setHasEnded(true); // 标记任务已结束
// 创建并配置 ProcessResultDTO
ProcessResultDTO processResultDTO = new ProcessResultDTO();
processResultDTO.setStatus(ResultStatus.ERROR.name());
result.setProcessResultDTO(processResultDTO);
result.setConsole(e.getMessage()); // 将异常信息记录到控制台日志
return result;
}
public static boolean validateNamespaceExists(TestResourceDTO testResourceDTO) {
LogUtils.info("Test resource config: {}", testResourceDTO);
try (KubernetesClient kubernetesClient = getKubernetesClient(testResourceDTO)) {
// 校验 KubernetesClient 是否为空避免后续调用产生 NullPointerException
if (kubernetesClient == null) {
throw new IllegalArgumentException("Kubernetes client initialization failed. Please check your configuration.");
}
// 直接获取 pods 并进行非空判断
List<Pod> pods = getPods(kubernetesClient, testResourceDTO);
if (org.apache.commons.collections.CollectionUtils.isEmpty(pods)) {
throw new RuntimeException("No execution pods found for the given resource: " + testResourceDTO.getNamespace());
}
} catch (Exception e) {
LogUtils.error("Failed to validate namespace exists", e);
return false;
}
return true;
}
}

View File

@ -1,5 +1,6 @@
package io.metersphere.system.service; package io.metersphere.system.service;
import io.metersphere.engine.EngineFactory;
import io.metersphere.sdk.constants.HttpMethodConstants; import io.metersphere.sdk.constants.HttpMethodConstants;
import io.metersphere.sdk.constants.OperationLogConstants; import io.metersphere.sdk.constants.OperationLogConstants;
import io.metersphere.sdk.constants.ResourcePoolTypeEnum; import io.metersphere.sdk.constants.ResourcePoolTypeEnum;
@ -11,7 +12,6 @@ import io.metersphere.system.domain.*;
import io.metersphere.system.dto.pool.*; import io.metersphere.system.dto.pool.*;
import io.metersphere.system.dto.sdk.OptionDTO; import io.metersphere.system.dto.sdk.OptionDTO;
import io.metersphere.system.dto.sdk.QueryResourcePoolRequest; import io.metersphere.system.dto.sdk.QueryResourcePoolRequest;
import io.metersphere.system.engine.KubernetesProvider;
import io.metersphere.system.log.constants.OperationLogModule; import io.metersphere.system.log.constants.OperationLogModule;
import io.metersphere.system.log.constants.OperationLogType; import io.metersphere.system.log.constants.OperationLogType;
import io.metersphere.system.log.dto.LogDTO; import io.metersphere.system.log.dto.LogDTO;
@ -96,7 +96,7 @@ public class TestResourcePoolService {
return false; return false;
} }
} else { } else {
return KubernetesProvider.validateNamespaceExists(testResourceDTO); return EngineFactory.validateNamespaceExists(testResourceDTO);
} }
} }