diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java index b933ed62f6..0ec26051ff 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java @@ -5,11 +5,21 @@ import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.dsl.ExecListener; +import io.metersphere.sdk.constants.KafkaTopicConstants; +import io.metersphere.sdk.constants.MsgType; +import io.metersphere.sdk.constants.ResultStatus; +import io.metersphere.sdk.dto.SocketMsgDTO; +import io.metersphere.sdk.dto.api.result.ProcessResultDTO; +import io.metersphere.sdk.dto.api.result.TaskResultDTO; +import io.metersphere.sdk.dto.api.task.TaskRequestDTO; import io.metersphere.sdk.exception.MSException; +import io.metersphere.sdk.util.CommonBeanFactory; import io.metersphere.sdk.util.JSON; import io.metersphere.sdk.util.LogUtils; +import io.metersphere.sdk.util.WebSocketUtils; import io.metersphere.system.dto.pool.TestResourceDTO; import org.apache.commons.lang3.StringUtils; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.util.CollectionUtils; import java.util.List; @@ -18,7 +28,7 @@ import java.util.concurrent.ThreadLocalRandom; public class KubernetesProvider { private static final String RUNNING_PHASE = "Running"; - private static final String SHELL_COMMAND = "#!/bin/bash"; + private static final String SHELL_COMMAND = "sh"; public static KubernetesClient getKubernetesClient(TestResourceDTO credential) { ConfigBuilder configBuilder = new ConfigBuilder() @@ -56,10 +66,7 @@ public class KubernetesProvider { try (KubernetesClient client = getKubernetesClient(resource)) { Pod pod = getExecPod(client, resource); LogUtils.info("当前执行 Pod:【 " + pod.getMetadata().getName() + " 】"); - - String commandX = SHELL_COMMAND + StringUtils.LF + command + StringUtils.LF; - - LogUtils.info("执行命令:【 " + commandX + " 】"); + LogUtils.info("执行命令:【 " + command + " 】"); // 同步执行命令 client.pods().inNamespace(client.getNamespace()) .withName(pod.getMetadata().getName()) @@ -68,7 +75,7 @@ public class KubernetesProvider { .writingError(System.err) .withTTY() .usingListener(new SimpleListener(runRequest)) - .exec(commandX); + .exec(SHELL_COMMAND, "-c", command + StringUtils.LF); } } @@ -83,8 +90,8 @@ public class KubernetesProvider { LogUtils.error("K8s 监听失败", t); if (runRequest != null) { LogUtils.info("请求参数:{}", JSON.toJSONString(runRequest)); - // TODO: Add proper error handling based on response or task request details - + handleGeneralError(runRequest, t); + return; } throw new MSException("K8S 节点执行错误:" + t.getMessage(), t); } @@ -95,4 +102,45 @@ public class KubernetesProvider { // No additional actions needed for now } } + + private static void handleGeneralError(Object requestObj, Throwable e) { + // 检查请求对象是否为 TaskRequestDTO 类型 + if (requestObj instanceof TaskRequestDTO request) { + // 发送结果到 WebSocket,如果报告 ID 存在 + String reportId = request.getTaskItem().getReportId(); + if (WebSocketUtils.has(reportId)) { + SocketMsgDTO socketMsgDTO = new SocketMsgDTO( + reportId, + request.getTaskInfo().getRunMode(), + MsgType.EXEC_END.name(), + e.getMessage() + ); + WebSocketUtils.sendMessageSingle(socketMsgDTO); + } + + // 尝试获取 KafkaTemplate 并发送任务结果 + KafkaTemplate kafkaTemplate = CommonBeanFactory.getBean(KafkaTemplate.class); + if (kafkaTemplate != null) { + TaskResultDTO result = buildTaskResult(request, e); + kafkaTemplate.send(KafkaTopicConstants.API_REPORT_TOPIC, JSON.toJSONString(result)); + } + } + } + + private static TaskResultDTO buildTaskResult(TaskRequestDTO request, Throwable e) { + // 创建并配置 TaskResultDTO + TaskResultDTO result = new TaskResultDTO(); + result.setRequest(request); + result.setRequestResults(List.of()); // 空的请求结果列表 + result.setHasEnded(true); // 标记任务已结束 + + // 创建并配置 ProcessResultDTO + ProcessResultDTO processResultDTO = new ProcessResultDTO(); + processResultDTO.setStatus(ResultStatus.ERROR.name()); + + result.setProcessResultDTO(processResultDTO); + result.setConsole(e.getMessage()); // 将异常信息记录到控制台日志 + + return result; + } }