From 6b9e2447586ab8c7df117484e6d377848bb2e6cb Mon Sep 17 00:00:00 2001 From: fit2-zhao Date: Wed, 27 Dec 2023 17:24:56 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E6=8E=A5=E5=8F=A3=E6=B5=8B=E8=AF=95):=20?= =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E8=B0=83=E8=AF=95=E5=92=8C=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E8=B0=83=E8=AF=95=E7=BB=93=E6=9E=9C=E5=AE=9E=E6=97=B6=E6=8E=A5?= =?UTF-8?q?=E6=94=B6=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sdk/constants/ApiExecuteRunMode.java | 10 ++- .../io/metersphere/sdk/constants/MsgType.java | 8 ++ .../io/metersphere/sdk/dto/SocketMsgDTO.java | 38 ++++++++ .../metersphere/sdk/util/WebSocketUtils.java | 57 ++++++++++++ .../api/listener/DebugListener.java | 32 +++++++ .../api/service/debug/ApiDebugService.java | 2 +- .../api/socket/WebSocketHandler.java | 73 ++++++++++++++++ .../api/listener/DebugListenerTest.java | 60 +++++++++++++ .../api/socket/WebSocketHandlerTest.java | 87 +++++++++++++++++++ 9 files changed, 364 insertions(+), 3 deletions(-) create mode 100644 backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/MsgType.java create mode 100644 backend/framework/sdk/src/main/java/io/metersphere/sdk/dto/SocketMsgDTO.java create mode 100644 backend/framework/sdk/src/main/java/io/metersphere/sdk/util/WebSocketUtils.java create mode 100644 backend/services/api-test/src/main/java/io/metersphere/api/listener/DebugListener.java create mode 100644 backend/services/api-test/src/main/java/io/metersphere/api/socket/WebSocketHandler.java create mode 100644 backend/services/api-test/src/test/java/io/metersphere/api/listener/DebugListenerTest.java create mode 100644 backend/services/api-test/src/test/java/io/metersphere/api/socket/WebSocketHandlerTest.java diff --git a/backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/ApiExecuteRunMode.java b/backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/ApiExecuteRunMode.java index 13f024c346..f6dda4b7c6 100644 --- a/backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/ApiExecuteRunMode.java +++ b/backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/ApiExecuteRunMode.java @@ -2,6 +2,7 @@ package io.metersphere.sdk.constants; /** * 接口执行时的资源类型 + * * @Author: jianxing * @CreateTime: 2023-12-08 10:53 */ @@ -11,9 +12,14 @@ public enum ApiExecuteRunMode { */ RUN, /** - * 调试 + * 前端调试 */ - DEBUG, + FRONTEND_DEBUG, + /** + * 后端调试 + */ + BACKEND_DEBUG, + /** * jenkins 触发 */ diff --git a/backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/MsgType.java b/backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/MsgType.java new file mode 100644 index 0000000000..b7efd01ee4 --- /dev/null +++ b/backend/framework/sdk/src/main/java/io/metersphere/sdk/constants/MsgType.java @@ -0,0 +1,8 @@ +package io.metersphere.sdk.constants; + +public enum MsgType { + CONNECT, // 链接标识 + HEARTBEAT, // 心跳检查标识 + EXEC_START, // 开始执行标识 + EXEC_RESULT; // 执行结果标识 +} diff --git a/backend/framework/sdk/src/main/java/io/metersphere/sdk/dto/SocketMsgDTO.java b/backend/framework/sdk/src/main/java/io/metersphere/sdk/dto/SocketMsgDTO.java new file mode 100644 index 0000000000..38f8a34933 --- /dev/null +++ b/backend/framework/sdk/src/main/java/io/metersphere/sdk/dto/SocketMsgDTO.java @@ -0,0 +1,38 @@ +package io.metersphere.sdk.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serial; +import java.io.Serializable; + +/** + * + */ +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Data +public class SocketMsgDTO implements Serializable { + @Serial + private static final long serialVersionUID = 1L; + /** + * 报告ID + */ + private String reportId; + /** + * 执行模式 + */ + private String runMode; + /** + * 消息类型(LINK-链接标识,HEARTBEAT-心跳检查标识,EXEC_START-开始执行标识,EXEC_RESULT-执行结果标识) + */ + private String msgType; + /** + * 结果内容 + */ + private String taskResult; + +} diff --git a/backend/framework/sdk/src/main/java/io/metersphere/sdk/util/WebSocketUtils.java b/backend/framework/sdk/src/main/java/io/metersphere/sdk/util/WebSocketUtils.java new file mode 100644 index 0000000000..e9fe774ec2 --- /dev/null +++ b/backend/framework/sdk/src/main/java/io/metersphere/sdk/util/WebSocketUtils.java @@ -0,0 +1,57 @@ +package io.metersphere.sdk.util; + +import io.metersphere.sdk.dto.SocketMsgDTO; +import jakarta.websocket.RemoteEndpoint; +import jakarta.websocket.Session; +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +public class WebSocketUtils { + public static final Map ONLINE_USER_SESSIONS = new ConcurrentHashMap<>(); + + // 单用户推送 + public static void sendMessage(Session session, SocketMsgDTO message) { + if (session == null) { + return; + } + // 替换了web容器后 jetty没有设置永久有效的参数,这里暂时设置超时时间为一天 + session.setMaxIdleTimeout(86400000L); + RemoteEndpoint.Async async = session.getAsyncRemote(); + if (async == null) { + return; + } + async.sendText(JSON.toJSONString(message)); + } + + // 单用户推送 + public static void sendMessageSingle(SocketMsgDTO dto) { + sendMessage(ONLINE_USER_SESSIONS.get(Optional.ofNullable(dto.getReportId()) + .orElse(StringUtils.EMPTY)), dto); + } + + // 全用户推送 + public static void sendMessageAll(SocketMsgDTO message) { + ONLINE_USER_SESSIONS.forEach((sessionId, session) -> { + sendMessage(session, message); + }); + } + + public static boolean has(String key) { + return StringUtils.isNotEmpty(key) && ONLINE_USER_SESSIONS.containsKey(key); + } + + //当前的Session 移除 + public static void onClose(String reportId) { + try { + if (WebSocketUtils.ONLINE_USER_SESSIONS.containsKey(reportId)) { + WebSocketUtils.ONLINE_USER_SESSIONS.get(reportId).close(); + WebSocketUtils.ONLINE_USER_SESSIONS.remove(reportId); + } + } catch (Exception e) { + LogUtils.error("关闭socket失败:", e); + } + } +} diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/listener/DebugListener.java b/backend/services/api-test/src/main/java/io/metersphere/api/listener/DebugListener.java new file mode 100644 index 0000000000..5d0ad474b5 --- /dev/null +++ b/backend/services/api-test/src/main/java/io/metersphere/api/listener/DebugListener.java @@ -0,0 +1,32 @@ +package io.metersphere.api.listener; + +import io.metersphere.sdk.constants.KafkaTopicConstants; +import io.metersphere.sdk.dto.SocketMsgDTO; +import io.metersphere.sdk.util.JSON; +import io.metersphere.sdk.util.LogUtils; +import io.metersphere.sdk.util.WebSocketUtils; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.KafkaListener; + +/** + * 后端调试消息监听器 + */ +@Configuration +public class DebugListener { + public static final String DEBUG_CONSUME_ID = "MS-API-DEBUG-CONSUME"; + + @KafkaListener(id = DEBUG_CONSUME_ID, topics = KafkaTopicConstants.API_REPORT_DEBUG_TOPIC, groupId = DEBUG_CONSUME_ID + "_" + "${random.uuid}") + public void debugConsume(ConsumerRecord record) { + try { + LogUtils.info("接收到执行结果:", record.key()); + if (ObjectUtils.isNotEmpty(record.value()) && WebSocketUtils.has(record.key().toString())) { + SocketMsgDTO dto = JSON.parseObject(record.value(), SocketMsgDTO.class); + WebSocketUtils.sendMessageSingle(dto); + } + } catch (Exception e) { + LogUtils.error("调试消息推送失败:", e); + } + } +} diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/service/debug/ApiDebugService.java b/backend/services/api-test/src/main/java/io/metersphere/api/service/debug/ApiDebugService.java index ac3e3b8325..1a61672775 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/service/debug/ApiDebugService.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/service/debug/ApiDebugService.java @@ -179,7 +179,7 @@ public class ApiDebugService { runRequest.setTestId(id); runRequest.setReportId(id); runRequest.setResourceType(ApiResourceType.API_DEBUG.name()); - runRequest.setRunMode(ApiExecuteRunMode.DEBUG.name()); + runRequest.setRunMode(ApiExecuteRunMode.BACKEND_DEBUG.name()); apiExecuteService.debug(runRequest); diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/socket/WebSocketHandler.java b/backend/services/api-test/src/main/java/io/metersphere/api/socket/WebSocketHandler.java new file mode 100644 index 0000000000..e85af270da --- /dev/null +++ b/backend/services/api-test/src/main/java/io/metersphere/api/socket/WebSocketHandler.java @@ -0,0 +1,73 @@ +package io.metersphere.api.socket; + +import com.fit2cloud.quartz.anno.QuartzScheduled; +import io.metersphere.sdk.constants.MsgType; +import io.metersphere.sdk.dto.SocketMsgDTO; +import io.metersphere.sdk.util.JSON; +import io.metersphere.sdk.util.LogUtils; +import io.metersphere.sdk.util.WebSocketUtils; +import jakarta.websocket.*; +import jakarta.websocket.server.PathParam; +import jakarta.websocket.server.ServerEndpoint; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +@Component +@ServerEndpoint("/ws/api/{reportId}") +public class WebSocketHandler { + /** + * 连接成功响应 + */ + @OnOpen + public void openSession(@PathParam("reportId") String reportId, Session session) { + WebSocketUtils.ONLINE_USER_SESSIONS.put(reportId, session); + RemoteEndpoint.Async async = session.getAsyncRemote(); + if (async != null) { + async.sendText(JSON.toJSONString(new SocketMsgDTO(reportId, "", MsgType.CONNECT.name(), MsgType.CONNECT.name()))); + session.setMaxIdleTimeout(180000); + } + LogUtils.info("客户端: [" + reportId + "] : 连接成功!" + WebSocketUtils.ONLINE_USER_SESSIONS.size(), reportId); + } + + /** + * 收到消息响应 + */ + @OnMessage + public void onMessage(@PathParam("reportId") String reportId, String message) { + LogUtils.info("服务器收到:[" + reportId + "] : " + message); + SocketMsgDTO dto = JSON.parseObject(message, SocketMsgDTO.class); + WebSocketUtils.sendMessageSingle(dto); + } + + /** + * 连接关闭响应 + */ + @OnClose + public void onClose(@PathParam("reportId") String reportId, Session session) throws IOException { + //当前的Session 移除 + WebSocketUtils.ONLINE_USER_SESSIONS.remove(reportId); + LogUtils.info("[" + reportId + "] : 断开连接!" + WebSocketUtils.ONLINE_USER_SESSIONS.size()); + //并且通知其他人当前用户已经断开连接了 + session.close(); + } + + /** + * 连接异常响应 + */ + @OnError + public void onError(Session session, Throwable throwable) throws IOException { + LogUtils.error("连接异常响应", throwable); + session.close(); + } + + /** + * 每一分钟群发一次心跳检查 + */ + @QuartzScheduled(cron = "0 0/1 * * * ?") + public void heartbeatCheck() { + WebSocketUtils.sendMessageAll( + new SocketMsgDTO(MsgType.HEARTBEAT.name(), MsgType.HEARTBEAT.name(), MsgType.HEARTBEAT.name(), "heartbeat check") + ); + } +} diff --git a/backend/services/api-test/src/test/java/io/metersphere/api/listener/DebugListenerTest.java b/backend/services/api-test/src/test/java/io/metersphere/api/listener/DebugListenerTest.java new file mode 100644 index 0000000000..727f2ac899 --- /dev/null +++ b/backend/services/api-test/src/test/java/io/metersphere/api/listener/DebugListenerTest.java @@ -0,0 +1,60 @@ +package io.metersphere.api.listener; + +import io.metersphere.api.socket.WebSocketHandler; +import io.metersphere.sdk.constants.KafkaTopicConstants; +import io.metersphere.sdk.constants.MsgType; +import io.metersphere.sdk.dto.SocketMsgDTO; +import io.metersphere.sdk.util.WebSocketUtils; +import jakarta.websocket.RemoteEndpoint; +import jakarta.websocket.Session; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@AutoConfigureMockMvc +public class DebugListenerTest { + + @InjectMocks + private DebugListener debugListener; + @Mock + private Session session; + @InjectMocks + private WebSocketHandler webSocketHandler; + + @Mock + private RemoteEndpoint.Async async; + + @Test + void testDebugConsume() { + testOpenSession(); + // 模拟参数 + ConsumerRecord record = new ConsumerRecord<>(KafkaTopicConstants.API_REPORT_DEBUG_TOPIC, 0, 0, "123", "value"); + // 调用被测试方法 + debugListener.debugConsume(record); + String reportId = "123"; + WebSocketUtils.sendMessageSingle(new SocketMsgDTO(reportId, "2", MsgType.CONNECT.name(), "test")); + } + + void testOpenSession() { + // 模拟参数 + String reportId = "123"; + when(session.getAsyncRemote()).thenReturn(async); + + // 调用被测试方法 + webSocketHandler.openSession(reportId, session); + + // 验证行为 + verify(session).setMaxIdleTimeout(180000); + // 这里可以添加更多的验证,例如检查 ONLINE_USER_SESSIONS 的状态等 + } +} diff --git a/backend/services/api-test/src/test/java/io/metersphere/api/socket/WebSocketHandlerTest.java b/backend/services/api-test/src/test/java/io/metersphere/api/socket/WebSocketHandlerTest.java new file mode 100644 index 0000000000..a81a6b4533 --- /dev/null +++ b/backend/services/api-test/src/test/java/io/metersphere/api/socket/WebSocketHandlerTest.java @@ -0,0 +1,87 @@ +package io.metersphere.api.socket; + +import io.metersphere.sdk.constants.MsgType; +import io.metersphere.sdk.dto.SocketMsgDTO; +import io.metersphere.sdk.util.JSON; +import jakarta.websocket.RemoteEndpoint; +import jakarta.websocket.Session; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; + +import java.io.IOException; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@AutoConfigureMockMvc +public class WebSocketHandlerTest { + @Mock + private Session session; + + @Mock + private RemoteEndpoint.Async async; + + @InjectMocks + private WebSocketHandler webSocketHandler; + + @Test + void testOpenSession() throws IOException { + // 模拟参数 + String reportId = "123"; + when(session.getAsyncRemote()).thenReturn(async); + + // 调用被测试方法 + webSocketHandler.openSession(reportId, session); + + // 验证行为 + verify(session).setMaxIdleTimeout(180000); + // 这里可以添加更多的验证,例如检查 ONLINE_USER_SESSIONS 的状态等 + } + + @Test + void testOnMessage() { + // 模拟参数 + String reportId = "123"; + String message = JSON.toJSONString(new SocketMsgDTO(reportId, "2", MsgType.CONNECT.name(), "test")); + when(session.getAsyncRemote()).thenReturn(async); + + // 调用被测试方法 + webSocketHandler.onMessage(reportId, message); + + // 验证行为 + // 这里可以添加更多的验证,例如检查 sendMessageSingle 的调用情况等 + } + + @Test + void testOnClose() throws IOException { + // 模拟参数 + String reportId = "123"; + + // 调用被测试方法 + webSocketHandler.onClose(reportId, session); + + // 验证行为 + verify(session).close(); + // 这里可以添加更多的验证,例如检查 ONLINE_USER_SESSIONS 的状态等 + } + + @Test + void testOnError() throws IOException { + // 模拟参数 + when(session.getAsyncRemote()).thenReturn(async); + + // 调用被测试方法 + webSocketHandler.onError(session, new Throwable("Error")); + + // 验证行为 + verify(session).close(); + } + +}