refactor(接口测试): 执行相关websocket 处理优化防止数据堆积情况
This commit is contained in:
parent
d3a601f96d
commit
6f27c43a24
|
@ -88,6 +88,10 @@ public class APIBackendListenerClient extends AbstractBackendListenerClient impl
|
||||||
testResult.getScenarios().sort(Comparator.comparing(ScenarioResult::getId));
|
testResult.getScenarios().sort(Comparator.comparing(ScenarioResult::getId));
|
||||||
testResult.setConsole(resultService.getJmeterLogger(testId, true));
|
testResult.setConsole(resultService.getJmeterLogger(testId, true));
|
||||||
testResultService.saveResult(testResult, this.runMode, this.debugReportId, this.testId);
|
testResultService.saveResult(testResult, this.runMode, this.debugReportId, this.testId);
|
||||||
|
// 清除已经中断的过程数据
|
||||||
|
if (!MessageCache.reportCache.containsKey(testId) && resultService.processCache.containsKey(testId)) {
|
||||||
|
resultService.processCache.remove(testId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setParam(BackendListenerContext context) {
|
private void setParam(BackendListenerContext context) {
|
||||||
|
|
|
@ -1,8 +1,13 @@
|
||||||
package io.metersphere.api.jmeter;
|
package io.metersphere.api.jmeter;
|
||||||
|
|
||||||
|
import javax.websocket.Session;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class MessageCache {
|
public class MessageCache {
|
||||||
public static Map<String, ReportCounter> cache = new HashMap<>();
|
public static Map<String, ReportCounter> cache = new HashMap<>();
|
||||||
|
|
||||||
|
public static ConcurrentHashMap<String, Session> reportCache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,13 +15,14 @@ import org.springframework.stereotype.Service;
|
||||||
import sun.security.util.Cache;
|
import sun.security.util.Cache;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class MsResultService {
|
public class MsResultService {
|
||||||
// 零时存放实时结果
|
// 零时存放实时结果
|
||||||
private Cache cache = Cache.newHardMemoryCache(0, 3600 * 2);
|
private Cache cache = Cache.newHardMemoryCache(0, 3600 * 2);
|
||||||
private Map<String, List<SampleResult>> processCache = new HashMap<>();
|
public ConcurrentHashMap<String, List<SampleResult>> processCache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final static String THREAD_SPLIT = " ";
|
private final static String THREAD_SPLIT = " ";
|
||||||
|
|
||||||
|
@ -91,6 +92,7 @@ public class MsResultService {
|
||||||
|
|
||||||
public void delete(String testId) {
|
public void delete(String testId) {
|
||||||
this.cache.remove(testId);
|
this.cache.remove(testId);
|
||||||
|
MessageCache.reportCache.remove(testId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void formatTestResult(TestResult testResult, Map<String, ScenarioResult> scenarios, SampleResult result) {
|
public void formatTestResult(TestResult testResult, Map<String, ScenarioResult> scenarios, SampleResult result) {
|
||||||
|
|
|
@ -1,18 +1,22 @@
|
||||||
package io.metersphere.websocket;
|
package io.metersphere.websocket;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import io.metersphere.api.jmeter.MessageCache;
|
||||||
import io.metersphere.api.jmeter.TestResult;
|
import io.metersphere.api.jmeter.TestResult;
|
||||||
import io.metersphere.api.service.MsResultService;
|
import io.metersphere.api.service.MsResultService;
|
||||||
import io.metersphere.commons.utils.LogUtil;
|
import io.metersphere.commons.utils.LogUtil;
|
||||||
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.websocket.*;
|
import javax.websocket.*;
|
||||||
import javax.websocket.server.PathParam;
|
import javax.websocket.server.PathParam;
|
||||||
import javax.websocket.server.ServerEndpoint;
|
import javax.websocket.server.ServerEndpoint;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@ServerEndpoint("/api/scenario/report/get/real/{reportId}")
|
@ServerEndpoint("/api/scenario/report/get/real/{reportId}")
|
||||||
@Component
|
@Component
|
||||||
|
@ -31,6 +35,7 @@ public class ScenarioReportWebSocket {
|
||||||
*/
|
*/
|
||||||
@OnOpen
|
@OnOpen
|
||||||
public void onOpen(@PathParam("reportId") String reportId, Session session) {
|
public void onOpen(@PathParam("reportId") String reportId, Session session) {
|
||||||
|
MessageCache.reportCache.put(reportId, session);
|
||||||
Timer timer = new Timer(true);
|
Timer timer = new Timer(true);
|
||||||
ApiDebugResultTask task = new ApiDebugResultTask(session, reportId);
|
ApiDebugResultTask task = new ApiDebugResultTask(session, reportId);
|
||||||
timer.schedule(task, 0, 1000);
|
timer.schedule(task, 0, 1000);
|
||||||
|
@ -47,6 +52,13 @@ public class ScenarioReportWebSocket {
|
||||||
timer.cancel();
|
timer.cancel();
|
||||||
refreshTasks.remove(session);
|
refreshTasks.remove(session);
|
||||||
}
|
}
|
||||||
|
// 清理掉过程数据
|
||||||
|
List<String> reports = MessageCache.reportCache.entrySet().stream()
|
||||||
|
.filter(x -> x.getValue() == session).map(x -> x.getKey())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (CollectionUtils.isNotEmpty(reports)) {
|
||||||
|
resultService.delete(reports.get(0));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -23,6 +23,7 @@ export default {
|
||||||
loading: false,
|
loading: false,
|
||||||
runId: "",
|
runId: "",
|
||||||
reqNumber: 0,
|
reqNumber: 0,
|
||||||
|
websocket: {}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -42,26 +43,14 @@ export default {
|
||||||
const uri = protocol + window.location.host + "/api/definition/run/report/" + this.runId + "/" + runMode;
|
const uri = protocol + window.location.host + "/api/definition/run/report/" + this.runId + "/" + runMode;
|
||||||
this.websocket = new WebSocket(uri);
|
this.websocket = new WebSocket(uri);
|
||||||
this.websocket.onmessage = this.onMessage;
|
this.websocket.onmessage = this.onMessage;
|
||||||
this.websocket.onopen = this.onOpen;
|
|
||||||
this.websocket.onerror = this.onError;
|
|
||||||
this.websocket.onclose = this.onClose;
|
|
||||||
},
|
|
||||||
onOpen() {
|
|
||||||
},
|
|
||||||
onError(e) {
|
|
||||||
this.$emit('runRefresh', {});
|
|
||||||
},
|
},
|
||||||
onMessage(e) {
|
onMessage(e) {
|
||||||
if (e.data) {
|
if (e.data) {
|
||||||
let data = JSON.parse(e.data);
|
let data = JSON.parse(e.data);
|
||||||
|
this.websocket.close();
|
||||||
this.$emit('runRefresh', data);
|
this.$emit('runRefresh', data);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
onClose(e) {
|
|
||||||
if (e.code === 1005) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
run() {
|
run() {
|
||||||
let projectId = getCurrentProjectID();
|
let projectId = getCurrentProjectID();
|
||||||
// 如果envMap不存在,是单接口调用
|
// 如果envMap不存在,是单接口调用
|
||||||
|
|
Loading…
Reference in New Issue