Ver Fonte

update 实时推送
add 评价中新增 胎心 字段

A17404李放 há 3 anos atrás
pai
commit
f2985584eb

+ 10 - 9
coffee-common/src/main/java/com/coffee/common/config/websocket/DefaultMessageListener.java

@@ -1,19 +1,17 @@
 package com.coffee.common.config.websocket;
 
 import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSON;
 import cn.hutool.json.JSONUtil;
-import com.coffee.common.result.R;
-import lombok.AllArgsConstructor;
+import com.coffee.common.config.websocket.handler.TopicWrapper;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.util.SerializationUtils;
 import org.tio.core.ChannelContext;
 import org.tio.core.Tio;
 import org.tio.websocket.common.WsResponse;
-
-import java.io.UnsupportedEncodingException;
 import java.util.Set;
 
 /**
@@ -28,17 +26,20 @@ import java.util.Set;
 public class DefaultMessageListener implements MessageListener {
     private final String id;
     private final Set<ChannelContext> channelContexts;
+    private final TopicWrapper topicWrapper;
 
 
     @Override
     public void onMessage(Message message, byte[] pattern) {
         if (CollectionUtil.isNotEmpty(channelContexts)) {
             channelContexts.parallelStream()
-                    .filter(channelContext -> !channelContext.isClosed)
                     .forEach(channel -> Tio.send(channel,
-                            WsResponse.fromText(JSONUtil.toJsonStr(R.success(
-                                    MessageResponse.of(id,"result",new String(message.getBody()))))
-                                    ,"utf-8")));
+                            WsResponse.fromText(
+                                    JSONUtil.toJsonStr(MessageResponse.of(id,"result",topicWrapper.getParam(),JSONUtil.parse(message.toString()))),"utf-8"
+                            )
+                            )
+                    );
         }
     }
+
 }

+ 6 - 3
coffee-common/src/main/java/com/coffee/common/config/websocket/MessageResponse.java

@@ -1,9 +1,11 @@
 package com.coffee.common.config.websocket;
 
-import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSON;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
+import java.io.Serializable;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -13,8 +15,9 @@ import lombok.Data;
  */
 @Data
 @AllArgsConstructor(staticName = "of")
-public class MessageResponse {
+public class MessageResponse implements Serializable {
     private String id;
     private String type;
-    private Object payload;
+    private String param;
+    private JSON payload;
 }

+ 9 - 0
coffee-common/src/main/java/com/coffee/common/config/websocket/MessagingRequest.java

@@ -1,6 +1,7 @@
 package com.coffee.common.config.websocket;
 
 import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONUtil;
 import lombok.Data;
 
 import java.util.*;
@@ -43,4 +44,12 @@ public class MessagingRequest {
             throw new RuntimeException("tenantId、isSys不能同时为空");
         }
     }
+
+    public static void main(String[] args) {
+        MessagingRequest messagingRequest = new MessagingRequest();
+        messagingRequest.setTenantId("1");
+        messagingRequest.setParams(Arrays.asList("235931","120263"));
+        messagingRequest.setId(WebSocketConstant.PATIENT_MONITOR);
+        System.out.println(JSONUtil.toJsonStr(messagingRequest));
+    }
 }

+ 9 - 8
coffee-common/src/main/java/com/coffee/common/config/websocket/WebSocketConstant.java

@@ -1,6 +1,7 @@
 package com.coffee.common.config.websocket;
 
 import cn.hutool.core.util.StrUtil;
+import com.coffee.common.config.websocket.handler.TopicWrapper;
 
 /**
  * @author lifang
@@ -29,10 +30,10 @@ public class WebSocketConstant {
      * @param param
      * @return
      */
-    public static String getTopic(String id,String productName,String param,String tenantId){
+    public static TopicWrapper getTopic(String id,String productName,String param,String tenantId){
         productName=StrUtil.isEmptyIfStr(productName)?"default":productName;
         tenantId=StrUtil.isNullOrUndefined(tenantId)?"*":tenantId;
-        return id+"-"+productName+"-"+param+"-"+tenantId;
+        return TopicWrapper.of(id+"-"+productName+"-"+param+"-"+tenantId,param);
     }
 
     /**
@@ -42,7 +43,7 @@ public class WebSocketConstant {
      * @param tenantId
      * @return
      */
-    public static String getAlarmCount(String productName,String param,String tenantId){
+    public static TopicWrapper getAlarmCount(String productName,String param,String tenantId){
         return getTopic(ALARM_COUNT,productName,param,tenantId);
     }
 
@@ -53,7 +54,7 @@ public class WebSocketConstant {
      * @param tenantId
      * @return
      */
-    public static String getDeviceInfoDetailTopic(String productName,String param,String tenantId){
+    public static TopicWrapper getDeviceInfoDetailTopic(String productName,String param,String tenantId){
         return getTopic(DEVICE_INFO_DETAIL,productName,param,tenantId);
     }
 
@@ -65,7 +66,7 @@ public class WebSocketConstant {
      * @param tenantId
      * @return
      */
-    public static String getDeviceStateCount(String productName,String param,String tenantId){
+    public static TopicWrapper getDeviceStateCount(String productName,String param,String tenantId){
         return getTopic(DEVICE_STATE_COUNT,productName,param,tenantId);
     }
 
@@ -77,7 +78,7 @@ public class WebSocketConstant {
      * @param tenantId
      * @return
      */
-    public static String getClinicInfoTopic(String productName,String param,String tenantId){
+    public static TopicWrapper getClinicInfoTopic(String productName,String param,String tenantId){
         return getTopic(CLINIC_INFO,productName,param,tenantId);
     }
 
@@ -88,7 +89,7 @@ public class WebSocketConstant {
      * @param tenantId
      * @return
      */
-    public static String getDeviceConflictTopic(String productName,String param,String tenantId){
+    public static TopicWrapper getDeviceConflictTopic(String productName,String param,String tenantId){
         return getTopic(DEVICE_CONFLICT,productName,param,tenantId);
     }
 
@@ -99,7 +100,7 @@ public class WebSocketConstant {
      * @param tenantId
      * @return
      */
-    public static String getPatientMonitor(String productName,String param,String tenantId){
+    public static TopicWrapper getPatientMonitor(String productName,String param,String tenantId){
         return getTopic(PATIENT_MONITOR,productName,param,tenantId);
     }
 }

+ 13 - 13
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/Subscribe.java

@@ -42,7 +42,7 @@ public abstract class Subscribe implements WsHandler {
     private Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
 
 
-    public String getTopic(String productName,String param,String tenantId){
+    public TopicWrapper getTopic(String productName,String param,String tenantId){
         return  WebSocketConstant.getTopic(this.getId(),productName, param, tenantId);
 
     };
@@ -61,43 +61,43 @@ public abstract class Subscribe implements WsHandler {
             return;
         }
         //需要处理的主题
-        List<String> subScribeTopic =
-                params.stream().map(deviceId -> getTopic(message.getProductName(), deviceId, loginUser.getTenantId()))
+        List<TopicWrapper> subScribeTopic =
+                params.stream().map(param -> getTopic(message.getProductName(), param, loginUser.getTenantId()))
                         .collect(Collectors.toList());
         MessagingRequest.Type type = message.getType();
         if(MessagingRequest.Type.sub==type){
             //订阅主题
-            subScribeTopic.forEach(topic->this.subscribe(channelContext,topic));
+            subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
         }else {
             //取消订阅主题
-            subScribeTopic.forEach(topic->this.unsubscribe(channelContext,topic));
+            subScribeTopic.forEach(topicWrapper->this.unsubscribe(channelContext,topicWrapper.getTopic()));
         }
     }
 
     /**
      * ws 订阅主题
      * @param channelContext
-     * @param topic
+     * @param topicWrapper
      */
-    public void subscribe(ChannelContext channelContext, String topic){
+    public void subscribe(ChannelContext channelContext, TopicWrapper topicWrapper){
         //同一主题只订阅一次
-        Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topic)).orElse(new HashSet<>());
-        if(!subscribeTopics.containsKey(topic)){
+        Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topicWrapper)).orElse(new HashSet<>());
+        if(!subscribeTopics.containsKey(topicWrapper)){
             channelContexts.add(channelContext);
             redisTemplate.execute(new RedisCallback<Object>() {
                 @Override
                 public Object doInRedis(RedisConnection connection) throws DataAccessException {
-                    connection.pSubscribe(new DefaultMessageListener(getId(),channelContexts),topic.getBytes());
-                    redisConnectionMap.put(topic,connection);
+                    connection.pSubscribe(new DefaultMessageListener(getId(),channelContexts,topicWrapper),topicWrapper.getTopic().getBytes());
+                    redisConnectionMap.put(topicWrapper.getTopic(),connection);
                     return null;
                 }
             });
         }
-        subscribeTopics.put(topic,channelContexts);
+        subscribeTopics.put(topicWrapper.getTopic(),channelContexts);
         //将主题与ws通道绑定
         Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
         Set<String> subscribeTopicSet= (Set<String>) result;
-        subscribeTopicSet.add(topic);
+        subscribeTopicSet.add(topicWrapper.getTopic());
         channelContext.set(SUBSCRIBE_TOPIC,subscribeTopicSet);
     };
 

+ 24 - 0
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/TopicWrapper.java

@@ -0,0 +1,24 @@
+package com.coffee.common.config.websocket.handler;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName Topic.java
+ * @Description ws订阅主题
+ * @createTime 2022年05月07日 17:14:00
+ */
+@Data
+@AllArgsConstructor(staticName = "of")
+public class TopicWrapper {
+    /**
+     * 主题
+     */
+    private String topic;
+    /**
+     * 订阅参数
+     */
+    private String param;
+}

+ 4 - 1
coffee-system/src/main/java/com/coffee/bus/entity/BusEvaluationEntity.java

@@ -34,7 +34,7 @@ public class BusEvaluationEntity extends TenantGenericEntity<String,String> {
 
     @ApiModelProperty(value = "泵号")
     @Length(max = 50,message = "泵号长度不得超过50个字节")
-    private String pumpCode;
+    private String deviceId;
 
     @ApiModelProperty(value = "疼痛评分静止")
     private Integer statics;
@@ -106,6 +106,9 @@ public class BusEvaluationEntity extends TenantGenericEntity<String,String> {
     @ApiModelProperty(value = "心率")
     private String heartRate;
 
+    @ApiModelProperty(value = "胎心")
+    private String fetalHeartRate;
+
     @ApiModelProperty(value = "呼吸频率")
     private String breathRate;
 

+ 0 - 1
coffee-system/src/main/java/com/coffee/bus/service/dto/PatientMonitorQuery.java

@@ -6,7 +6,6 @@ import com.coffee.bus.enums.DeviceTypeEnum;
 import com.coffee.bus.enums.DeviceStatusEnum;
 import com.coffee.bus.enums.WarnEnum;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;

+ 3 - 0
coffee-system/src/main/java/com/coffee/bus/service/dto/PatientMonitorResult.java

@@ -27,6 +27,9 @@ public class PatientMonitorResult {
     @ApiModelProperty(value = "病人名称")
     private String patientName;
 
+    @ApiModelProperty(value = "病人年龄")
+    private String patientAge;
+
     @ApiModelProperty(value = "性别")
     private SexEnum gender;
 

+ 8 - 6
coffee-system/src/main/java/com/coffee/bus/utils/WsPublishUtils.java

@@ -1,7 +1,10 @@
 package com.coffee.bus.utils;
 
 import cn.hutool.extra.spring.SpringUtil;
+import cn.hutool.json.JSONUtil;
+import lombok.AllArgsConstructor;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
 
 /**
  * @author lifang
@@ -10,13 +13,12 @@ import org.springframework.data.redis.core.RedisTemplate;
  * @Description ws发布消息工具类
  * @createTime 2022年05月07日 11:34:00
  */
+@Component
+@AllArgsConstructor
 public class WsPublishUtils {
-    private static RedisTemplate redisTemplate;
+    private final RedisTemplate redisTemplate;
 
-    public static void publish(String topic,Object msg){
-        if(redisTemplate==null){
-            redisTemplate= SpringUtil.getBean(RedisTemplate.class);
-        }
-        redisTemplate.convertAndSend(topic, msg);
+    public void publish(String topic,Object msg){
+        redisTemplate.convertAndSend(topic,JSONUtil.parse(msg));
     }
 }

+ 27 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/PatientMonitorHandler.java

@@ -0,0 +1,27 @@
+package com.coffee.bus.websocket;
+
+import com.coffee.common.config.websocket.WebSocketConstant;
+import com.coffee.common.config.websocket.handler.Subscribe;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName AlarmCountHandler.java
+ * @Description 病人监护信息订阅
+ * @createTime 2022年03月25日 14:22:00
+ */
+@Component
+public class PatientMonitorHandler extends Subscribe {
+
+    @Override
+    public String getId() {
+        return WebSocketConstant.PATIENT_MONITOR;
+    }
+
+    @Override
+    public void close(ChannelContext channelContext) {
+
+    }
+}

+ 3 - 2
coffee-system/src/main/java/com/coffee/bus/websocket/listener/DeviceInfoListener.java

@@ -72,6 +72,7 @@ public class DeviceInfoListener {
 
     private final LocalBusClinicService clinicService;
 
+    private final WsPublishUtils wsPublishUtils;
     /**
      * 监听上传的数据信息,
      * 若设备详情发生变化,则及时通知相应的ws通道
@@ -135,7 +136,7 @@ public class DeviceInfoListener {
 
             //则推送设备上报输注消息
             cacheOperation.add(()->{
-                WsPublishUtils.publish(WebSocketConstant.getPatientMonitor(null, device.getPatientCode(), device.getTenantId()),
+                wsPublishUtils.publish(WebSocketConstant.getPatientMonitor(null, device.getPatientCode(), device.getTenantId()).getTopic(),
                         new PatientMonitorDetailResult(null,infusionHistory,null,null));
                 return null;
             });
@@ -294,7 +295,7 @@ public class DeviceInfoListener {
                         patientOperator.setAllDevice(remainPatientBindDevices);
                         //更换输注信息后,将最新的输注信息传输实时传输给前端 //todo
 
-                        WsPublishUtils.publish(WebSocketConstant.getPatientMonitor(null,patientCode, hospitalId),
+                        wsPublishUtils.publish(WebSocketConstant.getPatientMonitor(null,patientCode, hospitalId).getTopic(),
                                 new PatientMonitorDetailResult(null,infusionHistoryService.getById(newMasterId),null,null));
 
                         return null;

+ 2 - 0
coffee-system/src/main/resources/mapper/bus/BusPatientMapper.xml

@@ -22,6 +22,7 @@
     <resultMap id="monitorResult" type="com.coffee.bus.service.dto.PatientMonitorResult">
         <result column="patient_name" property="patientName"/>
         <result column="patient_code" property="patientCode"/>
+        <result column="patient_age" property="patientAge"/>
         <result column="gender" property="gender"/>
         <result column="patient_alarm" property="patientAlarm"/>
         <result column="device_id" property="deviceId"/>
@@ -180,6 +181,7 @@
         i.type as device_type,
         r.alias as device_alias,
         c.ana_doctor as ana_doctor,
+        c.patient_age as patient_age,
         c.ana_type as ana_type,
         c.anal_type as anal_type,
         c.surgery_doctor as surgery_doctor,