diff --git a/backend/src/main/java/io/metersphere/api/exec/api/TestPlanApiExecuteService.java b/backend/src/main/java/io/metersphere/api/exec/api/TestPlanApiExecuteService.java index 11b42099f8..c9227087cb 100644 --- a/backend/src/main/java/io/metersphere/api/exec/api/TestPlanApiExecuteService.java +++ b/backend/src/main/java/io/metersphere/api/exec/api/TestPlanApiExecuteService.java @@ -21,10 +21,7 @@ import io.metersphere.api.exec.utils.GenerateHashTreeUtil; import io.metersphere.api.jmeter.JMeterService; import io.metersphere.api.jmeter.MessageCache; import io.metersphere.api.service.RemakeReportService; -import io.metersphere.base.domain.ApiDefinitionExecResult; -import io.metersphere.base.domain.ApiTestCaseWithBLOBs; -import io.metersphere.base.domain.TestPlanApiCase; -import io.metersphere.base.domain.TestPlanApiCaseExample; +import io.metersphere.base.domain.*; import io.metersphere.base.mapper.ApiDefinitionExecResultMapper; import io.metersphere.base.mapper.ApiTestCaseMapper; import io.metersphere.base.mapper.TestPlanApiCaseMapper; @@ -159,17 +156,23 @@ public class TestPlanApiExecuteService { remakeReportService.remake(runRequest); continue; } - // 初始化等待队列 - SerialBlockingQueueUtil.init(runRequest.getReportId(), 1); LoggerUtil.info("TestPlan Serial run 【 " + runRequest.getReportId() + "】start"); // 开始执行 jMeterService.run(runRequest); Object reportObj = SerialBlockingQueueUtil.take(runRequest.getReportId()); + if (reportObj == null) { + LoggerUtil.info("TestPlan Serial run-进入超时补偿处理:【 " + runRequest.getReportId() + " 】"); + ApiDefinitionExecResult apiDefinitionExecResult = apiDefinitionExecResultMapper.selectByPrimaryKey(runRequest.getReportId()); + if (apiDefinitionExecResult != null) { + apiDefinitionExecResult.setStatus("timeout"); + apiDefinitionExecResultMapper.updateByPrimaryKey(apiDefinitionExecResult); + } + } LoggerUtil.info("TestPlan Serial run 【 " + runRequest.getReportId() + "】end"); // 如果开启失败结束执行,则判断返回结果状态 if (request.getConfig().isOnSampleError()) { - if (reportObj != null) { + if (reportObj != null && reportObj instanceof ApiDefinitionExecResult) { ApiDefinitionExecResult result = (ApiDefinitionExecResult) reportObj; if (result == null || !result.getStatus().equals("Success")) { break; diff --git a/backend/src/main/java/io/metersphere/api/exec/queue/ExecTask.java b/backend/src/main/java/io/metersphere/api/exec/queue/ExecTask.java index 1d8ede4912..5947234c15 100644 --- a/backend/src/main/java/io/metersphere/api/exec/queue/ExecTask.java +++ b/backend/src/main/java/io/metersphere/api/exec/queue/ExecTask.java @@ -18,11 +18,13 @@ public class ExecTask implements Runnable { @Override public void run() { - PoolExecBlockingQueueUtil.init(request.getReportId(), 1); LoggerUtil.info("开始执行报告ID:【 " + request.getReportId() + " 】,资源ID【 " + request.getTestId() + " 】"); JMeterService jMeterService = CommonBeanFactory.getBean(JMeterService.class); jMeterService.addQueue(request); - PoolExecBlockingQueueUtil.take(request.getReportId()); + Object res = PoolExecBlockingQueueUtil.take(request.getReportId()); + if (res == null) { + LoggerUtil.info("执行报告:【 " + request.getReportId() + " 】,资源ID【 " + request.getTestId() + " 】执行超时"); + } LoggerUtil.info("任务:【 " + request.getReportId() + " 】执行完成"); } } diff --git a/backend/src/main/java/io/metersphere/api/exec/queue/PoolExecBlockingQueueUtil.java b/backend/src/main/java/io/metersphere/api/exec/queue/PoolExecBlockingQueueUtil.java index 6e6bd4042b..1b8f30dcef 100644 --- a/backend/src/main/java/io/metersphere/api/exec/queue/PoolExecBlockingQueueUtil.java +++ b/backend/src/main/java/io/metersphere/api/exec/queue/PoolExecBlockingQueueUtil.java @@ -13,21 +13,16 @@ public class PoolExecBlockingQueueUtil { // 系统级队列控制整体并发数量 public static Map> queue = new ConcurrentHashMap<>(); - public static void init(String key, int queueSize) { - BlockingQueue blockingQueue = new ArrayBlockingQueue<>(queueSize); - queue.put(key, blockingQueue); - } + private static final String END_SIGN = "RUN-END"; + private static final int QUEUE_SIZE = 1; public static void offer(String key) { if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) { try { - if (!queue.get(key).offer("end")) { - queue.get(key).add("end"); - } + queue.get(key).offer(END_SIGN); } catch (Exception e) { LogUtil.error(e); } finally { - queue.get(key).offer("end"); queue.remove(key); } } @@ -35,23 +30,20 @@ public class PoolExecBlockingQueueUtil { public static Object take(String key) { try { - if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) { - return queue.get(key).poll(5, TimeUnit.MINUTES); + if (StringUtils.isNotEmpty(key) && !queue.containsKey(key)) { + BlockingQueue blockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); + queue.put(key, blockingQueue); + return blockingQueue.poll(5, TimeUnit.MINUTES); } } catch (Exception e) { - LogUtil.error("获取队列失败:" + e.getMessage()); - return null; - } finally { - if (StringUtils.isNotEmpty(key)) { - queue.remove(key); - } + LogUtil.error("初始化队列失败:" + e.getMessage()); } return null; } public static void remove(String key) { if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) { - queue.get(key).add("end"); + queue.get(key).offer(END_SIGN); queue.remove(key); } } diff --git a/backend/src/main/java/io/metersphere/api/exec/queue/SerialBlockingQueueUtil.java b/backend/src/main/java/io/metersphere/api/exec/queue/SerialBlockingQueueUtil.java index fb4ee17532..51ab0e1262 100644 --- a/backend/src/main/java/io/metersphere/api/exec/queue/SerialBlockingQueueUtil.java +++ b/backend/src/main/java/io/metersphere/api/exec/queue/SerialBlockingQueueUtil.java @@ -14,46 +14,39 @@ import java.util.concurrent.TimeUnit; public class SerialBlockingQueueUtil { // 只作用与串行任务 public static Map> queue = new ConcurrentHashMap<>(); + public static final String END_SIGN = "RUN-END"; + private static final int QUEUE_SIZE = 1; - public static void init(String key, int queueSize) { - BlockingQueue blockingQueue = new ArrayBlockingQueue<>(queueSize); - queue.put(key, blockingQueue); - } public static void offer(ResultDTO dto, Object report) { String key = dto != null && StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString()) ? dto.getReportId() + "_" + dto.getTestId() : dto.getReportId(); if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) { try { - if (!queue.get(key).offer(report)) { - queue.get(key).add(report); - } + queue.get(key).offer(report); } catch (Exception e) { LogUtil.error(e); } finally { - queue.get(key).offer(report); + queue.remove(key); } } } public static Object take(String key) { try { - if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) { - return queue.get(key).poll(3, TimeUnit.MINUTES); + if (StringUtils.isNotEmpty(key) && !queue.containsKey(key)) { + BlockingQueue blockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); + queue.put(key, blockingQueue); + return blockingQueue.poll(3, TimeUnit.MINUTES); } } catch (Exception e) { LogUtil.error("获取队列失败:" + e.getMessage()); - return null; - } finally { - if (StringUtils.isNotEmpty(key)) { - queue.remove(key); - } } return null; } public static void remove(String key) { if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) { - queue.get(key).add(null); + queue.get(key).offer(END_SIGN); queue.remove(key); } } diff --git a/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java b/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java index 2b3995a7e3..dc34e5b59f 100644 --- a/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java +++ b/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java @@ -88,9 +88,6 @@ public class ApiScenarioSerialService { hashTreeUtil.setEnvParamsMapToHashTree(hashTree, executeEnvParams); executeQueue.get(key).setHashTree(hashTree); } - - SerialBlockingQueueUtil.init(queueReportId, 1); - String reportId = StringUtils.isNotEmpty(serialReportId) ? serialReportId : key; JmeterRunRequestDTO runRequest = new JmeterRunRequestDTO(executeQueue.get(key).getTestId(), reportId, request.getRunMode(), executeQueue.get(key).getHashTree()); if (request.getConfig() != null) { @@ -104,10 +101,18 @@ public class ApiScenarioSerialService { jMeterService.run(runRequest); Object reportObj = SerialBlockingQueueUtil.take(queueReportId); + if (reportObj == null) { + LoggerUtil.info("Scenario run-进入超时补偿处理:【 " + queueReportId + " 】"); + ApiScenarioReport scenarioReport = apiScenarioReportMapper.selectByPrimaryKey(reportId); + if (scenarioReport != null) { + scenarioReport.setStatus("Timeout"); + apiScenarioReportMapper.updateByPrimaryKey(scenarioReport); + } + } LoggerUtil.info("Scenario run-执行完成:【 " + queueReportId + " 】"); // 如果开启失败结束执行,则判断返回结果状态 if (request.getConfig().isOnSampleError()) { - if (reportObj != null) { + if (reportObj != null && reportObj instanceof ApiScenarioReport) { ApiScenarioReport scenarioReport = (ApiScenarioReport) reportObj; if (!scenarioReport.getStatus().equals("Success")) { break; diff --git a/backend/src/main/java/io/metersphere/api/jmeter/APISingleResultListener.java b/backend/src/main/java/io/metersphere/api/jmeter/APISingleResultListener.java index cfcd08e22f..232d12c673 100644 --- a/backend/src/main/java/io/metersphere/api/jmeter/APISingleResultListener.java +++ b/backend/src/main/java/io/metersphere/api/jmeter/APISingleResultListener.java @@ -2,7 +2,6 @@ package io.metersphere.api.jmeter; import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil; -import io.metersphere.api.exec.queue.SerialBlockingQueueUtil; import io.metersphere.api.service.MsResultService; import io.metersphere.api.service.TestResultService; import io.metersphere.commons.utils.CommonBeanFactory; @@ -26,8 +25,6 @@ public class APISingleResultListener extends MsExecListener { public void testEnded(ResultDTO dto, Map kafkaConfig) { try { LoggerUtil.info("进入TEST-END处理报告【" + dto.getReportId() + " 】整体执行完成;" + dto.getRunMode()); - // 串行队列 - SerialBlockingQueueUtil.offer(dto, "testEnd"); // 全局并发队列 PoolExecBlockingQueueUtil.offer(dto.getReportId()); dto.setConsole(CommonBeanFactory.getBean(MsResultService.class).getJmeterLogger(dto.getReportId())); diff --git a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java index 5e4ec20d6f..3c292c0665 100644 --- a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java +++ b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java @@ -10,6 +10,7 @@ import io.metersphere.api.service.TestResultService; import io.metersphere.config.KafkaConfig; import io.metersphere.dto.ResultDTO; import io.metersphere.utils.LoggerUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; @@ -29,7 +30,9 @@ public class MsKafkaListener { LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成"); testResultService.testEnded(testResult); // 串行队列 - SerialBlockingQueueUtil.offer(testResult, "testEnd"); + if (CollectionUtils.isEmpty(testResult.getRequestResults())) { + SerialBlockingQueueUtil.offer(testResult, SerialBlockingQueueUtil.END_SIGN); + } // 全局并发队列 PoolExecBlockingQueueUtil.offer(testResult.getReportId()); } else { diff --git a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java index 1cb6c15ca8..b2b1260239 100644 --- a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java +++ b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java @@ -117,6 +117,8 @@ public class ApiScenarioReportService { scenarioReport = updateScenario(requestResults, dto); } } + // 串行队列 + SerialBlockingQueueUtil.offer(dto, scenarioReport != null ? scenarioReport : SerialBlockingQueueUtil.END_SIGN); return scenarioReport; } @@ -269,8 +271,6 @@ public class ApiScenarioReportService { } } ApiScenarioReport report = editReport(dto.getReportType(), dto.getReportId(), errorSize > 0 ? "Error" : "Success", dto.getRunMode()); - // 串行队列 - SerialBlockingQueueUtil.offer(dto, report); return report; } @@ -329,8 +329,6 @@ public class ApiScenarioReportService { apiScenarioMapper.updateByPrimaryKey(scenario); } } - // 串行队列 - SerialBlockingQueueUtil.offer(dto, report); testPlanLog.info("TestPlanReportId" + JSONArray.toJSONString(testPlanReportIdList) + " EXECUTE OVER. SCENARIO STATUS : " + JSONObject.toJSONString(scenarioAndErrorMap)); for (String item : testPlanReportIdList) { TestPlanReportExecuteCatch.updateApiTestPlanExecuteInfo(item, null, scenarioAndErrorMap, null); @@ -385,8 +383,6 @@ public class ApiScenarioReportService { if (scenario != null && report != null) { sendNotice(scenario, report); } - // 串行队列 - SerialBlockingQueueUtil.offer(dto, report); return report; } diff --git a/backend/src/main/java/io/metersphere/api/service/TestResultService.java b/backend/src/main/java/io/metersphere/api/service/TestResultService.java index 92d978cb72..d0237d1868 100644 --- a/backend/src/main/java/io/metersphere/api/service/TestResultService.java +++ b/backend/src/main/java/io/metersphere/api/service/TestResultService.java @@ -1,6 +1,7 @@ package io.metersphere.api.service; import io.metersphere.api.dto.automation.ApiTestReportVariable; +import io.metersphere.api.exec.queue.SerialBlockingQueueUtil; import io.metersphere.api.jmeter.ExecutedHandleSingleton; import io.metersphere.base.domain.*; import io.metersphere.commons.constants.ApiRunMode; @@ -106,6 +107,11 @@ public class TestResultService { } } } + } else { + // 串行队列 + if (dto != null && org.apache.commons.collections.CollectionUtils.isEmpty(dto.getRequestResults())) { + SerialBlockingQueueUtil.offer(dto, SerialBlockingQueueUtil.END_SIGN); + } } } diff --git a/backend/src/main/java/io/metersphere/xpack b/backend/src/main/java/io/metersphere/xpack index 5055bb87bf..dae3bfa801 160000 --- a/backend/src/main/java/io/metersphere/xpack +++ b/backend/src/main/java/io/metersphere/xpack @@ -1 +1 @@ -Subproject commit 5055bb87bffbb60d57268a684c90edf8b27c2df8 +Subproject commit dae3bfa8015fefe44c0c7a7517a43a5da93e6950