fix(接口测试): 修复集合报告停止问题

【【接口测试】github#34563, 批量执行自动化场景无法完全停止】
https://www.tapd.cn/55049933/bugtrace/bugs/view/1155049933001050141
This commit is contained in:
fit2-zhao 2024-12-10 15:09:48 +08:00 committed by Craftsman
parent a47f7440e1
commit 6aece4b8c5
2 changed files with 86 additions and 53 deletions

View File

@ -507,6 +507,23 @@ public class ApiExecutionQueueService {
}
}
public void clearSetReportQueue(String reportId) {
ApiExecutionQueueExample example = new ApiExecutionQueueExample();
example.createCriteria()
.andReportIdEqualTo(reportId)
.andReportTypeEqualTo(RunModeConstants.SET_REPORT.toString());
List<ApiExecutionQueue> queues = apiExecutionQueueMapper.selectByExample(example);
queues.forEach(queue -> {
apiExecutionQueueMapper.deleteByPrimaryKey(queue.getId());
ApiExecutionQueueDetailExample queueDetailExample = new ApiExecutionQueueDetailExample();
queueDetailExample.createCriteria().andQueueIdEqualTo(queue.getId());
executionQueueDetailMapper.deleteByExample(queueDetailExample);
});
}
public void stop(String reportId) {
ApiExecutionQueueDetailExample example = new ApiExecutionQueueDetailExample();
example.createCriteria().andReportIdEqualTo(reportId);

View File

@ -34,6 +34,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Service
@ -95,15 +97,42 @@ public class ExtApiTaskService extends TaskService {
public void apiStop(List<TaskRequestDTO> taskRequests) {
if (CollectionUtils.isNotEmpty(taskRequests)) {
List<TaskRequestDTO> stopTasks = taskRequests.stream().filter(s -> StringUtils.isNotEmpty(s.getReportId())).collect(Collectors.toList());
List<TaskRequestDTO> stopTasks = taskRequests.stream()
.filter(s -> StringUtils.isNotEmpty(s.getReportId()))
.toList();
// 聚类同一批资源池的一批发送
Map<String, List<String>> poolMap = new HashMap<>();
// 单条停止
if (CollectionUtils.isNotEmpty(stopTasks) && stopTasks.size() == 1) {
// 从队列移除
TaskRequestDTO request = stopTasks.get(0);
apiExecutionQueueService.stop(request.getReportId());
if (stopTasks.size() == 1) {
stopSingleTask(stopTasks.get(0), poolMap);
} else {
// 使用线程池管理线程
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> batchStop(taskRequests));
executorService.shutdown();
}
// 开始结束资源池中执行的任务
if (MapUtils.isNotEmpty(poolMap)) {
this.send(poolMap);
}
}
}
private void stopSingleTask(TaskRequestDTO request, Map<String, List<String>> poolMap) {
// 集合报告停止则清理队列
apiExecutionQueueService.clearSetReportQueue(request.getReportId());
if (StringUtils.equals(request.getType(), "API")) {
stopApiTask(request, poolMap);
} else if (StringUtils.equals(request.getType(), ElementConstants.SCENARIO_UPPER)) {
stopScenarioTask(request, poolMap);
}
}
private void stopApiTask(TaskRequestDTO request, Map<String, List<String>> poolMap) {
ApiDefinitionExecResultWithBLOBs result = apiDefinitionExecResultMapper.selectByPrimaryKey(request.getReportId());
if (result != null) {
result.setStatus(ApiReportStatus.STOPPED.name());
@ -111,7 +140,8 @@ public class ExtApiTaskService extends TaskService {
extracted(poolMap, request.getReportId(), result.getActuator());
}
}
if (StringUtils.equals(request.getType(), ElementConstants.SCENARIO_UPPER)) {
private void stopScenarioTask(TaskRequestDTO request, Map<String, List<String>> poolMap) {
ApiScenarioReportWithBLOBs report = apiScenarioReportMapper.selectByPrimaryKey(request.getReportId());
if (report != null) {
report.setStatus(ApiReportStatus.STOPPED.name());
@ -119,24 +149,22 @@ public class ExtApiTaskService extends TaskService {
extracted(poolMap, request.getReportId(), report.getActuator());
}
}
// 开始结束资源池中执行的任务
if (MapUtils.isNotEmpty(poolMap)) {
this.send(poolMap);
}
} else {
Thread thread = new Thread(() -> {
this.batchStop(taskRequests);
});
thread.start();
}
private void extracted(Map<String, List<String>> poolMap, String reportId, String actuator) {
if (StringUtils.isNotEmpty(reportId) && StringUtils.isNotEmpty(actuator) && !StringUtils.equals(actuator, StorageEnums.LOCAL.name())) {
poolMap.computeIfAbsent(actuator, k -> new ArrayList<>()).add(reportId);
}
}
private void batchStop(List<TaskRequestDTO> taskRequests) {
LoggerUtil.info("进入批量停止方法");
Map<String, List<String>> poolMap = new HashMap<>();
// 全部停止
Map<String, TaskRequestDTO> taskRequestMap = taskRequests.stream().collect(Collectors.toMap(TaskRequestDTO::getType, taskRequest -> taskRequest));
Map<String, TaskRequestDTO> taskRequestMap = taskRequests.stream()
.collect(Collectors.toMap(TaskRequestDTO::getType, taskRequest -> taskRequest));
// 获取工作空间项目
LoggerUtil.info("获取工作空间对应的项目");
TaskRequestDTO taskRequest = taskRequestMap.get(ElementConstants.SCENARIO_UPPER);
@ -148,60 +176,48 @@ public class ExtApiTaskService extends TaskService {
// 结束掉未分发完成的任务
LoggerUtil.info("结束正在进行中的计划任务队列");
if (taskRequestMap.containsKey("API")) {
List<TaskResultVO> results = extApiDefinitionExecResultMapper.findByProjectIds(taskCenterRequest);
LoggerUtil.info("查询API进行中的报告" + results.size());
if (CollectionUtils.isNotEmpty(results)) {
for (TaskResultVO item : results) {
extracted(poolMap, item.getId(), item.getActuator());
}
results.forEach(item -> extracted(poolMap, item.getId(), item.getActuator()));
LoggerUtil.info("结束API进行中的报告");
baseTaskMapper.stopApi(taskCenterRequest);
// 清理队列并停止测试计划报告
LoggerUtil.info("清理API执行链");
List<String> ids = results.stream().map(TaskResultVO::getId).collect(Collectors.toList());
apiExecutionQueueService.stop(ids);
}
}
if (taskRequestMap.containsKey(ElementConstants.SCENARIO_UPPER)) {
List<TaskResultVO> reports = extApiScenarioReportMapper.findByProjectIds(taskCenterRequest);
LoggerUtil.info("查询到执行中的场景报告:" + reports.size());
if (CollectionUtils.isNotEmpty(reports)) {
for (TaskResultVO report : reports) {
extracted(poolMap, report.getId(), report.getActuator());
}
if (CollectionUtils.isNotEmpty(reports)) {
reports.forEach(report -> extracted(poolMap, report.getId(), report.getActuator()));
// 清理队列并停止测试计划报告
LoggerUtil.info("结束所有进行中的场景报告 ");
List<String> ids = reports.stream().map(TaskResultVO::getId).collect(Collectors.toList());
baseTaskMapper.stopScenario(taskCenterRequest);
// 清理队列并停止测试计划报告
LoggerUtil.info("清理队列并停止测试计划报告 ");
apiExecutionQueueService.stop(ids);
}
}
// 开始结束资源池中执行的任务
if (MapUtils.isNotEmpty(poolMap)) {
this.send(poolMap);
}
}
private void extracted(Map<String, List<String>> poolMap, String reportId, String actuator) {
if (StringUtils.isEmpty(reportId)) {
return;
}
if (StringUtils.isNotEmpty(actuator) && !StringUtils.equals(actuator, StorageEnums.LOCAL.name())) {
if (poolMap.containsKey(actuator)) {
poolMap.get(actuator).add(reportId);
} else {
poolMap.put(actuator, new ArrayList<String>() {{
this.add(reportId);
}});
}
}
}
private List<TestResource> selectPoolResource(String poolId) {
TestResourcePoolExample example = new TestResourcePoolExample();
example.createCriteria().andStatusEqualTo("VALID").andTypeEqualTo("NODE").andIdEqualTo(poolId);