xsx il y a 1 an
Parent
commit
def6d68d26

+ 2 - 2
im-server/src/main/java/com/bx/imserver/netty/IMChannelHandler.java

@@ -3,6 +3,7 @@ package com.bx.imserver.netty;
 import com.bx.imcommon.contant.IMRedisKey;
 import com.bx.imcommon.enums.IMCmdType;
 import com.bx.imcommon.model.IMSendInfo;
+import com.bx.imcommon.mq.RedisMQTemplate;
 import com.bx.imserver.constant.ChannelAttrKey;
 import com.bx.imserver.netty.processor.AbstractMessageProcessor;
 import com.bx.imserver.netty.processor.ProcessorFactory;
@@ -13,7 +14,6 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.AttributeKey;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.data.redis.core.RedisTemplate;
 
 /**
  * WebSocket 长连接下 文本帧的处理器
@@ -71,7 +71,7 @@ public class IMChannelHandler extends SimpleChannelInboundHandler<IMSendInfo> {
             // 移除channel
             UserChannelCtxMap.removeChannelCtx(userId, terminal);
             // 用户下线
-            RedisTemplate<String, Object> redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+            RedisMQTemplate redisTemplate = SpringContextHolder.getBean(RedisMQTemplate.class);
             String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString());
             redisTemplate.delete(key);
             log.info("断开连接,userId:{},终端类型:{},{}", userId, terminal, ctx.channel().id().asLongText());

+ 3 - 4
im-server/src/main/java/com/bx/imserver/netty/IMServerGroup.java

@@ -1,11 +1,11 @@
 package com.bx.imserver.netty;
 
 import com.bx.imcommon.contant.IMRedisKey;
+import com.bx.imcommon.mq.RedisMQTemplate;
 import jakarta.annotation.PreDestroy;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.CommandLineRunner;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
@@ -17,14 +17,13 @@ public class IMServerGroup implements CommandLineRunner {
 
     public static volatile long serverId = 0;
 
-    private final  RedisTemplate<String, Object> redisTemplate;
+    private final RedisMQTemplate redisMQTemplate;
 
     private final List<IMServer> imServers;
 
     /***
      * 判断服务器是否就绪
      *
-     * @return
      **/
     public boolean isReady() {
         for (IMServer imServer : imServers) {
@@ -39,7 +38,7 @@ public class IMServerGroup implements CommandLineRunner {
     public void run(String... args) {
         // 初始化SERVER_ID
         String key = IMRedisKey.IM_MAX_SERVER_ID;
-        serverId = redisTemplate.opsForValue().increment(key, 1);
+        serverId = redisMQTemplate.opsForValue().increment(key, 1);
         // 启动服务
         for (IMServer imServer : imServers) {
             imServer.start();

+ 4 - 4
im-server/src/main/java/com/bx/imserver/netty/processor/HeartbeatProcessor.java

@@ -6,12 +6,12 @@ import com.bx.imcommon.contant.IMRedisKey;
 import com.bx.imcommon.enums.IMCmdType;
 import com.bx.imcommon.model.IMHeartbeatInfo;
 import com.bx.imcommon.model.IMSendInfo;
+import com.bx.imcommon.mq.RedisMQTemplate;
 import com.bx.imserver.constant.ChannelAttrKey;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.AttributeKey;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import java.util.HashMap;
@@ -22,12 +22,12 @@ import java.util.concurrent.TimeUnit;
 @RequiredArgsConstructor
 public class HeartbeatProcessor extends AbstractMessageProcessor<IMHeartbeatInfo> {
 
-    private final RedisTemplate<String, Object> redisTemplate;
+    private final RedisMQTemplate redisMQTemplate;
 
     @Override
     public void process(ChannelHandlerContext ctx, IMHeartbeatInfo beatInfo) {
         // 响应ws
-        IMSendInfo sendInfo = new IMSendInfo();
+        IMSendInfo<Object> sendInfo = new IMSendInfo<>();
         sendInfo.setCmd(IMCmdType.HEART_BEAT.code());
         ctx.channel().writeAndFlush(sendInfo);
         // 设置属性
@@ -41,7 +41,7 @@ public class HeartbeatProcessor extends AbstractMessageProcessor<IMHeartbeatInfo
             AttributeKey<Integer> terminalAttr = AttributeKey.valueOf(ChannelAttrKey.TERMINAL_TYPE);
             Integer terminal = ctx.channel().attr(terminalAttr).get();
             String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString());
-            redisTemplate.expire(key, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
+            redisMQTemplate.expire(key, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
         }
         AttributeKey<Long> userIdAttr = AttributeKey.valueOf(ChannelAttrKey.USER_ID);
         Long userId = ctx.channel().attr(userIdAttr).get();

+ 3 - 3
im-server/src/main/java/com/bx/imserver/netty/processor/LoginProcessor.java

@@ -8,6 +8,7 @@ import com.bx.imcommon.enums.IMCmdType;
 import com.bx.imcommon.model.IMLoginInfo;
 import com.bx.imcommon.model.IMSendInfo;
 import com.bx.imcommon.model.IMSessionInfo;
+import com.bx.imcommon.mq.RedisMQTemplate;
 import com.bx.imcommon.util.JwtUtil;
 import com.bx.imserver.constant.ChannelAttrKey;
 import com.bx.imserver.netty.IMServerGroup;
@@ -17,7 +18,6 @@ import io.netty.util.AttributeKey;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import java.util.HashMap;
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
 @RequiredArgsConstructor
 public class LoginProcessor extends AbstractMessageProcessor<IMLoginInfo> {
 
-    private final RedisTemplate<String, Object> redisTemplate;
+    private final RedisMQTemplate redisMQTemplate;
 
     @Value("${jwt.accessToken.secret}")
     private String accessTokenSecret;
@@ -66,7 +66,7 @@ public class LoginProcessor extends AbstractMessageProcessor<IMLoginInfo> {
         ctx.channel().attr(heartBeatAttr).set(0L);
         // 在redis上记录每个user的channelId,15秒没有心跳,则自动过期
         String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), terminal.toString());
-        redisTemplate.opsForValue().set(key, IMServerGroup.serverId, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
+        redisMQTemplate.opsForValue().set(key, IMServerGroup.serverId, IMConstant.ONLINE_TIMEOUT_SECOND, TimeUnit.SECONDS);
         // 响应ws
         IMSendInfo<Object> sendInfo = new IMSendInfo<>();
         sendInfo.setCmd(IMCmdType.LOGIN.code());