Przeglądaj źródła

add
设备历史数据、报警数据、修改数据改为批量存储

lifang 4 tygodni temu
rodzic
commit
284b32d7a6

+ 4 - 6
nb-core/pom.xml

@@ -14,6 +14,10 @@
     <description>核心包,包含了一些基础信息</description>
 
     <dependencies>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.aliyun</groupId>
             <artifactId>dysmsapi20170525</artifactId>
@@ -103,11 +107,5 @@
             <groupId>com.belerweb</groupId>
             <artifactId>pinyin4j</artifactId>
         </dependency>
-        <!-- MQTT客户端依赖 -->
-        <dependency>
-            <groupId>org.eclipse.paho</groupId>
-            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
-            <version>1.2.5</version>
-        </dependency>
     </dependencies>
 </project>

+ 260 - 0
nb-service/web-service/src/main/java/com/nb/web/service/bus/batch/DeviceDataBatchProcessor.java

@@ -0,0 +1,260 @@
+package com.nb.web.service.bus.batch;
+
+import cn.hutool.core.collection.CollUtil;
+import com.nb.web.api.entity.BusDeviceAlarmEntity;
+import com.nb.web.api.entity.BusDeviceHistoryEntity;
+import com.nb.web.service.bus.entity.BusInfusionModifyEntity;
+import com.nb.web.service.bus.service.LocalBusDeviceAlarmService;
+import com.nb.web.service.bus.service.LocalBusDeviceHistoryService;
+import com.nb.web.service.bus.service.LocalBusInfusionModifyService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.stereotype.Component;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * 设备数据批量处理器
+ * 负责批量处理设备历史数据、报警信息和输注修改历史记录
+ */
+@Component
+@Slf4j
+public class DeviceDataBatchProcessor implements SmartLifecycle {
+    
+    // 历史数据队列
+    private final BlockingQueue<BusDeviceHistoryEntity> historyQueue = new LinkedBlockingQueue<>();
+    
+    // 报警信息队列
+    private final BlockingQueue<BusDeviceAlarmEntity> alarmQueue = new LinkedBlockingQueue<>();
+    
+    // 输注修改历史记录队列
+    private final BlockingQueue<BusInfusionModifyEntity> modifyQueue = new LinkedBlockingQueue<>();
+    
+    // 定时任务执行器
+    private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(3);
+    
+    // 是否正在运行
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    
+    // 批处理大小
+    private static final int BATCH_SIZE = 500;
+    
+    // 批处理间隔(毫秒)
+    private static final long BATCH_INTERVAL = 5000;
+    
+    @Autowired
+    private LocalBusDeviceHistoryService historyService;
+    
+    @Autowired
+    private LocalBusDeviceAlarmService alarmService;
+    
+    @Autowired
+    private LocalBusInfusionModifyService modifyService;
+    
+    /**
+     * 添加历史数据到批处理队列
+     * @param history 历史数据
+     */
+    public void addHistory(BusDeviceHistoryEntity history) {
+        try {
+            historyQueue.put(history);
+        } catch (InterruptedException e) {
+            log.warn("添加历史数据到批处理队列被中断", e);
+        }
+    }
+    
+    /**
+     * 添加报警信息到批处理队列
+     * @param alarm 报警信息
+     */
+    public void addAlarm(BusDeviceAlarmEntity alarm) {
+        try {
+            alarmQueue.put(alarm);
+        } catch (InterruptedException e) {
+            log.warn("添加报警信息到批处理队列被中断", e);
+        }
+    }
+    
+    /**
+     * 添加输注修改历史记录到批处理队列
+     * @param modify 输注修改历史记录
+     */
+    public void addModify(BusInfusionModifyEntity modify) {
+        try {
+            modifyQueue.put(modify);
+        } catch (InterruptedException e) {
+            log.warn("添加输注修改历史记录到批处理队列被中断", e);
+        }
+    }
+    
+    @Override
+    public void start() {
+        if (running.compareAndSet(false, true)) {
+            // 启动定时批处理任务
+            scheduler.scheduleWithFixedDelay(this::batchProcessHistories, 
+                    BATCH_INTERVAL, BATCH_INTERVAL, TimeUnit.MILLISECONDS);
+            scheduler.scheduleWithFixedDelay(this::batchProcessAlarms, 
+                    BATCH_INTERVAL, BATCH_INTERVAL, TimeUnit.MILLISECONDS);
+            scheduler.scheduleWithFixedDelay(this::batchProcessModifies, 
+                    BATCH_INTERVAL, BATCH_INTERVAL, TimeUnit.MILLISECONDS);
+            
+            log.info("设备数据批量处理器已启动");
+        }
+    }
+    
+    @Override
+    public void stop() {
+        if (running.compareAndSet(true, false)) {
+            // 关闭调度器
+            scheduler.shutdown();
+            
+            // 处理剩余数据
+            processRemainingData();
+            
+            log.info("设备数据批量处理器已停止");
+        }
+    }
+    
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+    
+    /**
+     * 批量处理历史数据
+     */
+    private void batchProcessHistories() {
+        if (!running.get()) {
+            return;
+        }
+        
+        try {
+            // 批量处理历史数据
+            processHistoryBatch();
+        } catch (Exception e) {
+            log.error("批量处理历史数据时发生异常", e);
+        }
+    }
+    
+    /**
+     * 批量处理报警信息
+     */
+    private void batchProcessAlarms() {
+        if (!running.get()) {
+            return;
+        }
+        
+        try {
+            // 批量处理报警信息
+            processAlarmBatch();
+        } catch (Exception e) {
+            log.error("批量处理报警信息时发生异常", e);
+        }
+    }
+    
+    /**
+     * 批量处理输注修改历史记录
+     */
+    private void batchProcessModifies() {
+        if (!running.get()) {
+            return;
+        }
+        
+        try {
+            // 批量处理输注修改历史记录
+            processModifyBatch();
+        } catch (Exception e) {
+            log.error("批量处理输注修改历史记录时发生异常", e);
+        }
+    }
+    
+    /**
+     * 处理剩余数据(在应用关闭时调用)
+     */
+    private void processRemainingData() {
+        log.info("开始处理剩余的批处理数据");
+        
+        // 处理所有剩余的历史数据
+        while (!historyQueue.isEmpty()) {
+            processHistoryBatch();
+        }
+        
+        // 处理所有剩余的报警信息
+        while (!alarmQueue.isEmpty()) {
+            processAlarmBatch();
+        }
+        
+        // 处理所有剩余的输注修改历史记录
+        while (!modifyQueue.isEmpty()) {
+            processModifyBatch();
+        }
+        log.info("剩余的批处理数据处理完成");
+    }
+    
+    /**
+     * 处理一批历史数据
+     */
+    private void processHistoryBatch() {
+        int batchSize = Math.min(historyQueue.size(), BATCH_SIZE);
+        if (batchSize <= 0) {
+            return;
+        }
+        
+        try {
+            List<BusDeviceHistoryEntity> batch = CollUtil.newArrayList();
+            historyQueue.drainTo(batch, batchSize);
+            if (CollUtil.isNotEmpty(batch)) {
+                historyService.saveBatch(batch);
+                log.info("批量保存 {} 条历史数据", batch.size());
+            }
+        } catch (Exception e) {
+            log.error("批量保存历史数据时发生异常", e);
+        }
+    }
+    
+    /**
+     * 处理一批报警信息
+     */
+    private void processAlarmBatch() {
+        int batchSize = Math.min(alarmQueue.size(), BATCH_SIZE);
+        if (batchSize <= 0) {
+            return;
+        }
+        try {
+            List<BusDeviceAlarmEntity> batch = CollUtil.newArrayList();
+            alarmQueue.drainTo(batch, batchSize);
+            if (CollUtil.isNotEmpty(batch)) {
+                alarmService.saveBatch(batch);
+                log.info("批量处理 {} 条报警信息", batch.size());
+            }
+        } catch (Exception e) {
+            log.error("批量处理报警信息时发生异常", e);
+        }
+    }
+    
+    /**
+     * 处理一批输注修改历史记录
+     */
+    private void processModifyBatch() {
+        int batchSize = Math.min(modifyQueue.size(), BATCH_SIZE);
+        if (batchSize <= 0) {
+            return;
+        }
+        try {
+            List<BusInfusionModifyEntity> batch = CollUtil.newArrayList();
+            modifyQueue.drainTo(batch, batchSize);
+            if (CollUtil.isNotEmpty(batch)) {
+                modifyService.saveBatch(batch);
+                log.info("批量保存 {} 条输注修改历史记录", batch.size());
+            }
+        } catch (Exception e) {
+            log.error("批量保存输注修改历史记录时发生异常", e);
+        }
+    }
+}

+ 249 - 83
nb-service/web-service/src/main/java/com/nb/web/service/bus/listener/DeviceInfoListener.java

@@ -17,6 +17,7 @@ import com.nb.web.api.enums.DeviceStatusEnum;
 import com.nb.web.api.enums.DeviceTypeEnum;
 import com.nb.web.api.enums.FlowStatusEnum;
 import com.nb.web.api.enums.PatientAlarmEnum;
+import com.nb.web.service.bus.batch.DeviceDataBatchProcessor;
 import com.nb.web.service.bus.hospital.HospitalManagerRegister;
 import com.nb.web.service.bus.registry.device.DeviceOperator;
 import com.nb.web.service.bus.registry.device.DeviceRegistry;
@@ -43,107 +44,168 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 /**
+ * 设备信息监听器
+ * 
+ * 负责处理从物联网平台接收到的设备数据,包括:
+ * 1. 设备状态更新
+ * 2. 输注历史记录处理
+ * 3. 报警信息处理
+ * 4. 患者信息处理
+ * 5. WebSocket消息推送
+ * 
  * @author lifang
  * @version 1.0.0
- * @ClassName DeviceInfoListener.java
- * @Description 设备接收处理逻辑
- * @createTime 2022年03月27日 11:21:00
+ * @since 2022年03月27日
  */
 @Component
 @AllArgsConstructor
 @Slf4j
 public class DeviceInfoListener implements IIotMsgHandler {
 
+    /**
+     * 设备注册中心,用于管理设备操作实例
+     */
     private final DeviceRegistry deviceRegistry;
 
-    private final LocalBusDeviceAlarmService alarmService;
-
+    /**
+     * 患者注册中心,用于管理患者操作实例
+     */
     private final PatientRegistry patientRegistry;
 
+    /**
+     * 输注历史服务,用于处理输注相关信息
+     */
     private final LocalBusInfusionHistoryService infusionHistoryService;
 
+    /**
+     * 输注修改服务,用于处理输注参数修改记录
+     */
     private final LocalBusInfusionModifyService infusionModifyService;
 
-    private final LocalBusDeviceHistoryService historyService;
-
+    /**
+     * 患者服务,用于处理患者相关信息
+     */
     private final LocalBusPatientService patientService;
 
+    /**
+     * 临床服务,用于处理临床相关信息
+     */
     private final LocalBusClinicService clinicService;
 
+    /**
+     * WebSocket推送工具,用于向客户端推送实时消息
+     */
     private final WsPublishUtils wsPublishUtils;
 
+    /**
+     * 设备服务,用于处理设备基本信息
+     */
     private final LocalBusDeviceService deviceService;
 
+    /**
+     * 线程池,用于执行异步任务
+     */
     private final Executor executor;
 
+    /**
+     * 医院管理注册器,用于获取医院相关配置
+     */
     private final HospitalManagerRegister hospitalManagerRegister;
 
+    /**
+     * 设备定制化服务,用于处理设备特定逻辑
+     */
     private final DeviceCustomizedService deviceCustomizedService;
+    
     /**
-     * 监听上传的数据信息,
-     * 若设备详情发生变化,则及时通知相应的ws通道
-     * 当输注信息或新输注时,进行主副泵判断及切换
-     * 当新输注时,将关闭的临床重启
-     * @param
-     * @return  返回设备输注所在医院id
+     * 设备数据批量处理器,用于异步批量处理设备数据
+     */
+    private final DeviceDataBatchProcessor batchProcessor;
+    
+    /**
+     * 同步处理设备上传的数据
+     * 
+     * 主要处理逻辑包括:
+     * 1. 设备数据初始化和验证
+     * 2. 输注历史记录处理
+     * 3. 患者信息处理
+     * 4. 报警信息处理
+     * 5. WebSocket消息推送
+     * 
+     * @param device  设备运行数据
+     * @param deviceId 设备ID
+     * @return 处理后的设备运行数据
      */
     @Override
     @Transactional(rollbackFor = Exception.class)
     public BusDeviceRunningEntity sync(BusDeviceRunningEntity device, String deviceId) {
-        //保证统一设备数据顺序处理,若数据发送过快,为防止冲突,请在此加锁处理
+        // 保证统一设备数据顺序处理,若数据发送过快,为防止冲突,请在此加锁处理
         synchronized (deviceId){
             String classification = device.getClassification();
+            // 验证分包标识号不能为空
             if(StrUtil.isBlank(classification)){
                 log.error("消息【{}】,设备号:【{}】分包标识号为空,无法更新开始时间",device.getMsgId(),device.getDeviceId());
                 throw new RuntimeException("设备数据中分包标识不能为空");
             }
             log.info("上传设备属性:【{}】",JSONUtil.toJsonStr(device));
             long startTime = System.currentTimeMillis();
+            
+            // 获取设备操作实例
             DeviceOperator deviceOperator = deviceRegistry.getOperator(deviceId);
-            //对设备数据的一些参数进行初始化设置
+            
+            // 初始化设备数据
             deviceOperator.refreshHospitalCode(device.getUserId());
             device.setTenantId(deviceOperator.getTenantId());
             deviceCustomizedService.customized(device);
             init(device);
-            //缓存操作
+            
+            // 缓存操作列表,用于事务提交后执行
             List<Supplier<?>> cacheOperation=new ArrayList<>();
-            //处理输注参数
+            
+            // 处理输注参数
             BusInfusionHistoryEntity infusionHistory = handleInfusionHistory(device, deviceOperator, cacheOperation);
 
+            // 记录加档受限情况
             if(Boolean.TRUE.equals(infusionHistory.getFlowRestricted())){
-                log.info("消息【{}】,设备号【{}】,输注id【{}】,在输注过程中发生了【加档受限】,故此输注过程中的【镇痛不足】不会自动消失",device.getMsgId(),device.getDeviceId(),device.getInfusionId());
+                log.info("消息【{}】,设备号【{}】,输注id【{}】,在输注过程中发生了【加档受限】,故此输注过程中的【镇痛不足】不会自动消失",
+                        device.getMsgId(),device.getDeviceId(),device.getInfusionId());
             }
+            
             if(log.isDebugEnabled()){
                 log.debug("消息【{}】,设备号【{}】处理输注参数成功,处理后结果为【{}】",device.getMsgId(),device.getDeviceId(),JSONUtil.toJsonStr(device));
             }
 
-            //测试数据不进行病人数据的计算
+            // 测试数据不进行病人数据的计算
             if(Boolean.FALSE.equals(infusionHistory.getTest())){
-                //格式化病号
+                // 格式化病号
 //                formatPatientCode(device);
                 if(log.isDebugEnabled()){
                     log.debug("消息【{}】,设备号【{}】格式化病号,处理后结果为【{}】",device.getMsgId(),device.getDeviceId(),JSONUtil.toJsonStr(device));
                 }
-                //处理输注修改参数,处理病人信息
+                
+                // 处理输注修改参数,处理病人信息
                 handleInfusionModify(device,deviceOperator,cacheOperation);
                 if(log.isDebugEnabled()){
                     log.debug("消息【{}】,设备号【{}】处理输注修改参数、病人信息成功,处理后结果为【{}】",device.getMsgId(),device.getDeviceId(),JSONUtil.toJsonStr(device));
                 }
-                //根据功能配置进行最后的一些状态处理
+                
+                // 根据功能配置进行最后的一些状态处理
                 handleHospitalConfigLast(device,infusionHistory);
             }
 
-            //处理历史运行数据
+            // 处理历史运行数据
             BusDeviceHistoryEntity history=handleRunningHistory(device);
             if(log.isDebugEnabled()){
                 log.debug("消息【{}】,设备号【{}】处理历史运行数据成功,处理后结果为【{}】",device.getMsgId(),device.getDeviceId(),JSONUtil.toJsonStr(device));
             }
-            //处理报警、提醒信息
+            
+            // 处理报警、提醒信息
             handleAlarmOrWarn(history);
             if(log.isDebugEnabled()){
                 log.debug("消息【{}】,设备号【{}】处理报警、提醒信息成功,处理后结果为【{}】",device.getMsgId(),device.getDeviceId(),JSONUtil.toJsonStr(device));
             }
 
+            // 保存或更新输注历史记录
             if(device.isNewInfusion()){
                 deviceService.update(new UpdateWrapper<BusDeviceEntity>()
                         .lambda()
@@ -161,24 +223,35 @@ public class DeviceInfoListener implements IIotMsgHandler {
                 }
             }
 
+            // 添加缓存操作
             cacheOperation.add(()->{
                 deviceOperator.setUploadTime(device.getUploadTime());
                 wsPublishUtils.publishPatientMonitor(device.getPatientId(),device.getTenantId());
-                //非测试数据再进行处理
+                
+                // 非测试数据再进行处理
                 if(!Boolean.TRUE.equals(device.getTest())){
+                    // 处理撤泵或输注修改情况
                     if (Boolean.TRUE.equals(device.isResetUndo())||Boolean.TRUE.equals(device.isInfusionModify())) {
                         if (Boolean.TRUE.equals(device.isInfusionModify())) {
                             wsPublishUtils.publishDeviceNone(device.getTenantId());
                         }
                         wsPublishUtils.publishDeviceRepeat(device.getTenantId());
                     }
+                    
+                    // 处理临床重启情况
                     if (Boolean.TRUE.equals(device.isResetClinic())) {
                         wsPublishUtils.publishMonitorTotalCount(device.getTenantId());
                     }
+                    
+                    // 处理低输注情况
                     if(!device.isMaybeLowInfusion()){
                         deviceOperator.setNonLowInfusionTime(device.getUploadTime());
                     }
+                    
+                    // 发布监控状态计数
                     wsPublishUtils.publishMonitorStateCount(device.getTenantId());
+                    
+                    // 新输注情况下异步获取患者信息
                     if(device.isNewInfusion()){
                         CompletableFuture.runAsync(()->patientService.getPatientInfoFromHis(device.getTenantId(),device.getFormatPatientCode(),10,false)
                                 ,executor)
@@ -191,7 +264,7 @@ public class DeviceInfoListener implements IIotMsgHandler {
                 return null;
             });
 
-            //所有事务处理完成后更新缓存信息
+            // 所有事务处理完成后更新缓存信息
             cacheOperation.forEach(Supplier::get);
             log.info("设备数据处理结束:{},耗时【{}】",JSONUtil.toJsonStr(device),(System.currentTimeMillis()-startTime));
             return device;
@@ -200,38 +273,46 @@ public class DeviceInfoListener implements IIotMsgHandler {
 
 
     /**
-     * 描述: 是否存在加档受限、存在则跳过判定,不存在则判定加减档标识
-     * 1、新输注则忽略
-     * 2、与上次流速进行比较,降低为减档,增加为加档
-     * @author lifang
-     * @date 2022/6/30 9:53
-     * @param device
-     * @param deviceOperator
-     * @return void
+     * 处理智能泵的加减档逻辑
+     * 
+     * 判断逻辑:
+     * 1. 新输注则忽略
+     * 2. 与上次流速进行比较,降低为减档,增加为加档
+     * 
+     * @param device 设备运行数据
+     * @param deviceOperator 设备操作实例
+     * @param cacheOperation 缓存操作列表
      */
     private void handleIntelligent(BusDeviceRunningEntity device, DeviceOperator deviceOperator, List<Supplier<?>> cacheOperation) {
-        //开机数据不进行加减档判定
+        // 开机数据不进行加减档判定
         if (!DeviceStatusEnum.StartUp.equals(device.getRunState())) {
             AtomicBoolean flowLimit=new AtomicBoolean(deviceOperator.isFlowLimit());
             BigDecimal lastContinueDose = deviceOperator.getContinueDose();
+            
+            // 判断是否需要进行加减档判定
             if(FlowStatusEnum.None==device.getWarnFlow()
                     && Boolean.FALSE.equals(device.isNewInfusion())){
 
+                // 比较当前流速与上次流速
                 if(lastContinueDose!=null){
                     if(CompareUtil.compare(device.getContinueDose(),lastContinueDose)>0){
-                        log.info("消息【{}】,设备【{}】,当前流速【{}】,上一状态流速【{}】,加档",device.getMsgId(),device.getDeviceId(),device.getContinueDose(),lastContinueDose);
+                        log.info("消息【{}】,设备【{}】,当前流速【{}】,上一状态流速【{}】,加档",
+                                device.getMsgId(),device.getDeviceId(),device.getContinueDose(),lastContinueDose);
                         device.setWarnFlow(FlowStatusEnum.Up);
                     }else if(CompareUtil.compare(device.getContinueDose(),lastContinueDose)<0){
-                        log.info("消息【{}】,设备【{}】,当前流速【{}】,上一状态流速【{}】,减档",device.getMsgId(),device.getDeviceId(),device.getContinueDose(),lastContinueDose);
+                        log.info("消息【{}】,设备【{}】,当前流速【{}】,上一状态流速【{}】,减档",
+                                device.getMsgId(),device.getDeviceId(),device.getContinueDose(),lastContinueDose);
                         device.setWarnFlow(FlowStatusEnum.Down);
                     }
                 }
+                
+                // 处理流速限制状态
                 if(!FlowStatusEnum.Down.equals(device.getWarnFlow())){
                     device.setWarnFlow(flowLimit.get()?FlowStatusEnum.Limited:device.getWarnFlow());
                 }
             }
 
-            //上限值调整
+            // 上限值调整
             if (FlowStatusEnum.Limited.equals(device.getWarnFlow())) {
                 if(CompareUtil.compare(device.getContinueDose(),device.getFlowUpLimit())<0){
                     flowLimit.set(false);
@@ -239,10 +320,11 @@ public class DeviceInfoListener implements IIotMsgHandler {
                 }
             }
 
+            // 添加缓存操作
             cacheOperation.add(()->{
                 deviceOperator.setContinueDose(device.getContinueDose());
                 if(flowLimit.get()&&Boolean.FALSE.equals(device.isNewInfusion())){
-                    //同一输注过程中,只有减档才能够消除加档受限
+                    // 同一输注过程中,只有减档才能够消除加档受限
                     if(FlowStatusEnum.Down.equals(device.getWarnFlow())){
                         deviceOperator.setFlowLimit(false);
                     }
@@ -252,62 +334,87 @@ public class DeviceInfoListener implements IIotMsgHandler {
                 return null;
             });
         }else {
+            // 开机包不进行计算
             cacheOperation.add(()->{
-                //开机包不进行计算
                 deviceOperator.setContinueDose(null);
                 deviceOperator.setFlowLimit(false);
                 return null;
             });
         }
 
+        // 设置加档受限标识
         if(device.getWarnFlow()!=null&&FlowStatusEnum.Limited.equals(device.getWarnFlow())){
-            //加档受限,进行标识
+            // 加档受限,进行标识
             device.setFlowRestricted(true);
         }else {
             device.setFlowRestricted(false);
         }
     }
 
+    /**
+     * 初始化设备运行数据
+     * 
+     * @param device 设备运行数据
+     */
     private void init(BusDeviceRunningEntity device) {
-        //默认格式化住院号=设备上传住院号
+        // 默认格式化住院号=设备上传住院号
         device.setFormatPatientCode(device.getPatientCode());
         device.setMonitorType(true);
         device.setUploadTime(device.getUploadTime()==null?new Date():device.getUploadTime());
-        //将设备撤泵标志去除
+        // 将设备撤泵标志去除
         device.setIsUndo(false);
-
     }
 
+    /**
+     * 判断是否为新输注
+     * 
+     * @param deviceId 设备ID
+     * @param classification 分包标识号
+     * @return true-新输注,false-非新输注
+     */
     private boolean isNewInFusion(String deviceId, String classification){
         return ObjectUtil.notEqual(classification,deviceRegistry.getOperator(deviceId).getClassification());
     }
 
     /**
-     * 病患信息处理,返回结果为当前病患是否存在临床信息
-     * @param device 接收到的设备数据
-     * @return 是否发生了换泵操作
+     * 处理患者信息
+     * 
+     * 主要处理逻辑:
+     * 1. 设置主泵标识
+     * 2. 处理撤泵或输注修改情况
+     * 3. 处理设备占用情况
+     * 
+     * @param device 设备运行数据
+     * @param clinicFinished 临床是否结束
+     * @param suppliers 供应商操作列表
      */
     private void handlePatient(BusDeviceRunningEntity device,boolean clinicFinished,List<Supplier<?>> suppliers){
         device.setMaster(true);
         String deviceId = device.getDeviceId();
         PatientOperator currentPatientOperator = patientRegistry.getOperator(device.getTenantId(), device.getPatientCode());
+        
+        // 处理撤泵或输注修改情况
         if(Boolean.TRUE.equals(device.isResetUndo())||device.isInfusionModify()){
             if(Boolean.TRUE.equals(device.isResetUndo())||device.isNewInfusion()){
-                //只有在新开启输注时进行判断,1、判断当前输注的病号是否存在其他正在进行的输注 2、判断该设备上一次输注绑定的病号是否没有任何输注
+                // 只有在新开启输注时进行判断
+                // 1. 判断当前输注的病号是否存在其他正在进行的输注
+                // 2. 判断该设备上一次输注绑定的病号是否没有任何输注
                 long runningInfusionCount = infusionHistoryService.getRunningInfusionCount(device.getPatientId());
                 patientService.update(new UpdateWrapper<BusPatientEntity>()
                         .lambda()
                         .eq(BusPatientEntity::getId,device.getPatientId())
-                        //已撤泵且修改参数的输注不再作为主泵进行判定
+                        // 已撤泵且修改参数的输注不再作为主泵进行判定
                         .set(device.isNewInfusion(),BusPatientEntity::getInfusionId,device.getInfusionId())
                         .set(device.isNewInfusion(),BusPatientEntity::getOriginCode,device.getPatientCode())
                         .set(device.isNewInfusion(),BusPatientEntity::getCode,device.getFormatPatientCode())
                         .set(runningInfusionCount>0,BusPatientEntity::getAlarm,PatientAlarmEnum.DEVICE_REPEAT)
                         .set(runningInfusionCount==0,BusPatientEntity::getAlarm,PatientAlarmEnum.NONE));
+                
+                // 处理新输注情况
                 if(device.isNewInfusion()){
                     BusInfusionHistoryEntity lastInfusion = infusionHistoryService.lastInfusion(deviceId);
                     if(lastInfusion!=null&&ObjectUtil.notEqual(device.getPatientId(),lastInfusion.getPatientId())){
-                        //输注设备被占用
+                        // 输注设备被占用
                         BusInfusionHistoryEntity normalInfusion = infusionHistoryService.deviceOccupation(lastInfusion);
                         if(log.isDebugEnabled()){
                             log.debug("住院号【{}】输注设备被占用,最后一条输注信息为【{}】",lastInfusion.getPatientCode(),JSONUtil.toJsonStr(lastInfusion));
@@ -336,7 +443,7 @@ public class DeviceInfoListener implements IIotMsgHandler {
                     return null;
                 });
             }else if(!clinicFinished){
-                //临床未结束,切换当前输注信息为主输注
+                // 临床未结束,切换当前输注信息为主输注
                 patientService.update(new UpdateWrapper<BusPatientEntity>().lambda()
                         .eq(BusPatientEntity::getId,device.getPatientId())
                         .set(BusPatientEntity::getInfusionId,device.getInfusionId()));
@@ -351,15 +458,26 @@ public class DeviceInfoListener implements IIotMsgHandler {
 
     /**
      * 处理输注历史记录信息
+     * 
+     * 主要处理逻辑:
+     * 1. 判断是否为新输注
+     * 2. 设置输注相关信息
+     * 3. 处理智能泵数据
+     * 4. 处理追加量数据
+     * 
      * @param device 设备当前运行数据
      * @param deviceOperator 设备数据操作符
      * @param cacheOperation 缓存操作
+     * @return 输注历史记录实体
      */
     private BusInfusionHistoryEntity handleInfusionHistory(BusDeviceRunningEntity device, DeviceOperator deviceOperator, List<Supplier<?>> cacheOperation) {
+        // 判断是否为新输注
         boolean newInfusion=isNewInFusion(device.getDeviceId(),device.getClassification());
         device.setNewInfusion(newInfusion);
+        
+        // 设置输注相关信息
         if(newInfusion){
-            //新输注第一条数据不存在加档受限
+            // 新输注第一条数据不存在加档受限
             device.setFlowRestricted(false);
             device.setTenantId(deviceOperator.getTenantId());
             device.setStartTime(device.getUploadTime());
@@ -367,25 +485,35 @@ public class DeviceInfoListener implements IIotMsgHandler {
             device.setTenantId(deviceOperator.getInfusionTenantId());
             device.setStartTime(deviceOperator.getStartTime());
         }
+        
+        // 获取患者操作实例
         PatientOperator patientOperator = patientRegistry.getOperator(device.getTenantId(), device.getPatientCode(),MapUtil.of("startTime", device.getStartTime()));
         device.setClinicId(patientOperator.getClinicId());
         device.setPatientId(patientOperator.getPatientId());
-        //处理智能泵数据
+        
+        // 处理智能泵数据
         if(DeviceTypeEnum.intelligent.equals(device.getType())){
             handleIntelligent(device,deviceOperator,cacheOperation);
         }
-        //处理追加量数据
+        
+        // 处理追加量数据
         handleAppendDose(device,deviceOperator,cacheOperation);
+        
+        // 解析输注历史记录
         BusInfusionHistoryEntity infusionHistory = BusInfusionHistoryEntity.parseRunningInfo(device);
         String originInfusionId = deviceOperator.getInfusionId();
+        
+        // 判断是否为主泵
         if(ObjectUtil.equal(device.getDeviceId(),patientOperator.getBindDeviceId())){
             device.setMaster(true);
         }
+        
+        // 处理新输注或已有输注情况
         if (newInfusion) {
             infusionHistory.setId(IdWorker.getIdStr());
             infusionHistory.setFinished(false);
             infusionHistory.setStartTime(device.getUploadTime());
-            //结束其余输注信息
+            // 结束其余输注信息
             infusionHistoryService.update(new UpdateWrapper<BusInfusionHistoryEntity>().lambda()
                     .eq(BusInfusionHistoryEntity::getDeviceId,device.getDeviceId())
                     .eq(BusInfusionHistoryEntity::getFinished,false)
@@ -399,6 +527,8 @@ public class DeviceInfoListener implements IIotMsgHandler {
             }
         }
         device.setInfusionId(infusionHistory.getId());
+        
+        // 添加缓存操作
         cacheOperation.add(()->{
             if(device.isNewInfusion()){
                 deviceOperator.setInfusionId(device.getInfusionId());
@@ -416,27 +546,33 @@ public class DeviceInfoListener implements IIotMsgHandler {
     }
 
     /**
-     * 描述: 处理追加量数据
-     * @author lifang
-     * @date 2022/7/1 11:05
-     * @param device
-     * @param deviceOperator
-     * @return void
+     * 处理追加量数据
+     * 
+     * 计算逻辑:
+     * 1. 新输注:仅计算此次输注过程中的追加量
+     * 2. 非新输注:根据PCA有效次数变化计算追加量
+     * 
+     * @param device 设备运行数据
+     * @param deviceOperator 设备操作实例
+     * @param cacheOperation 缓存操作列表
      */
     private void handleAppendDose(BusDeviceRunningEntity device, DeviceOperator deviceOperator, List<Supplier<?>> cacheOperation) {
         BigDecimal appendDose = device.getAppendDose();
         device.setTotalAppendDose(BigDecimal.ZERO);
+        
+        // 处理新输注情况
         if(device.isNewInfusion()){
-            //仅计算此次输注过程中的追加量
+            // 仅计算此次输注过程中的追加量
             if(appendDose!=null){
                 device.setTotalAppendDose(appendDose.multiply(BigDecimal.valueOf(Optional.ofNullable(device.getPcaValidCount()).orElse(0))));
             }
         }else{
+            // 处理非新输注情况
             if(device.getPcaValidCount()==null||device.getPcaValidCount()==0){
                 device.setTotalAppendDose(BigDecimal.ZERO);
             }else {
                 Integer lastPcaValidCount = deviceOperator.getLastPcaValidCount();
-                //当pca值变小是则以当前值为准重新计算
+                // 当pca值变小是则以当前值为准重新计算
                 int compare = CompareUtil.compare(device.getPcaValidCount(),lastPcaValidCount);
                 BigDecimal totalAppendDose = deviceOperator.getTotalAppendDose();
                 if(compare==0){
@@ -451,33 +587,42 @@ public class DeviceInfoListener implements IIotMsgHandler {
             }
         }
 
+        // 添加缓存操作
         cacheOperation.add(()->{
             deviceOperator.setLastPcaValidCount(Optional.ofNullable(device.getPcaValidCount()).orElse(0));
             deviceOperator.setTotalAppendDose(Optional.ofNullable(device.getTotalAppendDose()).orElse(BigDecimal.ZERO));
             return null;
         });
-
     }
 
     /**
-     * 描述: 处理输注参数
-     * @author lifang
-     * @date 2022/5/30 14:04
-     * @param device
-     * @param deviceOperator
-     * @param cacheOperation
+     * 处理输注参数修改
+     * 
+     * 主要处理逻辑:
+     * 1. 解析输注修改信息
+     * 2. 判断是否需要处理输注修改
+     * 3. 处理患者信息
+     * 4. 处理临床信息
+     * 
+     * @param device 设备运行数据
+     * @param deviceOperator 设备操作实例
+     * @param cacheOperation 缓存操作列表
      */
     private void handleInfusionModify(BusDeviceRunningEntity device, DeviceOperator deviceOperator, List<Supplier<?>> cacheOperation){
+        // 解析输注修改信息
         BusInfusionModifyEntity busInfusionModify = BusInfusionModifyEntity.parseRunningInfo(device);
 
         String signHex = busInfusionModify.signParam();
         String infusionParam = deviceOperator.getInfusionParam();
         boolean clinicFinished = clinicService.isFinished(device.getClinicId());
-        //1、输注已撤泵,重新开启
+        
+        // 判断是否需要处理输注修改
+        // 1. 输注已撤泵,重新开启
         if(Boolean.TRUE.equals(device.isResetUndo())||ObjectUtil.notEqual(signHex,infusionParam)){
             if(ObjectUtil.notEqual(signHex,infusionParam)){
                 device.setInfusionModify(true);
-                infusionModifyService.save(busInfusionModify);
+                // 改为异步批量处理
+                batchProcessor.addModify(busInfusionModify);
                 device.setInfusionModifyId(busInfusionModify.getId());
                 device.setModifyTime(busInfusionModify.getModifyTime());
             }
@@ -489,15 +634,17 @@ public class DeviceInfoListener implements IIotMsgHandler {
                 return null;
             });
         }else {
-            //填充当前参数修改记录id
+            // 填充当前参数修改记录id
             BusInfusionModifyEntity modify =infusionModifyService.recentModify(device.getClinicId(),device.getTenantId());
             if(modify==null){
                 modify=busInfusionModify;
-                infusionModifyService.save(modify);
+                // 改为异步批量处理
+                batchProcessor.addModify(modify);
             }
             device.setInfusionModifyId(modify.getId());
         }
-        //判断临床是否已结束,若临床已结束,则采用当前输注作为主输注开启临床
+        
+        // 判断临床是否已结束,若临床已结束,则采用当前输注作为主输注开启临床
         BusPatientEntity patient = patientService.getById(device.getPatientId());
         if(clinicFinished){
             clinicService.resetClinic(device.getClinicId());
@@ -516,6 +663,11 @@ public class DeviceInfoListener implements IIotMsgHandler {
     }
 
 
+    /**
+     * 格式化患者编码
+     * 
+     * @param device 设备运行数据
+     */
     private void formatPatientCode(BusDeviceRunningEntity device){
         hospitalManagerRegister
                 .get(device.getTenantId())
@@ -523,20 +675,24 @@ public class DeviceInfoListener implements IIotMsgHandler {
     }
 
     /**
-     * 最后处理医院相关配置对于设备数据的处理
+     * 处理医院相关配置对于设备数据的处理
+     * 
      * @param device 设备运行数据
+     * @param infusionHistory 输注历史记录
      */
     private void handleHospitalConfigLast(BusDeviceRunningEntity device, BusInfusionHistoryEntity infusionHistory ) {
         hospitalManagerRegister
                 .get(device.getTenantId())
                 .handleDeviceMessage(device);
-        //实时处理,镇痛不足
+        
+        // 实时处理镇痛不足
         infusionHistory.setWarnAnalgesicPoor(device.getWarnAnalgesicPoor());
         if (Boolean.TRUE.equals(infusionHistory.getWarnAnalgesicPoor())) {
-            //引起镇痛不足的消息id
+            // 引起镇痛不足的消息id
             infusionHistory.setAnalPoorMsgId(device.getMsgId());
         }
-        //实时处理,低输注
+        
+        // 实时处理低输注
         infusionHistory.setWarnFlow(device.getWarnFlow());
     }
 
@@ -544,31 +700,41 @@ public class DeviceInfoListener implements IIotMsgHandler {
 
     /**
      * 处理运行数据的历史记录
+     * 
      * @param device 接收到的设备信息
-     * @return 历史记录id
+     * @return 历史记录实体
      */
     private BusDeviceHistoryEntity handleRunningHistory(BusDeviceRunningEntity device) {
         BusDeviceHistoryEntity entity = BusDeviceHistoryEntity.parseRunningInfo(device);
         entity.setId(String.valueOf(IdWorker.getId()));
-        historyService.save(entity);
+        // 改为异步批量处理
+        batchProcessor.addHistory(entity);
         device.setHistoryId(entity.getId());
         return entity;
     }
 
     /**
      * 判断是否为报警信息并处理
+     * 
      * @param history 设备当前数据的历史数据
      */
     private void handleAlarmOrWarn(BusDeviceHistoryEntity history){
         if (BusDeviceAlarmEntity.alarmOrWarn(history)) {
             BusDeviceAlarmEntity alarm = BusDeviceAlarmEntity.parseHistory(history);
-            alarmService.save(alarm);
+            // 改为异步批量处理
+            batchProcessor.addAlarm(alarm);
         }
     }
 
+    /**
+     * 异步处理设备上传的数据
+     * 
+     * @param source 设备运行数据
+     * @param deviceId 设备ID
+     */
     @Override
     @Async
     public void async(BusDeviceRunningEntity source, String deviceId) {
         sync(source,deviceId);
     }
-}
+}

+ 0 - 1
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusEvaluationService.java

@@ -128,7 +128,6 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
             }
             entity.setPatientId(clinic.getPatientId());
         }
-
     }
 
     @Override