feat(接口测试): 支持消息内容压缩
This commit is contained in:
parent
7ec287bd87
commit
588da5a530
|
@ -0,0 +1,67 @@
|
|||
package io.metersphere.api.config;
|
||||
|
||||
import io.metersphere.sdk.util.CommonBeanFactory;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
public class KafkaConfig {
|
||||
private static Map<String, Object> defaultMap;
|
||||
|
||||
@Autowired
|
||||
public KafkaConfig(KafkaProperties kafkaProperties) {
|
||||
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
|
||||
defaultMap = producerFactory.getConfigurationProperties();
|
||||
}
|
||||
|
||||
public static void addDefaultConfig(Map<String, Object> producerProps) {
|
||||
if (MapUtils.isNotEmpty(defaultMap)) {
|
||||
defaultMap.forEach(producerProps::putIfAbsent);
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> getKafkaConfig() {
|
||||
KafkaProperties kafkaProperties = CommonBeanFactory.getBean(KafkaProperties.class);
|
||||
Map<String, Object> producerProps = new HashMap<>();
|
||||
assert kafkaProperties != null;
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
|
||||
// 加入默认配置
|
||||
addDefaultConfig(producerProps);
|
||||
return producerProps;
|
||||
}
|
||||
|
||||
public static Map<String, Object> consumerConfig() {
|
||||
KafkaProperties kafkaProperties = CommonBeanFactory.getBean(KafkaProperties.class);
|
||||
Map<String, Object> producerProps = new HashMap<>();
|
||||
assert kafkaProperties != null;
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
|
||||
// 加入默认配置
|
||||
addDefaultConfig(producerProps);
|
||||
// 批量一次最大拉取数据量
|
||||
producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
|
||||
|
||||
// 消费者每次去kafka拉取数据最大间隔,服务端会认为消费者已离线。触发reBalance
|
||||
producerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 900000);
|
||||
// 心跳检查
|
||||
producerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
|
||||
// 手动提交 配置 false
|
||||
producerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
|
||||
producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
producerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
|
||||
|
||||
producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
return producerProps;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package io.metersphere.api.config;
|
||||
|
||||
import io.metersphere.sdk.util.LogUtils;
|
||||
import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Order;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
@AutoConfigureMockMvc
|
||||
public class KafkaConfigTests {
|
||||
|
||||
@Test
|
||||
@Order(1)
|
||||
public void testProduceConfig() throws Exception {
|
||||
Map<String, Object> config = KafkaConfig.getKafkaConfig();
|
||||
LogUtils.info(config.get("spring.kafka.producer.properties.compression.type"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(1)
|
||||
public void testConsumerConfig() throws Exception {
|
||||
Map<String, Object> config = KafkaConfig.consumerConfig();
|
||||
LogUtils.info(config.get("spring.kafka.consumer.properties.decompression.type"));
|
||||
}
|
||||
}
|
|
@ -35,6 +35,10 @@ spring.kafka.listener.missing-topics-fatal=false
|
|||
spring.kafka.producer.properties.max.request.size=32428800
|
||||
spring.kafka.producer.batch-size=16384
|
||||
spring.kafka.consumer.properties.max.partition.fetch.bytes=52428800
|
||||
spring.kafka.consumer.properties.decompression.type=gzip
|
||||
spring.kafka.producer.properties.compression.type=gzip
|
||||
spring.kafka.producer.properties.compression.threshold=100
|
||||
|
||||
# mybatis
|
||||
mybatis.configuration.cache-enabled=true
|
||||
mybatis.configuration.lazy-loading-enabled=false
|
||||
|
|
Loading…
Reference in New Issue