瀏覽代碼

add ws统一订阅主题

18339543638 3 年之前
父節點
當前提交
8e8699f436

+ 6 - 1
coffee-system/src/main/java/com/coffee/bus/websocket/WebSocketConstant.java → coffee-common/src/main/java/com/coffee/common/config/websocket/WebSocketConstant.java

@@ -1,4 +1,4 @@
-package com.coffee.bus.websocket;
+package com.coffee.common.config.websocket;
 
 import cn.hutool.core.util.StrUtil;
 
@@ -14,6 +14,7 @@ public class WebSocketConstant {
     public static final String ALARM_COUNT="alarm-count";
     public static final String DEVICE_INFO_DETAIL="device-info-detail";
     public static final String DEVICE_STATE_COUNT="device-state-count";
+    public static final String CLINIC_INFO = "clinic-info";
 
     /**
      * 主题格式为 device-info-detail:default:45789215623:医院id
@@ -32,4 +33,8 @@ public class WebSocketConstant {
     public static String getDeviceInfoDetailTopic(String productName,String param,String tenantId){
         return getTopic(DEVICE_INFO_DETAIL,productName,param,tenantId);
     }
+
+    public static String getClinicInfoTopic(String productName,String param,String tenantId){
+        return getTopic(CLINIC_INFO,productName,param,tenantId);
+    }
 }

+ 37 - 7
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/Subscribe.java

@@ -1,23 +1,22 @@
 package com.coffee.common.config.websocket.handler;
 
 import cn.hutool.core.collection.CollectionUtil;
+import com.coffee.common.Constants;
+import com.coffee.common.bo.LoginUser;
 import com.coffee.common.config.websocket.DefaultMessageListener;
-import com.coffee.common.config.websocket.MessageResponse;
-import io.netty.channel.DefaultEventLoop;
-import io.netty.channel.EventLoop;
+import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.WebSocketConstant;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.connection.Subscription;
 import org.springframework.data.redis.core.RedisCallback;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.util.ConcurrentReferenceHashMap;
 import org.tio.core.ChannelContext;
-import reactor.util.function.Tuple3;
 
 import javax.annotation.Resource;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * @author lifang
@@ -43,7 +42,38 @@ public abstract class Subscribe implements WsHandler {
     private Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
 
 
-    private EventLoop singleThreadEventLoop=new DefaultEventLoop() ;
+    public String getTopic(String productName,String param,String tenantId){
+        return  WebSocketConstant.getTopic(this.getId(),productName, param, tenantId);
+
+    };
+
+
+    @Override
+    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
+        LoginUser loginUser = (LoginUser) channelContext.get(Constants.LOGIN_USER_KEY);
+        if(loginUser==null){
+            channelContext.setClosed(true);
+            return;
+        }
+        //获取所有设备id
+        List<String> params = message.getParams();
+        if(CollectionUtil.isEmpty(params)){
+            return;
+        }
+        //需要处理的主题
+        List<String> subScribeTopic =
+                params.stream().map(deviceId -> getTopic(message.getProductName(), deviceId, loginUser.getTenantId()))
+                        .collect(Collectors.toList());
+        MessagingRequest.Type type = message.getType();
+        if(MessagingRequest.Type.sub==type){
+            //订阅主题
+            subScribeTopic.forEach(topic->this.subscribe(channelContext,topic));
+        }else {
+            //取消订阅主题
+            subScribeTopic.forEach(topic->this.unsubscribe(channelContext,topic));
+        }
+    }
+
     /**
      * ws 订阅主题
      * @param channelContext

+ 7 - 5
coffee-system/src/main/java/com/coffee/bus/websocket/AlarmCountHandler.java

@@ -1,10 +1,17 @@
 package com.coffee.bus.websocket;
 
+import cn.hutool.core.collection.CollectionUtil;
+import com.coffee.common.Constants;
+import com.coffee.common.bo.LoginUser;
 import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.WebSocketConstant;
 import com.coffee.common.config.websocket.handler.Subscribe;
 import org.springframework.stereotype.Component;
 import org.tio.core.ChannelContext;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -20,11 +27,6 @@ public class AlarmCountHandler extends Subscribe {
         return WebSocketConstant.ALARM_COUNT;
     }
 
-    @Override
-    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
-
-    }
-
     @Override
     public void close(ChannelContext channelContext) {
 

+ 28 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/ClinicInfoHandler.java

@@ -0,0 +1,28 @@
+package com.coffee.bus.websocket;
+
+import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.WebSocketConstant;
+import com.coffee.common.config.websocket.handler.Subscribe;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName AlarmCountHandler.java
+ * @Description 病人临床信息订阅
+ * @createTime 2022年03月25日 14:22:00
+ */
+@Component
+public class ClinicInfoHandler extends Subscribe {
+
+    @Override
+    public String getId() {
+        return WebSocketConstant.CLINIC_INFO;
+    }
+
+    @Override
+    public void close(ChannelContext channelContext) {
+
+    }
+}

+ 1 - 25
coffee-system/src/main/java/com/coffee/bus/websocket/DeviceInfoDetailHandler.java

@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
 import com.coffee.common.Constants;
 import com.coffee.common.bo.LoginUser;
 import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.WebSocketConstant;
 import com.coffee.common.config.websocket.handler.Subscribe;
 import org.springframework.stereotype.Component;
 import org.tio.core.ChannelContext;
@@ -25,31 +26,6 @@ public class DeviceInfoDetailHandler extends Subscribe{
         return WebSocketConstant.DEVICE_INFO_DETAIL;
     }
 
-    @Override
-    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
-        LoginUser loginUser = (LoginUser) channelContext.get(Constants.LOGIN_USER_KEY);
-        if(loginUser==null){
-            channelContext.setClosed(true);
-            return;
-        }
-        //获取所有设备id
-        List<String> params = message.getParams();
-        if(CollectionUtil.isEmpty(params)){
-            return;
-        }
-        //需要处理的主题
-        List<String> subScribeTopic =
-                params.stream().map(deviceId -> WebSocketConstant.getTopic(this.getId(), message.getProductName(), deviceId, loginUser.getTenantId()))
-                        .collect(Collectors.toList());
-        MessagingRequest.Type type = message.getType();
-        if(MessagingRequest.Type.sub==type){
-            //订阅主题
-            subScribeTopic.forEach(topic->this.subscribe(channelContext,topic));
-        }else {
-            //取消订阅主题
-            subScribeTopic.forEach(topic->this.unsubscribe(channelContext,topic));
-        }
-    }
 
     @Override
     public void close(ChannelContext channelContext) {

+ 1 - 5
coffee-system/src/main/java/com/coffee/bus/websocket/DeviceStateCountHandler.java

@@ -1,6 +1,7 @@
 package com.coffee.bus.websocket;
 
 import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.WebSocketConstant;
 import com.coffee.common.config.websocket.handler.Subscribe;
 import org.springframework.stereotype.Component;
 import org.tio.core.ChannelContext;
@@ -20,11 +21,6 @@ public class DeviceStateCountHandler  extends Subscribe {
         return WebSocketConstant.DEVICE_STATE_COUNT;
     }
 
-    @Override
-    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
-
-    }
-
     @Override
     public void close(ChannelContext channelContext) {
 

+ 0 - 5
coffee-system/src/main/java/com/coffee/bus/websocket/WebSocketCloseHandler.java

@@ -24,11 +24,6 @@ public class WebSocketCloseHandler extends Subscribe {
         return "";
     }
 
-    @Override
-    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
-
-    }
-
     @Override
     public void close(ChannelContext channelContext) {
         //关闭则取消订阅

+ 14 - 1
coffee-system/src/main/java/com/coffee/bus/websocket/listener/DeviceInfoListener.java

@@ -2,18 +2,20 @@ package com.coffee.bus.websocket.listener;
 
 import cn.hutool.core.util.RandomUtil;
 import cn.hutool.extra.spring.SpringUtil;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.coffee.bus.entity.BusNetPumpEntity;
 import com.coffee.bus.enums.NetPumpStatusEnum;
 import com.coffee.bus.listener.event.bean.DeviceAlarmEvent;
 import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
 import com.coffee.bus.service.LocalBusNetPumpService;
-import com.coffee.bus.websocket.WebSocketConstant;
+import com.coffee.common.config.websocket.WebSocketConstant;
 import lombok.AllArgsConstructor;
 import org.springframework.context.event.EventListener;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -70,6 +72,17 @@ public class DeviceInfoListener {
         NetPumpStatusEnum runState = pump.getRunState();
         if(runState!=null&&Boolean.TRUE.equals(runState.getAlarm())){
             //设备发生报警
+            //存储报警信息 todo
+
+            String topic = WebSocketConstant.getDeviceStateCount(null, runState.name(), pump.getTenantId());
+            //获取报警设备数量
+            List<BusNetPumpEntity> alarmList = netPumpService.list(new QueryWrapper<BusNetPumpEntity>()
+                    .lambda()
+                    .select(BusNetPumpEntity::getId)
+                    .eq(BusNetPumpEntity::getRunState, runState));
+            //发送告警通知,获取报警设备数量, 下标为0是报警设备总数量,下标为1是设备所属医院的报警设备总数量
+            List<? extends Number> result = Arrays.asList(alarmList.size(), alarmList.stream().filter(alarm -> alarm.getTenantId().equals(pump.getTenantId())).count());
+            redisTemplate.convertAndSend(topic, result);
         }
 
     }

+ 6 - 2
coffee-system/src/main/java/com/coffee/bus/websocket/listener/HistoryInfoListener.java

@@ -1,6 +1,5 @@
 package com.coffee.bus.websocket.listener;
 
-import cn.hutool.json.JSON;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -12,9 +11,11 @@ import com.coffee.bus.listener.event.bean.HistoryInfoEvent;
 import com.coffee.bus.script.ExecuteResult;
 import com.coffee.bus.script.ScriptManager;
 import com.coffee.bus.service.*;
+import com.coffee.common.config.websocket.WebSocketConstant;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.event.EventListener;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -44,6 +45,8 @@ public class HistoryInfoListener {
     private final LocalBusNetPumpHistoryService historyService;
 
     private final LocalBusNetPumpService netPumpService;
+
+    private final RedisTemplate redisTemplate;
     @EventListener
     public void historyInfoListener(HistoryInfoEvent infoEvent){
         log.info("接收到了医院数据,[{}]",infoEvent.getContent());
@@ -89,6 +92,7 @@ public class HistoryInfoListener {
             clinicService.save(clinic);
         }
         //发送临床信息
-
+        String topic = WebSocketConstant.getClinicInfoTopic(null, clinic.getPatientCode(), clinic.getTenantId());
+        redisTemplate.convertAndSend(topic, clinic);
     }
 }