|
|
@@ -12,10 +12,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.LinkedList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
@Service
|
|
|
@@ -81,22 +78,29 @@ public class IMSender {
|
|
|
|
|
|
public<T> void sendGroupMessage(IMGroupMessage<T> message) {
|
|
|
// 根据群聊每个成员所连的IM-server,进行分组
|
|
|
- List<IMUserInfo> offLineUsers = Collections.synchronizedList(new LinkedList<>());
|
|
|
- // 格式:map<服务器id,list<接收方>>
|
|
|
- Map<Integer, List<IMUserInfo>> serverMap = new ConcurrentHashMap<>();
|
|
|
+ Map<String, IMUserInfo> sendMap = new HashMap<>();
|
|
|
for (Integer terminal : message.getRecvTerminals()) {
|
|
|
- message.getRecvIds().parallelStream().forEach(id -> {
|
|
|
+ message.getRecvIds().stream().forEach(id -> {
|
|
|
String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString());
|
|
|
- Integer serverId = (Integer)redisTemplate.opsForValue().get(key);
|
|
|
- if (serverId != null) {
|
|
|
- List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> Collections.synchronizedList(new LinkedList<>()));
|
|
|
- list.add(new IMUserInfo(id, terminal));
|
|
|
- } else {
|
|
|
- // 加入离线列表
|
|
|
- offLineUsers.add(new IMUserInfo(id, terminal));
|
|
|
- }
|
|
|
+ sendMap.put(key,new IMUserInfo(id, terminal));
|
|
|
});
|
|
|
}
|
|
|
+ // 批量拉取
|
|
|
+ List<Object> serverIds = redisTemplate.opsForValue().multiGet(sendMap.keySet());
|
|
|
+ // 格式:map<服务器id,list<接收方>>
|
|
|
+ Map<Integer, List<IMUserInfo>> serverMap = new HashMap<>();
|
|
|
+ List<IMUserInfo> offLineUsers = Collections.synchronizedList(new LinkedList<>());
|
|
|
+ int idx = 0;
|
|
|
+ for (Map.Entry<String,IMUserInfo> entry : sendMap.entrySet()) {
|
|
|
+ Integer serverId = (Integer)serverIds.get(idx++);
|
|
|
+ if (serverId != null) {
|
|
|
+ List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> Collections.synchronizedList(new LinkedList<>()));
|
|
|
+ list.add(entry.getValue());
|
|
|
+ } else {
|
|
|
+ // 加入离线列表
|
|
|
+ offLineUsers.add(entry.getValue());
|
|
|
+ }
|
|
|
+ };
|
|
|
// 逐个server发送
|
|
|
for (Map.Entry<Integer, List<IMUserInfo>> entry : serverMap.entrySet()) {
|
|
|
IMRecvInfo recvInfo = new IMRecvInfo();
|