Bläddra i källkod

feat: im-client增加发送系统消息api

xsx 1 år sedan
förälder
incheckning
9f28d8c891
21 ändrade filer med 359 tillägg och 42 borttagningar
  1. 11 0
      im-client/src/main/java/com/bx/imclient/IMClient.java
  2. 56 3
      im-client/src/main/java/com/bx/imclient/sender/IMSender.java
  3. 43 0
      im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java
  4. 11 3
      im-commom/src/main/java/com/bx/imcommon/contant/IMRedisKey.java
  5. 5 1
      im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java
  6. 7 1
      im-commom/src/main/java/com/bx/imcommon/enums/IMListenerType.java
  7. 1 1
      im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java
  8. 1 1
      im-commom/src/main/java/com/bx/imcommon/model/IMSendResult.java
  9. 34 0
      im-commom/src/main/java/com/bx/imcommon/model/IMSystemMessage.java
  10. 6 0
      im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java
  11. 1 7
      im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java
  12. 36 0
      im-platform/src/main/java/com/bx/implatform/listener/SystemMessageListener.java
  13. 10 19
      im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java
  14. 27 0
      im-platform/src/main/java/com/bx/implatform/vo/SystemMessageVO.java
  15. 2 2
      im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java
  16. 4 2
      im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java
  17. 3 0
      im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java
  18. 65 0
      im-server/src/main/java/com/bx/imserver/netty/processor/SystemMessageProcessor.java
  19. 1 1
      im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java
  20. 1 1
      im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java
  21. 34 0
      im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java

+ 11 - 0
im-client/src/main/java/com/bx/imclient/IMClient.java

@@ -4,6 +4,7 @@ import com.bx.imclient.sender.IMSender;
 import com.bx.imcommon.enums.IMTerminalType;
 import com.bx.imcommon.model.IMGroupMessage;
 import com.bx.imcommon.model.IMPrivateMessage;
+import com.bx.imcommon.model.IMSystemMessage;
 import lombok.AllArgsConstructor;
 import org.springframework.context.annotation.Configuration;
 
@@ -46,6 +47,16 @@ public class IMClient {
         return imSender.getOnlineTerminal(userIds);
     }
 
+    /**
+     * 发送系统消息(发送结果通过MessageListener接收)
+     *
+     * @param message 私有消息
+     */
+    public<T> void sendSystemMessage(IMSystemMessage<T> message){
+        imSender.sendSystemMessage(message);
+    }
+
+
     /**
      * 发送私聊消息(发送结果通过MessageListener接收)
      *

+ 56 - 3
im-client/src/main/java/com/bx/imclient/sender/IMSender.java

@@ -28,6 +28,59 @@ public class IMSender {
 
     private final MessageListenerMulticaster listenerMulticaster;
 
+
+    public<T> void sendSystemMessage(IMSystemMessage<T> message){
+        // 根据群聊每个成员所连的IM-server,进行分组
+        Map<String, IMUserInfo> sendMap = new HashMap<>();
+        for (Integer terminal : message.getRecvTerminals()) {
+            message.getRecvIds().forEach(id -> {
+                String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString());
+                sendMap.put(key,new IMUserInfo(id, terminal));
+            });
+        }
+        // 批量拉取
+        List<Object> serverIds = redisMQTemplate.opsForValue().multiGet(sendMap.keySet());
+        // 格式:map<服务器id,list<接收方>>
+        Map<Integer, List<IMUserInfo>> serverMap = new HashMap<>();
+        List<IMUserInfo> offLineUsers = new LinkedList<>();
+        int idx = 0;
+        for (Map.Entry<String,IMUserInfo> entry : sendMap.entrySet()) {
+            Integer serverId = (Integer)serverIds.get(idx++);
+            if (!Objects.isNull(serverId)) {
+                List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> new LinkedList<>());
+                list.add(entry.getValue());
+            } else {
+                // 加入离线列表
+                offLineUsers.add(entry.getValue());
+            }
+        }
+        // 逐个server发送
+        for (Map.Entry<Integer, List<IMUserInfo>> entry : serverMap.entrySet()) {
+            IMRecvInfo recvInfo = new IMRecvInfo();
+            recvInfo.setCmd(IMCmdType.SYSTEM_MESSAGE.code());
+            recvInfo.setReceivers(new LinkedList<>(entry.getValue()));
+            recvInfo.setServiceName(appName);
+            recvInfo.setSendResult(message.getSendResult());
+            recvInfo.setData(message.getData());
+            // 推送至队列
+            String key = String.join(":", IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE, entry.getKey().toString());
+            redisMQTemplate.opsForList().rightPush(key, recvInfo);
+        }
+        // 对离线用户回复消息状态
+        if(message.getSendResult() && !offLineUsers.isEmpty()){
+            List<IMSendResult> results = new LinkedList<>();
+            for (IMUserInfo offLineUser : offLineUsers) {
+                IMSendResult result = new IMSendResult();
+                result.setReceiver(offLineUser);
+                result.setCode(IMSendCode.NOT_ONLINE.code());
+                result.setData(message.getData());
+                results.add(result);
+            }
+            listenerMulticaster.multicast(IMListenerType.SYSTEM_MESSAGE, results);
+        }
+    }
+
+
     public<T> void sendPrivateMessage(IMPrivateMessage<T> message) {
         List<IMSendResult> results = new LinkedList<>();
         if(!Objects.isNull(message.getRecvId())){
@@ -84,10 +137,10 @@ public class IMSender {
         if(message.getSendResult() && !results.isEmpty()){
             listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results);
         }
+
     }
 
     public<T> void sendGroupMessage(IMGroupMessage<T> message) {
-
         // 根据群聊每个成员所连的IM-server,进行分组
         Map<String, IMUserInfo> sendMap = new HashMap<>();
         for (Integer terminal : message.getRecvTerminals()) {
@@ -104,7 +157,7 @@ public class IMSender {
         int idx = 0;
         for (Map.Entry<String,IMUserInfo> entry : sendMap.entrySet()) {
             Integer serverId = (Integer)serverIds.get(idx++);
-            if (serverId != null) {
+            if (!Objects.isNull(serverId)) {
                 List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> new LinkedList<>());
                 list.add(entry.getValue());
             } else {
@@ -136,7 +189,7 @@ public class IMSender {
                 String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString());
                 Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key);
                 // 如果终端在线,将数据存储至redis,等待拉取推送
-                if (serverId != null) {
+                if (!Objects.isNull(serverId)) {
                     IMRecvInfo recvInfo = new IMRecvInfo();
                     recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code());
                     recvInfo.setSender(message.getSender());

+ 43 - 0
im-client/src/main/java/com/bx/imclient/task/SystemMessageResultResultTask.java

@@ -0,0 +1,43 @@
+package com.bx.imclient.task;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.bx.imclient.listener.MessageListenerMulticaster;
+import com.bx.imcommon.contant.IMRedisKey;
+import com.bx.imcommon.enums.IMListenerType;
+import com.bx.imcommon.model.IMSendResult;
+import com.bx.imcommon.mq.RedisMQConsumer;
+import com.bx.imcommon.mq.RedisMQListener;
+import com.bx.imcommon.mq.RedisMQTemplate;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@RedisMQListener(queue = IMRedisKey.IM_RESULT_SYSTEM_QUEUE, batchSize = 100, period = 100)
+public class SystemMessageResultResultTask extends RedisMQConsumer<IMSendResult> {
+
+    @Value("${spring.application.name}")
+    private String appName;
+
+    private final MessageListenerMulticaster listenerMulticaster;
+
+    @Override
+    public void onMessage(List<IMSendResult> results) {
+        listenerMulticaster.multicast(IMListenerType.SYSTEM_MESSAGE, results);
+    }
+
+    @Override
+    public String generateKey() {
+        return StrUtil.join(":", IMRedisKey.IM_RESULT_SYSTEM_QUEUE, appName);
+    }
+
+}

+ 11 - 3
im-commom/src/main/java/com/bx/imcommon/contant/IMRedisKey.java

@@ -2,7 +2,6 @@ package com.bx.imcommon.contant;
 
 public final class IMRedisKey {
 
-    private IMRedisKey() {}
 
     /**
      * im-server最大id,从0开始递增
@@ -13,13 +12,22 @@ public final class IMRedisKey {
      */
     public static final String  IM_USER_SERVER_ID = "im:user:server_id";
     /**
-     * 未读私聊消息队列
+     * 系统消息队列
+     */
+    public static final String IM_MESSAGE_SYSTEM_QUEUE = "im:message:system";
+    /**
+     * 私聊消息队列
      */
     public static final String IM_MESSAGE_PRIVATE_QUEUE = "im:message:private";
     /**
-     * 未读群聊消息队列
+     * 群聊消息队列
      */
     public static final String IM_MESSAGE_GROUP_QUEUE = "im:message:group";
+
+    /**
+     * 系统消息发送结果队列
+     */
+    public static final String IM_RESULT_SYSTEM_QUEUE = "im:result:system";
     /**
      * 私聊消息发送结果队列
      */

+ 5 - 1
im-commom/src/main/java/com/bx/imcommon/enums/IMCmdType.java

@@ -24,7 +24,11 @@ public enum IMCmdType {
     /**
      * 群发消息
      */
-    GROUP_MESSAGE(4, "群发消息");
+    GROUP_MESSAGE(4, "群发消息"),
+    /**
+     * 系统消息
+     */
+    SYSTEM_MESSAGE(5,"系统消息");
 
 
     private final Integer code;

+ 7 - 1
im-commom/src/main/java/com/bx/imcommon/enums/IMListenerType.java

@@ -15,7 +15,13 @@ public enum IMListenerType {
     /**
      * 群聊消息
      */
-    GROUP_MESSAGE(2, "群聊消息");
+    GROUP_MESSAGE(2, "群聊消息"),
+    /**
+     * 系统消息
+     */
+    SYSTEM_MESSAGE(3, "群聊消息");
+
+
 
     private final Integer code;
 

+ 1 - 1
im-commom/src/main/java/com/bx/imcommon/model/IMPrivateMessage.java

@@ -26,7 +26,7 @@ public class IMPrivateMessage<T> {
     private List<Integer> recvTerminals = IMTerminalType.codes();
 
     /**
-     * 是否发送给自己的其他终端,默认true
+     * 是否同步消息给自己的其他终端,默认true
      */
     private Boolean sendToSelf = true;
 

+ 1 - 1
im-commom/src/main/java/com/bx/imcommon/model/IMSendResult.java

@@ -16,7 +16,7 @@ public class IMSendResult<T> {
     private IMUserInfo receiver;
 
     /**
-     * 发送状态 IMCmdType
+     * 发送状态编码 IMSendCode
      */
     private Integer code;
 

+ 34 - 0
im-commom/src/main/java/com/bx/imcommon/model/IMSystemMessage.java

@@ -0,0 +1,34 @@
+package com.bx.imcommon.model;
+
+import com.bx.imcommon.enums.IMTerminalType;
+import lombok.Data;
+
+import java.util.LinkedList;
+import java.util.List;
+
+@Data
+public class IMSystemMessage<T> {
+
+
+    /**
+     * 接收者id列表,为空表示向所有在线用户广播
+     */
+    private List<Long> recvIds = new LinkedList<>();
+
+    /**
+     * 接收者终端类型,默认全部
+     */
+    private List<Integer> recvTerminals = IMTerminalType.codes();
+
+    /**
+     * 是否需要回推发送结果,默认true
+     */
+    private Boolean sendResult = true;
+
+    /**
+     * 消息内容
+     */
+    private T data;
+
+
+}

+ 6 - 0
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQConsumer.java

@@ -10,4 +10,10 @@ public abstract class RedisMQConsumer<T> {
      public void onMessage(T data){}
 
      public void onMessage(List<T> datas){}
+
+     public String generateKey(){
+          // 默认队列名就是redis的key
+          RedisMQListener annotation = this.getClass().getAnnotation(RedisMQListener.class);
+          return annotation.queue();
+     }
 }

+ 1 - 7
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java

@@ -1,18 +1,12 @@
 package com.bx.imcommon.mq;
 
 import com.alibaba.fastjson.JSONObject;
-
 import com.bx.imcommon.util.ThreadPoolExecutorFactory;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
-import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
-
 import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.*;
@@ -44,7 +38,7 @@ public class RedisMQPullTask implements CommandLineRunner {
         consumers.forEach((consumer -> {
             // 注解参数
             RedisMQListener annotation = consumer.getClass().getAnnotation(RedisMQListener.class);
-            String key = annotation.queue();
+            String key = consumer.generateKey();
             int batchSize = annotation.batchSize();
             int period = annotation.period();
             // 获取泛型类型

+ 36 - 0
im-platform/src/main/java/com/bx/implatform/listener/SystemMessageListener.java

@@ -0,0 +1,36 @@
+package com.bx.implatform.listener;
+
+import cn.hutool.core.collection.CollUtil;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.bx.imclient.annotation.IMListener;
+import com.bx.imclient.listener.MessageListener;
+import com.bx.imcommon.enums.IMListenerType;
+import com.bx.imcommon.enums.IMSendCode;
+import com.bx.imcommon.model.IMSendResult;
+import com.bx.implatform.entity.PrivateMessage;
+import com.bx.implatform.enums.MessageStatus;
+import com.bx.implatform.service.IPrivateMessageService;
+import com.bx.implatform.vo.PrivateMessageVO;
+import com.bx.implatform.vo.SystemMessageVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+@IMListener(type = IMListenerType.SYSTEM_MESSAGE)
+public class SystemMessageListener implements MessageListener<SystemMessageVO> {
+
+    @Override
+    public void process(List<IMSendResult<SystemMessageVO>> results) {
+        for(IMSendResult<SystemMessageVO> result : results){
+            SystemMessageVO messageInfo = result.getData();
+            if (result.getCode().equals(IMSendCode.SUCCESS.code())) {
+                log.info("消息送达,消息id:{},接收者:{},终端:{}", messageInfo.getId(), result.getReceiver().getId(), result.getReceiver().getTerminal());
+            }
+        }
+    }
+}

+ 10 - 19
im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java

@@ -1,21 +1,17 @@
 package com.bx.implatform.task;
 
 import com.bx.imclient.IMClient;
-import com.bx.imcommon.enums.IMTerminalType;
-import com.bx.imcommon.model.IMPrivateMessage;
-import com.bx.imcommon.model.IMUserInfo;
+import com.bx.imcommon.model.IMSystemMessage;
 import com.bx.imcommon.mq.RedisMQConsumer;
 import com.bx.imcommon.mq.RedisMQListener;
-import com.bx.implatform.contant.Constant;
 import com.bx.implatform.contant.RedisKey;
 import com.bx.implatform.dto.UserBanDTO;
-import com.bx.implatform.service.IUserService;
-import com.bx.implatform.util.BeanUtils;
-import com.bx.implatform.vo.PrivateMessageVO;
+import com.bx.implatform.enums.MessageType;
+import com.bx.implatform.vo.SystemMessageVO;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.security.core.parameters.P;
 import org.springframework.stereotype.Component;
+import java.util.Collections;
 
 /**
  * @author: 谢绍许
@@ -31,20 +27,15 @@ public class UserBannedConsumerTask extends RedisMQConsumer<UserBanDTO> {
     private final IMClient imClient;
     @Override
     public void onMessage(UserBanDTO dto) {
-
         log.info("用户被封禁处理,userId:{},原因:{}",dto.getId(),dto.getReason());
-        // 推送消息
-
-        PrivateMessageVO msgInfo = new PrivateMessageVO();
-        msgInfo.setRecvId(dto.getId());
-        msgInfo.setSendId(Constant.SYS_USER_ID);
+        // 推送消息将用户赶下线
+        SystemMessageVO msgInfo = new SystemMessageVO();
+        msgInfo.setType(MessageType.USER_BANNED.code());
         msgInfo.setContent(dto.getReason());
-        IMPrivateMessage<PrivateMessageVO> sendMessage = new IMPrivateMessage<>();
-        sendMessage.setSender(new IMUserInfo(Constant.SYS_USER_ID, IMTerminalType.WEB.code()));
-        sendMessage.setRecvId(dto.getId());
-        sendMessage.setSendToSelf(false);
+        IMSystemMessage<SystemMessageVO> sendMessage = new IMSystemMessage<>();
+        sendMessage.setRecvIds(Collections.singletonList(dto.getId()));
         sendMessage.setData(msgInfo);
         sendMessage.setSendResult(true);
-        imClient.sendPrivateMessage(sendMessage);
+        imClient.sendSystemMessage(sendMessage);
     }
 }

+ 27 - 0
im-platform/src/main/java/com/bx/implatform/vo/SystemMessageVO.java

@@ -0,0 +1,27 @@
+package com.bx.implatform.vo;
+
+import com.bx.imcommon.serializer.DateToLongSerializer;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+@ApiModel("系统消息VO")
+public class SystemMessageVO {
+
+    @ApiModelProperty(value = " 消息id")
+    private Long id;
+
+    @ApiModelProperty(value = " 发送内容")
+    private String content;
+
+    @ApiModelProperty(value = "消息内容类型 MessageType")
+    private Integer type;
+
+    @ApiModelProperty(value = " 发送时间")
+    @JsonSerialize(using = DateToLongSerializer.class)
+    private Date sendTime;
+}

+ 2 - 2
im-server/src/main/java/com/bx/imserver/netty/processor/GroupMessageProcessor.java

@@ -13,10 +13,10 @@ import io.netty.channel.ChannelHandlerContext;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
+import java.util.Objects;
 
 @Slf4j
 @Component
@@ -33,7 +33,7 @@ public class GroupMessageProcessor extends AbstractMessageProcessor<IMRecvInfo>
         for (IMUserInfo receiver : receivers) {
             try {
                 ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(receiver.getId(), receiver.getTerminal());
-                if (channelCtx != null) {
+                if (!Objects.isNull(channelCtx)) {
                     // 推送消息到用户
                     IMSendInfo sendInfo = new IMSendInfo();
                     sendInfo.setCmd(IMCmdType.GROUP_MESSAGE.code());

+ 4 - 2
im-server/src/main/java/com/bx/imserver/netty/processor/PrivateMessageProcessor.java

@@ -15,6 +15,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
+import java.util.Objects;
+
 @Slf4j
 @Component
 @RequiredArgsConstructor
@@ -26,10 +28,10 @@ public class PrivateMessageProcessor extends AbstractMessageProcessor<IMRecvInfo
     public void process(IMRecvInfo recvInfo) {
         IMUserInfo sender = recvInfo.getSender();
         IMUserInfo receiver = recvInfo.getReceivers().get(0);
-        log.info("接收到消息,发送者:{},接收者:{},内容:{}", sender.getId(), receiver.getId(), recvInfo.getData());
+        log.info("接收到私聊消息,发送者:{},接收者:{},内容:{}", sender.getId(), receiver.getId(), recvInfo.getData());
         try {
             ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(receiver.getId(), receiver.getTerminal());
-            if (channelCtx != null) {
+            if (!Objects.isNull(channelCtx)) {
                 // 推送消息到用户
                 IMSendInfo<Object> sendInfo = new IMSendInfo<>();
                 sendInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code());

+ 3 - 0
im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java

@@ -20,6 +20,9 @@ public class ProcessorFactory {
             case GROUP_MESSAGE:
                 processor = SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class);
                 break;
+            case SYSTEM_MESSAGE:
+                processor = SpringContextHolder.getApplicationContext().getBean(SystemMessageProcessor.class);
+                break;
             default:
                 break;
         }

+ 65 - 0
im-server/src/main/java/com/bx/imserver/netty/processor/SystemMessageProcessor.java

@@ -0,0 +1,65 @@
+package com.bx.imserver.netty.processor;
+
+import cn.hutool.core.util.StrUtil;
+import com.bx.imcommon.contant.IMRedisKey;
+import com.bx.imcommon.enums.IMCmdType;
+import com.bx.imcommon.enums.IMSendCode;
+import com.bx.imcommon.model.IMRecvInfo;
+import com.bx.imcommon.model.IMSendInfo;
+import com.bx.imcommon.model.IMSendResult;
+import com.bx.imcommon.model.IMUserInfo;
+import com.bx.imserver.netty.UserChannelCtxMap;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class SystemMessageProcessor extends AbstractMessageProcessor<IMRecvInfo> {
+
+    private final RedisTemplate<String, Object> redisTemplate;
+
+    @Override
+    public void process(IMRecvInfo recvInfo) {
+        IMUserInfo receiver = recvInfo.getReceivers().get(0);
+        log.info("接收到系统消息,接收者:{},内容:{}",  receiver.getId(), recvInfo.getData());
+        try {
+            ChannelHandlerContext channelCtx = UserChannelCtxMap.getChannelCtx(receiver.getId(), receiver.getTerminal());
+            if (!Objects.isNull(channelCtx)) {
+                // 推送消息到用户
+                IMSendInfo<Object> sendInfo = new IMSendInfo<>();
+                sendInfo.setCmd(IMCmdType.SYSTEM_MESSAGE.code());
+                sendInfo.setData(recvInfo.getData());
+                channelCtx.channel().writeAndFlush(sendInfo);
+                // 消息发送成功确认
+                sendResult(recvInfo, IMSendCode.SUCCESS);
+            } else {
+                // 消息推送失败确认
+                sendResult(recvInfo, IMSendCode.NOT_FIND_CHANNEL);
+                log.error("未找到channel,接收者:{},内容:{}",receiver.getId(), recvInfo.getData());
+            }
+        } catch (Exception e) {
+            // 消息推送失败确认
+            sendResult(recvInfo, IMSendCode.UNKONW_ERROR);
+            log.error("发送异常,,接收者:{},内容:{}", receiver.getId(), recvInfo.getData(), e);
+        }
+
+    }
+
+    private void sendResult(IMRecvInfo recvInfo, IMSendCode sendCode) {
+        if (recvInfo.getSendResult()) {
+            IMSendResult<Object> result = new IMSendResult<>();
+            result.setReceiver(recvInfo.getReceivers().get(0));
+            result.setCode(sendCode.code());
+            result.setData(recvInfo.getData());
+            // 推送到结果队列
+            String key = StrUtil.join(":",IMRedisKey.IM_RESULT_SYSTEM_QUEUE,recvInfo.getServiceName());
+            redisTemplate.opsForList().rightPush(key, result);
+        }
+    }
+}

+ 1 - 1
im-server/src/main/java/com/bx/imserver/task/PullGroupMessageTask.java

@@ -23,7 +23,7 @@ public class PullGroupMessageTask extends AbstractPullMessageTask {
 
     @Override
     public void pullMessage() {
-        // 从redis拉取未读消息
+        // 从redis拉取消息
         String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, IMServerGroup.serverId + "");
         JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
         while (!Objects.isNull(jsonObject)) {

+ 1 - 1
im-server/src/main/java/com/bx/imserver/task/PullPrivateMessageTask.java

@@ -23,7 +23,7 @@ public class PullPrivateMessageTask extends AbstractPullMessageTask {
 
     @Override
     public void pullMessage() {
-        // 从redis拉取未读消息
+        // 从redis拉取消息
         String key = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, IMServerGroup.serverId + "");
         JSONObject jsonObject = (JSONObject) redisTemplate.opsForList().leftPop(key);
         while (!Objects.isNull(jsonObject)) {

+ 34 - 0
im-server/src/main/java/com/bx/imserver/task/PullSystemMessageTask.java

@@ -0,0 +1,34 @@
+package com.bx.imserver.task;
+
+import com.bx.imcommon.contant.IMRedisKey;
+import com.bx.imcommon.enums.IMCmdType;
+import com.bx.imcommon.model.IMRecvInfo;
+import com.bx.imcommon.mq.RedisMQConsumer;
+import com.bx.imcommon.mq.RedisMQListener;
+import com.bx.imserver.netty.IMServerGroup;
+import com.bx.imserver.netty.processor.AbstractMessageProcessor;
+import com.bx.imserver.netty.processor.ProcessorFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author: 谢绍许
+ * @date: 2024-07-16
+ * @version: 1.0
+ */
+@Slf4j
+@Component
+@RedisMQListener(queue = IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE,batchSize = 10)
+public class PullSystemMessageTask extends RedisMQConsumer<IMRecvInfo> {
+
+    @Override
+    public void onMessage(IMRecvInfo recvInfo) {
+        AbstractMessageProcessor processor = ProcessorFactory.createProcessor(IMCmdType.SYSTEM_MESSAGE);
+        processor.process(recvInfo);
+    }
+
+    public String generateKey(){
+        // 队列名:im:message:system:{服务id}
+        return String.join(":", IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE, IMServerGroup.serverId + "");
+    }
+}