refactor(接口测试): 优化超时处理逻辑

https://github.com/metersphere/metersphere/issues/32037
Signed-off-by: fit2-zhao <yong.zhao@fit2cloud.com>
This commit is contained in:
fit2-zhao 2024-07-12 13:42:39 +08:00 committed by Craftsman
parent 39b47825cd
commit 1c6fa21898
1 changed files with 81 additions and 47 deletions

View File

@ -35,7 +35,9 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
@Service @Service
public class ApiExecutionQueueService { public class ApiExecutionQueueService {
@ -125,7 +127,7 @@ public class ApiExecutionQueueService {
List<String> projectIds = new ArrayList<>(); List<String> projectIds = new ArrayList<>();
// 获取当前所属项目ID用于后续区分资源隔离 // 获取当前所属项目ID用于后续区分资源隔离
if (MapUtils.isNotEmpty(v.getPlanEnvMap())) { if (MapUtils.isNotEmpty(v.getPlanEnvMap())) {
List<String> kyList = v.getPlanEnvMap().keySet().stream().collect(Collectors.toList()); List<String> kyList = v.getPlanEnvMap().keySet().stream().toList();
projectIds.addAll(kyList); projectIds.addAll(kyList);
} else { } else {
projectIds.add(v.getReport().getProjectId()); projectIds.add(v.getReport().getProjectId());
@ -219,8 +221,8 @@ public class ApiExecutionQueueService {
checkTestPlanCaseTestEnd(dto.getTestId(), dto.getRunMode(), dto.getTestPlanReportId()); checkTestPlanCaseTestEnd(dto.getTestId(), dto.getRunMode(), dto.getTestPlanReportId());
// 更新未执行的报告状态 // 更新未执行的报告状态
List<ApiExecutionQueueDetail> details = executionQueueDetailMapper.selectByExample(example); List<ApiExecutionQueueDetail> details = executionQueueDetailMapper.selectByExample(example);
List<String> reportIds = details.stream().map(ApiExecutionQueueDetail::getReportId).collect(Collectors.toList()); List<String> reportIds = details.stream().map(ApiExecutionQueueDetail::getReportId).collect(toList());
List<String> testIds = details.stream().map(ApiExecutionQueueDetail::getTestId).collect(Collectors.toList()); List<String> testIds = details.stream().map(ApiExecutionQueueDetail::getTestId).collect(toList());
if (CollectionUtils.isNotEmpty(reportIds)) { if (CollectionUtils.isNotEmpty(reportIds)) {
extApiDefinitionExecResultMapper.update(reportIds); extApiDefinitionExecResultMapper.update(reportIds);
extApiScenarioReportMapper.update(reportIds); extApiScenarioReportMapper.update(reportIds);
@ -257,7 +259,7 @@ public class ApiExecutionQueueService {
// 处理掉当前已经执行完成的资源 // 处理掉当前已经执行完成的资源
List<ApiExecutionQueueDetail> completedQueues = queues.stream() List<ApiExecutionQueueDetail> completedQueues = queues.stream()
.filter(item -> StringUtils.equals(item.getTestId(), testId)) .filter(item -> StringUtils.equals(item.getTestId(), testId))
.collect(Collectors.toList()); .collect(toList());
if (CollectionUtils.isNotEmpty(completedQueues)) { if (CollectionUtils.isNotEmpty(completedQueues)) {
ApiExecutionQueueDetail completed = completedQueues.get(0); ApiExecutionQueueDetail completed = completedQueues.get(0);
queue.setCompletedReportId(completed.getReportId()); queue.setCompletedReportId(completed.getReportId());
@ -300,7 +302,7 @@ public class ApiExecutionQueueService {
LoggerUtil.info("Normal execution completes, update test plan report status" + testPlanReportId); LoggerUtil.info("Normal execution completes, update test plan report status" + testPlanReportId);
kafkaTemplate.send(KafkaTopicConstants.TEST_PLAN_REPORT_TOPIC, testPlanReportId); kafkaTemplate.send(KafkaTopicConstants.TEST_PLAN_REPORT_TOPIC, testPlanReportId);
} else { } else {
List<String> ids = queues.stream().map(ApiExecutionQueue::getId).collect(Collectors.toList()); List<String> ids = queues.stream().map(ApiExecutionQueue::getId).collect(toList());
ApiExecutionQueueDetailExample detailExample = new ApiExecutionQueueDetailExample(); ApiExecutionQueueDetailExample detailExample = new ApiExecutionQueueDetailExample();
detailExample.createCriteria().andQueueIdIn(ids); detailExample.createCriteria().andQueueIdIn(ids);
long count = apiExecutionQueueDetailMapper.countByExample(detailExample); long count = apiExecutionQueueDetailMapper.countByExample(detailExample);
@ -371,17 +373,12 @@ public class ApiExecutionQueueService {
} }
} }
public void defendQueue() {
final int SECOND_MILLIS = 1000;
final int MINUTE_MILLIS = 60 * SECOND_MILLIS;
// 计算一小时前的超时报告
final long timeout = System.currentTimeMillis() - (60 * MINUTE_MILLIS);
ApiExecutionQueueDetailExample example = new ApiExecutionQueueDetailExample();
example.createCriteria().andCreateTimeLessThan(timeout).andTypeNotEqualTo("loadTest");
List<ApiExecutionQueueDetail> queueDetails = executionQueueDetailMapper.selectByExample(example);
private void subitemHandling(List<ApiExecutionQueueDetail> queueDetails, ApiExecutionQueue queue, long timeout) {
for (ApiExecutionQueueDetail item : queueDetails) { for (ApiExecutionQueueDetail item : queueDetails) {
ApiExecutionQueue queue = queueMapper.selectByPrimaryKey(item.getQueueId()); if (queue == null) {
queue = queueMapper.selectByPrimaryKey(item.getQueueId());
}
if (queue == null) { if (queue == null) {
executionQueueDetailMapper.deleteByPrimaryKey(item.getId()); executionQueueDetailMapper.deleteByPrimaryKey(item.getId());
continue; continue;
@ -399,59 +396,96 @@ public class ApiExecutionQueueService {
ResultDTO dto = new ResultDTO(); ResultDTO dto = new ResultDTO();
dto.setQueueId(item.getQueueId()); dto.setQueueId(item.getQueueId());
dto.setTestId(item.getTestId()); dto.setTestId(item.getTestId());
if (StringUtils.equalsAnyIgnoreCase(queue.getRunMode(), ApiRunMode.SCENARIO.name(), ApiRunMode.SCENARIO_PLAN.name(), ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), ApiRunMode.SCHEDULE_SCENARIO.name(), ApiRunMode.JENKINS_SCENARIO_PLAN.name())) {
if (StringUtils.equalsAnyIgnoreCase(queue.getRunMode(),
ApiRunMode.SCENARIO.name(),
ApiRunMode.SCENARIO_PLAN.name(),
ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(),
ApiRunMode.SCHEDULE_SCENARIO.name(),
ApiRunMode.JENKINS_SCENARIO_PLAN.name())) {
ApiScenarioReportWithBLOBs report = apiScenarioReportMapper.selectByPrimaryKey(item.getReportId()); ApiScenarioReportWithBLOBs report = apiScenarioReportMapper.selectByPrimaryKey(item.getReportId());
// 报告已经被删除则队列也删除 // 报告已经被删除则队列也删除
if (report == null) { if (report == null) {
executionQueueDetailMapper.deleteByPrimaryKey(item.getId()); executionQueueDetailMapper.deleteByPrimaryKey(item.getId());
} }
// 这里只处理已经开始执行的队列如果 报告状态是 Waiting 表示还没开始暂不处理 // 这里只处理已经开始执行的队列如果 报告状态是 Waiting 表示还没开始暂不处理
if (report != null && StringUtils.equalsAnyIgnoreCase(report.getStatus(), TestPlanReportStatus.RUNNING.name()) && report.getUpdateTime() < timeout) { if (report != null && StringUtils.equalsAnyIgnoreCase(report.getStatus(), ApiReportStatus.RUNNING.name()) && report.getUpdateTime() < timeout) {
report.setStatus(ApiReportStatus.ERROR.name()); report.setStatus(ApiReportStatus.ERROR.name());
apiScenarioReportMapper.updateByPrimaryKeySelective(report); apiScenarioReportMapper.updateByPrimaryKeySelective(report);
LoggerUtil.info("超时处理报告:" + report.getId());
if (StringUtils.equalsIgnoreCase(item.getType(), RunModeConstants.SERIAL.toString())) { LoggerUtil.info("超时场景报告处理:{}", report.getId());
LoggerUtil.info("超时处理报告:【" + report.getId() + "】进入下一个执行"); integrateData(queue, item, dto);
dto.setTestPlanReportId(queue.getReportId());
dto.setReportId(queue.getReportId());
dto.setRunMode(queue.getRunMode());
dto.setRunType(item.getType());
dto.setReportType(queue.getReportType());
queueNext(dto);
} else {
executionQueueDetailMapper.deleteByPrimaryKey(item.getId());
}
} }
} else { } else {
// 用例/接口超时结果处理 // 用例/接口超时结果处理
ApiDefinitionExecResultWithBLOBs result = apiDefinitionExecResultMapper.selectByPrimaryKey(item.getReportId()); ApiDefinitionExecResultWithBLOBs result = apiDefinitionExecResultMapper.selectByPrimaryKey(item.getReportId());
if (result != null && StringUtils.equalsAnyIgnoreCase(result.getStatus(), TestPlanReportStatus.RUNNING.name())) { if (result != null && StringUtils.equalsAnyIgnoreCase(result.getStatus(), ApiReportStatus.RUNNING.name())) {
result.setStatus(ApiReportStatus.ERROR.name()); result.setStatus(ApiReportStatus.ERROR.name());
apiDefinitionExecResultMapper.updateByPrimaryKeySelective(result); apiDefinitionExecResultMapper.updateByPrimaryKeySelective(result);
executionQueueDetailMapper.deleteByPrimaryKey(item.getId());
LoggerUtil.info("超时API报告处理{}", item.getReportId());
integrateData(queue, item, dto);
}
}
}
}
private void integrateData(ApiExecutionQueue queue, ApiExecutionQueueDetail item, ResultDTO dto) {
if (StringUtils.equalsIgnoreCase(item.getType(), RunModeConstants.SERIAL.toString())) {
dto.setTestPlanReportId(queue.getReportId()); dto.setTestPlanReportId(queue.getReportId());
dto.setReportId(queue.getReportId()); dto.setReportId(queue.getReportId());
dto.setRunMode(queue.getRunMode()); dto.setRunMode(queue.getRunMode());
dto.setRunType(item.getType()); dto.setRunType(item.getType());
dto.setReportType(queue.getReportType()); dto.setReportType(queue.getReportType());
queueNext(dto); queueNext(dto);
} else {
executionQueueDetailMapper.deleteByPrimaryKey(item.getId());
} }
} }
}
public void defendQueue() {
// 计算一小时前的超时报告
final long timeout = System.currentTimeMillis() - 60 * 60 * 1000;
// 集成报告超时处理 // 集成报告超时处理
ApiExecutionQueueExample queueDetailExample = new ApiExecutionQueueExample(); ApiExecutionQueueExample queueExample = new ApiExecutionQueueExample();
queueDetailExample.createCriteria().andReportTypeEqualTo(RunModeConstants.SET_REPORT.toString()).andCreateTimeLessThan(timeout); queueExample.createCriteria()
List<ApiExecutionQueue> executionQueues = queueMapper.selectByExample(queueDetailExample); .andReportTypeEqualTo(RunModeConstants.SET_REPORT.toString())
.andCreateTimeLessThan(timeout);
List<ApiExecutionQueue> executionQueues = queueMapper.selectByExample(queueExample);
if (CollectionUtils.isNotEmpty(executionQueues)) { if (CollectionUtils.isNotEmpty(executionQueues)) {
executionQueues.forEach(item -> { List<String> queueIds = executionQueues.stream()
ApiScenarioReportWithBLOBs report = apiScenarioReportMapper.selectByPrimaryKey(item.getReportId()); .map(ApiExecutionQueue::getId)
if (report != null && StringUtils.equalsAnyIgnoreCase(report.getStatus(), .collect(toList());
TestPlanReportStatus.RUNNING.name(), ApiReportStatus.PENDING.name()) && (report.getUpdateTime() < timeout)) {
report.setStatus(ApiReportStatus.ERROR.name()); ApiExecutionQueueDetailExample detailExample = new ApiExecutionQueueDetailExample();
apiScenarioReportMapper.updateByPrimaryKeySelective(report); detailExample.createCriteria().andQueueIdIn(queueIds);
Map<String, List<ApiExecutionQueueDetail>> detailMap = executionQueueDetailMapper.selectByExample(detailExample)
.stream()
.collect(groupingBy(ApiExecutionQueueDetail::getQueueId));
executionQueues.forEach(queue -> {
List<ApiExecutionQueueDetail> details = detailMap.get(queue.getId());
if (CollectionUtils.isNotEmpty(details)) {
subitemHandling(details, queue, timeout);
} }
}); });
} }
// 独立报告超时处理
ApiExecutionQueueDetailExample example = new ApiExecutionQueueDetailExample();
example.createCriteria().andCreateTimeLessThan(timeout).andTypeNotEqualTo("loadTest");
List<ApiExecutionQueueDetail> queueDetails = executionQueueDetailMapper.selectByExample(example);
if (CollectionUtils.isNotEmpty(queueDetails)) {
subitemHandling(queueDetails, null, timeout);
}
// 处理测试计划报告 // 处理测试计划报告
List<ApiExecutionQueue> queues = extApiExecutionQueueMapper.findTestPlanReportQueue(); List<ApiExecutionQueue> queues = extApiExecutionQueueMapper.findTestPlanReportQueue();
if (CollectionUtils.isNotEmpty(queues)) { if (CollectionUtils.isNotEmpty(queues)) {
@ -549,7 +583,7 @@ public class ApiExecutionQueueService {
queueExample.createCriteria().andRunModeIn(apiModes); queueExample.createCriteria().andRunModeIn(apiModes);
List<ApiExecutionQueue> queues = apiExecutionQueueMapper.selectByExample(queueExample); List<ApiExecutionQueue> queues = apiExecutionQueueMapper.selectByExample(queueExample);
if (CollectionUtils.isNotEmpty(queues)) { if (CollectionUtils.isNotEmpty(queues)) {
List<String> ids = queues.stream().map(ApiExecutionQueue::getId).collect(Collectors.toList()); List<String> ids = queues.stream().map(ApiExecutionQueue::getId).collect(toList());
ApiExecutionQueueDetailExample queueDetailExample = new ApiExecutionQueueDetailExample(); ApiExecutionQueueDetailExample queueDetailExample = new ApiExecutionQueueDetailExample();
queueDetailExample.createCriteria().andQueueIdIn(ids); queueDetailExample.createCriteria().andQueueIdIn(ids);
apiExecutionQueueDetailMapper.deleteByExample(queueDetailExample); apiExecutionQueueDetailMapper.deleteByExample(queueDetailExample);