IMSender.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package com.bx.imclient.sender;
  2. import cn.hutool.core.collection.CollUtil;
  3. import com.bx.imclient.listener.MessageListenerMulticaster;
  4. import com.bx.imcommon.contant.IMRedisKey;
  5. import com.bx.imcommon.enums.IMCmdType;
  6. import com.bx.imcommon.enums.IMListenerType;
  7. import com.bx.imcommon.enums.IMSendCode;
  8. import com.bx.imcommon.enums.IMTerminalType;
  9. import com.bx.imcommon.model.*;
  10. import com.bx.imcommon.mq.RedisMQTemplate;
  11. import lombok.RequiredArgsConstructor;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.beans.factory.annotation.Value;
  14. import org.springframework.stereotype.Service;
  15. import java.util.*;
  16. @Service
  17. @RequiredArgsConstructor
  18. public class IMSender {
  19. @Autowired
  20. private RedisMQTemplate redisMQTemplate;
  21. @Value("${spring.application.name}")
  22. private String appName;
  23. private final MessageListenerMulticaster listenerMulticaster;
  24. public<T> void sendSystemMessage(IMSystemMessage<T> message){
  25. // 根据群聊每个成员所连的IM-server,进行分组
  26. Map<String, IMUserInfo> sendMap = new HashMap<>();
  27. for (Integer terminal : message.getRecvTerminals()) {
  28. message.getRecvIds().forEach(id -> {
  29. String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString());
  30. sendMap.put(key,new IMUserInfo(id, terminal));
  31. });
  32. }
  33. // 批量拉取
  34. List<Object> serverIds = redisMQTemplate.opsForValue().multiGet(sendMap.keySet());
  35. // 格式:map<服务器id,list<接收方>>
  36. Map<Integer, List<IMUserInfo>> serverMap = new HashMap<>();
  37. List<IMUserInfo> offLineUsers = new LinkedList<>();
  38. int idx = 0;
  39. for (Map.Entry<String,IMUserInfo> entry : sendMap.entrySet()) {
  40. Integer serverId = (Integer)serverIds.get(idx++);
  41. if (!Objects.isNull(serverId)) {
  42. List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> new LinkedList<>());
  43. list.add(entry.getValue());
  44. } else {
  45. // 加入离线列表
  46. offLineUsers.add(entry.getValue());
  47. }
  48. }
  49. // 逐个server发送
  50. for (Map.Entry<Integer, List<IMUserInfo>> entry : serverMap.entrySet()) {
  51. IMRecvInfo recvInfo = new IMRecvInfo();
  52. recvInfo.setCmd(IMCmdType.SYSTEM_MESSAGE.code());
  53. recvInfo.setReceivers(new LinkedList<>(entry.getValue()));
  54. recvInfo.setServiceName(appName);
  55. recvInfo.setSendResult(message.getSendResult());
  56. recvInfo.setData(message.getData());
  57. // 推送至队列
  58. String key = String.join(":", IMRedisKey.IM_MESSAGE_SYSTEM_QUEUE, entry.getKey().toString());
  59. redisMQTemplate.opsForList().rightPush(key, recvInfo);
  60. }
  61. // 对离线用户回复消息状态
  62. if(message.getSendResult() && !offLineUsers.isEmpty()){
  63. List<IMSendResult> results = new LinkedList<>();
  64. for (IMUserInfo offLineUser : offLineUsers) {
  65. IMSendResult result = new IMSendResult();
  66. result.setReceiver(offLineUser);
  67. result.setCode(IMSendCode.NOT_ONLINE.code());
  68. result.setData(message.getData());
  69. results.add(result);
  70. }
  71. listenerMulticaster.multicast(IMListenerType.SYSTEM_MESSAGE, results);
  72. }
  73. }
  74. public<T> void sendPrivateMessage(IMPrivateMessage<T> message) {
  75. List<IMSendResult> results = new LinkedList<>();
  76. if(!Objects.isNull(message.getRecvId())){
  77. for (Integer terminal : message.getRecvTerminals()) {
  78. // 获取对方连接的channelId
  79. String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getRecvId().toString(), terminal.toString());
  80. Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key);
  81. // 如果对方在线,将数据存储至redis,等待拉取推送
  82. if (serverId != null) {
  83. String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, serverId.toString());
  84. IMRecvInfo recvInfo = new IMRecvInfo();
  85. recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code());
  86. recvInfo.setSendResult(message.getSendResult());
  87. recvInfo.setServiceName(appName);
  88. recvInfo.setSender(message.getSender());
  89. recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getRecvId(), terminal)));
  90. recvInfo.setData(message.getData());
  91. redisMQTemplate.opsForList().rightPush(sendKey, recvInfo);
  92. } else {
  93. IMSendResult result = new IMSendResult();
  94. result.setSender(message.getSender());
  95. result.setReceiver(new IMUserInfo(message.getRecvId(), terminal));
  96. result.setCode(IMSendCode.NOT_ONLINE.code());
  97. result.setData(message.getData());
  98. results.add(result);
  99. }
  100. }
  101. }
  102. // 推送给自己的其他终端
  103. if(message.getSendToSelf()){
  104. for (Integer terminal : IMTerminalType.codes()) {
  105. if (message.getSender().getTerminal().equals(terminal)) {
  106. continue;
  107. }
  108. // 获取终端连接的channelId
  109. String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString());
  110. Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key);
  111. // 如果终端在线,将数据存储至redis,等待拉取推送
  112. if (serverId != null) {
  113. String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_PRIVATE_QUEUE, serverId.toString());
  114. IMRecvInfo recvInfo = new IMRecvInfo();
  115. // 自己的消息不需要回推消息结果
  116. recvInfo.setSendResult(false);
  117. recvInfo.setCmd(IMCmdType.PRIVATE_MESSAGE.code());
  118. recvInfo.setSender(message.getSender());
  119. recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getSender().getId(), terminal)));
  120. recvInfo.setData(message.getData());
  121. redisMQTemplate.opsForList().rightPush(sendKey, recvInfo);
  122. }
  123. }
  124. }
  125. // 对离线用户回复消息状态
  126. if(message.getSendResult() && !results.isEmpty()){
  127. listenerMulticaster.multicast(IMListenerType.PRIVATE_MESSAGE, results);
  128. }
  129. }
  130. public<T> void sendGroupMessage(IMGroupMessage<T> message) {
  131. // 根据群聊每个成员所连的IM-server,进行分组
  132. Map<String, IMUserInfo> sendMap = new HashMap<>();
  133. for (Integer terminal : message.getRecvTerminals()) {
  134. message.getRecvIds().forEach(id -> {
  135. String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString());
  136. sendMap.put(key,new IMUserInfo(id, terminal));
  137. });
  138. }
  139. // 批量拉取
  140. List<Object> serverIds = redisMQTemplate.opsForValue().multiGet(sendMap.keySet());
  141. // 格式:map<服务器id,list<接收方>>
  142. Map<Integer, List<IMUserInfo>> serverMap = new HashMap<>();
  143. List<IMUserInfo> offLineUsers = new LinkedList<>();
  144. int idx = 0;
  145. for (Map.Entry<String,IMUserInfo> entry : sendMap.entrySet()) {
  146. Integer serverId = (Integer)serverIds.get(idx++);
  147. if (!Objects.isNull(serverId)) {
  148. List<IMUserInfo> list = serverMap.computeIfAbsent(serverId, o -> new LinkedList<>());
  149. list.add(entry.getValue());
  150. } else {
  151. // 加入离线列表
  152. offLineUsers.add(entry.getValue());
  153. }
  154. }
  155. // 逐个server发送
  156. for (Map.Entry<Integer, List<IMUserInfo>> entry : serverMap.entrySet()) {
  157. IMRecvInfo recvInfo = new IMRecvInfo();
  158. recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code());
  159. recvInfo.setReceivers(new LinkedList<>(entry.getValue()));
  160. recvInfo.setSender(message.getSender());
  161. recvInfo.setServiceName(appName);
  162. recvInfo.setSendResult(message.getSendResult());
  163. recvInfo.setData(message.getData());
  164. // 推送至队列
  165. String key = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, entry.getKey().toString());
  166. redisMQTemplate.opsForList().rightPush(key, recvInfo);
  167. }
  168. // 推送给自己的其他终端
  169. if (message.getSendToSelf()) {
  170. for (Integer terminal : IMTerminalType.codes()) {
  171. if (terminal.equals(message.getSender().getTerminal())) {
  172. continue;
  173. }
  174. // 获取终端连接的channelId
  175. String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, message.getSender().getId().toString(), terminal.toString());
  176. Integer serverId = (Integer)redisMQTemplate.opsForValue().get(key);
  177. // 如果终端在线,将数据存储至redis,等待拉取推送
  178. if (!Objects.isNull(serverId)) {
  179. IMRecvInfo recvInfo = new IMRecvInfo();
  180. recvInfo.setCmd(IMCmdType.GROUP_MESSAGE.code());
  181. recvInfo.setSender(message.getSender());
  182. recvInfo.setReceivers(Collections.singletonList(new IMUserInfo(message.getSender().getId(), terminal)));
  183. // 自己的消息不需要回推消息结果
  184. recvInfo.setSendResult(false);
  185. recvInfo.setData(message.getData());
  186. String sendKey = String.join(":", IMRedisKey.IM_MESSAGE_GROUP_QUEUE, serverId.toString());
  187. redisMQTemplate.opsForList().rightPush(sendKey, recvInfo);
  188. }
  189. }
  190. }
  191. // 对离线用户回复消息状态
  192. if(message.getSendResult() && !offLineUsers.isEmpty()){
  193. List<IMSendResult> results = new LinkedList<>();
  194. for (IMUserInfo offLineUser : offLineUsers) {
  195. IMSendResult result = new IMSendResult();
  196. result.setSender(message.getSender());
  197. result.setReceiver(offLineUser);
  198. result.setCode(IMSendCode.NOT_ONLINE.code());
  199. result.setData(message.getData());
  200. results.add(result);
  201. }
  202. listenerMulticaster.multicast(IMListenerType.GROUP_MESSAGE, results);
  203. }
  204. }
  205. public Map<Long,List<IMTerminalType>> getOnlineTerminal(List<Long> userIds){
  206. if(CollUtil.isEmpty(userIds)){
  207. return Collections.emptyMap();
  208. }
  209. // 把所有用户的key都存起来
  210. Map<String,IMUserInfo> userMap = new HashMap<>();
  211. for(Long id:userIds){
  212. for (Integer terminal : IMTerminalType.codes()) {
  213. String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, id.toString(), terminal.toString());
  214. userMap.put(key,new IMUserInfo(id,terminal));
  215. }
  216. }
  217. // 批量拉取
  218. List<Object> serverIds = redisMQTemplate.opsForValue().multiGet(userMap.keySet());
  219. int idx = 0;
  220. Map<Long,List<IMTerminalType>> onlineMap = new HashMap<>();
  221. for (Map.Entry<String,IMUserInfo> entry : userMap.entrySet()) {
  222. // serverid有值表示用户在线
  223. if(serverIds.get(idx++) != null){
  224. IMUserInfo userInfo = entry.getValue();
  225. List<IMTerminalType> terminals = onlineMap.computeIfAbsent(userInfo.getId(), o -> new LinkedList<>());
  226. terminals.add(IMTerminalType.fromCode(userInfo.getTerminal()));
  227. }
  228. }
  229. // 去重并返回
  230. return onlineMap;
  231. }
  232. public Boolean isOnline(Long userId) {
  233. String key = String.join(":", IMRedisKey.IM_USER_SERVER_ID, userId.toString(), "*");
  234. return !Objects.requireNonNull(redisMQTemplate.keys(key)).isEmpty();
  235. }
  236. public List<Long> getOnlineUser(List<Long> userIds){
  237. return new LinkedList<>(getOnlineTerminal(userIds).keySet());
  238. }
  239. }