refactor(接口测试): 重构任务中心全部停止方法提升性能

--bug=1013002 --user=赵勇 【测试计划】github#13137任务中心全部停止接口响应时间太长 https://www.tapd.cn/55049933/s/1153389
This commit is contained in:
fit2-zhao 2022-05-06 14:29:35 +08:00 committed by 刘瑞斌
parent d36090178c
commit e4411ded9a
12 changed files with 280 additions and 164 deletions

View File

@ -41,6 +41,9 @@ public class ApiCaseParallelExecuteService {
// 获取可以执行的资源池
BaseSystemConfigDTO baseInfo = CommonBeanFactory.getBean(SystemParameterService.class).getBaseInfo();
for (String testId : executeQueue.keySet()) {
if (Thread.currentThread().isInterrupted()) {
break;
}
ApiDefinitionExecResult result = executeQueue.get(testId);
String reportId = result.getId();
JmeterRunRequestDTO runRequest = new JmeterRunRequestDTO(testId, reportId, runMode, null);

View File

@ -131,7 +131,7 @@ public class ExecThreadPoolExecutor {
BlockingQueue workerQueue = threadPool.getQueue();
workerQueue.forEach(item -> {
ExecTask task = (ExecTask) item;
if (task.getRequest() != null && StringUtils.equals(task.getRequest().getReportId(), reportId)) {
if (task != null && task.getRequest() != null && StringUtils.equals(task.getRequest().getReportId(), reportId)) {
workerQueue.remove(item);
}
});

View File

@ -36,6 +36,9 @@ public class ApiScenarioParallelService {
// 获取可以执行的资源池
BaseSystemConfigDTO baseInfo = CommonBeanFactory.getBean(SystemParameterService.class).getBaseInfo();
for (String reportId : executeQueue.keySet()) {
if (Thread.currentThread().isInterrupted()) {
break;
}
RunModeDataDTO dataDTO = executeQueue.get(reportId);
JmeterRunRequestDTO runRequest = new JmeterRunRequestDTO(dataDTO.getTestId(), StringUtils.isNotEmpty(serialReportId) ? serialReportId : reportId, request.getRunMode(), null);
runRequest.setReportType(StringUtils.isNotEmpty(serialReportId) ? RunModeConstants.SET_REPORT.toString() : RunModeConstants.INDEPENDENCE.toString());

View File

@ -15,7 +15,7 @@ public class JmeterThreadUtils {
currentGroup.enumerate(lstThreads);
StringBuilder threadNames = new StringBuilder();
for (int i = 0; i < noThreads; i++) {
if (StringUtils.isNotEmpty(lstThreads[i].getName()) && lstThreads[i].getName().startsWith(name)) {
if (lstThreads[i]!=null && StringUtils.isNotEmpty(lstThreads[i].getName()) && lstThreads[i].getName().startsWith(name)) {
System.out.println("异常强制处理线程编号:" + i + " = " + lstThreads[i].getName());
LogUtil.error("异常强制处理线程编号:" + i + " = " + lstThreads[i].getName());
threadNames.append(lstThreads[i].getName()).append("");

View File

@ -459,6 +459,30 @@ public class ApiExecutionQueueService {
});
}
public void stop(List<String> reportIds) {
ApiExecutionQueueDetailExample example = new ApiExecutionQueueDetailExample();
example.createCriteria().andReportIdIn(reportIds);
List<ApiExecutionQueueDetail> details = executionQueueDetailMapper.selectByExample(example);
List<String> queueIds = new ArrayList<>();
details.forEach(item -> {
if (!queueIds.contains(item.getQueueId())) {
queueIds.add(item.getQueueId());
}
});
executionQueueDetailMapper.deleteByExample(example);
for (String queueId : queueIds) {
ApiExecutionQueue queue = queueMapper.selectByPrimaryKey(queueId);
// 更新测试计划报告
if (queue != null && StringUtils.isNotEmpty(queue.getReportId())) {
CommonBeanFactory.getBean(TestPlanReportService.class).finishedTestPlanReport(queue.getReportId(), "Stopped");
queueMapper.deleteByPrimaryKey(queueId);
}
}
}
/**
* 性能测试监听检查
*

View File

@ -1,10 +1,10 @@
package io.metersphere.base.mapper.ext;
import io.metersphere.api.dto.QueryAPIReportRequest;
import io.metersphere.api.dto.automation.APIScenarioReportResult;
import io.metersphere.api.dto.datacount.ExecutedCaseInfoResult;
import io.metersphere.base.domain.ApiDefinitionExecResult;
import io.metersphere.base.domain.ApiDefinitionExecResultExpand;
import io.metersphere.task.dto.TaskCenterRequest;
import io.metersphere.track.dto.PlanReportCaseDTO;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
@ -44,4 +44,7 @@ public interface ExtApiDefinitionExecResultMapper {
@InsertProvider(type = ExtApiDefinitionExecResultProvider.class, method = "insertListSql")
void sqlInsert(List<ApiDefinitionExecResult> list);
List<ApiDefinitionExecResult> findByProjectIds(@Param("request") TaskCenterRequest request);
}

View File

@ -275,4 +275,11 @@
#{value}
</foreach>
</update>
<select id="findByProjectIds" resultType="io.metersphere.base.domain.ApiDefinitionExecResult" parameterType="java.lang.String">
select actuator ,id from api_definition_exec_result where status in ("running","starting","waiting") and project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
</foreach>
</select>
</mapper>

View File

@ -5,6 +5,7 @@ import io.metersphere.api.dto.automation.APIScenarioReportResult;
import io.metersphere.api.dto.datacount.ApiDataCountResult;
import io.metersphere.base.domain.ApiScenarioReport;
import io.metersphere.dto.ApiReportCountDTO;
import io.metersphere.task.dto.TaskCenterRequest;
import io.metersphere.track.dto.PlanReportCaseDTO;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
@ -44,4 +45,7 @@ public interface ExtApiScenarioReportMapper {
@InsertProvider(type = ExtApiScenarioReportProvider.class, method = "insertListSql")
void sqlInsert(List<APIScenarioReportResult> list);
List<ApiScenarioReport> findByProjectIds(@Param("request") TaskCenterRequest request);
}

View File

@ -428,4 +428,12 @@
#{value}
</foreach>
</update>
<select id="findByProjectIds" resultType="io.metersphere.base.domain.ApiScenarioReport" parameterType="java.lang.String">
select actuator ,id from api_scenario_report where status in ("running","starting","waiting") and project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
</foreach>
</select>
</mapper>

View File

@ -18,4 +18,8 @@ public interface ExtTaskMapper {
List<String> checkActuator (@Param("actuator") String actuator);
int stopScenario(@Param("request") TaskCenterRequest request);
int stopApi(@Param("request") TaskCenterRequest request);
}

View File

@ -4,9 +4,13 @@
<select id="getTasks" resultType="io.metersphere.task.dto.TaskCenterDTO"
parameterType="java.lang.String">
SELECT tt.* FROM (
(select t.id,if(t.scenario_id like "[\"%\"]", t.name,t.scenario_name) as name ,'SCENARIO' as executionModule,t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as executor,t.create_time as executionTime, t.trigger_mode as triggerMode ,t.status as executionStatus
from api_scenario_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on t.actuator = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.execute_type !='Debug' and t.execute_type !='Marge' and t.project_id in
(select t.id,if(t.scenario_id like "[\"%\"]", t.name,t.scenario_name) as name ,'SCENARIO' as
executionModule,t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as executor,t.create_time as
executionTime, t.trigger_mode as triggerMode ,t.status as executionStatus
from api_scenario_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on
t.actuator = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.execute_type !='Debug' and t.execute_type
!='Marge' and t.project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
</foreach>
@ -21,8 +25,11 @@
</if>
)
UNION ALL
(select t.id,t.name,'API' as executionModule, t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as executor,t.create_time as executionTime, ifnull(t.trigger_mode,'MANUAL') as triggerMode ,ifnull(t.status,'Saved') as executionStatus
from api_definition_exec_result t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on t.actuator = t2.id
(select t.id,t.name,'API' as executionModule, t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as
executor,t.create_time as executionTime, ifnull(t.trigger_mode,'MANUAL') as triggerMode
,ifnull(t.status,'Saved') as executionStatus
from api_definition_exec_result t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on
t.actuator = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
@ -40,8 +47,11 @@
and (t.integrated_report_id is null or t.integrated_report_id = 'null')
)
UNION ALL
(select t.id,t.name,'PERFORMANCE' as executionModule,'PERFORMANCE' as report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as executor,t.create_time as executionTime, t.trigger_mode as triggerMode ,t.`status` as executionStatus
from load_test_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on t.test_resource_pool_id = t2.id
(select t.id,t.name,'PERFORMANCE' as executionModule,'PERFORMANCE' as report_type, ifnull(t2.name,'LOCAL') as
actuator, t1.`name` as executor,t.create_time as executionTime, t.trigger_mode as triggerMode ,t.`status` as
executionStatus
from load_test_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on
t.test_resource_pool_id = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
@ -78,9 +88,12 @@
<select id="getRunningTasks" resultType="java.lang.Integer" parameterType="java.lang.String">
SELECT count(tt.id) FROM (
(select t.id,'SCENARIO' as executionModule,t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as executor,t.create_time as executionTime, t.trigger_mode as triggerMode ,t.status as executionStatus
from api_scenario_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on t.actuator = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.execute_type !='Debug' and t.execute_type !='Marge' and t.project_id in
(select t.id,'SCENARIO' as executionModule,t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as
executor,t.create_time as executionTime, t.trigger_mode as triggerMode ,t.status as executionStatus
from api_scenario_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on
t.actuator = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.execute_type !='Debug' and t.execute_type
!='Marge' and t.project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
</foreach>
@ -96,8 +109,11 @@
and t.status in ("running","starting","waiting")
)
UNION ALL
(select t.id,'API' as executionModule, t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as executor,t.create_time as executionTime, ifnull(t.trigger_mode,'MANUAL') as triggerMode ,ifnull(t.status,'Saved') as executionStatus
from api_definition_exec_result t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on t.actuator = t2.id
(select t.id,'API' as executionModule, t.report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as
executor,t.create_time as executionTime, ifnull(t.trigger_mode,'MANUAL') as triggerMode
,ifnull(t.status,'Saved') as executionStatus
from api_definition_exec_result t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on
t.actuator = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
@ -116,8 +132,11 @@
and t.status in ("running","starting","waiting")
)
UNION ALL
(select t.id,'PERFORMANCE' as executionModule,'PERFORMANCE' as report_type, ifnull(t2.name,'LOCAL') as actuator, t1.`name` as executor,t.create_time as executionTime, t.trigger_mode as triggerMode ,t.`status` as executionStatus
from load_test_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on t.test_resource_pool_id = t2.id
(select t.id,'PERFORMANCE' as executionModule,'PERFORMANCE' as report_type, ifnull(t2.name,'LOCAL') as actuator,
t1.`name` as executor,t.create_time as executionTime, t.trigger_mode as triggerMode ,t.`status` as
executionStatus
from load_test_report t left join `user` t1 ON t.user_id = t1.id left join test_resource_pool t2 on
t.test_resource_pool_id = t2.id
where to_days(FROM_UNIXTIME(t.create_time/1000))= to_days(now()) and t.project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
@ -155,4 +174,21 @@
actuator = #{actuator}
AND `status` in ('Running','Waiting')
</select>
<update id="stopScenario">
update api_scenario_report set status ='STOP' where
project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
</foreach>
</update>
<update id="stopApi">
update api_definition_exec_result set status ='STOP' where
project_id in
<foreach collection="request.projects" item="id" separator="," open="(" close=")">
#{id}
</foreach>
</update>
</mapper>

View File

@ -5,6 +5,7 @@ import io.metersphere.api.dto.automation.TaskRequest;
import io.metersphere.api.exec.queue.ExecThreadPoolExecutor;
import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil;
import io.metersphere.api.jmeter.JMeterService;
import io.metersphere.api.jmeter.JmeterThreadUtils;
import io.metersphere.api.service.ApiExecutionQueueService;
import io.metersphere.base.domain.*;
import io.metersphere.base.mapper.ApiDefinitionExecResultMapper;
@ -16,6 +17,7 @@ import io.metersphere.base.mapper.ext.ExtApiScenarioReportMapper;
import io.metersphere.base.mapper.ext.ExtLoadTestReportMapper;
import io.metersphere.base.mapper.ext.ExtTaskMapper;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.commons.utils.SessionUtils;
import io.metersphere.dto.NodeDTO;
import io.metersphere.jmeter.LocalRunner;
import io.metersphere.performance.service.PerformanceTestService;
@ -125,14 +127,15 @@ public class TaskService {
}
}
public String stop(List<TaskRequest> reportIds) {
if (CollectionUtils.isNotEmpty(reportIds)) {
public String stop(List<TaskRequest> taskRequests) {
if (CollectionUtils.isNotEmpty(taskRequests)) {
List<TaskRequest> stopTasks = taskRequests.stream().filter(s -> StringUtils.isNotEmpty(s.getReportId())).collect(Collectors.toList());
// 聚类同一批资源池的一批发送
Map<String, List<String>> poolMap = new HashMap<>();
for (TaskRequest request : reportIds) {
String actuator = null;
if (StringUtils.isNotEmpty(request.getReportId())) {
// 单条停止
if (CollectionUtils.isNotEmpty(stopTasks) && stopTasks.size() == 1) {
// 从队列移除
TaskRequest request = stopTasks.get(0);
execThreadPoolExecutor.removeQueue(request.getReportId());
apiExecutionQueueService.stop(request.getReportId());
PoolExecBlockingQueueUtil.offer(request.getReportId());
@ -141,57 +144,72 @@ public class TaskService {
if (result != null) {
result.setStatus("STOP");
apiDefinitionExecResultMapper.updateByPrimaryKeySelective(result);
actuator = result.getActuator();
extracted(poolMap, request.getReportId(), result.getActuator());
}
} else if (StringUtils.equals(request.getType(), "SCENARIO")) {
}
if (StringUtils.equals(request.getType(), "SCENARIO")) {
ApiScenarioReport report = apiScenarioReportMapper.selectByPrimaryKey(request.getReportId());
if (report != null) {
report.setStatus("STOP");
apiScenarioReportMapper.updateByPrimaryKeySelective(report);
actuator = report.getActuator();
extracted(poolMap, request.getReportId(), report.getActuator());
}
} else if (StringUtils.equals(request.getType(), "PERFORMANCE")) {
}
if (StringUtils.equals(request.getType(), "PERFORMANCE")) {
performanceTestService.stopTest(request.getReportId(), false);
}
extracted(poolMap, request, actuator);
} else {
if (StringUtils.equals(request.getType(), "API")) {
List<ApiDefinitionExecResult> result = extApiDefinitionExecResultMapper.selectApiResultByProjectId(request.getProjectId());
if (CollectionUtils.isNotEmpty(result)) {
for (ApiDefinitionExecResult item : result) {
item.setStatus("STOP");
apiDefinitionExecResultMapper.updateByPrimaryKeySelective(item);
actuator = item.getActuator();
request.setReportId(item.getId());
extracted(poolMap, request, actuator);
try {
// 全部停止
Map<String, TaskRequest> taskRequestMap = taskRequests.stream().collect(Collectors.toMap(TaskRequest::getType, taskRequest -> taskRequest));
// 获取工作空间项目
TaskCenterRequest taskCenterRequest = new TaskCenterRequest();
taskCenterRequest.setProjects(this.getOwnerProjectIds(SessionUtils.getUserId()));
// 结束掉未分发完成的任务
JmeterThreadUtils.stop("PLAN-CASE");
JmeterThreadUtils.stop("API-CASE-RUN");
JmeterThreadUtils.stop("SCENARIO-PARALLEL-THREAD");
if (taskRequestMap.containsKey("API")) {
List<ApiDefinitionExecResult> results = extApiDefinitionExecResultMapper.findByProjectIds(taskCenterRequest);
if (CollectionUtils.isNotEmpty(results)) {
for (ApiDefinitionExecResult item : results) {
extracted(poolMap, item.getId(), item.getActuator());
// 从队列移除
execThreadPoolExecutor.removeQueue(item.getId());
apiExecutionQueueService.stop(item.getId());
PoolExecBlockingQueueUtil.offer(item.getId());
}
}
} else if (StringUtils.equals(request.getType(), "SCENARIO")) {
List<ApiScenarioReport> reports = extApiScenarioReportMapper.selectReportByProjectId(request.getProjectId());
extTaskMapper.stopApi(taskCenterRequest);
// 清理队列并停止测试计划报告
List<String> ids = results.stream().map(ApiDefinitionExecResult::getId).collect(Collectors.toList());
apiExecutionQueueService.stop(ids);
}
if (taskRequestMap.containsKey("SCENARIO")) {
List<ApiScenarioReport> reports = extApiScenarioReportMapper.findByProjectIds(taskCenterRequest);
if (CollectionUtils.isNotEmpty(reports)) {
for (ApiScenarioReport report : reports) {
report.setStatus("STOP");
apiScenarioReportMapper.updateByPrimaryKeySelective(report);
actuator = report.getActuator();
request.setReportId(report.getId());
extracted(poolMap, request, actuator);
extracted(poolMap, report.getId(), report.getActuator());
// 从队列移除
execThreadPoolExecutor.removeQueue(report.getId());
apiExecutionQueueService.stop(report.getId());
PoolExecBlockingQueueUtil.offer(report.getId());
}
// 清理队列并停止测试计划报告
List<String> ids = reports.stream().map(ApiScenarioReport::getId).collect(Collectors.toList());
extTaskMapper.stopScenario(taskCenterRequest);
// 清理队列并停止测试计划报告
apiExecutionQueueService.stop(ids);
}
} else if (StringUtils.equals(request.getType(), "PERFORMANCE")) {
List<LoadTestReport> loadTestReports = extLoadTestReportMapper.selectReportByProjectId(request.getProjectId());
}
if (taskRequestMap.containsKey("PERFORMANCE")) {
List<LoadTestReport> loadTestReports = extLoadTestReportMapper.selectReportByProjectId(taskRequestMap.get("PERFORMANCE").getProjectId());
if (CollectionUtils.isNotEmpty(loadTestReports)) {
for (LoadTestReport loadTestReport : loadTestReports) {
performanceTestService.stopTest(loadTestReport.getId(), false);
request.setReportId(loadTestReport.getId());
extracted(poolMap, request, actuator);
// 从队列移除
execThreadPoolExecutor.removeQueue(loadTestReport.getId());
apiExecutionQueueService.stop(loadTestReport.getId());
@ -199,26 +217,32 @@ public class TaskService {
}
}
}
} catch (Exception e) {
LogUtil.error(e);
}
}
if (!poolMap.isEmpty()) {
this.send(poolMap);
}
}
}
return "SUCCESS";
}
private void extracted(Map<String, List<String>> poolMap, TaskRequest request, String actuator) {
private void extracted(Map<String, List<String>> poolMap, String reportId, String actuator) {
if (StringUtils.isEmpty(reportId)) {
return;
}
if (StringUtils.isNotEmpty(actuator) && !StringUtils.equals(actuator, "LOCAL")) {
if (poolMap.containsKey(actuator)) {
poolMap.get(actuator).add(request.getReportId());
poolMap.get(actuator).add(reportId);
} else {
poolMap.put(actuator, new ArrayList<String>() {{
this.add(request.getReportId());
this.add(reportId);
}});
}
} else {
new LocalRunner().stop(request.getReportId());
new LocalRunner().stop(reportId);
JmeterThreadUtils.stop(reportId);
}
}
}