From edc400f4cd887e7081c07bebeea8781a767bf37f Mon Sep 17 00:00:00 2001 From: fit2-zhao Date: Mon, 19 Dec 2022 10:57:30 +0800 Subject: [PATCH] =?UTF-8?q?refactor(=E4=BB=BB=E5=8A=A1=E4=B8=AD=E5=BF=83):?= =?UTF-8?q?=20=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1=E4=B8=AD=E5=BF=83?= =?UTF-8?q?=E5=85=A8=E9=83=A8=E5=81=9C=E6=AD=A2=E6=96=B9=E6=B3=95=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E9=AB=98=E5=A4=84=E7=90=86=E6=95=88=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --bug=1020886 --user=赵勇 【任务中心】任务中心pending的任务特别多的时候,点全部停止,会报超时错误 https://www.tapd.cn/55049933/s/1316784 --- .../ext/ExtApiDefinitionExecResultMapper.java | 3 +- .../ext/ExtApiDefinitionExecResultMapper.xml | 2 +- .../ext/ExtApiScenarioReportMapper.java | 3 +- .../mapper/ext/ExtApiScenarioReportMapper.xml | 2 +- .../metersphere/commons/vo/TaskResultVO.java | 11 + .../service/ext/ExtApiTaskService.java | 190 ++++++++++-------- 6 files changed, 120 insertions(+), 91 deletions(-) create mode 100644 api-test/backend/src/main/java/io/metersphere/commons/vo/TaskResultVO.java diff --git a/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.java b/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.java index cbc33f3f5e..2683b12a45 100644 --- a/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.java +++ b/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.java @@ -5,6 +5,7 @@ import io.metersphere.api.dto.datacount.ExecutedCaseInfoResult; import io.metersphere.base.domain.ApiDefinitionExecResult; import io.metersphere.base.domain.ApiDefinitionExecResultExpand; import io.metersphere.base.domain.ApiDefinitionExecResultWithBLOBs; +import io.metersphere.commons.vo.TaskResultVO; import io.metersphere.dto.PlanReportCaseDTO; import io.metersphere.task.dto.TaskCenterRequest; import org.apache.ibatis.annotations.InsertProvider; @@ -43,7 +44,7 @@ public interface ExtApiDefinitionExecResultMapper { @InsertProvider(type = ExtApiDefinitionExecResultProvider.class, method = "insertListSql") void sqlInsert(List list); - List findByProjectIds(@Param("request") TaskCenterRequest request); + List findByProjectIds(@Param("request") TaskCenterRequest request); List selectDistinctStatusByReportId(String reportId); diff --git a/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.xml b/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.xml index 40d7896cad..9b4b43e499 100644 --- a/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.xml +++ b/api-test/backend/src/main/java/io/metersphere/base/mapper/ext/ExtApiDefinitionExecResultMapper.xml @@ -392,7 +392,7 @@ - select actuator ,id from api_scenario_report where status in ("running","starting","PENDING") and project_id in diff --git a/api-test/backend/src/main/java/io/metersphere/commons/vo/TaskResultVO.java b/api-test/backend/src/main/java/io/metersphere/commons/vo/TaskResultVO.java new file mode 100644 index 0000000000..aaf1d098d7 --- /dev/null +++ b/api-test/backend/src/main/java/io/metersphere/commons/vo/TaskResultVO.java @@ -0,0 +1,11 @@ +package io.metersphere.commons.vo; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class TaskResultVO { + private String id; + private String actuator; +} diff --git a/api-test/backend/src/main/java/io/metersphere/service/ext/ExtApiTaskService.java b/api-test/backend/src/main/java/io/metersphere/service/ext/ExtApiTaskService.java index b6416df443..c80469f493 100644 --- a/api-test/backend/src/main/java/io/metersphere/service/ext/ExtApiTaskService.java +++ b/api-test/backend/src/main/java/io/metersphere/service/ext/ExtApiTaskService.java @@ -17,6 +17,7 @@ import io.metersphere.commons.enums.ApiReportStatus; import io.metersphere.commons.enums.StorageEnums; import io.metersphere.commons.utils.JSON; import io.metersphere.commons.utils.LogUtil; +import io.metersphere.commons.vo.TaskResultVO; import io.metersphere.dto.NodeDTO; import io.metersphere.service.ApiExecutionQueueService; import io.metersphere.task.dto.TaskCenterDTO; @@ -25,6 +26,7 @@ import io.metersphere.task.dto.TaskRequestDTO; import io.metersphere.task.service.TaskService; import io.metersphere.utils.LoggerUtil; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -70,36 +72,27 @@ public class ExtApiTaskService extends TaskService { } - public void send(Map> poolMap) { + private void send(Map> poolMap) { try { LoggerUtil.info("结束所有NODE中执行的资源"); - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - Thread.currentThread().setName("STOP-NODE"); - for (String poolId : poolMap.keySet()) { - TestResourcePoolExample example = new TestResourcePoolExample(); - example.createCriteria().andStatusEqualTo("VALID").andTypeEqualTo("NODE").andIdEqualTo(poolId); - List pools = testResourcePoolMapper.selectByExample(example); - if (CollectionUtils.isNotEmpty(pools)) { - List poolIds = pools.stream().map(pool -> pool.getId()).collect(Collectors.toList()); - TestResourceExample resourceExample = new TestResourceExample(); - resourceExample.createCriteria().andTestResourcePoolIdIn(poolIds); - resourceExample.setOrderByClause("create_time"); - List testResources = testResourceMapper.selectByExampleWithBLOBs(resourceExample); - for (TestResource testResource : testResources) { - String configuration = testResource.getConfiguration(); - NodeDTO node = JSON.parseObject(configuration, NodeDTO.class); - String nodeIp = node.getIp(); - Integer port = node.getPort(); - String uri = String.format(JMeterService.BASE_URL + "/jmeter/stop", nodeIp, port); - restTemplate.postForEntity(uri, poolMap.get(poolId), void.class); - } - } + Map> process = new HashMap<>(); + for (String poolId : poolMap.keySet()) { + if (!process.containsKey(poolId)) { + List testResources = selectPoolResource(poolId); + process.put(poolId, testResources); + } + List testResources = process.get(poolId); + if (CollectionUtils.isNotEmpty(testResources)) { + for (TestResource testResource : testResources) { + String configuration = testResource.getConfiguration(); + NodeDTO node = JSON.parseObject(configuration, NodeDTO.class); + String nodeIp = node.getIp(); + Integer port = node.getPort(); + String uri = String.format(JMeterService.BASE_URL + "/jmeter/stop", nodeIp, port); + restTemplate.postForEntity(uri, poolMap.get(poolId), void.class); } } - }); - thread.start(); + } } catch (Exception e) { LogUtil.error(e.getMessage()); } @@ -133,73 +126,81 @@ public class ExtApiTaskService extends TaskService { extracted(poolMap, request.getReportId(), report.getActuator()); } } - - } else { - try { - LoggerUtil.info("进入批量停止方法"); - // 全部停止 - Map taskRequestMap = taskRequests.stream().collect(Collectors.toMap(TaskRequestDTO::getType, taskRequest -> taskRequest)); - // 获取工作空间项目 - LoggerUtil.info("获取工作空间对应的项目"); - TaskCenterRequest taskCenterRequest = new TaskCenterRequest(); - taskCenterRequest.setProjects(this.getOwnerProjectIds(taskRequestMap.get(ElementConstants.SCENARIO_UPPER).getUserId())); - - // 结束掉未分发完成的任务 - LoggerUtil.info("结束正在进行中的计划任务队列"); - JMeterThreadUtils.stop("PLAN-CASE"); - JMeterThreadUtils.stop("API-CASE-RUN"); - JMeterThreadUtils.stop("SCENARIO-PARALLEL-THREAD"); - - if (taskRequestMap.containsKey("API")) { - List results = extApiDefinitionExecResultMapper.findByProjectIds(taskCenterRequest); - LoggerUtil.info("查询API进行中的报告:" + results.size()); - if (CollectionUtils.isNotEmpty(results)) { - for (ApiDefinitionExecResult item : results) { - extracted(poolMap, item.getId(), item.getActuator()); - // 从队列移除 - execThreadPoolExecutor.removeQueue(item.getId()); - PoolExecBlockingQueueUtil.offer(item.getId()); - } - LoggerUtil.info("结束API进行中的报告"); - baseTaskMapper.stopApi(taskCenterRequest); - // 清理队列并停止测试计划报告 - LoggerUtil.info("清理API执行链"); - List ids = results.stream().map(ApiDefinitionExecResult::getId).collect(Collectors.toList()); - apiExecutionQueueService.stop(ids); - } - } - if (taskRequestMap.containsKey(ElementConstants.SCENARIO_UPPER)) { - List reports = extApiScenarioReportMapper.findByProjectIds(taskCenterRequest); - LoggerUtil.info("查询到执行中的场景报告:" + reports.size()); - if (CollectionUtils.isNotEmpty(reports)) { - for (ApiScenarioReport report : reports) { - - extracted(poolMap, report.getId(), report.getActuator()); - // 从队列移除 - execThreadPoolExecutor.removeQueue(report.getId()); - PoolExecBlockingQueueUtil.offer(report.getId()); - } - - // 清理队列并停止测试计划报告 - LoggerUtil.info("结束所有进行中的场景报告 "); - List ids = reports.stream().map(ApiScenarioReport::getId).collect(Collectors.toList()); - baseTaskMapper.stopScenario(taskCenterRequest); - // 清理队列并停止测试计划报告 - LoggerUtil.info("清理队列并停止测试计划报告 "); - apiExecutionQueueService.stop(ids); - } - } - } catch (Exception e) { - LogUtil.error(e); + // 开始结束资源池中执行的任务 + if (MapUtils.isNotEmpty(poolMap)) { + this.send(poolMap); } - } - if (!poolMap.isEmpty()) { - this.send(poolMap); + } else { + Thread thread = new Thread(() -> { + this.batchStop(taskRequests); + }); + thread.start(); } } return "SUCCESS"; } + private void batchStop(List taskRequests) { + LoggerUtil.info("进入批量停止方法"); + Map> poolMap = new HashMap<>(); + // 全部停止 + Map taskRequestMap = taskRequests.stream().collect(Collectors.toMap(TaskRequestDTO::getType, taskRequest -> taskRequest)); + // 获取工作空间项目 + LoggerUtil.info("获取工作空间对应的项目"); + TaskCenterRequest taskCenterRequest = new TaskCenterRequest(); + taskCenterRequest.setProjects(this.getOwnerProjectIds(taskRequestMap.get(ElementConstants.SCENARIO_UPPER).getUserId())); + + // 结束掉未分发完成的任务 + LoggerUtil.info("结束正在进行中的计划任务队列"); + JMeterThreadUtils.stop("PLAN-CASE"); + JMeterThreadUtils.stop("API-CASE-RUN"); + JMeterThreadUtils.stop("SCENARIO-PARALLEL-THREAD"); + + if (taskRequestMap.containsKey("API")) { + List results = extApiDefinitionExecResultMapper.findByProjectIds(taskCenterRequest); + LoggerUtil.info("查询API进行中的报告:" + results.size()); + if (CollectionUtils.isNotEmpty(results)) { + for (TaskResultVO item : results) { + extracted(poolMap, item.getId(), item.getActuator()); + // 从队列移除 + execThreadPoolExecutor.removeQueue(item.getId()); + PoolExecBlockingQueueUtil.offer(item.getId()); + } + LoggerUtil.info("结束API进行中的报告"); + baseTaskMapper.stopApi(taskCenterRequest); + // 清理队列并停止测试计划报告 + LoggerUtil.info("清理API执行链"); + List ids = results.stream().map(TaskResultVO::getId).collect(Collectors.toList()); + apiExecutionQueueService.stop(ids); + } + } + if (taskRequestMap.containsKey(ElementConstants.SCENARIO_UPPER)) { + List reports = extApiScenarioReportMapper.findByProjectIds(taskCenterRequest); + LoggerUtil.info("查询到执行中的场景报告:" + reports.size()); + if (CollectionUtils.isNotEmpty(reports)) { + for (TaskResultVO report : reports) { + + extracted(poolMap, report.getId(), report.getActuator()); + // 从队列移除 + execThreadPoolExecutor.removeQueue(report.getId()); + PoolExecBlockingQueueUtil.offer(report.getId()); + } + + // 清理队列并停止测试计划报告 + LoggerUtil.info("结束所有进行中的场景报告 "); + List ids = reports.stream().map(TaskResultVO::getId).collect(Collectors.toList()); + baseTaskMapper.stopScenario(taskCenterRequest); + // 清理队列并停止测试计划报告 + LoggerUtil.info("清理队列并停止测试计划报告 "); + apiExecutionQueueService.stop(ids); + } + } + // 开始结束资源池中执行的任务 + if (MapUtils.isNotEmpty(poolMap)) { + this.send(poolMap); + } + } + private void extracted(Map> poolMap, String reportId, String actuator) { if (StringUtils.isEmpty(reportId)) { return; @@ -216,4 +217,19 @@ public class ExtApiTaskService extends TaskService { JMeterThreadUtils.stop(reportId); } } + + private List selectPoolResource(String poolId) { + TestResourcePoolExample example = new TestResourcePoolExample(); + example.createCriteria().andStatusEqualTo("VALID").andTypeEqualTo("NODE").andIdEqualTo(poolId); + List pools = testResourcePoolMapper.selectByExample(example); + if (CollectionUtils.isNotEmpty(pools)) { + List poolIds = pools.stream().map(pool -> pool.getId()).collect(Collectors.toList()); + TestResourceExample resourceExample = new TestResourceExample(); + resourceExample.createCriteria().andTestResourcePoolIdIn(poolIds); + resourceExample.setOrderByClause("create_time"); + return testResourceMapper.selectByExampleWithBLOBs(resourceExample); + } + return new ArrayList<>(); + } + }