refactor(消息通知): 站内消息刷新优化

--bug=1016011 --user=刘瑞斌 【消息通知】执行接口case时有时收到多条系统站内通知,有时不会弹出站内通知 https://www.tapd.cn/55049933/s/1233494
This commit is contained in:
CaptainB 2022-08-24 19:31:52 +08:00 committed by 刘瑞斌
parent 0214100d14
commit 9f5b9f158f
2 changed files with 53 additions and 31 deletions

View File

@ -7,12 +7,16 @@ import io.metersphere.commons.utils.LogUtil;
import io.metersphere.notice.service.NotificationService; import io.metersphere.notice.service.NotificationService;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.websocket.*; import javax.websocket.*;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -21,7 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Component @Component
public class NotificationWebSocket { public class NotificationWebSocket {
private static NotificationService notificationService; private static NotificationService notificationService;
private static ConcurrentHashMap<Session, Timer> refreshTasks = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, Timer> refreshTasks = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Set<Session>> sessionMap = new ConcurrentHashMap<>();
@Resource @Resource
public void setNotificationService(NotificationService notificationService) { public void setNotificationService(NotificationService notificationService) {
@ -33,21 +38,44 @@ public class NotificationWebSocket {
*/ */
@OnOpen @OnOpen
public void onOpen(@PathParam("userId") String userId, Session session) { public void onOpen(@PathParam("userId") String userId, Session session) {
Timer timer = new Timer(true); LogUtil.info("建立一个socket链接: {} - {}", userId, session.getId());
NotificationCenter task = new NotificationCenter(session, userId); Timer timer = refreshTasks.get(userId);
if (timer != null) {
timer.cancel();
}
timer = new Timer(true);
NotificationCenter task = new NotificationCenter(userId);
timer.schedule(task, 0, 10 * 1000); timer.schedule(task, 0, 10 * 1000);
refreshTasks.putIfAbsent(session, timer); refreshTasks.put(userId, timer);
Set<Session> sessions = sessionMap.getOrDefault(userId, new HashSet<>());
sessions.add(session);
sessionMap.put(userId, sessions);
LogUtil.info("在线socket链接: {}, size: {}", userId, sessions.size());
} }
/** /**
* 连接关闭的操作 * 连接关闭的操作
*/ */
@OnClose @OnClose
public void onClose(Session session) { public void onClose(@PathParam("userId") String userId, Session session) {
Timer timer = refreshTasks.get(session); // 删除当前session
if (timer != null) { Set<Session> sessions = sessionMap.get(userId);
timer.cancel(); if (CollectionUtils.isNotEmpty(sessions)) {
refreshTasks.remove(session); LogUtil.info("剔除一个socket链接: {} - {}", userId, session.getId());
sessions.remove(session);
}
// 没有在线用户了
if (CollectionUtils.isEmpty(sessions)) {
LogUtil.info("关闭socket: {} ", userId);
Timer timer = refreshTasks.get(userId);
if (timer != null) {
timer.cancel();
refreshTasks.remove(userId);
}
} }
} }
@ -56,21 +84,7 @@ public class NotificationWebSocket {
*/ */
@OnMessage @OnMessage
public void onMessage(@PathParam("userId") String userId, Session session, String message) { public void onMessage(@PathParam("userId") String userId, Session session, String message) {
int refreshTime = 10; LogUtil.info(message);
try {
refreshTime = Integer.parseInt(message);
} catch (Exception e) {
}
try {
Timer timer = refreshTasks.get(session);
timer.cancel();
Timer newTimer = new Timer(true);
newTimer.schedule(new NotificationCenter(session, userId), 0, refreshTime * 1000L);
refreshTasks.put(session, newTimer);
} catch (Exception e) {
LogUtil.error(e.getMessage(), e);
}
} }
/** /**
@ -83,20 +97,15 @@ public class NotificationWebSocket {
} }
public static class NotificationCenter extends TimerTask { public static class NotificationCenter extends TimerTask {
private Session session;
private String userId; private String userId;
NotificationCenter(Session session, String userId) { NotificationCenter(String userId) {
this.session = session;
this.userId = userId; this.userId = userId;
} }
@Override @Override
public void run() { public void run() {
try { try {
if (!session.isOpen()) {
return;
}
Notification notification = new Notification(); Notification notification = new Notification();
notification.setReceiver(userId); notification.setReceiver(userId);
notification.setStatus(NotificationConstants.Status.UNREAD.name()); notification.setStatus(NotificationConstants.Status.UNREAD.name());
@ -105,7 +114,17 @@ public class NotificationWebSocket {
.count(count) .count(count)
.now(System.currentTimeMillis()) .now(System.currentTimeMillis())
.build(); .build();
session.getBasicRemote().sendText(JSON.toJSONString(message));
Set<Session> sessions = sessionMap.get(userId);
if (CollectionUtils.isNotEmpty(sessions)) {
sessions.forEach(session -> {
try {
session.getBasicRemote().sendText(JSON.toJSONString(message));
} catch (IOException e) {
LogUtil.error(e.getMessage(), e);
}
});
}
} catch (Exception e) { } catch (Exception e) {
LogUtil.error(e.getMessage(), e); LogUtil.error(e.getMessage(), e);
} }

View File

@ -84,6 +84,9 @@ export default {
} }
} }
}, },
beforeDestroy() {
this.websocket.close();
},
methods: { methods: {
getUserList() { getUserList() {
this.$get('/user/ws/current/member/list', response => { this.$get('/user/ws/current/member/list', response => {