ImUtils.java 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package com.nb.im.utils;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.util.ObjectUtil;
  4. import cn.hutool.core.util.StrUtil;
  5. import cn.hutool.json.JSONUtil;
  6. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  7. import com.nb.common.websocket.PubResponse;
  8. import com.nb.common.websocket.TopicMessage;
  9. import com.nb.common.websocket.WebSocketConstant;
  10. import com.nb.common.websocket.handler.TopicWrapper;
  11. import com.nb.common.websocket.msg.MessageResponse;
  12. import com.nb.im.controller.vo.UnreadVo;
  13. import com.nb.im.entity.ImMsgEntity;
  14. import com.nb.im.entity.ImRoomEntity;
  15. import com.nb.im.entity.ImRoomUserEntity;
  16. import com.nb.im.enums.SponsorEnum;
  17. import com.nb.im.room.ImRoomOperator;
  18. import com.nb.im.room.ImRoomOperatorManager;
  19. import com.nb.im.service.LocalImMsgService;
  20. import com.nb.im.service.LocalImRoomService;
  21. import com.nb.im.service.LocalImRoomUserService;
  22. import com.nb.im.ws.PubMsgInfo;
  23. import lombok.AllArgsConstructor;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.redisson.api.RTopic;
  26. import org.redisson.api.RedissonClient;
  27. import org.springframework.stereotype.Component;
  28. import org.tio.core.ChannelContext;
  29. import org.tio.core.Tio;
  30. import org.tio.core.utils.TioUtils;
  31. import org.tio.websocket.common.WsPacket;
  32. import org.tio.websocket.common.WsResponse;
  33. import java.util.Arrays;
  34. import java.util.Date;
  35. import java.util.Map;
  36. import java.util.UUID;
  37. import java.util.concurrent.ConcurrentHashMap;
  38. /**
  39. * @author lifang
  40. * @version 1.0.0
  41. * @ClassName ImUtils.java
  42. * @Description TODO
  43. * @createTime 2022年08月22日 16:47:00
  44. */
  45. @Component
  46. @AllArgsConstructor
  47. @Slf4j
  48. public class ImUtils {
  49. private final ImRoomOperatorManager roomOperatorManager;
  50. private final LocalImMsgService imRoomMsgService;
  51. private final LocalImRoomService roomService;
  52. private final LocalImRoomUserService roomUserService;
  53. private final RedissonClient redissonClient;
  54. /**
  55. * 发布主题缓存
  56. */
  57. private final Map<String, RTopic> pubTopicMap =new ConcurrentHashMap<>();
  58. public void send(String roomId, PubMsgInfo pubMsgInfo){
  59. TopicWrapper topicWrapper = WebSocketConstant.getTopic(WebSocketConstant.IM, null, roomId, null);
  60. send(roomId,topicWrapper,pubMsgInfo,null,true);
  61. }
  62. public void send(String roomId, TopicWrapper topicWrapper, PubMsgInfo pubMsgInfo, ChannelContext channelContext, boolean createKey){
  63. String topic = topicWrapper.getTopic();
  64. String param = topicWrapper.getParam();
  65. //消息不可重复发送
  66. ImRoomOperator roomOperator = roomOperatorManager.getRoomOperator(roomId);
  67. //判断消息是否重复发送
  68. if(createKey){
  69. pubMsgInfo.setKey(UUID.randomUUID().toString());
  70. }
  71. if (roomOperator.existMsg(pubMsgInfo.getKey())) {
  72. log.debug("消息{}重复发送",JSONUtil.toJsonStr(pubMsgInfo));
  73. return;
  74. }
  75. pubMsgInfo.setSort(roomOperator.generateSortId());
  76. ImMsgEntity roomMsg = BeanUtil.toBean(pubMsgInfo, ImMsgEntity.class);
  77. String assitId = roomOperator.getAssitId();
  78. if(ObjectUtil.equal(assitId,roomMsg.getSenderId())){
  79. roomMsg.setSponsor(SponsorEnum.assist);
  80. }
  81. if(!pubMsgInfo.isTmp()){
  82. imRoomMsgService.save(roomMsg);
  83. //更新聊天室信息
  84. roomService.update(new UpdateWrapper<ImRoomEntity>()
  85. .lambda()
  86. .eq(ImRoomEntity::getId,pubMsgInfo.getRoomId())
  87. .set(ImRoomEntity::getLastMsgId,roomMsg.getId())
  88. .set(ImRoomEntity::getLastMsgTime,roomMsg.getCreateTime())
  89. .set(ImRoomEntity::getLastSenderId,roomMsg.getSenderId())
  90. .set(ImRoomEntity::getUpdateTime,new Date())
  91. .setSql("total_count=total_count+1"));
  92. roomUserService.update(new UpdateWrapper<ImRoomUserEntity>()
  93. .lambda()
  94. .eq(ImRoomUserEntity::getImRoomId,pubMsgInfo.getRoomId())
  95. .eq(ImRoomUserEntity::getUserId,pubMsgInfo.getSenderId())
  96. .setSql("send_count = send_count+1 "));
  97. }
  98. pubTopicMap.computeIfAbsent(topic, k->redissonClient.getTopic(k))
  99. .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey()))
  100. .whenComplete((r,e)->{
  101. if(e==null){
  102. log.info("topic:{},发送聊天室消息{}成功",topic,JSONUtil.toJsonStr(roomMsg));
  103. }else{
  104. log.error("topic:{},发送聊天室消息{}失败,",topic,JSONUtil.toJsonStr(roomMsg),e);
  105. }
  106. if(channelContext!=null){
  107. Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(
  108. MessageResponse.of("im","im-result",
  109. param,
  110. PubResponse.success(pubMsgInfo.getKey()),
  111. pubMsgInfo.getKey())
  112. ), WsPacket.CHARSET_NAME));
  113. }
  114. });
  115. String senderId = pubMsgInfo.getSenderId();
  116. roomOperator.addUnreadMsg(senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId(),Arrays.asList(pubMsgInfo.getKey()));
  117. //消息发送给个人
  118. String receiveId=senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId();
  119. if(!pubMsgInfo.isTmp()){
  120. pubTopicMap.computeIfAbsent( WebSocketConstant.getAllImByUserId(receiveId).getTopic(), k->redissonClient.getTopic(k))
  121. .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey()));
  122. }
  123. UnreadVo unreadVo = new UnreadVo();
  124. if(StrUtil.equals(pubMsgInfo.getSenderId(),roomOperator.getAssitId())){
  125. unreadVo.setDoctorId(receiveId);
  126. }else {
  127. unreadVo.setAssistId(receiveId);
  128. }
  129. Long unReadCount = imRoomMsgService.unReadCount(unreadVo);
  130. redissonClient.getTopic(WebSocketConstant.getImUnreadTopic(receiveId).getTopic()).publishAsync(TopicMessage.of(unReadCount,receiveId,pubMsgInfo.getKey()));
  131. }
  132. }