Przeglądaj źródła

fix 同一ws通道订阅多个相同主题时不会推送重复数据
del 删除多余ws主题

A17404李放 3 lat temu
rodzic
commit
b4aa142db3

+ 2 - 12
coffee-common/src/main/java/com/coffee/common/config/websocket/MessagingRequest.java

@@ -30,8 +30,6 @@ public class MessagingRequest {
 
     private String tenantId;
 
-    private Integer isSys;
-
     public static enum Type{
         sub,unsub
     }
@@ -40,16 +38,8 @@ public class MessagingRequest {
         if(StrUtil.isNullOrUndefined(id)){
             throw new RuntimeException("MessageRequest id不能为空");
         }
-        if(StrUtil.isNullOrUndefined(tenantId)&&isSys==null){
-            throw new RuntimeException("tenantId、isSys不能同时为空");
+        if(StrUtil.isNullOrUndefined(tenantId)){
+            throw new RuntimeException("tenantId不能为空");
         }
     }
-
-    public static void main(String[] args) {
-        MessagingRequest messagingRequest = new MessagingRequest();
-        messagingRequest.setTenantId("1");
-        messagingRequest.setParams(Arrays.asList("235931","120263"));
-        messagingRequest.setId(WebSocketConstant.PATIENT_MONITOR);
-        System.out.println(JSONUtil.toJsonStr(messagingRequest));
-    }
 }

+ 2 - 39
coffee-common/src/main/java/com/coffee/common/config/websocket/WebSocketConstant.java

@@ -11,11 +11,8 @@ import com.coffee.common.config.websocket.handler.TopicWrapper;
  * @createTime 2022年03月25日 14:25:00
  */
 public class WebSocketConstant {
-
-    public static final String ALARM_COUNT="alarm-count";
-    public static final String DEVICE_INFO_DETAIL="device-info-detail";
+    public static final String MONITOR_TIME_COUNT ="monitor-time-count";
     public static final String MONITOR_STATE_COUNT ="monitor-state-count";
-    public static final String CLINIC_INFO = "clinic-info";
     public static final String DEVICE_CONFLICT = "device-conflict";
     /**
      * 病人监控订阅
@@ -36,50 +33,16 @@ public class WebSocketConstant {
         return TopicWrapper.of(id+"-"+productName+"-"+param+"-"+tenantId,param);
     }
 
-    /**
-     * 获取 设备状态变化主题
-     * @param productName
-     * @param param
-     * @param tenantId
-     * @return
-     */
-    public static TopicWrapper getAlarmCount(String productName,String param,String tenantId){
-        return getTopic(ALARM_COUNT,productName,param,tenantId);
-    }
-
-    /**
-     * 获取 设备信息变化主题
-     * @param productName
-     * @param param
-     * @param tenantId
-     * @return
-     */
-    public static TopicWrapper getDeviceInfoDetailTopic(String productName,String param,String tenantId){
-        return getTopic(DEVICE_INFO_DETAIL,productName,param,tenantId);
-    }
-
 
     /**
      * 获取 设备状态变化主题
      * @param tenantId 设备所属医院
      * @return
      */
-    public static TopicWrapper getDeviceStateCount(String tenantId){
+    public static TopicWrapper getMonitorStateCount(String tenantId){
         return getTopic(MONITOR_STATE_COUNT,null,null,tenantId);
     }
 
-
-    /**
-     * 获取 临床信息主题
-     * @param productName
-     * @param param
-     * @param tenantId
-     * @return
-     */
-    public static TopicWrapper getClinicInfoTopic(String productName,String param,String tenantId){
-        return getTopic(CLINIC_INFO,productName,param,tenantId);
-    }
-
     /**
      * 获取 临床设备冲突主题
      * @param productName

+ 21 - 12
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/Subscribe.java

@@ -62,13 +62,19 @@ public abstract class Subscribe implements WsHandler {
         }
         //获取所有设备id
         List<String> params = message.getParams();
-        if(CollectionUtil.isEmpty(params)){
+        if(CollectionUtil.isEmpty(params)&&this.needParam()){
             return;
         }
+        List<TopicWrapper> subScribeTopic =null;
         //需要处理的主题
-        List<TopicWrapper> subScribeTopic =
-                params.stream().map(param -> getTopic(message.getProductName(), param, loginUser.getTenantId()))
-                        .collect(Collectors.toList());
+        if (CollectionUtil.isNotEmpty(params)) {
+            subScribeTopic =
+                    params.stream().map(param -> getTopic(message.getProductName(), param, loginUser.getTenantId()))
+                            .collect(Collectors.toList());
+        }else {
+            subScribeTopic = Collections.singletonList(getTopic(message.getProductName(), null, loginUser.getTenantId()));
+        }
+
         MessagingRequest.Type type = message.getType();
         if(MessagingRequest.Type.sub==type){
             //订阅主题
@@ -85,10 +91,18 @@ public abstract class Subscribe implements WsHandler {
      * @param topicWrapper
      */
     public void subscribe(ChannelContext channelContext, TopicWrapper topicWrapper){
+        //将主题与ws通道绑定
+        Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
+        Set<String> subscribeTopicSet= (Set<String>) result;
+        if(subscribeTopicSet.contains(topicWrapper.getTopic())){
+            return;
+        }
+        subscribeTopicSet.add(topicWrapper.getTopic());
+        channelContext.set(SUBSCRIBE_TOPIC,subscribeTopicSet);
         //同一主题只订阅一次
-        Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topicWrapper)).orElse(new HashSet<>());
-        if(!subscribeTopics.containsKey(topicWrapper)){
-            channelContexts.add(channelContext);
+        Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topicWrapper.getTopic())).orElse(new HashSet<>());
+        channelContexts.add(channelContext);
+        if(!subscribeTopics.containsKey(topicWrapper.getTopic())){
             redisTemplate.execute(new RedisCallback<Object>() {
                 @Override
                 public Object doInRedis(RedisConnection connection) throws DataAccessException {
@@ -99,11 +113,6 @@ public abstract class Subscribe implements WsHandler {
             });
         }
         subscribeTopics.put(topicWrapper.getTopic(),channelContexts);
-        //将主题与ws通道绑定
-        Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
-        Set<String> subscribeTopicSet= (Set<String>) result;
-        subscribeTopicSet.add(topicWrapper.getTopic());
-        channelContext.set(SUBSCRIBE_TOPIC,subscribeTopicSet);
     };
 
     /**

+ 9 - 0
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/WsHandler.java

@@ -16,4 +16,13 @@ public interface WsHandler {
     void onMessage(MessagingRequest message, ChannelContext channelContext);
 
     void close(ChannelContext channelContext);
+
+    /**
+     * 描述: 参数是否为必输的
+     * @author lifang
+     * @date 2022/5/9 11:20
+     * @param
+     * @return boolean
+     */
+    boolean needParam();
 }

+ 0 - 34
coffee-system/src/main/java/com/coffee/bus/websocket/AlarmCountHandler.java

@@ -1,34 +0,0 @@
-package com.coffee.bus.websocket;
-
-import cn.hutool.core.collection.CollectionUtil;
-import com.coffee.common.Constants;
-import com.coffee.common.bo.LoginUser;
-import com.coffee.common.config.websocket.MessagingRequest;
-import com.coffee.common.config.websocket.WebSocketConstant;
-import com.coffee.common.config.websocket.handler.Subscribe;
-import org.springframework.stereotype.Component;
-import org.tio.core.ChannelContext;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName AlarmCountHandler.java
- * @Description 设备报警数量订阅
- * @createTime 2022年03月25日 14:22:00
- */
-@Component
-public class AlarmCountHandler extends Subscribe {
-
-    @Override
-    public String getId() {
-        return WebSocketConstant.ALARM_COUNT;
-    }
-
-    @Override
-    public void close(ChannelContext channelContext) {
-
-    }
-}

+ 0 - 28
coffee-system/src/main/java/com/coffee/bus/websocket/ClinicInfoHandler.java

@@ -1,28 +0,0 @@
-package com.coffee.bus.websocket;
-
-import com.coffee.common.config.websocket.MessagingRequest;
-import com.coffee.common.config.websocket.WebSocketConstant;
-import com.coffee.common.config.websocket.handler.Subscribe;
-import org.springframework.stereotype.Component;
-import org.tio.core.ChannelContext;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName AlarmCountHandler.java
- * @Description 病人临床信息订阅
- * @createTime 2022年03月25日 14:22:00
- */
-@Component
-public class ClinicInfoHandler extends Subscribe {
-
-    @Override
-    public String getId() {
-        return WebSocketConstant.CLINIC_INFO;
-    }
-
-    @Override
-    public void close(ChannelContext channelContext) {
-
-    }
-}

+ 0 - 27
coffee-system/src/main/java/com/coffee/bus/websocket/DeviceConflictHandler.java

@@ -1,27 +0,0 @@
-package com.coffee.bus.websocket;
-
-import com.coffee.common.config.websocket.WebSocketConstant;
-import com.coffee.common.config.websocket.handler.Subscribe;
-import org.springframework.stereotype.Component;
-import org.tio.core.ChannelContext;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName AlarmCountHandler.java
- * @Description 临床设备冲突订阅
- * @createTime 2022年03月25日 14:22:00
- */
-@Component
-public class DeviceConflictHandler extends Subscribe {
-
-    @Override
-    public String getId() {
-        return WebSocketConstant.CLINIC_INFO;
-    }
-
-    @Override
-    public void close(ChannelContext channelContext) {
-
-    }
-}

+ 5 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/MonitorStateCountHandler.java

@@ -24,4 +24,9 @@ public class MonitorStateCountHandler extends Subscribe {
     public void close(ChannelContext channelContext) {
 
     }
+
+    @Override
+    public boolean needParam() {
+        return false;
+    }
 }

+ 9 - 6
coffee-system/src/main/java/com/coffee/bus/websocket/DeviceInfoDetailHandler.java → coffee-system/src/main/java/com/coffee/bus/websocket/MonitorTimeCountHandler.java

@@ -8,22 +8,25 @@ import org.tio.core.ChannelContext;
 /**
  * @author lifang
  * @version 1.0.0
- * @ClassName DeviceInfoDetailHandler.java
- * @Description 处理订阅设备详情
- * @createTime 2022年03月25日 14:20:00
+ * @ClassName DeviceStateCountHandler.java
+ * @Description 设备数量时间统计更新订阅
+ * @createTime 2022年03月25日 14:21:00
  */
 @Component
-public class DeviceInfoDetailHandler extends Subscribe{
+public class MonitorTimeCountHandler extends Subscribe {
 
     @Override
     public String getId() {
-        return WebSocketConstant.DEVICE_INFO_DETAIL;
+        return WebSocketConstant.MONITOR_STATE_COUNT;
     }
 
-
     @Override
     public void close(ChannelContext channelContext) {
 
     }
 
+    @Override
+    public boolean needParam() {
+        return false;
+    }
 }

+ 5 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/PatientMonitorHandler.java

@@ -24,4 +24,9 @@ public class PatientMonitorHandler extends Subscribe {
     public void close(ChannelContext channelContext) {
 
     }
+
+    @Override
+    public boolean needParam() {
+        return true;
+    }
 }

+ 5 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/WebSocketCloseHandler.java

@@ -31,4 +31,9 @@ public class WebSocketCloseHandler extends Subscribe {
         Set<String> subscribeTopicSet= (Set<String>) result;
         subscribeTopicSet.forEach(topic->this.unsubscribe(channelContext,topic));
     }
+
+    @Override
+    public boolean needParam() {
+        return false;
+    }
 }

+ 1 - 4
coffee-system/src/main/java/com/coffee/bus/websocket/listener/DeviceInfoListener.java

@@ -23,14 +23,11 @@ import com.coffee.bus.registry.patient.PatientRegistry;
 import com.coffee.bus.registry.patient.bean.DeviceTimeSmallInfo;
 import com.coffee.bus.registry.patient.bean.PatientCacheInfo;
 import com.coffee.bus.service.*;
-import com.coffee.bus.service.dto.PatientMonitorDetailResult;
 import com.coffee.bus.utils.WsPublishUtils;
 import com.coffee.common.config.websocket.WebSocketConstant;
-import io.reactivex.internal.operators.completable.CompletableFromRunnable;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.event.EventListener;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 import java.util.*;
@@ -142,7 +139,7 @@ public class DeviceInfoListener {
                     wsPublishUtils.publish(WebSocketConstant.getPatientMonitor(null, device.getPatientCode(), device.getTenantId()).getTopic(),
                             patientService.lookMonitorByPatientCode(device.getPatientCode(),device.getTenantId()));
                     //推送设备状态
-                    wsPublishUtils.publish(WebSocketConstant.getDeviceStateCount(device.getTenantId()).getTopic(),patientService.statusStats(device.getTenantId()));
+                    wsPublishUtils.publish(WebSocketConstant.getMonitorStateCount(device.getTenantId()).getTopic(),patientService.statusStats(device.getTenantId()));
                 });
                 return null;
             });