From bce9c9d9bb18745ace4fe4e04aef2311c2f0d96f Mon Sep 17 00:00:00 2001 From: fit2-zhao Date: Sat, 12 Oct 2024 16:30:47 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E6=8E=A5=E5=8F=A3=E6=B5=8B=E8=AF=95):=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81k8s=E6=89=B9=E9=87=8F=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=92=8C=E6=89=B9=E9=87=8F=E5=81=9C=E6=AD=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/engine/KubernetesExecEngine.java | 20 +- .../api/engine/KubernetesProvider.java | 132 +++++++++++-- .../api/service/ApiExecuteService.java | 70 ++++--- .../api/service/ApiTaskCenterService.java | 184 ++++++++++++------ .../service/TestPlanTaskCenterService.java | 28 ++- 5 files changed, 309 insertions(+), 125 deletions(-) diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java index b94acd829b..84c42781a7 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java @@ -22,6 +22,7 @@ public class KubernetesExecEngine implements ApiEngine { */ private final Object request; private final TestResourceDTO resource; + private final String optToken; /** * 单调执行构造函数 @@ -29,9 +30,10 @@ public class KubernetesExecEngine implements ApiEngine { * @param request * @param resource */ - public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource) { + public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource, String optToken) { this.request = request; this.resource = resource; + this.optToken = optToken; } /** @@ -40,9 +42,10 @@ public class KubernetesExecEngine implements ApiEngine { * @param batchRequestDTO * @param resource */ - public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource) { + public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource, String optToken) { this.resource = resource; this.request = batchRequestDTO; + this.optToken = optToken; } /** @@ -51,21 +54,22 @@ public class KubernetesExecEngine implements ApiEngine { * @param reportIds * @param resource */ - public KubernetesExecEngine(List reportIds, TestResourceDTO resource) { + public KubernetesExecEngine(List reportIds, TestResourceDTO resource, String optToken) { this.resource = resource; this.request = reportIds; + this.optToken = optToken; } @Override - public void execute(String command) { + public void execute(String path) { // 初始化任务 - LogUtils.info("CURL 命令:【 " + command + " 】"); - this.runApi(command, request); + LogUtils.info("CURL 执行方法:【 " + path + " 】"); + this.runApi(path, request); } - private void runApi(String command, Object request) { + private void runApi(String apiPath, Object request) { try { - KubernetesProvider.exec(resource, request, command); + KubernetesProvider.exec(resource, request, apiPath, optToken); } catch (HttpServerErrorException e) { handleHttpServerError(e); } catch (Exception e) { diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java index 0ec26051ff..eb79d6d17d 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java @@ -11,17 +11,16 @@ import io.metersphere.sdk.constants.ResultStatus; import io.metersphere.sdk.dto.SocketMsgDTO; import io.metersphere.sdk.dto.api.result.ProcessResultDTO; import io.metersphere.sdk.dto.api.result.TaskResultDTO; +import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO; import io.metersphere.sdk.dto.api.task.TaskRequestDTO; import io.metersphere.sdk.exception.MSException; -import io.metersphere.sdk.util.CommonBeanFactory; -import io.metersphere.sdk.util.JSON; -import io.metersphere.sdk.util.LogUtils; -import io.metersphere.sdk.util.WebSocketUtils; +import io.metersphere.sdk.util.*; import io.metersphere.system.dto.pool.TestResourceDTO; import org.apache.commons.lang3.StringUtils; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.util.CollectionUtils; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -29,6 +28,7 @@ public class KubernetesProvider { private static final String RUNNING_PHASE = "Running"; private static final String SHELL_COMMAND = "sh"; + private static final String LOCAL_URL = "http://127.0.0.1:8000"; public static KubernetesClient getKubernetesClient(TestResourceDTO credential) { ConfigBuilder configBuilder = new ConfigBuilder() @@ -43,39 +43,111 @@ public class KubernetesProvider { } public static Pod getExecPod(KubernetesClient client, TestResourceDTO credential) { - List nodePods = client.pods() - .inNamespace(credential.getNamespace()) - .list().getItems() - .stream() - .filter(s -> RUNNING_PHASE.equals(s.getStatus().getPhase()) && StringUtils.startsWith(s.getMetadata().getGenerateName(), "task-runner")) - .toList(); - + List nodePods = getPods(client, credential); if (CollectionUtils.isEmpty(nodePods)) { throw new MSException("Execution node not found"); } return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size())); } + public static List getPods(KubernetesClient client, TestResourceDTO credential) { + return client.pods() + .inNamespace(credential.getNamespace()) + .list().getItems() + .stream() + .filter(s -> RUNNING_PHASE.equals(s.getStatus().getPhase()) && StringUtils.startsWith(s.getMetadata().getGenerateName(), "task-runner")) + .toList(); + } + /** * 执行命令 * * @param resource - * @param command + * @param apiPath */ - protected static void exec(TestResourceDTO resource, Object runRequest, String command) { + protected static void exec(TestResourceDTO resource, Object runRequest, String apiPath, String optToken) { try (KubernetesClient client = getKubernetesClient(resource)) { - Pod pod = getExecPod(client, resource); - LogUtils.info("当前执行 Pod:【 " + pod.getMetadata().getName() + " 】"); - LogUtils.info("执行命令:【 " + command + " 】"); - // 同步执行命令 + + if (runRequest instanceof TaskBatchRequestDTO request) { + // 均分给每一个 Pod + List pods = getPods(client, resource); + if (pods.isEmpty()) { + throw new MSException("No available pods found for execution."); + } + + // Distribute tasks across nodes + List distributedTasks = distributeTasksAmongNodes(request, pods.size(), resource); + + // Execute distributed tasks on each pod + for (int i = 0; i < pods.size(); i++) { + Pod pod = pods.get(i); + TaskBatchRequestDTO subTaskRequest = distributedTasks.get(i); + List taskKeys = subTaskRequest.getTaskItems().stream() + .map(taskItem -> taskItem.getReportId() + "_" + taskItem.getResourceId()) + .toList(); + + LogUtils.info("Sending batch tasks to pod {} for execution:\n{}", pod.getMetadata().getName(), taskKeys); + executeCommandOnPod(client, pod, subTaskRequest, apiPath, optToken); + } + } else if (runRequest instanceof TaskRequestDTO) { + // 随机一个 Pod 执行 + Pod pod = getExecPod(client, resource); + LogUtils.info("Executing task on pod: {}", pod.getMetadata().getName()); + executeCommandOnPod(client, pod, runRequest, apiPath, optToken); + } else { + // 发送给每一个 Pod + LogUtils.info("Stop tasks [{}] on Pods", runRequest); + List nodesList = getPods(client, resource); + for (Pod pod : nodesList) { + executeCommandOnPod(client, pod, runRequest, apiPath, optToken); + } + } + } catch (Exception e) { + LogUtils.error("Failed to execute tasks on Kubernetes.", e); + } + } + + /** + * Distributes tasks across nodes for parallel execution. + */ + private static List distributeTasksAmongNodes(TaskBatchRequestDTO request, int podCount, TestResourceDTO resource) { + List distributedTasks = new ArrayList<>(podCount); + + for (int i = 0; i < request.getTaskItems().size(); i++) { + int nodeIndex = i % podCount; + TaskBatchRequestDTO distributeTask; + if (distributedTasks.size() < podCount) { + distributeTask = BeanUtils.copyBean(new TaskBatchRequestDTO(), request); + distributeTask.setTaskItems(new ArrayList<>()); + distributedTasks.add(distributeTask); + } else { + distributeTask = distributedTasks.get(nodeIndex); + } + distributeTask.getTaskInfo().setPoolSize(resource.getConcurrentNumber()); + distributeTask.getTaskItems().add(request.getTaskItems().get(i)); + } + return distributedTasks; + } + + /** + * Executes the curl command on a given Kubernetes pod. + */ + private static void executeCommandOnPod(KubernetesClient client, Pod pod, Object runRequest, String apiPath, String optToken) { + try { + String command = buildCurlCommand(apiPath, runRequest, optToken); + + LogUtils.info("Executing command on pod {}: 【{}】", pod.getMetadata().getName(), command); + + // Execute the command on the pod client.pods().inNamespace(client.getNamespace()) .withName(pod.getMetadata().getName()) .redirectingInput() .writingOutput(System.out) .writingError(System.err) - .withTTY() .usingListener(new SimpleListener(runRequest)) - .exec(SHELL_COMMAND, "-c", command + StringUtils.LF); + .exec(SHELL_COMMAND, "-c", command); + } catch (Exception e) { + LogUtils.error("Failed to execute command on pod {} ", pod.getMetadata().getName(), e); } } @@ -89,7 +161,6 @@ public class KubernetesProvider { public void onFailure(Throwable t, Response response) { LogUtils.error("K8s 监听失败", t); if (runRequest != null) { - LogUtils.info("请求参数:{}", JSON.toJSONString(runRequest)); handleGeneralError(runRequest, t); return; } @@ -143,4 +214,25 @@ public class KubernetesProvider { return result; } + + private static String buildCurlCommand(String path, Object request, String optToken) { + return String.format( + "curl -H \"Accept: application/json\" " + + "-H \"Content-type: application/json\" " + + "-H \"otp-token: %s\" " + + "-X POST -d '%s' " + + "--connect-timeout %d " + + "--max-time %d " + + "--retry %d " + + "%s%s", + optToken, // otp-token + JSON.toFormatJSONString(request), // 请求体 + 30, // 连接超时(秒) + 120, // 最大时间(秒) + 3, // 重试次数 + LOCAL_URL, // 本地 URL + path // 具体 API 路径 + ); + } + } diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java b/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java index 7ccf3a40ad..f3ccad2ba5 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java @@ -288,40 +288,50 @@ public class ApiExecuteService { // 如果资源池配置了当前站点,则使用资源池的 taskInfo.setMsUrl(testResourcePool.getServerUrl()); } - - // 将任务按资源池的数量拆分 - List nodesList = testResourcePool.getTestResourceReturnDTO().getNodesList(); - List distributeTasks = new ArrayList<>(nodesList.size()); - for (int i = 0; i < taskRequest.getTaskItems().size(); i++) { - TaskBatchRequestDTO distributeTask; - int nodeIndex = i % nodesList.size(); - if (distributeTasks.size() < nodesList.size()) { - distributeTask = BeanUtils.copyBean(new TaskBatchRequestDTO(), taskRequest); - distributeTask.setTaskItems(new ArrayList<>()); - distributeTasks.add(distributeTask); - } else { - distributeTask = distributeTasks.get(nodeIndex); - } - distributeTask.getTaskInfo().setPoolSize(nodesList.get(nodeIndex).getConcurrentNumber()); - distributeTask.getTaskItems().add(taskRequest.getTaskItems().get(i)); - } - - for (int i = 0; i < nodesList.size(); i++) { - // todo 优化某个资源池不可用的情况,以及清理 executionSet - TestResourceNodeDTO testResourceNode = nodesList.get(i); - TaskBatchRequestDTO subTaskRequest = distributeTasks.get(i); - String endpoint = MsHttpClient.getEndpoint(testResourceNode.getIp(), testResourceNode.getPort()); + // 判断是否为 K8S 资源池 + boolean isK8SResourcePool = StringUtils.equals(testResourcePool.getType(), ResourcePoolTypeEnum.K8S.name()); + if (isK8SResourcePool) { + TestResourceDTO testResourceDTO = new TestResourceDTO(); + BeanUtils.copyBean(testResourceDTO, testResourcePool.getTestResourceReturnDTO()); + taskInfo.setPoolSize(testResourceDTO.getConcurrentNumber()); try { - List taskKeys = subTaskRequest.getTaskItems().stream() - .map(taskItem -> taskItem.getReportId() + "_" + taskItem.getResourceId()) - .toList(); - LogUtils.info("开始发送批量任务到 {} 节点执行:\n" + taskKeys, endpoint); - - MsHttpClient.batchRunApi(endpoint, subTaskRequest); + EngineFactory.batchRunApi(taskRequest, testResourceDTO); } catch (Exception e) { - LogUtils.error("发送批量任务到 {} 节点执行失败", endpoint); LogUtils.error(e); } + } else { + // 将任务按资源池的数量拆分 + List nodesList = testResourcePool.getTestResourceReturnDTO().getNodesList(); + List distributeTasks = new ArrayList<>(nodesList.size()); + for (int i = 0; i < taskRequest.getTaskItems().size(); i++) { + TaskBatchRequestDTO distributeTask; + int nodeIndex = i % nodesList.size(); + if (distributeTasks.size() < nodesList.size()) { + distributeTask = BeanUtils.copyBean(new TaskBatchRequestDTO(), taskRequest); + distributeTask.setTaskItems(new ArrayList<>()); + distributeTasks.add(distributeTask); + } else { + distributeTask = distributeTasks.get(nodeIndex); + } + distributeTask.getTaskInfo().setPoolSize(nodesList.get(nodeIndex).getConcurrentNumber()); + distributeTask.getTaskItems().add(taskRequest.getTaskItems().get(i)); + } + for (int i = 0; i < nodesList.size(); i++) { + // todo 优化某个资源池不可用的情况,以及清理 executionSet + TestResourceNodeDTO testResourceNode = nodesList.get(i); + TaskBatchRequestDTO subTaskRequest = distributeTasks.get(i); + String endpoint = MsHttpClient.getEndpoint(testResourceNode.getIp(), testResourceNode.getPort()); + try { + List taskKeys = subTaskRequest.getTaskItems().stream() + .map(taskItem -> taskItem.getReportId() + "_" + taskItem.getResourceId()) + .toList(); + LogUtils.info("开始发送批量任务到 {} 节点执行:\n" + taskKeys, endpoint); + MsHttpClient.batchRunApi(endpoint, subTaskRequest); + } catch (Exception e) { + LogUtils.error("发送批量任务到 {} 节点执行失败", endpoint); + LogUtils.error(e); + } + } } } diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiTaskCenterService.java b/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiTaskCenterService.java index 16cbae1f61..08c394acc3 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiTaskCenterService.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiTaskCenterService.java @@ -6,6 +6,7 @@ import io.metersphere.api.dto.definition.ExecuteReportDTO; import io.metersphere.api.dto.report.ReportDTO; import io.metersphere.api.mapper.ExtApiReportMapper; import io.metersphere.api.mapper.ExtApiScenarioReportMapper; +import io.metersphere.engine.EngineFactory; import io.metersphere.engine.MsHttpClient; import io.metersphere.project.domain.Project; import io.metersphere.project.mapper.ProjectMapper; @@ -19,6 +20,7 @@ import io.metersphere.sdk.exception.MSException; import io.metersphere.sdk.util.*; import io.metersphere.system.domain.Organization; import io.metersphere.system.dto.builder.LogDTOBuilder; +import io.metersphere.system.dto.pool.TestResourceDTO; import io.metersphere.system.dto.pool.TestResourceNodeDTO; import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO; import io.metersphere.system.dto.sdk.OptionDTO; @@ -272,55 +274,135 @@ public class ApiTaskCenterService { String userId, String module) { Map> poolIdMap = reports.stream() - .collect(Collectors.groupingBy(ReportDTO::getPoolId, Collectors.mapping(ReportDTO::getId, Collectors.toList()))); + .collect(Collectors.groupingBy(ReportDTO::getPoolId, + Collectors.mapping(ReportDTO::getId, Collectors.toList()))); + poolIdMap.forEach((poolId, reportList) -> { TestResourcePoolReturnDTO testResourcePoolDTO = testResourcePoolService.getTestResourcePoolDetail(poolId); - List nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList(); - if (CollectionUtils.isNotEmpty(nodesList)) { - stopTask(request, reportList, nodesList, reports); + + // Remove excluded report IDs + if (request.getExcludeIds() != null && !request.getExcludeIds().isEmpty()) { + reportList.removeAll(request.getExcludeIds()); + } + + boolean isK8SResourcePool = StringUtils.equals(testResourcePoolDTO.getType(), ResourcePoolTypeEnum.K8S.name()); + + if (isK8SResourcePool) { + handleK8STask(request, reports, reportList, testResourcePoolDTO); + } else { + List nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList(); + if (CollectionUtils.isNotEmpty(nodesList)) { + stopTask(request, reportList, nodesList, reports); + } } }); - // 保存日志 获取所有的reportId - List reportIds = reports.stream().map(ReportDTO::getId).toList(); - SubListUtils.dealForSubList(reportIds, 100, (subList) -> { - if (request.getModuleType().equals(TaskCenterResourceType.API_CASE.toString())) { - //记录日志 - saveLog(subList, userId, StringUtils.join(module, "_REAL_TIME_API_CASE"), TaskCenterResourceType.API_CASE.toString()); - } else if (request.getModuleType().equals(TaskCenterResourceType.API_SCENARIO.toString())) { - saveLog(subList, userId, StringUtils.join(module, "_REAL_TIME_API_SCENARIO"), TaskCenterResourceType.API_SCENARIO.toString()); + + logReports(request, reports, userId, module); + } + + private void handleK8STask(TaskCenterBatchRequest request, List reports, + List reportList, TestResourcePoolReturnDTO testResourcePoolDTO) { + TaskRequestDTO taskRequestDTO = new TaskRequestDTO(); + TaskResultDTO result = createStoppedTaskResult(); + + // Prepare mapping for integration and resource IDs + Map integrationMap = prepareIntegrationMap(reports); + Map resourceIdMap = prepareResourceIdMap(reports); + Map testPlanIdMap = prepareTestPlanIdMap(reports); + + SubListUtils.dealForSubList(reportList, 100, subList -> { + try { + TestResourceDTO testResourceDTO = new TestResourceDTO(); + BeanUtils.copyBean(testResourceDTO, testResourcePoolDTO.getTestResourceReturnDTO()); + EngineFactory.stopApi(subList, testResourceDTO); + } catch (Exception e) { + LogUtils.error(e); + } finally { + processSubListReports(subList, request, result, taskRequestDTO, integrationMap, resourceIdMap, testPlanIdMap); } }); } + private TaskResultDTO createStoppedTaskResult() { + TaskResultDTO result = new TaskResultDTO(); + result.setRequestResults(Collections.emptyList()); + result.setHasEnded(true); + + ProcessResultDTO processResultDTO = new ProcessResultDTO(); + processResultDTO.setStatus(ExecStatus.STOPPED.name()); + result.setProcessResultDTO(processResultDTO); + result.setConsole("任务已终止"); + + return result; + } + + private void processSubListReports(List subList, TaskCenterBatchRequest request, + TaskResultDTO result, TaskRequestDTO taskRequestDTO, + Map integrationMap, Map resourceIdMap, + Map testPlanIdMap) { + subList.forEach(reportId -> { + TaskInfo taskInfo = taskRequestDTO.getTaskInfo(); + taskInfo.setResourceType(request.getModuleType()); + + TaskItem taskItem = new TaskItem(); + taskItem.setReportId(reportId); + taskItem.setResourceId(resourceIdMap.get(reportId)); + + // Set resource type based on the test plan ID + String testPlanId = testPlanIdMap.get(reportId); + if (testPlanId != null && !"NONE".equals(testPlanId)) { + taskInfo.setResourceType(getResourceType(request.getModuleType())); + } + + // Set integrated report information + taskInfo.getRunModeConfig().setIntegratedReport(integrationMap.get(reportId)); + if (Boolean.TRUE.equals(integrationMap.get(reportId))) { + taskInfo.getRunModeConfig().getCollectionReport().setReportId(reportId); + } + + taskRequestDTO.setTaskItem(taskItem); + result.setRequest(taskRequestDTO); + kafkaTemplate.send(KafkaTopicConstants.API_REPORT_TOPIC, JSON.toJSONString(result)); + }); + } + + private String getResourceType(String moduleType) { + return TaskCenterResourceType.API_CASE.toString().equals(moduleType) + ? ApiExecuteResourceType.TEST_PLAN_API_CASE.name() + : TaskCenterResourceType.API_SCENARIO.toString().equals(moduleType) + ? ApiExecuteResourceType.TEST_PLAN_API_SCENARIO.name() + : ""; + } + + private void logReports(TaskCenterBatchRequest request, List reports, + String userId, String module) { + List reportIds = reports.stream().map(ReportDTO::getId).toList(); + SubListUtils.dealForSubList(reportIds, 100, subList -> { + String logPrefix = StringUtils.join(module, "_REAL_TIME_"); + String resourceType = request.getModuleType().equals(TaskCenterResourceType.API_CASE.toString()) + ? TaskCenterResourceType.API_CASE.toString() + : TaskCenterResourceType.API_SCENARIO.toString(); + + saveLog(subList, userId, logPrefix + resourceType, resourceType); + }); + } + public void stopTask(TaskCenterBatchRequest request, List reportList, List nodesList, List reports) { - // 根据报告id分组 - Map integrationMap = reports.stream() - .collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getIntegrated)); - Map resourceIdMap = reports.stream() - .collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getResourceId)); - Map testPlanIdMap = reports.stream() - .collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getTestPlanId)); + Map integrationMap = prepareIntegrationMap(reports); + Map resourceIdMap = prepareResourceIdMap(reports); + Map testPlanIdMap = prepareTestPlanIdMap(reports); - // 如果需要排除的报告ID不为空,则从reportList中移除 + // Remove excluded report IDs if (request.getExcludeIds() != null && !request.getExcludeIds().isEmpty()) { reportList.removeAll(request.getExcludeIds()); } nodesList.parallelStream().forEach(node -> { String endpoint = MsHttpClient.getEndpoint(node.getIp(), node.getPort()); - - // 初始化 TaskRequestDTO 和 TaskResultDTO - TaskRequestDTO taskRequestDTO = new TaskRequestDTO(); - TaskResultDTO result = new TaskResultDTO(); - result.setRequestResults(Collections.emptyList()); - result.setHasEnded(true); - ProcessResultDTO processResultDTO = new ProcessResultDTO(); - processResultDTO.setStatus(ExecStatus.STOPPED.name()); - result.setProcessResultDTO(processResultDTO); - result.setConsole("任务已终止"); + TaskResultDTO result = createStoppedTaskResult(); SubListUtils.dealForSubList(reportList, 100, subList -> { try { @@ -329,41 +411,23 @@ public class ApiTaskCenterService { } catch (Exception e) { LogUtils.error(e); } finally { - subList.forEach(reportId -> { - TaskInfo taskInfo = taskRequestDTO.getTaskInfo(); - taskInfo.setResourceType(request.getModuleType()); - - TaskItem taskItem = new TaskItem(); - taskItem.setReportId(reportId); - taskItem.setResourceId(resourceIdMap.get(reportId)); - // 设置任务信息的资源类型 - String testPlanId = testPlanIdMap.get(reportId); - if (testPlanId != null && !"NONE".equals(testPlanId)) { - String moduleType = request.getModuleType(); - taskInfo.setResourceType( - TaskCenterResourceType.API_CASE.toString().equals(moduleType) - ? ApiExecuteResourceType.TEST_PLAN_API_CASE.name() - : TaskCenterResourceType.API_SCENARIO.toString().equals(moduleType) - ? ApiExecuteResourceType.TEST_PLAN_API_SCENARIO.name() - : taskInfo.getResourceType() - ); - } - - // 设置集成报告 - taskInfo.getRunModeConfig().setIntegratedReport(integrationMap.get(reportId)); - if (Boolean.TRUE.equals(integrationMap.get(reportId))) { - taskInfo.getRunModeConfig().getCollectionReport().setReportId(reportId); - } - - taskRequestDTO.setTaskItem(taskItem); - result.setRequest(taskRequestDTO); - kafkaTemplate.send(KafkaTopicConstants.API_REPORT_TOPIC, JSON.toJSONString(result)); - }); + processSubListReports(subList, request, result, new TaskRequestDTO(), integrationMap, resourceIdMap, testPlanIdMap); } }); }); } + private Map prepareIntegrationMap(List reports) { + return reports.stream().collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getIntegrated)); + } + + private Map prepareResourceIdMap(List reports) { + return reports.stream().collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getResourceId)); + } + + private Map prepareTestPlanIdMap(List reports) { + return reports.stream().collect(Collectors.toMap(ReportDTO::getId, ReportDTO::getTestPlanId)); + } private void saveLog(List ids, String userId, String module, String type) { List reports = new ArrayList<>(); diff --git a/backend/services/test-plan/src/main/java/io/metersphere/plan/service/TestPlanTaskCenterService.java b/backend/services/test-plan/src/main/java/io/metersphere/plan/service/TestPlanTaskCenterService.java index d2588ad14a..25a74af75c 100644 --- a/backend/services/test-plan/src/main/java/io/metersphere/plan/service/TestPlanTaskCenterService.java +++ b/backend/services/test-plan/src/main/java/io/metersphere/plan/service/TestPlanTaskCenterService.java @@ -6,19 +6,19 @@ import io.metersphere.api.dto.definition.ExecuteReportDTO; import io.metersphere.api.dto.report.ReportDTO; import io.metersphere.api.mapper.ExtApiScenarioReportMapper; import io.metersphere.api.service.ApiTaskCenterService; +import io.metersphere.engine.EngineFactory; import io.metersphere.engine.MsHttpClient; import io.metersphere.plan.mapper.ExtTestPlanReportMapper; import io.metersphere.project.domain.Project; import io.metersphere.project.mapper.ProjectMapper; import io.metersphere.sdk.constants.PermissionConstants; +import io.metersphere.sdk.constants.ResourcePoolTypeEnum; import io.metersphere.sdk.constants.TaskCenterResourceType; import io.metersphere.sdk.exception.MSException; -import io.metersphere.sdk.util.DateUtils; -import io.metersphere.sdk.util.LogUtils; -import io.metersphere.sdk.util.SubListUtils; -import io.metersphere.sdk.util.Translator; +import io.metersphere.sdk.util.*; import io.metersphere.system.domain.Organization; import io.metersphere.system.dto.builder.LogDTOBuilder; +import io.metersphere.system.dto.pool.TestResourceDTO; import io.metersphere.system.dto.pool.TestResourceNodeDTO; import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO; import io.metersphere.system.dto.sdk.OptionDTO; @@ -336,9 +336,23 @@ public class TestPlanTaskCenterService { .collect(Collectors.groupingBy(ReportDTO::getPoolId, Collectors.mapping(ReportDTO::getId, Collectors.toList()))); poolIdMap.forEach((poolId, reportList) -> { TestResourcePoolReturnDTO testResourcePoolDTO = testResourcePoolService.getTestResourcePoolDetail(poolId); - List nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList(); - if (CollectionUtils.isNotEmpty(nodesList)) { - stopTask(request, reportList, nodesList); + // 判断是否为 K8S 资源池 + boolean isK8SResourcePool = StringUtils.equals(testResourcePoolDTO.getType(), ResourcePoolTypeEnum.K8S.name()); + if (isK8SResourcePool) { + SubListUtils.dealForSubList(reportList, 100, (subList) -> { + try { + TestResourceDTO testResourceDTO = new TestResourceDTO(); + BeanUtils.copyBean(testResourceDTO, testResourcePoolDTO.getTestResourceReturnDTO()); + EngineFactory.stopApi(subList, testResourceDTO); + } catch (Exception e) { + LogUtils.error(e); + } + }); + } else { + List nodesList = testResourcePoolDTO.getTestResourceReturnDTO().getNodesList(); + if (CollectionUtils.isNotEmpty(nodesList)) { + stopTask(request, reportList, nodesList); + } } }); }