package com.nb.im.utils; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.nb.common.websocket.PubResponse; import com.nb.common.websocket.TopicMessage; import com.nb.common.websocket.WebSocketConstant; import com.nb.common.websocket.handler.TopicWrapper; import com.nb.common.websocket.msg.MessageResponse; import com.nb.im.controller.vo.UnreadVo; import com.nb.im.entity.ImMsgEntity; import com.nb.im.entity.ImRoomEntity; import com.nb.im.entity.ImRoomUserEntity; import com.nb.im.enums.SponsorEnum; import com.nb.im.room.ImRoomOperator; import com.nb.im.room.ImRoomOperatorManager; import com.nb.im.service.LocalImMsgService; import com.nb.im.service.LocalImRoomService; import com.nb.im.service.LocalImRoomUserService; import com.nb.im.ws.PubMsgInfo; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import org.tio.core.ChannelContext; import org.tio.core.Tio; import org.tio.core.utils.TioUtils; import org.tio.websocket.common.WsPacket; import org.tio.websocket.common.WsResponse; import java.util.Arrays; import java.util.Date; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * @author lifang * @version 1.0.0 * @ClassName ImUtils.java * @Description TODO * @createTime 2022年08月22日 16:47:00 */ @Component @AllArgsConstructor @Slf4j public class ImUtils { private final ImRoomOperatorManager roomOperatorManager; private final LocalImMsgService imRoomMsgService; private final LocalImRoomService roomService; private final LocalImRoomUserService roomUserService; private final RedissonClient redissonClient; /** * 发布主题缓存 */ private final Map pubTopicMap =new ConcurrentHashMap<>(); public void send(String roomId, PubMsgInfo pubMsgInfo){ TopicWrapper topicWrapper = WebSocketConstant.getTopic(WebSocketConstant.IM, null, roomId, null); send(roomId,topicWrapper,pubMsgInfo,null,true); } public void send(String roomId, TopicWrapper topicWrapper, PubMsgInfo pubMsgInfo, ChannelContext channelContext, boolean createKey){ String topic = topicWrapper.getTopic(); String param = topicWrapper.getParam(); //消息不可重复发送 ImRoomOperator roomOperator = roomOperatorManager.getRoomOperator(roomId); //判断消息是否重复发送 if(createKey){ pubMsgInfo.setKey(UUID.randomUUID().toString()); } if (roomOperator.existMsg(pubMsgInfo.getKey())) { log.debug("消息{}重复发送",JSONUtil.toJsonStr(pubMsgInfo)); return; } pubMsgInfo.setSort(roomOperator.generateSortId()); ImMsgEntity roomMsg = BeanUtil.toBean(pubMsgInfo, ImMsgEntity.class); String assitId = roomOperator.getAssitId(); if(ObjectUtil.equal(assitId,roomMsg.getSenderId())){ roomMsg.setSponsor(SponsorEnum.assist); } if(!pubMsgInfo.isTmp()){ imRoomMsgService.save(roomMsg); //更新聊天室信息 roomService.update(new UpdateWrapper() .lambda() .eq(ImRoomEntity::getId,pubMsgInfo.getRoomId()) .set(ImRoomEntity::getLastMsgId,roomMsg.getId()) .set(ImRoomEntity::getLastMsgTime,roomMsg.getCreateTime()) .set(ImRoomEntity::getLastSenderId,roomMsg.getSenderId()) .set(ImRoomEntity::getUpdateTime,new Date()) .setSql("total_count=total_count+1")); roomUserService.update(new UpdateWrapper() .lambda() .eq(ImRoomUserEntity::getImRoomId,pubMsgInfo.getRoomId()) .eq(ImRoomUserEntity::getUserId,pubMsgInfo.getSenderId()) .setSql("send_count = send_count+1 ")); } pubTopicMap.computeIfAbsent(topic, k->redissonClient.getTopic(k)) .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey())) .whenComplete((r,e)->{ if(e==null){ log.info("topic:{},发送聊天室消息{}成功",topic,JSONUtil.toJsonStr(roomMsg)); }else{ log.error("topic:{},发送聊天室消息{}失败,",topic,JSONUtil.toJsonStr(roomMsg),e); } if(channelContext!=null){ Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr( MessageResponse.of("im","im-result", param, PubResponse.success(pubMsgInfo.getKey()), pubMsgInfo.getKey()) ), WsPacket.CHARSET_NAME)); } }); String senderId = pubMsgInfo.getSenderId(); roomOperator.addUnreadMsg(senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId(),Arrays.asList(pubMsgInfo.getKey())); //消息发送给个人 String receiveId=senderId.equals(roomOperator.getAssitId())?roomOperator.getDoctorId():roomOperator.getAssitId(); if(!pubMsgInfo.isTmp()){ pubTopicMap.computeIfAbsent( WebSocketConstant.getAllImByUserId(receiveId).getTopic(), k->redissonClient.getTopic(k)) .publishAsync( TopicMessage.of(roomMsg,param,pubMsgInfo.getKey())); } UnreadVo unreadVo = new UnreadVo(); if(StrUtil.equals(pubMsgInfo.getSenderId(),roomOperator.getAssitId())){ unreadVo.setDoctorId(receiveId); }else { unreadVo.setAssistId(receiveId); } Long unReadCount = imRoomMsgService.unReadCount(unreadVo); redissonClient.getTopic(WebSocketConstant.getImUnreadTopic(receiveId).getTopic()).publishAsync(TopicMessage.of(unReadCount,receiveId,pubMsgInfo.getKey())); } }