Ver Fonte

merge 冲突

18339543638 há 3 anos atrás
pai
commit
ba796e31d4

+ 1 - 1
coffee-system/src/main/java/com/coffee/bus/entity/BusDeviceRunningEntity.java

@@ -33,7 +33,7 @@ import java.util.Date;
 @Data
 @EqualsAndHashCode(callSuper = false)
 @Accessors(chain = true)
-@TableName(value = "bus_device_using",autoResultMap = true)
+@TableName(value = "bus_device_running",autoResultMap = true)
 @ApiModel(value="设备运行状态", description="设备运行状态")
 public class BusDeviceRunningEntity extends TenantGenericEntity<String,String> {
 

+ 169 - 134
coffee-system/src/main/java/com/coffee/bus/websocket/listener/DeviceInfoListener.java

@@ -5,6 +5,7 @@ import cn.hutool.core.util.RandomUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.extra.spring.SpringUtil;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.coffee.bus.entity.BusDeviceAlarmEntity;
 import com.coffee.bus.entity.BusDeviceRunningEntity;
@@ -29,11 +30,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.event.EventListener;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.math.BigDecimal;
 import java.util.*;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -69,7 +72,7 @@ public class DeviceInfoListener {
     @Async
     @Transactional(rollbackFor = Exception.class)
     public void deviceInfoDetail(DeviceInfoEvent infoEvent){
-        //若数据发送过快,为防止冲突,请在此加锁处理 todo
+        //保证统一设备数据顺序处理,若数据发送过快,为防止冲突,请在此加锁处理 todo
         synchronized (infoEvent.getDeviceId()){
             log.info("开始处理时间------------------------[{}]",System.currentTimeMillis());
             /****************处理泵数据 todo 后续交由上游处理****************/
@@ -77,164 +80,191 @@ public class DeviceInfoListener {
             //1、判断该设备是否已和医院绑定并开启使用
             String deviceId = device.getDeviceId();
             DeviceOperator<DeviceCacheInfo> deviceOperator = deviceRegistry.getDevice(deviceId);
+
             if (deviceOperator==null||!Boolean.TRUE.equals(deviceOperator.getEnable())) {
                 log.warn("设备[{}]暂不可用,数据已丢弃",deviceId);
                 return ;
             }
             log.info("接收到设备数据:[{}]",infoEvent.getContent().toString());
-            /********************根据分包标记标识判断是否为新的开机时间*******************/
-            String originClassify = deviceOperator.getClassification();
-            String classification = device.getClassification();
-            if(classification==null){
-                log.error("设备号:[{}]分包标识号为空,无法更新开始时间");
-                classification="-1";
-            }
-            if(!classification.equals(originClassify)){
-                deviceOperator.setClassification(classification);
-                deviceOperator.setStartTime(new Date());
-            }
-            /********************根据分包标记标识判断是否为新的开机时间*******************/
-            String tenantId = deviceOperator.getTenantId();
-            PatientOperator<PatientCacheInfo> patientOperator =patientRegistry.getPatient(tenantId, device.getPatientCode());
-            device.setTenantId(tenantId);
-            device.setMonitorType(1);
-            //默认为主泵
-            device.setMaster(1);
-            device.setStartTime(deviceOperator.getStartTime());
-            device.setAlias(deviceOperator.getAlias());
-            String usingId = deviceOperator.getUsingId();
-            if(StrUtil.isNullOrUndefined(usingId)){
-                //不存在运行id,即首次运行,需要与病人、医院进行绑定
-                initDevice(device,deviceOperator);
-                initPatient(device,patientOperator);
-            }
-            else {
-                device.setId(usingId);
-            }
-            /****************处理泵数据****************/
+            //缓存操作
+            List<Supplier<?>> cacheOperation=new ArrayList<>();
 
-            /****************处理泵与患者关系****************/
-            //泵绑定重复判定
-            //获取病号当前绑定的泵号
-            String bindDeviceId = patientOperator.getBindDeviceId();
-            if(!StrUtil.isNullOrUndefined(bindDeviceId)&&!deviceId.equals(bindDeviceId)){
-                //泵号发生改变,判断泵的开始时间,将开始时间稍后的泵设置为主泵再与设备绑定
-                DeviceOperator currentBindDevice = deviceRegistry.getDevice(bindDeviceId);
-                if (currentBindDevice.getStartTime().before(device.getStartTime())) {
-                    log.error("病号:[{}],之前主泵为:[{}],现在主泵为:[{}]",device.getPatientCode(),bindDeviceId,deviceId);
-                    //设置当前上传信息的泵为主泵,将旧泵设置为副泵吗,并更新病人绑定泵的信息, todo 发生泵重复报警
-                    deviceUsingService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId,deviceId).set(BusDeviceRunningEntity::getMaster,1));
-                    deviceUsingService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId,bindDeviceId).set(BusDeviceRunningEntity::getMaster,0));
-                }else {
-                    device.setMaster(0);
-                }
-            }
-            if (StrUtil.isNullOrUndefined(bindDeviceId)) {
-                //之前的病号为无泵状态,无泵 -》 有泵 做处理 todo
-                log.error("病号:【{}】临床发生由无泵转为有泵",patientOperator.getCode());
-            }
-
-            //无泵判断处理
-            //上条记录所绑定的病号
-            String originPatientCode=deviceOperator.getPatientCode();
-            //当泵所绑定的病号发生变化时,进行原先泵号无泵判定
-            if(!StrUtil.isNullOrUndefined(originPatientCode)&&!device.getPatientCode().equals(originPatientCode)){
-                //检查是原先的病号是否发生了无泵异常
-                PatientOperator<PatientCacheInfo> originPatientOperator =patientRegistry.getPatient(tenantId, originPatientCode);
-                Set<DeviceTimeSmallInfo> originPatientBindDevices = originPatientOperator.getAllDevice();
-
-                Set<DeviceTimeSmallInfo> originRemainPatientBindDevices = originPatientBindDevices.stream().filter(bindDevice -> !bindDevice.getDeviceId().equals(deviceId)).collect(Collectors.toSet());
-                //临床无泵绑定时,查看是否存在副泵,若存在将开始时间稍后的泵设置为副泵,若不存在,则报无泵异常
-                if(CollectionUtil.isEmpty(originRemainPatientBindDevices)){
-                    //todo 发起无泵报警,处理原先泵的无泵信息
-                    log.error("病号:【{}】临床发生无泵报警",originPatientOperator.getCode());
-
-                    originPatientOperator.setBindDeviceId(null);
-                    originPatientOperator.setAllDevice(new ArrayList<>());
-                }
-                else  {
-                    //将开始时间最大的泵设置为主泵
-                    Optional<DeviceTimeSmallInfo> master = originRemainPatientBindDevices.stream().max((o1,o2)->
-                            o1.getStartTime().equals(o2.getStartTime())?0:o1.getStartTime().before(o2.getStartTime()) ? -1 : 1
-
-                    );
-                    if(master.isPresent()){
-                        log.error("病号:[{}],主泵变为[{}]",originPatientCode,master.get().getDeviceId());
-                        deviceUsingService
-                                .update(new UpdateWrapper<BusDeviceRunningEntity>().lambda()
-                                        .eq(BusDeviceRunningEntity::getDeviceId,master.get().getDeviceId())
-                                        .set(BusDeviceRunningEntity::getMaster,1));
-                        //更新泵原来绑定病人缓存信息
-                        originPatientOperator.setBindDeviceId(master.get().getDeviceId());
-                    }
-                }
-                //更新泵原来绑定病人缓存信息
-                originPatientOperator.setAllDevice(originRemainPatientBindDevices);
+            //处理设备运行数据
+            boolean first = handleRunningInfo(device, deviceOperator,cacheOperation);
+            //处理病患数据,解决泵重复,无泵等问题
+            handlePatient(device,cacheOperation);
+            //绑定临床信息 todo
+            if(first){
+                deviceUsingService.save(device);
+            }else {
+                deviceUsingService.updateById(device);
             }
-            deviceUsingService.saveOrUpdate(device);
             /****************处理泵与患者关系****************/
 
             //则推送设备上报消息
             String topic = WebSocketConstant.getDeviceInfoDetailTopic(null, device.getId(), device.getTenantId());
             redisTemplate.convertAndSend(topic, device);
+            //更新缓存信息
+            cacheOperation.forEach(Supplier::get);
+        }
+        log.info("结束处理时间------------------------[{}]",System.currentTimeMillis());
 
-            //更新设备缓存信息
-            DeviceCacheInfo deviceCacheInfo = DeviceCacheInfo.builder()
-                    .tenantId(device.getTenantId())
-                    .usingId(device.getId())
-                    .status(device.getRunState())
-                    .startTime(device.getStartTime())
-                    .master(device.getMaster()!=null&&device.getMaster() == 1)
-                    .patientCode(device.getPatientCode())
-                    .classification(device.getClassification())
-                    .build();
-            deviceOperator.set(deviceCacheInfo);
+    }
 
-            //更新病人缓存信息
-            Set<DeviceTimeSmallInfo> allDevice = Optional.ofNullable(patientOperator.getAllDevice()).orElse(new HashSet<>());
-            allDevice.add(DeviceTimeSmallInfo.of(deviceId,device.getStartTime()));
-            PatientCacheInfo patientCacheInfo = PatientCacheInfo.builder()
-                    .bindDeviceId(deviceId)
-//                    .clinicId()
-                    .code(device.getPatientCode())
-//                    .isFinished()
-//                    .name()
-//                    .startTime()
-                    .tenantId(device.getTenantId())
-//                    .gender()
-                    .devices(allDevice)
-                    .build();
-            patientOperator.set(patientCacheInfo);
+
+    /**
+     * 设备运行数据处理,返回是否为第一次接受数据消息
+     * @param device 接收到的设备信息
+     * @param deviceOperator 设备缓存信息操作符
+     * @return 是否为第一次接收数据消息
+     */
+    private boolean handleRunningInfo( BusDeviceRunningEntity device,DeviceOperator<DeviceCacheInfo> deviceOperator,List<Supplier<?>> suppliers){
+        //判断此条数据的分包标识是否发生了改变,若改变则
+        String originClassify = deviceOperator.getClassification();
+        String classification = device.getClassification();
+        //默认为主泵,后续判断若不满足主泵条件,则替换为副泵
+        device.setMaster(1);
+        if(classification==null){
+            log.error("设备号:[{}]分包标识号为空,无法更新开始时间");
+            classification="-1";
         }
-        log.info("结束处理时间------------------------[{}]",System.currentTimeMillis());
+        if(!classification.equals(originClassify)){
+            deviceOperator.setClassification(classification);
+            //分包标识发生了改变,设备开机时间重新计算
+            device.setStartTime(new Date());
+        }
+
+        String usingId = deviceOperator.getUsingId();
+        //todo 这部分操作交由上游处理
+        device.setTenantId(deviceOperator.getTenantId());
+        device.setMonitorType(1);
+        boolean first=false;
+        if(StrUtil.isNullOrUndefined(usingId)){
+            //设备首次运行,记录开机时间
+            device.setId(String.valueOf(IdWorker.getId()));
+            device.setStartTime(new Date());
+            device.setAlias(deviceOperator.getAlias());
+
+            first=true;
+        }else {
+            device.setId(usingId);
+            device.setStartTime(deviceOperator.getStartTime());
+        }
+        suppliers.add(()->{
+            //更新设备缓存数据
+            deviceOperator.setClassification(device.getClassification());
+            deviceOperator.setTenantId(device.getTenantId());
+            deviceOperator.setUsingId(device.getId());
+            deviceOperator.setStatus(device.getRunState());
+            deviceOperator.setStartTime(device.getStartTime());
+            deviceOperator.setMaster(device.getMaster()!=null&&device.getMaster() == 1);
+            deviceOperator.setPatientCode(device.getPatientCode());
+            return null;
+        });
+        return first;
 
     }
 
     /**
-     * 初始化病人信息
-     * @param pump
-     * @param patientOperator
+     * 病患信息处理,返回结果为当前病患是否存在临床信息
+     * @param device 接收到的设备数据
+     * @return 是否发生了换泵操作
      */
-    private void initPatient(BusDeviceRunningEntity pump, PatientOperator patientOperator) {
-        //填充病人名称和病人性别
-        pump.setPatientName(patientOperator.getName());
-        pump.setPatientSex(patientOperator.getGender());
+    private void handlePatient(BusDeviceRunningEntity device,List<Supplier<?>> suppliers){
+        String deviceId = device.getDeviceId();
+        DeviceOperator<DeviceCacheInfo> currentDeviceOperator = deviceRegistry.getDevice(deviceId);
+        PatientOperator<PatientCacheInfo> currentPatientOperator = patientRegistry.getPatient(device.getTenantId(), device.getPatientCode());
+        String bindDeviceId = currentPatientOperator.getBindDeviceId();
+        //当前病号所绑定的泵发生了改变,对当前病号进行处理
+        if(!StrUtil.isNullOrUndefined(bindDeviceId)&&!deviceId.equals(bindDeviceId)){
+            handleConflictCurrentPatient(device,suppliers);
+        }else if(StrUtil.isNullOrUndefined(bindDeviceId)){
+            //之前的病号为无泵状态,无泵 -》 有泵 做处理  修改缓存信息
+            log.error("病号:【{}】临床发生由无泵转为有泵",device.getPatientCode());
+        }
+        //更新泵所绑定当前病人缓存信息
+        suppliers.add(()->{
+            Set<DeviceTimeSmallInfo> allDevice = Optional.ofNullable(currentPatientOperator.getAllDevice()).orElse(new HashSet<>());
+            allDevice.add(DeviceTimeSmallInfo.of(deviceId,device.getStartTime()));
+            currentPatientOperator.setAllDevice(allDevice);
+            if(device.getMaster()==1){
+                currentPatientOperator.setBindDeviceId(deviceId);
+            }
+            return null;
+        });
+        String originPatientCode=currentDeviceOperator.getPatientCode();
+        //当前泵所绑定病号发生了改变,对之前所绑定病号进行处理
+        if(!StrUtil.isNullOrUndefined(originPatientCode)&&!device.getPatientCode().equals(originPatientCode)){
+            handleConflictOriginPatient(deviceId,device.getTenantId(),originPatientCode,suppliers);
+        }
     }
 
     /**
-     * 初始化设备状态
-     * @param pump
-     * @param deviceOperator
+     * 病患信息处理,当泵所绑定病号发生变化时,处理泵原来所绑定病号信息
      */
-    private void initDevice(BusDeviceRunningEntity pump, DeviceOperator deviceOperator){
-        //处理无泵状态(即泵首次与医院进行绑定)
-        Date now = new Date();
-        //设置注册时间和泵的开始时间
-        pump.setId(String.valueOf(IdWorker.getId()));
-        pump.setStartTime(now);
-        pump.setAlias(deviceOperator.getAlias());
+    private void handleConflictOriginPatient(String deviceId, String hospitalId, String patientCode, List<Supplier<?>> suppliers){
+        PatientOperator<PatientCacheInfo> patientOperator = patientRegistry.getPatient(hospitalId, patientCode);
+        Set<DeviceTimeSmallInfo> allDevice = patientOperator.getAllDevice();
+        //过滤掉已换绑的泵,获取剩余所绑定的泵数据
+        if(CollectionUtils.isNotEmpty(allDevice)){
+            Set<DeviceTimeSmallInfo> remainPatientBindDevices = allDevice.stream().filter(bindDevice -> !bindDevice.getDeviceId().equals(deviceId)).collect(Collectors.toSet());
+            if(CollectionUtil.isEmpty(remainPatientBindDevices)){
+
+                log.error("病号:【{}】临床发生无泵报警",patientCode);
+                suppliers.add(()->{
+                    //todo 发起无泵报警,处理原先泵的无泵信息
+                    patientOperator.setBindDeviceId(null);
+                    patientOperator.setAllDevice(new ArrayList<>());
+                    return null;
+                });
+            }else {
+                //将开始时间最大的泵设置为主泵
+                Optional<DeviceTimeSmallInfo> master = remainPatientBindDevices.stream().max((o1,o2)->
+                        o1.getStartTime().equals(o2.getStartTime())?0:o1.getStartTime().before(o2.getStartTime()) ? -1 : 1
+
+                );
+                if(master.isPresent()){
+                    log.error("病号:[{}],主泵变为[{}]",patientCode,master.get().getDeviceId());
+                    deviceUsingService
+                            .update(new UpdateWrapper<BusDeviceRunningEntity>().lambda()
+                                    .eq(BusDeviceRunningEntity::getDeviceId,master.get().getDeviceId())
+                                    .set(BusDeviceRunningEntity::getMaster,1));
+                    suppliers.add(()->{
+                        patientOperator.setBindDeviceId(master.get().getDeviceId());
+                        patientOperator.setAllDevice(remainPatientBindDevices);
+                        return null;
+                    });
+                }
+            }
+        }else {
+            suppliers.add(()->{
+                patientOperator.setBindDeviceId(null);
+                return null;
+            });
+        }
     }
 
+    /**
+     * 发生冲突时,病患信息处理,当泵所绑定病号发生变化时,处理泵现在所绑定病号信息
+     * @param device 设备运行数据
+     */
+    private void handleConflictCurrentPatient(BusDeviceRunningEntity device, List<Supplier<?>> suppliers){
+        String deviceId = device.getDeviceId();
+        PatientOperator<PatientCacheInfo> patientOperator = patientRegistry.getPatient(device.getTenantId(), device.getPatientCode());
+        String bindDeviceId = patientOperator.getBindDeviceId();
+        if(!StrUtil.isNullOrUndefined(bindDeviceId)&&!deviceId.equals(bindDeviceId)){
+            //泵号发生改变,获取病号绑定的泵信息,判断绑定的泵开始时间,将开始时间稍后的泵设置为主泵
+            DeviceOperator<DeviceCacheInfo> patientCurrentBindDevice = deviceRegistry.getDevice(bindDeviceId);
+            if (patientCurrentBindDevice.getStartTime().before(device.getStartTime())) {
+                log.error("病号:[{}],之前主泵为:[{}],现在主泵为:[{}]",device.getPatientCode(),bindDeviceId,deviceId);
+                //设置当前上传信息的泵为主泵,将旧泵设置为副泵,并更新病人绑定泵的消息 todo 更新缓存信息
+                device.setMaster(1);
+                deviceUsingService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId,bindDeviceId).set(BusDeviceRunningEntity::getMaster,0));
+            }else {
+                device.setMaster(0);
+            }
+        }
+    }
 
     /**
      * 判断是否为报警信息并处理
@@ -270,10 +300,15 @@ public class DeviceInfoListener {
     }
 
 
-    //    @Scheduled(cron = "0/15 * * * * ?")
+    public static boolean s=false;
+        @Scheduled(cron = "0/15 * * * * ?")
     public void send() throws InterruptedException {
 //        List<BusDeviceRunningEntity> list = deviceUsingService.list();
 //        list.forEach(pump->{
+            if(s){
+                return;
+            }
+            s=true;
         BusDeviceRunningEntity pump = new BusDeviceRunningEntity();
         pump.setClassification("-1");
         pump.setDeviceId("123");