瀏覽代碼

优化拉取离线消息

xsx 8 月之前
父節點
當前提交
19ed9d2b79

+ 11 - 0
im-client/src/main/java/com/bx/imclient/IMClient.java

@@ -26,6 +26,17 @@ public class IMClient {
         return imSender.isOnline(userId);
     }
 
+    /**
+     * 判断用户是否在线
+     *
+     * @param userId 用户id
+     * @param terminal 终端可惜
+     */
+    public Boolean isOnline(Long userId,IMTerminalType terminal){
+        return imSender.isOnline(userId,terminal);
+    }
+
+
     /**
      * 判断多个用户是否在线
      *

+ 5 - 0
im-client/src/main/java/com/bx/imclient/sender/IMSender.java

@@ -246,6 +246,11 @@ public class IMSender {
         return onlineMap;
     }
 
+    public Boolean isOnline(Long userId, IMTerminalType terminal) {
+        String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.code().toString());
+        return redisMQTemplate.hasKey(key);
+    }
+
     public Boolean isOnline(Long userId) {
         String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), "*");
         return !Objects.requireNonNull(redisMQTemplate.keys(key)).isEmpty();

+ 19 - 12
im-platform/src/main/java/com/bx/implatform/service/impl/GroupMessageServiceImpl.java

@@ -158,13 +158,13 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
             this.sendLoadingMessage(false, session);
             return;
         }
-
-        // 只能拉取最近3个月的,移动端只拉最近一个月
-        int months = session.getTerminal().equals(IMTerminalType.APP.code()) ? 1 : 3;
-        Date minDate = DateUtils.addMonths(new Date(), -months);
+        // 只拉最近一个月
+        Date minDate = DateUtils.addMonths(new Date(), -1);
         LambdaQueryWrapper<GroupMessage> wrapper = Wrappers.lambdaQuery();
-        wrapper.gt(GroupMessage::getId, minId).gt(GroupMessage::getSendTime, minDate)
-            .in(GroupMessage::getGroupId, groupIds).orderByAsc(GroupMessage::getId);
+        wrapper.gt(GroupMessage::getId, minId);
+        wrapper.gt(GroupMessage::getSendTime, minDate);
+        wrapper.in(GroupMessage::getGroupId, groupIds);
+        wrapper.orderByAsc(GroupMessage::getId);
         List<GroupMessage> messages = this.list(wrapper);
         // 通过群聊对消息进行分组
         Map<Long, List<GroupMessage>> messageGroupMap =
@@ -173,9 +173,11 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
         List<GroupMember> quitMembers = groupMemberService.findQuitInMonth(session.getUserId());
         for (GroupMember quitMember : quitMembers) {
             wrapper = Wrappers.lambdaQuery();
-            wrapper.gt(GroupMessage::getId, minId).between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime())
-                .eq(GroupMessage::getGroupId, quitMember.getGroupId())
-                .ne(GroupMessage::getStatus, MessageStatus.RECALL.code()).orderByAsc(GroupMessage::getId);
+            wrapper.gt(GroupMessage::getId, minId);
+            wrapper.between(GroupMessage::getSendTime, minDate, quitMember.getQuitTime());
+            wrapper.eq(GroupMessage::getGroupId, quitMember.getGroupId());
+            wrapper.ne(GroupMessage::getStatus, MessageStatus.RECALL.code());
+            wrapper.orderByAsc(GroupMessage::getId);
             List<GroupMessage> groupMessages = this.list(wrapper);
             messageGroupMap.put(quitMember.getGroupId(), groupMessages);
             groupMemberMap.put(quitMember.getGroupId(), quitMember);
@@ -186,10 +188,10 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
             // 推送消息
             AtomicInteger sendCount = new AtomicInteger();
             messageGroupMap.forEach((groupId, groupMessages) -> {
-                // 第一次拉取时,一个群最多推送1w条消息,防止前端接收能力溢出导致卡顿
+                // 第一次拉取时,一个群最多推送3000条消息,防止前端接收能力溢出导致卡顿
                 List<GroupMessage> sendMessages = groupMessages;
-                if (minId <= 0 && groupMessages.size() > 10000) {
-                    sendMessages = groupMessages.subList(groupMessages.size() - 10000, groupMessages.size());
+                if (minId <= 0 && groupMessages.size() > 3000) {
+                    sendMessages = groupMessages.subList(groupMessages.size() - 3000, groupMessages.size());
                 }
                 // 填充消息状态
                 String key = StrUtil.join(":", RedisKey.IM_GROUP_READED_POSITION, groupId);
@@ -197,6 +199,11 @@ public class GroupMessageServiceImpl extends ServiceImpl<GroupMessageMapper, Gro
                 long readedMaxId = Objects.isNull(o) ? -1 : Long.parseLong(o.toString());
                 Map<Object, Object> maxIdMap = null;
                 for (GroupMessage m : sendMessages) {
+                    // 推送过程如果用户下线了,则不再推送
+                    if (!imClient.isOnline(session.getUserId(), IMTerminalType.fromCode(session.getTerminal()))) {
+                        log.info("用户已下线,停止推送离线群聊消息,用户id:{}", session.getUserId());
+                        return;
+                    }
                     // 排除加群之前的消息
                     GroupMember member = groupMemberMap.get(m.getGroupId());
                     if (DateUtil.compare(member.getCreatedTime(), m.getSendTime()) > 0) {

+ 5 - 0
im-platform/src/main/java/com/bx/implatform/service/impl/PrivateMessageServiceImpl.java

@@ -154,6 +154,11 @@ public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper,
             // 开启加载中标志
             this.sendLoadingMessage(true, session);
             for (PrivateMessage m : messages) {
+                // 推送过程如果用户下线了,则不再推送
+                if (!imClient.isOnline(session.getUserId(), IMTerminalType.fromCode(session.getTerminal()))) {
+                    log.info("用户已下线,停止推送离线私聊消息,用户id:{}", session.getUserId());
+                    return;
+                }
                 PrivateMessageVO vo = BeanUtils.copyProperties(m, PrivateMessageVO.class);
                 IMPrivateMessage<PrivateMessageVO> sendMessage = new IMPrivateMessage<>();
                 sendMessage.setSender(new IMUserInfo(m.getSendId(), IMTerminalType.WEB.code()));