refactor(接口测试): 重构场景并发执行任务下发策略

This commit is contained in:
fit2-zhao 2024-05-31 14:55:53 +08:00 committed by Craftsman
parent 6308e2a5a8
commit c2d267596b
5 changed files with 211 additions and 38 deletions

View File

@ -176,9 +176,13 @@ public class ApiExecuteService {
throw new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage());
} catch (MSException e) {
handleDoExecuteException(scriptRedisKey, e);
// 集合报告对应的资源池集合移除
removeCollectionReport(taskRequest);
throw e;
} catch (Exception e) {
handleDoExecuteException(scriptRedisKey, e);
// 集合报告对应的资源池集合移除
removeCollectionReport(taskRequest);
throw new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage());
}
}
@ -189,6 +193,14 @@ public class ApiExecuteService {
stringRedisTemplate.delete(scriptRedisKey);
}
private void removeCollectionReport(TaskRequestDTO taskRequest) {
// 集合报告对应的资源池集合移除
if (taskRequest.getRunModeConfig().getIntegratedReport()) {
String SET_PREFIX = "set:" + taskRequest.getRunModeConfig().getCollectionReport().getReportId();
stringRedisTemplate.opsForSet().remove(SET_PREFIX, taskRequest.getResourceId());
}
}
private GlobalParams getGlobalParam(String projectId) {
GlobalParamsDTO globalParamsDTO = globalParamsService.get(projectId);
if (globalParamsDTO != null) {
@ -205,7 +217,11 @@ public class ApiExecuteService {
private TaskRequestDTO doExecute(TaskRequestDTO taskRequest) throws Exception {
// 获取资源池
TestResourcePoolReturnDTO testResourcePoolDTO = getGetResourcePoolNodeDTO(taskRequest.getRunModeConfig(), taskRequest.getProjectId());
if (testResourcePoolDTO == null || CollectionUtils.isEmpty(testResourcePoolDTO.getTestResourceReturnDTO().getNodesList())) {
throw new MSException(ApiResultCode.EXECUTE_RESOURCE_POOL_NOT_CONFIG);
}
TestResourceNodeDTO testResourceNodeDTO = getProjectExecuteNode(testResourcePoolDTO);
if (StringUtils.isNotBlank(testResourcePoolDTO.getServerUrl())) {
// 如果资源池配置了当前站点则使用资源池的
taskRequest.setMsUrl(testResourcePoolDTO.getServerUrl());
@ -231,14 +247,18 @@ public class ApiExecuteService {
private TestResourceNodeDTO getProjectExecuteNode(TestResourcePoolReturnDTO resourcePoolDTO) {
roundRobinService.initializeNodes(resourcePoolDTO.getId(), resourcePoolDTO.getTestResourceReturnDTO().getNodesList());
try {
return roundRobinService.getNextNode(resourcePoolDTO.getId());
TestResourceNodeDTO node = roundRobinService.getNextNode(resourcePoolDTO.getId());
if (node == null) {
node = resourcePoolDTO.getTestResourceReturnDTO().getNodesList().getFirst();
}
return node;
} catch (Exception e) {
LogUtils.error(e);
throw new MSException("get execute node error", e);
}
}
private TestResourcePoolReturnDTO getGetResourcePoolNodeDTO(ApiRunModeConfigDTO runModeConfig, String projectId) {
public TestResourcePoolReturnDTO getGetResourcePoolNodeDTO(ApiRunModeConfigDTO runModeConfig, String projectId) {
String poolId = runModeConfig.getPoolId();
if (StringUtils.isBlank(poolId)) {
poolId = getProjectApiResourcePoolId(projectId);

View File

@ -1,6 +1,9 @@
package io.metersphere.api.service.scenario;
import io.metersphere.api.domain.*;
import io.metersphere.api.domain.ApiScenario;
import io.metersphere.api.domain.ApiScenarioRecord;
import io.metersphere.api.domain.ApiScenarioReport;
import io.metersphere.api.domain.ApiScenarioReportStep;
import io.metersphere.api.dto.ApiScenarioParamConfig;
import io.metersphere.api.dto.ApiScenarioParseTmpParam;
import io.metersphere.api.dto.debug.ApiResourceRunRequest;
@ -9,11 +12,14 @@ import io.metersphere.api.dto.scenario.ApiScenarioBatchRunRequest;
import io.metersphere.api.dto.scenario.ApiScenarioDetail;
import io.metersphere.api.dto.scenario.ApiScenarioParseParam;
import io.metersphere.api.dto.scenario.ApiScenarioStepDTO;
import io.metersphere.api.mapper.*;
import io.metersphere.api.mapper.ApiScenarioReportMapper;
import io.metersphere.api.mapper.ExtApiScenarioMapper;
import io.metersphere.api.service.ApiBatchRunBaseService;
import io.metersphere.api.service.ApiExecuteService;
import io.metersphere.api.service.queue.ApiExecutionQueueService;
import io.metersphere.api.service.queue.ApiExecutionSetService;
import io.metersphere.api.utils.ExecTask;
import io.metersphere.api.utils.TaskRunnerUtils;
import io.metersphere.sdk.constants.*;
import io.metersphere.sdk.dto.api.task.ApiRunModeConfigDTO;
import io.metersphere.sdk.dto.api.task.CollectionReportDTO;
@ -24,11 +30,13 @@ import io.metersphere.sdk.util.BeanUtils;
import io.metersphere.sdk.util.DateUtils;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.sdk.util.SubListUtils;
import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO;
import io.metersphere.system.uid.IDGenerator;
import jakarta.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -61,6 +69,9 @@ public class ApiScenarioBatchRunService {
@Resource
private ExtApiScenarioMapper extApiScenarioMapper;
@Value("${spring.datasource.hikari.maximum-pool-size}")
private int maximumPoolSize;
/**
* 异步批量执行
*
@ -137,39 +148,29 @@ public class ApiScenarioBatchRunService {
// 集成报告执行前先设置成 RUNNING
setRunningIntegrateReport(runModeConfig);
AtomicInteger errorCount = new AtomicInteger();
// 这里ID顺序和队列的ID顺序保持一致
for (String id : ids) {
TestResourcePoolReturnDTO testResourcePoolDTO = apiExecuteService.getGetResourcePoolNodeDTO(runModeConfig, request.getProjectId());
List<ApiScenarioDetail> apiScenarioDetails = apiScenarioService.getForRuns(ids);
String reportId = null;
try {
ApiScenarioDetail apiScenarioDetail = apiScenarioService.getForRun(id);
if (apiScenarioDetail == null) {
if (runModeConfig.isIntegratedReport()) {
// 用例不存在则在执行集合中删除
apiExecutionSetService.removeItem(runModeConfig.getCollectionReport().getReportId(), id);
}
LogUtils.info("当前执行任务的用例已删除 {}", id);
continue;
}
if (StringUtils.isNotBlank(testResourcePoolDTO.getServerUrl())) {
// 独立部署执行专属服务,线程池执行
TaskRunnerUtils.setThreadPoolSize(maximumPoolSize / 2 - 10);
apiScenarioDetails.forEach(apiScenarioDetail -> {
ExecTask execTask = new ExecTask(this, apiScenarioDetail, scenarioReportMap, runModeConfig);
TaskRunnerUtils.executeThreadPool(execTask);
});
} else {
// 未独立部署执行专属引用则使用默认循环分发任务
apiScenarioDetails.forEach(apiScenarioDetail -> execute(apiScenarioDetail, scenarioReportMap, runModeConfig));
}
}
if (runModeConfig.isIntegratedReport()) {
// 集成报告生成虚拟的报告ID
reportId = IDGenerator.nextStr();
} else {
reportId = scenarioReportMap.get(id);
}
TaskRequestDTO taskRequest = getTaskRequestDTO(reportId, apiScenarioDetail, runModeConfig);
execute(taskRequest, apiScenarioDetail);
} catch (Exception e) {
LogUtils.error("执行用例失败 {}-{}", reportId, id);
LogUtils.error(e);
if (errorCount.getAndIncrement() > 10) {
LogUtils.error("批量执行用例失败错误次数超过10次停止执行");
return;
}
}
public void execute(ApiScenarioDetail apiScenarioDetail, Map<String, String> scenarioReportMap, ApiRunModeConfigDTO runModeConfig) {
try {
String reportId = runModeConfig.isIntegratedReport() ? IDGenerator.nextStr() : scenarioReportMap.get(apiScenarioDetail.getId());
TaskRequestDTO taskRequest = getTaskRequestDTO(reportId, apiScenarioDetail, runModeConfig);
execute(taskRequest, apiScenarioDetail);
} catch (Exception e) {
LogUtils.error("执行用例失败 {}", apiScenarioDetail.getId(), e);
}
}
@ -422,7 +423,7 @@ public class ApiScenarioBatchRunService {
if (queueDetail == null) {
return;
}
Long requestCount = 0L;
long requestCount = 0L;
while (queueDetail != null) {
ApiScenarioDetail apiScenarioDetail = apiScenarioService.getForRun(queueDetail.getResourceId());
if (apiScenarioDetail == null) {

View File

@ -561,7 +561,7 @@ public class ApiScenarioService extends MoveNodeService {
}
private List<CsvVariable> getCsvVariables(ScenarioConfig scenarioConfig) {
if (scenarioConfig == null ||scenarioConfig.getVariable() == null || scenarioConfig.getVariable().getCsvVariables() == null) {
if (scenarioConfig == null || scenarioConfig.getVariable() == null || scenarioConfig.getVariable().getCsvVariables() == null) {
return List.of();
}
return scenarioConfig.getVariable().getCsvVariables();
@ -2187,6 +2187,12 @@ public class ApiScenarioService extends MoveNodeService {
return apiScenarioDetail;
}
public List<ApiScenarioDetail> getForRuns(List<String> scenarioIds) {
List<ApiScenarioDetail> apiScenarioDetails = list(scenarioIds);
apiScenarioDetails.forEach(apiScenarioDetail -> apiScenarioDetail.setSteps(filerDisableSteps(apiScenarioDetail.getSteps())));
return apiScenarioDetails;
}
/**
* 过滤掉禁用的步骤
*/
@ -2258,9 +2264,80 @@ public class ApiScenarioService extends MoveNodeService {
return apiScenarioDetail;
}
public List<ApiScenarioDetail> list(List<String> scenarioIds) {
List<ApiScenarioDetail> list = new LinkedList<>();
ApiScenarioExample example = new ApiScenarioExample();
example.createCriteria().andIdIn(scenarioIds).andDeletedEqualTo(false);
List<ApiScenario> apiScenarios = apiScenarioMapper.selectByExample(example);
ApiScenarioBlobExample blobExample = new ApiScenarioBlobExample();
blobExample.createCriteria().andIdIn(scenarioIds);
List<ApiScenarioBlob> apiScenarioBlobs = apiScenarioBlobMapper.selectByExampleWithBLOBs(blobExample);
Map<String, ApiScenarioBlob> scenarioMap = apiScenarioBlobs.stream()
.collect(Collectors.toMap(ApiScenarioBlob::getId, item -> item));
apiScenarios.forEach(apiScenario -> {
ApiScenarioDetail apiScenarioDetail = BeanUtils.copyBean(new ApiScenarioDetail(), apiScenario);
apiScenarioDetail.setSteps(List.of());
ApiScenarioBlob apiScenarioBlob = scenarioMap.get(apiScenario.getId());
if (apiScenarioBlob != null) {
apiScenarioDetail.setScenarioConfig(JSON.parseObject(new String(apiScenarioBlob.getConfig()), ScenarioConfig.class));
}
//存放csv变量
apiScenarioDetail.getScenarioConfig().getVariable().setCsvVariables(getCsvVariables(apiScenario.getId()));
// 获取所有步骤
List<ApiScenarioStepDTO> allSteps = getAllStepsByScenarioIds(List.of(apiScenario.getId()))
.stream()
.distinct() // 这里可能存在多次引用相同场景步骤可能会重复去重
.collect(Collectors.toList());
// 设置步骤的 csvIds
setStepCsvIds(apiScenario.getId(), allSteps);
// 构造 mapkey 为场景IDvalue 为步骤列表
Map<String, List<ApiScenarioStepDTO>> scenarioStepMap = allSteps.stream()
.collect(Collectors.groupingBy(step -> Optional.ofNullable(step.getScenarioId()).orElse(StringUtils.EMPTY)));
// key 为父步骤IDvalue 为子步骤列表
if (MapUtils.isEmpty(scenarioStepMap)) {
list.add(apiScenarioDetail);
return;
}
Map<String, List<ApiScenarioStepDTO>> currentScenarioParentStepMap = scenarioStepMap.get(apiScenario.getId())
.stream()
.collect(Collectors.groupingBy(step -> {
if (StringUtils.equals(step.getParentId(), "NONE")) {
step.setParentId(StringUtils.EMPTY);
}
return Optional.ofNullable(step.getParentId()).orElse(StringUtils.EMPTY);
}));
List<ApiScenarioStepDTO> steps = buildStepTree(currentScenarioParentStepMap.get(StringUtils.EMPTY), currentScenarioParentStepMap, scenarioStepMap, new HashSet<>());
// 查询步骤详情
Map<String, String> stepDetailMap = getPartialRefStepDetailMap(allSteps);
// 设置部分引用的步骤的启用状态
setPartialRefStepsEnable(steps, stepDetailMap);
apiScenarioDetail.setSteps(steps);
list.add(apiScenarioDetail);
});
return list;
}
private void setStepCsvIds(String scenarioId, List<ApiScenarioStepDTO> allSteps) {
List<String> refScenarioIds = allSteps.stream()
.filter(step -> isRefOrPartialScenario(step))
.filter(this::isRefOrPartialScenario)
.map(ApiScenarioStepCommonDTO::getResourceId)
.collect(Collectors.toList());
refScenarioIds.add(scenarioId);

View File

@ -0,0 +1,23 @@
package io.metersphere.api.utils;
import io.metersphere.api.dto.scenario.ApiScenarioDetail;
import io.metersphere.api.service.scenario.ApiScenarioBatchRunService;
import io.metersphere.sdk.dto.api.task.ApiRunModeConfigDTO;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.Map;
@Data
@AllArgsConstructor
public class ExecTask implements Runnable {
private ApiScenarioBatchRunService apiScenarioBatchRunService;
private ApiScenarioDetail detail;
private Map<String, String> scenarioReportMap;
private ApiRunModeConfigDTO runModeConfig;
@Override
public void run() {
apiScenarioBatchRunService.execute(detail, scenarioReportMap, runModeConfig);
}
}

View File

@ -0,0 +1,52 @@
package io.metersphere.api.utils;
import io.metersphere.sdk.util.LogUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TaskRunnerUtils {
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 10;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 1;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 50000;
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
MAX_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(WORK_QUEUE_SIZE));
public static void executeThreadPool(ExecTask task) {
try {
// 开始执行任务
threadPool.execute(task);
LogUtils.info("当前线程池活跃线程数量:{},当前线程池线程数量:{},当前线程池队列数量:{}",
threadPool.getActiveCount(),
threadPool.getPoolSize(),
threadPool.getQueue().size());
} catch (Exception e) {
LogUtils.error("KAFKA消费失败", e);
}
}
public static void setThreadPoolSize(int poolSize) {
try {
if (poolSize > 10 && poolSize < 500 && poolSize != threadPool.getMaximumPoolSize()) {
threadPool.setMaximumPoolSize(poolSize);
threadPool.setCorePoolSize(poolSize);
threadPool.allowCoreThreadTimeOut(true);
LogUtils.info("Set successfully: " + threadPool.prestartAllCoreThreads());
}
LogUtils.info("Invalid thread pool size: " + poolSize);
} catch (Exception e) {
LogUtils.error("设置线程参数异常", e);
}
}
}