diff --git a/backend/src/main/java/io/metersphere/api/jmeter/KafkaListenerTask.java b/backend/src/main/java/io/metersphere/api/jmeter/KafkaListenerTask.java
index e36f2a6ea5..7fe83f030d 100644
--- a/backend/src/main/java/io/metersphere/api/jmeter/KafkaListenerTask.java
+++ b/backend/src/main/java/io/metersphere/api/jmeter/KafkaListenerTask.java
@@ -18,7 +18,7 @@ import java.util.*;
 @Data
 public class KafkaListenerTask implements Runnable {
-    private List<ConsumerRecord<?, String>> records;
+    private ConsumerRecord<?, String> record;
     private ApiExecutionQueueService apiExecutionQueueService;
     private TestResultService testResultService;
     private ObjectMapper mapper;
@@ -49,30 +49,27 @@ public class KafkaListenerTask implements Runnable {
         // Store results in three categories
         Map<String, List<ResultDTO>> assortMap = new LinkedHashMap<>();
         List<ResultDTO> resultDTOS = new LinkedList<>();
-
-        records.forEach(record -> {
-            ResultDTO testResult = this.formatResult(record.value());
-
-            LoggerUtil.info("KAFKA consume result handling: [" + testResult.getReportId() + "]", testResult.getArbitraryData() != null ? testResult.getArbitraryData().get("TEST_END") : false);
-            if (testResult != null) {
-                if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END")
-                        && (Boolean) testResult.getArbitraryData().get("TEST_END")) {
-                    resultDTOS.add(testResult);
-                }
-                // Result carries request results
-                if (CollectionUtils.isNotEmpty(testResult.getRequestResults())) {
-                    String key = RUN_MODE_MAP.get(testResult.getRunMode());
-                    if (assortMap.containsKey(key)) {
-                        assortMap.get(key).add(testResult);
-                    } else {
-                        assortMap.put(key, new LinkedList<ResultDTO>() {{
-                            this.add(testResult);
-                        }});
-                    }
-                }
-            }
-        });
-
+        LoggerUtil.info("Report [" + record.key() + "] starting to parse result");
+        ResultDTO dto = this.formatResult();
+        if (dto == null) {
+            return;
+        }
+        if (dto.getArbitraryData() != null && dto.getArbitraryData().containsKey("TEST_END")
+                && (Boolean) dto.getArbitraryData().get("TEST_END")) {
+            resultDTOS.add(dto);
+            LoggerUtil.info("KAFKA consume result handling: [" + record.key() + "] result status: " + dto.getArbitraryData().get("TEST_END"));
+        }
+        // Result carries request results
+        if (CollectionUtils.isNotEmpty(dto.getRequestResults())) {
+            String key = RUN_MODE_MAP.get(dto.getRunMode());
+            if (assortMap.containsKey(key)) {
+                assortMap.get(key).add(dto);
+            } else {
+                assortMap.put(key, new LinkedList<ResultDTO>() {{
+                    this.add(dto);
+                }});
+            }
+        }
         if (MapUtils.isNotEmpty(assortMap)) {
             LoggerUtil.info("KAFKA consume: execution content storage started");
             testResultService.batchSaveResults(assortMap);
@@ -95,19 +92,19 @@
                 });
             }
         } catch (Exception e) {
-            LoggerUtil.error("KAFKA consume failed: ", e);
+            LoggerUtil.error("Report [" + record.key() + "] KAFKA consume failed: ", e);
         }
     }
 
-    private ResultDTO formatResult(String result) {
+    private ResultDTO formatResult() {
         try {
             // Plain conversion of polymorphic JSON loses content; read it via ObjectMapper
-            if (StringUtils.isNotEmpty(result)) {
-                return mapper.readValue(result, new TypeReference<ResultDTO>() {
+            if (StringUtils.isNotEmpty(record.value())) {
+                return mapper.readValue(record.value(), new TypeReference<ResultDTO>() {
                 });
             }
         } catch (Exception e) {
-            LoggerUtil.error("formatResult: failed to format data: ", e);
+            LoggerUtil.error("Report [" + record.key() + "] failed to format data: ", e);
         }
         return null;
     }
diff --git a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java
index eb7eada5b1..be49235dd2 100644
--- a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java
+++ b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.metersphere.api.exec.utils.NamedThreadFactory;
 import io.metersphere.api.service.ApiExecutionQueueService;
 import io.metersphere.api.service.TestResultService;
-import io.metersphere.commons.constants.ApiRunMode;
 import io.metersphere.config.KafkaConfig;
 import io.metersphere.utils.LoggerUtil;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -13,9 +12,7 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
 
 import javax.annotation.Resource;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -29,27 +26,6 @@ public class MsKafkaListener {
     private TestResultService testResultService;
     @Resource
     private ObjectMapper mapper;
-
-    private static final Map<String, String> RUN_MODE_MAP = new HashMap<String, String>() {{
-        this.put(ApiRunMode.SCHEDULE_API_PLAN.name(), "schedule-task");
-        this.put(ApiRunMode.JENKINS_API_PLAN.name(), "schedule-task");
-        this.put(ApiRunMode.MANUAL_PLAN.name(), "schedule-task");
-
-        this.put(ApiRunMode.DEFINITION.name(), "api-test-case-task");
-        this.put(ApiRunMode.JENKINS.name(), "api-test-case-task");
-        this.put(ApiRunMode.API_PLAN.name(), "api-test-case-task");
-        this.put(ApiRunMode.JENKINS_API_PLAN.name(), "api-test-case-task");
-        this.put(ApiRunMode.MANUAL_PLAN.name(), "api-test-case-task");
-
-
-        this.put(ApiRunMode.SCENARIO.name(), "api-scenario-task");
-        this.put(ApiRunMode.SCENARIO_PLAN.name(), "api-scenario-task");
-        this.put(ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), "api-scenario-task");
-        this.put(ApiRunMode.SCHEDULE_SCENARIO.name(), "api-scenario-task");
-        this.put(ApiRunMode.JENKINS_SCENARIO_PLAN.name(), "api-scenario-task");
-
-    }};
-
     // Minimum number of threads the pool maintains
     private final static int CORE_POOL_SIZE = 20;
     // Maximum number of threads the pool maintains
@@ -71,13 +47,16 @@ public class MsKafkaListener {
     @KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")
     public void consume(List<ConsumerRecord<?, String>> records, Acknowledgment ack) {
         try {
-            KafkaListenerTask task = new KafkaListenerTask();
-            task.setApiExecutionQueueService(apiExecutionQueueService);
-            task.setTestResultService(testResultService);
-            task.setMapper(mapper);
-            task.setRecords(records);
-            threadPool.execute(task);
-            this.outApiThreadPoolExecutorLogger();
+            records.forEach(item -> {
+                LoggerUtil.info("Received report [" + item.key() + "], adding it to the result processing queue");
+                KafkaListenerTask task = new KafkaListenerTask();
+                task.setApiExecutionQueueService(apiExecutionQueueService);
+                task.setTestResultService(testResultService);
+                task.setMapper(mapper);
+                task.setRecord(item);
+                threadPool.execute(task);
+                this.outKafkaPoolLogger();
+            });
         } catch (Exception e) {
             LoggerUtil.error("KAFKA consume failed: ", e);
         } finally {
@@ -85,14 +64,21 @@
         }
     }
 
-    public void outApiThreadPoolExecutorLogger() {
-        StringBuffer buffer = new StringBuffer("KAFKA consume queue handling details ");
-        buffer.append("\n").append("KAFKA Consume thread pool details:").append("\n");
-        buffer.append(" KAFKA Consume core pool size: " + threadPool.getCorePoolSize()).append("\n");
-        buffer.append(" KAFKA Consume active thread count: " + threadPool.getActiveCount()).append("\n");
-        buffer.append(" KAFKA Consume maximum pool size: " + threadPool.getMaximumPoolSize()).append("\n");
-        buffer.append(" KAFKA Consume maximum queue capacity: " + (threadPool.getQueue().size() + threadPool.getQueue().remainingCapacity())).append("\n");
-        buffer.append(" KAFKA Consume currently queued tasks: " + (threadPool.getQueue().size())).append("\n");
+    public void outKafkaPoolLogger() {
+        StringBuffer buffer = new StringBuffer()
+                .append("\n")
+                .append("KAFKA Consume thread pool details:")
+                .append("\n")
+                .append(" KAFKA Consume core pool size: " + threadPool.getCorePoolSize())
+                .append("\n")
+                .append(" KAFKA Consume active thread count: " + threadPool.getActiveCount())
+                .append("\n")
+                .append(" KAFKA Consume maximum pool size: " + threadPool.getMaximumPoolSize())
+                .append("\n")
+                .append(" KAFKA Consume maximum queue capacity: " + (threadPool.getQueue().size() + threadPool.getQueue().remainingCapacity()))
+                .append("\n")
+                .append(" KAFKA Consume currently queued tasks: " + (threadPool.getQueue().size()))
+                .append("\n");
         LoggerUtil.info(buffer.toString());
     }
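
Note: the change above replaces one task per record batch (task.setRecords(records)) with one task per ConsumerRecord (records.forEach + task.setRecord(item)), so a malformed or slow message fails or delays only its own report instead of the whole batch. Below is a minimal, self-contained sketch of that dispatch pattern; the class names (DemoResult, DemoTask, DemoListener) and the pool sizing are illustrative assumptions, not MeterSphere code.

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-in for ResultDTO.
    class DemoResult {
        public String reportId;
    }

    class DemoTask implements Runnable {
        private final ConsumerRecord<?, String> record;
        private final ObjectMapper mapper;

        DemoTask(ConsumerRecord<?, String> record, ObjectMapper mapper) {
            this.record = record;
            this.mapper = mapper;
        }

        @Override
        public void run() {
            try {
                // An anonymous TypeReference preserves generic type information
                // at deserialization time, the same technique formatResult() uses.
                DemoResult dto = mapper.readValue(record.value(), new TypeReference<DemoResult>() {
                });
                System.out.println("Report [" + record.key() + "] parsed: " + dto.reportId);
            } catch (Exception e) {
                // A bad record fails alone; other records keep processing.
                System.err.println("Report [" + record.key() + "] failed: " + e.getMessage());
            }
        }
    }

    class DemoListener {
        // A bounded queue applies back-pressure under load instead of exhausting
        // memory; the core size of 20 matches CORE_POOL_SIZE in the diff, the
        // other sizes are placeholders.
        private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                20, 100, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
        private final ObjectMapper mapper = new ObjectMapper();

        public void consume(List<ConsumerRecord<?, String>> records) {
            // One task per record, mirroring the records.forEach(...) in the diff.
            records.forEach(item -> threadPool.execute(new DemoTask(item, mapper)));
        }
    }

As in the diff, acknowledgment would still happen once per batch in the listener's finally block; the sketch omits it to stay focused on the dispatch path.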