// ImUtils.java
  1. package com.nb.im.utils;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.util.ObjectUtil;
  4. import cn.hutool.json.JSONUtil;
  5. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  6. import com.nb.common.websocket.PubResponse;
  7. import com.nb.common.websocket.TopicMessage;
  8. import com.nb.common.websocket.WebSocketConstant;
  9. import com.nb.common.websocket.handler.TopicWrapper;
  10. import com.nb.common.websocket.msg.MessageResponse;
  11. import com.nb.im.entity.ImMsgEntity;
  12. import com.nb.im.entity.ImRoomEntity;
  13. import com.nb.im.entity.ImRoomUserEntity;
  14. import com.nb.im.enums.SponsorEnum;
  15. import com.nb.im.room.ImRoomOperator;
  16. import com.nb.im.room.ImRoomOperatorManager;
  17. import com.nb.im.service.LocalImMsgService;
  18. import com.nb.im.service.LocalImRoomService;
  19. import com.nb.im.service.LocalImRoomUserService;
  20. import com.nb.im.ws.PubMsgInfo;
  21. import lombok.AllArgsConstructor;
  22. import lombok.extern.slf4j.Slf4j;
  23. import org.redisson.api.RTopic;
  24. import org.redisson.api.RedissonClient;
  25. import org.springframework.stereotype.Component;
  26. import org.tio.core.ChannelContext;
  27. import org.tio.core.Tio;
  28. import org.tio.websocket.common.WsPacket;
  29. import org.tio.websocket.common.WsResponse;
  30. import java.util.Arrays;
  31. import java.util.Date;
  32. import java.util.Map;
  33. import java.util.UUID;
  34. import java.util.concurrent.ConcurrentHashMap;
  35. /**
  36. * @author lifang
  37. * @version 1.0.0
  38. * @ClassName ImUtils.java
  39. * @Description TODO
  40. * @createTime 2022年08月22日 16:47:00
  41. */
  42. @Component
  43. @AllArgsConstructor
  44. @Slf4j
  45. public class ImUtils {
  46. private final ImRoomOperatorManager roomOperatorManager;
  47. private final LocalImMsgService imRoomMsgService;
  48. private final LocalImRoomService roomService;
  49. private final LocalImRoomUserService roomUserService;
  50. private final RedissonClient redissonClient;
  51. /**
  52. * 发布主题缓存
  53. */
  54. private final Map<String, RTopic> pubTopicMap =new ConcurrentHashMap<>();
  55. public void send(String roomId, PubMsgInfo pubMsgInfo){
  56. TopicWrapper topicWrapper = WebSocketConstant.getTopic(WebSocketConstant.IM, null, roomId, null);
  57. send(roomId,topicWrapper,pubMsgInfo,null,true);
  58. }
  59. public void send(String roomId, TopicWrapper topicWrapper, PubMsgInfo pubMsgInfo, ChannelContext channelContext, boolean createKey){
  60. String topic = topicWrapper.getTopic();
  61. String param = topicWrapper.getParam();
  62. //消息不可重复发送
  63. ImRoomOperator roomOperator = roomOperatorManager.getRoomOperator(roomId);
  64. //判断消息是否重复发送
  65. if(createKey){
  66. pubMsgInfo.setKey(UUID.randomUUID().toString());
  67. }
  68. if (roomOperator.existMsg(pubMsgInfo.getKey())) {
  69. log.debug("消息{}重复发送",JSONUtil.toJsonStr(pubMsgInfo));
  70. return;
  71. }
  72. pubMsgInfo.setSort(roomOperator.generateSortId());
  73. ImMsgEntity roomMsg = BeanUtil.toBean(pubMsgInfo, ImMsgEntity.class);
  74. String assitId = roomOperator.getAssitId();
  75. if(ObjectUtil.equal(assitId,roomMsg.getSenderId())){
  76. roomMsg.setSponsor(SponsorEnum.assist);
  77. }
  78. if(!pubMsgInfo.isTmp()){
  79. imRoomMsgService.save(roomMsg);
  80. //更新聊天室信息
  81. roomService.update(new UpdateWrapper<ImRoomEntity>()
  82. .lambda()
  83. .eq(ImRoomEntity::getId,pubMsgInfo.getRoomId())
  84. .set(ImRoomEntity::getLastMsgId,roomMsg.getId())
  85. .set(ImRoomEntity::getLastMsgTime,roomMsg.getCreateTime())
  86. .set(ImRoomEntity::getLastSenderId,roomMsg.getSenderId())
  87. .set(ImRoomEntity::getUpdateTime,new Date())
  88. .setSql("total_count=total_count+1"));
  89. roomUserService.update(new UpdateWrapper<ImRoomUserEntity>()
  90. .lambda()
  91. .eq(ImRoomUserEntity::getImRoomId,pubMsgInfo.getRoomId())
  92. .eq(ImRoomUserEntity::getUserId,pubMsgInfo.getSenderId())
  93. .setSql("send_count = send_count+1 "));
  94. }
  95. pubTopicMap.computeIfAbsent(topic, k->redissonClient.getTopic(k))
  96. .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey()))
  97. .whenComplete((r,e)->{
  98. if(e==null){
  99. log.info("发送聊天室消息{}成功",JSONUtil.toJsonStr(roomMsg));
  100. }else{
  101. log.error("发送聊天室消息{}失败,",JSONUtil.toJsonStr(roomMsg),e);
  102. }
  103. if(channelContext!=null){
  104. Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(
  105. MessageResponse.of("im","im-result",
  106. param,
  107. PubResponse.success(pubMsgInfo.getKey()),
  108. pubMsgInfo.getKey())
  109. ), WsPacket.CHARSET_NAME));
  110. }
  111. });
  112. if(!pubMsgInfo.isTmp()){
  113. String senderId = pubMsgInfo.getSenderId();
  114. roomOperator.addUnreadMsg(senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId(),Arrays.asList(pubMsgInfo.getKey()));
  115. //消息发送给个人
  116. String receiveId=senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId();
  117. pubTopicMap.computeIfAbsent( WebSocketConstant.getAllImByUserId(receiveId).getTopic(), k->redissonClient.getTopic(k))
  118. .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey()));
  119. }
  120. }
  121. }