| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package com.nb.im.utils;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.core.util.ObjectUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.json.JSONUtil;
- import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
- import com.nb.common.websocket.PubResponse;
- import com.nb.common.websocket.TopicMessage;
- import com.nb.common.websocket.WebSocketConstant;
- import com.nb.common.websocket.handler.TopicWrapper;
- import com.nb.common.websocket.msg.MessageResponse;
- import com.nb.im.controller.vo.UnreadVo;
- import com.nb.im.entity.ImMsgEntity;
- import com.nb.im.entity.ImRoomEntity;
- import com.nb.im.entity.ImRoomUserEntity;
- import com.nb.im.enums.SponsorEnum;
- import com.nb.im.room.ImRoomOperator;
- import com.nb.im.room.ImRoomOperatorManager;
- import com.nb.im.service.LocalImMsgService;
- import com.nb.im.service.LocalImRoomService;
- import com.nb.im.service.LocalImRoomUserService;
- import com.nb.im.ws.PubMsgInfo;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.api.RTopic;
- import org.redisson.api.RedissonClient;
- import org.springframework.stereotype.Component;
- import org.tio.core.ChannelContext;
- import org.tio.core.Tio;
- import org.tio.core.utils.TioUtils;
- import org.tio.websocket.common.WsPacket;
- import org.tio.websocket.common.WsResponse;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.Map;
- import java.util.UUID;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * @author lifang
- * @version 1.0.0
- * @ClassName ImUtils.java
- * @Description TODO
- * @createTime 2022年08月22日 16:47:00
- */
- @Component
- @AllArgsConstructor
- @Slf4j
- public class ImUtils {
- private final ImRoomOperatorManager roomOperatorManager;
- private final LocalImMsgService imRoomMsgService;
- private final LocalImRoomService roomService;
- private final LocalImRoomUserService roomUserService;
- private final RedissonClient redissonClient;
- /**
- * 发布主题缓存
- */
- private final Map<String, RTopic> pubTopicMap =new ConcurrentHashMap<>();
- public void send(String roomId, PubMsgInfo pubMsgInfo){
- TopicWrapper topicWrapper = WebSocketConstant.getTopic(WebSocketConstant.IM, null, roomId, null);
- send(roomId,topicWrapper,pubMsgInfo,null,true);
- }
- public void send(String roomId, TopicWrapper topicWrapper, PubMsgInfo pubMsgInfo, ChannelContext channelContext, boolean createKey){
- String topic = topicWrapper.getTopic();
- String param = topicWrapper.getParam();
- //消息不可重复发送
- ImRoomOperator roomOperator = roomOperatorManager.getRoomOperator(roomId);
- //判断消息是否重复发送
- if(createKey){
- pubMsgInfo.setKey(UUID.randomUUID().toString());
- }
- if (roomOperator.existMsg(pubMsgInfo.getKey())) {
- log.debug("消息{}重复发送",JSONUtil.toJsonStr(pubMsgInfo));
- return;
- }
- pubMsgInfo.setSort(roomOperator.generateSortId());
- ImMsgEntity roomMsg = BeanUtil.toBean(pubMsgInfo, ImMsgEntity.class);
- String assitId = roomOperator.getAssitId();
- if(ObjectUtil.equal(assitId,roomMsg.getSenderId())){
- roomMsg.setSponsor(SponsorEnum.assist);
- }
- if(!pubMsgInfo.isTmp()){
- imRoomMsgService.save(roomMsg);
- //更新聊天室信息
- roomService.update(new UpdateWrapper<ImRoomEntity>()
- .lambda()
- .eq(ImRoomEntity::getId,pubMsgInfo.getRoomId())
- .set(ImRoomEntity::getLastMsgId,roomMsg.getId())
- .set(ImRoomEntity::getLastMsgTime,roomMsg.getCreateTime())
- .set(ImRoomEntity::getLastSenderId,roomMsg.getSenderId())
- .set(ImRoomEntity::getUpdateTime,new Date())
- .setSql("total_count=total_count+1"));
- roomUserService.update(new UpdateWrapper<ImRoomUserEntity>()
- .lambda()
- .eq(ImRoomUserEntity::getImRoomId,pubMsgInfo.getRoomId())
- .eq(ImRoomUserEntity::getUserId,pubMsgInfo.getSenderId())
- .setSql("send_count = send_count+1 "));
- }
- pubTopicMap.computeIfAbsent(topic, k->redissonClient.getTopic(k))
- .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey()))
- .whenComplete((r,e)->{
- if(e==null){
- log.info("topic:{},发送聊天室消息{}成功",topic,JSONUtil.toJsonStr(roomMsg));
- }else{
- log.error("topic:{},发送聊天室消息{}失败,",topic,JSONUtil.toJsonStr(roomMsg),e);
- }
- if(channelContext!=null){
- Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(
- MessageResponse.of("im","im-result",
- param,
- PubResponse.success(pubMsgInfo.getKey()),
- pubMsgInfo.getKey())
- ), WsPacket.CHARSET_NAME));
- }
- });
- String senderId = pubMsgInfo.getSenderId();
- roomOperator.addUnreadMsg(senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId(),Arrays.asList(pubMsgInfo.getKey()));
- //消息发送给个人
- String receiveId=senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId();
- if(!pubMsgInfo.isTmp()){
- pubTopicMap.computeIfAbsent( WebSocketConstant.getAllImByUserId(receiveId).getTopic(), k->redissonClient.getTopic(k))
- .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey()));
- }
- UnreadVo unreadVo = new UnreadVo();
- if(StrUtil.equals(pubMsgInfo.getSenderId(),roomOperator.getAssitId())){
- unreadVo.setDoctorId(receiveId);
- }else {
- unreadVo.setAssistId(receiveId);
- }
- Long unReadCount = imRoomMsgService.unReadCount(unreadVo);
- redissonClient.getTopic(WebSocketConstant.getImUnreadTopic(receiveId).getTopic()).publishAsync(TopicMessage.of(unReadCount,receiveId,pubMsgInfo.getKey()));
- }
- }
|