feat(接口测试): 本地调试和后端调试结果实时接收处理

This commit is contained in:
fit2-zhao 2023-12-27 17:24:56 +08:00 committed by Craftsman
parent 8f203d6a44
commit 6b9e244758
9 changed files with 364 additions and 3 deletions

View File

@ -2,6 +2,7 @@ package io.metersphere.sdk.constants;
/**
* 接口执行时的资源类型
*
* @Author: jianxing
* @CreateTime: 2023-12-08 10:53
*/
@ -11,9 +12,14 @@ public enum ApiExecuteRunMode {
*/
RUN,
/**
* 调试
* 前端调试
*/
DEBUG,
FRONTEND_DEBUG,
/**
* 后端调试
*/
BACKEND_DEBUG,
/**
* jenkins 触发
*/

View File

@ -0,0 +1,8 @@
package io.metersphere.sdk.constants;
public enum MsgType {
CONNECT, // 链接标识
HEARTBEAT, // 心跳检查标识
EXEC_START, // 开始执行标识
EXEC_RESULT; // 执行结果标识
}

View File

@ -0,0 +1,38 @@
package io.metersphere.sdk.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serial;
import java.io.Serializable;
/**
*
*/
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class SocketMsgDTO implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 报告ID
*/
private String reportId;
/**
* 执行模式
*/
private String runMode;
/**
* 消息类型LINK-链接标识HEARTBEAT-心跳检查标识EXEC_START-开始执行标识EXEC_RESULT-执行结果标识
*/
private String msgType;
/**
* 结果内容
*/
private String taskResult;
}

View File

@ -0,0 +1,57 @@
package io.metersphere.sdk.util;
import io.metersphere.sdk.dto.SocketMsgDTO;
import jakarta.websocket.RemoteEndpoint;
import jakarta.websocket.Session;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketUtils {
public static final Map<String, Session> ONLINE_USER_SESSIONS = new ConcurrentHashMap<>();
// 单用户推送
public static void sendMessage(Session session, SocketMsgDTO message) {
if (session == null) {
return;
}
// 替换了web容器后 jetty没有设置永久有效的参数这里暂时设置超时时间为一天
session.setMaxIdleTimeout(86400000L);
RemoteEndpoint.Async async = session.getAsyncRemote();
if (async == null) {
return;
}
async.sendText(JSON.toJSONString(message));
}
// 单用户推送
public static void sendMessageSingle(SocketMsgDTO dto) {
sendMessage(ONLINE_USER_SESSIONS.get(Optional.ofNullable(dto.getReportId())
.orElse(StringUtils.EMPTY)), dto);
}
// 全用户推送
public static void sendMessageAll(SocketMsgDTO message) {
ONLINE_USER_SESSIONS.forEach((sessionId, session) -> {
sendMessage(session, message);
});
}
public static boolean has(String key) {
return StringUtils.isNotEmpty(key) && ONLINE_USER_SESSIONS.containsKey(key);
}
//当前的Session 移除
public static void onClose(String reportId) {
try {
if (WebSocketUtils.ONLINE_USER_SESSIONS.containsKey(reportId)) {
WebSocketUtils.ONLINE_USER_SESSIONS.get(reportId).close();
WebSocketUtils.ONLINE_USER_SESSIONS.remove(reportId);
}
} catch (Exception e) {
LogUtils.error("关闭socket失败", e);
}
}
}

View File

@ -0,0 +1,32 @@
package io.metersphere.api.listener;
import io.metersphere.sdk.constants.KafkaTopicConstants;
import io.metersphere.sdk.dto.SocketMsgDTO;
import io.metersphere.sdk.util.JSON;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.sdk.util.WebSocketUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
/**
* 后端调试消息监听器
*/
@Configuration
public class DebugListener {
public static final String DEBUG_CONSUME_ID = "MS-API-DEBUG-CONSUME";
@KafkaListener(id = DEBUG_CONSUME_ID, topics = KafkaTopicConstants.API_REPORT_DEBUG_TOPIC, groupId = DEBUG_CONSUME_ID + "_" + "${random.uuid}")
public void debugConsume(ConsumerRecord<?, String> record) {
try {
LogUtils.info("接收到执行结果:", record.key());
if (ObjectUtils.isNotEmpty(record.value()) && WebSocketUtils.has(record.key().toString())) {
SocketMsgDTO dto = JSON.parseObject(record.value(), SocketMsgDTO.class);
WebSocketUtils.sendMessageSingle(dto);
}
} catch (Exception e) {
LogUtils.error("调试消息推送失败:", e);
}
}
}

View File

@ -179,7 +179,7 @@ public class ApiDebugService {
runRequest.setTestId(id);
runRequest.setReportId(id);
runRequest.setResourceType(ApiResourceType.API_DEBUG.name());
runRequest.setRunMode(ApiExecuteRunMode.DEBUG.name());
runRequest.setRunMode(ApiExecuteRunMode.BACKEND_DEBUG.name());
apiExecuteService.debug(runRequest);

View File

@ -0,0 +1,73 @@
package io.metersphere.api.socket;
import com.fit2cloud.quartz.anno.QuartzScheduled;
import io.metersphere.sdk.constants.MsgType;
import io.metersphere.sdk.dto.SocketMsgDTO;
import io.metersphere.sdk.util.JSON;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.sdk.util.WebSocketUtils;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@ServerEndpoint("/ws/api/{reportId}")
public class WebSocketHandler {
/**
* 连接成功响应
*/
@OnOpen
public void openSession(@PathParam("reportId") String reportId, Session session) {
WebSocketUtils.ONLINE_USER_SESSIONS.put(reportId, session);
RemoteEndpoint.Async async = session.getAsyncRemote();
if (async != null) {
async.sendText(JSON.toJSONString(new SocketMsgDTO(reportId, "", MsgType.CONNECT.name(), MsgType.CONNECT.name())));
session.setMaxIdleTimeout(180000);
}
LogUtils.info("客户端: [" + reportId + "] : 连接成功!" + WebSocketUtils.ONLINE_USER_SESSIONS.size(), reportId);
}
/**
* 收到消息响应
*/
@OnMessage
public void onMessage(@PathParam("reportId") String reportId, String message) {
LogUtils.info("服务器收到:[" + reportId + "] : " + message);
SocketMsgDTO dto = JSON.parseObject(message, SocketMsgDTO.class);
WebSocketUtils.sendMessageSingle(dto);
}
/**
* 连接关闭响应
*/
@OnClose
public void onClose(@PathParam("reportId") String reportId, Session session) throws IOException {
//当前的Session 移除
WebSocketUtils.ONLINE_USER_SESSIONS.remove(reportId);
LogUtils.info("[" + reportId + "] : 断开连接!" + WebSocketUtils.ONLINE_USER_SESSIONS.size());
//并且通知其他人当前用户已经断开连接了
session.close();
}
/**
* 连接异常响应
*/
@OnError
public void onError(Session session, Throwable throwable) throws IOException {
LogUtils.error("连接异常响应", throwable);
session.close();
}
/**
* 每一分钟群发一次心跳检查
*/
@QuartzScheduled(cron = "0 0/1 * * * ?")
public void heartbeatCheck() {
WebSocketUtils.sendMessageAll(
new SocketMsgDTO(MsgType.HEARTBEAT.name(), MsgType.HEARTBEAT.name(), MsgType.HEARTBEAT.name(), "heartbeat check")
);
}
}

View File

@ -0,0 +1,60 @@
package io.metersphere.api.listener;
import io.metersphere.api.socket.WebSocketHandler;
import io.metersphere.sdk.constants.KafkaTopicConstants;
import io.metersphere.sdk.constants.MsgType;
import io.metersphere.sdk.dto.SocketMsgDTO;
import io.metersphere.sdk.util.WebSocketUtils;
import jakarta.websocket.RemoteEndpoint;
import jakarta.websocket.Session;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@AutoConfigureMockMvc
public class DebugListenerTest {
@InjectMocks
private DebugListener debugListener;
@Mock
private Session session;
@InjectMocks
private WebSocketHandler webSocketHandler;
@Mock
private RemoteEndpoint.Async async;
@Test
void testDebugConsume() {
testOpenSession();
// 模拟参数
ConsumerRecord<Object, String> record = new ConsumerRecord<>(KafkaTopicConstants.API_REPORT_DEBUG_TOPIC, 0, 0, "123", "value");
// 调用被测试方法
debugListener.debugConsume(record);
String reportId = "123";
WebSocketUtils.sendMessageSingle(new SocketMsgDTO(reportId, "2", MsgType.CONNECT.name(), "test"));
}
void testOpenSession() {
// 模拟参数
String reportId = "123";
when(session.getAsyncRemote()).thenReturn(async);
// 调用被测试方法
webSocketHandler.openSession(reportId, session);
// 验证行为
verify(session).setMaxIdleTimeout(180000);
// 这里可以添加更多的验证例如检查 ONLINE_USER_SESSIONS 的状态等
}
}

View File

@ -0,0 +1,87 @@
package io.metersphere.api.socket;
import io.metersphere.sdk.constants.MsgType;
import io.metersphere.sdk.dto.SocketMsgDTO;
import io.metersphere.sdk.util.JSON;
import jakarta.websocket.RemoteEndpoint;
import jakarta.websocket.Session;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@AutoConfigureMockMvc
public class WebSocketHandlerTest {
@Mock
private Session session;
@Mock
private RemoteEndpoint.Async async;
@InjectMocks
private WebSocketHandler webSocketHandler;
@Test
void testOpenSession() throws IOException {
// 模拟参数
String reportId = "123";
when(session.getAsyncRemote()).thenReturn(async);
// 调用被测试方法
webSocketHandler.openSession(reportId, session);
// 验证行为
verify(session).setMaxIdleTimeout(180000);
// 这里可以添加更多的验证例如检查 ONLINE_USER_SESSIONS 的状态等
}
@Test
void testOnMessage() {
// 模拟参数
String reportId = "123";
String message = JSON.toJSONString(new SocketMsgDTO(reportId, "2", MsgType.CONNECT.name(), "test"));
when(session.getAsyncRemote()).thenReturn(async);
// 调用被测试方法
webSocketHandler.onMessage(reportId, message);
// 验证行为
// 这里可以添加更多的验证例如检查 sendMessageSingle 的调用情况等
}
@Test
void testOnClose() throws IOException {
// 模拟参数
String reportId = "123";
// 调用被测试方法
webSocketHandler.onClose(reportId, session);
// 验证行为
verify(session).close();
// 这里可以添加更多的验证例如检查 ONLINE_USER_SESSIONS 的状态等
}
@Test
void testOnError() throws IOException {
// 模拟参数
when(session.getAsyncRemote()).thenReturn(async);
// 调用被测试方法
webSocketHandler.onError(session, new Throwable("Error"));
// 验证行为
verify(session).close();
}
}