fix(接口测试): 修复k8s无法执行问题

--bug=1017872 --user=赵勇 【接口测试】接口CASE列表中批量执行case-选择K8s资源池-集合报告执行被停止 https://www.tapd.cn/55049933/s/1259827
This commit is contained in:
fit2-zhao 2022-10-12 17:22:04 +08:00 committed by f2c-ci-robot[bot]
parent d95efb97f7
commit c3a77c9202
7 changed files with 164 additions and 282 deletions

View File

@ -1,6 +1,5 @@
package io.metersphere.api.exec.engine;
import io.metersphere.base.domain.LoadTestReportWithBLOBs;
import io.metersphere.base.domain.TestResource;
import io.metersphere.base.domain.TestResourcePool;
import io.metersphere.commons.constants.ResourcePoolTypeEnum;
@ -12,21 +11,15 @@ import io.metersphere.dto.JmeterRunRequestDTO;
import io.metersphere.engine.Engine;
import io.metersphere.service.BaseTestResourcePoolService;
import io.metersphere.service.BaseTestResourceService;
import io.metersphere.commons.utils.JSONUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.Iterator;
import java.util.List;
public abstract class AbstractEngine implements Engine {
protected String JMETER_IMAGE;
protected String HEAP;
protected String GC_ALGO;
protected LoadTestReportWithBLOBs loadTestReport;
protected Integer threadNum;
protected List<TestResource> resourceList;
protected TestResourcePool resourcePool;
private final BaseTestResourcePoolService testResourcePoolService;
@ -73,98 +66,4 @@ public abstract class AbstractEngine implements Engine {
MSException.throwException("Test Resource is empty");
}
}
protected void init(LoadTestReportWithBLOBs loadTestReport) {
if (loadTestReport == null) {
MSException.throwException("LoadTest is null.");
}
this.loadTestReport = loadTestReport;
threadNum = getThreadNum(loadTestReport);
String resourcePoolId = loadTestReport.getTestResourcePoolId();
if (StringUtils.isBlank(resourcePoolId)) {
MSException.throwException("Resource Pool ID is empty");
}
resourcePool = testResourcePoolService.getResourcePool(resourcePoolId);
if (resourcePool == null || StringUtils.equals(resourcePool.getStatus(), ResourceStatusEnum.DELETE.name())) {
MSException.throwException("Resource Pool is empty");
}
if (!ResourcePoolTypeEnum.K8S.name().equals(resourcePool.getType())
&& !ResourcePoolTypeEnum.NODE.name().equals(resourcePool.getType())) {
MSException.throwException("Invalid Resource Pool type.");
}
if (!StringUtils.equals(resourcePool.getStatus(), ResourceStatusEnum.VALID.name())) {
MSException.throwException("Resource Pool Status is not VALID");
}
// image
String image = resourcePool.getImage();
if (StringUtils.isNotEmpty(image)) {
JMETER_IMAGE = image;
}
// heap
String heap = resourcePool.getHeap();
if (StringUtils.isNotEmpty(heap)) {
HEAP = heap;
}
// gc_algo
String gcAlgo = resourcePool.getGcAlgo();
if (StringUtils.isNotEmpty(gcAlgo)) {
GC_ALGO = gcAlgo;
}
this.resourceList = testResourceService.getResourcesByPoolId(resourcePool.getId());
if (CollectionUtils.isEmpty(this.resourceList)) {
MSException.throwException("Test Resource is empty");
}
}
private Integer getThreadNum(LoadTestReportWithBLOBs t) {
Integer s = 0;
String loadConfiguration = t.getLoadConfiguration();
JSONArray jsonArray = JSONUtil.parseArray(loadConfiguration);
Iterator<Object> iterator = jsonArray.iterator();
outer:
while (iterator.hasNext()) {
Object next = iterator.next();
if (next instanceof List) {
List<Object> o = (List<Object>) next;
for (Object o1 : o) {
JSONObject jsonObject = JSONUtil.parseObject(o1.toString());
String key = jsonObject.optString("key");
if (StringUtils.equals(key, "deleted")) {
String value = jsonObject.optString("value");
if (StringUtils.equals(value, "true")) {
iterator.remove();
continue outer;
}
}
}
for (Object o1 : o) {
JSONObject jsonObject = JSONUtil.parseObject(o1.toString());
String key = jsonObject.optString("key");
if (StringUtils.equals(key, "enabled")) {
String value = jsonObject.optString("value");
if (StringUtils.equals(value, "false")) {
iterator.remove();
continue outer;
}
}
}
}
}
for (int i = 0; i < jsonArray.length(); i++) {
if (jsonArray.get(i) instanceof List) {
JSONArray o = jsonArray.getJSONArray(i);
for (int j = 0; j < o.length(); j++) {
if (StringUtils.equals(o.optJSONObject(j).optString("key"), "TargetLevel")) {
s += o.optJSONObject(j).optInt("value");
break;
}
}
}
}
return s;
}
}

View File

@ -1,36 +1,14 @@
package io.metersphere.api.exec.engine;
import io.metersphere.commons.exception.MSException;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.dto.JmeterRunRequestDTO;
import io.metersphere.engine.Engine;
import org.apache.commons.beanutils.ConstructorUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional(rollbackFor = Exception.class)
public class EngineFactory {
public static Class<? extends KubernetesTestEngine> getKubernetesTestEngineClass() {
Class kubernetesTestEngineClass;
try {
// 使用反射工具包这里在特定环境66会卡住
kubernetesTestEngineClass = Class.forName("io.metersphere.xpack.engine.KubernetesTestEngineImpl");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
return kubernetesTestEngineClass;
}
public static Engine createApiEngine(JmeterRunRequestDTO runRequest) {
try {
return ConstructorUtils.invokeConstructor(getKubernetesTestEngineClass(), runRequest);
} catch (Exception e) {
LogUtil.error(e);
MSException.throwException(e.getMessage());
}
return null;
return new KubernetesTestEngine(runRequest);
}
}

View File

@ -1,8 +1,6 @@
package io.metersphere.api.exec.engine;
import io.metersphere.commons.constants.FileType;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.commons.utils.JmeterDocumentParser;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
@ -21,16 +19,6 @@ import java.util.List;
public class EngineSourceParserFactory {
public static final boolean IS_TRANS = false;
public static EngineSourceParser createEngineSourceParser(String type) {
final FileType engineType = FileType.valueOf(type);
if (FileType.JMX.equals(engineType)) {
return new JmeterDocumentParser();
}
return null;
}
public static Document getDocument(InputStream source) throws DocumentException {
SAXReader reader = new SAXReader();
try {

View File

@ -0,0 +1,92 @@
package io.metersphere.api.exec.engine;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.metersphere.commons.exception.MSException;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.commons.utils.JSON;
import io.metersphere.dto.JmeterRunRequestDTO;
import io.metersphere.service.RemakeReportService;
import io.metersphere.utils.LoggerUtil;
import io.metersphere.xpack.resourcepool.engine.provider.ClientCredential;
import okhttp3.Response;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class KubernetesApiExec {
public static Pod getExecPod(ClientCredential credential, KubernetesClient client) {
List<Pod> pods = client.pods().inNamespace(credential.getNamespace()).list().getItems();
if (CollectionUtils.isEmpty(pods)) {
MSException.throwException("Execution node not found");
}
List<Pod> nodePods = pods.stream().filter(s -> s.getStatus().getPhase().equals("Running") && s.getMetadata().getGenerateName().startsWith(credential.getDeployName())).collect(Collectors.toList());
if (CollectionUtils.isEmpty(nodePods)) {
MSException.throwException("Execution node not found");
}
Pod pod = nodePods.get(new Random().nextInt(nodePods.size()));
return pod;
}
public static ExecWatch newExecWatch(KubernetesClient client, String namespace, String podName, String command) {
LoggerUtil.info("CURL 命令:【 " + command + "");
return client.pods().inNamespace(namespace).withName(podName)
.readingInput(System.in)
.writingOutput(System.out)
.writingError(System.err)
.withTTY()
.usingListener(new SimpleListener())
.exec("sh", "-c", command);
}
private static String getQuery(String content) {
Pattern regex = Pattern.compile("\\{([^}]*)\\}");
Matcher matcher = regex.matcher(content);
StringBuilder sql = new StringBuilder();
while (matcher.find()) {
sql.append(matcher.group(1) + ",");
}
if (sql.length() > 0) {
sql.deleteCharAt(sql.length() - 1);
}
return sql.toString();
}
private static class SimpleListener implements ExecListener {
@Override
public void onOpen(Response response) {
LoggerUtil.info("The shell will remain open for 10 seconds.");
}
@Override
public void onFailure(Throwable t, Response response) {
List<String> value = response.request().url().queryParameterValues("command");
if (CollectionUtils.isNotEmpty(value) && value.size() > 2 && value.get(2).startsWith("curl")) {
String query = "{" + KubernetesApiExec.getQuery(value.get(2)) + "}";
JmeterRunRequestDTO runRequest = JSON.parseObject(query, JmeterRunRequestDTO.class);
if (runRequest != null) {
RemakeReportService apiScenarioReportService = CommonBeanFactory.getBean(RemakeReportService.class);
apiScenarioReportService.testEnded(runRequest, response.networkResponse().message());
} else {
MSException.throwException("K8S 节点执行错误:" + response.networkResponse().message());
}
} else {
MSException.throwException("K8S 节点执行错误:" + response.networkResponse().message());
}
LoggerUtil.error("K8S 节点执行错误:" + JSON.toJSONString(value));
LoggerUtil.error("K8S 节点执行错误:" + response.networkResponse());
}
@Override
public void onClose(int code, String reason) {
LoggerUtil.info("The shell will now close.");
}
}
}

View File

@ -1,6 +1,73 @@
package io.metersphere.api.exec.engine;
import io.metersphere.engine.Engine;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.metersphere.base.domain.TestResource;
import io.metersphere.commons.exception.MSException;
import io.metersphere.commons.utils.JSON;
import io.metersphere.dto.JmeterRunRequestDTO;
import io.metersphere.utils.LoggerUtil;
import io.metersphere.xpack.resourcepool.engine.provider.ClientCredential;
import io.metersphere.xpack.resourcepool.engine.provider.KubernetesProvider;
import org.apache.commons.beanutils.ConstructorUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.reflections.Reflections;
public interface KubernetesTestEngine extends Engine {
import java.util.Set;
public class KubernetesTestEngine extends AbstractEngine {
private JmeterRunRequestDTO runRequest;
// 初始化API调用
public KubernetesTestEngine(JmeterRunRequestDTO runRequest) {
super.initApiConfig(runRequest);
this.runRequest = runRequest;
}
private static Class<? extends KubernetesProvider> providerClass = null;
static {
Set<Class<? extends KubernetesProvider>> subTypes = new Reflections("io.metersphere.xpack").getSubTypesOf(KubernetesProvider.class);
if (CollectionUtils.isNotEmpty(subTypes)) {
providerClass = subTypes.stream().findFirst().get();
}
}
@Override
public void start() {
resourceList.forEach(r -> {
runApi(r);
});
}
private void runApi(TestResource resource) {
try {
ClientCredential clientCredential = JSON.parseObject(resource.getConfiguration(), ClientCredential.class);
KubernetesProvider kubernetesProvider = ConstructorUtils.invokeConstructor(providerClass, clientCredential);
KubernetesClient client = kubernetesProvider.getClient();
Pod pod = KubernetesApiExec.getExecPod(clientCredential, client);
StringBuffer logMsg = new StringBuffer("当前报告:【" + runRequest.getReportId() + "】资源:【" + runRequest.getTestId() + "")
.append("\n").append("namespace").append(clientCredential.getNamespace())
.append("\n").append("Pod信息")
.append(JSON.toJSONString(pod.getMetadata())).append("");
LoggerUtil.info(logMsg);
// 拼接CURL执行命令
StringBuffer command = new StringBuffer("curl -H \"Accept: application/json\" -H \"Content-type: application/json\" -X POST -d").append(" ");
command.append("'").append(JSON.toJSONString(runRequest)).append("'"); // 请求参数
command.append(" ").append("--connect-timeout 30"); // 设置连接超时时间为30S
command.append(" ").append("--max-time 120"); // 设置请求超时时间为120S
command.append(" ").append("--retry 3"); // 设置重试次数3次
command.append(" ").append("http://127.0.0.1:8082/jmeter/api/start");
KubernetesApiExec.newExecWatch(client, clientCredential.getNamespace(), pod.getMetadata().getName(), command.toString());
} catch (Exception e) {
LoggerUtil.error("当前报告:【" + runRequest.getReportId() + "】资源:【" + runRequest.getTestId() + "】CURL失败", e);
MSException.throwException(e);
}
}
@Override
public void stop() {
}
}

View File

@ -1,144 +0,0 @@
package io.metersphere.api.exec.engine.docker;
import io.metersphere.api.exec.engine.AbstractEngine;
import io.metersphere.base.domain.LoadTestReportWithBLOBs;
import io.metersphere.base.domain.TestResource;
import io.metersphere.commons.constants.ResourceStatusEnum;
import io.metersphere.commons.exception.MSException;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.commons.utils.JSON;
import io.metersphere.commons.utils.LocalAddressUtils;
import io.metersphere.commons.utils.UrlTestUtils;
import io.metersphere.config.KafkaProperties;
import io.metersphere.controller.handler.ResultHolder;
import io.metersphere.dto.BaseSystemConfigDTO;
import io.metersphere.dto.NodeDTO;
import io.metersphere.engine.request.StartTestRequest;
import io.metersphere.i18n.Translator;
import io.metersphere.service.SystemParameterService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.Map;
public class DockerTestEngine extends AbstractEngine {
private static final String BASE_URL = "http://%s:%d";
private RestTemplate restTemplate;
private RestTemplate restTemplateWithTimeOut;
public DockerTestEngine(LoadTestReportWithBLOBs loadTestReport) {
this.init(loadTestReport);
}
@Override
protected void init(LoadTestReportWithBLOBs loadTestReport) {
super.init(loadTestReport);
this.restTemplate = (RestTemplate) CommonBeanFactory.getBean("restTemplate");
this.restTemplateWithTimeOut = (RestTemplate) CommonBeanFactory.getBean("restTemplateWithTimeOut");
}
@Override
public void start() {
int totalThreadNum = resourceList.stream()
.filter(r -> ResourceStatusEnum.VALID.name().equals(r.getStatus()))
.map(r -> JSON.parseObject(r.getConfiguration(), NodeDTO.class).getMaxConcurrency())
.reduce(Integer::sum)
.orElse(0);
Object[] resourceRatios = resourceList.stream()
.filter(r -> ResourceStatusEnum.VALID.name().equals(r.getStatus()))
.map(r -> JSON.parseObject(r.getConfiguration(), NodeDTO.class).getMaxConcurrency())
.map(r -> r * 1.0 / totalThreadNum)
.map(r -> String.format("%.2f", r))
.toArray();
}
private void runTest(TestResource resource, Object[] ratios, int resourceIndex) {
String configuration = resource.getConfiguration();
NodeDTO node = JSON.parseObject(configuration, NodeDTO.class);
String nodeIp = node.getIp();
Integer port = node.getPort();
BaseSystemConfigDTO baseInfo = CommonBeanFactory.getBean(SystemParameterService.class).getBaseInfo();
KafkaProperties kafkaProperties = CommonBeanFactory.getBean(KafkaProperties.class);
String metersphereUrl = "http://localhost:8081"; // 占位符
if (baseInfo != null) {
metersphereUrl = baseInfo.getUrl();
}
// docker 不能从 localhost 中下载文件, 本地开发
if (StringUtils.contains(metersphereUrl, "http://localhost") ||
StringUtils.contains(metersphereUrl, "http://127.0.0.1")) {
metersphereUrl = "http://" + LocalAddressUtils.getIpAddress("en0") + ":8081";
}
String jmeterPingUrl = metersphereUrl + "/jmeter/ping"; // 检查下载地址是否正确
if (!UrlTestUtils.testUrlWithTimeOut(jmeterPingUrl, 1000)) {
MSException.throwException(Translator.get("run_load_test_file_init_error"));
}
Map<String, String> env = new HashMap<>();
env.put("RATIO", StringUtils.join(ratios, ","));
env.put("RESOURCE_INDEX", "" + resourceIndex);
env.put("METERSPHERE_URL", metersphereUrl);
env.put("START_TIME", "" + System.currentTimeMillis());
env.put("TEST_ID", this.loadTestReport.getTestId());
env.put("REPORT_ID", this.loadTestReport.getId());
env.put("BOOTSTRAP_SERVERS", kafkaProperties.getBootstrapServers());
env.put("LOG_TOPIC", kafkaProperties.getLog().getTopic());
env.put("JMETER_REPORTS_TOPIC", kafkaProperties.getReport().getTopic());
env.put("RESOURCE_ID", resource.getId());
env.put("THREAD_NUM", "0");// 传入0表示不用修改线程数
env.put("HEAP", HEAP);
env.put("GC_ALGO", GC_ALGO);
env.put("BACKEND_LISTENER", resourcePool.getBackendListener().toString());
StartTestRequest startTestRequest = new StartTestRequest();
startTestRequest.setImage(JMETER_IMAGE);
startTestRequest.setEnv(env);
String uri = String.format(BASE_URL + "/jmeter/container/start", nodeIp, port);
try {
ResultHolder result = restTemplate.postForObject(uri, startTestRequest, ResultHolder.class);
if (result == null) {
MSException.throwException(Translator.get("start_engine_fail"));
}
if (!result.isSuccess()) {
MSException.throwException(result.getMessage());
}
} catch (MSException e) {
throw e;
} catch (Exception e) {
MSException.throwException("Please check node-controller status.");
}
}
@Override
public void stop() {
String testId = loadTestReport.getTestId();
this.resourceList.forEach(r -> {
NodeDTO node = JSON.parseObject(r.getConfiguration(), NodeDTO.class);
String ip = node.getIp();
Integer port = node.getPort();
String uri = String.format(BASE_URL + "/jmeter/container/stop/" + testId, ip, port);
try {
ResultHolder result = restTemplateWithTimeOut.getForObject(uri, ResultHolder.class);
if (result == null) {
MSException.throwException(Translator.get("container_delete_fail"));
}
if (!result.isSuccess()) {
MSException.throwException(result.getMessage());
}
} catch (MSException e) {
throw e;
} catch (Exception e) {
MSException.throwException("Please check node-controller status.");
}
});
}
}

View File

@ -1,9 +1,11 @@
package io.metersphere.xpack.resourcepool.engine.provider;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.metersphere.engine.request.StartTestRequest;
public interface KubernetesProvider {
void deployJmeter(StartTestRequest request, ClientCredential clientCredential);
void deleteJmeter(String testId, String namespace);
KubernetesClient getClient();
}