diff --git a/backend/src/main/java/io/metersphere/websocket/NotificationWebSocket.java b/backend/src/main/java/io/metersphere/websocket/NotificationWebSocket.java index 4a96ce5d7b..061a878a4c 100644 --- a/backend/src/main/java/io/metersphere/websocket/NotificationWebSocket.java +++ b/backend/src/main/java/io/metersphere/websocket/NotificationWebSocket.java @@ -7,21 +7,26 @@ import io.metersphere.commons.utils.LogUtil; import io.metersphere.notice.service.NotificationService; import lombok.Builder; import lombok.Data; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -@ServerEndpoint("/notification/count/{userId}/{random}") +@ServerEndpoint("/notification/count/{userId}") @Component public class NotificationWebSocket { private static NotificationService notificationService; - private static ConcurrentHashMap refreshTasks = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap refreshTasks = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap> sessionMap = new ConcurrentHashMap<>(); @Resource public void setNotificationService(NotificationService notificationService) { @@ -32,22 +37,45 @@ public class NotificationWebSocket { * 开启连接的操作 */ @OnOpen - public void onOpen(@PathParam("userId") String userId, @PathParam("random") double random, Session session) { - Timer timer = new Timer(true); - NotificationCenter task = new NotificationCenter(session, userId); + public void onOpen(@PathParam("userId") String userId, Session session) { + LogUtil.info("建立一个socket链接: {} - {}", userId, session.getId()); + Timer timer = refreshTasks.get(userId); + if (timer != null) { + timer.cancel(); + } + timer = new Timer(true); + NotificationCenter task = new NotificationCenter(userId); timer.schedule(task, 0, 10 * 1000); - refreshTasks.putIfAbsent(session, timer); + refreshTasks.put(userId, timer); + + Set sessions = sessionMap.getOrDefault(userId, new HashSet<>()); + sessions.add(session); + sessionMap.put(userId, sessions); + + LogUtil.info("在线socket链接: {}, size: {}", userId, sessions.size()); } /** * 连接关闭的操作 */ @OnClose - public void onClose(Session session) { - Timer timer = refreshTasks.get(session); - if (timer != null) { - timer.cancel(); - refreshTasks.remove(session); + public void onClose(@PathParam("userId") String userId, Session session) { + // 删除当前session + Set sessions = sessionMap.get(userId); + if (CollectionUtils.isNotEmpty(sessions)) { + LogUtil.info("剔除一个socket链接: {} - {}", userId, session.getId()); + sessions.remove(session); + } + + // 没有在线用户了 + if (CollectionUtils.isEmpty(sessions)) { + LogUtil.info("关闭socket: {} ", userId); + + Timer timer = refreshTasks.get(userId); + if (timer != null) { + timer.cancel(); + refreshTasks.remove(userId); + } } } @@ -56,21 +84,7 @@ public class NotificationWebSocket { */ @OnMessage public void onMessage(@PathParam("userId") String userId, Session session, String message) { - int refreshTime = 10; - try { - refreshTime = Integer.parseInt(message); - } catch (Exception e) { - } - try { - Timer timer = refreshTasks.get(session); - timer.cancel(); - - Timer newTimer = new Timer(true); - newTimer.schedule(new NotificationCenter(session, userId), 0, refreshTime * 1000L); - refreshTasks.put(session, newTimer); - } catch (Exception e) { - LogUtil.error(e.getMessage(), e); - } + LogUtil.info(message); } /** @@ -83,20 +97,15 @@ public class NotificationWebSocket { } public static class NotificationCenter extends TimerTask { - private Session session; private String userId; - NotificationCenter(Session session, String userId) { - this.session = session; + NotificationCenter(String userId) { this.userId = userId; } @Override public void run() { try { - if (!session.isOpen()) { - return; - } Notification notification = new Notification(); notification.setReceiver(userId); notification.setStatus(NotificationConstants.Status.UNREAD.name()); @@ -105,7 +114,17 @@ public class NotificationWebSocket { .count(count) .now(System.currentTimeMillis()) .build(); - session.getBasicRemote().sendText(JSON.toJSONString(message)); + + Set sessions = sessionMap.get(userId); + if (CollectionUtils.isNotEmpty(sessions)) { + sessions.forEach(session -> { + try { + session.getBasicRemote().sendText(JSON.toJSONString(message)); + } catch (IOException e) { + LogUtil.error(e.getMessage(), e); + } + }); + } } catch (Exception e) { LogUtil.error(e.getMessage(), e); } diff --git a/frontend/src/business/components/notice/Notification.vue b/frontend/src/business/components/notice/Notification.vue index d2fae2dc11..fcc3588051 100644 --- a/frontend/src/business/components/notice/Notification.vue +++ b/frontend/src/business/components/notice/Notification.vue @@ -84,6 +84,9 @@ export default { } } }, + beforeDestroy() { + this.websocket.close(); + }, methods: { getUserList() { this.$get('/user/ws/current/member/list', response => { @@ -99,7 +102,7 @@ export default { if (window.location.protocol === 'https:') { protocol = "wss://"; } - const uri = protocol + window.location.host + "/notification/count/" + getCurrentUserId() + "/" + Math.random(); + const uri = protocol + window.location.host + "/notification/count/" + getCurrentUserId(); this.websocket = new WebSocket(uri); this.websocket.onmessage = this.onMessage; this.websocket.onopen = this.onOpen; @@ -163,12 +166,10 @@ export default { type: 'info', message: message, }); - this.noticeShow = true; - }); - setTimeout(() => { // 弹出之后标记成已读 this.$get('/notification/read/' + d.id); - }, 5000) + this.noticeShow = true; + }); }); }); }