Parcourir la source

Revert "update 优化设备数据处理逻辑"

This reverts commit b6883ca54c785fd2632a1256b24e4afe3006d356.

# Conflicts:
#	coffee-system/src/main/java/com/coffee/bus/websocket/listener/DeviceInfoListener.java
18339543638 il y a 3 ans
Parent
commit
8fe184f15b

+ 0 - 53
coffee-admin/src/test/java/com/coffee/admin/BusNetpumpTest.java

@@ -1,53 +0,0 @@
-package com.coffee.admin;
-
-import com.coffee.bus.controller.BusDeviceRunningController;
-import com.coffee.bus.entity.BusDeviceRunningEntity;
-import com.coffee.bus.service.LocalBusDeviceRunningService;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.List;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName BusHospitalTest.java
- * @Description TODO
- * @createTime 2022年03月19日 10:27:00
- */
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes = AdminApplication.class)
-public class BusNetpumpTest {
-    @Autowired
-    private LocalBusDeviceRunningService netPumpService;
-
-    @Autowired
-    private BusDeviceRunningController netPumpController;
-    @Test
-    public void save(){
-//        StpUtil.login();
-        BusDeviceRunningEntity netPumpEntity = new BusDeviceRunningEntity();
-//        netPumpEntity.setWarn(NetPumpWarnEnum.ComingEnd);
-        netPumpEntity.setId("1507903748141658113");
-        netPumpEntity.setAlias("1");
-        netPumpEntity.setBedNo("1");
-        netPumpEntity.setDeviceId("123");
-        netPumpEntity.setTenantId("123");
-        netPumpService.updateById(netPumpEntity);
-    }
-
-    @Test
-    public void query(){
-        List<BusDeviceRunningEntity> list = netPumpService.list();
-        System.out.println(list);
-    }
-
-    @Test
-    public void Delete(){
-//        boolean b = busHospitalService.removeById(1505789328745721857L);
-//        System.out.println(b);
-    }
-}

+ 0 - 44
coffee-common/src/main/java/com/coffee/common/config/websocket/DefaultMessageListener.java

@@ -1,44 +0,0 @@
-package com.coffee.common.config.websocket;
-
-import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.json.JSONUtil;
-import com.coffee.common.result.R;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.util.SerializationUtils;
-import org.tio.core.ChannelContext;
-import org.tio.core.Tio;
-import org.tio.websocket.common.WsResponse;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Set;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName DefaultMessageListener.java
- * @Description TODO
- * @createTime 2022年03月25日 14:42:00
- */
-@Data
-@Slf4j
-public class DefaultMessageListener implements MessageListener {
-    private final String id;
-    private final Set<ChannelContext> channelContexts;
-
-
-    @Override
-    public void onMessage(Message message, byte[] pattern) {
-        if (CollectionUtil.isNotEmpty(channelContexts)) {
-            channelContexts.parallelStream()
-                    .filter(channelContext -> !channelContext.isClosed)
-                    .forEach(channel -> Tio.send(channel,
-                            WsResponse.fromText(JSONUtil.toJsonStr(R.success(
-                                    MessageResponse.of(id,"result",new String(message.getBody()))))
-                                    ,"utf-8")));
-        }
-    }
-}

+ 0 - 134
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/Subscribe.java

@@ -1,134 +0,0 @@
-package com.coffee.common.config.websocket.handler;
-
-import cn.hutool.core.collection.CollectionUtil;
-import com.coffee.common.Constants;
-import com.coffee.common.bo.LoginUser;
-import com.coffee.common.config.websocket.DefaultMessageListener;
-import com.coffee.common.config.websocket.MessagingRequest;
-import com.coffee.common.config.websocket.WebSocketConstant;
-import org.springframework.dao.DataAccessException;
-import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.core.RedisCallback;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.util.ConcurrentReferenceHashMap;
-import org.tio.core.ChannelContext;
-
-import javax.annotation.Resource;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName Subscribe.java
- * @Description TODO
- * @createTime 2022年03月25日 14:18:00
- */
-public abstract class Subscribe implements WsHandler {
-    @Resource
-    private RedisTemplate<String,Object> redisTemplate;
-
-    public static final String SUBSCRIBE_TOPIC="subscribe-topic";
-    /**
-     * 存储主题与ws通道关联
-     */
-    private Map<String,Set<ChannelContext>> subscribeTopics=new ConcurrentHashMap<>();
-
-
-    /**
-     * 存储主题与redis通道关联
-     */
-    private Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
-
-
-    public String getTopic(String productName,String param,String tenantId){
-        return  WebSocketConstant.getTopic(this.getId(),productName, param, tenantId);
-
-    };
-
-
-    @Override
-    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
-        LoginUser loginUser = (LoginUser) channelContext.get(Constants.LOGIN_USER_KEY);
-        if(loginUser==null){
-            channelContext.setClosed(true);
-            return;
-        }
-        //获取所有设备id
-        List<String> params = message.getParams();
-        if(CollectionUtil.isEmpty(params)){
-            return;
-        }
-        //需要处理的主题
-        List<String> subScribeTopic =
-                params.stream().map(deviceId -> getTopic(message.getProductName(), deviceId, loginUser.getTenantId()))
-                        .collect(Collectors.toList());
-        MessagingRequest.Type type = message.getType();
-        if(MessagingRequest.Type.sub==type){
-            //订阅主题
-            subScribeTopic.forEach(topic->this.subscribe(channelContext,topic));
-        }else {
-            //取消订阅主题
-            subScribeTopic.forEach(topic->this.unsubscribe(channelContext,topic));
-        }
-    }
-
-    /**
-     * ws 订阅主题
-     * @param channelContext
-     * @param topic
-     */
-    public void subscribe(ChannelContext channelContext, String topic){
-        //同一主题只订阅一次
-        Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topic)).orElse(new HashSet<>());
-        if(!subscribeTopics.containsKey(topic)){
-            redisTemplate.execute(new RedisCallback<Object>() {
-                @Override
-                public Object doInRedis(RedisConnection connection) throws DataAccessException {
-                    connection.pSubscribe(new DefaultMessageListener(getId(),channelContexts),topic.getBytes());
-                    redisConnectionMap.put(topic,connection);
-                    return null;
-                }
-            });
-        }
-        channelContexts.add(channelContext);
-        subscribeTopics.put(topic,channelContexts);
-        //将主题与ws通道绑定
-        Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
-        Set<String> subscribeTopicSet= (Set<String>) result;
-        subscribeTopicSet.add(topic);
-        channelContext.set(SUBSCRIBE_TOPIC,subscribeTopicSet);
-    };
-
-    /**
-     * ws取消订阅主题
-     * @param channelContext
-     * @param topic
-     */
-    public void unsubscribe(ChannelContext channelContext, String topic){
-        if(subscribeTopics.containsKey(topic)){
-            Set<ChannelContext> channelContexts = subscribeTopics.get(topic);
-            if(CollectionUtil.isNotEmpty(channelContexts)){
-                channelContexts.remove(channelContext);
-            }
-            //重新获取集合,避免多线程发生冲突,再次判断此时是否为空
-            if(CollectionUtil.isEmpty(subscribeTopics.get(topic))){
-                subscribeTopics.remove(topic);
-                redisTemplate.execute(new RedisCallback<Object>() {
-                    @Override
-                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
-                        RedisConnection redisConnection = redisConnectionMap.get(topic);
-                        if (redisConnection!=null) {
-                            redisConnection.getSubscription().unsubscribe(topic.getBytes());
-                        }
-                        return null;
-                    }
-                });
-                redisConnectionMap.remove(topic);
-            }
-        }
-    };
-
-
-}

+ 1 - 1
coffee-system/src/main/java/com/coffee/bus/entity/BusDeviceRunningEntity.java

@@ -33,7 +33,7 @@ import java.util.Date;
 @Data
 @Data
 @EqualsAndHashCode(callSuper = false)
 @EqualsAndHashCode(callSuper = false)
 @Accessors(chain = true)
 @Accessors(chain = true)
-@TableName(value = "bus_device_running",autoResultMap = true)
+@TableName(value = "bus_device_using",autoResultMap = true)
 @ApiModel(value="设备运行状态", description="设备运行状态")
 @ApiModel(value="设备运行状态", description="设备运行状态")
 public class BusDeviceRunningEntity extends TenantGenericEntity<String,String> {
 public class BusDeviceRunningEntity extends TenantGenericEntity<String,String> {
 
 

+ 0 - 41
coffee-system/src/main/java/com/coffee/bus/enums/DeviceStatusEnum.java

@@ -1,41 +0,0 @@
-package com.coffee.bus.enums;
-
-import com.fasterxml.jackson.annotation.JsonFormat;
-import io.swagger.annotations.ApiModelProperty;
-import io.swagger.annotations.ApiOperation;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName AlarmEnum.java
- * @Description 泵状态类型
- * @createTime 2022年03月27日 09:49:00
- */
-@AllArgsConstructor
-@JsonFormat(shape = JsonFormat.Shape.OBJECT)
-public enum DeviceStatusEnum {
-    /**
-     * 请在添加或修改的过程中不要改变枚举的顺序!!!!!!!!!!!!!!!
-     * 如若新增,请在最后尾部新增
-     * 切记不可删除!!!!!!!!!!!!
-     */
-    Running(0,"正在运行"),
-    Waiting(1,"待机中"),
-    Shutdown(2,"关机"),
-    Pause(3,"待机"),;
-
-
-    /**
-     * 与枚举ordinal保持一致
-     */
-    @Getter
-    @ApiModelProperty("运行状态编码")
-    private Integer code;
-    @Getter
-    @ApiModelProperty("运行状态描述")
-    private String text;
-
-}

+ 0 - 40
coffee-system/src/main/java/com/coffee/bus/enums/NetPumpWarnEnum.java

@@ -1,40 +0,0 @@
-package com.coffee.bus.enums;
-
-import com.alibaba.fastjson.annotation.JSONType;
-import com.fasterxml.jackson.annotation.*;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName AlarmEnum.java
- * @Description 泵报警类型
- * @createTime 2022年03月27日 09:49:00
- */
-@AllArgsConstructor
-@JsonFormat(shape = JsonFormat.Shape.OBJECT)
-public enum NetPumpWarnEnum {
-    /**
-     * 请在添加或修改的过程中不要改变枚举的顺序!!!!!!!!!!!!!!!
-     * 如若新增,请在最后尾部新增
-     * 切记不可删除!!!!!!!!!!!!
-     */
-    UpLimit(0,"加档受限"),
-    ComingEnd(1,"即将输液完毕"),
-    LowInfusion(2,"低输注状态"),
-    LowBattery(3,"电量不足"),
-    FlowRateLimit(4,"流速已达上限"),
-    InsufficientAnalgesia(5,"镇痛不足");
-
-
-    /**
-     * 与枚举ordinal保持一致
-     */
-    @Getter
-    private Integer code;
-    @Getter
-    private String text;
-
-}

+ 0 - 3
coffee-system/src/main/java/com/coffee/bus/registry/Operator.java

@@ -1,7 +1,6 @@
 package com.coffee.bus.registry;
 package com.coffee.bus.registry;
 
 
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.ObjectUtil;
-import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
 import com.coffee.common.cache.ConfigStorage;
 import com.coffee.common.cache.ConfigStorage;
 import com.coffee.common.cache.value.Value;
 import com.coffee.common.cache.value.Value;
 import org.springframework.cache.support.SimpleValueWrapper;
 import org.springframework.cache.support.SimpleValueWrapper;
@@ -10,7 +9,6 @@ import org.springframework.util.ReflectionUtils;
 
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Field;
 import java.util.*;
 import java.util.*;
-import java.util.function.BiFunction;
 
 
 /**
 /**
  * @author lifang
  * @author lifang
@@ -42,7 +40,6 @@ public interface Operator<T> {
         getConfig().setConfig(key,value);
         getConfig().setConfig(key,value);
     };
     };
 
 
-
     default Map<String, Value> getAll() {
     default Map<String, Value> getAll() {
         Map<String, Value> result = getConfig().getKeys(getAllKeys());
         Map<String, Value> result = getConfig().getKeys(getAllKeys());
         List<String> keys = new LinkedList<>();
         List<String> keys = new LinkedList<>();

+ 8 - 8
coffee-system/src/main/java/com/coffee/bus/script/PythonParse.java

@@ -20,17 +20,17 @@ import java.util.Properties;
 @Slf4j
 @Slf4j
 @Component
 @Component
 public class PythonParse implements ScriptParse {
 public class PythonParse implements ScriptParse {
-    private final PythonInterpreter interpreter;
+    private PythonInterpreter interpreter;
     private PyFunction pyFunction;
     private PyFunction pyFunction;
 
 
     public PythonParse() {
     public PythonParse() {
-        Properties p = new Properties();
-        p.setProperty("python.console.encoding", "UTF-8");
-        PySystemState systemState = Py.getSystemState();
-        PyString xml = new PyString(Thread.currentThread().getContextClassLoader().getResource("python").getPath());
-        systemState.path.append(xml);
-        PythonInterpreter.initialize(System.getProperties(), p, new String[] {});
-        interpreter = new PythonInterpreter();
+//        Properties p = new Properties();
+//        p.setProperty("python.console.encoding", "UTF-8");
+//        PySystemState systemState = Py.getSystemState();
+//        PyString xml = new PyString(Thread.currentThread().getContextClassLoader().getResource("python").getPath());
+//        systemState.path.append(xml);
+//        PythonInterpreter.initialize(System.getProperties(), p, new String[] {});
+//        interpreter = new PythonInterpreter();
     }
     }
 
 
     @Override
     @Override

+ 9 - 9
coffee-system/src/main/java/com/coffee/bus/script/ScriptManager.java

@@ -72,14 +72,14 @@ public class ScriptManager implements CommandLineRunner {
 
 
     @Override
     @Override
     public void run(String... args) {
     public void run(String... args) {
-        List<BusHospitalEntity> hospitals = hospitalService.list(new QueryWrapper<BusHospitalEntity>().lambda().isNotNull(BusHospitalEntity::getScript));
-        //将脚本初始化
-        if(CollectionUtil.isNotEmpty(hospitals)){
-            hospitals.stream().filter(hospitalEntity -> ObjectUtil.isNotNull(hospitalEntity.getScript())).forEach(hospital -> {
-
-                Script script = hospital.getScript();
-                resetScript(hospital.getId(),script.getId(),script.getContent());
-            });
-        }
+//        List<BusHospitalEntity> hospitals = hospitalService.list(new QueryWrapper<BusHospitalEntity>().lambda().isNotNull(BusHospitalEntity::getScript));
+//        //将脚本初始化
+//        if(CollectionUtil.isNotEmpty(hospitals)){
+//            hospitals.stream().filter(hospitalEntity -> ObjectUtil.isNotNull(hospitalEntity.getScript())).forEach(hospital -> {
+//
+//                Script script = hospital.getScript();
+//                resetScript(hospital.getId(),script.getId(),script.getContent());
+//            });
+//        }
     }
     }
 }
 }

+ 134 - 169
coffee-system/src/main/java/com/coffee/bus/websocket/listener/DeviceInfoListener.java

@@ -5,7 +5,6 @@ import cn.hutool.core.util.RandomUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.extra.spring.SpringUtil;
 import cn.hutool.extra.spring.SpringUtil;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
-import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.coffee.bus.entity.BusDeviceAlarmEntity;
 import com.coffee.bus.entity.BusDeviceAlarmEntity;
 import com.coffee.bus.entity.BusDeviceRunningEntity;
 import com.coffee.bus.entity.BusDeviceRunningEntity;
@@ -30,13 +29,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.event.EventListener;
 import org.springframework.context.event.EventListener;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.transaction.annotation.Transactional;
 
 
 import java.math.BigDecimal;
 import java.math.BigDecimal;
 import java.util.*;
 import java.util.*;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 /**
 /**
@@ -72,7 +69,7 @@ public class DeviceInfoListener {
     @Async
     @Async
     @Transactional(rollbackFor = Exception.class)
     @Transactional(rollbackFor = Exception.class)
     public void deviceInfoDetail(DeviceInfoEvent infoEvent){
     public void deviceInfoDetail(DeviceInfoEvent infoEvent){
-        //保证统一设备数据顺序处理,若数据发送过快,为防止冲突,请在此加锁处理 todo
+        //若数据发送过快,为防止冲突,请在此加锁处理 todo
         synchronized (infoEvent.getDeviceId()){
         synchronized (infoEvent.getDeviceId()){
             log.info("开始处理时间------------------------[{}]",System.currentTimeMillis());
             log.info("开始处理时间------------------------[{}]",System.currentTimeMillis());
             /****************处理泵数据 todo 后续交由上游处理****************/
             /****************处理泵数据 todo 后续交由上游处理****************/
@@ -80,191 +77,164 @@ public class DeviceInfoListener {
             //1、判断该设备是否已和医院绑定并开启使用
             //1、判断该设备是否已和医院绑定并开启使用
             String deviceId = device.getDeviceId();
             String deviceId = device.getDeviceId();
             DeviceOperator<DeviceCacheInfo> deviceOperator = deviceRegistry.getDevice(deviceId);
             DeviceOperator<DeviceCacheInfo> deviceOperator = deviceRegistry.getDevice(deviceId);
-
             if (deviceOperator==null||!Boolean.TRUE.equals(deviceOperator.getEnable())) {
             if (deviceOperator==null||!Boolean.TRUE.equals(deviceOperator.getEnable())) {
                 log.warn("设备[{}]暂不可用,数据已丢弃",deviceId);
                 log.warn("设备[{}]暂不可用,数据已丢弃",deviceId);
                 return ;
                 return ;
             }
             }
             log.info("接收到设备数据:[{}]",infoEvent.getContent().toString());
             log.info("接收到设备数据:[{}]",infoEvent.getContent().toString());
-            //缓存操作
-            List<Supplier<?>> cacheOperation=new ArrayList<>();
+            /********************根据分包标记标识判断是否为新的开机时间*******************/
+            String originClassify = deviceOperator.getClassification();
+            String classification = device.getClassification();
+            if(classification==null){
+                log.error("设备号:[{}]分包标识号为空,无法更新开始时间");
+                classification="-1";
+            }
+            if(!classification.equals(originClassify)){
+                deviceOperator.setClassification(classification);
+                deviceOperator.setStartTime(new Date());
+            }
+            /********************根据分包标记标识判断是否为新的开机时间*******************/
+            String tenantId = deviceOperator.getTenantId();
+            PatientOperator<PatientCacheInfo> patientOperator =patientRegistry.getPatient(tenantId, device.getPatientCode());
+            device.setTenantId(tenantId);
+            device.setMonitorType(1);
+            //默认为主泵
+            device.setMaster(1);
+            device.setStartTime(deviceOperator.getStartTime());
+            device.setAlias(deviceOperator.getAlias());
+            String usingId = deviceOperator.getUsingId();
+            if(StrUtil.isNullOrUndefined(usingId)){
+                //不存在运行id,即首次运行,需要与病人、医院进行绑定
+                initDevice(device,deviceOperator);
+                initPatient(device,patientOperator);
+            }
+            else {
+                device.setId(usingId);
+            }
+            /****************处理泵数据****************/
 
 
-            //处理设备运行数据
-            boolean first = handleRunningInfo(device, deviceOperator,cacheOperation);
-            //处理病患数据,解决泵重复,无泵等问题
-            handlePatient(device,cacheOperation);
-            //绑定临床信息 todo
-            if(first){
-                deviceUsingService.save(device);
-            }else {
-                deviceUsingService.updateById(device);
+            /****************处理泵与患者关系****************/
+            //泵绑定重复判定
+            //获取病号当前绑定的泵号
+            String bindDeviceId = patientOperator.getBindDeviceId();
+            if(!StrUtil.isNullOrUndefined(bindDeviceId)&&!deviceId.equals(bindDeviceId)){
+                //泵号发生改变,判断泵的开始时间,将开始时间稍后的泵设置为主泵再与设备绑定
+                DeviceOperator currentBindDevice = deviceRegistry.getDevice(bindDeviceId);
+                if (currentBindDevice.getStartTime().before(device.getStartTime())) {
+                    log.error("病号:[{}],之前主泵为:[{}],现在主泵为:[{}]",device.getPatientCode(),bindDeviceId,deviceId);
+                    //设置当前上传信息的泵为主泵,将旧泵设置为副泵吗,并更新病人绑定泵的信息, todo 发生泵重复报警
+                    deviceUsingService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId,deviceId).set(BusDeviceRunningEntity::getMaster,1));
+                    deviceUsingService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId,bindDeviceId).set(BusDeviceRunningEntity::getMaster,0));
+                }else {
+                    device.setMaster(0);
+                }
+            }
+            if (StrUtil.isNullOrUndefined(bindDeviceId)) {
+                //之前的病号为无泵状态,无泵 -》 有泵 做处理 todo
+                log.error("病号:【{}】临床发生由无泵转为有泵",patientOperator.getCode());
             }
             }
+
+            //无泵判断处理
+            //上条记录所绑定的病号
+            String originPatientCode=deviceOperator.getPatientCode();
+            //当泵所绑定的病号发生变化时,进行原先泵号无泵判定
+            if(!StrUtil.isNullOrUndefined(originPatientCode)&&!device.getPatientCode().equals(originPatientCode)){
+                //检查是原先的病号是否发生了无泵异常
+                PatientOperator<PatientCacheInfo> originPatientOperator =patientRegistry.getPatient(tenantId, originPatientCode);
+                Set<DeviceTimeSmallInfo> originPatientBindDevices = originPatientOperator.getAllDevice();
+
+                Set<DeviceTimeSmallInfo> originRemainPatientBindDevices = originPatientBindDevices.stream().filter(bindDevice -> !bindDevice.getDeviceId().equals(deviceId)).collect(Collectors.toSet());
+                //临床无泵绑定时,查看是否存在副泵,若存在将开始时间稍后的泵设置为副泵,若不存在,则报无泵异常
+                if(CollectionUtil.isEmpty(originRemainPatientBindDevices)){
+                    //todo 发起无泵报警,处理原先泵的无泵信息
+                    log.error("病号:【{}】临床发生无泵报警",originPatientOperator.getCode());
+
+                    originPatientOperator.setBindDeviceId(null);
+                    originPatientOperator.setAllDevice(new ArrayList<>());
+                }
+                else  {
+                    //将开始时间最大的泵设置为主泵
+                    Optional<DeviceTimeSmallInfo> master = originRemainPatientBindDevices.stream().max((o1,o2)->
+                            o1.getStartTime().equals(o2.getStartTime())?0:o1.getStartTime().before(o2.getStartTime()) ? -1 : 1
+
+                    );
+                    if(master.isPresent()){
+                        log.error("病号:[{}],主泵变为[{}]",originPatientCode,master.get().getDeviceId());
+                        deviceUsingService
+                                .update(new UpdateWrapper<BusDeviceRunningEntity>().lambda()
+                                        .eq(BusDeviceRunningEntity::getDeviceId,master.get().getDeviceId())
+                                        .set(BusDeviceRunningEntity::getMaster,1));
+                        //更新泵原来绑定病人缓存信息
+                        originPatientOperator.setBindDeviceId(master.get().getDeviceId());
+                    }
+                }
+                //更新泵原来绑定病人缓存信息
+                originPatientOperator.setAllDevice(originRemainPatientBindDevices);
+            }
+            deviceUsingService.saveOrUpdate(device);
             /****************处理泵与患者关系****************/
             /****************处理泵与患者关系****************/
 
 
             //则推送设备上报消息
             //则推送设备上报消息
             String topic = WebSocketConstant.getDeviceInfoDetailTopic(null, device.getId(), device.getTenantId());
             String topic = WebSocketConstant.getDeviceInfoDetailTopic(null, device.getId(), device.getTenantId());
             redisTemplate.convertAndSend(topic, device);
             redisTemplate.convertAndSend(topic, device);
-            //更新缓存信息
-            cacheOperation.forEach(Supplier::get);
-        }
-        log.info("结束处理时间------------------------[{}]",System.currentTimeMillis());
-
-    }
-
-
-    /**
-     * 设备运行数据处理,返回是否为第一次接受数据消息
-     * @param device 接收到的设备信息
-     * @param deviceOperator 设备缓存信息操作符
-     * @return 是否为第一次接收数据消息
-     */
-    private boolean handleRunningInfo( BusDeviceRunningEntity device,DeviceOperator<DeviceCacheInfo> deviceOperator,List<Supplier<?>> suppliers){
-        //判断此条数据的分包标识是否发生了改变,若改变则
-        String originClassify = deviceOperator.getClassification();
-        String classification = device.getClassification();
-        //默认为主泵,后续判断若不满足主泵条件,则替换为副泵
-        device.setMaster(1);
-        if(classification==null){
-            log.error("设备号:[{}]分包标识号为空,无法更新开始时间");
-            classification="-1";
-        }
-        if(!classification.equals(originClassify)){
-            deviceOperator.setClassification(classification);
-            //分包标识发生了改变,设备开机时间重新计算
-            device.setStartTime(new Date());
-        }
 
 
-        String usingId = deviceOperator.getUsingId();
-        //todo 这部分操作交由上游处理
-        device.setTenantId(deviceOperator.getTenantId());
-        device.setMonitorType(1);
-        boolean first=false;
-        if(StrUtil.isNullOrUndefined(usingId)){
-            //设备首次运行,记录开机时间
-            device.setId(String.valueOf(IdWorker.getId()));
-            device.setStartTime(new Date());
-            device.setAlias(deviceOperator.getAlias());
+            //更新设备缓存信息
+            DeviceCacheInfo deviceCacheInfo = DeviceCacheInfo.builder()
+                    .tenantId(device.getTenantId())
+                    .usingId(device.getId())
+                    .status(device.getRunState())
+                    .startTime(device.getStartTime())
+                    .master(device.getMaster()!=null&&device.getMaster() == 1)
+                    .patientCode(device.getPatientCode())
+                    .classification(device.getClassification())
+                    .build();
+            deviceOperator.set(deviceCacheInfo);
 
 
-            first=true;
-        }else {
-            device.setId(usingId);
-            device.setStartTime(deviceOperator.getStartTime());
+            //更新病人缓存信息
+            Set<DeviceTimeSmallInfo> allDevice = Optional.ofNullable(patientOperator.getAllDevice()).orElse(new HashSet<>());
+            allDevice.add(DeviceTimeSmallInfo.of(deviceId,device.getStartTime()));
+            PatientCacheInfo patientCacheInfo = PatientCacheInfo.builder()
+                    .bindDeviceId(deviceId)
+//                    .clinicId()
+                    .code(device.getPatientCode())
+//                    .isFinished()
+//                    .name()
+//                    .startTime()
+                    .tenantId(device.getTenantId())
+//                    .gender()
+                    .devices(allDevice)
+                    .build();
+            patientOperator.set(patientCacheInfo);
         }
         }
-        suppliers.add(()->{
-            //更新设备缓存数据
-            deviceOperator.setClassification(device.getClassification());
-            deviceOperator.setTenantId(device.getTenantId());
-            deviceOperator.setUsingId(device.getId());
-            deviceOperator.setStatus(device.getRunState());
-            deviceOperator.setStartTime(device.getStartTime());
-            deviceOperator.setMaster(device.getMaster()!=null&&device.getMaster() == 1);
-            deviceOperator.setPatientCode(device.getPatientCode());
-            return null;
-        });
-        return first;
+        log.info("结束处理时间------------------------[{}]",System.currentTimeMillis());
 
 
     }
     }
 
 
     /**
     /**
-     * 病患信息处理,返回结果为当前病患是否存在临床信息
-     * @param device 接收到的设备数据
-     * @return 是否发生了换泵操作
+     * 初始化病人信息
+     * @param pump
+     * @param patientOperator
      */
      */
-    private void handlePatient(BusDeviceRunningEntity device,List<Supplier<?>> suppliers){
-        String deviceId = device.getDeviceId();
-        DeviceOperator<DeviceCacheInfo> currentDeviceOperator = deviceRegistry.getDevice(deviceId);
-        PatientOperator<PatientCacheInfo> currentPatientOperator = patientRegistry.getPatient(device.getTenantId(), device.getPatientCode());
-        String bindDeviceId = currentPatientOperator.getBindDeviceId();
-        //当前病号所绑定的泵发生了改变,对当前病号进行处理
-        if(!StrUtil.isNullOrUndefined(bindDeviceId)&&!deviceId.equals(bindDeviceId)){
-            handleConflictCurrentPatient(device,suppliers);
-        }else if(StrUtil.isNullOrUndefined(bindDeviceId)){
-            //之前的病号为无泵状态,无泵 -》 有泵 做处理  修改缓存信息
-            log.error("病号:【{}】临床发生由无泵转为有泵",device.getPatientCode());
-        }
-        //更新泵所绑定当前病人缓存信息
-        suppliers.add(()->{
-            Set<DeviceTimeSmallInfo> allDevice = Optional.ofNullable(currentPatientOperator.getAllDevice()).orElse(new HashSet<>());
-            allDevice.add(DeviceTimeSmallInfo.of(deviceId,device.getStartTime()));
-            currentPatientOperator.setAllDevice(allDevice);
-            if(device.getMaster()==1){
-                currentPatientOperator.setBindDeviceId(deviceId);
-            }
-            return null;
-        });
-        String originPatientCode=currentDeviceOperator.getPatientCode();
-        //当前泵所绑定病号发生了改变,对之前所绑定病号进行处理
-        if(!StrUtil.isNullOrUndefined(originPatientCode)&&!device.getPatientCode().equals(originPatientCode)){
-            handleConflictOriginPatient(deviceId,device.getTenantId(),originPatientCode,suppliers);
-        }
+    private void initPatient(BusDeviceRunningEntity pump, PatientOperator patientOperator) {
+        //填充病人名称和病人性别
+        pump.setPatientName(patientOperator.getName());
+        pump.setPatientSex(patientOperator.getGender());
     }
     }
 
 
     /**
     /**
-     * 病患信息处理,当泵所绑定病号发生变化时,处理泵原来所绑定病号信息
+     * 初始化设备状态
+     * @param pump
+     * @param deviceOperator
      */
      */
-    private void handleConflictOriginPatient(String deviceId, String hospitalId, String patientCode, List<Supplier<?>> suppliers){
-        PatientOperator<PatientCacheInfo> patientOperator = patientRegistry.getPatient(hospitalId, patientCode);
-        Set<DeviceTimeSmallInfo> allDevice = patientOperator.getAllDevice();
-        //过滤掉已换绑的泵,获取剩余所绑定的泵数据
-        if(CollectionUtils.isNotEmpty(allDevice)){
-            Set<DeviceTimeSmallInfo> remainPatientBindDevices = allDevice.stream().filter(bindDevice -> !bindDevice.getDeviceId().equals(deviceId)).collect(Collectors.toSet());
-            if(CollectionUtil.isEmpty(remainPatientBindDevices)){
-
-                log.error("病号:【{}】临床发生无泵报警",patientCode);
-                suppliers.add(()->{
-                    //todo 发起无泵报警,处理原先泵的无泵信息
-                    patientOperator.setBindDeviceId(null);
-                    patientOperator.setAllDevice(new ArrayList<>());
-                    return null;
-                });
-            }else {
-                //将开始时间最大的泵设置为主泵
-                Optional<DeviceTimeSmallInfo> master = remainPatientBindDevices.stream().max((o1,o2)->
-                        o1.getStartTime().equals(o2.getStartTime())?0:o1.getStartTime().before(o2.getStartTime()) ? -1 : 1
-
-                );
-                if(master.isPresent()){
-                    log.error("病号:[{}],主泵变为[{}]",patientCode,master.get().getDeviceId());
-                    deviceUsingService
-                            .update(new UpdateWrapper<BusDeviceRunningEntity>().lambda()
-                                    .eq(BusDeviceRunningEntity::getDeviceId,master.get().getDeviceId())
-                                    .set(BusDeviceRunningEntity::getMaster,1));
-                    suppliers.add(()->{
-                        patientOperator.setBindDeviceId(master.get().getDeviceId());
-                        patientOperator.setAllDevice(remainPatientBindDevices);
-                        return null;
-                    });
-                }
-            }
-        }else {
-            suppliers.add(()->{
-                patientOperator.setBindDeviceId(null);
-                return null;
-            });
-        }
+    private void initDevice(BusDeviceRunningEntity pump, DeviceOperator deviceOperator){
+        //处理无泵状态(即泵首次与医院进行绑定)
+        Date now = new Date();
+        //设置注册时间和泵的开始时间
+        pump.setId(String.valueOf(IdWorker.getId()));
+        pump.setStartTime(now);
+        pump.setAlias(deviceOperator.getAlias());
     }
     }
 
 
-    /**
-     * 发生冲突时,病患信息处理,当泵所绑定病号发生变化时,处理泵现在所绑定病号信息
-     * @param device 设备运行数据
-     */
-    private void handleConflictCurrentPatient(BusDeviceRunningEntity device, List<Supplier<?>> suppliers){
-        String deviceId = device.getDeviceId();
-        PatientOperator<PatientCacheInfo> patientOperator = patientRegistry.getPatient(device.getTenantId(), device.getPatientCode());
-        String bindDeviceId = patientOperator.getBindDeviceId();
-        if(!StrUtil.isNullOrUndefined(bindDeviceId)&&!deviceId.equals(bindDeviceId)){
-            //泵号发生改变,获取病号绑定的泵信息,判断绑定的泵开始时间,将开始时间稍后的泵设置为主泵
-            DeviceOperator<DeviceCacheInfo> patientCurrentBindDevice = deviceRegistry.getDevice(bindDeviceId);
-            if (patientCurrentBindDevice.getStartTime().before(device.getStartTime())) {
-                log.error("病号:[{}],之前主泵为:[{}],现在主泵为:[{}]",device.getPatientCode(),bindDeviceId,deviceId);
-                //设置当前上传信息的泵为主泵,将旧泵设置为副泵,并更新病人绑定泵的消息 todo 更新缓存信息
-                device.setMaster(1);
-                deviceUsingService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId,bindDeviceId).set(BusDeviceRunningEntity::getMaster,0));
-            }else {
-                device.setMaster(0);
-            }
-        }
-    }
 
 
     /**
     /**
      * 判断是否为报警信息并处理
      * 判断是否为报警信息并处理
@@ -300,15 +270,10 @@ public class DeviceInfoListener {
     }
     }
 
 
 
 
-    public static boolean s=false;
-        @Scheduled(cron = "0/15 * * * * ?")
+    //    @Scheduled(cron = "0/15 * * * * ?")
     public void send() throws InterruptedException {
     public void send() throws InterruptedException {
 //        List<BusDeviceRunningEntity> list = deviceUsingService.list();
 //        List<BusDeviceRunningEntity> list = deviceUsingService.list();
 //        list.forEach(pump->{
 //        list.forEach(pump->{
-            if(s){
-                return;
-            }
-            s=true;
         BusDeviceRunningEntity pump = new BusDeviceRunningEntity();
         BusDeviceRunningEntity pump = new BusDeviceRunningEntity();
         pump.setClassification("-1");
         pump.setClassification("-1");
         pump.setDeviceId("123");
         pump.setDeviceId("123");