diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java index 52fe8ee37b..c844b79f74 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesExecEngine.java @@ -4,10 +4,17 @@ import io.metersphere.engine.ApiEngine; import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO; import io.metersphere.sdk.dto.api.task.TaskRequestDTO; import io.metersphere.sdk.exception.MSException; +import io.metersphere.sdk.exception.TaskRunnerResultCode; import io.metersphere.sdk.util.LogUtils; +import io.metersphere.system.controller.handler.ResultHolder; import io.metersphere.system.dto.pool.TestResourceDTO; +import org.springframework.web.client.HttpServerErrorException; +import java.util.Arrays; import java.util.List; +import java.util.Optional; + +import static io.metersphere.api.controller.result.ApiResultCode.RESOURCE_POOL_EXECUTE_ERROR; public class KubernetesExecEngine implements ApiEngine { /** @@ -16,16 +23,34 @@ public class KubernetesExecEngine implements ApiEngine { private final Object request; private final TestResourceDTO resource; + /** + * 单调执行构造函数 + * + * @param request + * @param resource + */ public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource) { this.request = request; this.resource = resource; } + /** + * 批量执行构造函数 + * + * @param batchRequestDTO + * @param resource + */ public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource) { this.resource = resource; this.request = batchRequestDTO; } + /** + * 停止执行构造函数 + * + * @param reportIds + * @param resource + */ public KubernetesExecEngine(List reportIds, TestResourceDTO resource) { this.resource = resource; this.request = reportIds; @@ -40,18 +65,33 @@ public class KubernetesExecEngine implements ApiEngine { private void runApi(String command) { try { - KubernetesProvider.exec(resource, request, command); + KubernetesProvider.exec(resource, command); + } catch (HttpServerErrorException e) { + handleHttpServerError(e); } catch (Exception e) { - LogUtils.error("K8S 执行异常:", e); - rollbackOnFailure(); // 错误处理逻辑 - throw new MSException("K8S 节点执行错误:" + e.getMessage(), e); + handleGeneralError(e); } } + private void handleHttpServerError(HttpServerErrorException e) { + LogUtils.error("K8S 执行异常:", e); - // 错误回滚处理 - private void rollbackOnFailure() { - // TODO: 实现回滚处理逻辑 - LogUtils.info("执行失败,回滚操作启动。"); + // 获取错误代码并处理 + int errorCode = Optional.ofNullable(e.getResponseBodyAs(ResultHolder.class)) + .map(ResultHolder::getCode) + .orElseThrow(() -> new MSException(RESOURCE_POOL_EXECUTE_ERROR, "Unknown error code")); + + // 匹配资源池的错误代码并抛出相应异常 + TaskRunnerResultCode resultCode = Arrays.stream(TaskRunnerResultCode.values()) + .filter(code -> code.getCode() == errorCode) + .findFirst() + .orElseThrow(() -> new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage())); + + throw new MSException(resultCode, e.getMessage()); + } + + private void handleGeneralError(Exception e) { + LogUtils.error("K8S 执行异常:", e); + throw new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage()); } } diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java index ec669f65b0..54ca2571f8 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/engine/KubernetesProvider.java @@ -5,13 +5,17 @@ import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.dsl.ExecListener; +import io.fabric8.kubernetes.client.dsl.ExecWatch; import io.metersphere.sdk.exception.MSException; import io.metersphere.sdk.util.JSON; import io.metersphere.sdk.util.LogUtils; import io.metersphere.system.dto.pool.TestResourceDTO; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.util.CollectionUtils; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -46,6 +50,63 @@ public class KubernetesProvider { return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size())); } + /** + * 同步执行命令 + * + * @param resource + * @param command + * @throws Exception + */ + public static void exec(TestResourceDTO resource, String command) throws Exception { + ExecWatch execWatch = null; + try (KubernetesClient client = getKubernetesClient(resource)) { + Pod pod = getExecPod(client, resource); + LogUtils.info("当前执行 Pod:【 " + pod.getMetadata().getName() + " 】"); + + // 同步执行命令 + execWatch = client.pods().inNamespace(client.getNamespace()) + .withName(pod.getMetadata().getName()) + .redirectingInput() + .writingOutput(System.out) + .writingError(System.err) + .withTTY() + .exec(SHELL_COMMAND, "-c", command); + + // 等待命令执行完成,获取结果 + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ByteArrayOutputStream errorStream = new ByteArrayOutputStream(); + + try (InputStream inputStream = execWatch.getOutput(); + InputStream errStream = execWatch.getError()) { + + // 读取标准输出和错误输出 + IOUtils.copy(inputStream, outputStream); + IOUtils.copy(errStream, errorStream); + + // 判断是否有错误输出 + if (errorStream.size() > 0) { + throw new MSException("Kubernetes exec error: " + errorStream); + } + + // 输出结果 + String result = outputStream.toString(); + LogUtils.info("命令执行结果: " + result); + } + } finally { + // 确保 ExecWatch 被关闭以释放资源 + if (execWatch != null) { + execWatch.close(); + } + } + } + + /** + * 异步执行命令 + * + * @param resource + * @param runRequest + * @param command + */ public static void exec(TestResourceDTO resource, Object runRequest, String command) { try (KubernetesClient client = getKubernetesClient(resource)) { Pod pod = getExecPod(client, resource); @@ -63,7 +124,6 @@ public class KubernetesProvider { } private record SimpleListener(Object runRequest) implements ExecListener { - @Override public void onOpen() { LogUtils.info("K8s 开启监听"); @@ -75,6 +135,7 @@ public class KubernetesProvider { if (runRequest != null) { LogUtils.info("请求参数:{}", JSON.toJSONString(runRequest)); // TODO: Add proper error handling based on response or task request details + } throw new MSException("K8S 节点执行错误:" + t.getMessage(), t); }