refactor(消息通知): 站内消息刷新优化

--bug=1016011 --user=刘瑞斌 【消息通知】执行接口case时有时收到多条系统站内通知,有时不会弹出站内通知 https://www.tapd.cn/55049933/s/1233494
This commit is contained in:
CaptainB 2022-08-24 22:51:02 +08:00 committed by 刘瑞斌
parent 07277f0754
commit 69bd601b3f
2 changed files with 58 additions and 38 deletions

View File

@ -7,21 +7,26 @@ import io.metersphere.commons.utils.LogUtil;
import io.metersphere.notice.service.NotificationService;
import lombok.Builder;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/notification/count/{userId}/{random}")
@ServerEndpoint("/notification/count/{userId}")
@Component
public class NotificationWebSocket {
private static NotificationService notificationService;
private static ConcurrentHashMap<Session, Timer> refreshTasks = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Timer> refreshTasks = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Set<Session>> sessionMap = new ConcurrentHashMap<>();
@Resource
public void setNotificationService(NotificationService notificationService) {
@ -32,22 +37,45 @@ public class NotificationWebSocket {
* 开启连接的操作
*/
@OnOpen
public void onOpen(@PathParam("userId") String userId, @PathParam("random") double random, Session session) {
Timer timer = new Timer(true);
NotificationCenter task = new NotificationCenter(session, userId);
public void onOpen(@PathParam("userId") String userId, Session session) {
LogUtil.info("建立一个socket链接: {} - {}", userId, session.getId());
Timer timer = refreshTasks.get(userId);
if (timer != null) {
timer.cancel();
}
timer = new Timer(true);
NotificationCenter task = new NotificationCenter(userId);
timer.schedule(task, 0, 10 * 1000);
refreshTasks.putIfAbsent(session, timer);
refreshTasks.put(userId, timer);
Set<Session> sessions = sessionMap.getOrDefault(userId, new HashSet<>());
sessions.add(session);
sessionMap.put(userId, sessions);
LogUtil.info("在线socket链接: {}, size: {}", userId, sessions.size());
}
/**
* 连接关闭的操作
*/
@OnClose
public void onClose(Session session) {
Timer timer = refreshTasks.get(session);
public void onClose(@PathParam("userId") String userId, Session session) {
// 删除当前session
Set<Session> sessions = sessionMap.get(userId);
if (CollectionUtils.isNotEmpty(sessions)) {
LogUtil.info("剔除一个socket链接: {} - {}", userId, session.getId());
sessions.remove(session);
}
// 没有在线用户了
if (CollectionUtils.isEmpty(sessions)) {
LogUtil.info("关闭socket: {} ", userId);
Timer timer = refreshTasks.get(userId);
if (timer != null) {
timer.cancel();
refreshTasks.remove(session);
refreshTasks.remove(userId);
}
}
}
@ -56,21 +84,7 @@ public class NotificationWebSocket {
*/
@OnMessage
public void onMessage(@PathParam("userId") String userId, Session session, String message) {
int refreshTime = 10;
try {
refreshTime = Integer.parseInt(message);
} catch (Exception e) {
}
try {
Timer timer = refreshTasks.get(session);
timer.cancel();
Timer newTimer = new Timer(true);
newTimer.schedule(new NotificationCenter(session, userId), 0, refreshTime * 1000L);
refreshTasks.put(session, newTimer);
} catch (Exception e) {
LogUtil.error(e.getMessage(), e);
}
LogUtil.info(message);
}
/**
@ -83,20 +97,15 @@ public class NotificationWebSocket {
}
public static class NotificationCenter extends TimerTask {
private Session session;
private String userId;
NotificationCenter(Session session, String userId) {
this.session = session;
NotificationCenter(String userId) {
this.userId = userId;
}
@Override
public void run() {
try {
if (!session.isOpen()) {
return;
}
Notification notification = new Notification();
notification.setReceiver(userId);
notification.setStatus(NotificationConstants.Status.UNREAD.name());
@ -105,7 +114,17 @@ public class NotificationWebSocket {
.count(count)
.now(System.currentTimeMillis())
.build();
Set<Session> sessions = sessionMap.get(userId);
if (CollectionUtils.isNotEmpty(sessions)) {
sessions.forEach(session -> {
try {
session.getBasicRemote().sendText(JSON.toJSONString(message));
} catch (IOException e) {
LogUtil.error(e.getMessage(), e);
}
});
}
} catch (Exception e) {
LogUtil.error(e.getMessage(), e);
}

View File

@ -84,6 +84,9 @@ export default {
}
}
},
beforeDestroy() {
this.websocket.close();
},
methods: {
getUserList() {
this.$get('/user/ws/current/member/list', response => {
@ -99,7 +102,7 @@ export default {
if (window.location.protocol === 'https:') {
protocol = "wss://";
}
const uri = protocol + window.location.host + "/notification/count/" + getCurrentUserId() + "/" + Math.random();
const uri = protocol + window.location.host + "/notification/count/" + getCurrentUserId();
this.websocket = new WebSocket(uri);
this.websocket.onmessage = this.onMessage;
this.websocket.onopen = this.onOpen;
@ -163,12 +166,10 @@ export default {
type: 'info',
message: message,
});
this.noticeShow = true;
});
setTimeout(() => {
//
this.$get('/notification/read/' + d.id);
}, 5000)
this.noticeShow = true;
});
});
});
}