Pārlūkot izejas kodu

add 延迟队列组件

A17404李放 3 gadi atpakaļ
vecāks
revīzija
0190716deb
22 mainītis faili ar 856 papildinājumiem un 417 dzēšanām
  1. 41 0
      nb-admin/src/test/java/com/nb/admin/DelayMessageTest.java
  2. 40 0
      nb-common/src/main/java/com/nb/common/delay_queue/handler/DelayMessageHandler.java
  3. 27 0
      nb-common/src/main/java/com/nb/common/delay_queue/manager/DelayMessageManager.java
  4. 95 0
      nb-common/src/main/java/com/nb/common/delay_queue/manager/RedissonDelayMessageManager.java
  5. 63 0
      nb-common/src/main/java/com/nb/common/delay_queue/message/DelayMessage.java
  6. 26 0
      nb-common/src/main/java/com/nb/common/delay_queue/message/DelayMessageProperties.java
  7. 0 7
      nb-system/src/main/java/com/nb/bus/entity/BusPatientEntity.java
  8. 7 10
      nb-system/src/main/java/com/nb/bus/hospital/HospitalManager.java
  9. 2 2
      nb-system/src/main/java/com/nb/bus/hospital/HospitalManagerRegister.java
  10. 1 11
      nb-system/src/main/java/com/nb/bus/hospital/config/AbstractHospitalConfigHandler.java
  11. 0 186
      nb-system/src/main/java/com/nb/bus/hospital/config/HospitalAutoUndoConfigHandler.java
  12. 148 0
      nb-system/src/main/java/com/nb/bus/hospital/config/HospitalDeviceAutoUndoConfigHandler.java
  13. 24 76
      nb-system/src/main/java/com/nb/bus/hospital/config/HospitalFinishMonitorConfigHandler.java
  14. 15 54
      nb-system/src/main/java/com/nb/bus/hospital/config/HospitalFunctionAnalConfigHandler.java
  15. 21 66
      nb-system/src/main/java/com/nb/bus/hospital/config/HospitalFunctionExtraConfigHandler.java
  16. 2 2
      nb-system/src/main/java/com/nb/bus/hospital/config/HospitalPatientCodeHandler.java
  17. 71 0
      nb-system/src/main/java/com/nb/bus/hospital/config/handler/AnalPoorDisappearHandler.java
  18. 72 0
      nb-system/src/main/java/com/nb/bus/hospital/config/handler/ClinicAutoFinishMonitorHandler.java
  19. 71 0
      nb-system/src/main/java/com/nb/bus/hospital/config/handler/DeputyDeviceAutoUndoHandler.java
  20. 35 0
      nb-system/src/main/java/com/nb/bus/hospital/config/handler/HandlerConstant.java
  21. 95 0
      nb-system/src/main/java/com/nb/bus/hospital/config/handler/NoSignalHandler.java
  22. 0 3
      nb-system/src/main/java/com/nb/bus/service/LocalBusPatientService.java

+ 41 - 0
nb-admin/src/test/java/com/nb/admin/DelayMessageTest.java

@@ -0,0 +1,41 @@
+package com.nb.admin;
+
+import com.nb.bus.hospital.config.HospitalDeviceAutoUndoConfigHandler;
+import com.nb.bus.hospital.config.handler.HandlerConstant;
+import com.nb.common.cache.value.Value;
+import com.nb.common.delay_queue.manager.DelayMessageManager;
+import com.nb.common.delay_queue.message.DelayMessage;
+import com.nb.common.delay_queue.message.DelayMessageProperties;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DelayMessageTest.java
+ * @Description TODO
+ * @createTime 2022年07月09日 08:37:00
+ */
+@RunWith(SpringRunner.class)
+@Slf4j
+@SpringBootTest(classes = AdminApplication.class)
+public class DelayMessageTest {
+    @Autowired
+    DelayMessageManager delayMessageManager;
+    @Test
+    public void test(){
+        HospitalDeviceAutoUndoConfigHandler.UndoEntity timestamp = HospitalDeviceAutoUndoConfigHandler.UndoEntity.builder().timestamp(new Date()).build();
+        DelayMessage delayMessage = new DelayMessage(Value.simple(timestamp), HandlerConstant.DEVICE_AUTO_UNDO, DelayMessageProperties.of(TimeUnit.SECONDS, 30));
+        delayMessageManager.add(delayMessage);
+        while (true){
+
+        }
+    }
+}

+ 40 - 0
nb-common/src/main/java/com/nb/common/delay_queue/handler/DelayMessageHandler.java

@@ -0,0 +1,40 @@
+package com.nb.common.delay_queue.handler;
+
+import com.nb.common.delay_queue.message.DelayMessage;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName RedssionUtil.java
+ * @Description 延迟队列消息处理器
+ * @createTime 2022年05月17日 16:16:00
+ */
+public interface DelayMessageHandler {
+
+   /**
+    * 描述: 处理器id,以此与消息id进行匹配
+    * @author lifang
+    * @date 2022/7/8 21:09
+    * @param
+    * @return String
+    */
+    String getId();
+
+    /**
+     * 描述: 处理器描述
+     * @author lifang
+     * @date 2022/7/8 21:09
+     * @param
+     * @return String
+     */
+    String description();
+
+    /**
+     * 描述: 消息处理的具体执行方法
+     * @author lifang
+     * @date 2022/7/8 21:09
+     * @param message
+     * @return void
+     */
+    void handle(DelayMessage message);
+}

+ 27 - 0
nb-common/src/main/java/com/nb/common/delay_queue/manager/DelayMessageManager.java

@@ -0,0 +1,27 @@
+package com.nb.common.delay_queue.manager;
+
+import com.nb.common.delay_queue.message.DelayMessage;
+import org.springframework.beans.factory.DisposableBean;
+
+/**
+ * @author  lifang
+ * @date 20:38 2022/7/8
+ * @description 延迟消息管理器
+ **/
+public interface DelayMessageManager extends  DisposableBean {
+
+    /**
+     * 添加延时消息
+     *
+     * @param message 延时消息
+     */
+    void add(DelayMessage message);
+
+    /**
+     * 移除延时消息
+     *
+     * @param message 待移除的消息
+     * @return 移除成功返回true, 移除失败返回false
+     */
+    boolean remove(DelayMessage message);
+}

+ 95 - 0
nb-common/src/main/java/com/nb/common/delay_queue/manager/RedissonDelayMessageManager.java

@@ -0,0 +1,95 @@
+package com.nb.common.delay_queue.manager;
+
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSONUtil;
+import com.nb.common.cache.value.Value;
+import com.nb.common.delay_queue.handler.DelayMessageHandler;
+import com.nb.common.delay_queue.message.DelayMessage;
+import com.nb.common.delay_queue.message.DelayMessageProperties;
+import com.nb.common.util.ExceptionUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RBlockingQueue;
+import org.redisson.api.RDelayedQueue;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author  lifang
+ * @date 20:38 2022/7/8
+ * @description 基于redisson实现的延迟队列管理器
+ **/
+@Slf4j
+@Configuration
+@ConditionalOnBean({RedissonClient.class,DelayMessageHandler.class})
+public class RedissonDelayMessageManager implements DelayMessageManager {
+
+    private final RBlockingQueue<DelayMessage> blockingQueue;
+
+    private final RDelayedQueue<DelayMessage> delayedQueue;
+
+    private final List<DelayMessageHandler> handlers;
+    private static final String NAME="redisson-delay-message-queue";
+    @Autowired
+    public RedissonDelayMessageManager(RedissonClient redissonClient, List<DelayMessageHandler> handlers) {
+        this.blockingQueue = redissonClient.getBlockingQueue(NAME);
+        this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
+        this.handlers = handlers;
+
+        blockingQueue.subscribeOnElements(i->{
+            //开启新的线程消费,唯一线程消费,不可阻塞该异步线程
+            CompletableFuture.runAsync(()->{
+                log.info("redisson延迟队列开始消费数据,消息id【{}】,消息内容【{}】",i.getMsgId(),JSONUtil.toJsonStr(i.getBody()));
+                Optional<DelayMessageHandler> delayMessageHandler = handlers.stream().filter(handler -> ObjectUtil.equals(i.getHandlerId(), handler.getId()))
+                        .findFirst()
+                        .map(handler -> {
+                            handler.handle(i);
+                            return handler;
+                        });
+                if(!delayMessageHandler.isPresent()){
+                    log.warn("延迟队列消息处理失败,消息【{}】无相应的处理器处理",JSONUtil.toJsonStr(i));
+                }
+            })
+                    .whenComplete((__,t)->{
+                        if(t==null){
+                            log.info("redisson延迟队列中,数据【{}】消费完成",i.getMsgId());
+                        }else {
+                            log.error("redisson延迟队列中,消费数据【{}】失败,",i, ExceptionUtil.getExceptionMsg(t));
+                        }
+                    });
+        });
+    }
+
+    @PostConstruct
+    public void init(){
+        //初始化时并不会监听延迟队列,故先测试数据进入,开启监听
+        this.add(new DelayMessage(Value.simple("初始化数据"),"无", DelayMessageProperties.of(TimeUnit.SECONDS,3)));
+    }
+
+    @Override
+    public void add(DelayMessage message) {
+        if (delayedQueue.contains(message)) {
+            return;
+        }
+        log.info("redisson-delay-queue ,add message = 【{}】",JSONUtil.toJsonStr(message));
+        delayedQueue.offerAsync(message, message.getProperties().getExpire(), message.getProperties().getTimeUnit());
+    }
+
+    @Override
+    public boolean remove(DelayMessage message) {
+        return delayedQueue.remove(message);
+    }
+
+    @Override
+    public void destroy() {
+        delayedQueue.destroy();
+    }
+}

+ 63 - 0
nb-common/src/main/java/com/nb/common/delay_queue/message/DelayMessage.java

@@ -0,0 +1,63 @@
+package com.nb.common.delay_queue.message;
+
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.nb.common.cache.value.Value;
+import com.nb.common.delay_queue.handler.DelayMessageHandler;
+import lombok.*;
+import org.springframework.lang.NonNull;
+import org.springframework.util.Assert;
+
+import java.io.Serializable;
+
+/**
+ * @author  lifang
+ * @date 20:38 2022/7/8
+ * @description 延迟消息
+ **/
+@Getter
+@Setter
+@ToString
+public class DelayMessage implements Serializable {
+    private static final long serialVersionUID = 9006297630420423520L;
+
+    /**
+     * 消息的唯一id,使用该字段移除消息
+     */
+    private Long msgId;
+
+    /**
+     * 内容
+     */
+    @NonNull
+    private Value body;
+
+    /**
+     * 消息处理器id
+     * @see DelayMessageHandler#getId()
+     */
+    @NonNull
+    private String handlerId;
+
+    /**
+     * 消息属性
+     */
+    @JsonIgnore
+    @NonNull
+    private DelayMessageProperties properties;
+
+    public DelayMessage(@NonNull Value body, @NonNull String handlerId, @NonNull DelayMessageProperties properties) {
+        this.msgId = IdWorker.getId();
+        this.body = body;
+        this.handlerId = handlerId;
+        this.properties = properties;
+    }
+
+
+
+    public void check() {
+        Assert.notNull(this.body, "delay message must not be null");
+        Assert.notNull(this.handlerId, "delay handlerId must not be null");
+        Assert.notNull(this.properties, "delay message properties must not be null");
+    }
+}

+ 26 - 0
nb-common/src/main/java/com/nb/common/delay_queue/message/DelayMessageProperties.java

@@ -0,0 +1,26 @@
+package com.nb.common.delay_queue.message;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class DelayMessageProperties implements Serializable {
+
+    private static final long serialVersionUID = 1240631950524432277L;
+    /**
+     * 过期时间单位
+     */
+    private TimeUnit timeUnit;
+
+    /**
+     * 时长,实际的过期时间为 timeUnit * expire
+     */
+    private long expire;
+
+}

+ 0 - 7
nb-system/src/main/java/com/nb/bus/entity/BusPatientEntity.java

@@ -33,13 +33,6 @@ public class BusPatientEntity extends TenantGenericEntity<String,String> {
     @Length(max = 255,message = "病号长度不得超过255个字节")
     private String originCode;
 
-    @ApiModelProperty(value = "病人名称")
-    @Length(max = 255,message = "病人名称长度不得超过255个字节")
-    private String name;
-
-    @ApiModelProperty(value = "性别")
-    private SexEnum gender;
-
     @TableField
     @ApiModelProperty("病号最新的输注记录")
     private String infusionId;

+ 7 - 10
nb-system/src/main/java/com/nb/bus/hospital/HospitalManager.java

@@ -43,7 +43,7 @@ public class HospitalManager {
 
     private HisScriptSession scriptSession;
 
-    private HospitalAutoUndoConfigHandler autoUndoConfigHandler;
+    private HospitalDeviceAutoUndoConfigHandler autoUndoConfigHandler;
 
     private HospitalFinishMonitorConfigHandler finishMonitorConfigHandler;
 
@@ -69,21 +69,18 @@ public class HospitalManager {
 
     private LocalBusInfusionHistoryService infusionHistoryService;
     public HospitalManager(String hospitalId,
-                           DeviceRegistry deviceRegistry,
-                           WsPublishUtils wsPublishUtils,
                            ScriptManager scriptManager,
-                           ConfigStorageManager configStorageManager,
-                           RedissonUtil redissonUtil) {
+                           ConfigStorageManager configStorageManager) {
         this.hospitalId = hospitalId;
         this.hospitalService= SpringUtil.getBean(LocalBusHospitalService.class);
         this.hospitalConfigService = SpringUtil.getBean(LocalBusHospitalConfigService.class);
         this.scriptManager = scriptManager;
         this.storage=configStorageManager.getStorage(hospitalId);
-        this.autoUndoConfigHandler=new HospitalAutoUndoConfigHandler(storage,hospitalId,redissonUtil,SpringUtil.getBean(LocalBusInfusionHistoryService.class),deviceRegistry,wsPublishUtils,SpringUtil.getBean(LocalBusPatientService.class));
-        this.finishMonitorConfigHandler=new HospitalFinishMonitorConfigHandler(storage,hospitalId,redissonUtil,SpringUtil.getBean(LocalBusInfusionHistoryService.class),deviceRegistry,wsPublishUtils,SpringUtil.getBean(LocalBusPatientService.class));
-        this.analConfigHandler=new HospitalFunctionAnalConfigHandler(storage,hospitalId,redissonUtil,SpringUtil.getBean(LocalBusInfusionHistoryService.class),deviceRegistry,wsPublishUtils,SpringUtil.getBean(LocalBusDeviceAlarmService.class));
-        this.extraConfigHandler=new HospitalFunctionExtraConfigHandler(storage,hospitalId,redissonUtil,SpringUtil.getBean(LocalBusInfusionHistoryService.class),deviceRegistry,wsPublishUtils,finishMonitorConfigHandler,autoUndoConfigHandler,SpringUtil.getBean(LocalBusDeviceAlarmService.class));
-        this.codeHandler=new HospitalPatientCodeHandler(storage,hospitalId,redissonUtil,SpringUtil.getBean(LocalBusInfusionHistoryService.class),deviceRegistry,wsPublishUtils);
+        this.autoUndoConfigHandler=new HospitalDeviceAutoUndoConfigHandler(storage,hospitalId);
+        this.finishMonitorConfigHandler=new HospitalFinishMonitorConfigHandler(storage,hospitalId);
+        this.analConfigHandler=new HospitalFunctionAnalConfigHandler(storage,hospitalId);
+        this.extraConfigHandler=new HospitalFunctionExtraConfigHandler(storage,hospitalId,finishMonitorConfigHandler,autoUndoConfigHandler);
+        this.codeHandler=new HospitalPatientCodeHandler(storage,hospitalId);
         this.infusionHistoryService=SpringUtil.getBean(LocalBusInfusionHistoryService.class);
         init(configStorageManager);
     }

+ 2 - 2
nb-system/src/main/java/com/nb/bus/hospital/HospitalManagerRegister.java

@@ -36,13 +36,13 @@ public class HospitalManagerRegister {
 
     public void register(String hospitalId){
         managerMap.computeIfAbsent(hospitalId,k->
-                new HospitalManager(k,deviceRegistry,wsPublishUtils,scriptManager,configStorageManager,redissonUtil)
+                new HospitalManager(k,scriptManager,configStorageManager)
         );
     };
 
     public HospitalManager get(String hospitalId){
         return  managerMap.computeIfAbsent(hospitalId,k->
-                new HospitalManager(k,deviceRegistry,wsPublishUtils,scriptManager,configStorageManager,redissonUtil)
+                new HospitalManager(k,scriptManager,configStorageManager)
         );
     }
 

+ 1 - 11
nb-system/src/main/java/com/nb/bus/hospital/config/AbstractHospitalConfigHandler.java

@@ -16,25 +16,15 @@ import lombok.Getter;
  * @createTime 2022年05月19日 23:29:00
  */
 public abstract class AbstractHospitalConfigHandler<E,T> implements HospitalConfigHandler<E,T> {
-    protected final RedissonUtil redissonUtil;
 
-    protected  final LocalBusInfusionHistoryService infusionHistoryService;
-
-    protected final  DeviceRegistry deviceRegistry;
-
-    protected final  WsPublishUtils wsPublishUtils;
 
     protected final ConfigStorage configStorage;
     @Getter
     protected String hospitalId;
 
-    public AbstractHospitalConfigHandler(ConfigStorage configStorage, String hospitalId, RedissonUtil redissonUtil, LocalBusInfusionHistoryService infusionHistoryService, DeviceRegistry deviceRegistry, WsPublishUtils wsPublishUtils) {
+    public AbstractHospitalConfigHandler(ConfigStorage configStorage, String hospitalId) {
         this.hospitalId=hospitalId;
         this.configStorage=configStorage;
-        this.redissonUtil = redissonUtil;
-        this.infusionHistoryService = infusionHistoryService;
-        this.deviceRegistry = deviceRegistry;
-        this.wsPublishUtils = wsPublishUtils;
     }
 
 

+ 0 - 186
nb-system/src/main/java/com/nb/bus/hospital/config/HospitalAutoUndoConfigHandler.java

@@ -1,186 +0,0 @@
-package com.nb.bus.hospital.config;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.nb.bus.entity.BusPatientEntity;
-import com.nb.bus.hospital.config.bean.FunctionAutoUndoConfig;
-import com.nb.bus.hospital.config.bean.FunctionFinishMonitorConfig;
-import com.nb.bus.entity.BusDeviceRunningEntity;
-import com.nb.bus.enums.DeviceStatusEnum;
-import com.nb.bus.hospital.enums.ConfigHandlerEnums;
-import com.nb.bus.registry.device.DeviceRegistry;
-import com.nb.bus.service.LocalBusInfusionHistoryService;
-import com.nb.bus.service.LocalBusPatientService;
-import com.nb.bus.service.dto.ManualUndoConfig;
-import com.nb.bus.service.dto.UndoDeviceConfig;
-import com.nb.bus.utils.WsPublishUtils;
-import com.nb.common.cache.ConfigStorage;
-import com.nb.common.entity.AbstractMsgId;
-import com.nb.common.util.RedissonUtil;
-import lombok.Builder;
-import lombok.Data;
-import org.redisson.api.RDelayedQueue;
-
-import java.util.Collections;
-import java.util.Date;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName HospitalFinishMonitorConfigHandler.java
- * @Description 自动结束管理处理
- * @createTime 2022年05月19日 16:22:00
- */
-public class HospitalAutoUndoConfigHandler extends  AbstractHospitalConfigHandler<FunctionAutoUndoConfig, BusDeviceRunningEntity>{
-
-
-    private UndoEntity lastAutoUndo;
-    private UndoEntity lastNoSignalUndo;
-    private   final LocalBusPatientService patientService;
-    public HospitalAutoUndoConfigHandler(ConfigStorage configStorage, String hospitalId, RedissonUtil redissonUtil, LocalBusInfusionHistoryService infusionHistoryService, DeviceRegistry deviceRegistry, WsPublishUtils wsPublishUtils, LocalBusPatientService patientService) {
-        super(configStorage, hospitalId, redissonUtil, infusionHistoryService, deviceRegistry, wsPublishUtils);
-        this.patientService=patientService;
-    }
-
-    @Override
-    public ConfigHandlerEnums getId() {
-        return ConfigHandlerEnums.autoUndo;
-    }
-
-    @Override
-    public String getDescription() {
-        return "自动撤泵";
-    }
-
-    @Override
-    public void handler(BusDeviceRunningEntity source) {
-        FunctionAutoUndoConfig config = this.getConfig().as(FunctionAutoUndoConfig.class);
-        if(config==null||!Boolean.TRUE.equals(config.isEnable())|| Objects.isNull(config.getShutDownInterval())){
-            return;
-        }
-        judgeShutdownAutoUndo(source,config);
-
-
-        if(null!=lastNoSignalUndo){
-            RDelayedQueue noSignalUndoDelay = redissonUtil.getDelayedQueue(getId() +"-noSignalUndo", e -> {
-                if(e instanceof UndoEntity){
-                    this.handleAutoUndo((UndoEntity) e);
-                }
-            });
-            noSignalUndoDelay.remove(lastNoSignalUndo);
-        }
-    }
-
-    /**
-     * 描述: 不在服务区到撤泵处理
-     * @author lifang
-     * @date 2022/5/23 23:24
-     * @param device
-     * @param patientCode
-     * @param tenantId
-     * @return void
-     */
-    public void judgeNoSignalAutoUndo(String device, String patientCode, String tenantId, String infusionId){
-        FunctionFinishMonitorConfig config = this.getConfig().as(FunctionFinishMonitorConfig.class);
-        if(config==null||!Boolean.TRUE.equals(config.isEnable())|| Objects.isNull(config.getNoSignalInterval())){
-            return;
-        }
-        RDelayedQueue noSignalUndoDelay = redissonUtil.getDelayedQueue(getId() +"-noSignalUndo", e -> {
-            if(e instanceof UndoEntity){
-                this.handleAutoUndo((UndoEntity) e);
-            }
-        });
-        UndoDeviceConfig undoDeviceConfig = new UndoDeviceConfig();
-        undoDeviceConfig.setUndoBy(config.getUndoBy());
-        lastNoSignalUndo =UndoEntity.builder()
-                .deviceId(device)
-                .patientCode(patientCode)
-                .config(undoDeviceConfig)
-                .timeout(config.getNoSignalInterval())
-                .unit(TimeUnit.MINUTES)
-                .tenantId(tenantId)
-                .infusionId(infusionId)
-                .timestamp(new Date())
-                .build();
-        noSignalUndoDelay.offer(lastNoSignalUndo,lastNoSignalUndo.getTimeout(),lastNoSignalUndo.getUnit());
-    }
-
-    /**
-     * 描述: 处理关机到撤泵时间(副泵)
-     * @author lifang
-     * @date 2022/5/19 16:51
-     * @param source
-     * @param config
-     * @return void
-     */
-    private void judgeShutdownAutoUndo(BusDeviceRunningEntity source, FunctionAutoUndoConfig config ){
-        //自动撤泵(副泵)
-        RDelayedQueue autoUndoDevice = redissonUtil.getDelayedQueue(getId() +"-autoUndoDevice", e -> {
-            if(e instanceof UndoEntity){
-                this.handleAutoUndo((UndoEntity) e);
-            }
-        });
-        UndoDeviceConfig undoDeviceConfig = new UndoDeviceConfig();
-        undoDeviceConfig.setUndoBy(config.getUndoBy());
-        if(DeviceStatusEnum.Shutdown.equals(source.getRunState())){
-            lastAutoUndo = UndoEntity.builder()
-                    .deviceId(source.getDeviceId())
-                    .patientCode(source.getPatientCode())
-                    .config(undoDeviceConfig)
-                    .clinicId(source.getClinicId())
-                    .infusionId(source.getInfusionId())
-                    .timeout(config.getShutDownInterval())
-                    .unit(TimeUnit.MINUTES)
-                    .tenantId(source.getTenantId())
-                    .timestamp(new Date())
-                    .build();
-            autoUndoDevice.offer(lastAutoUndo,config.getShutDownInterval(),lastAutoUndo.getUnit());
-        }else{
-            if(lastAutoUndo!=null){
-                autoUndoDevice.remove(lastAutoUndo);
-            }
-        }
-
-
-    }
-
-    /**
-     * 描述: 关机/不在服务区自动撤泵处理(副泵)
-     * @author lifang
-     * @date 2022/5/30 17:44
-     * @param source
-     * @return void
-     */
-    private void handleAutoUndo(UndoEntity source){
-        String infusionId = source.getInfusionId();
-        BusPatientEntity patient = patientService.getOne(new QueryWrapper<BusPatientEntity>().lambda().eq(BusPatientEntity::getCode, source.getPatientCode())
-                .eq(BusPatientEntity::getTenantId, source.getTenantId()));
-        if (!infusionId.equals(patient.getInfusionId())) {
-            //只有副泵会自动撤泵
-            ManualUndoConfig manualUndoConfig = new ManualUndoConfig();
-            manualUndoConfig.setInfusionIds(Collections.singletonList(source.getInfusionId()));
-            manualUndoConfig.setPatientCode(source.getPatientCode());
-            manualUndoConfig.setClinicId(source.getClinicId());
-            manualUndoConfig.setMonitorType(true);
-            manualUndoConfig.setUndo(source.getConfig());
-            manualUndoConfig.setTenantId(source.getTenantId());
-            infusionHistoryService.undo(manualUndoConfig,false);
-        }
-    }
-
-
-    @Data
-    @Builder
-    static class UndoEntity extends AbstractMsgId {
-        private String deviceId;
-        private String patientCode;
-        private String clinicId;
-        private String infusionId;
-        private UndoDeviceConfig config;
-        private Integer timeout;
-        private TimeUnit unit;
-        private String tenantId;
-        private Date timestamp;
-    }
-}

+ 148 - 0
nb-system/src/main/java/com/nb/bus/hospital/config/HospitalDeviceAutoUndoConfigHandler.java

@@ -0,0 +1,148 @@
+package com.nb.bus.hospital.config;
+
+import cn.hutool.extra.spring.SpringUtil;
+import com.nb.bus.hospital.config.bean.FunctionAutoUndoConfig;
+import com.nb.bus.hospital.config.bean.FunctionFinishMonitorConfig;
+import com.nb.bus.entity.BusDeviceRunningEntity;
+import com.nb.bus.enums.DeviceStatusEnum;
+import com.nb.bus.hospital.config.handler.HandlerConstant;
+import com.nb.bus.hospital.enums.ConfigHandlerEnums;
+import com.nb.bus.registry.device.DeviceRegistry;
+import com.nb.bus.service.LocalBusInfusionHistoryService;
+import com.nb.bus.service.dto.UndoDeviceConfig;
+import com.nb.bus.utils.WsPublishUtils;
+import com.nb.common.cache.ConfigStorage;
+import com.nb.common.cache.value.Value;
+import com.nb.common.delay_queue.manager.DelayMessageManager;
+import com.nb.common.delay_queue.message.DelayMessage;
+import com.nb.common.delay_queue.message.DelayMessageProperties;
+import com.nb.common.util.RedissonUtil;
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName HospitalDeviceAutoUndoConfigHandler.java
+ * @Description 自动撤泵(副泵)处理
+ * @createTime 2022年05月19日 16:22:00
+ */
+public class HospitalDeviceAutoUndoConfigHandler extends  AbstractHospitalConfigHandler<FunctionAutoUndoConfig, BusDeviceRunningEntity>{
+
+    private DelayMessage lastShutDownDelayMessage;
+    private DelayMessage lastNoSignalDelayMessage;
+    private DelayMessageManager delayMessageManager= SpringUtil.getBean(DelayMessageManager.class);
+    public HospitalDeviceAutoUndoConfigHandler(ConfigStorage configStorage, String hospitalId) {
+        super(configStorage, hospitalId);
+    }
+
+    @Override
+    public ConfigHandlerEnums getId() {
+        return ConfigHandlerEnums.autoUndo;
+    }
+
+    @Override
+    public String getDescription() {
+        return "自动撤泵";
+    }
+
+    @Override
+    public void handler(BusDeviceRunningEntity source) {
+        FunctionAutoUndoConfig config = this.getConfig().as(FunctionAutoUndoConfig.class);
+        if(config==null||!Boolean.TRUE.equals(config.isEnable())|| Objects.isNull(config.getShutDownInterval())){
+            return;
+        }
+        judgeShutdownAutoUndo(source,config);
+
+
+        if(null!=lastNoSignalDelayMessage){
+            delayMessageManager.remove(lastNoSignalDelayMessage);
+        }
+    }
+
+    /**
+     * 描述: 不在服务区到撤泵处理
+     * @author lifang
+     * @date 2022/5/23 23:24
+     * @param device
+     * @param patientId
+     * @param tenantId
+     * @param uploadTime
+     * @return void
+     */
+    public void judgeNoSignalAutoUndo(String device, String patientId, String tenantId, String infusionId, Date uploadTime){
+        FunctionFinishMonitorConfig config = this.getConfig().as(FunctionFinishMonitorConfig.class);
+        if(config==null||!Boolean.TRUE.equals(config.isEnable())|| Objects.isNull(config.getNoSignalInterval())){
+            return;
+        }
+
+        UndoDeviceConfig undoDeviceConfig = new UndoDeviceConfig();
+        undoDeviceConfig.setUndoBy(config.getUndoBy());
+
+        UndoEntity undo = UndoEntity.builder()
+                .deviceId(device)
+                .patientId(patientId)
+                .config(undoDeviceConfig)
+                .tenantId(tenantId)
+                .uploadTime(uploadTime)
+                .infusionId(infusionId)
+                .timestamp(new Date())
+                .build();
+
+        lastNoSignalDelayMessage = new DelayMessage(Value.simple(undo), HandlerConstant.DEVICE_AUTO_UNDO, DelayMessageProperties.of(TimeUnit.MINUTES, config.getNoSignalInterval()));
+
+        delayMessageManager.add(lastNoSignalDelayMessage);
+    }
+
+    /**
+     * 描述: 处理关机到撤泵时间(副泵)
+     * @author lifang
+     * @date 2022/5/19 16:51
+     * @param source
+     * @param config
+     * @return void
+     */
+    private void judgeShutdownAutoUndo(BusDeviceRunningEntity source, FunctionAutoUndoConfig config ){
+        UndoDeviceConfig undoDeviceConfig = new UndoDeviceConfig();
+        undoDeviceConfig.setUndoBy(config.getUndoBy());
+        if(DeviceStatusEnum.Shutdown.equals(source.getRunState())){
+            UndoEntity undo = UndoEntity.builder()
+                    .deviceId(source.getDeviceId())
+                    .patientId(source.getPatientCode())
+                    .config(undoDeviceConfig)
+                    .clinicId(source.getClinicId())
+                    .infusionId(source.getInfusionId())
+                    .tenantId(source.getTenantId())
+                    .uploadTime(source.getUploadTime())
+                    .timestamp(new Date())
+                    .build();
+            lastShutDownDelayMessage = new DelayMessage(Value.simple(undo), HandlerConstant.DEVICE_AUTO_UNDO, DelayMessageProperties.of(TimeUnit.MINUTES, config.getNoSignalInterval()));
+            delayMessageManager.add(lastShutDownDelayMessage);
+        }else{
+            if(lastShutDownDelayMessage!=null){
+                delayMessageManager.remove(lastShutDownDelayMessage);
+            }
+        }
+
+
+    }
+
+
+    @Data
+    @Builder
+    public static class UndoEntity implements Serializable {
+        private String deviceId;
+        private String patientId;
+        private String clinicId;
+        private String infusionId;
+        private UndoDeviceConfig config;
+        private String tenantId;
+        private Date uploadTime;
+        private Date timestamp;
+    }
+}

+ 24 - 76
nb-system/src/main/java/com/nb/bus/hospital/config/HospitalFinishMonitorConfigHandler.java

@@ -1,26 +1,26 @@
 package com.nb.bus.hospital.config;
 
 
-import com.nb.bus.entity.BusInfusionHistoryEntity;
-import com.nb.bus.entity.BusPatientEntity;
+import cn.hutool.extra.spring.SpringUtil;
 import com.nb.bus.hospital.config.bean.FunctionFinishMonitorConfig;
 import com.nb.bus.entity.BusDeviceRunningEntity;
 import com.nb.bus.enums.DeviceStatusEnum;
+import com.nb.bus.hospital.config.handler.HandlerConstant;
 import com.nb.bus.hospital.enums.ConfigHandlerEnums;
 import com.nb.bus.registry.device.DeviceRegistry;
 import com.nb.bus.service.LocalBusInfusionHistoryService;
-import com.nb.bus.service.LocalBusPatientService;
-import com.nb.bus.service.dto.ManualUndoConfig;
 import com.nb.bus.service.dto.UndoDeviceConfig;
 import com.nb.bus.utils.WsPublishUtils;
 import com.nb.common.cache.ConfigStorage;
-import com.nb.common.entity.AbstractMsgId;
-import com.nb.common.util.RedissonUtil;
+import com.nb.common.cache.value.Value;
+import com.nb.common.delay_queue.manager.DelayMessageManager;
+import com.nb.common.delay_queue.message.DelayMessage;
+import com.nb.common.delay_queue.message.DelayMessageProperties;
 import lombok.Builder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.redisson.api.RDelayedQueue;
-import java.util.Collections;
+
+import java.io.Serializable;
 import java.util.Date;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -35,13 +35,11 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public class HospitalFinishMonitorConfigHandler extends AbstractHospitalConfigHandler<FunctionFinishMonitorConfig, BusDeviceRunningEntity>{
 
-
-    private FinishMonitorEntity lastShutDownFinish;
-    private FinishMonitorEntity lastNoSignalFinish;
-    private LocalBusPatientService patientService;
-    public HospitalFinishMonitorConfigHandler(ConfigStorage configStorage, String hospitalId, RedissonUtil redissonUtil,LocalBusInfusionHistoryService infusionHistoryService, DeviceRegistry deviceRegistry, WsPublishUtils wsPublishUtils, LocalBusPatientService patientService) {
-        super(configStorage, hospitalId, redissonUtil, infusionHistoryService, deviceRegistry, wsPublishUtils);
-        this.patientService=patientService;
+    private DelayMessage lastShutDownFinish;
+    private DelayMessage lastNoSignalFinish;
+    private DelayMessageManager delayMessageManager= SpringUtil.getBean(DelayMessageManager.class);
+    public HospitalFinishMonitorConfigHandler(ConfigStorage configStorage, String hospitalId) {
+        super(configStorage, hospitalId);
     }
 
     @Override
@@ -64,12 +62,7 @@ public class HospitalFinishMonitorConfigHandler extends AbstractHospitalConfigHa
         judgeShutdownAutoFinish(source,config);
 
         if(lastNoSignalFinish !=null){
-            RDelayedQueue<AbstractMsgId> noSignalFinishDelay = redissonUtil.getDelayedQueue(getId() +"-noSignalFinish", e -> {
-                if(e instanceof FinishMonitorEntity){
-                    this.handleFinishMonitor((FinishMonitorEntity) e);
-                }
-            });
-            noSignalFinishDelay.remove(lastNoSignalFinish);
+            delayMessageManager.remove(lastNoSignalFinish);
         }
     }
 
@@ -90,27 +83,21 @@ public class HospitalFinishMonitorConfigHandler extends AbstractHospitalConfigHa
         if(config==null||!Boolean.TRUE.equals(config.isEnable())|| Objects.isNull(config.getNoSignalInterval())){
             return;
         }
-        RDelayedQueue<AbstractMsgId> noSignalFinishDelay = redissonUtil.getDelayedQueue(getId() +"-noSignalFinish", e -> {
-            if(e instanceof FinishMonitorEntity){
-                this.handleFinishMonitor((FinishMonitorEntity) e);
-            }
-        });
+
         UndoDeviceConfig undoDeviceConfig = new UndoDeviceConfig();
         undoDeviceConfig.setUndoBy(config.getUndoBy());
-        lastNoSignalFinish = FinishMonitorEntity.builder()
+        FinishMonitorEntity body = FinishMonitorEntity.builder()
                 .deviceId(device)
-                .patientCode(patientCode)
                 .config(undoDeviceConfig)
                 .infusionId(infusionId)
                 .patientId(patientId)
-                .timeout(config.getNoSignalInterval())
                 .uploadTime(uploadTime)
-                .unit(TimeUnit.DAYS)
                 .tenantId(tenantId)
                 .timestamp(new Date())
                 .build();
 
-        redissonUtil.offerQueue(noSignalFinishDelay, lastNoSignalFinish, lastNoSignalFinish.getTimeout(), lastNoSignalFinish.getUnit());
+        lastNoSignalFinish = new DelayMessage(Value.simple(body), HandlerConstant.CLINIC_AUTO_FINISH, DelayMessageProperties.of(TimeUnit.DAYS, config.getNoSignalInterval()));
+        delayMessageManager.add(lastNoSignalFinish);
     }
 
     /**
@@ -122,74 +109,35 @@ public class HospitalFinishMonitorConfigHandler extends AbstractHospitalConfigHa
      * @return void
      */
     private void judgeShutdownAutoFinish(BusDeviceRunningEntity source, FunctionFinishMonitorConfig config){
-        RDelayedQueue<AbstractMsgId> shutdownFinishDelay = redissonUtil.getDelayedQueue(getId() +"-ShutdownFinish", e -> {
-            if(e instanceof FinishMonitorEntity){
-                this.handleFinishMonitor((FinishMonitorEntity) e);
-            }
-        });
         UndoDeviceConfig undoDeviceConfig = new UndoDeviceConfig();
         undoDeviceConfig.setUndoBy(config.getUndoBy());
         //删除延迟队列消息
         if(lastShutDownFinish!=null){
-            shutdownFinishDelay.remove(lastShutDownFinish);
+            delayMessageManager.remove(lastShutDownFinish);
         }
+
         if(DeviceStatusEnum.Shutdown.equals(source.getRunState())){
-            lastShutDownFinish = FinishMonitorEntity.builder()
+            FinishMonitorEntity body = FinishMonitorEntity.builder()
                     .deviceId(source.getDeviceId())
-                    .patientCode(source.getPatientCode())
                     .patientId(source.getPatientId())
                     .config(undoDeviceConfig)
-                    .timeout(config.getShutDownInterval())
                     .uploadTime(source.getUploadTime())
-                    .unit(TimeUnit.DAYS)
                     .tenantId(source.getTenantId())
                     .timestamp(new Date())
                     .build();
-            redissonUtil.offerQueue(shutdownFinishDelay,lastShutDownFinish,config.getShutDownInterval(),lastShutDownFinish.getUnit());
-        }
-    }
-
-    /**
-     * 描述: 自动结束管理处理
-     * @author lifang
-     * @date 2022/5/19 22:35
-     * @param source
-     * @return void
-     */
-    private void handleFinishMonitor(FinishMonitorEntity source){
-        log.info("延迟任务【{}】,结束临床管理【{}】",source.getId(),source);
-        //判断是否为主输注,只有主输注才可结束临床
-        String patientId = source.getPatientId();
-        BusPatientEntity patient = patientService.getById(patientId);
-        if (source.getInfusionId().equals(patient.getInfusionId())) {
-            BusInfusionHistoryEntity infusionHistory = infusionHistoryService.getById(patient.getInfusionId());
-            Date uploadTime = source.getUploadTime();
-            if (infusionHistory.getLastUploadTime().equals(uploadTime)) {
-                UndoDeviceConfig config = source.getConfig();
-                ManualUndoConfig manualUndoConfig = new ManualUndoConfig();
-                config.setUndoTime(new Date());
-                manualUndoConfig.setTenantId(source.getTenantId());
-                manualUndoConfig.setUndo(config);
-                manualUndoConfig.setMonitorType(true);
-                manualUndoConfig.setClinicId(patient.getClinicId());
-                manualUndoConfig.setPatientCode(source.getPatientCode());
-                manualUndoConfig.setInfusionIds(Collections.singletonList(source.getInfusionId()));
-                infusionHistoryService.undo(manualUndoConfig,true);
-            }
+            lastShutDownFinish = new DelayMessage(Value.simple(body), HandlerConstant.CLINIC_AUTO_FINISH, DelayMessageProperties.of(TimeUnit.DAYS, config.getShutDownInterval()));
+            delayMessageManager.add(lastShutDownFinish);
         }
     }
 
     @Data
     @Builder
-    static class FinishMonitorEntity extends AbstractMsgId {
+    public static class FinishMonitorEntity implements Serializable {
         private String deviceId;
         private String patientId;
-        private String patientCode;
         private UndoDeviceConfig config;
         private String clinicId;
         private String infusionId;
-        private Integer timeout;
-        private TimeUnit unit;
         private String tenantId;
         private Date uploadTime;
         private Date timestamp;

+ 15 - 54
nb-system/src/main/java/com/nb/bus/hospital/config/HospitalFunctionAnalConfigHandler.java

@@ -3,26 +3,27 @@ package com.nb.bus.hospital.config;
 
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.date.DateUtil;
+import cn.hutool.extra.spring.SpringUtil;
 import cn.hutool.json.JSONUtil;
-import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.nb.bus.hospital.config.bean.FunctionAnalConfig;
 import com.nb.bus.entity.BusDeviceRunningEntity;
-import com.nb.bus.entity.BusInfusionHistoryEntity;
+import com.nb.bus.hospital.config.handler.HandlerConstant;
 import com.nb.bus.hospital.enums.ConfigHandlerEnums;
 import com.nb.bus.registry.device.DeviceRegistry;
-import com.nb.bus.service.LocalBusDeviceAlarmService;
 import com.nb.bus.service.LocalBusInfusionHistoryService;
 import com.nb.bus.utils.WsPublishUtils;
 import com.nb.common.cache.ConfigStorage;
 import com.nb.common.cache.value.Value;
-import com.nb.common.entity.AbstractMsgId;
+import com.nb.common.delay_queue.manager.DelayMessageManager;
+import com.nb.common.delay_queue.message.DelayMessage;
+import com.nb.common.delay_queue.message.DelayMessageProperties;
 import com.nb.common.util.ExceptionUtil;
 import com.nb.common.util.RedissonUtil;
 import lombok.Builder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.redisson.api.RDelayedQueue;
 
+import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -36,11 +37,12 @@ import java.util.stream.Collectors;
  */
 @Slf4j
 public class HospitalFunctionAnalConfigHandler extends AbstractHospitalConfigHandler<FunctionAnalConfig,BusDeviceRunningEntity> {
+    private DelayMessageManager delayMessageManager= SpringUtil.getBean(DelayMessageManager.class);
+    private RedissonUtil redissonUtil= SpringUtil.getBean(RedissonUtil.class);
 
 
-
-    public HospitalFunctionAnalConfigHandler(ConfigStorage configStorage, String hospitalId, RedissonUtil redissonUtil, LocalBusInfusionHistoryService infusionHistoryService, DeviceRegistry deviceRegistry, WsPublishUtils wsPublishUtils,LocalBusDeviceAlarmService alarmService) {
-        super(configStorage, hospitalId, redissonUtil, infusionHistoryService, deviceRegistry, wsPublishUtils);
+    public HospitalFunctionAnalConfigHandler(ConfigStorage configStorage, String hospitalId) {
+        super(configStorage, hospitalId);
     }
 
     @Override
@@ -138,23 +140,17 @@ public class HospitalFunctionAnalConfigHandler extends AbstractHospitalConfigHan
                 log.info("消息【{}】,设备【{}】由引起镇痛不足--实时判定",source.getMsgId(),source.getDeviceId());
                 source.setWarnAnalgesicPoor(true);
 
-                //镇痛消失延迟队列
-                RDelayedQueue<AbstractMsgId> noneAnalDelayedQueue = redissonUtil.getDelayedQueue("none-" + getId(), e -> {
-                    if(e instanceof NoneAnalEntity){
-                        handleAnalFinish((NoneAnalEntity) e);
-                    }
-                });
                 NoneAnalEntity noneAnal = NoneAnalEntity.builder()
                         .deviceId(firstAnal.getDeviceId())
                         .tenantId(firstAnal.getTenantId())
-                        .timeout(analConfig.getDisappearTime())
-                        .unit(TimeUnit.MINUTES)
                         .iotMsgId(source.getMsgId())
                         .infusionId(firstAnal.getInfusionId())
                         .historyId(firstAnal.getHistoryId())
                         .timestamp(new Date())
                         .build();
-                redissonUtil.offerQueue(noneAnalDelayedQueue,noneAnal,noneAnal.getTimeout(),noneAnal.getUnit());
+                //镇痛消失延迟队列
+                DelayMessage delayMessage = new DelayMessage(Value.simple(noneAnal), HandlerConstant.ANAL_POOR_DISAPPEAR, DelayMessageProperties.of(TimeUnit.MINUTES, analConfig.getDisappearTime()));
+                delayMessageManager.add(delayMessage);
             }
         }catch (Exception e){
             log.debug("消息【{}】,镇痛不足缓存解析失败,设备号【{}】",source.getMsgId(),source.getDeviceId(), ExceptionUtil.getExceptionMsg(e));
@@ -167,42 +163,9 @@ public class HospitalFunctionAnalConfigHandler extends AbstractHospitalConfigHan
     }
 
 
-    /**
-     * 描述: 镇痛消失处理
-     * 取消镇痛时,谨遵询谁引起谁取消的原则
-     * 即根据阿里云所发送的消息id进行判定
-     * @author lifang
-     * @date 2022/5/30 21:09
-     * @param noneAnal
-     * @return void
-     */
-    private void handleAnalFinish(NoneAnalEntity noneAnal){
-        String deviceId = noneAnal.getDeviceId();
-        BusInfusionHistoryEntity infusionHistory = infusionHistoryService.getById(noneAnal.getInfusionId());
-        if(Boolean.TRUE.equals(infusionHistory.getFinished())){
-            //泵已换绑医院,无需再处理
-            return;
-        }
-        //镇痛不足取消
-        if(Boolean.TRUE.equals(infusionHistory.getWarnAnalgesicPoor())){
-            log.info("消息【{}】,设备【{}】取消镇痛不足",noneAnal.getIotMsgId(),deviceId);
-            //那条消息引起的镇痛不足,就由那条消息的取消镇痛不足任务来结束
-            infusionHistoryService.update(new UpdateWrapper<BusInfusionHistoryEntity>().lambda()
-                    .eq(BusInfusionHistoryEntity::getId,infusionHistory.getId())
-                    .eq(BusInfusionHistoryEntity::getWarnAnalgesicPoor,true)
-                    .eq(BusInfusionHistoryEntity::getFlowRestricted,false)
-                    .eq(BusInfusionHistoryEntity::getAnalPoorMsgId,noneAnal.getIotMsgId())
-                    .set(BusInfusionHistoryEntity::getWarnAnalgesicPoor,false));
-            deviceRegistry.getOperator(deviceId).setAlarmOrWarn(null);
-            //发布推送
-            wsPublishUtils.publishPatientMonitor(infusionHistory.getPatientId(),infusionHistory.getTenantId());
-        }
-
-
-    }
     @Data
     @Builder
-    static class AnalEntity extends AbstractMsgId {
+    public static class AnalEntity implements Serializable {
         private String deviceId;
         private Integer pcaValidCount;
         private Integer pcaInvalidCount;
@@ -218,10 +181,8 @@ public class HospitalFunctionAnalConfigHandler extends AbstractHospitalConfigHan
 
     @Data
     @Builder
-    static class NoneAnalEntity extends AbstractMsgId   {
+    public static class NoneAnalEntity implements Serializable  {
         private String deviceId;
-        private Integer timeout;
-        private TimeUnit unit;
         private String tenantId;
         private String historyId;
         private Date timestamp;

+ 21 - 66
nb-system/src/main/java/com/nb/bus/hospital/config/HospitalFunctionExtraConfigHandler.java

@@ -1,5 +1,6 @@
 package com.nb.bus.hospital.config;
 
+import cn.hutool.extra.spring.SpringUtil;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.nb.bus.entity.BusDeviceAlarmEntity;
 import com.nb.bus.enums.FlowStatusEnum;
@@ -8,6 +9,7 @@ import com.nb.bus.entity.BusDeviceRunningEntity;
 import com.nb.bus.entity.BusInfusionHistoryEntity;
 import com.nb.bus.enums.DeviceStatusEnum;
 import com.nb.bus.enums.DeviceTypeEnum;
+import com.nb.bus.hospital.config.handler.HandlerConstant;
 import com.nb.bus.hospital.enums.ConfigHandlerEnums;
 import com.nb.bus.registry.device.DeviceOperator;
 import com.nb.bus.registry.device.DeviceRegistry;
@@ -15,6 +17,10 @@ import com.nb.bus.service.LocalBusDeviceAlarmService;
 import com.nb.bus.service.LocalBusInfusionHistoryService;
 import com.nb.bus.utils.WsPublishUtils;
 import com.nb.common.cache.ConfigStorage;
+import com.nb.common.cache.value.Value;
+import com.nb.common.delay_queue.manager.DelayMessageManager;
+import com.nb.common.delay_queue.message.DelayMessage;
+import com.nb.common.delay_queue.message.DelayMessageProperties;
 import com.nb.common.entity.AbstractMsgId;
 import com.nb.common.util.RedissonUtil;
 import lombok.Builder;
@@ -22,6 +28,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RDelayedQueue;
 
+import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -38,18 +45,15 @@ import java.util.concurrent.TimeUnit;
 public class HospitalFunctionExtraConfigHandler  extends  AbstractHospitalConfigHandler<FunctionExtraConfig, BusDeviceRunningEntity>{
 
     private HospitalFinishMonitorConfigHandler monitorConfigHandler;
-    private HospitalAutoUndoConfigHandler autoUndoConfigHandler;
-    private LocalBusDeviceAlarmService alarmService;
-
-    public HospitalFunctionExtraConfigHandler(ConfigStorage configStorage, String hospitalId, RedissonUtil redissonUtil, LocalBusInfusionHistoryService infusionHistoryService,
-                                              DeviceRegistry deviceRegistry, WsPublishUtils wsPublishUtils,
+    private HospitalDeviceAutoUndoConfigHandler autoUndoConfigHandler;
+    private DelayMessageManager delayMessageManager= SpringUtil.getBean(DelayMessageManager.class);
+    private  DeviceRegistry deviceRegistry=SpringUtil.getBean( DeviceRegistry.class);
+    public HospitalFunctionExtraConfigHandler(ConfigStorage configStorage, String hospitalId,
                                               HospitalFinishMonitorConfigHandler monitorConfigHandler,
-                                              HospitalAutoUndoConfigHandler autoUndoConfigHandler,
-                                              LocalBusDeviceAlarmService alarmService) {
-        super(configStorage, hospitalId, redissonUtil, infusionHistoryService, deviceRegistry, wsPublishUtils);
+                                              HospitalDeviceAutoUndoConfigHandler autoUndoConfigHandler) {
+        super(configStorage, hospitalId);
         this.monitorConfigHandler=monitorConfigHandler;
         this.autoUndoConfigHandler=autoUndoConfigHandler;
-        this.alarmService=alarmService;
     }
 
     @Override
@@ -89,71 +93,21 @@ public class HospitalFunctionExtraConfigHandler  extends  AbstractHospitalConfig
         if(DeviceStatusEnum.Shutdown.equals(source.getRunState())){
             return;
         }
-        RDelayedQueue<AbstractMsgId> delayedQueue = redissonUtil.getDelayedQueue(getId() +"-nosignal", e -> {
-            if(e instanceof  NoSignalEntity){
-                this.handleNoSignal((NoSignalEntity) e);
-            }
-        });
-
         NoSignalEntity lastNoSignalWarn = NoSignalEntity.builder()
                 .deviceId(source.getDeviceId())
                 .patientCode(source.getPatientCode())
                 .tenantId(source.getTenantId())
                 .timestamp(new Date())
+                .autoUndoConfigHandler(autoUndoConfigHandler)
+                .monitorConfigHandler(monitorConfigHandler)
                 .historyId(source.getHistoryId())
                 .infusionId(source.getInfusionId())
                 .uploadTime(source.getUploadTime())
-                .timeout(interval)
-                .unit(TimeUnit.MINUTES)
                 .build();
-        //将该次消息视为最后一次消息放入队列中
-        redissonUtil.offerQueue(delayedQueue,lastNoSignalWarn,lastNoSignalWarn.getTimeout(),lastNoSignalWarn.getUnit());
+        DelayMessage delayMessage = new DelayMessage(Value.simple(lastNoSignalWarn), HandlerConstant.NO_SIGNAL, DelayMessageProperties.of(TimeUnit.MINUTES, interval));
+        delayMessageManager.add(delayMessage);
     }
 
-    /**
-     * 描述: 不在服务区处理
-     * @author lifang
-     * @date 2022/5/19 16:02
-     * @param source
-     * @return void
-     */
-    private void handleNoSignal(NoSignalEntity source){
-        String deviceId = source.getDeviceId();
-        Date lastUploadTime =  deviceRegistry.getOperator(source.getDeviceId()).getUploadTime();
-        if(!lastUploadTime.equals(source.getUploadTime())){
-            return;
-        }
-        BusInfusionHistoryEntity infusionHistory = infusionHistoryService.getById(source.getInfusionId());
-        if (Boolean.TRUE.equals(infusionHistory.getFinished())) {
-            //输注已结束
-            if(log.isDebugEnabled()){
-                log.debug("不在服务区消息【{}】由于输注已结束,不在继续处理该消息,消息【{}】",source.getId(),source);
-            }
-            return;
-        }
-        //不在服务区报警信息保存
-        Date alarmTime=new Date(source.getTimestamp().getTime()+source.getUnit().toMillis(source.getTimeout()));
-        BusDeviceAlarmEntity alarm = BusDeviceAlarmEntity.noSignalOf(source.getDeviceId(),source.getInfusionId(),source.getHistoryId(),source.getTenantId(),alarmTime);
-        if(log.isDebugEnabled()){
-            log.debug("消息【{}】处理,保存不在服务区报警",source);
-        }
-        alarmService.save(alarm);
-
-        boolean update = infusionHistoryService.update(new UpdateWrapper<BusInfusionHistoryEntity>().lambda()
-                .eq(BusInfusionHistoryEntity::getId, infusionHistory.getId())
-                .eq(BusInfusionHistoryEntity::getLastUploadTime, source.getUploadTime())
-                .set(BusInfusionHistoryEntity::getRunState, DeviceStatusEnum.NoSignal));
-        //报警/提醒缓存重置
-        if(update){
-            log.info("消息【{}】处理成功,输注【{}】变为【不在服务区】状态",source.getId(),source.getInfusionId());
-            wsPublishUtils.publishPatientMonitor(infusionHistory.getPatientId(), infusionHistory.getTenantId());
-            //不在服务区
-            monitorConfigHandler.judgeNoSignalAutoFinish(infusionHistory.getPatientId(),deviceId,source.getPatientCode(),source.getTenantId(),source.getInfusionId(),source.getUploadTime());
-            autoUndoConfigHandler.judgeNoSignalAutoUndo(deviceId,source.getPatientCode(),source.getTenantId(),source.getInfusionId());
-        }else {
-            log.info("消息【{}】处理失败,输注【{}】变为【不在服务区】状态失败",source.getId(),source.getInfusionId());
-        }
-    }
 
 
     /**
@@ -200,15 +154,16 @@ public class HospitalFunctionExtraConfigHandler  extends  AbstractHospitalConfig
 
     @Data
     @Builder
-    static class NoSignalEntity extends AbstractMsgId {
+    public static class NoSignalEntity implements Serializable {
         private String deviceId;
         private String patientCode;
-        private Integer timeout;
-        private TimeUnit unit;
         private String tenantId;
         private String infusionId;
         private String historyId;
         private Date uploadTime;
         private Date timestamp;
+        private HospitalFinishMonitorConfigHandler monitorConfigHandler;
+        private HospitalDeviceAutoUndoConfigHandler autoUndoConfigHandler;
+
     }
 }

+ 2 - 2
nb-system/src/main/java/com/nb/bus/hospital/config/HospitalPatientCodeHandler.java

@@ -24,8 +24,8 @@ import java.util.Objects;
 @Slf4j
 public class HospitalPatientCodeHandler  extends AbstractHospitalConfigHandler<FunctionPatientCodeConfig, BusDeviceRunningEntity> {
 
-    public HospitalPatientCodeHandler(ConfigStorage configStorage, String hospitalId, RedissonUtil redissonUtil, LocalBusInfusionHistoryService infusionHistoryService, DeviceRegistry deviceRegistry, WsPublishUtils wsPublishUtils) {
-        super(configStorage, hospitalId, redissonUtil, infusionHistoryService, deviceRegistry, wsPublishUtils);
+    public HospitalPatientCodeHandler(ConfigStorage configStorage, String hospitalId) {
+        super(configStorage, hospitalId);
     }
 
     @Override

+ 71 - 0
nb-system/src/main/java/com/nb/bus/hospital/config/handler/AnalPoorDisappearHandler.java

@@ -0,0 +1,71 @@
+package com.nb.bus.hospital.config.handler;
+
+import cn.hutool.json.JSONUtil;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.nb.bus.entity.BusInfusionHistoryEntity;
+import com.nb.bus.hospital.config.HospitalFunctionAnalConfigHandler;
+import com.nb.bus.registry.device.DeviceRegistry;
+import com.nb.bus.service.LocalBusInfusionHistoryService;
+import com.nb.bus.utils.WsPublishUtils;
+import com.nb.common.delay_queue.handler.DelayMessageHandler;
+import com.nb.common.delay_queue.message.DelayMessage;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MessageHandler.java
+ * @Description 镇痛不足消失处理
+ * @createTime 2022年07月08日 21:14:00
+ */
+@Component
+@AllArgsConstructor
+@Slf4j
+public class AnalPoorDisappearHandler implements DelayMessageHandler {
+    private  final LocalBusInfusionHistoryService infusionHistoryService;
+    private final WsPublishUtils wsPublishUtils;
+    private final DeviceRegistry deviceRegistry;
+    @Override
+    public String getId() {
+        return HandlerConstant.ANAL_POOR_DISAPPEAR;
+    }
+
+    @Override
+    public String description() {
+        return "镇痛不足消失";
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void handle(DelayMessage message) {
+        HospitalFunctionAnalConfigHandler.NoneAnalEntity source = message.getBody().as(HospitalFunctionAnalConfigHandler.NoneAnalEntity.class);
+        if(source==null){
+            log.error("延迟消息【{}】处理失败,获取消息体转换为 HospitalFunctionAnalConfigHandler.NoneAnalEntity 实体类时 转换结果为空", JSONUtil.toJsonStr(message));
+            return;
+        }
+        String deviceId = source.getDeviceId();
+        BusInfusionHistoryEntity infusionHistory = infusionHistoryService.getById(source.getInfusionId());
+        if(Boolean.TRUE.equals(infusionHistory.getFinished())){
+            //输注已结束,无需再处理
+            return;
+        }
+        //镇痛不足取消
+        if(Boolean.TRUE.equals(infusionHistory.getWarnAnalgesicPoor())){
+            log.info("消息【{}】,设备【{}】取消镇痛不足",source.getIotMsgId(),deviceId);
+            //那条消息引起的镇痛不足,就由那条消息的取消镇痛不足任务来结束
+            infusionHistoryService.update(new UpdateWrapper<BusInfusionHistoryEntity>().lambda()
+                    .eq(BusInfusionHistoryEntity::getId,infusionHistory.getId())
+                    .eq(BusInfusionHistoryEntity::getWarnAnalgesicPoor,true)
+                    .eq(BusInfusionHistoryEntity::getFlowRestricted,false)
+                    .eq(BusInfusionHistoryEntity::getAnalPoorMsgId,source.getIotMsgId())
+                    .set(BusInfusionHistoryEntity::getWarnAnalgesicPoor,false));
+            deviceRegistry.getOperator(deviceId).setAlarmOrWarn(null);
+            //发布推送
+            wsPublishUtils.publishPatientMonitor(infusionHistory.getPatientId(),infusionHistory.getTenantId());
+        }
+        log.info("延迟消息【{}】处理完成,处理器【{镇痛不足消失}】",JSONUtil.toJsonStr(message));
+    }
+}

+ 72 - 0
nb-system/src/main/java/com/nb/bus/hospital/config/handler/ClinicAutoFinishMonitorHandler.java

@@ -0,0 +1,72 @@
+package com.nb.bus.hospital.config.handler;
+
+import cn.hutool.json.JSONUtil;
+import com.nb.bus.entity.BusInfusionHistoryEntity;
+import com.nb.bus.entity.BusPatientEntity;
+import com.nb.bus.hospital.config.HospitalFinishMonitorConfigHandler;
+import com.nb.bus.service.LocalBusInfusionHistoryService;
+import com.nb.bus.service.LocalBusPatientService;
+import com.nb.bus.service.dto.ManualUndoConfig;
+import com.nb.bus.service.dto.UndoDeviceConfig;
+import com.nb.common.delay_queue.handler.DelayMessageHandler;
+import com.nb.common.delay_queue.message.DelayMessage;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.Date;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClinicAutoFinishMonitorHandler.java
+ * @Description 当设备处于关机、不在服务区中时,发起临床自动结束管理
+ * @createTime 2022年07月08日 21:14:00
+ */
+@Component
+@AllArgsConstructor
+@Slf4j
+public class ClinicAutoFinishMonitorHandler implements DelayMessageHandler {
+    private   final LocalBusPatientService patientService;
+    private   final LocalBusInfusionHistoryService infusionHistoryService;
+    @Override
+    public String getId() {
+        return HandlerConstant.CLINIC_AUTO_FINISH;
+    }
+
+    @Override
+    public String description() {
+        return "当主设备处于关机、不在服务区中时,发起临床自动结束管理";
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void handle(DelayMessage message) {
+        HospitalFinishMonitorConfigHandler.FinishMonitorEntity source = message.getBody().as(HospitalFinishMonitorConfigHandler.FinishMonitorEntity.class);
+        if(source==null){
+            log.error("延迟消息【{}】处理失败,获取消息体转换为 HospitalFinishMonitorConfigHandler.FinishMonitorEntity 实体类时 转换结果为空", JSONUtil.toJsonStr(message));
+            return;
+        }
+        String patientId = source.getPatientId();
+        BusPatientEntity patient = patientService.getById(patientId);
+        if (source.getInfusionId().equals(patient.getInfusionId())) {
+            BusInfusionHistoryEntity infusionHistory = infusionHistoryService.getById(patient.getInfusionId());
+            Date uploadTime = source.getUploadTime();
+            if (infusionHistory.getLastUploadTime().equals(uploadTime)) {
+                UndoDeviceConfig config = source.getConfig();
+                ManualUndoConfig manualUndoConfig = new ManualUndoConfig();
+                config.setUndoTime(new Date());
+                manualUndoConfig.setTenantId(source.getTenantId());
+                manualUndoConfig.setUndo(config);
+                manualUndoConfig.setMonitorType(true);
+                manualUndoConfig.setClinicId(patient.getClinicId());
+                manualUndoConfig.setPatientCode(patient.getCode());
+                manualUndoConfig.setInfusionIds(Collections.singletonList(source.getInfusionId()));
+                infusionHistoryService.undo(manualUndoConfig,true);
+            }
+        }
+        log.info("延迟消息【{}】处理完成,处理器【{临床自动结束}】",JSONUtil.toJsonStr(message));
+    }
+}

+ 71 - 0
nb-system/src/main/java/com/nb/bus/hospital/config/handler/DeputyDeviceAutoUndoHandler.java

@@ -0,0 +1,71 @@
+package com.nb.bus.hospital.config.handler;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSONUtil;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.nb.bus.entity.BusInfusionHistoryEntity;
+import com.nb.bus.entity.BusPatientEntity;
+import com.nb.bus.hospital.config.HospitalDeviceAutoUndoConfigHandler;
+import com.nb.bus.service.LocalBusInfusionHistoryService;
+import com.nb.bus.service.LocalBusPatientService;
+import com.nb.bus.service.dto.ManualUndoConfig;
+import com.nb.common.delay_queue.handler.DelayMessageHandler;
+import com.nb.common.delay_queue.message.DelayMessage;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MessageHandler.java
+ * @Description 当设备处于关机、不在服务区中时,发起自动撤泵处理
+ * @createTime 2022年07月08日 21:14:00
+ */
+@Component
+@AllArgsConstructor
+@Slf4j
+public class DeputyDeviceAutoUndoHandler implements DelayMessageHandler {
+    private   final LocalBusPatientService patientService;
+    private   final LocalBusInfusionHistoryService infusionHistoryService;
+    @Override
+    public String getId() {
+        return HandlerConstant.DEVICE_AUTO_UNDO;
+    }
+
+    @Override
+    public String description() {
+        return "当设备处于关机、不在服务区中时,发起自动撤泵处理";
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void handle(DelayMessage message) {
+        HospitalDeviceAutoUndoConfigHandler.UndoEntity source = message.getBody().as(HospitalDeviceAutoUndoConfigHandler.UndoEntity.class);
+        if(source==null){
+            log.error("延迟消息【{}】处理失败,获取消息体转换为 HospitalDeviceAutoUndoConfigHandler.UndoEntity 实体类时 转换结果为空", JSONUtil.toJsonStr(message));
+            return;
+        }
+        String infusionId = source.getInfusionId();
+        BusPatientEntity patient = patientService.getOne(new QueryWrapper<BusPatientEntity>().lambda().eq(BusPatientEntity::getId, source.getPatientId())
+                .eq(BusPatientEntity::getTenantId, source.getTenantId()));
+        if (!infusionId.equals(patient.getInfusionId())) {
+            //只有副泵会自动撤泵
+            BusInfusionHistoryEntity infusionHistory = infusionHistoryService.getById(infusionId);
+            if(ObjectUtil.equal(infusionHistory.getLastUploadTime(),source.getUploadTime())){
+                ManualUndoConfig manualUndoConfig = new ManualUndoConfig();
+                manualUndoConfig.setInfusionIds(Collections.singletonList(source.getInfusionId()));
+                manualUndoConfig.setPatientCode(patient.getCode());
+                manualUndoConfig.setClinicId(source.getClinicId());
+                manualUndoConfig.setMonitorType(true);
+                manualUndoConfig.setUndo(source.getConfig());
+                manualUndoConfig.setTenantId(source.getTenantId());
+                infusionHistoryService.undo(manualUndoConfig,false);
+            }
+        }
+        log.info("延迟消息【{}】处理完成,处理器【{设备自动撤泵}】",JSONUtil.toJsonStr(message));
+    }
+}

+ 35 - 0
nb-system/src/main/java/com/nb/bus/hospital/config/handler/HandlerConstant.java

@@ -0,0 +1,35 @@
+package com.nb.bus.hospital.config.handler;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName HandlerConstant.java
+ * @Description 处理类名称常量
+ * @createTime 2022年07月08日 21:15:00
+ */
+public class HandlerConstant {
+
+    /**
+     * 当设备处于关机、不在服务区中时,发起自动撤泵处理
+     *
+     */
+    public static final String DEVICE_AUTO_UNDO="device_auto_uno";
+
+    /**
+     * 当设备处于关机、不在服务区中时,发起临床自动结束管理
+     *
+     */
+    public static final String CLINIC_AUTO_FINISH="clinic_auto_finish";
+
+    /**
+     * 镇痛不足消失
+     *
+     */
+    public static final String ANAL_POOR_DISAPPEAR="anal_poor_disappear";
+
+    /**
+     * 不在服务区
+     *
+     */
+    public static final String NO_SIGNAL="no_signal";
+}

+ 95 - 0
nb-system/src/main/java/com/nb/bus/hospital/config/handler/NoSignalHandler.java

@@ -0,0 +1,95 @@
+package com.nb.bus.hospital.config.handler;
+
+import cn.hutool.json.JSONUtil;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.nb.bus.entity.BusDeviceAlarmEntity;
+import com.nb.bus.entity.BusInfusionHistoryEntity;
+import com.nb.bus.enums.DeviceStatusEnum;
+import com.nb.bus.hospital.config.HospitalFunctionExtraConfigHandler;
+import com.nb.bus.registry.device.DeviceRegistry;
+import com.nb.bus.service.LocalBusDeviceAlarmService;
+import com.nb.bus.service.LocalBusInfusionHistoryService;
+import com.nb.bus.utils.WsPublishUtils;
+import com.nb.common.delay_queue.handler.DelayMessageHandler;
+import com.nb.common.delay_queue.message.DelayMessage;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import java.util.Date;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MessageHandler.java
+ * @Description 当设备处于关机、不在服务区中时,发起自动撤泵处理
+ * @createTime 2022年07月08日 21:14:00
+ */
+@Component
+@AllArgsConstructor
+@Slf4j
+public class NoSignalHandler implements DelayMessageHandler {
+
+    private final LocalBusInfusionHistoryService infusionHistoryService;
+    private final LocalBusDeviceAlarmService alarmService;
+
+    private final WsPublishUtils wsPublishUtils;
+    private final DeviceRegistry deviceRegistry;
+
+
+    @Override
+    public String getId() {
+        return HandlerConstant.NO_SIGNAL;
+    }
+
+    @Override
+    public String description() {
+        return "不在服务区延迟处理";
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void handle(DelayMessage message) {
+        HospitalFunctionExtraConfigHandler.NoSignalEntity source = message.getBody().as(HospitalFunctionExtraConfigHandler.NoSignalEntity.class);
+        if(source==null){
+            log.error("延迟消息【{}】处理失败,获取消息体转换为 HospitalFunctionExtraConfigHandler.NoSignalEntity 实体类时 转换结果为空", JSONUtil.toJsonStr(message));
+            return;
+        }
+        String deviceId = source.getDeviceId();
+        Date lastUploadTime =  deviceRegistry.getOperator(source.getDeviceId()).getUploadTime();
+        if(!lastUploadTime.equals(source.getUploadTime())){
+            return;
+        }
+        BusInfusionHistoryEntity infusionHistory = infusionHistoryService.getById(source.getInfusionId());
+        if (Boolean.TRUE.equals(infusionHistory.getFinished())) {
+            //输注已结束
+            if(log.isDebugEnabled()){
+                log.debug("不在服务区消息【{}】由于输注已结束,不在继续处理该消息,消息【{}】",message.getMsgId(),source);
+            }
+            return;
+        }
+        //不在服务区报警信息保存
+        Date alarmTime=new Date(source.getTimestamp().getTime()+message.getProperties().getTimeUnit().toMillis(message.getProperties().getExpire()));
+        BusDeviceAlarmEntity alarm = BusDeviceAlarmEntity.noSignalOf(source.getDeviceId(),source.getInfusionId(),source.getHistoryId(),source.getTenantId(),alarmTime);
+        if(log.isDebugEnabled()){
+            log.debug("消息【{}】处理,保存不在服务区报警",source);
+        }
+        alarmService.save(alarm);
+
+        boolean update = infusionHistoryService.update(new UpdateWrapper<BusInfusionHistoryEntity>().lambda()
+                .eq(BusInfusionHistoryEntity::getId, infusionHistory.getId())
+                .eq(BusInfusionHistoryEntity::getLastUploadTime, source.getUploadTime())
+                .set(BusInfusionHistoryEntity::getRunState, DeviceStatusEnum.NoSignal));
+        //报警/提醒缓存重置
+        if(update){
+            log.info("消息【{}】处理成功,输注【{}】变为【不在服务区】状态",message.getMsgId(),source.getInfusionId());
+            wsPublishUtils.publishPatientMonitor(infusionHistory.getPatientId(), infusionHistory.getTenantId());
+            //不在服务区
+            source.getMonitorConfigHandler().judgeNoSignalAutoFinish(infusionHistory.getPatientId(),deviceId,source.getPatientCode(),source.getTenantId(),source.getInfusionId(),source.getUploadTime());
+            source.getAutoUndoConfigHandler().judgeNoSignalAutoUndo(deviceId,infusionHistory.getPatientId(),source.getTenantId(),source.getInfusionId(),source.getUploadTime());
+        }else {
+            log.info("消息【{}】处理失败,输注【{}】变为【不在服务区】状态失败",message.getMsgId(),source.getInfusionId());
+        }
+        log.info("延迟消息【{}】处理完成,处理器【{设备自动撤泵}】",JSONUtil.toJsonStr(message));
+    }
+}

+ 0 - 3
nb-system/src/main/java/com/nb/bus/service/LocalBusPatientService.java

@@ -81,9 +81,6 @@ public class LocalBusPatientService extends BaseService<BusPatientMapper, BusPat
 
     @Override
     public void validateBeforeSave(BusPatientEntity entity) {
-        if (entity.getGender() == null) {
-            entity.setGender(SexEnum.UNKNOW);
-        }
         if (entity.getAlarm() == null) {
             entity.setAlarm(PatientAlarmEnum.NONE);
         }