lifang 3 тижнів тому
батько
коміт
14fb7dbee9

+ 0 - 12
nb-admin/src/main/resources/application-dev.yml

@@ -183,18 +183,6 @@ oss:
     accessSecret: "R7hOvMfiHb0PYroDqUDXAYgB9htQss"
     endpoint: "oss-cn-hangzhou.aliyuncs.com"
     bucketName: "tuoren-nb-dev"
-mqtt:
-  client:
-    id: nb-netpump-client-dev
-  broker:
-    url: tcp://192.168.100.115:1883
-    username: hospital
-    password: 1qaz!QAZ
-  connect-options:
-    clean-session: true
-    connection-timeout: 30
-    keep-alive-interval: 60
-    automatic-reconnect: true
 notify:
   wechat:
     url: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c3e093fe-5125-47d5-a171-0f4be2f61a78

+ 2 - 0
nb-core/src/main/java/com/nb/core/config/MqttConfig.java

@@ -52,6 +52,8 @@ public class MqttConfig {
         }
         // 设置自动重连
         options.setAutomaticReconnect(mqttProperties.getConnectOptions().isAutomaticReconnect());
+        // 设置重连相关参数
+        options.setMaxReconnectDelay(mqttProperties.getConnectOptions().getMaxReconnectDelay());
         factory.setConnectionOptions(options);
         return factory;
     }

+ 27 - 1
nb-core/src/main/java/com/nb/core/config/MqttProperties.java

@@ -48,7 +48,7 @@ public class MqttProperties {
         /**
          * 代理URL
          */
-        private String url = "tcp://192.168.100.115:1883";
+        private String url = "tcp://iot.tuoren.com:1883";
         
         /**
          * 用户名
@@ -106,6 +106,16 @@ public class MqttProperties {
          */
         private boolean automaticReconnect = true;
         
+        /**
+         * 重连间隔(毫秒)
+         */
+        private int reconnectDelay = 5000;
+        
+        /**
+         * 最大重连延迟(毫秒)
+         */
+        private int maxReconnectDelay = 60000;
+        
         public boolean isCleanSession() {
             return cleanSession;
         }
@@ -137,6 +147,22 @@ public class MqttProperties {
         public void setAutomaticReconnect(boolean automaticReconnect) {
             this.automaticReconnect = automaticReconnect;
         }
+        
+        public int getReconnectDelay() {
+            return reconnectDelay;
+        }
+        
+        public void setReconnectDelay(int reconnectDelay) {
+            this.reconnectDelay = reconnectDelay;
+        }
+        
+        public int getMaxReconnectDelay() {
+            return maxReconnectDelay;
+        }
+        
+        public void setMaxReconnectDelay(int maxReconnectDelay) {
+            this.maxReconnectDelay = maxReconnectDelay;
+        }
     }
     
     // Getters and Setters

+ 15 - 12
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -52,7 +52,7 @@ public class MqttClientUtil implements ApplicationContextAware {
         try {
             mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
         } catch (Exception e) {
-
+            // 忽略异常,如果获取不到工厂则无法进行MQTT操作
         }
     }
 
@@ -96,14 +96,14 @@ public class MqttClientUtil implements ApplicationContextAware {
      */
     public void publish(String hospitalCode, String topic, Object payload, int qos, boolean retained) {
         try {
-            // 创建消息实体
-            MqttMessage message = new MqttMessage(topic, JSONUtil.toJsonStr( payload));
-            message.setClientId(CLIENT_ID);
-            
             if (mqttClientFactory == null) {
                 return;
             }
             
+            // 创建消息实体
+            MqttMessage message = new MqttMessage(topic, JSONUtil.toJsonStr(payload));
+            message.setClientId(CLIENT_ID);
+            
             MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(CLIENT_ID, mqttClientFactory);
             messageHandler.setDefaultTopic("hospitalInfo/"+hospitalCode);
             messageHandler.setDefaultQos(qos);
@@ -125,14 +125,14 @@ public class MqttClientUtil implements ApplicationContextAware {
      * @param topic 主题
      * @param messageHandler 消息处理器
      */
-    public void subscribe( String topic, MessageHandler messageHandler) {
+    public void subscribe(String topic, MessageHandler messageHandler) {
         try {
             if (mqttClientFactory == null) {
                 return;
             }
             // 如果已经订阅了该主题,则先取消订阅
             if (subscribers.containsKey(topic)) {
-                unsubscribe( topic);
+                unsubscribe(topic);
             }
             
             MqttPahoMessageDrivenChannelAdapter adapter = 
@@ -173,12 +173,15 @@ public class MqttClientUtil implements ApplicationContextAware {
      * 取消订阅
      * @param topic 主题
      */
-    public void unsubscribe( String topic) {
-        String key = topic;
-        MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(key);
+    public void unsubscribe(String topic) {
+        MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(topic);
         if (adapter != null) {
-            adapter.stop();
-            subscribers.remove(key);
+            try {
+                adapter.stop();
+            } catch (Exception e) {
+                // 忽略停止异常
+            }
+            subscribers.remove(topic);
         }
     }
 

+ 77 - 0
nb-service/web-service/src/main/java/com/nb/web/service/bus/listener/BusEvaluationMessageListener.java

@@ -0,0 +1,77 @@
+package com.nb.web.service.bus.listener;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.nb.common.config.annotation.TenantIgnore;
+import com.nb.core.entity.MqttMessage;
+import com.nb.core.enums.MqttTopicEnum;
+import com.nb.core.handler.AbstractMqttMessageHandler;
+import com.nb.web.api.entity.BusEvaluationEntity;
+import com.nb.web.service.bus.entity.BusPatientEntity;
+import com.nb.web.service.bus.mapper.BusPatientMapper;
+import com.nb.web.service.bus.service.LocalBusEvaluationService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * 评价信息下发主题监听器
+ * 处理 bus/evaluation 主题的MQTT消息
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@Component
+public class BusEvaluationMessageListener extends AbstractMqttMessageHandler {
+
+    @Autowired
+    private LocalBusEvaluationService evaluationService;
+
+    @Autowired
+    private BusPatientMapper patientMapper;
+    /**
+     * 定义支持的主题模式
+     * @return 支持的主题模式
+     */
+    @Override
+    protected String getSupportedTopicPattern() {
+        return MqttTopicEnum.BUS_EVALUATION;
+    }
+
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息实体
+     */
+    @Override
+    @TenantIgnore
+    public void handle(MqttMessage message) {
+        log.info("接收到评价信息下发消息,主题: {}, 内容: {}", message.getTopic(), message.getPayload());
+        
+        try {
+            // 解析消息内容为BusEvaluationEntity对象
+            BusEvaluationEntity evaluation = JSON.parseObject(message.getPayload(), BusEvaluationEntity.class);
+            evaluation.setPatientId(null);
+            evaluation.setInfusionId(null);
+            evaluation.setClinicId(null);
+            evaluation.setTenantId(null);
+
+            BusPatientEntity patient = patientMapper.selectOne(new LambdaQueryWrapper<BusPatientEntity>()
+                    .eq(BusPatientEntity::getOriginCode, evaluation.getPatientCode())
+                    .last("limit 1"));
+            if(ObjectUtil.isNull(patient)){
+                return;
+            }
+            evaluation.setPatientId(patient.getId());
+            evaluation.setInfusionId(patient.getInfusionId());
+            evaluation.setClinicId(patient.getClinicId());
+            evaluation.setTenantId(patient.getTenantId());
+            // 保存到数据库
+            evaluationService.save(evaluation);
+            log.info("评价信息处理完成,ID: {}", evaluation.getId());
+        } catch (Exception e) {
+            log.error("处理评价信息时发生错误: ", e);
+        }
+    }
+}

+ 81 - 0
nb-service/web-service/src/main/java/com/nb/web/service/bus/listener/BusUndoMessageListener.java

@@ -0,0 +1,81 @@
+package com.nb.web.service.bus.listener;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONUtil;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.nb.common.config.annotation.TenantIgnore;
+import com.nb.core.entity.MqttMessage;
+import com.nb.core.enums.MqttTopicEnum;
+import com.nb.core.handler.AbstractMqttMessageHandler;
+import com.nb.web.service.bus.entity.BusPatientEntity;
+import com.nb.web.service.bus.mapper.BusPatientMapper;
+import com.nb.web.service.bus.service.LocalBusClinicService;
+import com.nb.web.service.bus.service.LocalBusInfusionHistoryService;
+import com.nb.web.service.bus.service.dto.ManualUndoConfig;
+import com.nb.web.service.bus.service.dto.MqttUndoDTO;
+import jodd.io.StreamUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+
+/**
+ * 撤泵信息下发主题监听器
+ * 处理 bus/undo 主题的MQTT消息
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@Component
+public class BusUndoMessageListener extends AbstractMqttMessageHandler {
+    @Autowired
+    private BusPatientMapper patientMapper;
+
+    @Autowired
+    private LocalBusInfusionHistoryService infusionHistoryService;
+    /**
+     * 定义支持的主题模式
+     * @return 支持的主题模式
+     */
+    @Override
+    protected String getSupportedTopicPattern() {
+        return MqttTopicEnum.BUS_UNDO;
+    }
+
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息实体
+     */
+    @Override
+    @TenantIgnore
+    public void handle(MqttMessage message) {
+        log.info("接收到撤泵信息下发消息,主题: {}, 内容: {}", message.getTopic(), message.getPayload());
+        try {
+            // 解析消息内容为MqttUndoDTO对象
+            MqttUndoDTO mqttUndoDTO = JSONUtil.toBean(message.getPayload(), MqttUndoDTO.class);
+            ManualUndoConfig manualUndoConfig = mqttUndoDTO.getManualUndoConfig();
+
+            log.info("撤泵配置信息:{}", JSONUtil.toJsonStr(manualUndoConfig));
+            BusPatientEntity patient = patientMapper.selectOne(new LambdaQueryWrapper<BusPatientEntity>()
+                    .eq(BusPatientEntity::getOriginCode, manualUndoConfig.getPatientCode())
+                    .last("limit 1"));
+            if(ObjectUtil.isNull(patient)){
+                return;
+            }
+            manualUndoConfig.setClinicId(patient.getClinicId());
+            manualUndoConfig.setPatientId(patient.getId());
+            manualUndoConfig.setTenantId(patient.getTenantId());
+            if(StrUtil.isNotEmpty(patient.getInfusionId())){
+                manualUndoConfig.setInfusionIds(Arrays.asList(patient.getInfusionId()));
+            }
+            infusionHistoryService.undo(manualUndoConfig,mqttUndoDTO.getFinishClinic());
+
+            log.info("撤泵信息处理完成");
+        } catch (Exception e) {
+            log.error("处理撤泵信息时发生错误: ", e);
+        }
+    }
+}