fix (API automation): message handling for node execution

fit2-zhao 2021-09-01 16:30:34 +08:00 committed by fit2-zhao
parent 9bae1cadc4
commit ac0d3d9805
7 changed files with 132 additions and 69 deletions

View File: io/metersphere/api/dto/RunModeDataDTO.java

@@ -1,7 +1,6 @@
package io.metersphere.api.dto;
import io.metersphere.api.dto.automation.APIScenarioReportResult;
import io.metersphere.base.domain.ApiScenarioWithBLOBs;
import lombok.Getter;
import lombok.Setter;
import org.apache.jorphan.collections.HashTree;
@@ -20,7 +19,6 @@ public class RunModeDataDTO {
//
private String apiCaseId;
private ApiScenarioWithBLOBs scenario;
private Map<String, String> planEnvMap;
public RunModeDataDTO(){

View File: JMeterService.java

@@ -16,6 +16,7 @@ import io.metersphere.commons.exception.MSException;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.config.JmeterProperties;
import io.metersphere.config.KafkaConfig;
import io.metersphere.dto.BaseSystemConfigDTO;
import io.metersphere.dto.NodeDTO;
import io.metersphere.i18n.Translator;
@@ -175,7 +176,7 @@ public class JMeterService {
}
} else {
try {
SendResult result = kafkaTemplate.send(MsKafkaListener.EXEC_TOPIC, JSON.toJSONString(runRequest)).get();
SendResult result = kafkaTemplate.send(KafkaConfig.EXEC_TOPIC, JSON.toJSONString(runRequest)).get();
if (result != null) {
LogUtil.debug("获取ack 结果:" + result.getRecordMetadata());
}

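Note: KafkaTemplate.send() returns a future, and the .get() call above blocks until the broker acknowledges the record, which is what makes the ack log line possible. A minimal sketch of the same pattern with a bounded wait (the timeout and the String/String generic types are assumptions, not part of this commit):

import java.util.concurrent.TimeUnit;

// Block for the broker ack, but never indefinitely (timeout is illustrative)
SendResult<String, String> ack = kafkaTemplate
        .send(KafkaConfig.EXEC_TOPIC, JSON.toJSONString(runRequest))
        .get(10, TimeUnit.SECONDS);
LogUtil.debug("Received ack result: " + ack.getRecordMetadata());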
View File: MsKafkaListener.java

@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.metersphere.api.service.ApiEnvironmentRunningParamService;
import io.metersphere.api.service.TestResultService;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.config.KafkaConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
@@ -15,12 +16,9 @@ import javax.annotation.Resource;
@Service
public class MsKafkaListener {
public static final String TOPICS = "ms-api-exec-topic";
public final static String EXEC_TOPIC = "ms-automation-exec-topic";
public static final String CONSUME_ID = "ms-api-exec-consume";
@KafkaListener(id = CONSUME_ID, topics = TOPICS, groupId = "${spring.kafka.consumer.group-id}")
@KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}")
public void consume(ConsumerRecord<?, String> record) {
LogUtil.info("接收到执行结果开始存储");
try {

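Why the constants moved: Java annotation attributes must be compile-time constants, so @KafkaListener can only reference the topic through a public static final String. Centralizing both topic names in KafkaConfig keeps the producer in JMeterService, this listener, and the NewTopic beans on a single definition.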
View File: io/metersphere/api/service/ApiAutomationService.java

@@ -3,7 +3,6 @@ package io.metersphere.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -131,6 +130,8 @@ public class ApiAutomationService {
private ApiScenarioReferenceIdService apiScenarioReferenceIdService;
@Resource
private ResourcePoolCalculation resourcePoolCalculation;
@Resource
private NodeKafkaService nodeKafkaService;
public ApiScenarioWithBLOBs getDto(String id) {
return apiScenarioMapper.selectByPrimaryKey(id);
@@ -177,39 +178,6 @@
return list;
}
private void setApiScenarioProjectIds(ApiScenarioDTO data) {
// If the scenario steps span multiple projects, store all involved project IDs in the projectIds property
List<String> idList = new ArrayList<>();
String definition = data.getScenarioDefinition();
if (StringUtils.isNotBlank(definition)) {
RunDefinitionRequest d = JSON.parseObject(definition, RunDefinitionRequest.class);
if (d != null) {
Map<String, String> map = d.getEnvironmentMap();
if (map != null) {
if (map.isEmpty()) {
try {
List<String> ids = (List<String>) JSONPath.read(definition, "$..projectId");
if (CollectionUtils.isNotEmpty(ids)) {
idList.addAll(new HashSet<>(ids));
}
} catch (Exception e) {
LogUtil.error("JSONPath.read projectId fail.");
}
} else {
Set<String> set = d.getEnvironmentMap().keySet();
idList = new ArrayList<>(set);
}
} else {
// Legacy-data compatibility: without an EnvironmentMap, fall back to the scenario's own project
idList.add(data.getProjectId());
}
}
}
data.setProjectIds(idList);
}
/**
* Initialize some of the parameters
*
@@ -995,6 +963,10 @@
if (request.getConfig() != null && StringUtils.isNotEmpty(request.getConfig().getResourcePoolId())) {
List<JvmInfoDTO> testResources = resourcePoolCalculation.getPools(request.getConfig().getResourcePoolId());
request.getConfig().setTestResources(testResources);
String status = nodeKafkaService.createKafkaProducer(request.getConfig());
if ("ERROR".equals(status)) {
MSException.throwException("执行节点的kafka 启动失败,无法执行");
}
}
// Environment check
this.checkEnv(request, apiScenarios);
@@ -1052,9 +1024,9 @@
try {
if (request.getConfig() != null && StringUtils.isNotBlank(request.getConfig().getResourcePoolId())) {
RunModeDataDTO runModeDataDTO = new RunModeDataDTO();
runModeDataDTO.setTestId(item.getId());
runModeDataDTO.setPlanEnvMap(planEnvMap);
runModeDataDTO.setReport(report);
runModeDataDTO.setScenario(item);
executeQueue.put(report.getId(), runModeDataDTO);
} else {
// Generate the report and the HashTree
@@ -1093,7 +1065,9 @@
MessageCache.cache.put(serialReportId, counter);
}
}
request.getConfig().setAmassReport(serialReportId);
// Verify Kafka on the nodes
nodeKafkaService.createKafkaProducer(request.getConfig());
// Start execution
if (executeQueue != null && executeQueue.size() > 0) {
if (request.getConfig() != null && request.getConfig().getMode().equals(RunModeConstants.SERIAL.toString())) {
@@ -1206,7 +1180,7 @@
sqlSession.flushStatements();
for (String reportId : executeQueue.keySet()) {
if (request.getConfig() != null && StringUtils.isNotEmpty(request.getConfig().getResourcePoolId())) {
jMeterService.runTest(executeQueue.get(reportId).getScenario().getId(), reportId, request.getRunMode(), request.getPlanScenarioId(), request.getConfig());
jMeterService.runTest(executeQueue.get(reportId).getTestId(), reportId, request.getRunMode(), request.getPlanScenarioId(), request.getConfig());
} else {
jMeterService.runLocal(reportId, executeQueue.get(reportId).getHashTree(),
TriggerMode.BATCH.name().equals(request.getTriggerMode()) ? TriggerMode.BATCH.name() : request.getReportId(), request.getRunMode());
@@ -1454,29 +1428,6 @@
}
}
/**
* Get the IDs of all rows (before pagination) matching the front-end query conditions
*
* @param moduleIds module IDs selected in the front-end query
* @param name name keyword entered in the front-end query
* @param projectId project the front-end query was issued from
* @param filters filter conditions of the front-end query
* @param unSelectIds IDs left unchecked in the front end
* @return
*/
private List<String> getAllScenarioIdsByFontedSelect(List<String> moduleIds, String name, String projectId, Map<String, List<String>> filters, List<String> unSelectIds) {
ApiScenarioRequest selectRequest = new ApiScenarioRequest();
selectRequest.setModuleIds(moduleIds);
selectRequest.setName(name);
selectRequest.setProjectId(projectId);
selectRequest.setFilters(filters);
selectRequest.setWorkspaceId(SessionUtils.getCurrentWorkspaceId());
List<ApiScenarioDTO> list = extApiScenarioMapper.list(selectRequest);
List<String> allIds = list.stream().map(ApiScenarioDTO::getId).collect(Collectors.toList());
List<String> ids = allIds.stream().filter(id -> !unSelectIds.contains(id)).collect(Collectors.toList());
return ids;
}
/**
* Scenario test execution
*

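For orientation, the remote (resource pool) branch added in this file reduces to: queue one RunModeDataDTO per report, then hand its test id to JMeterService, which publishes the run request to KafkaConfig.EXEC_TOPIC. A condensed sketch under those assumptions (control flow simplified; report creation elided):

// Condensed sketch of the remote-execution path (simplified from this diff)
Map<String, RunModeDataDTO> executeQueue = new LinkedHashMap<>();
for (ApiScenarioWithBLOBs item : apiScenarios) {
    RunModeDataDTO dto = new RunModeDataDTO();
    dto.setTestId(item.getId());   // the scenario id doubles as the test id
    dto.setPlanEnvMap(planEnvMap);
    dto.setReport(report);         // report created earlier for this scenario
    executeQueue.put(report.getId(), dto);
}
for (String reportId : executeQueue.keySet()) {
    // runTest() serializes the request and publishes it to KafkaConfig.EXEC_TOPIC
    jMeterService.runTest(executeQueue.get(reportId).getTestId(), reportId,
            request.getRunMode(), request.getPlanScenarioId(), request.getConfig());
}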
View File: io/metersphere/api/service/NodeKafkaService.java

@@ -0,0 +1,68 @@
package io.metersphere.api.service;
import com.alibaba.fastjson.JSON;
import io.metersphere.api.dto.JvmInfoDTO;
import io.metersphere.api.dto.automation.RunModeConfig;
import io.metersphere.base.domain.TestResource;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.config.KafkaProperties;
import io.metersphere.dto.NodeDTO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@Service
public class NodeKafkaService {
@Resource
private KafkaProperties kafkaProperties;
@Resource
private RestTemplate restTemplate;
@Value("${spring.kafka.producer.properties.max.request.size}")
private String maxRequestSize;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
private static final String BASE_URL = "http://%s:%d";
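/**
 * Pushes Kafka producer/consumer connection settings to every node in the
 * selected resource pool through each node's REST API, so node runners can
 * consume run requests and publish results back to this server's brokers.
 * Nodes whose producer cannot be created are removed from the resource list.
 *
 * @return "SUCCESS", or "ERROR" if the setup fails with an exception
 */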
public String createKafkaProducer(RunModeConfig config) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
try {
// Iterate over a copy: removing failed nodes from the original list while
// iterating it directly would throw ConcurrentModificationException
for (JvmInfoDTO jvmInfoDTO : new ArrayList<>(config.getTestResources())) {
TestResource testResource = jvmInfoDTO.getTestResource();
String configuration = testResource.getConfiguration();
NodeDTO node = JSON.parseObject(configuration, NodeDTO.class);
String nodeIp = node.getIp();
Integer port = node.getPort();
String uri = String.format(BASE_URL + "/producer/create", nodeIp, port);
ResponseEntity<String> result = restTemplate.postForEntity(uri, producerProps, String.class);
if (result.getBody() == null || !"SUCCESS".equals(result.getBody())) {
// Drop nodes whose producer could not be created so they are not dispatched to
config.getTestResources().remove(jvmInfoDTO);
}
String cUri = String.format(BASE_URL + "/consumer/create", nodeIp, port);
restTemplate.postForEntity(cUri, consumerProps, void.class);
}
} catch (Exception e) {
LogUtil.error(e.getMessage());
return "ERROR";
}
return "SUCCESS";
}
}

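The /producer/create and /consumer/create endpoints live in the node controller, which is outside this repository; the sketch below is a hypothetical illustration of the contract implied by the calls above (the paths and the "SUCCESS" body come from this diff, everything else is assumed):

import java.util.Map;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical node-controller side of the contract (not in this commit)
@RestController
public class KafkaSetupController {
    @PostMapping("/producer/create")
    public String createProducer(@RequestBody Map<String, Object> producerProps) {
        try {
            // Assumed behavior: probe the brokers with the pushed settings
            DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(
                    producerProps, new StringSerializer(), new StringSerializer());
            factory.createProducer().close();
            return "SUCCESS"; // body checked by NodeKafkaService
        } catch (Exception e) {
            return "ERROR";
        }
    }
}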
View File: io/metersphere/config/KafkaConfig.java

@@ -0,0 +1,25 @@
package io.metersphere.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaConfig {
// Topic for execution content (listened to by executors)
public final static String EXEC_TOPIC = "ms-automation-exec-topic";
// Topic for sending back execution results
public static final String TOPICS = "ms-api-exec-topic";
@Bean
public NewTopic apiExecTopic() {
return TopicBuilder.name(TOPICS)
.build();
}
@Bean
public NewTopic automationTopic() {
return TopicBuilder.name(EXEC_TOPIC)
.build();
}
}

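These NewTopic beans take effect because Spring Boot auto-configures a KafkaAdmin, which creates any missing topics at startup (topics that already exist are not recreated). Should the defaults ever need tuning, TopicBuilder also accepts partition and replica counts; the values below are illustrative only, not part of this commit:

@Bean
public NewTopic automationTopic() {
    return TopicBuilder.name(EXEC_TOPIC)
            .partitions(3) // illustrative
            .replicas(1)   // illustrative
            .build();
}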
View File: TestPlanApiCaseService.java

@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import io.metersphere.api.dto.JvmInfoDTO;
import io.metersphere.api.dto.RunModeDataDTO;
import io.metersphere.api.dto.automation.TestPlanFailureApiDTO;
import io.metersphere.api.dto.definition.ApiTestCaseDTO;
@@ -22,21 +23,29 @@ import io.metersphere.api.dto.definition.request.sampler.MsHTTPSamplerProxy;
import io.metersphere.api.dto.definition.request.sampler.MsJDBCSampler;
import io.metersphere.api.dto.definition.request.sampler.MsTCPSampler;
import io.metersphere.api.jmeter.JMeterService;
import io.metersphere.api.jmeter.ResourcePoolCalculation;
import io.metersphere.api.service.ApiDefinitionExecResultService;
import io.metersphere.api.service.ApiTestCaseService;
import io.metersphere.api.service.NodeKafkaService;
import io.metersphere.base.domain.*;
import io.metersphere.base.mapper.ApiDefinitionExecResultMapper;
import io.metersphere.base.mapper.ApiTestCaseMapper;
import io.metersphere.base.mapper.TestPlanApiCaseMapper;
import io.metersphere.base.mapper.TestPlanMapper;
import io.metersphere.base.mapper.ext.ExtTestPlanApiCaseMapper;
import io.metersphere.commons.constants.*;
import io.metersphere.commons.constants.APITestStatus;
import io.metersphere.commons.constants.ApiRunMode;
import io.metersphere.commons.constants.RunModeConstants;
import io.metersphere.commons.constants.TriggerMode;
import io.metersphere.commons.exception.MSException;
import io.metersphere.commons.utils.*;
import io.metersphere.dto.BaseSystemConfigDTO;
import io.metersphere.log.vo.OperatingLogDetails;
import io.metersphere.service.SystemParameterService;
import io.metersphere.track.dto.*;
import io.metersphere.track.dto.PlanReportCaseDTO;
import io.metersphere.track.dto.TestCaseReportStatusResultDTO;
import io.metersphere.track.dto.TestPlanApiResultReportDTO;
import io.metersphere.track.dto.TestPlanSimpleReportDTO;
import io.metersphere.track.request.testcase.TestPlanApiCaseBatchRequest;
import io.metersphere.track.service.task.SerialApiExecTask;
import org.apache.commons.lang3.StringUtils;
@@ -81,6 +90,10 @@ public class TestPlanApiCaseService {
private ApiDefinitionExecResultMapper mapper;
@Resource
SqlSessionFactory sqlSessionFactory;
@Resource
private ResourcePoolCalculation resourcePoolCalculation;
@Resource
private NodeKafkaService nodeKafkaService;
public TestPlanApiCase getInfo(String caseId, String testPlanId) {
TestPlanApiCaseExample example = new TestPlanApiCaseExample();
@@ -378,6 +391,15 @@
List<TestPlanApiCase> planApiCases = testPlanApiCaseMapper.selectByExample(example);
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
ApiDefinitionExecResultMapper batchMapper = sqlSession.getMapper(ApiDefinitionExecResultMapper.class);
// Resource pool
if (request.getConfig() != null && StringUtils.isNotEmpty(request.getConfig().getResourcePoolId())) {
List<JvmInfoDTO> testResources = resourcePoolCalculation.getPools(request.getConfig().getResourcePoolId());
request.getConfig().setTestResources(testResources);
String status = nodeKafkaService.createKafkaProducer(request.getConfig());
if ("ERROR".equals(status)) {
MSException.throwException("执行节点的kafka 启动失败,无法执行");
}
}
// Select the execution mode
ExecutorService executorService = Executors.newFixedThreadPool(planApiCases.size());
if (request.getConfig() != null && request.getConfig().getMode().equals(RunModeConstants.SERIAL.toString())) {
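Closing note: the resource-pool guard (fetch the pools, push Kafka settings, abort on "ERROR") now appears verbatim in both ApiAutomationService and TestPlanApiCaseService. A possible shared helper, assembled from the duplicated code (hypothetical consolidation, not in the commit):

// Hypothetical helper consolidating the duplicated guard (names from the diff)
private void prepareResourcePool(RunModeConfig config) {
    if (config == null || StringUtils.isEmpty(config.getResourcePoolId())) {
        return; // local execution: nothing to prepare
    }
    config.setTestResources(resourcePoolCalculation.getPools(config.getResourcePoolId()));
    if ("ERROR".equals(nodeKafkaService.createKafkaProducer(config))) {
        MSException.throwException("Failed to start Kafka on the execution nodes; cannot execute");
    }
}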