refactor(任务中心): 优化任务中心全部停止方法,提高处理效率

--bug=1020886 --user=赵勇 【任务中心】任务中心pending的任务特别多的时候,点全部停止,会报超时错误 https://www.tapd.cn/55049933/s/1316784
This commit is contained in:
fit2-zhao 2022-12-19 10:57:30 +08:00 committed by fit2-zhao
parent 8410bd4ab2
commit edc400f4cd
6 changed files with 120 additions and 91 deletions

View File

@ -5,6 +5,7 @@ import io.metersphere.api.dto.datacount.ExecutedCaseInfoResult;
import io.metersphere.base.domain.ApiDefinitionExecResult;
import io.metersphere.base.domain.ApiDefinitionExecResultExpand;
import io.metersphere.base.domain.ApiDefinitionExecResultWithBLOBs;
import io.metersphere.commons.vo.TaskResultVO;
import io.metersphere.dto.PlanReportCaseDTO;
import io.metersphere.task.dto.TaskCenterRequest;
import org.apache.ibatis.annotations.InsertProvider;
@ -43,7 +44,7 @@ public interface ExtApiDefinitionExecResultMapper {
@InsertProvider(type = ExtApiDefinitionExecResultProvider.class, method = "insertListSql")
void sqlInsert(List<ApiDefinitionExecResult> list);
List<ApiDefinitionExecResult> findByProjectIds(@Param("request") TaskCenterRequest request);
List<TaskResultVO> findByProjectIds(@Param("request") TaskCenterRequest request);
List<String> selectDistinctStatusByReportId(String reportId);

View File

@ -392,7 +392,7 @@
</select>
<select id="findByProjectIds" resultType="io.metersphere.base.domain.ApiDefinitionExecResult"
<select id="findByProjectIds" resultType="io.metersphere.commons.vo.TaskResultVO"
parameterType="java.lang.String">
select actuator ,id from api_definition_exec_result where status in ("running","starting","PENDING") and
project_id in

View File

@ -4,6 +4,7 @@ import io.metersphere.api.dto.QueryAPIReportRequest;
import io.metersphere.api.dto.automation.ApiScenarioReportResult;
import io.metersphere.api.dto.datacount.ApiDataCountResult;
import io.metersphere.base.domain.ApiScenarioReport;
import io.metersphere.commons.vo.TaskResultVO;
import io.metersphere.dto.ApiReportCountDTO;
import io.metersphere.dto.PlanReportCaseDTO;
import io.metersphere.task.dto.TaskCenterRequest;
@ -45,7 +46,7 @@ public interface ExtApiScenarioReportMapper {
@InsertProvider(type = ExtApiScenarioReportProvider.class, method = "insertListSql")
void sqlInsert(List<ApiScenarioReportResult> list);
List<ApiScenarioReport> findByProjectIds(@Param("request") TaskCenterRequest request);
List<TaskResultVO> findByProjectIds(@Param("request") TaskCenterRequest request);
List<String> selectByProjectIdAndLessThanTime(@Param("projectId") String projectId, @Param("time") long time);

View File

@ -470,7 +470,7 @@
</foreach>
</update>
<select id="findByProjectIds" resultType="io.metersphere.base.domain.ApiScenarioReport"
<select id="findByProjectIds" resultType="io.metersphere.commons.vo.TaskResultVO"
parameterType="java.lang.String">
select actuator ,id from api_scenario_report where status in ("running","starting","PENDING") and project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">

View File

@ -0,0 +1,11 @@
package io.metersphere.commons.vo;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TaskResultVO {
private String id;
private String actuator;
}

View File

@ -17,6 +17,7 @@ import io.metersphere.commons.enums.ApiReportStatus;
import io.metersphere.commons.enums.StorageEnums;
import io.metersphere.commons.utils.JSON;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.commons.vo.TaskResultVO;
import io.metersphere.dto.NodeDTO;
import io.metersphere.service.ApiExecutionQueueService;
import io.metersphere.task.dto.TaskCenterDTO;
@ -25,6 +26,7 @@ import io.metersphere.task.dto.TaskRequestDTO;
import io.metersphere.task.service.TaskService;
import io.metersphere.utils.LoggerUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -70,36 +72,27 @@ public class ExtApiTaskService extends TaskService {
}
public void send(Map<String, List<String>> poolMap) {
private void send(Map<String, List<String>> poolMap) {
try {
LoggerUtil.info("结束所有NODE中执行的资源");
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("STOP-NODE");
for (String poolId : poolMap.keySet()) {
TestResourcePoolExample example = new TestResourcePoolExample();
example.createCriteria().andStatusEqualTo("VALID").andTypeEqualTo("NODE").andIdEqualTo(poolId);
List<TestResourcePool> pools = testResourcePoolMapper.selectByExample(example);
if (CollectionUtils.isNotEmpty(pools)) {
List<String> poolIds = pools.stream().map(pool -> pool.getId()).collect(Collectors.toList());
TestResourceExample resourceExample = new TestResourceExample();
resourceExample.createCriteria().andTestResourcePoolIdIn(poolIds);
resourceExample.setOrderByClause("create_time");
List<TestResource> testResources = testResourceMapper.selectByExampleWithBLOBs(resourceExample);
for (TestResource testResource : testResources) {
String configuration = testResource.getConfiguration();
NodeDTO node = JSON.parseObject(configuration, NodeDTO.class);
String nodeIp = node.getIp();
Integer port = node.getPort();
String uri = String.format(JMeterService.BASE_URL + "/jmeter/stop", nodeIp, port);
restTemplate.postForEntity(uri, poolMap.get(poolId), void.class);
}
}
Map<String, List<TestResource>> process = new HashMap<>();
for (String poolId : poolMap.keySet()) {
if (!process.containsKey(poolId)) {
List<TestResource> testResources = selectPoolResource(poolId);
process.put(poolId, testResources);
}
List<TestResource> testResources = process.get(poolId);
if (CollectionUtils.isNotEmpty(testResources)) {
for (TestResource testResource : testResources) {
String configuration = testResource.getConfiguration();
NodeDTO node = JSON.parseObject(configuration, NodeDTO.class);
String nodeIp = node.getIp();
Integer port = node.getPort();
String uri = String.format(JMeterService.BASE_URL + "/jmeter/stop", nodeIp, port);
restTemplate.postForEntity(uri, poolMap.get(poolId), void.class);
}
}
});
thread.start();
}
} catch (Exception e) {
LogUtil.error(e.getMessage());
}
@ -133,73 +126,81 @@ public class ExtApiTaskService extends TaskService {
extracted(poolMap, request.getReportId(), report.getActuator());
}
}
} else {
try {
LoggerUtil.info("进入批量停止方法");
// 全部停止
Map<String, TaskRequestDTO> taskRequestMap = taskRequests.stream().collect(Collectors.toMap(TaskRequestDTO::getType, taskRequest -> taskRequest));
// 获取工作空间项目
LoggerUtil.info("获取工作空间对应的项目");
TaskCenterRequest taskCenterRequest = new TaskCenterRequest();
taskCenterRequest.setProjects(this.getOwnerProjectIds(taskRequestMap.get(ElementConstants.SCENARIO_UPPER).getUserId()));
// 结束掉未分发完成的任务
LoggerUtil.info("结束正在进行中的计划任务队列");
JMeterThreadUtils.stop("PLAN-CASE");
JMeterThreadUtils.stop("API-CASE-RUN");
JMeterThreadUtils.stop("SCENARIO-PARALLEL-THREAD");
if (taskRequestMap.containsKey("API")) {
List<ApiDefinitionExecResult> results = extApiDefinitionExecResultMapper.findByProjectIds(taskCenterRequest);
LoggerUtil.info("查询API进行中的报告" + results.size());
if (CollectionUtils.isNotEmpty(results)) {
for (ApiDefinitionExecResult item : results) {
extracted(poolMap, item.getId(), item.getActuator());
// 从队列移除
execThreadPoolExecutor.removeQueue(item.getId());
PoolExecBlockingQueueUtil.offer(item.getId());
}
LoggerUtil.info("结束API进行中的报告");
baseTaskMapper.stopApi(taskCenterRequest);
// 清理队列并停止测试计划报告
LoggerUtil.info("清理API执行链");
List<String> ids = results.stream().map(ApiDefinitionExecResult::getId).collect(Collectors.toList());
apiExecutionQueueService.stop(ids);
}
}
if (taskRequestMap.containsKey(ElementConstants.SCENARIO_UPPER)) {
List<ApiScenarioReport> reports = extApiScenarioReportMapper.findByProjectIds(taskCenterRequest);
LoggerUtil.info("查询到执行中的场景报告:" + reports.size());
if (CollectionUtils.isNotEmpty(reports)) {
for (ApiScenarioReport report : reports) {
extracted(poolMap, report.getId(), report.getActuator());
// 从队列移除
execThreadPoolExecutor.removeQueue(report.getId());
PoolExecBlockingQueueUtil.offer(report.getId());
}
// 清理队列并停止测试计划报告
LoggerUtil.info("结束所有进行中的场景报告 ");
List<String> ids = reports.stream().map(ApiScenarioReport::getId).collect(Collectors.toList());
baseTaskMapper.stopScenario(taskCenterRequest);
// 清理队列并停止测试计划报告
LoggerUtil.info("清理队列并停止测试计划报告 ");
apiExecutionQueueService.stop(ids);
}
}
} catch (Exception e) {
LogUtil.error(e);
// 开始结束资源池中执行的任务
if (MapUtils.isNotEmpty(poolMap)) {
this.send(poolMap);
}
}
if (!poolMap.isEmpty()) {
this.send(poolMap);
} else {
Thread thread = new Thread(() -> {
this.batchStop(taskRequests);
});
thread.start();
}
}
return "SUCCESS";
}
private void batchStop(List<TaskRequestDTO> taskRequests) {
LoggerUtil.info("进入批量停止方法");
Map<String, List<String>> poolMap = new HashMap<>();
// 全部停止
Map<String, TaskRequestDTO> taskRequestMap = taskRequests.stream().collect(Collectors.toMap(TaskRequestDTO::getType, taskRequest -> taskRequest));
// 获取工作空间项目
LoggerUtil.info("获取工作空间对应的项目");
TaskCenterRequest taskCenterRequest = new TaskCenterRequest();
taskCenterRequest.setProjects(this.getOwnerProjectIds(taskRequestMap.get(ElementConstants.SCENARIO_UPPER).getUserId()));
// 结束掉未分发完成的任务
LoggerUtil.info("结束正在进行中的计划任务队列");
JMeterThreadUtils.stop("PLAN-CASE");
JMeterThreadUtils.stop("API-CASE-RUN");
JMeterThreadUtils.stop("SCENARIO-PARALLEL-THREAD");
if (taskRequestMap.containsKey("API")) {
List<TaskResultVO> results = extApiDefinitionExecResultMapper.findByProjectIds(taskCenterRequest);
LoggerUtil.info("查询API进行中的报告" + results.size());
if (CollectionUtils.isNotEmpty(results)) {
for (TaskResultVO item : results) {
extracted(poolMap, item.getId(), item.getActuator());
// 从队列移除
execThreadPoolExecutor.removeQueue(item.getId());
PoolExecBlockingQueueUtil.offer(item.getId());
}
LoggerUtil.info("结束API进行中的报告");
baseTaskMapper.stopApi(taskCenterRequest);
// 清理队列并停止测试计划报告
LoggerUtil.info("清理API执行链");
List<String> ids = results.stream().map(TaskResultVO::getId).collect(Collectors.toList());
apiExecutionQueueService.stop(ids);
}
}
if (taskRequestMap.containsKey(ElementConstants.SCENARIO_UPPER)) {
List<TaskResultVO> reports = extApiScenarioReportMapper.findByProjectIds(taskCenterRequest);
LoggerUtil.info("查询到执行中的场景报告:" + reports.size());
if (CollectionUtils.isNotEmpty(reports)) {
for (TaskResultVO report : reports) {
extracted(poolMap, report.getId(), report.getActuator());
// 从队列移除
execThreadPoolExecutor.removeQueue(report.getId());
PoolExecBlockingQueueUtil.offer(report.getId());
}
// 清理队列并停止测试计划报告
LoggerUtil.info("结束所有进行中的场景报告 ");
List<String> ids = reports.stream().map(TaskResultVO::getId).collect(Collectors.toList());
baseTaskMapper.stopScenario(taskCenterRequest);
// 清理队列并停止测试计划报告
LoggerUtil.info("清理队列并停止测试计划报告 ");
apiExecutionQueueService.stop(ids);
}
}
// 开始结束资源池中执行的任务
if (MapUtils.isNotEmpty(poolMap)) {
this.send(poolMap);
}
}
private void extracted(Map<String, List<String>> poolMap, String reportId, String actuator) {
if (StringUtils.isEmpty(reportId)) {
return;
@ -216,4 +217,19 @@ public class ExtApiTaskService extends TaskService {
JMeterThreadUtils.stop(reportId);
}
}
private List<TestResource> selectPoolResource(String poolId) {
TestResourcePoolExample example = new TestResourcePoolExample();
example.createCriteria().andStatusEqualTo("VALID").andTypeEqualTo("NODE").andIdEqualTo(poolId);
List<TestResourcePool> pools = testResourcePoolMapper.selectByExample(example);
if (CollectionUtils.isNotEmpty(pools)) {
List<String> poolIds = pools.stream().map(pool -> pool.getId()).collect(Collectors.toList());
TestResourceExample resourceExample = new TestResourceExample();
resourceExample.createCriteria().andTestResourcePoolIdIn(poolIds);
resourceExample.setOrderByClause("create_time");
return testResourceMapper.selectByExampleWithBLOBs(resourceExample);
}
return new ArrayList<>();
}
}