xsx 1 rok temu
rodzic
commit
6a3ae3ccdb

+ 2 - 5
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQPullTask.java

@@ -52,11 +52,6 @@ public class RedisMQPullTask implements CommandLineRunner {
                 public void run() {
                     List<Object> datas = new LinkedList<>();
                     try {
-                        if(redisMQTemplate.isClose()){
-                            // 如果redis未初始化或已断开,3s后再重新尝试消费
-                            EXECUTOR.schedule(this, 3, TimeUnit.SECONDS);
-                            return;
-                        }
                         if (consumer.isReady()) {
                             String key = consumer.generateKey();
                             // 拉取一个批次的数据
@@ -75,6 +70,8 @@ public class RedisMQPullTask implements CommandLineRunner {
                         }
                     } catch (Exception e) {
                         log.error("数据消费异常,队列:{}", queue, e);
+                        // 出现异常,10s后再重新尝试消费
+                        EXECUTOR.schedule(this, 10, TimeUnit.SECONDS);
                         return;
                     }
                     // 继续消费数据

+ 4 - 12
im-commom/src/main/java/com/bx/imcommon/mq/RedisMQTemplate.java

@@ -2,10 +2,9 @@ package com.bx.imcommon.mq;
 
 import org.apache.logging.log4j.util.Strings;
 import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisConnectionUtils;
 import org.springframework.data.redis.core.RedisTemplate;
 
-import java.util.Objects;
 import java.util.Properties;
 
 /**
@@ -19,9 +18,10 @@ public class RedisMQTemplate extends RedisTemplate<String, Object> {
 
     public String getVersion() {
         if (version.isEmpty()) {
-            RedisConnection redisConnection = this.getConnectionFactory().getConnection();
-            Properties properties = redisConnection.info();
+            RedisConnection connection = RedisConnectionUtils.getConnection(getConnectionFactory());
+            Properties properties = connection.info();
             version = properties.getProperty("redis_version");
+            RedisConnectionUtils.releaseConnection(connection,getConnectionFactory());
         }
         return version;
     }
@@ -41,12 +41,4 @@ public class RedisMQTemplate extends RedisTemplate<String, Object> {
         return firVersion > 6 || (firVersion == 6 && secVersion >= 2);
     }
 
-
-    Boolean isClose(){
-        try {
-            return  getConnectionFactory().getConnection().isClosed();
-        }catch (Exception e){
-            return true;
-        }
-    }
 }

+ 27 - 0
im-platform/src/main/java/com/bx/implatform/config/TaskSchedulerConfig.java

@@ -0,0 +1,27 @@
+package com.bx.implatform.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+/**
+ * @author: Blue
+ * @date: 2024-09-01
+ * @version: 1.0
+ */
+
+@EnableScheduling
+@Configuration
+public class TaskSchedulerConfig {
+
+    @Bean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+        taskScheduler.setPoolSize(10); // 设置线程池大小
+        taskScheduler.setThreadNamePrefix("scheduled-task-");
+        taskScheduler.initialize();
+        return taskScheduler;
+    }
+}

+ 1 - 1
im-platform/src/main/java/com/bx/implatform/task/GroupBannedConsumerTask.java → im-platform/src/main/java/com/bx/implatform/task/consumer/GroupBannedConsumerTask.java

@@ -1,4 +1,4 @@
-package com.bx.implatform.task;
+package com.bx.implatform.task.consumer;
 
 import com.bx.imclient.IMClient;
 import com.bx.imcommon.enums.IMTerminalType;

+ 1 - 1
im-platform/src/main/java/com/bx/implatform/task/GroupUnbanConsumerTask.java → im-platform/src/main/java/com/bx/implatform/task/consumer/GroupUnbanConsumerTask.java

@@ -1,4 +1,4 @@
-package com.bx.implatform.task;
+package com.bx.implatform.task.consumer;
 
 import com.bx.imclient.IMClient;
 import com.bx.imcommon.enums.IMTerminalType;

+ 1 - 1
im-platform/src/main/java/com/bx/implatform/task/UserBannedConsumerTask.java → im-platform/src/main/java/com/bx/implatform/task/consumer/UserBannedConsumerTask.java

@@ -1,4 +1,4 @@
-package com.bx.implatform.task;
+package com.bx.implatform.task.consumer;
 
 import com.bx.imclient.IMClient;
 import com.bx.imcommon.model.IMSystemMessage;

+ 26 - 0
im-platform/src/main/java/com/bx/implatform/task/schedule/ReloadSensitiveWordTask.java

@@ -0,0 +1,26 @@
+package com.bx.implatform.task.schedule;
+
+import com.bx.implatform.util.SensitiveFilterUtil;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author: Blue
+ * @date: 2024-09-01
+ * @version: 1.0
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class ReloadSensitiveWordTask {
+
+    private final SensitiveFilterUtil sensitiveFilterUtil;
+
+    @Scheduled(fixedRate = 60000)
+    public void run() {
+        log.info("【定时任务】重新装载敏感词...");
+        sensitiveFilterUtil.reload();
+    }
+}

+ 19 - 20
im-platform/src/main/java/com/bx/implatform/util/SensitiveFilterUtil.java

@@ -14,7 +14,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * 敏感词过滤器——SensitiveFilter
@@ -35,7 +34,7 @@ public final class SensitiveFilterUtil {
     /**
      * 根节点
      */
-    private static final TrieNode ROOT_NODE = new TrieNode();
+    private static  TrieNode ROOT_NODE = new TrieNode();
 
     /**
      * 线程池
@@ -86,41 +85,41 @@ public final class SensitiveFilterUtil {
      * @date 2023/12/4 11:18
      */
     @PostConstruct
-    public void init() {
-        // 每120s装载一次敏感词
-        EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
-            List<String> keywords = sensitiveWordService.findAllEnabledWords();
-            keywords.forEach(keyword->{
-                if(StrUtil.isNotEmpty(keyword)){
-                    // 添加到前缀树
-                    addKeyword(keyword);
-                }
-            });
-        },0,120, TimeUnit.SECONDS);
+    public void reload() {
+        // 使用copy on write的方式,防止出现并发问题
+        TrieNode newNode = new TrieNode();
+        List<String> keywords = sensitiveWordService.findAllEnabledWords();
+        keywords.forEach(keyword -> {
+            if (StrUtil.isNotEmpty(keyword)) {
+                // 添加到前缀树
+                addKeyword(newNode,keyword);
+            }
+        });
+        ROOT_NODE = newNode;
     }
 
     /**
      * 3、将一个敏感词添加到前缀树中
      *
+     * @param node
      * @param keyword
      * @author NXY
      * @date 2023/12/4 11:15
      */
-    private void addKeyword(String keyword) {
-        TrieNode tempNode = ROOT_NODE;
+    private void addKeyword(TrieNode node, String keyword) {
         for (int i = 0; i < keyword.length(); i++) {
             char c = keyword.charAt(i);
-            TrieNode subNode = tempNode.getSubNode(c);
+            TrieNode subNode = node.getSubNode(c);
             if (subNode == null) {
                 // 初始化子节点
                 subNode = new TrieNode();
-                tempNode.addSubNode(c, subNode);
+                node.addSubNode(c, subNode);
             }
             // 指向子节点,进入下一轮循环
-            tempNode = subNode;
+            node = subNode;
             // 设置结束标识
             if (i == keyword.length() - 1) {
-                tempNode.setKeywordEnd(true);
+                node.setKeywordEnd(true);
             }
         }
     }
@@ -195,9 +194,9 @@ public final class SensitiveFilterUtil {
     /**
      * 判断是否为符号 ——特殊符号
      *
+     * @return boolean
      * @author NXY
      * @date 2023/12/4 11:17
-     * @return boolean
      */
     private boolean isSymbol(Character c) {
         // 0x2E80~0x9FFF 是东亚文字范围

+ 8 - 21
im-server/src/main/java/com/bx/imserver/netty/processor/ProcessorFactory.java

@@ -6,27 +6,14 @@ import com.bx.imserver.util.SpringContextHolder;
 public class ProcessorFactory {
 
     public static AbstractMessageProcessor createProcessor(IMCmdType cmd) {
-        AbstractMessageProcessor processor = null;
-        switch (cmd) {
-            case LOGIN:
-                processor = SpringContextHolder.getApplicationContext().getBean(LoginProcessor.class);
-                break;
-            case HEART_BEAT:
-                processor = SpringContextHolder.getApplicationContext().getBean(HeartbeatProcessor.class);
-                break;
-            case PRIVATE_MESSAGE:
-                processor = SpringContextHolder.getApplicationContext().getBean(PrivateMessageProcessor.class);
-                break;
-            case GROUP_MESSAGE:
-                processor = SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class);
-                break;
-            case SYSTEM_MESSAGE:
-                processor = SpringContextHolder.getApplicationContext().getBean(SystemMessageProcessor.class);
-                break;
-            default:
-                break;
-        }
-        return processor;
+        return switch (cmd) {
+            case LOGIN->SpringContextHolder.getApplicationContext().getBean(LoginProcessor.class);
+            case HEART_BEAT -> SpringContextHolder.getApplicationContext().getBean(HeartbeatProcessor.class);
+            case PRIVATE_MESSAGE->SpringContextHolder.getApplicationContext().getBean(PrivateMessageProcessor.class);
+            case GROUP_MESSAGE->SpringContextHolder.getApplicationContext().getBean(GroupMessageProcessor.class);
+            case SYSTEM_MESSAGE->SpringContextHolder.getApplicationContext().getBean(SystemMessageProcessor.class);
+            default -> null;
+        };
     }
 
 }