refactor(测试计划): 优化大批量执行KAFKA消费慢问题
This commit is contained in:
parent
65305dda5c
commit
a05118ea5e
|
@ -151,7 +151,7 @@ public class JMeterService {
|
|||
}
|
||||
}
|
||||
|
||||
private void send(JmeterRunRequestDTO request, List<TestResource> resources) {
|
||||
private synchronized void send(JmeterRunRequestDTO request, List<TestResource> resources) {
|
||||
try {
|
||||
if (StringUtils.isNotEmpty(request.getPoolId()) && CollectionUtils.isEmpty(resources)) {
|
||||
resources = GenerateHashTreeUtil.setPoolResource(request.getPoolId());
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
package io.metersphere.api.jmeter;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil;
|
||||
import io.metersphere.api.service.ApiExecutionQueueService;
|
||||
import io.metersphere.api.service.TestResultService;
|
||||
import io.metersphere.commons.constants.ApiRunMode;
|
||||
import io.metersphere.dto.ResultDTO;
|
||||
import io.metersphere.utils.LoggerUtil;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@Data
|
||||
public class KafkaListenerTask implements Runnable {
|
||||
private List<ConsumerRecord<?, String>> records;
|
||||
private ApiExecutionQueueService apiExecutionQueueService;
|
||||
private TestResultService testResultService;
|
||||
private ObjectMapper mapper;
|
||||
|
||||
private static final Map<String, String> RUN_MODE_MAP = new HashMap<String, String>() {{
|
||||
this.put(ApiRunMode.SCHEDULE_API_PLAN.name(), "schedule-task");
|
||||
this.put(ApiRunMode.JENKINS_API_PLAN.name(), "schedule-task");
|
||||
this.put(ApiRunMode.MANUAL_PLAN.name(), "schedule-task");
|
||||
|
||||
this.put(ApiRunMode.DEFINITION.name(), "api-test-case-task");
|
||||
this.put(ApiRunMode.JENKINS.name(), "api-test-case-task");
|
||||
this.put(ApiRunMode.API_PLAN.name(), "api-test-case-task");
|
||||
this.put(ApiRunMode.JENKINS_API_PLAN.name(), "api-test-case-task");
|
||||
this.put(ApiRunMode.MANUAL_PLAN.name(), "api-test-case-task");
|
||||
|
||||
|
||||
this.put(ApiRunMode.SCENARIO.name(), "api-scenario-task");
|
||||
this.put(ApiRunMode.SCENARIO_PLAN.name(), "api-scenario-task");
|
||||
this.put(ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), "api-scenario-task");
|
||||
this.put(ApiRunMode.SCHEDULE_SCENARIO.name(), "api-scenario-task");
|
||||
this.put(ApiRunMode.JENKINS_SCENARIO_PLAN.name(), "api-scenario-task");
|
||||
|
||||
}};
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LoggerUtil.info("进入KAFKA消费,接收到执行结果开始存储:" + records.size());
|
||||
// 分三类存储
|
||||
Map<String, List<ResultDTO>> assortMap = new LinkedHashMap<>();
|
||||
List<ResultDTO> resultDTOS = new LinkedList<>();
|
||||
|
||||
records.forEach(record -> {
|
||||
ResultDTO testResult = this.formatResult(record.value());
|
||||
if (testResult != null) {
|
||||
if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) {
|
||||
resultDTOS.add(testResult);
|
||||
} else {
|
||||
String key = RUN_MODE_MAP.get(testResult.getRunMode());
|
||||
if (assortMap.containsKey(key)) {
|
||||
assortMap.get(key).add(testResult);
|
||||
} else {
|
||||
assortMap.put(key, new LinkedList<ResultDTO>() {{
|
||||
this.add(testResult);
|
||||
}});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if (MapUtils.isNotEmpty(assortMap)) {
|
||||
LoggerUtil.info("KAFKA消费执行内容存储开始");
|
||||
testResultService.batchSaveResults(assortMap);
|
||||
LoggerUtil.info("KAFKA消费执行内容存储结束");
|
||||
}
|
||||
// 更新执行结果
|
||||
if (CollectionUtils.isNotEmpty(resultDTOS)) {
|
||||
resultDTOS.forEach(testResult -> {
|
||||
LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成");
|
||||
testResultService.testEnded(testResult);
|
||||
LoggerUtil.info("执行队列处理:" + testResult.getQueueId());
|
||||
apiExecutionQueueService.queueNext(testResult);
|
||||
// 全局并发队列
|
||||
PoolExecBlockingQueueUtil.offer(testResult.getReportId());
|
||||
// 更新测试计划报告
|
||||
if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) {
|
||||
LoggerUtil.info("Check Processing Test Plan report status:" + testResult.getQueueId() + "," + testResult.getTestId());
|
||||
apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId());
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LoggerUtil.error("KAFKA消费失败:", e);
|
||||
}
|
||||
}
|
||||
|
||||
private ResultDTO formatResult(String result) {
|
||||
try {
|
||||
// 多态JSON普通转换会丢失内容,需要通过 ObjectMapper 获取
|
||||
if (StringUtils.isNotEmpty(result)) {
|
||||
return mapper.readValue(result, new TypeReference<ResultDTO>() {
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LoggerUtil.error("formatResult 格式化数据失败:", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -1,24 +1,24 @@
|
|||
package io.metersphere.api.jmeter;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil;
|
||||
import io.metersphere.api.exec.utils.NamedThreadFactory;
|
||||
import io.metersphere.api.service.ApiExecutionQueueService;
|
||||
import io.metersphere.api.service.TestResultService;
|
||||
import io.metersphere.commons.constants.ApiRunMode;
|
||||
import io.metersphere.config.KafkaConfig;
|
||||
import io.metersphere.dto.ResultDTO;
|
||||
import io.metersphere.utils.LoggerUtil;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Configuration
|
||||
public class MsKafkaListener {
|
||||
|
@ -50,51 +50,34 @@ public class MsKafkaListener {
|
|||
|
||||
}};
|
||||
|
||||
// 线程池维护线程的最少数量
|
||||
private final static int CORE_POOL_SIZE = 50;
|
||||
// 线程池维护线程的最大数量
|
||||
private final static int MAX_POOL_SIZE = 50;
|
||||
// 线程池维护线程所允许的空闲时间
|
||||
private final static int KEEP_ALIVE_TIME = 1;
|
||||
// 线程池所使用的缓冲队列大小
|
||||
private final static int WORK_QUEUE_SIZE = 10000;
|
||||
|
||||
|
||||
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
|
||||
CORE_POOL_SIZE,
|
||||
MAX_POOL_SIZE,
|
||||
KEEP_ALIVE_TIME,
|
||||
TimeUnit.SECONDS,
|
||||
new ArrayBlockingQueue(WORK_QUEUE_SIZE),
|
||||
new NamedThreadFactory("MS-KAFKA-LISTENER-TASK"));
|
||||
|
||||
@KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")
|
||||
public void consume(List<ConsumerRecord<?, String>> records, Acknowledgment ack) {
|
||||
try {
|
||||
LoggerUtil.info("进入KAFKA消费,接收到执行结果开始存储:" + records.size());
|
||||
// 分三类存储
|
||||
Map<String, List<ResultDTO>> assortMap = new LinkedHashMap<>();
|
||||
List<ResultDTO> resultDTOS = new LinkedList<>();
|
||||
|
||||
records.forEach(record -> {
|
||||
ResultDTO testResult = this.formatResult(record.value());
|
||||
if (testResult != null) {
|
||||
if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) {
|
||||
resultDTOS.add(testResult);
|
||||
} else {
|
||||
String key = RUN_MODE_MAP.get(testResult.getRunMode());
|
||||
if (assortMap.containsKey(key)) {
|
||||
assortMap.get(key).add(testResult);
|
||||
} else {
|
||||
assortMap.put(key, new LinkedList<ResultDTO>() {{
|
||||
this.add(testResult);
|
||||
}});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if (MapUtils.isNotEmpty(assortMap)) {
|
||||
testResultService.batchSaveResults(assortMap);
|
||||
LoggerUtil.info("KAFKA消费执行内容存储结束");
|
||||
}
|
||||
// 更新执行结果
|
||||
if (CollectionUtils.isNotEmpty(resultDTOS)) {
|
||||
resultDTOS.forEach(testResult -> {
|
||||
LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成");
|
||||
testResultService.testEnded(testResult);
|
||||
LoggerUtil.info("执行队列处理:" + testResult.getQueueId());
|
||||
apiExecutionQueueService.queueNext(testResult);
|
||||
// 全局并发队列
|
||||
PoolExecBlockingQueueUtil.offer(testResult.getReportId());
|
||||
// 更新测试计划报告
|
||||
if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) {
|
||||
LoggerUtil.info("Check Processing Test Plan report status:" + testResult.getQueueId() + "," + testResult.getTestId());
|
||||
apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId());
|
||||
}
|
||||
});
|
||||
}
|
||||
KafkaListenerTask task = new KafkaListenerTask();
|
||||
task.setApiExecutionQueueService(apiExecutionQueueService);
|
||||
task.setTestResultService(testResultService);
|
||||
task.setMapper(mapper);
|
||||
task.setRecords(records);
|
||||
threadPool.execute(task);
|
||||
this.outApiThreadPoolExecutorLogger();
|
||||
} catch (Exception e) {
|
||||
LoggerUtil.error("KAFKA消费失败:", e);
|
||||
} finally {
|
||||
|
@ -102,16 +85,15 @@ public class MsKafkaListener {
|
|||
}
|
||||
}
|
||||
|
||||
private ResultDTO formatResult(String result) {
|
||||
try {
|
||||
// 多态JSON普通转换会丢失内容,需要通过 ObjectMapper 获取
|
||||
if (StringUtils.isNotEmpty(result)) {
|
||||
return mapper.readValue(result, new TypeReference<ResultDTO>() {
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LoggerUtil.error("formatResult 格式化数据失败:", e);
|
||||
}
|
||||
return null;
|
||||
public void outApiThreadPoolExecutorLogger() {
|
||||
StringBuffer buffer = new StringBuffer("KAFKA 消费队列处理详情 ");
|
||||
buffer.append("\n").append("KAFKA Consume 线程池详情:").append("\n");
|
||||
buffer.append(" KAFKA Consume 核心线程数:" + threadPool.getCorePoolSize()).append("\n");
|
||||
buffer.append(" KAFKA Consume 活动线程数:" + threadPool.getActiveCount()).append("\n");
|
||||
buffer.append(" KAFKA Consume 最大线程数:" + threadPool.getMaximumPoolSize()).append("\n");
|
||||
buffer.append(" KAFKA Consume 最大队列数:" + (threadPool.getQueue().size() + threadPool.getQueue().remainingCapacity())).append("\n");
|
||||
buffer.append(" KAFKA Consume 当前排队线程数:" + (threadPool.getQueue().size())).append("\n");
|
||||
LoggerUtil.info(buffer.toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ public class KafkaConfig {
|
|||
producerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
|
||||
producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
producerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 20000);
|
||||
producerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
|
||||
|
||||
producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
|
@ -70,7 +70,7 @@ public class KafkaConfig {
|
|||
//开启批量监听
|
||||
factory.setBatchListener(true);
|
||||
|
||||
factory.getContainerProperties().setPollTimeout(60000);
|
||||
factory.getContainerProperties().setPollTimeout(5000L);
|
||||
|
||||
//设置提交偏移量的方式,
|
||||
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
|
||||
|
|
Loading…
Reference in New Issue