feat(接口测试): 支持k8s批量执行和批量停止

This commit is contained in:
fit2-zhao 2024-10-12 16:30:47 +08:00 committed by Craftsman
parent 258178f369
commit bce9c9d9bb
5 changed files with 309 additions and 125 deletions

View File

@ -22,6 +22,7 @@ public class KubernetesExecEngine implements ApiEngine {
*/ */
private final Object request; private final Object request;
private final TestResourceDTO resource; private final TestResourceDTO resource;
private final String optToken;
/** /**
* 单调执行构造函数 * 单调执行构造函数
@ -29,9 +30,10 @@ public class KubernetesExecEngine implements ApiEngine {
* @param request * @param request
* @param resource * @param resource
*/ */
public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource) { public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource, String optToken) {
this.request = request; this.request = request;
this.resource = resource; this.resource = resource;
this.optToken = optToken;
} }
/** /**
@ -40,9 +42,10 @@ public class KubernetesExecEngine implements ApiEngine {
* @param batchRequestDTO * @param batchRequestDTO
* @param resource * @param resource
*/ */
public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource) { public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource, String optToken) {
this.resource = resource; this.resource = resource;
this.request = batchRequestDTO; this.request = batchRequestDTO;
this.optToken = optToken;
} }
/** /**
@ -51,21 +54,22 @@ public class KubernetesExecEngine implements ApiEngine {
* @param reportIds * @param reportIds
* @param resource * @param resource
*/ */
public KubernetesExecEngine(List<String> reportIds, TestResourceDTO resource) { public KubernetesExecEngine(List<String> reportIds, TestResourceDTO resource, String optToken) {
this.resource = resource; this.resource = resource;
this.request = reportIds; this.request = reportIds;
this.optToken = optToken;
} }
@Override @Override
public void execute(String command) { public void execute(String path) {
// 初始化任务 // 初始化任务
LogUtils.info("CURL 命令:【 " + command + ""); LogUtils.info("CURL 执行方法:【 " + path + "");
this.runApi(command, request); this.runApi(path, request);
} }
private void runApi(String command, Object request) { private void runApi(String apiPath, Object request) {
try { try {
KubernetesProvider.exec(resource, request, command); KubernetesProvider.exec(resource, request, apiPath, optToken);
} catch (HttpServerErrorException e) { } catch (HttpServerErrorException e) {
handleHttpServerError(e); handleHttpServerError(e);
} catch (Exception e) { } catch (Exception e) {

View File

@ -11,17 +11,16 @@ import io.metersphere.sdk.constants.ResultStatus;
import io.metersphere.sdk.dto.SocketMsgDTO; import io.metersphere.sdk.dto.SocketMsgDTO;
import io.metersphere.sdk.dto.api.result.ProcessResultDTO; import io.metersphere.sdk.dto.api.result.ProcessResultDTO;
import io.metersphere.sdk.dto.api.result.TaskResultDTO; import io.metersphere.sdk.dto.api.result.TaskResultDTO;
import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO; import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.exception.MSException; import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.util.CommonBeanFactory; import io.metersphere.sdk.util.*;
import io.metersphere.sdk.util.JSON;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.sdk.util.WebSocketUtils;
import io.metersphere.system.dto.pool.TestResourceDTO; import io.metersphere.system.dto.pool.TestResourceDTO;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -29,6 +28,7 @@ public class KubernetesProvider {
private static final String RUNNING_PHASE = "Running"; private static final String RUNNING_PHASE = "Running";
private static final String SHELL_COMMAND = "sh"; private static final String SHELL_COMMAND = "sh";
private static final String LOCAL_URL = "http://127.0.0.1:8000";
public static KubernetesClient getKubernetesClient(TestResourceDTO credential) { public static KubernetesClient getKubernetesClient(TestResourceDTO credential) {
ConfigBuilder configBuilder = new ConfigBuilder() ConfigBuilder configBuilder = new ConfigBuilder()
@ -43,39 +43,111 @@ public class KubernetesProvider {
} }
public static Pod getExecPod(KubernetesClient client, TestResourceDTO credential) { public static Pod getExecPod(KubernetesClient client, TestResourceDTO credential) {
List<Pod> nodePods = client.pods() List<Pod> nodePods = getPods(client, credential);
.inNamespace(credential.getNamespace())
.list().getItems()
.stream()
.filter(s -> RUNNING_PHASE.equals(s.getStatus().getPhase()) && StringUtils.startsWith(s.getMetadata().getGenerateName(), "task-runner"))
.toList();
if (CollectionUtils.isEmpty(nodePods)) { if (CollectionUtils.isEmpty(nodePods)) {
throw new MSException("Execution node not found"); throw new MSException("Execution node not found");
} }
return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size())); return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size()));
} }
public static List<Pod> getPods(KubernetesClient client, TestResourceDTO credential) {
return client.pods()
.inNamespace(credential.getNamespace())
.list().getItems()
.stream()
.filter(s -> RUNNING_PHASE.equals(s.getStatus().getPhase()) && StringUtils.startsWith(s.getMetadata().getGenerateName(), "task-runner"))
.toList();
}
/** /**
* 执行命令 * 执行命令
* *
* @param resource * @param resource
* @param command * @param apiPath
*/ */
protected static void exec(TestResourceDTO resource, Object runRequest, String command) { protected static void exec(TestResourceDTO resource, Object runRequest, String apiPath, String optToken) {
try (KubernetesClient client = getKubernetesClient(resource)) { try (KubernetesClient client = getKubernetesClient(resource)) {
if (runRequest instanceof TaskBatchRequestDTO request) {
// 均分给每一个 Pod
List<Pod> pods = getPods(client, resource);
if (pods.isEmpty()) {
throw new MSException("No available pods found for execution.");
}
// Distribute tasks across nodes
List<TaskBatchRequestDTO> distributedTasks = distributeTasksAmongNodes(request, pods.size(), resource);
// Execute distributed tasks on each pod
for (int i = 0; i < pods.size(); i++) {
Pod pod = pods.get(i);
TaskBatchRequestDTO subTaskRequest = distributedTasks.get(i);
List<String> taskKeys = subTaskRequest.getTaskItems().stream()
.map(taskItem -> taskItem.getReportId() + "_" + taskItem.getResourceId())
.toList();
LogUtils.info("Sending batch tasks to pod {} for execution:\n{}", pod.getMetadata().getName(), taskKeys);
executeCommandOnPod(client, pod, subTaskRequest, apiPath, optToken);
}
} else if (runRequest instanceof TaskRequestDTO) {
// 随机一个 Pod 执行
Pod pod = getExecPod(client, resource); Pod pod = getExecPod(client, resource);
LogUtils.info("当前执行 Pod" + pod.getMetadata().getName() + ""); LogUtils.info("Executing task on pod: {}", pod.getMetadata().getName());
LogUtils.info("执行命令:【 " + command + ""); executeCommandOnPod(client, pod, runRequest, apiPath, optToken);
// 同步执行命令 } else {
// 发送给每一个 Pod
LogUtils.info("Stop tasks [{}] on Pods", runRequest);
List<Pod> nodesList = getPods(client, resource);
for (Pod pod : nodesList) {
executeCommandOnPod(client, pod, runRequest, apiPath, optToken);
}
}
} catch (Exception e) {
LogUtils.error("Failed to execute tasks on Kubernetes.", e);
}
}
/**
* Distributes tasks across nodes for parallel execution.
*/
private static List<TaskBatchRequestDTO> distributeTasksAmongNodes(TaskBatchRequestDTO request, int podCount, TestResourceDTO resource) {
List<TaskBatchRequestDTO> distributedTasks = new ArrayList<>(podCount);
for (int i = 0; i < request.getTaskItems().size(); i++) {
int nodeIndex = i % podCount;
TaskBatchRequestDTO distributeTask;
if (distributedTasks.size() < podCount) {
distributeTask = BeanUtils.copyBean(new TaskBatchRequestDTO(), request);
distributeTask.setTaskItems(new ArrayList<>());
distributedTasks.add(distributeTask);
} else {
distributeTask = distributedTasks.get(nodeIndex);
}
distributeTask.getTaskInfo().setPoolSize(resource.getConcurrentNumber());
distributeTask.getTaskItems().add(request.getTaskItems().get(i));
}
return distributedTasks;
}
/**
* Executes the curl command on a given Kubernetes pod.
*/
private static void executeCommandOnPod(KubernetesClient client, Pod pod, Object runRequest, String apiPath, String optToken) {
try {
String command = buildCurlCommand(apiPath, runRequest, optToken);
LogUtils.info("Executing command on pod {}: 【{}】", pod.getMetadata().getName(), command);
// Execute the command on the pod
client.pods().inNamespace(client.getNamespace()) client.pods().inNamespace(client.getNamespace())
.withName(pod.getMetadata().getName()) .withName(pod.getMetadata().getName())
.redirectingInput() .redirectingInput()
.writingOutput(System.out) .writingOutput(System.out)
.writingError(System.err) .writingError(System.err)
.withTTY()
.usingListener(new SimpleListener(runRequest)) .usingListener(new SimpleListener(runRequest))
.exec(SHELL_COMMAND, "-c", command + StringUtils.LF); .exec(SHELL_COMMAND, "-c", command);
} catch (Exception e) {
LogUtils.error("Failed to execute command on pod {} ", pod.getMetadata().getName(), e);
} }
} }
@ -89,7 +161,6 @@ public class KubernetesProvider {
public void onFailure(Throwable t, Response response) { public void onFailure(Throwable t, Response response) {
LogUtils.error("K8s 监听失败", t); LogUtils.error("K8s 监听失败", t);
if (runRequest != null) { if (runRequest != null) {
LogUtils.info("请求参数:{}", JSON.toJSONString(runRequest));
handleGeneralError(runRequest, t); handleGeneralError(runRequest, t);
return; return;
} }
@ -143,4 +214,25 @@ public class KubernetesProvider {
return result; return result;
} }
private static String buildCurlCommand(String path, Object request, String optToken) {
return String.format(
"curl -H \"Accept: application/json\" " +
"-H \"Content-type: application/json\" " +
"-H \"otp-token: %s\" " +
"-X POST -d '%s' " +
"--connect-timeout %d " +
"--max-time %d " +
"--retry %d " +
"%s%s",
optToken, // otp-token
JSON.toFormatJSONString(request), // 请求体
30, // 连接超时
120, // 最大时间
3, // 重试次数
LOCAL_URL, // 本地 URL
path // 具体 API 路径
);
}
} }

View File

@ -288,7 +288,18 @@ public class ApiExecuteService {
// 如果资源池配置了当前站点则使用资源池的 // 如果资源池配置了当前站点则使用资源池的
taskInfo.setMsUrl(testResourcePool.getServerUrl()); taskInfo.setMsUrl(testResourcePool.getServerUrl());
} }
// 判断是否为 K8S 资源池
boolean isK8SResourcePool = StringUtils.equals(testResourcePool.getType(), ResourcePoolTypeEnum.K8S.name());
if (isK8SResourcePool) {
TestResourceDTO testResourceDTO = new TestResourceDTO();
BeanUtils.copyBean(testResourceDTO, testResourcePool.getTestResourceReturnDTO());
taskInfo.setPoolSize(testResourceDTO.getConcurrentNumber());
try {
EngineFactory.batchRunApi(taskRequest, testResourceDTO);
} catch (Exception e) {
LogUtils.error(e);
}
} else {
// 将任务按资源池的数量拆分 // 将任务按资源池的数量拆分
List<TestResourceNodeDTO> nodesList = testResourcePool.getTestResourceReturnDTO().getNodesList(); List<TestResourceNodeDTO> nodesList = testResourcePool.getTestResourceReturnDTO().getNodesList();
List<TaskBatchRequestDTO> distributeTasks = new ArrayList<>(nodesList.size()); List<TaskBatchRequestDTO> distributeTasks = new ArrayList<>(nodesList.size());
@ -305,7 +316,6 @@ public class ApiExecuteService {
distributeTask.getTaskInfo().setPoolSize(nodesList.get(nodeIndex).getConcurrentNumber()); distributeTask.getTaskInfo().setPoolSize(nodesList.get(nodeIndex).getConcurrentNumber());
distributeTask.getTaskItems().add(taskRequest.getTaskItems().get(i)); distributeTask.getTaskItems().add(taskRequest.getTaskItems().get(i));
} }
for (int i = 0; i < nodesList.size(); i++) { for (int i = 0; i < nodesList.size(); i++) {
// todo 优化某个资源池不可用的情况以及清理 executionSet // todo 优化某个资源池不可用的情况以及清理 executionSet
TestResourceNodeDTO testResourceNode = nodesList.get(i); TestResourceNodeDTO testResourceNode = nodesList.get(i);
@ -316,7 +326,6 @@ public class ApiExecuteService {
.map(taskItem -> taskItem.getReportId() + "_" + taskItem.getResourceId()) .map(taskItem -> taskItem.getReportId() + "_" + taskItem.getResourceId())
.toList(); .toList();
LogUtils.info("开始发送批量任务到 {} 节点执行:\n" + taskKeys, endpoint); LogUtils.info("开始发送批量任务到 {} 节点执行:\n" + taskKeys, endpoint);
MsHttpClient.batchRunApi(endpoint, subTaskRequest); MsHttpClient.batchRunApi(endpoint, subTaskRequest);
} catch (Exception e) { } catch (Exception e) {
LogUtils.error("发送批量任务到 {} 节点执行失败", endpoint); LogUtils.error("发送批量任务到 {} 节点执行失败", endpoint);
@ -324,6 +333,7 @@ public class ApiExecuteService {
} }
} }
} }
}
protected static boolean validate() { protected static boolean validate() {
try { try {

View File

@ -6,6 +6,7 @@ import io.metersphere.api.dto.definition.ExecuteReportDTO;
import io.metersphere.api.dto.report.ReportDTO; import io.metersphere.api.dto.report.ReportDTO;
import io.metersphere.api.mapper.ExtApiReportMapper; import io.metersphere.api.mapper.ExtApiReportMapper;
import io.metersphere.api.mapper.ExtApiScenarioReportMapper; import io.metersphere.api.mapper.ExtApiScenarioReportMapper;
import io.metersphere.engine.EngineFactory;
import io.metersphere.engine.MsHttpClient; import io.metersphere.engine.MsHttpClient;
import io.metersphere.project.domain.Project; import io.metersphere.project.domain.Project;
import io.metersphere.project.mapper.ProjectMapper; import io.metersphere.project.mapper.ProjectMapper;
@ -19,6 +20,7 @@ import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.util.*; import io.metersphere.sdk.util.*;
import io.metersphere.system.domain.Organization; import io.metersphere.system.domain.Organization;
import io.metersphere.system.dto.builder.LogDTOBuilder; import io.metersphere.system.dto.builder.LogDTOBuilder;
import io.metersphere.system.dto.pool.TestResourceDTO;
import io.metersphere.system.dto.pool.TestResourceNodeDTO; import io.metersphere.system.dto.pool.TestResourceNodeDTO;
import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO; import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO;
import io.metersphere.system.dto.sdk.OptionDTO; import io.metersphere.system.dto.sdk.OptionDTO;
@ -272,63 +274,72 @@ public class ApiTaskCenterService {
String userId, String userId,
String module) { String module) {
Map<String, List<String>> poolIdMap = reports.stream() Map<String, List<String>> poolIdMap = reports.stream()
.collect(Collectors.groupingBy(ReportDTO::getPoolId, Collectors.mapping(ReportDTO::getId, Collectors.toList()))); .collect(Collectors.groupingBy(ReportDTO::getPoolId,
Collectors.mapping(ReportDTO::getId, Collectors.toList())));
poolIdMap.forEach((poolId, reportList) -> { poolIdMap.forEach((poolId, reportList) -> {
TestResourcePoolReturnDTO testResourcePoolDTO = testResourcePoolService.getTestResourcePoolDetail(poolId); TestResourcePoolReturnDTO testResourcePoolDTO = testResourcePoolService.getTestResourcePoolDetail(poolId);
List<TestResourceNodeDTO> nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList();
if (CollectionUtils.isNotEmpty(nodesList)) {
stopTask(request, reportList, nodesList, reports);
}
});
// 保存日志 获取所有的reportId
List<String> reportIds = reports.stream().map(ReportDTO::getId).toList();
SubListUtils.dealForSubList(reportIds, 100, (subList) -> {
if (request.getModuleType().equals(TaskCenterResourceType.API_CASE.toString())) {
//记录日志
saveLog(subList, userId, StringUtils.join(module, "_REAL_TIME_API_CASE"), TaskCenterResourceType.API_CASE.toString());
} else if (request.getModuleType().equals(TaskCenterResourceType.API_SCENARIO.toString())) {
saveLog(subList, userId, StringUtils.join(module, "_REAL_TIME_API_SCENARIO"), TaskCenterResourceType.API_SCENARIO.toString());
}
});
}
public void stopTask(TaskCenterBatchRequest request, // Remove excluded report IDs
List<String> reportList,
List<TestResourceNodeDTO> nodesList,
List<ReportDTO> reports) {
// 根据报告id分组
Map<String, Boolean> integrationMap = reports.stream()
.collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getIntegrated));
Map<String, String> resourceIdMap = reports.stream()
.collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getResourceId));
Map<String, String> testPlanIdMap = reports.stream()
.collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getTestPlanId));
// 如果需要排除的报告ID不为空则从reportList中移除
if (request.getExcludeIds() != null && !request.getExcludeIds().isEmpty()) { if (request.getExcludeIds() != null && !request.getExcludeIds().isEmpty()) {
reportList.removeAll(request.getExcludeIds()); reportList.removeAll(request.getExcludeIds());
} }
nodesList.parallelStream().forEach(node -> { boolean isK8SResourcePool = StringUtils.equals(testResourcePoolDTO.getType(), ResourcePoolTypeEnum.K8S.name());
String endpoint = MsHttpClient.getEndpoint(node.getIp(), node.getPort());
// 初始化 TaskRequestDTO TaskResultDTO if (isK8SResourcePool) {
handleK8STask(request, reports, reportList, testResourcePoolDTO);
} else {
List<TestResourceNodeDTO> nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList();
if (CollectionUtils.isNotEmpty(nodesList)) {
stopTask(request, reportList, nodesList, reports);
}
}
});
logReports(request, reports, userId, module);
}
private void handleK8STask(TaskCenterBatchRequest request, List<ReportDTO> reports,
List<String> reportList, TestResourcePoolReturnDTO testResourcePoolDTO) {
TaskRequestDTO taskRequestDTO = new TaskRequestDTO(); TaskRequestDTO taskRequestDTO = new TaskRequestDTO();
TaskResultDTO result = createStoppedTaskResult();
// Prepare mapping for integration and resource IDs
Map<String, Boolean> integrationMap = prepareIntegrationMap(reports);
Map<String, String> resourceIdMap = prepareResourceIdMap(reports);
Map<String, String> testPlanIdMap = prepareTestPlanIdMap(reports);
SubListUtils.dealForSubList(reportList, 100, subList -> {
try {
TestResourceDTO testResourceDTO = new TestResourceDTO();
BeanUtils.copyBean(testResourceDTO, testResourcePoolDTO.getTestResourceReturnDTO());
EngineFactory.stopApi(subList, testResourceDTO);
} catch (Exception e) {
LogUtils.error(e);
} finally {
processSubListReports(subList, request, result, taskRequestDTO, integrationMap, resourceIdMap, testPlanIdMap);
}
});
}
private TaskResultDTO createStoppedTaskResult() {
TaskResultDTO result = new TaskResultDTO(); TaskResultDTO result = new TaskResultDTO();
result.setRequestResults(Collections.emptyList()); result.setRequestResults(Collections.emptyList());
result.setHasEnded(true); result.setHasEnded(true);
ProcessResultDTO processResultDTO = new ProcessResultDTO(); ProcessResultDTO processResultDTO = new ProcessResultDTO();
processResultDTO.setStatus(ExecStatus.STOPPED.name()); processResultDTO.setStatus(ExecStatus.STOPPED.name());
result.setProcessResultDTO(processResultDTO); result.setProcessResultDTO(processResultDTO);
result.setConsole("任务已终止"); result.setConsole("任务已终止");
SubListUtils.dealForSubList(reportList, 100, subList -> { return result;
try { }
LogUtils.info(String.format("开始发送停止请求到 %s 节点执行", endpoint), subList.toString());
MsHttpClient.stopApi(endpoint, subList); private void processSubListReports(List<String> subList, TaskCenterBatchRequest request,
} catch (Exception e) { TaskResultDTO result, TaskRequestDTO taskRequestDTO,
LogUtils.error(e); Map<String, Boolean> integrationMap, Map<String, String> resourceIdMap,
} finally { Map<String, String> testPlanIdMap) {
subList.forEach(reportId -> { subList.forEach(reportId -> {
TaskInfo taskInfo = taskRequestDTO.getTaskInfo(); TaskInfo taskInfo = taskRequestDTO.getTaskInfo();
taskInfo.setResourceType(request.getModuleType()); taskInfo.setResourceType(request.getModuleType());
@ -336,20 +347,14 @@ public class ApiTaskCenterService {
TaskItem taskItem = new TaskItem(); TaskItem taskItem = new TaskItem();
taskItem.setReportId(reportId); taskItem.setReportId(reportId);
taskItem.setResourceId(resourceIdMap.get(reportId)); taskItem.setResourceId(resourceIdMap.get(reportId));
// 设置任务信息的资源类型
// Set resource type based on the test plan ID
String testPlanId = testPlanIdMap.get(reportId); String testPlanId = testPlanIdMap.get(reportId);
if (testPlanId != null && !"NONE".equals(testPlanId)) { if (testPlanId != null && !"NONE".equals(testPlanId)) {
String moduleType = request.getModuleType(); taskInfo.setResourceType(getResourceType(request.getModuleType()));
taskInfo.setResourceType(
TaskCenterResourceType.API_CASE.toString().equals(moduleType)
? ApiExecuteResourceType.TEST_PLAN_API_CASE.name()
: TaskCenterResourceType.API_SCENARIO.toString().equals(moduleType)
? ApiExecuteResourceType.TEST_PLAN_API_SCENARIO.name()
: taskInfo.getResourceType()
);
} }
// 设置集成报告 // Set integrated report information
taskInfo.getRunModeConfig().setIntegratedReport(integrationMap.get(reportId)); taskInfo.getRunModeConfig().setIntegratedReport(integrationMap.get(reportId));
if (Boolean.TRUE.equals(integrationMap.get(reportId))) { if (Boolean.TRUE.equals(integrationMap.get(reportId))) {
taskInfo.getRunModeConfig().getCollectionReport().setReportId(reportId); taskInfo.getRunModeConfig().getCollectionReport().setReportId(reportId);
@ -360,10 +365,69 @@ public class ApiTaskCenterService {
kafkaTemplate.send(KafkaTopicConstants.API_REPORT_TOPIC, JSON.toJSONString(result)); kafkaTemplate.send(KafkaTopicConstants.API_REPORT_TOPIC, JSON.toJSONString(result));
}); });
} }
private String getResourceType(String moduleType) {
return TaskCenterResourceType.API_CASE.toString().equals(moduleType)
? ApiExecuteResourceType.TEST_PLAN_API_CASE.name()
: TaskCenterResourceType.API_SCENARIO.toString().equals(moduleType)
? ApiExecuteResourceType.TEST_PLAN_API_SCENARIO.name()
: "";
}
private void logReports(TaskCenterBatchRequest request, List<ReportDTO> reports,
String userId, String module) {
List<String> reportIds = reports.stream().map(ReportDTO::getId).toList();
SubListUtils.dealForSubList(reportIds, 100, subList -> {
String logPrefix = StringUtils.join(module, "_REAL_TIME_");
String resourceType = request.getModuleType().equals(TaskCenterResourceType.API_CASE.toString())
? TaskCenterResourceType.API_CASE.toString()
: TaskCenterResourceType.API_SCENARIO.toString();
saveLog(subList, userId, logPrefix + resourceType, resourceType);
});
}
public void stopTask(TaskCenterBatchRequest request,
List<String> reportList,
List<TestResourceNodeDTO> nodesList,
List<ReportDTO> reports) {
Map<String, Boolean> integrationMap = prepareIntegrationMap(reports);
Map<String, String> resourceIdMap = prepareResourceIdMap(reports);
Map<String, String> testPlanIdMap = prepareTestPlanIdMap(reports);
// Remove excluded report IDs
if (request.getExcludeIds() != null && !request.getExcludeIds().isEmpty()) {
reportList.removeAll(request.getExcludeIds());
}
nodesList.parallelStream().forEach(node -> {
String endpoint = MsHttpClient.getEndpoint(node.getIp(), node.getPort());
TaskResultDTO result = createStoppedTaskResult();
SubListUtils.dealForSubList(reportList, 100, subList -> {
try {
LogUtils.info(String.format("开始发送停止请求到 %s 节点执行", endpoint), subList.toString());
MsHttpClient.stopApi(endpoint, subList);
} catch (Exception e) {
LogUtils.error(e);
} finally {
processSubListReports(subList, request, result, new TaskRequestDTO(), integrationMap, resourceIdMap, testPlanIdMap);
}
}); });
}); });
} }
private Map<String, Boolean> prepareIntegrationMap(List<ReportDTO> reports) {
return reports.stream().collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getIntegrated));
}
private Map<String, String> prepareResourceIdMap(List<ReportDTO> reports) {
return reports.stream().collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getResourceId));
}
private Map<String, String> prepareTestPlanIdMap(List<ReportDTO> reports) {
return reports.stream().collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getTestPlanId));
}
private void saveLog(List<String> ids, String userId, String module, String type) { private void saveLog(List<String> ids, String userId, String module, String type) {
List<ReportDTO> reports = new ArrayList<>(); List<ReportDTO> reports = new ArrayList<>();

View File

@ -6,19 +6,19 @@ import io.metersphere.api.dto.definition.ExecuteReportDTO;
import io.metersphere.api.dto.report.ReportDTO; import io.metersphere.api.dto.report.ReportDTO;
import io.metersphere.api.mapper.ExtApiScenarioReportMapper; import io.metersphere.api.mapper.ExtApiScenarioReportMapper;
import io.metersphere.api.service.ApiTaskCenterService; import io.metersphere.api.service.ApiTaskCenterService;
import io.metersphere.engine.EngineFactory;
import io.metersphere.engine.MsHttpClient; import io.metersphere.engine.MsHttpClient;
import io.metersphere.plan.mapper.ExtTestPlanReportMapper; import io.metersphere.plan.mapper.ExtTestPlanReportMapper;
import io.metersphere.project.domain.Project; import io.metersphere.project.domain.Project;
import io.metersphere.project.mapper.ProjectMapper; import io.metersphere.project.mapper.ProjectMapper;
import io.metersphere.sdk.constants.PermissionConstants; import io.metersphere.sdk.constants.PermissionConstants;
import io.metersphere.sdk.constants.ResourcePoolTypeEnum;
import io.metersphere.sdk.constants.TaskCenterResourceType; import io.metersphere.sdk.constants.TaskCenterResourceType;
import io.metersphere.sdk.exception.MSException; import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.util.DateUtils; import io.metersphere.sdk.util.*;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.sdk.util.SubListUtils;
import io.metersphere.sdk.util.Translator;
import io.metersphere.system.domain.Organization; import io.metersphere.system.domain.Organization;
import io.metersphere.system.dto.builder.LogDTOBuilder; import io.metersphere.system.dto.builder.LogDTOBuilder;
import io.metersphere.system.dto.pool.TestResourceDTO;
import io.metersphere.system.dto.pool.TestResourceNodeDTO; import io.metersphere.system.dto.pool.TestResourceNodeDTO;
import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO; import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO;
import io.metersphere.system.dto.sdk.OptionDTO; import io.metersphere.system.dto.sdk.OptionDTO;
@ -336,10 +336,24 @@ public class TestPlanTaskCenterService {
.collect(Collectors.groupingBy(ReportDTO::getPoolId, Collectors.mapping(ReportDTO::getId, Collectors.toList()))); .collect(Collectors.groupingBy(ReportDTO::getPoolId, Collectors.mapping(ReportDTO::getId, Collectors.toList())));
poolIdMap.forEach((poolId, reportList) -> { poolIdMap.forEach((poolId, reportList) -> {
TestResourcePoolReturnDTO testResourcePoolDTO = testResourcePoolService.getTestResourcePoolDetail(poolId); TestResourcePoolReturnDTO testResourcePoolDTO = testResourcePoolService.getTestResourcePoolDetail(poolId);
// 判断是否为 K8S 资源池
boolean isK8SResourcePool = StringUtils.equals(testResourcePoolDTO.getType(), ResourcePoolTypeEnum.K8S.name());
if (isK8SResourcePool) {
SubListUtils.dealForSubList(reportList, 100, (subList) -> {
try {
TestResourceDTO testResourceDTO = new TestResourceDTO();
BeanUtils.copyBean(testResourceDTO, testResourcePoolDTO.getTestResourceReturnDTO());
EngineFactory.stopApi(subList, testResourceDTO);
} catch (Exception e) {
LogUtils.error(e);
}
});
} else {
List<TestResourceNodeDTO> nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList(); List<TestResourceNodeDTO> nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList();
if (CollectionUtils.isNotEmpty(nodesList)) { if (CollectionUtils.isNotEmpty(nodesList)) {
stopTask(request, reportList, nodesList); stopTask(request, reportList, nodesList);
} }
}
}); });
} }