From 588da5a530a086c9833d8065cfcd270c162abc3c Mon Sep 17 00:00:00 2001 From: fit2-zhao Date: Mon, 30 Oct 2023 15:18:51 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E6=8E=A5=E5=8F=A3=E6=B5=8B=E8=AF=95):=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=B6=88=E6=81=AF=E5=86=85=E5=AE=B9=E5=8E=8B?= =?UTF-8?q?=E7=BC=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../metersphere/api/config/KafkaConfig.java | 67 +++++++++++++++++++ .../api/config/KafkaConfigTests.java | 32 +++++++++ .../src/test/resources/application.properties | 4 ++ 3 files changed, 103 insertions(+) create mode 100644 backend/services/api-test/src/main/java/io/metersphere/api/config/KafkaConfig.java create mode 100644 backend/services/api-test/src/test/java/io/metersphere/api/config/KafkaConfigTests.java diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/config/KafkaConfig.java b/backend/services/api-test/src/main/java/io/metersphere/api/config/KafkaConfig.java new file mode 100644 index 0000000000..f48a1461d0 --- /dev/null +++ b/backend/services/api-test/src/main/java/io/metersphere/api/config/KafkaConfig.java @@ -0,0 +1,67 @@ +package io.metersphere.api.config; + +import io.metersphere.sdk.util.CommonBeanFactory; +import org.apache.commons.collections.MapUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaConfig { + private static Map defaultMap; + + @Autowired + public KafkaConfig(KafkaProperties kafkaProperties) { + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()); + defaultMap = producerFactory.getConfigurationProperties(); + } + + public static void addDefaultConfig(Map producerProps) { + if (MapUtils.isNotEmpty(defaultMap)) { + defaultMap.forEach(producerProps::putIfAbsent); + } + } + + public static Map getKafkaConfig() { + KafkaProperties kafkaProperties = CommonBeanFactory.getBean(KafkaProperties.class); + Map producerProps = new HashMap<>(); + assert kafkaProperties != null; + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + // 加入默认配置 + addDefaultConfig(producerProps); + return producerProps; + } + + public static Map consumerConfig() { + KafkaProperties kafkaProperties = CommonBeanFactory.getBean(KafkaProperties.class); + Map producerProps = new HashMap<>(); + assert kafkaProperties != null; + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + // 加入默认配置 + addDefaultConfig(producerProps); + // 批量一次最大拉取数据量 + producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); + + // 消费者每次去kafka拉取数据最大间隔,服务端会认为消费者已离线。触发reBalance + producerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 900000); + // 心跳检查 + producerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); + // 手动提交 配置 false + producerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + producerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); + + producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return producerProps; + } +} diff --git a/backend/services/api-test/src/test/java/io/metersphere/api/config/KafkaConfigTests.java b/backend/services/api-test/src/test/java/io/metersphere/api/config/KafkaConfigTests.java new file mode 100644 index 0000000000..565f9d158b --- /dev/null +++ b/backend/services/api-test/src/test/java/io/metersphere/api/config/KafkaConfigTests.java @@ -0,0 +1,32 @@ +package io.metersphere.api.config; + +import io.metersphere.sdk.util.LogUtils; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.Map; + + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@AutoConfigureMockMvc +public class KafkaConfigTests { + + @Test + @Order(1) + public void testProduceConfig() throws Exception { + Map config = KafkaConfig.getKafkaConfig(); + LogUtils.info(config.get("spring.kafka.producer.properties.compression.type")); + } + + @Test + @Order(1) + public void testConsumerConfig() throws Exception { + Map config = KafkaConfig.consumerConfig(); + LogUtils.info(config.get("spring.kafka.consumer.properties.decompression.type")); + } +} diff --git a/backend/services/api-test/src/test/resources/application.properties b/backend/services/api-test/src/test/resources/application.properties index d41f98f511..02d4a62ba4 100644 --- a/backend/services/api-test/src/test/resources/application.properties +++ b/backend/services/api-test/src/test/resources/application.properties @@ -35,6 +35,10 @@ spring.kafka.listener.missing-topics-fatal=false spring.kafka.producer.properties.max.request.size=32428800 spring.kafka.producer.batch-size=16384 spring.kafka.consumer.properties.max.partition.fetch.bytes=52428800 +spring.kafka.consumer.properties.decompression.type=gzip +spring.kafka.producer.properties.compression.type=gzip +spring.kafka.producer.properties.compression.threshold=100 + # mybatis mybatis.configuration.cache-enabled=true mybatis.configuration.lazy-loading-enabled=false