refactor(测试计划): 优化测试计划大批量执行KAFKA消息堆积现象 (#11142)

--bug=1010768 --user=赵勇 测试计划执行报告一直处于running状态,部分测试计划报告为completed的状态的报告内容全部为空 https://www.tapd.cn/55049933/s/1113017

Co-authored-by: fit2-zhao <yong.zhao@fit2cloud.com>
This commit is contained in:
metersphere-bot 2022-03-03 15:33:46 +08:00 committed by GitHub
parent 72fbb1e296
commit 52a8a6d44a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 274 additions and 71 deletions

View File

@ -166,7 +166,7 @@ public class ApiScenarioSerialService {
MsThreadGroup group = new MsThreadGroup();
group.setLabel(caseWithBLOBs.getName());
group.setName(caseWithBLOBs.getName());
group.setName(runRequest.getReportId());
group.setProjectId(caseWithBLOBs.getProjectId());
MsTestElement testElement = parse(caseWithBLOBs, testId, envId);

View File

@ -49,6 +49,7 @@ public class ApiDefinitionExecResultUtil {
if (caseWithBLOBs != null) {
apiResult.setName(caseWithBLOBs.getName());
apiResult.setProjectId(caseWithBLOBs.getProjectId());
apiResult.setVersionId(caseWithBLOBs.getVersionId());
}
apiResult.setTriggerMode(request.getTriggerMode());
apiResult.setActuator("LOCAL");

View File

@ -2,8 +2,6 @@ package io.metersphere.api.jmeter;
import io.metersphere.api.service.ApiEnvironmentRunningParamService;
import io.metersphere.commons.utils.CommonBeanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@ -15,16 +13,11 @@ import java.util.List;
*/
public class ExecutedHandleSingleton {
private static volatile ApiEnvironmentRunningParamService apiEnvironmentRunningParamService = CommonBeanFactory.getBean(ApiEnvironmentRunningParamService.class);
static Logger testPlanLog = LoggerFactory.getLogger("testPlanExecuteLog");
private ExecutedHandleSingleton() {
}
public static void parseEnvironment(List<String> evnStrList) {
for (String evnStr: evnStrList) {
try {
Thread.sleep(1000);
}catch (Exception e){
}
apiEnvironmentRunningParamService.parseEvn(evnStr);
}
}

View File

@ -8,50 +8,95 @@ import io.metersphere.api.service.ApiEnvironmentRunningParamService;
import io.metersphere.api.service.ApiExecutionQueueService;
import io.metersphere.api.service.TestResultService;
import io.metersphere.commons.constants.ApiRunMode;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.config.KafkaConfig;
import io.metersphere.dto.ResultDTO;
import io.metersphere.utils.LoggerUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.kafka.support.Acknowledgment;
import javax.annotation.Resource;
import java.util.*;
@Service
@Configuration
public class MsKafkaListener {
public static final String CONSUME_ID = "ms-api-exec-consume";
@Resource
private ApiExecutionQueueService apiExecutionQueueService;
@KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}")
public void consume(ConsumerRecord<?, String> record) {
LoggerUtil.info("接收到执行结果开始存储");
ResultDTO testResult = this.formatResult(record.value());
if (testResult != null && testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) {
LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成");
testResultService.testEnded(testResult);
private static final Map<String, String> RUN_MODE_MAP = new HashMap<String, String>() {{
this.put(ApiRunMode.SCHEDULE_API_PLAN.name(), "schedule-task");
this.put(ApiRunMode.JENKINS_API_PLAN.name(), "schedule-task");
this.put(ApiRunMode.MANUAL_PLAN.name(), "schedule-task");
LoggerUtil.info("执行队列处理:" + testResult.getQueueId());
apiExecutionQueueService.queueNext(testResult);
// 全局并发队列
PoolExecBlockingQueueUtil.offer(testResult.getReportId());
// 更新测试计划报告
if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) {
LoggerUtil.info("Check Processing Test Plan report status" + testResult.getQueueId() + "" + testResult.getTestId());
apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId());
this.put(ApiRunMode.DEFINITION.name(), "api-test-case-task");
this.put(ApiRunMode.JENKINS.name(), "api-test-case-task");
this.put(ApiRunMode.API_PLAN.name(), "api-test-case-task");
this.put(ApiRunMode.JENKINS_API_PLAN.name(), "api-test-case-task");
this.put(ApiRunMode.MANUAL_PLAN.name(), "api-test-case-task");
this.put(ApiRunMode.SCENARIO.name(), "api-scenario-task");
this.put(ApiRunMode.SCENARIO_PLAN.name(), "api-scenario-task");
this.put(ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), "api-scenario-task");
this.put(ApiRunMode.SCHEDULE_SCENARIO.name(), "api-scenario-task");
this.put(ApiRunMode.JENKINS_SCENARIO_PLAN.name(), "api-scenario-task");
}};
@KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")
public void consume(List<ConsumerRecord<?, String>> records, Acknowledgment ack) {
try {
LoggerUtil.info("进入KAFKA消费接收到执行结果开始存储" + records.size());
// 分三类存储
Map<String, List<ResultDTO>> assortMap = new LinkedHashMap<>();
List<ResultDTO> resultDTOS = new LinkedList<>();
records.forEach(record -> {
ResultDTO testResult = this.formatResult(record.value());
if (testResult != null) {
if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) {
resultDTOS.add(testResult);
} else {
String key = RUN_MODE_MAP.get(testResult.getRunMode());
if (assortMap.containsKey(key)) {
assortMap.get(key).add(testResult);
} else {
assortMap.put(key, new LinkedList<ResultDTO>() {{
this.add(testResult);
}});
}
}
}
});
if (!assortMap.isEmpty()) {
testResultService.batchSaveResults(assortMap);
LoggerUtil.info("KAFKA消费执行内容存储结束");
}
} else {
// 更新报告最后接收到请求的时间
if (StringUtils.equalsAny(testResult.getRunMode(), ApiRunMode.SCENARIO.name(),
ApiRunMode.SCENARIO_PLAN.name(), ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(),
ApiRunMode.SCHEDULE_SCENARIO.name(), ApiRunMode.JENKINS_SCENARIO_PLAN.name())) {
CommonBeanFactory.getBean(TestResultService.class).editReportTime(testResult);
// 更新执行结果
if (CollectionUtils.isNotEmpty(resultDTOS)) {
resultDTOS.forEach(testResult -> {
LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成");
testResultService.testEnded(testResult);
LoggerUtil.info("执行队列处理:" + testResult.getQueueId());
apiExecutionQueueService.queueNext(testResult);
// 全局并发队列
PoolExecBlockingQueueUtil.offer(testResult.getReportId());
// 更新测试计划报告
if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) {
LoggerUtil.info("Check Processing Test Plan report status" + testResult.getQueueId() + "" + testResult.getTestId());
apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId());
}
});
}
testResultService.saveResults(testResult);
} catch (Exception e) {
LoggerUtil.error("KAFKA消费失败" + e.getMessage());
} finally {
ack.acknowledge();
}
LoggerUtil.info("执行内容存储结束");
}
@Resource

View File

@ -28,6 +28,10 @@ import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -63,6 +67,8 @@ public class ApiDefinitionExecResultService {
private UserMapper userMapper;
@Resource
private ProjectMapper projectMapper;
@Resource
private SqlSessionFactory sqlSessionFactory;
public void saveApiResult(List<RequestResult> requestResults, ResultDTO dto) {
LoggerUtil.info("接收到API/CASE执行结果【 " + requestResults.size() + "");
@ -72,7 +78,7 @@ public class ApiDefinitionExecResultService {
item.getResponseResult().setResponseTime((item.getEndTime() - item.getStartTime()));
}
if (!StringUtils.startsWithAny(item.getName(), "PRE_PROCESSOR_ENV_", "POST_PROCESSOR_ENV_")) {
ApiDefinitionExecResult result = this.save(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId());
ApiDefinitionExecResult result = this.editResult(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId(), null);
if (result != null) {
// 发送通知
sendNotice(result);
@ -81,6 +87,54 @@ public class ApiDefinitionExecResultService {
}
}
/**
 * Persists a batch of API/CASE execution results through a single MyBatis
 * BATCH session (statements buffered, flushed once at the end).
 *
 * Fixes vs. original: the SqlSession is now closed in a finally block (it
 * leaked when any mapper call threw), and the null check on the result of
 * editResult() happens BEFORE result.getStatus() is dereferenced (the
 * original called batchEditStatus(...) first and null-checked afterwards).
 *
 * @param resultDTOS execution results to store; no-op when empty
 * @param isSchedule true when triggered by a scheduled test-plan run, which
 *                   additionally syncs test-case states for the plan report
 */
public void batchSaveApiResult(List<ResultDTO> resultDTOS, boolean isSchedule) {
    if (CollectionUtils.isEmpty(resultDTOS)) {
        return;
    }
    LoggerUtil.info("接收到API/CASE执行结果【 " + resultDTOS.size() + "");
    SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
    try {
        ApiDefinitionExecResultMapper definitionExecResultMapper = sqlSession.getMapper(ApiDefinitionExecResultMapper.class);
        TestPlanApiCaseMapper planApiCaseMapper = sqlSession.getMapper(TestPlanApiCaseMapper.class);
        TestCaseReviewApiCaseMapper reviewApiCaseMapper = sqlSession.getMapper(TestCaseReviewApiCaseMapper.class);
        ApiTestCaseMapper batchApiTestCaseMapper = sqlSession.getMapper(ApiTestCaseMapper.class);
        for (ResultDTO dto : resultDTOS) {
            if (CollectionUtils.isEmpty(dto.getRequestResults())) {
                continue;
            }
            for (RequestResult item : dto.getRequestResults()) {
                // Environment pre/post processor pseudo-requests are not stored.
                if (StringUtils.startsWithAny(item.getName(), "PRE_PROCESSOR_ENV_", "POST_PROCESSOR_ENV_")) {
                    continue;
                }
                ApiDefinitionExecResult result = this.editResult(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId(),
                        definitionExecResultMapper);
                if (result != null) {
                    // Batch-update the association (plan case / review case / api case) status.
                    batchEditStatus(dto.getRunMode(), result.getStatus(), result.getId(), dto.getTestId(),
                            planApiCaseMapper, reviewApiCaseMapper, batchApiTestCaseMapper
                    );
                    if (!StringUtils.startsWithAny(dto.getRunMode(), "SCHEDULE")) {
                        // Send notification for non-scheduled runs only.
                        sendNotice(result);
                    }
                }
            }
            if (isSchedule) {
                // TODO: this method needs optimization — problematic under large batches.
                updateTestCaseStates(dto.getTestId());
                Map<String, String> apiIdResultMap = new HashMap<>();
                long errorSize = dto.getRequestResults().stream().filter(requestResult -> requestResult.getError() > 0).count();
                String status = errorSize > 0 || dto.getRequestResults().isEmpty() ? TestPlanApiExecuteStatus.FAILD.name() : TestPlanApiExecuteStatus.SUCCESS.name();
                if (StringUtils.isNotEmpty(dto.getReportId())) {
                    apiIdResultMap.put(dto.getReportId(), status);
                }
                LoggerUtil.info("TestPlanReportId[" + dto.getTestPlanReportId() + "] API CASE OVER. API CASE STATUS:" + JSONObject.toJSONString(apiIdResultMap));
            }
        }
        sqlSession.flushStatements();
    } finally {
        // Always release the batch session, even when a mapper call fails.
        SqlSessionUtils.closeSqlSession(sqlSession, sqlSessionFactory);
    }
}
private void sendNotice(ApiDefinitionExecResult result) {
try {
String resourceId = result.getResourceId();
@ -206,6 +260,34 @@ public class ApiDefinitionExecResultService {
saveResult.setName(name);
}
/**
 * Updates the execution status of the entity linked to one test result,
 * using the supplied batch-session mappers.
 *
 * Plan-style run modes update both the test-plan case and the review case;
 * every other mode updates the API test case's last execution result.
 *
 * @param type     run mode (ApiRunMode name) deciding which entities to touch
 * @param status   execution status to record
 * @param reportId report id stored as the case's last result id (non-plan modes)
 * @param testId   id of the plan case / review case / api case to update
 */
public void batchEditStatus(String type, String status, String reportId, String testId,
                            TestPlanApiCaseMapper batchTestPlanApiCaseMapper,
                            TestCaseReviewApiCaseMapper batchReviewApiCaseMapper,
                            ApiTestCaseMapper batchApiTestCaseMapper) {
    boolean planMode = StringUtils.equalsAnyIgnoreCase(type,
            ApiRunMode.API_PLAN.name(), ApiRunMode.SCHEDULE_API_PLAN.name(),
            ApiRunMode.JENKINS_API_PLAN.name(), ApiRunMode.MANUAL_PLAN.name());
    if (!planMode) {
        // Record the case's most recent execution result.
        ApiTestCaseWithBLOBs testCase = new ApiTestCaseWithBLOBs();
        testCase.setId(testId);
        testCase.setLastResultId(reportId);
        testCase.setStatus(status);
        testCase.setUpdateTime(System.currentTimeMillis());
        batchApiTestCaseMapper.updateByPrimaryKeySelective(testCase);
        return;
    }
    TestPlanApiCase planCase = new TestPlanApiCase();
    planCase.setId(testId);
    planCase.setStatus(status);
    planCase.setUpdateTime(System.currentTimeMillis());
    batchTestPlanApiCaseMapper.updateByPrimaryKeySelective(planCase);

    TestCaseReviewApiCase reviewCase = new TestCaseReviewApiCase();
    reviewCase.setId(testId);
    reviewCase.setStatus(status);
    reviewCase.setUpdateTime(System.currentTimeMillis());
    batchReviewApiCaseMapper.updateByPrimaryKeySelective(reviewCase);
}
/**
* 定时任务触发的保存逻辑
* 定时任务时userID要改为定时任务中的用户
@ -218,7 +300,7 @@ public class ApiDefinitionExecResultService {
//对响应内容进行进一步解析如果有附加信息比如误报库信息则根据附加信息内的数据进行其他判读
RequestResultExpandDTO expandDTO = ResponseUtil.parseByRequestResult(item);
ApiDefinitionExecResult reportResult = this.save(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId());
ApiDefinitionExecResult reportResult = this.editResult(item, dto.getReportId(), dto.getConsole(), dto.getRunMode(), dto.getTestId(), null);
String status = item.isSuccess() ? "success" : "error";
if (reportResult != null) {
status = reportResult.getStatus();
@ -250,6 +332,7 @@ public class ApiDefinitionExecResultService {
LoggerUtil.info("TestPlanReportId[" + dto.getTestPlanReportId() + "] APICASE OVER. API CASE STATUS:" + JSONObject.toJSONString(apiIdResultMap));
}
/**
* 更新测试计划中, 关联接口测试的功能用例的状态
*/
@ -338,17 +421,11 @@ public class ApiDefinitionExecResultService {
}
}
private ApiDefinitionExecResult save(RequestResult item, String reportId, String console, String type, String testId) {
private ApiDefinitionExecResult editResult(RequestResult item, String reportId, String console, String type, String testId, ApiDefinitionExecResultMapper batchMapper) {
if (!StringUtils.startsWithAny(item.getName(), "PRE_PROCESSOR_ENV_", "POST_PROCESSOR_ENV_")) {
ApiDefinitionExecResult saveResult = apiDefinitionExecResultMapper.selectByPrimaryKey(reportId);
if (saveResult == null) {
saveResult = new ApiDefinitionExecResult();
}
ApiDefinitionExecResult saveResult = new ApiDefinitionExecResult();
item.getResponseResult().setConsole(console);
saveResult.setId(reportId);
if (StringUtils.isEmpty(saveResult.getActuator())) {
saveResult.setActuator("LOCAL");
}
//对响应内容进行进一步解析如果有附加信息比如误报库信息则根据附加信息内的数据进行其他判读
RequestResultExpandDTO expandDTO = ResponseUtil.parseByRequestResult(item);
String status = item.isSuccess() ? ExecuteResult.success.name() : ExecuteResult.error.name();
@ -358,14 +435,8 @@ public class ApiDefinitionExecResultService {
} else {
saveResult.setContent(JSON.toJSONString(item));
}
saveResult.setName(item.getName());
saveResult.setType(type);
if (saveResult.getCreateTime() == null || saveResult.getCreateTime() == 0) {
saveResult.setCreateTime(item.getStartTime());
}
editStatus(saveResult, type, status, saveResult.getCreateTime(), saveResult.getId(), testId);
saveResult.setStatus(status);
saveResult.setResourceId(item.getName());
saveResult.setStartTime(item.getStartTime());
@ -374,7 +445,12 @@ public class ApiDefinitionExecResultService {
if (StringUtils.isNotEmpty(saveResult.getTriggerMode()) && saveResult.getTriggerMode().equals("CASE")) {
saveResult.setTriggerMode(TriggerMode.MANUAL.name());
}
apiDefinitionExecResultMapper.updateByPrimaryKeySelective(saveResult);
if (batchMapper == null) {
editStatus(saveResult, type, status, saveResult.getCreateTime(), saveResult.getId(), testId);
apiDefinitionExecResultMapper.updateByPrimaryKeySelective(saveResult);
} else {
batchMapper.updateByPrimaryKeySelective(saveResult);
}
return saveResult;
}
return null;

View File

@ -7,9 +7,14 @@ import io.metersphere.base.mapper.ApiScenarioReportResultMapper;
import io.metersphere.commons.constants.ExecuteResult;
import io.metersphere.commons.utils.ErrorReportLibraryUtil;
import io.metersphere.dto.RequestResult;
import io.metersphere.dto.ResultDTO;
import io.metersphere.utils.LoggerUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -23,6 +28,8 @@ import java.util.UUID;
public class ApiScenarioReportResultService {
@Resource
private ApiScenarioReportResultMapper apiScenarioReportResultMapper;
@Resource
private SqlSessionFactory sqlSessionFactory;
public void save(String reportId, List<RequestResult> queue) {
if (CollectionUtils.isNotEmpty(queue)) {
@ -37,6 +44,26 @@ public class ApiScenarioReportResultService {
}
}
/**
 * Inserts scenario report request results in one MyBatis BATCH session.
 *
 * Fix vs. original: the SqlSession is closed in a finally block — the
 * original only closed it on the success path (and null-checked the session
 * after it had already been used), leaking the session when an insert threw.
 *
 * @param dtos scenario execution results; no-op when empty
 */
public void batchSave(List<ResultDTO> dtos) {
    if (CollectionUtils.isEmpty(dtos)) {
        return;
    }
    SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
    try {
        ApiScenarioReportResultMapper batchMapper = sqlSession.getMapper(ApiScenarioReportResultMapper.class);
        for (ResultDTO dto : dtos) {
            if (CollectionUtils.isNotEmpty(dto.getRequestResults())) {
                dto.getRequestResults().forEach(item -> {
                    // Skip bare transaction wrappers (named "Transaction=" with no sub-results).
                    if (StringUtils.isEmpty(item.getName()) || !item.getName().startsWith("Transaction=") || !CollectionUtils.isEmpty(item.getSubRequestResults())) {
                        batchMapper.insert(this.newApiScenarioReportResult(dto.getReportId(), item));
                    }
                });
            }
        }
        sqlSession.flushStatements();
    } finally {
        // Release the batch session even if an insert fails.
        SqlSessionUtils.closeSqlSession(sqlSession, sqlSessionFactory);
    }
}
private ApiScenarioReportResult newApiScenarioReportResult(String reportId, RequestResult result) {
ApiScenarioReportResult report = new ApiScenarioReportResult();
//解析误报内容

View File

@ -84,6 +84,10 @@ public class ApiScenarioReportService {
apiScenarioReportResultService.save(dto.getReportId(), requestResults);
}
/**
 * Batch-persists scenario execution results by delegating to
 * {@link ApiScenarioReportResultService#batchSave(List)}.
 *
 * @param dtos scenario execution results to store
 */
public void batchSaveResult(List<ResultDTO> dtos) {
    apiScenarioReportResultService.batchSave(dtos);
}
public ApiScenarioReport testEnded(ResultDTO dto) {
if (!StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString())) {
// 更新控制台信息

View File

@ -267,7 +267,7 @@ public class ApiScenarioReportStructureService {
}
}
// 循环步骤请求从新排序
if (dtoList.stream().filter(e -> e.getValue() != null).collect(Collectors.toList()).size() == dtoList.size()) {
if (dtoList.stream().filter(e -> e.getValue() != null && e.getAllIndex() != null).collect(Collectors.toList()).size() == dtoList.size()) {
List<StepTreeDTO> list = dtoList.stream().sorted(Comparator.comparing(x -> x.getAllIndex())).collect(Collectors.toList());
for (int index = 0; index < list.size(); index++) {
list.get(index).setIndex((index + 1));

View File

@ -5,7 +5,6 @@ import io.metersphere.api.jmeter.ExecutedHandleSingleton;
import io.metersphere.base.domain.*;
import io.metersphere.base.mapper.ApiDefinitionExecResultMapper;
import io.metersphere.base.mapper.ApiScenarioMapper;
import io.metersphere.base.mapper.ApiScenarioReportMapper;
import io.metersphere.commons.constants.APITestStatus;
import io.metersphere.commons.constants.ApiRunMode;
import io.metersphere.commons.constants.NoticeConstants;
@ -54,8 +53,6 @@ public class TestResultService {
@Resource
private ApiTestCaseService apiTestCaseService;
@Resource
private ApiScenarioReportMapper apiScenarioReportMapper;
@Resource
private ApiDefinitionExecResultMapper apiDefinitionExecResultMapper;
public void saveResults(ResultDTO dto) {
@ -80,12 +77,32 @@ public class TestResultService {
updateTestCaseStates(requestResults, dto.getRunMode());
}
public void batchSaveResults(Map<String, List<ResultDTO>> resultDtoMap) {
// 处理环境
List<String> environmentList = new LinkedList<>();
for (String key : resultDtoMap.keySet()) {
List<ResultDTO> dtos = resultDtoMap.get(key);
for (ResultDTO dto : dtos) {
if (dto.getArbitraryData() != null && dto.getArbitraryData().containsKey("ENV")) {
environmentList = (List<String>) dto.getArbitraryData().get("ENV");
}
//处理环境参数
if (CollectionUtils.isNotEmpty(environmentList)) {
ExecutedHandleSingleton.parseEnvironment(environmentList);
}
// 处理用例/场景和计划关系
updateTestCaseStates(dto.getRequestResults(), dto.getRunMode());
}
//测试计划定时任务-接口执行逻辑的话需要同步测试计划的报告数据
if (StringUtils.equals(key, "schedule-task")) {
apiDefinitionExecResultService.batchSaveApiResult(dtos, true);
} else if (StringUtils.equals(key, "api-test-case-task")) {
apiDefinitionExecResultService.batchSaveApiResult(dtos, false);
} else if (StringUtils.equalsAny(key, "api-scenario-task")) {
apiScenarioReportService.batchSaveResult(dtos);
}
public void editReportTime(ResultDTO dto) {
ApiScenarioReport report = apiScenarioReportMapper.selectByPrimaryKey(dto.getReportId());
if (report != null) {
report.setUpdateTime(System.currentTimeMillis());
apiScenarioReportMapper.updateByPrimaryKey(report);
}
}

View File

@ -2,11 +2,19 @@ package io.metersphere.config;
import io.metersphere.commons.utils.CommonBeanFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@ -14,6 +22,8 @@ import java.util.Map;
public class KafkaConfig {
//执行结果回传
public static final String TOPICS = "ms-api-exec-topic";
@Resource
private KafkaProperties kafkaProperties;
@Bean
public NewTopic apiExecTopic() {
@ -28,4 +38,38 @@ public class KafkaConfig {
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize());
return producerProps;
}
/**
 * Builds the consumer properties used by the batch listener container.
 *
 * Fixes vs. original: the map was misnamed {@code producerProps} and the
 * bootstrap servers were set via a ProducerConfig constant; both renamed to
 * consumer-side equivalents (the underlying "bootstrap.servers" key is
 * identical, so runtime behavior is unchanged).
 *
 * @return Kafka consumer configuration map for DefaultKafkaConsumerFactory
 */
public Map<String, Object> consumerConfigs() {
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    // NOTE(review): "max.request.size" is a PRODUCER-only setting; consumers
    // ignore it. Kept for behavior parity — the intended consumer analog is
    // likely fetch.max.bytes / max.partition.fetch.bytes. TODO confirm.
    consumerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize());
    // Maximum number of records pulled per poll (batch size cap).
    consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 6000);
    // Manual offset commit: auto-commit disabled (listener acks explicitly).
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Ignored while enable.auto.commit=false; kept as-is for parity.
    consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 20000);
    consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return consumerProps;
}
/**
 * Listener container factory for the batch Kafka consumer
 * (referenced as containerFactory = "batchFactory").
 *
 * Batch mode delivers a List of records per listener invocation; offsets are
 * committed immediately when the listener calls ack.acknowledge().
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    // Single consumer thread.
    listenerFactory.setConcurrency(1);
    // Enable batch listening (records delivered as a List).
    listenerFactory.setBatchListener(true);
    ContainerProperties containerProps = listenerFactory.getContainerProperties();
    containerProps.setPollTimeout(10000);
    // Commit offsets immediately on manual acknowledgment.
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return listenerFactory;
}
}

View File

@ -8,19 +8,19 @@
:content="responseResult.responseCode"
placement="top">
<div v-if="responseData.attachInfoMap && responseData.attachInfoMap.errorReportResult" class="node-title" :class="'ms-req-error-report-result'">
<div v-if="response.attachInfoMap && response.attachInfoMap.errorReportResult" class="node-title" :class="'ms-req-error-report-result'">
{{ responseResult && responseResult.responseCode ? responseResult.responseCode : '0' }}
</div>
<div v-else class="node-title" :class="responseData && responseData.success ?'ms-req-success':'ms-req-error'">
<div v-else class="node-title" :class="response && response.success ?'ms-req-success':'ms-req-error'">
{{ responseResult && responseResult.responseCode ? responseResult.responseCode : '0' }}
</div>
</el-tooltip>
<div v-else class="node-title" :class="responseData && responseData.success ?'ms-req-success':'ms-req-error'">
<div v-else class="node-title" :class="response && response.success ?'ms-req-success':'ms-req-error'">
{{ responseResult && responseResult.responseCode ? responseResult.responseCode : '0' }}
</div>
<div v-if="responseData.attachInfoMap && responseData.attachInfoMap.errorReportResult">
<div v-if="response && response.attachInfoMap && response.attachInfoMap.errorReportResult">
<div class="ms-req-error-report-result">
{{ responseData.attachInfoMap.errorReportResult }}
{{ response.attachInfoMap.errorReportResult }}
</div>
</div>
</el-col>
@ -39,6 +39,7 @@
<script>
export default {
name: "MsRequestMetric",
props: {
response: {
type: Object,
@ -47,11 +48,6 @@ export default {
}
}
},
data() {
return {
responseData: this.response ? this.response : {}
}
},
computed: {
responseResult() {
return this.response && this.response.responseResult ? this.response.responseResult : {};
@ -59,7 +55,7 @@ export default {
error() {
return this.response && this.response.responseCode && this.response.responseCode >= 400;
}
},
}
}
</script>