HospitalFunctionAnalConfigHandler.java 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package com.coffee.bus.config;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  4. import com.coffee.bus.config.bean.FunctionAnalConfig;
  5. import com.coffee.bus.entity.BusDeviceRunningEntity;
  6. import com.coffee.bus.entity.BusInfusionHistoryEntity;
  7. import com.coffee.bus.registry.device.DeviceRegistry;
  8. import com.coffee.bus.service.LocalBusDeviceRunningService;
  9. import com.coffee.bus.service.LocalBusInfusionHistoryService;
  10. import com.coffee.bus.utils.WsPublishUtils;
  11. import com.coffee.common.cache.ClusterConfigStorage;
  12. import com.coffee.common.util.RedissonUtil;
  13. import lombok.Builder;
  14. import lombok.Data;
  15. import lombok.Getter;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.redisson.api.RDelayedQueue;
  18. import java.io.Serializable;
  19. import java.util.Date;
  20. import java.util.Optional;
  21. import java.util.concurrent.TimeUnit;
  22. /**
  23. * @author lifang
  24. * @version 1.0.0
  25. * @ClassName HospitalFunctionAnalConfigHandler.java
  26. * @Description 镇痛不足检测
  27. * @createTime 2022年05月18日 11:25:00
  28. */
  29. @Slf4j
  30. public class HospitalFunctionAnalConfigHandler extends ClusterConfigStorage implements HospitalConfigHandler<FunctionAnalConfig,BusDeviceRunningEntity> {
  31. private final LocalBusDeviceRunningService deviceRunningService;
  32. private final LocalBusInfusionHistoryService infusionHistoryService;
  33. private final RedissonUtil redissonUtil;
  34. private final DeviceRegistry deviceRegistry;
  35. @Getter
  36. private String name;
  37. public HospitalFunctionAnalConfigHandler(String name, LocalBusDeviceRunningService deviceRunningService, LocalBusInfusionHistoryService infusionHistoryService, RedissonUtil redissonUtil, DeviceRegistry deviceRegistry, WsPublishUtils wsPublishUtils) {
  38. super(redissonUtil.getRedissonClient(), name);
  39. this.name=name;
  40. this.deviceRunningService = deviceRunningService;
  41. this.infusionHistoryService = infusionHistoryService;
  42. this.redissonUtil = redissonUtil;
  43. this.deviceRegistry = deviceRegistry;
  44. this.wsPublishUtils = wsPublishUtils;
  45. }
  46. private WsPublishUtils wsPublishUtils;
  47. @Override
  48. public String getId() {
  49. return "insufficient";
  50. }
  51. @Override
  52. public String getDescription() {
  53. return "进行镇痛不足判定、不在服务区判定、低输注判定";
  54. }
  55. @Override
  56. public void setConfig(FunctionAnalConfig config) {
  57. this.setConfig("config",config);
  58. }
  59. @Override
  60. public void handler(BusDeviceRunningEntity source) {
  61. FunctionAnalConfig config = this.getConfig("config").as(FunctionAnalConfig.class);
  62. if(config==null){
  63. log.warn("id:{},配置名称:{},不存在",getId(),name);
  64. return;
  65. }
  66. handlerAnal(source,config);
  67. }
  68. /**
  69. * 描述: 镇痛不足处理
  70. * 在时间窗口中判断pca是否达到一定次数
  71. * @author lifang
  72. * @date 2022/5/18 14:05
  73. * @param source
  74. * @param analConfig
  75. * @return void
  76. */
  77. private void handlerAnal(BusDeviceRunningEntity source,FunctionAnalConfig analConfig){
  78. RDelayedQueue delayedQueue = redissonUtil.getDelayedQueue(getId() + "-" + source.getTenantId() + "-" + source.getDeviceId(), e -> {
  79. if(e instanceof AnalEntity){
  80. this.judgeAnalPoor((AnalEntity) e);
  81. }
  82. });
  83. //如果有新的输注产生,则清空延迟队列
  84. if(source.isNewInfusion()){
  85. //镇痛不足设置为false
  86. source.setWarnAnalgesicPoor(false);
  87. delayedQueue.clear();
  88. }
  89. Integer insufficientTime = analConfig.getInsufficientTime();
  90. Integer insufficientCount = analConfig.getInsufficientCount();
  91. Boolean valid = analConfig.getValid();
  92. if(valid==null||(insufficientTime==null||insufficientTime<=0)||
  93. (insufficientCount==null||insufficientCount<=0)){
  94. return;
  95. }
  96. AnalEntity anal = AnalEntity.builder()
  97. .deviceId(source.getDeviceId())
  98. .pcaValidCount(source.getPcaValidCount())
  99. .pcaInvalidCount(source.getPcaInvalidCount())
  100. .timeout(analConfig.getInsufficientTime())
  101. .unit(TimeUnit.SECONDS)
  102. .threshold(analConfig.getInsufficientCount())
  103. .judgeByValid(analConfig.getValid())
  104. .tenantId(source.getTenantId())
  105. .timestamp(new Date())
  106. .build();
  107. delayedQueue.offer(anal,anal.getTimeout(),anal.getUnit());
  108. }
  109. /**
  110. * 描述: 判断延迟任务是否触发镇痛不足
  111. * @author lifang
  112. * @date 2022/5/19 9:14
  113. * @param
  114. * @return void
  115. */
  116. private void judgeAnalPoor(AnalEntity anal){
  117. //镇痛消失延迟队列
  118. RDelayedQueue noneAnalDelayedQueue = redissonUtil.getDelayedQueue("none-" + getId()+"-" + anal.getTenantId()+"-" + anal.getDeviceId(), e -> {
  119. if(e instanceof NoneAnalEntity){
  120. handleNoneAnal((NoneAnalEntity) e);
  121. }
  122. });
  123. String deviceId = anal.getDeviceId();
  124. BusDeviceRunningEntity runningInfo = deviceRunningService.getOne(new QueryWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId, deviceId).eq(BusDeviceRunningEntity::getTenantId, anal.getTenantId()));
  125. if(runningInfo==null){
  126. //泵已换绑医院,无需再处理
  127. return;
  128. }
  129. //PCA差值
  130. int subCount=0;
  131. if(anal.isJudgeByValid()){
  132. subCount=Math.subtractExact(runningInfo.getPcaValidCount(), Optional.ofNullable(anal.getPcaValidCount()).orElse(0));
  133. }else {
  134. subCount=Math.subtractExact(runningInfo.getPcaInvalidCount(), Optional.ofNullable(anal.getPcaInvalidCount()).orElse(0));
  135. }
  136. if(subCount>anal.getThreshold()){
  137. log.info("设备:{}镇痛不足",deviceId);
  138. //触发阈值,设置为镇痛不足
  139. if(!Boolean.TRUE.equals(runningInfo.getWarnAnalgesicPoor())){
  140. deviceRunningService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getId,runningInfo.getId())
  141. .set(BusDeviceRunningEntity::getWarnAnalgesicPoor,true));
  142. infusionHistoryService.update(new UpdateWrapper<BusInfusionHistoryEntity>().lambda().eq(BusInfusionHistoryEntity::getId,runningInfo.getInfusionId())
  143. .set(BusInfusionHistoryEntity::getWarnAnalgesicPoor,true));
  144. //报警/提醒缓存重置
  145. deviceRegistry.getOperator(deviceId).setAlarmOrWarn(null);
  146. if(Boolean.TRUE.equals(runningInfo.getMaster())){
  147. //发布推送
  148. wsPublishUtils.publishPatientMonitor(runningInfo.getPatientCode(),runningInfo.getTenantId());
  149. }
  150. //设置镇痛不足后,设置镇痛消失延迟时间
  151. }
  152. noneAnalDelayedQueue.clear();
  153. }else {
  154. //没有触发阈值,发入镇痛消失处理
  155. FunctionAnalConfig config = this.getConfig("config").as(FunctionAnalConfig.class);
  156. NoneAnalEntity noneAnal = NoneAnalEntity.builder()
  157. .deviceId(anal.getDeviceId())
  158. .tenantId(anal.getTenantId())
  159. .timeout(config.getDisappearTime())
  160. .unit(TimeUnit.SECONDS)
  161. .timestamp(new Date())
  162. .build();
  163. noneAnalDelayedQueue.offer(noneAnal,noneAnal.getTimeout(),noneAnal.getUnit());
  164. }
  165. }
  166. private void handleNoneAnal(NoneAnalEntity noneAnal){
  167. String deviceId = noneAnal.getDeviceId();
  168. BusDeviceRunningEntity runningInfo = deviceRunningService.getOne(new QueryWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getDeviceId, deviceId).eq(BusDeviceRunningEntity::getTenantId, noneAnal.getTenantId()));
  169. if(runningInfo==null){
  170. //泵已换绑医院,无需再处理
  171. return;
  172. }
  173. //将设备由镇痛不足取消成功
  174. if (deviceRunningService.update(new UpdateWrapper<BusDeviceRunningEntity>().lambda().eq(BusDeviceRunningEntity::getId, runningInfo.getId())
  175. .eq(BusDeviceRunningEntity::getWarnAnalgesicPoor, true)
  176. .set(BusDeviceRunningEntity::getWarnAnalgesicPoor, false))) {
  177. log.info("设备:{}取消镇痛不足",deviceId);
  178. infusionHistoryService.update(new UpdateWrapper<BusInfusionHistoryEntity>().lambda().eq(BusInfusionHistoryEntity::getId,runningInfo.getInfusionId())
  179. .set(BusInfusionHistoryEntity::getWarnAnalgesicPoor,false));
  180. deviceRegistry.getOperator(deviceId).setAlarmOrWarn(null);
  181. if(Boolean.TRUE.equals(runningInfo.getMaster())){
  182. //发布推送
  183. wsPublishUtils.publishPatientMonitor(runningInfo.getPatientCode(),runningInfo.getTenantId());
  184. }
  185. }
  186. }
  187. @Data
  188. @Builder
  189. static class AnalEntity implements Serializable {
  190. private String deviceId;
  191. private Integer pcaValidCount;
  192. private Integer pcaInvalidCount;
  193. private Integer timeout;
  194. private TimeUnit unit;
  195. private Integer threshold;
  196. private boolean judgeByValid;
  197. private String tenantId;
  198. private Date timestamp;
  199. }
  200. @Data
  201. @Builder
  202. static class NoneAnalEntity implements Serializable {
  203. private String deviceId;
  204. private Integer timeout;
  205. private TimeUnit unit;
  206. private String tenantId;
  207. private Date timestamp;
  208. }
  209. }