refactor: 调整BufferMemory默认大小并支持自定义配置

Signed-off-by: fit2-zhao <yong.zhao@fit2cloud.com>
This commit is contained in:
fit2-zhao 2023-07-21 09:34:56 +08:00
parent a263c419c3
commit e49d29b76b
2 changed files with 27 additions and 3 deletions

View File

@ -3,20 +3,24 @@ package io.metersphere.commons.config;
import io.metersphere.commons.constants.KafkaTopicConstants; import io.metersphere.commons.constants.KafkaTopicConstants;
import io.metersphere.commons.utils.CommonBeanFactory; import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.config.KafkaProperties; import io.metersphere.config.KafkaProperties;
import jakarta.annotation.Resource;
import org.apache.commons.collections.MapUtils;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import jakarta.annotation.Resource;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -45,12 +49,29 @@ public class KafkaConfig {
.build(); .build();
} }
private static Map<String, Object> defaultMap;
@Autowired
public KafkaConfig(org.springframework.boot.autoconfigure.kafka.KafkaProperties kafkaProperties) {
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
defaultMap = producerFactory.getConfigurationProperties();
}
public static void addDefaultConfig(Map<String, Object> producerProps) {
if (MapUtils.isNotEmpty(defaultMap)) {
defaultMap.forEach((k, v) -> producerProps.putIfAbsent(k, v));
}
}
public static Map<String, Object> getKafka() { public static Map<String, Object> getKafka() {
KafkaProperties kafkaProperties = CommonBeanFactory.getBean(KafkaProperties.class); KafkaProperties kafkaProperties = CommonBeanFactory.getBean(KafkaProperties.class);
Map<String, Object> producerProps = new HashMap<>(); Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize()); producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize());
producerProps.put(DEBUG_TOPICS_KEY, KafkaTopicConstants.DEBUG_TOPICS); producerProps.put(DEBUG_TOPICS_KEY, KafkaTopicConstants.DEBUG_TOPICS);
// 加入默认配置
addDefaultConfig(producerProps);
return producerProps; return producerProps;
} }
@ -58,6 +79,8 @@ public class KafkaConfig {
Map<String, Object> producerProps = new HashMap<>(); Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize()); producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getMaxRequestSize());
// 加入默认配置
addDefaultConfig(producerProps);
// 批量一次最大拉取数据量 // 批量一次最大拉取数据量
producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

View File

@ -36,7 +36,7 @@ mybatis.configuration.default-statement-timeout=25000
mybatis.configuration.map-underscore-to-camel-case=true mybatis.configuration.map-underscore-to-camel-case=true
# view # view
spring.mvc.throw-exception-if-no-handler-found=true spring.mvc.throw-exception-if-no-handler-found=true
# ?? gzip ?? # gzip
server.compression.enabled=true server.compression.enabled=true
server.compression.mime-types=application/json,application/xml,text/html,text/xml,text/plain,application/javascript,text/css,text/javascript,image/jpeg server.compression.mime-types=application/json,application/xml,text/html,text/xml,text/plain,application/javascript,text/css,text/javascript,image/jpeg
server.compression.min-response-size=2048 server.compression.min-response-size=2048
@ -49,7 +49,8 @@ spring.flyway.baseline-version=0
spring.flyway.encoding=UTF-8 spring.flyway.encoding=UTF-8
spring.flyway.validate-on-migrate=false spring.flyway.validate-on-migrate=false
spring.kafka.listener.missing-topics-fatal=false spring.kafka.listener.missing-topics-fatal=false
spring.kafka.producer.properties.max.request.size=32428800 spring.kafka.producer.properties.max.request.size=524288000
spring.kafka.producer.properties.buffer.memory=524288000
spring.kafka.producer.batch-size=16384 spring.kafka.producer.batch-size=16384
spring.kafka.consumer.properties.max.partition.fetch.bytes=52428800 spring.kafka.consumer.properties.max.partition.fetch.bytes=52428800
spring.messages.basename=i18n/messages,i18n/commons spring.messages.basename=i18n/messages,i18n/commons