Ver código fonte

update im+ws即时通信
add im消息去重 Im消息有序

18339543638 3 anos atrás
pai
commit
58fe0e5d65
24 arquivos alterados com 392 adições e 182 exclusões
  1. 1 1
      nb-admin/src/test/java/com/nb/admin/SpringBootApplicationTests.java
  2. 2 1
      nb-common/ws-common/src/main/java/com/nb/common/websocket/DefaultMessageListener.java
  3. 1 0
      nb-common/ws-common/src/main/java/com/nb/common/websocket/DefaultWebSocketMsgHandler.java
  4. 5 5
      nb-common/ws-common/src/main/java/com/nb/common/websocket/PubResponse.java
  5. 3 3
      nb-common/ws-common/src/main/java/com/nb/common/websocket/TopicMessage.java
  6. 1 1
      nb-common/ws-common/src/main/java/com/nb/common/websocket/event/PubMsgEvent.java
  7. 3 5
      nb-common/ws-common/src/main/java/com/nb/common/websocket/filter/DefaultPubMsgFilter.java
  8. 1 1
      nb-common/ws-common/src/main/java/com/nb/common/websocket/filter/PubMsgFilter.java
  9. 1 2
      nb-common/ws-common/src/main/java/com/nb/common/websocket/handler/DefaultCloseHandler.java
  10. 7 138
      nb-common/ws-common/src/main/java/com/nb/common/websocket/handler/Subscribe.java
  11. 1 1
      nb-common/ws-common/src/main/java/com/nb/common/websocket/handler/WsHandler.java
  12. 2 2
      nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/MessageResponse.java
  13. 3 4
      nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/MessagingRequest.java
  14. 87 0
      nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/IMsgRequestHandler.java
  15. 22 0
      nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/MsgRequestHandlerManager.java
  16. 39 0
      nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/SubMsgRequestHandler.java
  17. 57 0
      nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/UnSubMsgRequestHandler.java
  18. 2 2
      nb-im/src/main/java/com/nb/im/listener/PubMsgListener.java
  19. 51 0
      nb-im/src/main/java/com/nb/im/room/ImRoomOperator.java
  20. 26 0
      nb-im/src/main/java/com/nb/im/room/ImRoomOperatorManager.java
  21. 6 10
      nb-im/src/main/java/com/nb/im/ws/PubMsgInfo.java
  22. 68 0
      nb-im/src/main/java/com/nb/im/ws/PubMsgRequestHandler.java
  23. 3 5
      nb-im/src/main/java/com/nb/im/ws/filter/MsgFormatFilter.java
  24. 0 1
      nb-service/web-service/src/main/java/com/nb/web/service/bus/websocket/DefaultHisMsgHandler.java

+ 1 - 1
nb-admin/src/test/java/com/nb/admin/SpringBootApplicationTests.java

@@ -32,7 +32,7 @@ public class SpringBootApplicationTests {
     HospitalManagerRegister hospitalManagerRegister;
     @Test
     public void autoUndo(){
-        String json="{\"msgId\":1551842476831064066,\"body\":{\"nativeValue\":{\"uploadTime\":1658823052919,\"deviceId\":\"43165411383902B1\",\"infusionId\":\"1551842476755566594\",\"tenantId\":\"1544525896866643970\",\"patientCode\":\"1000000000000\",\"timestamp\":1658823052972}},\"handlerId\":\"no_signal\",\"properties\":{\"expire\":30,\"timeUnit\":\"MINUTES\"}}";
+        String json="{\"key\":1551842476831064066,\"body\":{\"nativeValue\":{\"uploadTime\":1658823052919,\"deviceId\":\"43165411383902B1\",\"infusionId\":\"1551842476755566594\",\"tenantId\":\"1544525896866643970\",\"patientCode\":\"1000000000000\",\"timestamp\":1658823052972}},\"handlerId\":\"no_signal\",\"properties\":{\"expire\":30,\"timeUnit\":\"MINUTES\"}}";
         DelayMessage delayMessage = JSONUtil.parseObj(json).toBean(DelayMessage.class);
         String body="{\"uploadTime\":1658823052919,\"deviceId\":\"43165411383902B1\",\"infusionId\":\"1551842476755566594\",\"tenantId\":\"1544525896866643970\",\"patientCode\":\"1000000000000\",\"timestamp\":1658823052972}";
         HospitalDeviceAutoUndoConfigHandler.UndoEntity undoEntity = JSONUtil.parseObj(body).toBean(HospitalDeviceAutoUndoConfigHandler.UndoEntity.class);

+ 2 - 1
nb-common/ws-common/src/main/java/com/nb/common/websocket/DefaultMessageListener.java

@@ -3,6 +3,7 @@ package com.nb.common.websocket;
 import cn.hutool.json.JSONUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nb.common.websocket.msg.MessageResponse;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RPatternTopic;
@@ -35,7 +36,7 @@ public class DefaultMessageListener implements PatternMessageListener<TopicMessa
             String json = null;
             try {
                 json = objectMapper.writeValueAsString(MessageResponse.of(id, "result", msg.getParam(),
-                        msg.getMessage(),msg.getMsgId()));
+                        msg.getMessage(),msg.getKey()));
                 Tio.send(channelContext, WsResponse.fromText(json, WsPacket.CHARSET_NAME));
                 if(log.isDebugEnabled()){
                     log.debug("通道【{}】发送消息【{}】",channelContext.toString(),json);

+ 1 - 0
nb-common/ws-common/src/main/java/com/nb/common/websocket/DefaultWebSocketMsgHandler.java

@@ -8,6 +8,7 @@ import cn.hutool.json.JSONUtil;
 import com.nb.auth.bean.LoginUser;
 import com.nb.common.websocket.event.ConnectedEvent;
 import com.nb.common.websocket.handler.HisMsgHandler;
+import com.nb.common.websocket.msg.MessagingRequest;
 import com.nb.common.websocket.ws.IWebSocketAuthFilter;
 import com.nb.core.Constants;
 import com.nb.common.websocket.handler.WsHandler;

+ 5 - 5
nb-common/ws-common/src/main/java/com/nb/common/websocket/PubResponse.java

@@ -15,16 +15,16 @@ import java.io.Serializable;
 @Data
 @AllArgsConstructor(staticName = "of")
 public class PubResponse implements Serializable {
-    private String msgId;
+    private Long key;
     private boolean success;
     private String errorMsg;
 
 
-    public static PubResponse success(String msgId){
-        return PubResponse.of(msgId,true,"");
+    public static PubResponse success(Long key){
+        return PubResponse.of(key,true,"");
     }
 
-    public static PubResponse fail(String msgId,String errorMsg){
-        return PubResponse.of(msgId,false,errorMsg);
+    public static PubResponse fail(Long key,String errorMsg){
+        return PubResponse.of(key,false,errorMsg);
     }
 }

+ 3 - 3
nb-common/ws-common/src/main/java/com/nb/common/websocket/TopicMessage.java

@@ -12,7 +12,7 @@ import java.io.Serializable;
 public class TopicMessage implements Serializable {
     private Object message;
     private String param;
-    private String msgId;
+    private Long key;
 
 
     public static TopicMessage of(Object message,String param){
@@ -20,8 +20,8 @@ public class TopicMessage implements Serializable {
     }
 
 
-    public static TopicMessage of(Object message,String param,String msgId){
-        return new TopicMessage(message,param,msgId);
+    public static TopicMessage of(Object message,String param,Long key){
+        return new TopicMessage(message,param,key);
     }
 
 }

+ 1 - 1
nb-common/ws-common/src/main/java/com/nb/common/websocket/event/PubMsgEvent.java

@@ -1,6 +1,6 @@
 package com.nb.common.websocket.event;
 
-import com.nb.common.websocket.MessagingRequest;
+import com.nb.common.websocket.msg.MessagingRequest;
 import lombok.Getter;
 import org.springframework.context.ApplicationEvent;
 

+ 3 - 5
nb-common/ws-common/src/main/java/com/nb/common/websocket/filter/DefaultPubMsgFilter.java

@@ -1,11 +1,9 @@
 package com.nb.common.websocket.filter;
 
 import cn.hutool.core.util.ObjectUtil;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONUtil;
-import com.nb.common.websocket.MessagingRequest;
+import com.nb.common.websocket.msg.MessagingRequest;
 import com.nb.common.websocket.PubResponse;
-import com.nb.core.result.R;
 import org.springframework.stereotype.Component;
 import org.tio.core.ChannelContext;
 import org.tio.core.Tio;
@@ -23,8 +21,8 @@ import org.tio.websocket.common.WsResponse;
 public class DefaultPubMsgFilter implements PubMsgFilter {
     @Override
     public boolean doFilter(ChannelContext channelContext, MessagingRequest source) {
-        if(StrUtil.isEmpty(source.getMsgId())){
-            Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.fail("","发布消息时msgId不能为空")), WsPacket.CHARSET_NAME));
+        if(source.getKey()!=null){
+            Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.fail(-1L,"发布消息时key不能为空")), WsPacket.CHARSET_NAME));
             return false;
         }
         return ObjectUtil.isNotNull(source.getPayload());

+ 1 - 1
nb-common/ws-common/src/main/java/com/nb/common/websocket/filter/PubMsgFilter.java

@@ -1,6 +1,6 @@
 package com.nb.common.websocket.filter;
 
-import com.nb.common.websocket.MessagingRequest;
+import com.nb.common.websocket.msg.MessagingRequest;
 import org.tio.core.ChannelContext;
 
 /**

+ 1 - 2
nb-common/ws-common/src/main/java/com/nb/common/websocket/handler/DefaultCloseHandler.java

@@ -2,8 +2,7 @@ package com.nb.common.websocket.handler;
 
 import cn.hutool.extra.spring.SpringUtil;
 import com.nb.auth.bean.LoginUser;
-import com.nb.common.websocket.MessagingRequest;
-import com.nb.common.websocket.event.ConnectedEvent;
+import com.nb.common.websocket.msg.MessagingRequest;
 import com.nb.common.websocket.event.DisConnectedEvent;
 import org.springframework.stereotype.Component;
 import org.tio.core.ChannelContext;

+ 7 - 138
nb-common/ws-common/src/main/java/com/nb/common/websocket/handler/Subscribe.java

@@ -1,29 +1,18 @@
 package com.nb.common.websocket.handler;
 
 import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.extra.spring.SpringUtil;
 import cn.hutool.json.JSONUtil;
 import com.nb.auth.bean.LoginUser;
 import com.nb.common.websocket.*;
-import com.nb.common.websocket.event.PubMsgEvent;
-import com.nb.common.websocket.filter.PubMsgFilter;
+import com.nb.common.websocket.msg.handler.IMsgRequestHandler;
+import com.nb.common.websocket.msg.MessagingRequest;
+import com.nb.common.websocket.msg.handler.MsgRequestHandlerManager;
 import com.nb.core.Constants;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.nb.core.result.R;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.tomcat.util.descriptor.web.WebXml;
-import org.redisson.api.RPatternTopic;
-import org.redisson.api.RTopic;
-import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.tio.core.ChannelContext;
-import org.tio.core.Tio;
-import org.tio.websocket.common.WsPacket;
-import org.tio.websocket.common.WsResponse;
 
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -35,23 +24,9 @@ import java.util.stream.Collectors;
  */
 @Slf4j
 public abstract class Subscribe implements WsHandler {
-    @Autowired
-    private ObjectMapper objectMapper;
 
     @Autowired
-    private RedissonClient redissonClient;
-
-    @Autowired(required = false)
-    private List<PubMsgFilter> msgFilters;
-    /**
-     * 订阅主题缓存
-     */
-    private static Map<String, RPatternTopic> subTopicMap =new ConcurrentHashMap<>();
-
-    /**
-     * 发布主题缓存
-     */
-    private static Map<String, RTopic> pubTopicMap =new ConcurrentHashMap<>();
+    private MsgRequestHandlerManager requestHandlerManager;
 
     public TopicWrapper getTopic(String productName,String param,String tenantId){
         return  WebSocketConstant.getTopic(this.getId(),productName, param, tenantId);
@@ -81,118 +56,12 @@ public abstract class Subscribe implements WsHandler {
         else {
             subScribeTopic = Collections.singletonList(getTopic(message.getProductName(), null, message.getTenantId()));
         }
-
-        MessagingRequest.Type type = message.getType();
-        if(MessagingRequest.Type.sub.equals(type)){
-            //订阅主题
-            subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
-        }else if(MessagingRequest.Type.unsub.equals(type)){
-            //取消订阅主题
-            subScribeTopic.forEach(topicWrapper->this.unsubscribe(channelContext,topicWrapper.getTopic()));
-        }else {
-            subScribeTopic.forEach(topicWrapper->this.publish(channelContext,message,topicWrapper.getTopic(),topicWrapper.getParam()));
+        IMsgRequestHandler handle = requestHandlerManager.getHandle(message.getType());
+        if(handle!=null){
+            subScribeTopic.forEach(topicWrapper->handle.handler(message,channelContext,topicWrapper));
         }
         if(log.isDebugEnabled()){
             log.debug("ws数据处理成功{}", JSONUtil.toJsonStr(message));
         }
     }
-
-    /**
-     * 描述: 推送主题
-     * @author lifang
-     * @date 2022/8/16 14:04
-     * @param channelContext 发布推送消息的通道
-     * @param message
-     * @param topic
-     * @param param
-     * @return void
-     */
-    private void publish(ChannelContext channelContext, MessagingRequest message, String topic, String param) {
-        if(CollectionUtil.isNotEmpty(msgFilters)){
-            boolean result = msgFilters.stream()
-                    .allMatch(filter -> filter.doFilter(channelContext, message));
-            if(result){
-                SpringUtil.publishEvent(new PubMsgEvent(this,message));
-                pubTopicMap.computeIfAbsent(topic, k->redissonClient.getTopic(k))
-                        .publishAsync( TopicMessage.of(message.getPayload(),param,message.getMsgId()))
-                        .whenComplete((r,e)->{
-                            Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.success(message.getMsgId())), WsPacket.CHARSET_NAME));
-                        });
-
-            }
-        }
-    }
-
-    /**
-     * ws 订阅主题
-     * @param channelContext
-     * @param topicWrapper
-     */
-    public void subscribe(ChannelContext channelContext, TopicWrapper topicWrapper){
-        getChannelTopic(channelContext).add(topicWrapper.getTopic());
-        //同一主题只订阅一次
-        RPatternTopic rTopic = subTopicMap.computeIfAbsent(topicWrapper.getTopic(), topic->redissonClient.getPatternTopic(topicWrapper.getTopic()));
-        addTopicListener(rTopic,channelContext, topicWrapper.getTopic());
-    };
-
-    /**
-     * ws取消订阅主题
-     * @param channelContext
-     * @param topic
-     */
-    public void unsubscribe(ChannelContext channelContext, String topic){
-        if(StrUtil.isEmpty(topic)){
-            return;
-        }
-        Set<String> allTopics=new HashSet<>();
-
-        if(topic.startsWith("all")){
-            allTopics.addAll(getChannelTopic(channelContext));
-        }
-
-        //取消订阅
-        for (String subTopic : allTopics) {
-            subTopicMap.computeIfPresent(subTopic,(k, rTopic)->{
-                rTopic.removeListener( getTopicListener(channelContext,k));
-                return rTopic;
-            });
-            Map<String, DefaultMessageListener> topicListeners = getTopicListeners(channelContext);
-            topicListeners.remove(subTopic);
-        }
-
-    };
-
-
-    public Map<String,DefaultMessageListener> getTopicListeners(ChannelContext channelContext){
-        Object result = Optional.ofNullable(channelContext.get("topic")).orElse(new HashMap<String,DefaultMessageListener>());
-        channelContext.set("topic",result);
-        return  (Map<String,DefaultMessageListener>) result;
-    }
-
-    public DefaultMessageListener getTopicListener(ChannelContext channelContext,String topic){
-        Map<String,DefaultMessageListener> result = (Map<String, DefaultMessageListener>) Optional.ofNullable(channelContext.get("topic")).orElse(new HashMap<>());
-        return  result.get(topic);
-    }
-
-    public DefaultMessageListener addTopicListener(RPatternTopic rTopic,ChannelContext channelContext,String topic){
-        Map<String, DefaultMessageListener> topicByChannel = getTopicListeners(channelContext);
-        DefaultMessageListener messageListener = topicByChannel.computeIfAbsent(topic, k -> {
-            DefaultMessageListener defaultMessageListener = new DefaultMessageListener(getId(), objectMapper, channelContext,rTopic);
-            rTopic.addListenerAsync(TopicMessage.class, defaultMessageListener);
-            return defaultMessageListener;
-        });
-        return  messageListener;
-    }
-
-    private Set<String> getChannelTopic(ChannelContext channelContext){
-        Set<String> topics=null;
-        Object topicSet = channelContext.getAttribute("subtopic");
-        if(topicSet==null){
-            topics=new HashSet<>();
-        }else {
-            topics= (Set<String>) topicSet;
-        }
-        channelContext.setAttribute("subtopic",topics);
-        return topics;
-    }
 }

+ 1 - 1
nb-common/ws-common/src/main/java/com/nb/common/websocket/handler/WsHandler.java

@@ -1,6 +1,6 @@
 package com.nb.common.websocket.handler;
 
-import com.nb.common.websocket.MessagingRequest;
+import com.nb.common.websocket.msg.MessagingRequest;
 import org.tio.core.ChannelContext;
 
 /**

+ 2 - 2
nb-common/ws-common/src/main/java/com/nb/common/websocket/MessageResponse.java → nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/MessageResponse.java

@@ -1,4 +1,4 @@
-package com.nb.common.websocket;
+package com.nb.common.websocket.msg;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -19,5 +19,5 @@ public class MessageResponse implements Serializable {
     private String type;
     private String param;
     private Object payload;
-    private String msgId;
+    private Long key;
 }

+ 3 - 4
nb-common/ws-common/src/main/java/com/nb/common/websocket/MessagingRequest.java → nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/MessagingRequest.java

@@ -1,6 +1,5 @@
-package com.nb.common.websocket;
+package com.nb.common.websocket.msg;
 
-import cn.hutool.core.util.StrUtil;
 import lombok.Data;
 
 import java.io.Serializable;
@@ -9,9 +8,9 @@ import java.util.*;
 public class MessagingRequest implements Serializable {
 
     /**
-     * 消息id
+     * 唯一消息id
      */
-    private String msgId;
+    private Long key;
 
     /**
      * 心跳、设备或报警信息,例如ping、device,alarm,patient

+ 87 - 0
nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/IMsgRequestHandler.java

@@ -0,0 +1,87 @@
+package com.nb.common.websocket.msg.handler;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nb.common.websocket.DefaultMessageListener;
+import com.nb.common.websocket.TopicMessage;
+import com.nb.common.websocket.handler.TopicWrapper;
+import com.nb.common.websocket.msg.MessagingRequest;
+import org.redisson.api.RPatternTopic;
+import org.redisson.api.RTopic;
+import org.tio.core.ChannelContext;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MsgRequestHandler.java
+ * @Description 消息处理器
+ * @createTime 2022年08月18日 18:33:00
+ */
+public interface IMsgRequestHandler {
+    /**
+     * 订阅主题缓存
+     */
+    Map<String, RPatternTopic> subTopicMap =new ConcurrentHashMap<>();
+    /**
+     * 发布主题缓存
+     */
+    Map<String, RTopic> pubTopicMap =new ConcurrentHashMap<>();
+
+    MessagingRequest.Type getType();
+    /**
+     * 描述:
+     * @author lifang
+     * @date 2022/8/18 18:35
+     * @param message 发送消息内容
+     * @param channelContext 发送消息通道
+     * @param topicWrapper 主题
+     * @return void
+     */
+    void handler(MessagingRequest message, ChannelContext channelContext, TopicWrapper topicWrapper);
+
+    /**
+     * 描述: 获取序列化方式
+     * @author lifang
+     * @date 2022/8/18 18:51
+     * @param
+     * @return ObjectMapper
+     */
+    ObjectMapper getObjectMapper();
+
+    default Set<String> getChannelTopic(ChannelContext channelContext){
+        Set<String> topics=null;
+        Object topicSet = channelContext.getAttribute("subtopic");
+        if(topicSet==null){
+            topics=new HashSet<>();
+        }else {
+            topics= (Set<String>) topicSet;
+        }
+        channelContext.setAttribute("subtopic",topics);
+        return topics;
+    }
+
+    default DefaultMessageListener addTopicListener(RPatternTopic rTopic, ChannelContext channelContext, String topic,String msgId){
+        Map<String, DefaultMessageListener> topicByChannel = getTopicListeners(channelContext);
+        DefaultMessageListener messageListener = topicByChannel.computeIfAbsent(topic, k -> {
+            DefaultMessageListener defaultMessageListener = new DefaultMessageListener(msgId, getObjectMapper(), channelContext,rTopic);
+            rTopic.addListenerAsync(TopicMessage.class, defaultMessageListener);
+            return defaultMessageListener;
+        });
+        return  messageListener;
+    }
+
+    default Map<String,DefaultMessageListener> getTopicListeners(ChannelContext channelContext){
+        Object result = Optional.ofNullable(channelContext.get("topic")).orElse(new HashMap<String,DefaultMessageListener>());
+        channelContext.set("topic",result);
+        return  (Map<String,DefaultMessageListener>) result;
+    }
+
+
+    default DefaultMessageListener getTopicListener(ChannelContext channelContext,String topic){
+        Map<String,DefaultMessageListener> result = (Map<String, DefaultMessageListener>) Optional.ofNullable(channelContext.get("topic")).orElse(new HashMap<>());
+        return  result.get(topic);
+    }
+
+}

+ 22 - 0
nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/MsgRequestHandlerManager.java

@@ -0,0 +1,22 @@
+package com.nb.common.websocket.msg.handler;
+
+import com.nb.common.websocket.msg.MessagingRequest;
+import lombok.AllArgsConstructor;
+import org.springframework.stereotype.Component;
+import java.util.*;
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MsgRequestHandlerManager.java
+ * @Description 消息处理管理器
+ * @createTime 2022年08月18日 18:42:00
+ */
+@AllArgsConstructor
+@Component
+public class MsgRequestHandlerManager {
+    private final List<IMsgRequestHandler>  handlers;
+
+    public IMsgRequestHandler getHandle(MessagingRequest.Type type){
+        return handlers.stream().filter(handler->type.equals(handler.getType())).findAny().orElse(null);
+    }
+}

+ 39 - 0
nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/SubMsgRequestHandler.java

@@ -0,0 +1,39 @@
+package com.nb.common.websocket.msg.handler;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nb.common.websocket.handler.TopicWrapper;
+import com.nb.common.websocket.msg.MessagingRequest;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.redisson.api.RPatternTopic;
+import org.redisson.api.RedissonClient;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName SubMsgRequestHandler.java
+ * @Description TODO
+ * @createTime 2022年08月18日 18:47:00
+ */
+@Component
+@AllArgsConstructor
+public class SubMsgRequestHandler implements IMsgRequestHandler {
+    @Getter
+    private final ObjectMapper objectMapper;
+
+    private final RedissonClient redissonClient;
+    @Override
+    public MessagingRequest.Type getType() {
+        return MessagingRequest.Type.unsub;
+    }
+
+    @Override
+    public void handler(MessagingRequest message, ChannelContext channelContext, TopicWrapper topicWrapper) {
+        getChannelTopic(channelContext).add(topicWrapper.getTopic());
+        //同一主题只订阅一次
+        RPatternTopic rTopic = subTopicMap.computeIfAbsent(topicWrapper.getTopic(), topic->redissonClient.getPatternTopic(topicWrapper.getTopic()));
+        addTopicListener(rTopic,channelContext, topicWrapper.getTopic(),message.getId());
+    }
+}

+ 57 - 0
nb-common/ws-common/src/main/java/com/nb/common/websocket/msg/handler/UnSubMsgRequestHandler.java

@@ -0,0 +1,57 @@
+package com.nb.common.websocket.msg.handler;
+
+import cn.hutool.core.util.StrUtil;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nb.common.websocket.DefaultMessageListener;
+import com.nb.common.websocket.handler.TopicWrapper;
+import com.nb.common.websocket.msg.MessagingRequest;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName SubMsgRequestHandler.java
+ * @Description TODO
+ * @createTime 2022年08月18日 18:47:00
+ */
+@Component
+@AllArgsConstructor
+public class UnSubMsgRequestHandler implements IMsgRequestHandler {
+    @Getter
+    private final ObjectMapper objectMapper;
+
+    @Override
+    public MessagingRequest.Type getType() {
+        return MessagingRequest.Type.sub;
+    }
+
+    @Override
+    public void handler(MessagingRequest message, ChannelContext channelContext, TopicWrapper topicWrapper) {
+        String topic = topicWrapper.getTopic();
+        if(StrUtil.isEmpty(topic)){
+            return;
+        }
+        Set<String> allTopics=new HashSet<>();
+
+        if(topic.startsWith("all")){
+            allTopics.addAll(getChannelTopic(channelContext));
+        }
+
+        //取消订阅
+        for (String subTopic : allTopics) {
+            subTopicMap.computeIfPresent(subTopic,(k, rTopic)->{
+                rTopic.removeListener( getTopicListener(channelContext,k));
+                return rTopic;
+            });
+            Map<String, DefaultMessageListener> topicListeners = getTopicListeners(channelContext);
+            topicListeners.remove(subTopic);
+        }
+    }
+}

+ 2 - 2
nb-im/src/main/java/com/nb/im/listener/PubMsgListener.java

@@ -3,7 +3,7 @@ package com.nb.im.listener;
 import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.json.JSONUtil;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
-import com.nb.common.websocket.MessagingRequest;
+import com.nb.common.websocket.msg.MessagingRequest;
 import com.nb.common.websocket.event.PubMsgEvent;
 import com.nb.im.entity.ImRoomEntity;
 import com.nb.im.entity.ImMsgEntity;
@@ -46,7 +46,7 @@ public class PubMsgListener {
         //更新聊天室信息
         roomService.update(new UpdateWrapper<ImRoomEntity>()
                 .lambda()
-                .eq(ImRoomEntity::getId,pubMsgInfo.getChatRoomId())
+                .eq(ImRoomEntity::getId,pubMsgInfo.getRoomId())
                 .set(ImRoomEntity::getLastMsgId,roomMsg.getId())
                 .set(ImRoomEntity::getLastMsgTime,roomMsg.getCreateTime())
                 .setSql("total_count=total_count+1"));

+ 51 - 0
nb-im/src/main/java/com/nb/im/room/ImRoomOperator.java

@@ -0,0 +1,51 @@
+package com.nb.im.room;
+
+import org.redisson.api.RAtomicLong;
+import org.redisson.api.RBitSet;
+import org.redisson.api.RedissonClient;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ImRoomOperator.java
+ * @Description TODO
+ * @createTime 2022年08月18日 19:05:00
+ */
+public class ImRoomOperator {
+    private RBitSet bitSet;
+    private RAtomicLong atomicLong;
+
+    public ImRoomOperator(RedissonClient redissonClient, String id) {
+        this.bitSet = redissonClient.getBitSet("im:bit."+id);
+        this.atomicLong = redissonClient.getAtomicLong("im:incr." + id);
+    }
+
+    /**
+     * 描述:生成聊天室消息顺序id
+     * @author lifang
+     * @date 2022/8/18 19:23
+     * @param
+     * @return Long
+     */
+    public Long generateSortId(){
+        return atomicLong.addAndGet(1);
+    }
+
+
+    /**
+     * 描述:去重生成聊天室顺序唯一id
+     * @author lifang
+     * @date 2022/8/18 19:23
+     * @param key
+     * @return boolean
+     */
+    public boolean existMsg(Long key){
+        return  bitSet.get(key);
+    }
+
+    public void close(){
+        bitSet.deleteAsync();
+        atomicLong.deleteAsync();
+    }
+
+}

+ 26 - 0
nb-im/src/main/java/com/nb/im/room/ImRoomOperatorManager.java

@@ -0,0 +1,26 @@
+package com.nb.im.room;
+
+import com.nb.core.cache.manager.ClusterStorageManager;
+import com.nb.core.cache.manager.ConfigStorageManager;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ImRoomManager.java
+ * @Description 聊天室管理器
+ * @createTime 2022年08月18日 19:04:00
+ */
+@Component
+public class ImRoomOperatorManager {
+
+    private Map<String,ImRoomOperator> imRoomOperatorMap;
+
+    private ConfigStorageManager storageManager;
+
+    public ImRoomOperator getRoomOperator(String roomId){
+        return imRoomOperatorMap.computeIfAbsent(roomId,k->null);
+    }
+}

+ 6 - 10
nb-im/src/main/java/com/nb/im/ws/PubMsgInfo.java

@@ -2,8 +2,6 @@ package com.nb.im.ws;
 
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
-import cn.hutool.json.JSONUtil;
-import com.nb.common.websocket.MessagingRequest;
 import com.nb.im.enums.ImMsgType;
 import com.nb.im.enums.SponsorEnum;
 import io.swagger.annotations.ApiModelProperty;
@@ -11,8 +9,6 @@ import lombok.Data;
 import org.springframework.validation.annotation.Validated;
 
 import java.io.Serializable;
-import java.util.Arrays;
-import java.util.UUID;
 
 /**
  * @author lifang
@@ -24,11 +20,11 @@ import java.util.UUID;
 @Data
 @Validated
 public class PubMsgInfo implements Serializable {
-    @ApiModelProperty("客户端生产的唯一消息id")
-    private String msgId;
+    @ApiModelProperty("客户端生产的唯一消息key")
+    private Long key;
 
     @ApiModelProperty("聊天室id")
-    private String chatRoomId;
+    private String roomId;
 
     @ApiModelProperty("消息载荷")
     private String payload;
@@ -54,8 +50,8 @@ public class PubMsgInfo implements Serializable {
 
 
     public void validate(){
-        if(StrUtil.isEmpty(msgId)){
-            throw new RuntimeException("消息id不能为空");
+        if(key==null){
+            throw new RuntimeException("消息key不能为空");
         }
         if(StrUtil.isEmpty(payload)){
             throw new RuntimeException("消息载荷不能为空");
@@ -69,7 +65,7 @@ public class PubMsgInfo implements Serializable {
         if(ObjectUtil.isNull(msgType)){
             throw new RuntimeException("信息类型不能为空");
         }
-        if(StrUtil.isEmpty(chatRoomId)){
+        if(StrUtil.isEmpty(roomId)){
             throw new RuntimeException("聊天室id不能为空");
         }
         if(ObjectUtil.isNull(senderType)){

+ 68 - 0
nb-im/src/main/java/com/nb/im/ws/PubMsgRequestHandler.java

@@ -0,0 +1,68 @@
+package com.nb.im.ws;
+
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.extra.spring.SpringUtil;
+import cn.hutool.json.JSONUtil;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nb.common.websocket.PubResponse;
+import com.nb.common.websocket.TopicMessage;
+import com.nb.common.websocket.event.PubMsgEvent;
+import com.nb.common.websocket.filter.PubMsgFilter;
+import com.nb.common.websocket.handler.TopicWrapper;
+import com.nb.common.websocket.msg.MessagingRequest;
+import com.nb.common.websocket.msg.handler.IMsgRequestHandler;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.redisson.api.RedissonClient;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+import org.tio.core.Tio;
+import org.tio.websocket.common.WsPacket;
+import org.tio.websocket.common.WsResponse;
+
+import java.util.List;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName PubMsgRequestHandler.java
+ * @Description TODO
+ * @createTime 2022年08月18日 18:58:00
+ */
+@Configuration
+@AllArgsConstructor
+public class PubMsgRequestHandler implements IMsgRequestHandler {
+    private final List<PubMsgFilter> msgFilters;
+    private final RedissonClient redissonClient;
+    @Getter
+    private final ObjectMapper objectMapper;
+    @Override
+    public MessagingRequest.Type getType() {
+        return MessagingRequest.Type.pub;
+    }
+
+    @Override
+    public void handler(MessagingRequest message, ChannelContext channelContext, TopicWrapper topicWrapper) {
+        String topic = topicWrapper.getTopic();
+        String param = topicWrapper.getParam();
+        if(CollectionUtil.isNotEmpty(msgFilters)){
+            boolean result = msgFilters.stream()
+                    .allMatch(filter -> filter.doFilter(channelContext, message));
+            if(result){
+                PubMsgInfo pubMsgInfo = JSONUtil.toBean(JSONUtil.toJsonStr(message.getPayload()), PubMsgInfo.class);
+                //消息不可重复发送
+                String chatRoomId = pubMsgInfo.getRoomId();
+
+
+                SpringUtil.publishEvent(new PubMsgEvent(this,message));
+                pubTopicMap.computeIfAbsent(topic, k->redissonClient.getTopic(k))
+                        .publishAsync( TopicMessage.of(message.getPayload(),param,message.getKey()))
+                        .whenComplete((r,e)->{
+                            Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.success(message.getKey())), WsPacket.CHARSET_NAME));
+                        });
+
+            }
+        }
+    }
+}

+ 3 - 5
nb-im/src/main/java/com/nb/im/ws/filter/MsgFormatFilter.java

@@ -1,12 +1,10 @@
 package com.nb.im.ws.filter;
 
 import cn.hutool.core.util.ObjectUtil;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONUtil;
-import com.nb.common.websocket.MessagingRequest;
+import com.nb.common.websocket.msg.MessagingRequest;
 import com.nb.common.websocket.PubResponse;
 import com.nb.common.websocket.filter.PubMsgFilter;
-import com.nb.core.result.R;
 import com.nb.im.ws.PubMsgInfo;
 import org.springframework.stereotype.Component;
 import org.tio.core.ChannelContext;
@@ -28,14 +26,14 @@ public class MsgFormatFilter implements PubMsgFilter {
         String id = source.getId();
         if("im".equalsIgnoreCase(id)){
             if(ObjectUtil.isEmpty(source.getPayload())){
-                Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.fail("","发布消息不可为空")),WsPacket.CHARSET_NAME));
+                Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.fail(-1L,"发布消息不可为空")),WsPacket.CHARSET_NAME));
                 return false;
             }
             PubMsgInfo pubMsgInfo = JSONUtil.toBean(JSONUtil.toJsonStr(source.getPayload()), PubMsgInfo.class);
             try {
                 pubMsgInfo.validate();
             }catch (RuntimeException e){
-                Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.fail(pubMsgInfo.getMsgId(),e.getMessage())),WsPacket.CHARSET_NAME));
+                Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.fail(pubMsgInfo.getKey(),e.getMessage())),WsPacket.CHARSET_NAME));
                 return false;
             }
             //todo

+ 0 - 1
nb-service/web-service/src/main/java/com/nb/web/service/bus/websocket/DefaultHisMsgHandler.java

@@ -1,7 +1,6 @@
 package com.nb.web.service.bus.websocket;
 
 import cn.hutool.json.JSONUtil;
-import com.nb.common.websocket.MessagingRequest;
 import com.nb.common.websocket.handler.HisMsgHandler;
 import com.nb.web.service.bus.hospital.HospitalManagerRegister;
 import com.nb.web.service.bus.hospital.his.HisResponse;