refactor(接口测试): 优化node执行结果处理,增加过程日志
This commit is contained in:
parent
8aac239f42
commit
01d6dfd6e7
|
@ -18,7 +18,7 @@ import java.util.*;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class KafkaListenerTask implements Runnable {
|
public class KafkaListenerTask implements Runnable {
|
||||||
private List<ConsumerRecord<?, String>> records;
|
private ConsumerRecord<?, String> record;
|
||||||
private ApiExecutionQueueService apiExecutionQueueService;
|
private ApiExecutionQueueService apiExecutionQueueService;
|
||||||
private TestResultService testResultService;
|
private TestResultService testResultService;
|
||||||
private ObjectMapper mapper;
|
private ObjectMapper mapper;
|
||||||
|
@ -49,30 +49,27 @@ public class KafkaListenerTask implements Runnable {
|
||||||
// 分三类存储
|
// 分三类存储
|
||||||
Map<String, List<ResultDTO>> assortMap = new LinkedHashMap<>();
|
Map<String, List<ResultDTO>> assortMap = new LinkedHashMap<>();
|
||||||
List<ResultDTO> resultDTOS = new LinkedList<>();
|
List<ResultDTO> resultDTOS = new LinkedList<>();
|
||||||
|
LoggerUtil.info("报告【" + record.key() + "】开始解析结果");
|
||||||
records.forEach(record -> {
|
ResultDTO dto = this.formatResult();
|
||||||
ResultDTO testResult = this.formatResult(record.value());
|
if (dto == null) {
|
||||||
|
return;
|
||||||
LoggerUtil.info("KAFKA消费结果处理:【" + testResult.getReportId() + "】", testResult.getArbitraryData() != null ? testResult.getArbitraryData().get("TEST_END") : false);
|
}
|
||||||
if (testResult != null) {
|
if (dto.getArbitraryData() != null && dto.getArbitraryData().containsKey("TEST_END")
|
||||||
if (testResult.getArbitraryData() != null && testResult.getArbitraryData().containsKey("TEST_END")
|
&& (Boolean) dto.getArbitraryData().get("TEST_END")) {
|
||||||
&& (Boolean) testResult.getArbitraryData().get("TEST_END")) {
|
resultDTOS.add(dto);
|
||||||
resultDTOS.add(testResult);
|
LoggerUtil.info("KAFKA消费结果处理:【" + record.key() + "】结果状态:" + dto.getArbitraryData().get("TEST_END"));
|
||||||
}
|
}
|
||||||
// 携带结果
|
// 携带结果
|
||||||
if (CollectionUtils.isNotEmpty(testResult.getRequestResults())) {
|
if (CollectionUtils.isNotEmpty(dto.getRequestResults())) {
|
||||||
String key = RUN_MODE_MAP.get(testResult.getRunMode());
|
String key = RUN_MODE_MAP.get(dto.getRunMode());
|
||||||
if (assortMap.containsKey(key)) {
|
if (assortMap.containsKey(key)) {
|
||||||
assortMap.get(key).add(testResult);
|
assortMap.get(key).add(dto);
|
||||||
} else {
|
} else {
|
||||||
assortMap.put(key, new LinkedList<ResultDTO>() {{
|
assortMap.put(key, new LinkedList<ResultDTO>() {{
|
||||||
this.add(testResult);
|
this.add(dto);
|
||||||
}});
|
}});
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
if (MapUtils.isNotEmpty(assortMap)) {
|
if (MapUtils.isNotEmpty(assortMap)) {
|
||||||
LoggerUtil.info("KAFKA消费执行内容存储开始");
|
LoggerUtil.info("KAFKA消费执行内容存储开始");
|
||||||
testResultService.batchSaveResults(assortMap);
|
testResultService.batchSaveResults(assortMap);
|
||||||
|
@ -95,19 +92,19 @@ public class KafkaListenerTask implements Runnable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LoggerUtil.error("KAFKA消费失败:", e);
|
LoggerUtil.error("报告【" + record.key() + "】KAFKA消费失败:", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ResultDTO formatResult(String result) {
|
private ResultDTO formatResult() {
|
||||||
try {
|
try {
|
||||||
// 多态JSON普通转换会丢失内容,需要通过 ObjectMapper 获取
|
// 多态JSON普通转换会丢失内容,需要通过 ObjectMapper 获取
|
||||||
if (StringUtils.isNotEmpty(result)) {
|
if (StringUtils.isNotEmpty(record.value())) {
|
||||||
return mapper.readValue(result, new TypeReference<ResultDTO>() {
|
return mapper.readValue(record.value(), new TypeReference<ResultDTO>() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LoggerUtil.error("formatResult 格式化数据失败:", e);
|
LoggerUtil.error("报告【" + record.key() + "】格式化数据失败:", e);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import io.metersphere.api.exec.utils.NamedThreadFactory;
|
import io.metersphere.api.exec.utils.NamedThreadFactory;
|
||||||
import io.metersphere.api.service.ApiExecutionQueueService;
|
import io.metersphere.api.service.ApiExecutionQueueService;
|
||||||
import io.metersphere.api.service.TestResultService;
|
import io.metersphere.api.service.TestResultService;
|
||||||
import io.metersphere.commons.constants.ApiRunMode;
|
|
||||||
import io.metersphere.config.KafkaConfig;
|
import io.metersphere.config.KafkaConfig;
|
||||||
import io.metersphere.utils.LoggerUtil;
|
import io.metersphere.utils.LoggerUtil;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
@ -13,9 +12,7 @@ import org.springframework.kafka.annotation.KafkaListener;
|
||||||
import org.springframework.kafka.support.Acknowledgment;
|
import org.springframework.kafka.support.Acknowledgment;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -29,27 +26,6 @@ public class MsKafkaListener {
|
||||||
private TestResultService testResultService;
|
private TestResultService testResultService;
|
||||||
@Resource
|
@Resource
|
||||||
private ObjectMapper mapper;
|
private ObjectMapper mapper;
|
||||||
|
|
||||||
private static final Map<String, String> RUN_MODE_MAP = new HashMap<String, String>() {{
|
|
||||||
this.put(ApiRunMode.SCHEDULE_API_PLAN.name(), "schedule-task");
|
|
||||||
this.put(ApiRunMode.JENKINS_API_PLAN.name(), "schedule-task");
|
|
||||||
this.put(ApiRunMode.MANUAL_PLAN.name(), "schedule-task");
|
|
||||||
|
|
||||||
this.put(ApiRunMode.DEFINITION.name(), "api-test-case-task");
|
|
||||||
this.put(ApiRunMode.JENKINS.name(), "api-test-case-task");
|
|
||||||
this.put(ApiRunMode.API_PLAN.name(), "api-test-case-task");
|
|
||||||
this.put(ApiRunMode.JENKINS_API_PLAN.name(), "api-test-case-task");
|
|
||||||
this.put(ApiRunMode.MANUAL_PLAN.name(), "api-test-case-task");
|
|
||||||
|
|
||||||
|
|
||||||
this.put(ApiRunMode.SCENARIO.name(), "api-scenario-task");
|
|
||||||
this.put(ApiRunMode.SCENARIO_PLAN.name(), "api-scenario-task");
|
|
||||||
this.put(ApiRunMode.SCHEDULE_SCENARIO_PLAN.name(), "api-scenario-task");
|
|
||||||
this.put(ApiRunMode.SCHEDULE_SCENARIO.name(), "api-scenario-task");
|
|
||||||
this.put(ApiRunMode.JENKINS_SCENARIO_PLAN.name(), "api-scenario-task");
|
|
||||||
|
|
||||||
}};
|
|
||||||
|
|
||||||
// 线程池维护线程的最少数量
|
// 线程池维护线程的最少数量
|
||||||
private final static int CORE_POOL_SIZE = 20;
|
private final static int CORE_POOL_SIZE = 20;
|
||||||
// 线程池维护线程的最大数量
|
// 线程池维护线程的最大数量
|
||||||
|
@ -71,13 +47,16 @@ public class MsKafkaListener {
|
||||||
@KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")
|
@KafkaListener(id = CONSUME_ID, topics = KafkaConfig.TOPICS, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")
|
||||||
public void consume(List<ConsumerRecord<?, String>> records, Acknowledgment ack) {
|
public void consume(List<ConsumerRecord<?, String>> records, Acknowledgment ack) {
|
||||||
try {
|
try {
|
||||||
KafkaListenerTask task = new KafkaListenerTask();
|
records.forEach(item -> {
|
||||||
task.setApiExecutionQueueService(apiExecutionQueueService);
|
LoggerUtil.info("接收到报告【" + item.key() + "】,加入到结果处理队列");
|
||||||
task.setTestResultService(testResultService);
|
KafkaListenerTask task = new KafkaListenerTask();
|
||||||
task.setMapper(mapper);
|
task.setApiExecutionQueueService(apiExecutionQueueService);
|
||||||
task.setRecords(records);
|
task.setTestResultService(testResultService);
|
||||||
threadPool.execute(task);
|
task.setMapper(mapper);
|
||||||
this.outApiThreadPoolExecutorLogger();
|
task.setRecord(item);
|
||||||
|
threadPool.execute(task);
|
||||||
|
this.outKafkaPoolLogger();
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LoggerUtil.error("KAFKA消费失败:", e);
|
LoggerUtil.error("KAFKA消费失败:", e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -85,14 +64,21 @@ public class MsKafkaListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void outApiThreadPoolExecutorLogger() {
|
public void outKafkaPoolLogger() {
|
||||||
StringBuffer buffer = new StringBuffer("KAFKA 消费队列处理详情 ");
|
StringBuffer buffer = new StringBuffer()
|
||||||
buffer.append("\n").append("KAFKA Consume 线程池详情:").append("\n");
|
.append("\n")
|
||||||
buffer.append(" KAFKA Consume 核心线程数:" + threadPool.getCorePoolSize()).append("\n");
|
.append("KAFKA Consume 线程池详情:")
|
||||||
buffer.append(" KAFKA Consume 活动线程数:" + threadPool.getActiveCount()).append("\n");
|
.append("\n")
|
||||||
buffer.append(" KAFKA Consume 最大线程数:" + threadPool.getMaximumPoolSize()).append("\n");
|
.append(" KAFKA Consume 核心线程数:" + threadPool.getCorePoolSize())
|
||||||
buffer.append(" KAFKA Consume 最大队列数:" + (threadPool.getQueue().size() + threadPool.getQueue().remainingCapacity())).append("\n");
|
.append("\n")
|
||||||
buffer.append(" KAFKA Consume 当前排队线程数:" + (threadPool.getQueue().size())).append("\n");
|
.append(" KAFKA Consume 活动线程数:" + threadPool.getActiveCount())
|
||||||
|
.append("\n")
|
||||||
|
.append(" KAFKA Consume 最大线程数:" + threadPool.getMaximumPoolSize())
|
||||||
|
.append("\n")
|
||||||
|
.append(" KAFKA Consume 最大队列数:" + (threadPool.getQueue().size() + threadPool.getQueue().remainingCapacity()))
|
||||||
|
.append("\n")
|
||||||
|
.append(" KAFKA Consume 当前排队线程数:" + (threadPool.getQueue().size()))
|
||||||
|
.append("\n");
|
||||||
LoggerUtil.info(buffer.toString());
|
LoggerUtil.info(buffer.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue