diff --git a/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java b/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java index 2eecb3c40f..f858a93946 100644 --- a/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java +++ b/backend/src/main/java/io/metersphere/api/jmeter/JMeterService.java @@ -151,7 +151,7 @@ public class JMeterService { } } - private void send(JmeterRunRequestDTO request, List resources) { + private synchronized void send(JmeterRunRequestDTO request, List resources) { try { if (StringUtils.isNotEmpty(request.getPoolId()) && CollectionUtils.isEmpty(resources)) { resources = GenerateHashTreeUtil.setPoolResource(request.getPoolId()); diff --git a/backend/src/main/java/io/metersphere/api/jmeter/KafkaListenerTask.java b/backend/src/main/java/io/metersphere/api/jmeter/KafkaListenerTask.java new file mode 100644 index 0000000000..42cdba7ab7 --- /dev/null +++ b/backend/src/main/java/io/metersphere/api/jmeter/KafkaListenerTask.java @@ -0,0 +1,109 @@ +package io.metersphere.api.jmeter; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil; +import io.metersphere.api.service.ApiExecutionQueueService; +import io.metersphere.api.service.TestResultService; +import io.metersphere.commons.constants.ApiRunMode; +import io.metersphere.dto.ResultDTO; +import io.metersphere.utils.LoggerUtil; +import lombok.Data; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.*; + +@Data +public class KafkaListenerTask implements Runnable { + private List> records; + private ApiExecutionQueueService apiExecutionQueueService; + private TestResultService testResultService; + private ObjectMapper mapper; + + private static final Map RUN_MODE_MAP = new HashMap() {{ + this.put(ApiRunMode.SCHEDULE_API_PLAN.name(), "schedule-task"); + this.put(ApiRunMode.JENKINS_API_PLAN.name(), "schedule-task"); + this.put(ApiRunMode.MANUAL_PLAN.name(), "schedule-task"); + + this.put(ApiRunMode.DEFINITION.name(), "api-test-case-task"); + this.put(ApiRunMode.JENKINS.name(), "api-test-case-task"); + this.put(ApiRunMode.API_PLAN.name(), "api-test-case-task"); + this.put(ApiRunMode.JENKINS_API_PLAN.name(), "api-test-case-task"); + this.put(ApiRunMode.MANUAL_PLAN.name(), "api-test-case-task"); + + + this.put(ApiRunMode.SCENARIO.name(), "api-scenario-task"); + this.put(ApiRunMode.SCENARIO_PLAN.name(), "api-scenario-task"); + this.put(ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), "api-scenario-task"); + this.put(ApiRunMode.SCHEDULE_SCENARIO.name(), "api-scenario-task"); + this.put(ApiRunMode.JENKINS_SCENARIO_PLAN.name(), "api-scenario-task"); + + }}; + + @Override + public void run() { + try { + LoggerUtil.info("进入KAFKA消费,接收到执行结果开始存储:" + records.size()); + // 分三类存储 + Map> assortMap = new LinkedHashMap<>(); + List resultDTOS = new LinkedList<>(); + + records.forEach(record -> { + ResultDTO testResult = this.formatResult(record.value()); + if (testResult != null) { + if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) { + resultDTOS.add(testResult); + } else { + String key = RUN_MODE_MAP.get(testResult.getRunMode()); + if (assortMap.containsKey(key)) { + assortMap.get(key).add(testResult); + } else { + assortMap.put(key, new LinkedList() {{ + this.add(testResult); + }}); + } + } + } + }); + if (MapUtils.isNotEmpty(assortMap)) { + LoggerUtil.info("KAFKA消费执行内容存储开始"); + testResultService.batchSaveResults(assortMap); + LoggerUtil.info("KAFKA消费执行内容存储结束"); + } + // 更新执行结果 + if (CollectionUtils.isNotEmpty(resultDTOS)) { + resultDTOS.forEach(testResult -> { + LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成"); + testResultService.testEnded(testResult); + LoggerUtil.info("执行队列处理:" + testResult.getQueueId()); + apiExecutionQueueService.queueNext(testResult); + // 全局并发队列 + PoolExecBlockingQueueUtil.offer(testResult.getReportId()); + // 更新测试计划报告 + if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) { + LoggerUtil.info("Check Processing Test Plan report status:" + testResult.getQueueId() + "," + testResult.getTestId()); + apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId()); + } + }); + } + } catch (Exception e) { + LoggerUtil.error("KAFKA消费失败:", e); + } + } + + private ResultDTO formatResult(String result) { + try { + // 多态JSON普通转换会丢失内容,需要通过 ObjectMapper 获取 + if (StringUtils.isNotEmpty(result)) { + return mapper.readValue(result, new TypeReference() { + }); + } + } catch (Exception e) { + LoggerUtil.error("formatResult 格式化数据失败:", e); + } + return null; + } +} diff --git a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java index 294c3245ef..81fd831b7c 100644 --- a/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java +++ b/backend/src/main/java/io/metersphere/api/jmeter/MsKafkaListener.java @@ -1,24 +1,24 @@ package io.metersphere.api.jmeter; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil; +import io.metersphere.api.exec.utils.NamedThreadFactory; import io.metersphere.api.service.ApiExecutionQueueService; import io.metersphere.api.service.TestResultService; import io.metersphere.commons.constants.ApiRunMode; import io.metersphere.config.KafkaConfig; -import io.metersphere.dto.ResultDTO; import io.metersphere.utils.LoggerUtil; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import javax.annotation.Resource; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; @Configuration public class MsKafkaListener { @@ -50,51 +50,34 @@ public class MsKafkaListener { }}; + // 线程池维护线程的最少数量 + private final static int CORE_POOL_SIZE = 50; + // 线程池维护线程的最大数量 + private final static int MAX_POOL_SIZE = 50; + // 线程池维护线程所允许的空闲时间 + private final static int KEEP_ALIVE_TIME = 1; + // 线程池所使用的缓冲队列大小 + private final static int WORK_QUEUE_SIZE = 10000; + + + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + KEEP_ALIVE_TIME, + TimeUnit.SECONDS, + new ArrayBlockingQueue(WORK_QUEUE_SIZE), + new NamedThreadFactory("MS-KAFKA-LISTENER-TASK")); + @KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory") public void consume(List> records, Acknowledgment ack) { try { - LoggerUtil.info("进入KAFKA消费,接收到执行结果开始存储:" + records.size()); - // 分三类存储 - Map> assortMap = new LinkedHashMap<>(); - List resultDTOS = new LinkedList<>(); - - records.forEach(record -> { - ResultDTO testResult = this.formatResult(record.value()); - if (testResult != null) { - if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END") && (Boolean) testResult.getArbitraryData().get("TEST_END")) { - resultDTOS.add(testResult); - } else { - String key = RUN_MODE_MAP.get(testResult.getRunMode()); - if (assortMap.containsKey(key)) { - assortMap.get(key).add(testResult); - } else { - assortMap.put(key, new LinkedList() {{ - this.add(testResult); - }}); - } - } - } - }); - if (MapUtils.isNotEmpty(assortMap)) { - testResultService.batchSaveResults(assortMap); - LoggerUtil.info("KAFKA消费执行内容存储结束"); - } - // 更新执行结果 - if (CollectionUtils.isNotEmpty(resultDTOS)) { - resultDTOS.forEach(testResult -> { - LoggerUtil.info("报告 【 " + testResult.getReportId() + " 】资源 " + testResult.getTestId() + " 整体执行完成"); - testResultService.testEnded(testResult); - LoggerUtil.info("执行队列处理:" + testResult.getQueueId()); - apiExecutionQueueService.queueNext(testResult); - // 全局并发队列 - PoolExecBlockingQueueUtil.offer(testResult.getReportId()); - // 更新测试计划报告 - if (StringUtils.isNotEmpty(testResult.getTestPlanReportId())) { - LoggerUtil.info("Check Processing Test Plan report status:" + testResult.getQueueId() + "," + testResult.getTestId()); - apiExecutionQueueService.testPlanReportTestEnded(testResult.getTestPlanReportId()); - } - }); - } + KafkaListenerTask task = new KafkaListenerTask(); + task.setApiExecutionQueueService(apiExecutionQueueService); + task.setTestResultService(testResultService); + task.setMapper(mapper); + task.setRecords(records); + threadPool.execute(task); + this.outApiThreadPoolExecutorLogger(); } catch (Exception e) { LoggerUtil.error("KAFKA消费失败:", e); } finally { @@ -102,16 +85,15 @@ public class MsKafkaListener { } } - private ResultDTO formatResult(String result) { - try { - // 多态JSON普通转换会丢失内容,需要通过 ObjectMapper 获取 - if (StringUtils.isNotEmpty(result)) { - return mapper.readValue(result, new TypeReference() { - }); - } - } catch (Exception e) { - LoggerUtil.error("formatResult 格式化数据失败:", e); - } - return null; + public void outApiThreadPoolExecutorLogger() { + StringBuffer buffer = new StringBuffer("KAFKA 消费队列处理详情 "); + buffer.append("\n").append("KAFKA Consume 线程池详情:").append("\n"); + buffer.append(" KAFKA Consume 核心线程数:" + threadPool.getCorePoolSize()).append("\n"); + buffer.append(" KAFKA Consume 活动线程数:" + threadPool.getActiveCount()).append("\n"); + buffer.append(" KAFKA Consume 最大线程数:" + threadPool.getMaximumPoolSize()).append("\n"); + buffer.append(" KAFKA Consume 最大队列数:" + (threadPool.getQueue().size() + threadPool.getQueue().remainingCapacity())).append("\n"); + buffer.append(" KAFKA Consume 当前排队线程数:" + (threadPool.getQueue().size())).append("\n"); + LoggerUtil.info(buffer.toString()); } + } diff --git a/backend/src/main/java/io/metersphere/config/KafkaConfig.java b/backend/src/main/java/io/metersphere/config/KafkaConfig.java index f89dab7597..832e1e9bf9 100644 --- a/backend/src/main/java/io/metersphere/config/KafkaConfig.java +++ b/backend/src/main/java/io/metersphere/config/KafkaConfig.java @@ -54,7 +54,7 @@ public class KafkaConfig { producerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - producerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 20000); + producerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -70,7 +70,7 @@ public class KafkaConfig { //开启批量监听 factory.setBatchListener(true); - factory.getContainerProperties().setPollTimeout(60000); + factory.getContainerProperties().setPollTimeout(5000L); //设置提交偏移量的方式, factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);