diff --git a/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java b/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java index 1235efc111..cc546683e1 100644 --- a/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java +++ b/backend/src/main/java/io/metersphere/api/exec/scenario/ApiScenarioSerialService.java @@ -166,7 +166,7 @@ public class ApiScenarioSerialService { MsThreadGroup group = new MsThreadGroup(); group.setLabel(caseWithBLOBs.getName()); - group.setName(caseWithBLOBs.getName()); + group.setName(runRequest.getReportId()); group.setProjectId(caseWithBLOBs.getProjectId()); MsTestElement testElement = parse(caseWithBLOBs, testId, envId); diff --git a/backend/src/main/java/io/metersphere/api/exec/utils/ApiDefinitionExecResultUtil.java b/backend/src/main/java/io/metersphere/api/exec/utils/ApiDefinitionExecResultUtil.java index 45c35ea518..a3bc007e75 100644 --- a/backend/src/main/java/io/metersphere/api/exec/utils/ApiDefinitionExecResultUtil.java +++ b/backend/src/main/java/io/metersphere/api/exec/utils/ApiDefinitionExecResultUtil.java @@ -49,6 +49,7 @@ public class ApiDefinitionExecResultUtil { if (caseWithBLOBs != null) { apiResult.setName(caseWithBLOBs.getName()); apiResult.setProjectId(caseWithBLOBs.getProjectId()); + apiResult.setVersionId(caseWithBLOBs.getVersionId()); } apiResult.setTriggerMode(request.getTriggerMode()); apiResult.setActuator("LOCAL"); diff --git a/backend/src/main/java/io/metersphere/api/jmeter/ExecutedHandleSingleton.java b/backend/src/main/java/io/metersphere/api/jmeter/ExecutedHandleSingleton.java index e797f8a2c8..50d514a74b 100644 --- a/backend/src/main/java/io/metersphere/api/jmeter/ExecutedHandleSingleton.java +++ b/backend/src/main/java/io/metersphere/api/jmeter/ExecutedHandleSingleton.java @@ -2,8 +2,6 @@ package io.metersphere.api.jmeter; import io.metersphere.api.service.ApiEnvironmentRunningParamService; import io.metersphere.commons.utils.CommonBeanFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; @@ -15,16 +13,11 @@ import java.util.List; */ public class ExecutedHandleSingleton { private static volatile ApiEnvironmentRunningParamService apiEnvironmentRunningParamService = CommonBeanFactory.getBean(ApiEnvironmentRunningParamService.class); - static Logger testPlanLog = LoggerFactory.getLogger("testPlanExecuteLog"); private ExecutedHandleSingleton() { } public static void parseEnvironment(List evnStrList) { for (String evnStr: evnStrList) { - try { - Thread.sleep(1000); - }catch (Exception e){ - } apiEnvironmentRunningParamService.parseEvn(evnStr); } } diff --git a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java index a82e7886c6..2d5e5f309e 100644 --- a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java +++ b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java @@ -8,50 +8,95 @@ import io.metersphere.api.service.ApiEnvironmentRunningParamService; import io.metersphere.api.service.ApiExecutionQueueService; import io.metersphere.api.service.TestResultService; import io.metersphere.commons.constants.ApiRunMode; -import io.metersphere.commons.utils.CommonBeanFactory; import io.metersphere.config.KafkaConfig; import io.metersphere.dto.ResultDTO; import io.metersphere.utils.LoggerUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; +import org.springframework.kafka.support.Acknowledgment; import javax.annotation.Resource; +import java.util.*; -@Service +@Configuration public class MsKafkaListener { public static final String CONSUME_ID = "ms-api-exec-consume"; @Resource private ApiExecutionQueueService apiExecutionQueueService; - @KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}") - public void consume(ConsumerRecord record) { - LoggerUtil.info("接收到执行结果开始存储"); - ResultDTO testResult = this.formatResult(record.value()); - if (testResult != null && testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) { - LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成"); - testResultService.testEnded(testResult); + private static final Map RUN_MODE_MAP = new HashMap() {{ + this.put(ApiRunMode.SCHEDULE_API_PLAN.name(), "schedule-task"); + this.put(ApiRunMode.JENKINS_API_PLAN.name(), "schedule-task"); + this.put(ApiRunMode.MANUAL_PLAN.name(), "schedule-task"); - LoggerUtil.info("执行队列处理:" + testResult.getQueueId()); - apiExecutionQueueService.queueNext(testResult); - // 全局并发队列 - PoolExecBlockingQueueUtil.offer(testResult.getReportId()); - // 更新测试计划报告 - if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) { - LoggerUtil.info("Check Processing Test Plan report status:" + testResult.getQueueId() + "," + testResult.getTestId()); - apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId()); + this.put(ApiRunMode.DEFINITION.name(), "api-test-case-task"); + this.put(ApiRunMode.JENKINS.name(), "api-test-case-task"); + this.put(ApiRunMode.API_PLAN.name(), "api-test-case-task"); + this.put(ApiRunMode.JENKINS_API_PLAN.name(), "api-test-case-task"); + this.put(ApiRunMode.MANUAL_PLAN.name(), "api-test-case-task"); + + + this.put(ApiRunMode.SCENARIO.name(), "api-scenario-task"); + this.put(ApiRunMode.SCENARIO_PLAN.name(), "api-scenario-task"); + this.put(ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), "api-scenario-task"); + this.put(ApiRunMode.SCHEDULE_SCENARIO.name(), "api-scenario-task"); + this.put(ApiRunMode.JENKINS_SCENARIO_PLAN.name(), "api-scenario-task"); + + }}; + + @KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory") + public void consume(List> records, Acknowledgment ack) { + try { + LoggerUtil.info("进入KAFKA消费,接收到执行结果开始存储:" + records.size()); + // 分三类存储 + Map> assortMap = new LinkedHashMap<>(); + List resultDTOS = new LinkedList<>(); + + records.forEach(record -> { + ResultDTO testResult = this.formatResult(record.value()); + if (testResult != null) { + if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) { + resultDTOS.add(testResult); + } else { + String key = RUN_MODE_MAP.get(testResult.getRunMode()); + if (assortMap.containsKey(key)) { + assortMap.get(key).add(testResult); + } else { + assortMap.put(key, new LinkedList() {{ + this.add(testResult); + }}); + } + } + } + }); + if (!assortMap.isEmpty()) { + testResultService.batchSaveResults(assortMap); + LoggerUtil.info("KAFKA消费执行内容存储结束"); } - } else { - // 更新报告最后接收到请求的时间 - if (StringUtils.equalsAny(testResult.getRunMode(), ApiRunMode.SCENARIO.name(), - ApiRunMode.SCENARIO_PLAN.name(), ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), - ApiRunMode.SCHEDULE_SCENARIO.name(), ApiRunMode.JENKINS_SCENARIO_PLAN.name())) { - CommonBeanFactory.getBean(TestResultService.class).editReportTime(testResult); + // 更新执行结果 + if (CollectionUtils.isNotEmpty(resultDTOS)) { + resultDTOS.forEach(testResult -> { + LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成"); + testResultService.testEnded(testResult); + LoggerUtil.info("执行队列处理:" + testResult.getQueueId()); + apiExecutionQueueService.queueNext(testResult); + // 全局并发队列 + PoolExecBlockingQueueUtil.offer(testResult.getReportId()); + // 更新测试计划报告 + if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) { + LoggerUtil.info("Check Processing Test Plan report status:" + testResult.getQueueId() + "," + testResult.getTestId()); + apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId()); + } + }); } - testResultService.saveResults(testResult); + } catch (Exception e) { + LoggerUtil.error("KAFKA消费失败:" + e.getMessage()); + } finally { + ack.acknowledge(); } - LoggerUtil.info("执行内容存储结束"); } @Resource diff --git a/backend/src/main/java/io/metersphere/api/service/ApiDefinitionExecResultService.java b/backend/src/main/java/io/metersphere/api/service/ApiDefinitionExecResultService.java index 67ba9c48d1..4e167b51c1 100644 --- a/backend/src/main/java/io/metersphere/api/service/ApiDefinitionExecResultService.java +++ b/backend/src/main/java/io/metersphere/api/service/ApiDefinitionExecResultService.java @@ -28,6 +28,10 @@ import org.apache.commons.beanutils.BeanMap; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.ibatis.session.ExecutorType; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.mybatis.spring.SqlSessionUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -63,6 +67,8 @@ public class ApiDefinitionExecResultService { private UserMapper userMapper; @Resource private ProjectMapper projectMapper; + @Resource + private SqlSessionFactory sqlSessionFactory; public void saveApiResult(List requestResults, ResultDTO dto) { LoggerUtil.info("接收到API/CASE执行结果【 " + requestResults.size() + " 】"); @@ -72,7 +78,7 @@ public class ApiDefinitionExecResultService { item.getResponseResult().setResponseTime((item.getEndTime() - item.getStartTime())); } if (!StringUtils.startsWithAny(item.getName(), "PRE_PROCESSOR_ENV_", "POST_PROCESSOR_ENV_")) { - ApiDefinitionExecResult result = this.save(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId()); + ApiDefinitionExecResult result = this.editResult(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId(), null); if (result != null) { // 发送通知 sendNotice(result); @@ -81,6 +87,54 @@ public class ApiDefinitionExecResultService { } } + public void batchSaveApiResult(List resultDTOS, boolean isSchedule) { + if (CollectionUtils.isEmpty(resultDTOS)) { + return; + } + LoggerUtil.info("接收到API/CASE执行结果【 " + resultDTOS.size() + " 】"); + + SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH); + ApiDefinitionExecResultMapper definitionExecResultMapper = sqlSession.getMapper(ApiDefinitionExecResultMapper.class); + TestPlanApiCaseMapper planApiCaseMapper = sqlSession.getMapper(TestPlanApiCaseMapper.class); + TestCaseReviewApiCaseMapper reviewApiCaseMapper = sqlSession.getMapper(TestCaseReviewApiCaseMapper.class); + ApiTestCaseMapper batchApiTestCaseMapper = sqlSession.getMapper(ApiTestCaseMapper.class); + + for (ResultDTO dto : resultDTOS) { + if (CollectionUtils.isNotEmpty(dto.getRequestResults())) { + for (RequestResult item : dto.getRequestResults()) { + if (!StringUtils.startsWithAny(item.getName(), "PRE_PROCESSOR_ENV_", "POST_PROCESSOR_ENV_")) { + ApiDefinitionExecResult result = this.editResult(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId(), + definitionExecResultMapper); + // 批量更新关联关系状态 + batchEditStatus(dto.getRunMode(), result.getStatus(), result.getId(), dto.getTestId(), + planApiCaseMapper, reviewApiCaseMapper, batchApiTestCaseMapper + ); + + if (result != null && !StringUtils.startsWithAny(dto.getRunMode(), "SCHEDULE")) { + // 发送通知 + sendNotice(result); + } + } + } + if (isSchedule) { + // 这个方法得优化大批量跑有问题 + updateTestCaseStates(dto.getTestId()); + Map apiIdResultMap = new HashMap<>(); + long errorSize = dto.getRequestResults().stream().filter(requestResult -> requestResult.getError() > 0).count(); + String status = errorSize > 0 || dto.getRequestResults().isEmpty() ? TestPlanApiExecuteStatus.FAILD.name() : TestPlanApiExecuteStatus.SUCCESS.name(); + if (StringUtils.isNotEmpty(dto.getReportId())) { + apiIdResultMap.put(dto.getReportId(), status); + } + LoggerUtil.info("TestPlanReportId[" + dto.getTestPlanReportId() + "] API CASE OVER. API CASE STATUS:" + JSONObject.toJSONString(apiIdResultMap)); + } + } + } + sqlSession.flushStatements(); + if (sqlSession != null && sqlSessionFactory != null) { + SqlSessionUtils.closeSqlSession(sqlSession, sqlSessionFactory); + } + } + private void sendNotice(ApiDefinitionExecResult result) { try { String resourceId = result.getResourceId(); @@ -206,6 +260,34 @@ public class ApiDefinitionExecResultService { saveResult.setName(name); } + public void batchEditStatus(String type, String status, String reportId, String testId, + TestPlanApiCaseMapper batchTestPlanApiCaseMapper, + TestCaseReviewApiCaseMapper batchReviewApiCaseMapper, + ApiTestCaseMapper batchApiTestCaseMapper) { + if (StringUtils.equalsAnyIgnoreCase(type, ApiRunMode.API_PLAN.name(), ApiRunMode.SCHEDULE_API_PLAN.name(), + ApiRunMode.JENKINS_API_PLAN.name(), ApiRunMode.MANUAL_PLAN.name())) { + TestPlanApiCase apiCase = new TestPlanApiCase(); + apiCase.setId(testId); + apiCase.setStatus(status); + apiCase.setUpdateTime(System.currentTimeMillis()); + batchTestPlanApiCaseMapper.updateByPrimaryKeySelective(apiCase); + + TestCaseReviewApiCase reviewApiCase = new TestCaseReviewApiCase(); + reviewApiCase.setId(testId); + reviewApiCase.setStatus(status); + reviewApiCase.setUpdateTime(System.currentTimeMillis()); + batchReviewApiCaseMapper.updateByPrimaryKeySelective(reviewApiCase); + } else { + // 更新用例最后执行结果 + ApiTestCaseWithBLOBs caseWithBLOBs = new ApiTestCaseWithBLOBs(); + caseWithBLOBs.setId(testId); + caseWithBLOBs.setLastResultId(reportId); + caseWithBLOBs.setStatus(status); + caseWithBLOBs.setUpdateTime(System.currentTimeMillis()); + batchApiTestCaseMapper.updateByPrimaryKeySelective(caseWithBLOBs); + } + } + /** * 定时任务触发的保存逻辑 * 定时任务时,userID要改为定时任务中的用户 @@ -218,7 +300,7 @@ public class ApiDefinitionExecResultService { //对响应内容进行进一步解析。如果有附加信息(比如误报库信息),则根据附加信息内的数据进行其他判读 RequestResultExpandDTO expandDTO = ResponseUtil.parseByRequestResult(item); - ApiDefinitionExecResult reportResult = this.save(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId()); + ApiDefinitionExecResult reportResult = this.editResult(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId(), null); String status = item.isSuccess() ? "success" : "error"; if (reportResult != null) { status = reportResult.getStatus(); @@ -250,6 +332,7 @@ public class ApiDefinitionExecResultService { LoggerUtil.info("TestPlanReportId[" + dto.getTestPlanReportId() + "] APICASE OVER. API CASE STATUS:" + JSONObject.toJSONString(apiIdResultMap)); } + /** * 更新测试计划中, 关联接口测试的功能用例的状态 */ @@ -338,17 +421,11 @@ public class ApiDefinitionExecResultService { } } - private ApiDefinitionExecResult save(RequestResult item, String reportId, String console, String type, String testId) { + private ApiDefinitionExecResult editResult(RequestResult item, String reportId, String console, String type, String testId, ApiDefinitionExecResultMapper batchMapper) { if (!StringUtils.startsWithAny(item.getName(), "PRE_PROCESSOR_ENV_", "POST_PROCESSOR_ENV_")) { - ApiDefinitionExecResult saveResult = apiDefinitionExecResultMapper.selectByPrimaryKey(reportId); - if (saveResult == null) { - saveResult = new ApiDefinitionExecResult(); - } + ApiDefinitionExecResult saveResult = new ApiDefinitionExecResult(); item.getResponseResult().setConsole(console); saveResult.setId(reportId); - if (StringUtils.isEmpty(saveResult.getActuator())) { - saveResult.setActuator("LOCAL"); - } //对响应内容进行进一步解析。如果有附加信息(比如误报库信息),则根据附加信息内的数据进行其他判读 RequestResultExpandDTO expandDTO = ResponseUtil.parseByRequestResult(item); String status = item.isSuccess() ? ExecuteResult.success.name() : ExecuteResult.error.name(); @@ -358,14 +435,8 @@ public class ApiDefinitionExecResultService { } else { saveResult.setContent(JSON.toJSONString(item)); } - - saveResult.setName(item.getName()); saveResult.setType(type); - if (saveResult.getCreateTime() == null || saveResult.getCreateTime() == 0) { - saveResult.setCreateTime(item.getStartTime()); - } - editStatus(saveResult, type, status, saveResult.getCreateTime(), saveResult.getId(), testId); saveResult.setStatus(status); saveResult.setResourceId(item.getName()); saveResult.setStartTime(item.getStartTime()); @@ -374,7 +445,12 @@ public class ApiDefinitionExecResultService { if (StringUtils.isNotEmpty(saveResult.getTriggerMode()) && saveResult.getTriggerMode().equals("CASE")) { saveResult.setTriggerMode(TriggerMode.MANUAL.name()); } - apiDefinitionExecResultMapper.updateByPrimaryKeySelective(saveResult); + if (batchMapper == null) { + editStatus(saveResult, type, status, saveResult.getCreateTime(), saveResult.getId(), testId); + apiDefinitionExecResultMapper.updateByPrimaryKeySelective(saveResult); + } else { + batchMapper.updateByPrimaryKeySelective(saveResult); + } return saveResult; } return null; diff --git a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportResultService.java b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportResultService.java index ad16996bb8..ccbb405a6a 100644 --- a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportResultService.java +++ b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportResultService.java @@ -7,9 +7,14 @@ import io.metersphere.base.mapper.ApiScenarioReportResultMapper; import io.metersphere.commons.constants.ExecuteResult; import io.metersphere.commons.utils.ErrorReportLibraryUtil; import io.metersphere.dto.RequestResult; +import io.metersphere.dto.ResultDTO; import io.metersphere.utils.LoggerUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.ibatis.session.ExecutorType; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.mybatis.spring.SqlSessionUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -23,6 +28,8 @@ import java.util.UUID; public class ApiScenarioReportResultService { @Resource private ApiScenarioReportResultMapper apiScenarioReportResultMapper; + @Resource + private SqlSessionFactory sqlSessionFactory; public void save(String reportId, List queue) { if (CollectionUtils.isNotEmpty(queue)) { @@ -37,6 +44,26 @@ public class ApiScenarioReportResultService { } } + public void batchSave(List dtos) { + if (CollectionUtils.isNotEmpty(dtos)) { + SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH); + ApiScenarioReportResultMapper batchMapper = sqlSession.getMapper(ApiScenarioReportResultMapper.class); + for (ResultDTO dto : dtos) { + if (CollectionUtils.isNotEmpty(dto.getRequestResults())) { + dto.getRequestResults().forEach(item -> { + if (StringUtils.isEmpty(item.getName()) || !item.getName().startsWith("Transaction=") || !CollectionUtils.isEmpty(item.getSubRequestResults())) { + batchMapper.insert(this.newApiScenarioReportResult(dto.getReportId(), item)); + } + }); + } + } + sqlSession.flushStatements(); + if (sqlSession != null && sqlSessionFactory != null) { + SqlSessionUtils.closeSqlSession(sqlSession, sqlSessionFactory); + } + } + } + private ApiScenarioReportResult newApiScenarioReportResult(String reportId, RequestResult result) { ApiScenarioReportResult report = new ApiScenarioReportResult(); //解析误报内容 diff --git a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java index bfb8f5b40e..074bfea331 100644 --- a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java +++ b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportService.java @@ -84,6 +84,10 @@ public class ApiScenarioReportService { apiScenarioReportResultService.save(dto.getReportId(), requestResults); } + public void batchSaveResult(List dtos) { + apiScenarioReportResultService.batchSave(dtos); + } + public ApiScenarioReport testEnded(ResultDTO dto) { if (!StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString())) { // 更新控制台信息 diff --git a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportStructureService.java b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportStructureService.java index ad2f5529e6..475e2ea3fb 100644 --- a/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportStructureService.java +++ b/backend/src/main/java/io/metersphere/api/service/ApiScenarioReportStructureService.java @@ -267,7 +267,7 @@ public class ApiScenarioReportStructureService { } } // 循环步骤请求从新排序 - if (dtoList.stream().filter(e -> e.getValue() != null).collect(Collectors.toList()).size() == dtoList.size()) { + if (dtoList.stream().filter(e -> e.getValue() != null && e.getAllIndex() != null).collect(Collectors.toList()).size() == dtoList.size()) { List list = dtoList.stream().sorted(Comparator.comparing(x -> x.getAllIndex())).collect(Collectors.toList()); for (int index = 0; index < list.size(); index++) { list.get(index).setIndex((index + 1)); diff --git a/backend/src/main/java/io/metersphere/api/service/TestResultService.java b/backend/src/main/java/io/metersphere/api/service/TestResultService.java index 0ff67a6996..71af4fe90e 100644 --- a/backend/src/main/java/io/metersphere/api/service/TestResultService.java +++ b/backend/src/main/java/io/metersphere/api/service/TestResultService.java @@ -5,7 +5,6 @@ import io.metersphere.api.jmeter.ExecutedHandleSingleton; import io.metersphere.base.domain.*; import io.metersphere.base.mapper.ApiDefinitionExecResultMapper; import io.metersphere.base.mapper.ApiScenarioMapper; -import io.metersphere.base.mapper.ApiScenarioReportMapper; import io.metersphere.commons.constants.APITestStatus; import io.metersphere.commons.constants.ApiRunMode; import io.metersphere.commons.constants.NoticeConstants; @@ -54,8 +53,6 @@ public class TestResultService { @Resource private ApiTestCaseService apiTestCaseService; @Resource - private ApiScenarioReportMapper apiScenarioReportMapper; - @Resource private ApiDefinitionExecResultMapper apiDefinitionExecResultMapper; public void saveResults(ResultDTO dto) { @@ -80,12 +77,32 @@ public class TestResultService { updateTestCaseStates(requestResults, dto.getRunMode()); } + public void batchSaveResults(Map> resultDtoMap) { + // 处理环境 + List environmentList = new LinkedList<>(); + for (String key : resultDtoMap.keySet()) { + List dtos = resultDtoMap.get(key); + for (ResultDTO dto : dtos) { + if (dto.getArbitraryData() != null && dto.getArbitraryData().containsKey("ENV")) { + environmentList = (List) dto.getArbitraryData().get("ENV"); + } + //处理环境参数 + if (CollectionUtils.isNotEmpty(environmentList)) { + ExecutedHandleSingleton.parseEnvironment(environmentList); + } + // 处理用例/场景和计划关系 + updateTestCaseStates(dto.getRequestResults(), dto.getRunMode()); + + } + //测试计划定时任务-接口执行逻辑的话,需要同步测试计划的报告数据 + if (StringUtils.equals(key, "schedule-task")) { + apiDefinitionExecResultService.batchSaveApiResult(dtos, true); + } else if (StringUtils.equals(key, "api-test-case-task")) { + apiDefinitionExecResultService.batchSaveApiResult(dtos, false); + } else if (StringUtils.equalsAny(key, "api-scenario-task")) { + apiScenarioReportService.batchSaveResult(dtos); + } - public void editReportTime(ResultDTO dto) { - ApiScenarioReport report = apiScenarioReportMapper.selectByPrimaryKey(dto.getReportId()); - if (report != null) { - report.setUpdateTime(System.currentTimeMillis()); - apiScenarioReportMapper.updateByPrimaryKey(report); } } diff --git a/backend/src/main/java/io/metersphere/config/KafkaConfig.java b/backend/src/main/java/io/metersphere/config/KafkaConfig.java index 1e6e5d5ce5..bc4a26e38b 100644 --- a/backend/src/main/java/io/metersphere/config/KafkaConfig.java +++ b/backend/src/main/java/io/metersphere/config/KafkaConfig.java @@ -2,11 +2,19 @@ package io.metersphere.config; import io.metersphere.commons.utils.CommonBeanFactory; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; @@ -14,6 +22,8 @@ import java.util.Map; public class KafkaConfig { //执行结果回传 public static final String TOPICS = "ms-api-exec-topic"; + @Resource + private KafkaProperties kafkaProperties; @Bean public NewTopic apiExecTopic() { @@ -28,4 +38,38 @@ public class KafkaConfig { producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize()); return producerProps; } + + public Map consumerConfigs() { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize()); + // 批量一次最大拉取数据量 + producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); + producerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 6000); + producerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 手动提交 配置 false + producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + producerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 20000); + producerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); + + producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return producerProps; + } + + @Bean + public KafkaListenerContainerFactory> batchFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); + //并发数量 + factory.setConcurrency(1); + //开启批量监听 + factory.setBatchListener(true); + + factory.getContainerProperties().setPollTimeout(10000); + + //设置提交偏移量的方式, + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + + return factory; + } } diff --git a/frontend/src/business/components/api/definition/components/response/RequestMetric.vue b/frontend/src/business/components/api/definition/components/response/RequestMetric.vue index 155d7014d0..7707bf2d96 100644 --- a/frontend/src/business/components/api/definition/components/response/RequestMetric.vue +++ b/frontend/src/business/components/api/definition/components/response/RequestMetric.vue @@ -8,19 +8,19 @@ :content="responseResult.responseCode" placement="top"> -
+
{{ responseResult && responseResult.responseCode ? responseResult.responseCode : '0' }}
-
+
{{ responseResult && responseResult.responseCode ? responseResult.responseCode : '0' }}
-
+
{{ responseResult && responseResult.responseCode ? responseResult.responseCode : '0' }}
-
+
- {{ responseData.attachInfoMap.errorReportResult }} + {{ response.attachInfoMap.errorReportResult }}
@@ -39,6 +39,7 @@