refactor(接口测试): 增加同步执行命令方法

This commit is contained in:
fit2-zhao 2024-10-11 14:30:33 +08:00 committed by Craftsman
parent 0672393488
commit 68576d3d03
2 changed files with 110 additions and 9 deletions

View File

@ -4,10 +4,17 @@ import io.metersphere.engine.ApiEngine;
import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO; import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO; import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.exception.MSException; import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.exception.TaskRunnerResultCode;
import io.metersphere.sdk.util.LogUtils; import io.metersphere.sdk.util.LogUtils;
import io.metersphere.system.controller.handler.ResultHolder;
import io.metersphere.system.dto.pool.TestResourceDTO; import io.metersphere.system.dto.pool.TestResourceDTO;
import org.springframework.web.client.HttpServerErrorException;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional;
import static io.metersphere.api.controller.result.ApiResultCode.RESOURCE_POOL_EXECUTE_ERROR;
public class KubernetesExecEngine implements ApiEngine { public class KubernetesExecEngine implements ApiEngine {
/** /**
@ -16,16 +23,34 @@ public class KubernetesExecEngine implements ApiEngine {
private final Object request; private final Object request;
private final TestResourceDTO resource; private final TestResourceDTO resource;
/**
* 单调执行构造函数
*
* @param request
* @param resource
*/
public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource) { public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource) {
this.request = request; this.request = request;
this.resource = resource; this.resource = resource;
} }
/**
* 批量执行构造函数
*
* @param batchRequestDTO
* @param resource
*/
public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource) { public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource) {
this.resource = resource; this.resource = resource;
this.request = batchRequestDTO; this.request = batchRequestDTO;
} }
/**
* 停止执行构造函数
*
* @param reportIds
* @param resource
*/
public KubernetesExecEngine(List<String> reportIds, TestResourceDTO resource) { public KubernetesExecEngine(List<String> reportIds, TestResourceDTO resource) {
this.resource = resource; this.resource = resource;
this.request = reportIds; this.request = reportIds;
@ -40,18 +65,33 @@ public class KubernetesExecEngine implements ApiEngine {
private void runApi(String command) { private void runApi(String command) {
try { try {
KubernetesProvider.exec(resource, request, command); KubernetesProvider.exec(resource, command);
} catch (HttpServerErrorException e) {
handleHttpServerError(e);
} catch (Exception e) { } catch (Exception e) {
LogUtils.error("K8S 执行异常:", e); handleGeneralError(e);
rollbackOnFailure(); // 错误处理逻辑
throw new MSException("K8S 节点执行错误:" + e.getMessage(), e);
} }
} }
private void handleHttpServerError(HttpServerErrorException e) {
LogUtils.error("K8S 执行异常:", e);
// 错误回滚处理 // 获取错误代码并处理
private void rollbackOnFailure() { int errorCode = Optional.ofNullable(e.getResponseBodyAs(ResultHolder.class))
// TODO: 实现回滚处理逻辑 .map(ResultHolder::getCode)
LogUtils.info("执行失败,回滚操作启动。"); .orElseThrow(() -> new MSException(RESOURCE_POOL_EXECUTE_ERROR, "Unknown error code"));
// 匹配资源池的错误代码并抛出相应异常
TaskRunnerResultCode resultCode = Arrays.stream(TaskRunnerResultCode.values())
.filter(code -> code.getCode() == errorCode)
.findFirst()
.orElseThrow(() -> new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage()));
throw new MSException(resultCode, e.getMessage());
}
private void handleGeneralError(Exception e) {
LogUtils.error("K8S 执行异常:", e);
throw new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage());
} }
} }

View File

@ -5,13 +5,17 @@ import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.metersphere.sdk.exception.MSException; import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.util.JSON; import io.metersphere.sdk.util.JSON;
import io.metersphere.sdk.util.LogUtils; import io.metersphere.sdk.util.LogUtils;
import io.metersphere.system.dto.pool.TestResourceDTO; import io.metersphere.system.dto.pool.TestResourceDTO;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -46,6 +50,63 @@ public class KubernetesProvider {
return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size())); return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size()));
} }
/**
* 同步执行命令
*
* @param resource
* @param command
* @throws Exception
*/
public static void exec(TestResourceDTO resource, String command) throws Exception {
ExecWatch execWatch = null;
try (KubernetesClient client = getKubernetesClient(resource)) {
Pod pod = getExecPod(client, resource);
LogUtils.info("当前执行 Pod" + pod.getMetadata().getName() + "");
// 同步执行命令
execWatch = client.pods().inNamespace(client.getNamespace())
.withName(pod.getMetadata().getName())
.redirectingInput()
.writingOutput(System.out)
.writingError(System.err)
.withTTY()
.exec(SHELL_COMMAND, "-c", command);
// 等待命令执行完成获取结果
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
try (InputStream inputStream = execWatch.getOutput();
InputStream errStream = execWatch.getError()) {
// 读取标准输出和错误输出
IOUtils.copy(inputStream, outputStream);
IOUtils.copy(errStream, errorStream);
// 判断是否有错误输出
if (errorStream.size() > 0) {
throw new MSException("Kubernetes exec error: " + errorStream);
}
// 输出结果
String result = outputStream.toString();
LogUtils.info("命令执行结果: " + result);
}
} finally {
// 确保 ExecWatch 被关闭以释放资源
if (execWatch != null) {
execWatch.close();
}
}
}
/**
* 异步执行命令
*
* @param resource
* @param runRequest
* @param command
*/
public static void exec(TestResourceDTO resource, Object runRequest, String command) { public static void exec(TestResourceDTO resource, Object runRequest, String command) {
try (KubernetesClient client = getKubernetesClient(resource)) { try (KubernetesClient client = getKubernetesClient(resource)) {
Pod pod = getExecPod(client, resource); Pod pod = getExecPod(client, resource);
@ -63,7 +124,6 @@ public class KubernetesProvider {
} }
private record SimpleListener(Object runRequest) implements ExecListener { private record SimpleListener(Object runRequest) implements ExecListener {
@Override @Override
public void onOpen() { public void onOpen() {
LogUtils.info("K8s 开启监听"); LogUtils.info("K8s 开启监听");
@ -75,6 +135,7 @@ public class KubernetesProvider {
if (runRequest != null) { if (runRequest != null) {
LogUtils.info("请求参数:{}", JSON.toJSONString(runRequest)); LogUtils.info("请求参数:{}", JSON.toJSONString(runRequest));
// TODO: Add proper error handling based on response or task request details // TODO: Add proper error handling based on response or task request details
} }
throw new MSException("K8S 节点执行错误:" + t.getMessage(), t); throw new MSException("K8S 节点执行错误:" + t.getMessage(), t);
} }