refactor(接口测试): 优化超时队列,超时报告更新状态 timeout

This commit is contained in:
fit2-zhao 2021-12-16 10:24:11 +08:00 committed by fit2-zhao
parent 0f8792324a
commit 5517d5c437
10 changed files with 54 additions and 57 deletions

View File

@ -21,10 +21,7 @@ import io.metersphere.api.exec.utils.GenerateHashTreeUtil;
import io.metersphere.api.jmeter.JMeterService;
import io.metersphere.api.jmeter.MessageCache;
import io.metersphere.api.service.RemakeReportService;
import io.metersphere.base.domain.ApiDefinitionExecResult;
import io.metersphere.base.domain.ApiTestCaseWithBLOBs;
import io.metersphere.base.domain.TestPlanApiCase;
import io.metersphere.base.domain.TestPlanApiCaseExample;
import io.metersphere.base.domain.*;
import io.metersphere.base.mapper.ApiDefinitionExecResultMapper;
import io.metersphere.base.mapper.ApiTestCaseMapper;
import io.metersphere.base.mapper.TestPlanApiCaseMapper;
@ -159,17 +156,23 @@ public class TestPlanApiExecuteService {
remakeReportService.remake(runRequest);
continue;
}
// 初始化等待队列
SerialBlockingQueueUtil.init(runRequest.getReportId(), 1);
LoggerUtil.info("TestPlan Serial run 【 " + runRequest.getReportId() + "】start");
// 开始执行
jMeterService.run(runRequest);
Object reportObj = SerialBlockingQueueUtil.take(runRequest.getReportId());
if (reportObj == null) {
LoggerUtil.info("TestPlan Serial run-进入超时补偿处理:【 " + runRequest.getReportId() + "");
ApiDefinitionExecResult apiDefinitionExecResult = apiDefinitionExecResultMapper.selectByPrimaryKey(runRequest.getReportId());
if (apiDefinitionExecResult != null) {
apiDefinitionExecResult.setStatus("timeout");
apiDefinitionExecResultMapper.updateByPrimaryKey(apiDefinitionExecResult);
}
}
LoggerUtil.info("TestPlan Serial run 【 " + runRequest.getReportId() + "】end");
// 如果开启失败结束执行则判断返回结果状态
if (request.getConfig().isOnSampleError()) {
if (reportObj != null) {
if (reportObj != null && reportObj instanceof ApiDefinitionExecResult) {
ApiDefinitionExecResult result = (ApiDefinitionExecResult) reportObj;
if (result == null || !result.getStatus().equals("Success")) {
break;

View File

@ -18,11 +18,13 @@ public class ExecTask implements Runnable {
@Override
public void run() {
PoolExecBlockingQueueUtil.init(request.getReportId(), 1);
LoggerUtil.info("开始执行报告ID" + request.getReportId() + " 】,资源ID【 " + request.getTestId() + "");
JMeterService jMeterService = CommonBeanFactory.getBean(JMeterService.class);
jMeterService.addQueue(request);
PoolExecBlockingQueueUtil.take(request.getReportId());
Object res = PoolExecBlockingQueueUtil.take(request.getReportId());
if (res == null) {
LoggerUtil.info("执行报告:【 " + request.getReportId() + " 】,资源ID【 " + request.getTestId() + " 】执行超时");
}
LoggerUtil.info("任务:【 " + request.getReportId() + " 】执行完成");
}
}

View File

@ -13,21 +13,16 @@ public class PoolExecBlockingQueueUtil {
// 系统级队列控制整体并发数量
public static Map<String, BlockingQueue<Object>> queue = new ConcurrentHashMap<>();
public static void init(String key, int queueSize) {
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(queueSize);
queue.put(key, blockingQueue);
}
private static final String END_SIGN = "RUN-END";
private static final int QUEUE_SIZE = 1;
public static void offer(String key) {
if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) {
try {
if (!queue.get(key).offer("end")) {
queue.get(key).add("end");
}
queue.get(key).offer(END_SIGN);
} catch (Exception e) {
LogUtil.error(e);
} finally {
queue.get(key).offer("end");
queue.remove(key);
}
}
@ -35,23 +30,20 @@ public class PoolExecBlockingQueueUtil {
public static Object take(String key) {
try {
if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) {
return queue.get(key).poll(5, TimeUnit.MINUTES);
if (StringUtils.isNotEmpty(key) && !queue.containsKey(key)) {
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
queue.put(key, blockingQueue);
return blockingQueue.poll(5, TimeUnit.MINUTES);
}
} catch (Exception e) {
LogUtil.error("获取队列失败:" + e.getMessage());
return null;
} finally {
if (StringUtils.isNotEmpty(key)) {
queue.remove(key);
}
LogUtil.error("初始化队列失败:" + e.getMessage());
}
return null;
}
public static void remove(String key) {
if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) {
queue.get(key).add("end");
queue.get(key).offer(END_SIGN);
queue.remove(key);
}
}

View File

@ -14,46 +14,39 @@ import java.util.concurrent.TimeUnit;
public class SerialBlockingQueueUtil {
// 只作用与串行任务
public static Map<String, BlockingQueue<Object>> queue = new ConcurrentHashMap<>();
public static final String END_SIGN = "RUN-END";
private static final int QUEUE_SIZE = 1;
public static void init(String key, int queueSize) {
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(queueSize);
queue.put(key, blockingQueue);
}
public static void offer(ResultDTO dto, Object report) {
String key = dto != null && StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString()) ? dto.getReportId() + "_" + dto.getTestId() : dto.getReportId();
if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) {
try {
if (!queue.get(key).offer(report)) {
queue.get(key).add(report);
}
queue.get(key).offer(report);
} catch (Exception e) {
LogUtil.error(e);
} finally {
queue.get(key).offer(report);
queue.remove(key);
}
}
}
public static Object take(String key) {
try {
if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) {
return queue.get(key).poll(3, TimeUnit.MINUTES);
if (StringUtils.isNotEmpty(key) && !queue.containsKey(key)) {
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
queue.put(key, blockingQueue);
return blockingQueue.poll(3, TimeUnit.MINUTES);
}
} catch (Exception e) {
LogUtil.error("获取队列失败:" + e.getMessage());
return null;
} finally {
if (StringUtils.isNotEmpty(key)) {
queue.remove(key);
}
}
return null;
}
public static void remove(String key) {
if (StringUtils.isNotEmpty(key) && queue.containsKey(key)) {
queue.get(key).add(null);
queue.get(key).offer(END_SIGN);
queue.remove(key);
}
}

View File

@ -88,9 +88,6 @@ public class ApiScenarioSerialService {
hashTreeUtil.setEnvParamsMapToHashTree(hashTree, executeEnvParams);
executeQueue.get(key).setHashTree(hashTree);
}
SerialBlockingQueueUtil.init(queueReportId, 1);
String reportId = StringUtils.isNotEmpty(serialReportId) ? serialReportId : key;
JmeterRunRequestDTO runRequest = new JmeterRunRequestDTO(executeQueue.get(key).getTestId(), reportId, request.getRunMode(), executeQueue.get(key).getHashTree());
if (request.getConfig() != null) {
@ -104,10 +101,18 @@ public class ApiScenarioSerialService {
jMeterService.run(runRequest);
Object reportObj = SerialBlockingQueueUtil.take(queueReportId);
if (reportObj == null) {
LoggerUtil.info("Scenario run-进入超时补偿处理:【 " + queueReportId + "");
ApiScenarioReport scenarioReport = apiScenarioReportMapper.selectByPrimaryKey(reportId);
if (scenarioReport != null) {
scenarioReport.setStatus("Timeout");
apiScenarioReportMapper.updateByPrimaryKey(scenarioReport);
}
}
LoggerUtil.info("Scenario run-执行完成:【 " + queueReportId + "");
// 如果开启失败结束执行则判断返回结果状态
if (request.getConfig().isOnSampleError()) {
if (reportObj != null) {
if (reportObj != null && reportObj instanceof ApiScenarioReport) {
ApiScenarioReport scenarioReport = (ApiScenarioReport) reportObj;
if (!scenarioReport.getStatus().equals("Success")) {
break;

View File

@ -2,7 +2,6 @@ package io.metersphere.api.jmeter;
import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil;
import io.metersphere.api.exec.queue.SerialBlockingQueueUtil;
import io.metersphere.api.service.MsResultService;
import io.metersphere.api.service.TestResultService;
import io.metersphere.commons.utils.CommonBeanFactory;
@ -26,8 +25,6 @@ public class APISingleResultListener extends MsExecListener {
public void testEnded(ResultDTO dto, Map<String, Object> kafkaConfig) {
try {
LoggerUtil.info("进入TEST-END处理报告【" + dto.getReportId() + " 】整体执行完成;" + dto.getRunMode());
// 串行队列
SerialBlockingQueueUtil.offer(dto, "testEnd");
// 全局并发队列
PoolExecBlockingQueueUtil.offer(dto.getReportId());
dto.setConsole(CommonBeanFactory.getBean(MsResultService.class).getJmeterLogger(dto.getReportId()));

View File

@ -10,6 +10,7 @@ import io.metersphere.api.service.TestResultService;
import io.metersphere.config.KafkaConfig;
import io.metersphere.dto.ResultDTO;
import io.metersphere.utils.LoggerUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
@ -29,7 +30,9 @@ public class MsKafkaListener {
LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成");
testResultService.testEnded(testResult);
// 串行队列
SerialBlockingQueueUtil.offer(testResult, "testEnd");
if (CollectionUtils.isEmpty(testResult.getRequestResults())) {
SerialBlockingQueueUtil.offer(testResult, SerialBlockingQueueUtil.END_SIGN);
}
// 全局并发队列
PoolExecBlockingQueueUtil.offer(testResult.getReportId());
} else {

View File

@ -117,6 +117,8 @@ public class ApiScenarioReportService {
scenarioReport = updateScenario(requestResults, dto);
}
}
// 串行队列
SerialBlockingQueueUtil.offer(dto, scenarioReport != null ? scenarioReport : SerialBlockingQueueUtil.END_SIGN);
return scenarioReport;
}
@ -269,8 +271,6 @@ public class ApiScenarioReportService {
}
}
ApiScenarioReport report = editReport(dto.getReportType(), dto.getReportId(), errorSize > 0 ? "Error" : "Success", dto.getRunMode());
// 串行队列
SerialBlockingQueueUtil.offer(dto, report);
return report;
}
@ -329,8 +329,6 @@ public class ApiScenarioReportService {
apiScenarioMapper.updateByPrimaryKey(scenario);
}
}
// 串行队列
SerialBlockingQueueUtil.offer(dto, report);
testPlanLog.info("TestPlanReportId" + JSONArray.toJSONString(testPlanReportIdList) + " EXECUTE OVER. SCENARIO STATUS : " + JSONObject.toJSONString(scenarioAndErrorMap));
for (String item : testPlanReportIdList) {
TestPlanReportExecuteCatch.updateApiTestPlanExecuteInfo(item, null, scenarioAndErrorMap, null);
@ -385,8 +383,6 @@ public class ApiScenarioReportService {
if (scenario != null && report != null) {
sendNotice(scenario, report);
}
// 串行队列
SerialBlockingQueueUtil.offer(dto, report);
return report;
}

View File

@ -1,6 +1,7 @@
package io.metersphere.api.service;
import io.metersphere.api.dto.automation.ApiTestReportVariable;
import io.metersphere.api.exec.queue.SerialBlockingQueueUtil;
import io.metersphere.api.jmeter.ExecutedHandleSingleton;
import io.metersphere.base.domain.*;
import io.metersphere.commons.constants.ApiRunMode;
@ -106,6 +107,11 @@ public class TestResultService {
}
}
}
} else {
// 串行队列
if (dto != null && org.apache.commons.collections.CollectionUtils.isEmpty(dto.getRequestResults())) {
SerialBlockingQueueUtil.offer(dto, SerialBlockingQueueUtil.END_SIGN);
}
}
}

@ -1 +1 @@
Subproject commit 5055bb87bffbb60d57268a684c90edf8b27c2df8
Subproject commit dae3bfa8015fefe44c0c7a7517a43a5da93e6950