fix(UI自动化): 解决UI并发报告部分未执行问题

This commit is contained in:
zhangyong 2022-10-31 11:52:40 +08:00 committed by 刘瑞斌
parent cdc3708fe8
commit 4926b6572b
7 changed files with 286 additions and 8 deletions

View File

@ -3,9 +3,13 @@ package io.metersphere.api.exec.queue;
import io.metersphere.api.jmeter.JMeterService;
import io.metersphere.api.jmeter.JMeterThreadUtils;
import io.metersphere.cache.JMeterEngineCache;
import io.metersphere.commons.constants.ParamConstants;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.commons.utils.JmeterThreadUtils;
import io.metersphere.constants.RunModeConstants;
import io.metersphere.dto.JmeterRunRequestDTO;
import io.metersphere.utils.LoggerUtil;
import org.apache.commons.lang3.StringUtils;
public class ExecTask implements Runnable {
private JmeterRunRequestDTO request;
@ -20,6 +24,11 @@ public class ExecTask implements Runnable {
@Override
public void run() {
if (request.getRunMode().startsWith(ParamConstants.MODEL.RUN_MODEL_UI.getValue()) &&
StringUtils.equals(request.getRunType(), RunModeConstants.PARALLEL.toString()) &&
StringUtils.equals(request.getReportType(), RunModeConstants.SET_REPORT.toString())) {
request.setReportId(JmeterThreadUtils.getThreadName(request));
}
CommonBeanFactory.getBean(JMeterService.class).addQueue(request);
Object res = PoolExecBlockingQueueUtil.take(request.getReportId());
if (res == null && !JMeterThreadUtils.isRunning(request.getReportId(), request.getTestId())) {

View File

@ -0,0 +1,181 @@
package io.metersphere.api.exec.queue;
import io.metersphere.api.exec.utils.NamedThreadFactory;
import io.metersphere.dto.JmeterRunRequestDTO;
import io.metersphere.utils.LoggerUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;
import java.util.Queue;
import java.util.concurrent.*;
@Service
public class UiExecThreadPoolExecutor {
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 1;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 1;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 1;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 10000;
private MsRejectedExecutionHandler msRejectedExecutionHandler = new MsRejectedExecutionHandler();
/**
* 创建线程池
*/
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue(WORK_QUEUE_SIZE),
new NamedThreadFactory("UI-MS-JMETER-RUN-TASK"),
msRejectedExecutionHandler);
/**
* 缓冲区调度线程池
*/
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("MS-BUFFER-SCHEDULED"));
public void addTask(JmeterRunRequestDTO requestDTO) {
ExecTask task = new ExecTask(requestDTO);
threadPool.execute(task);
outApiThreadPoolExecutorLogger("报告:[" + requestDTO.getReportId() + "] 资源:[" + requestDTO.getTestId() + "] 加入执行队列");
}
/**
* 调度线程池检查缓冲区
*/
final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//判断缓冲队列是否存在记录
if (CollectionUtils.isNotEmpty(msRejectedExecutionHandler.getBufferQueue())) {
//当线程池的队列容量少于WORK_QUEUE_SIZE则开始把缓冲队列的任务 加入到 线程池
if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
JmeterRunRequestDTO requestDTO = msRejectedExecutionHandler.getBufferQueue().poll();
ExecTask task = new ExecTask(requestDTO);
threadPool.submit(task);
LoggerUtil.info("把缓冲区任务重新添加到线程池报告ID" + requestDTO.getReportId());
}
}
}
}, 0, 2, TimeUnit.SECONDS);
/**
* 终止线程池和调度线程池
*/
public void shutdown() {
//true表示如果定时任务在执行立即中止false则等待任务结束后再停止
LoggerUtil.info("终止执行线程池和调度线程池:" + scheduledFuture.cancel(true));
scheduler.shutdown();
threadPool.shutdown();
}
/**
* 保留两位小数
*/
private String divide(int num1, int num2) {
return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100);
}
public void outApiThreadPoolExecutorLogger(String message) {
ArrayBlockingQueue queue = (ArrayBlockingQueue) threadPool.getQueue();
StringBuffer buffer = new StringBuffer("\n" + message);
buffer.append("\n").append("线程池详情:").append("\n");
buffer.append(" 核心线程数:" + threadPool.getCorePoolSize()).append("\n");
buffer.append(" 活动线程数:" + threadPool.getActiveCount()).append(" (略有波动非精确数据)").append("\n");
buffer.append(" 最大线程数:" + threadPool.getMaximumPoolSize()).append("\n");
buffer.append(" 线程池活跃度:" + divide(threadPool.getActiveCount(), threadPool.getMaximumPoolSize())).append("\n");
buffer.append(" 最大队列数:" + (queue.size() + queue.remainingCapacity())).append("\n");
buffer.append(" 当前排队线程数:" + (msRejectedExecutionHandler.getBufferQueue().size() + queue.size())).append("\n");
buffer.append(" 执行中队列大小:" + PoolExecBlockingQueueUtil.queue.size()).append("\n");
buffer.append(" 队列使用度:" + divide(queue.size(), queue.size() + queue.remainingCapacity()));
LoggerUtil.info(buffer.toString());
if (queue.size() > 0 && LoggerUtil.getLogger().isDebugEnabled()) {
LoggerUtil.debug(this.getWorkerQueue());
}
}
public void setCorePoolSize(int maximumPoolSize) {
try {
if (maximumPoolSize != threadPool.getMaximumPoolSize()) {
threadPool.setMaximumPoolSize(maximumPoolSize);
int corePoolSize = maximumPoolSize > 500 ? 500 : maximumPoolSize;
if (corePoolSize > CORE_POOL_SIZE) {
threadPool.setCorePoolSize(corePoolSize);
}
threadPool.allowCoreThreadTimeOut(true);
LoggerUtil.info("AllCoreThreads: " + threadPool.prestartAllCoreThreads());
}
} catch (Exception e) {
LoggerUtil.error("设置线程参数异常:", e);
}
}
public void removeQueue(String reportId) {
// 检查缓冲区
Queue<JmeterRunRequestDTO> bufferQueue = msRejectedExecutionHandler.getBufferQueue();
if (CollectionUtils.isNotEmpty(bufferQueue)) {
bufferQueue.forEach(item -> {
if (item != null && StringUtils.equals(item.getReportId(), reportId)) {
bufferQueue.remove(item);
}
});
}
// 检查等待队列
BlockingQueue workerQueue = threadPool.getQueue();
workerQueue.forEach(item -> {
ExecTask task = (ExecTask) item;
if (task != null && task.getRequest() != null && StringUtils.equals(task.getRequest().getReportId(), reportId)) {
workerQueue.remove(item);
}
});
}
public void removeAllQueue() {
// 检查缓冲区
msRejectedExecutionHandler.getBufferQueue().clear();
// 检查等待队列
threadPool.getQueue().clear();
}
public boolean check(String reportId) {
// 检查缓冲区
Queue<JmeterRunRequestDTO> bufferQueue = msRejectedExecutionHandler.getBufferQueue();
if (CollectionUtils.isNotEmpty(bufferQueue)) {
return bufferQueue.stream().filter(task -> StringUtils.equals(task.getReportId(), reportId)).count() > 0;
}
// 检查等待队列
BlockingQueue workerQueue = threadPool.getQueue();
return workerQueue.stream().filter(task -> StringUtils.equals(((ExecTask) task).getRequest().getReportId(), reportId)).count() > 0;
}
public boolean checkPlanReport(String planReportId) {
// 检查缓冲区
Queue<JmeterRunRequestDTO> bufferQueue = msRejectedExecutionHandler.getBufferQueue();
if (CollectionUtils.isNotEmpty(bufferQueue)) {
return bufferQueue.stream().filter(task -> StringUtils.equals(task.getTestPlanReportId(), planReportId)).count() > 0;
}
// 检查等待队列
BlockingQueue workerQueue = threadPool.getQueue();
return workerQueue.stream().filter(task -> StringUtils.equals(((ExecTask) task).getRequest().getTestPlanReportId(), planReportId)).count() > 0;
}
public String getWorkerQueue() {
StringBuffer buffer = new StringBuffer();
BlockingQueue workerQueue = threadPool.getQueue();
workerQueue.forEach(item -> {
ExecTask task = (ExecTask) item;
if (task.getRequest() != null) {
buffer.append("等待队列报告:【 " + task.getRequest().getReportId() + "】资源:【 " + task.getRequest().getTestId() + "").append("\n");
}
});
return buffer.toString();
}
}

View File

@ -2,11 +2,13 @@ package io.metersphere.api.jmeter;
import com.alibaba.fastjson.JSON;
import io.metersphere.api.exec.queue.ExecThreadPoolExecutor;
import io.metersphere.api.exec.queue.UiExecThreadPoolExecutor;
import io.metersphere.api.exec.utils.GenerateHashTreeUtil;
import io.metersphere.api.jmeter.utils.ServerConfig;
import io.metersphere.api.jmeter.utils.SmoothWeighted;
import io.metersphere.api.service.RemakeReportService;
import io.metersphere.commons.constants.ApiRunMode;
import io.metersphere.commons.constants.ParamConstants;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.commons.utils.HashTreeUtil;
import io.metersphere.config.JmeterProperties;
@ -192,9 +194,13 @@ public class JMeterService {
} else {
//解析hashTree是否含有文件库文件
HashTreeUtil.initRepositoryFiles(request);
if(request.getRunMode().startsWith(ParamConstants.MODEL.RUN_MODEL_UI.getValue())){
CommonBeanFactory.getBean(UiExecThreadPoolExecutor.class).addTask(request);
}else{
CommonBeanFactory.getBean(ExecThreadPoolExecutor.class).addTask(request);
}
}
}
public void addQueue(JmeterRunRequestDTO request) {
this.runLocal(request);

View File

@ -6,8 +6,10 @@ import io.metersphere.api.exec.queue.PoolExecBlockingQueueUtil;
import io.metersphere.api.service.ApiExecutionQueueService;
import io.metersphere.api.service.TestResultService;
import io.metersphere.cache.JMeterEngineCache;
import io.metersphere.commons.constants.ParamConstants;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.commons.utils.FileUtils;
import io.metersphere.commons.utils.JmeterThreadUtils;
import io.metersphere.constants.BackendListenerConstants;
import io.metersphere.constants.RunModeConstants;
import io.metersphere.dto.ResultDTO;
@ -55,23 +57,40 @@ public class MsApiBackendListener extends AbstractBackendListenerClient implemen
// 清理过程步骤
queues = RetryResultUtil.clearLoops(queues);
JMeterBase.resultFormatting(queues, dto);
if(dto.getRunMode().startsWith(ParamConstants.MODEL.RUN_MODEL_UI.getValue()) &&
StringUtils.equals(dto.getRunType(), RunModeConstants.PARALLEL.toString()) &&
StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString())){
dto.setReportId(JmeterThreadUtils.getReportId(dto.getReportId()));
}
if (dto.isRetryEnable()) {
LoggerUtil.info("重试结果处理【" + dto.getReportId() + " 】开始");
RetryResultUtil.mergeRetryResults(dto.getRequestResults());
LoggerUtil.info("重试结果处理【" + dto.getReportId() + " 】结束");
}
String console = FixedCapacityUtils.getJmeterLogger(dto.getReportId(), !StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString()));
String console;
if(dto.getRunMode().startsWith(ParamConstants.MODEL.RUN_MODEL_UI.getValue()) &&
StringUtils.equals(dto.getRunType(), RunModeConstants.PARALLEL.toString()) &&
StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString())){
console = FixedCapacityUtils.getJmeterLogger(JmeterThreadUtils.getThreadName(dto), !StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString()));
}else{
console = FixedCapacityUtils.getJmeterLogger(dto.getReportId(), !StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString()));
}
if (FileUtils.isFolderExists(dto.getReportId())) {
console += "\r\n" + "tmp folder " + FileUtils.BODY_FILE_DIR + File.separator + dto.getReportId() + " has deleted.";
}
dto.setConsole(console);
// 入库存储
CommonBeanFactory.getBean(TestResultService.class).saveResults(dto);
LoggerUtil.info("进入TEST-END处理报告【" + dto.getReportId() + "" + dto.getRunMode() + " 整体执行完成");
// 全局并发队列
if(dto.getRunMode().startsWith(ParamConstants.MODEL.RUN_MODEL_UI.getValue()) &&
StringUtils.equals(dto.getRunType(), RunModeConstants.PARALLEL.toString()) &&
StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString())){
PoolExecBlockingQueueUtil.offer(JmeterThreadUtils.getThreadName(dto));
}else{
PoolExecBlockingQueueUtil.offer(dto.getReportId());
}
// 整体执行结束更新资源状态
CommonBeanFactory.getBean(TestResultService.class).testEnded(dto);
if (apiExecutionQueueService == null) {
@ -95,8 +114,14 @@ public class MsApiBackendListener extends AbstractBackendListenerClient implemen
FileServer.getFileServer().closeCsv(dto.getReportId());
}
if (JMeterEngineCache.runningEngine.containsKey(dto.getReportId())) {
if(dto.getRunMode().startsWith(ParamConstants.MODEL.RUN_MODEL_UI.getValue()) &&
StringUtils.equals(dto.getRunType(), RunModeConstants.PARALLEL.toString()) &&
StringUtils.equals(dto.getReportType(), RunModeConstants.SET_REPORT.toString())){
JMeterEngineCache.runningEngine.remove(JmeterThreadUtils.getThreadName(dto));
}else{
JMeterEngineCache.runningEngine.remove(dto.getReportId());
}
}
queues.clear();
}
}

View File

@ -113,6 +113,7 @@ public interface ParamConstants {
enum BASE implements ParamConstants {
URL("base.url"),
CONCURRENCY("base.concurrency"),
GRID_CONCURRENCY("base.ui.concurrency"),
PROMETHEUS_HOST("base.prometheus.host"),
SELENIUM_DOCKER_URL("base.selenium.docker.url");
@ -148,4 +149,19 @@ public interface ParamConstants {
return value;
}
}
enum MODEL implements ParamConstants {
RUN_MODEL_UI("UI");
private String value;
MODEL(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
}
}

View File

@ -0,0 +1,37 @@
package io.metersphere.commons.utils;
import io.metersphere.dto.JmeterRunRequestDTO;
import io.metersphere.dto.ResultDTO;
public class JmeterThreadUtils {
/**
* 获取并行集合线程名称
*
* @param dto
* @return
*/
public static String getThreadName(ResultDTO dto){
return dto.getReportId()+ "@"+ dto.getTestId();
}
/**
* 获取线程名称
*
* @param dto
* @return
*/
public static String getThreadName(JmeterRunRequestDTO dto){
return dto.getReportId()+ "@"+ dto.getTestId();
}
/**
* 根据线程名称获取reportId
*
* @param threadName
* @return
*/
public static String getReportId(String threadName){
return threadName.split("@")[0];
}
}

View File

@ -1,10 +1,10 @@
package io.metersphere.listener;
import io.metersphere.api.exec.queue.ExecThreadPoolExecutor;
import io.metersphere.api.exec.queue.UiExecThreadPoolExecutor;
import io.metersphere.api.jmeter.JMeterService;
import io.metersphere.api.jmeter.NewDriverManager;
import io.metersphere.api.service.*;
import io.metersphere.base.domain.JarConfig;
import io.metersphere.commons.constants.ParamConstants;
import io.metersphere.commons.utils.CommonBeanFactory;
import io.metersphere.commons.utils.LogUtil;
import io.metersphere.commons.utils.RunInterface;
@ -21,7 +21,6 @@ import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
public class AppStartListener implements ApplicationListener<ApplicationReadyEvent> {
@ -94,6 +93,11 @@ public class AppStartListener implements ApplicationListener<ApplicationReadyEve
int size = Integer.parseInt(dto.getConcurrency());
CommonBeanFactory.getBean(ExecThreadPoolExecutor.class).setCorePoolSize(size);
}
String uiCorePoolSize = systemParameterService.getValue(ParamConstants.BASE.GRID_CONCURRENCY.getValue());
int corePoolSize = StringUtils.isEmpty(uiCorePoolSize) ? 4 : Integer.parseInt(uiCorePoolSize);
CommonBeanFactory.getBean(UiExecThreadPoolExecutor.class).setCorePoolSize(corePoolSize);
initPythonEnv();
//检查状态为开启的TCP-Mock服务端口