Bladeren bron

add
设备历史数据、报警数据、修改数据改为批量存储

lifang 1 maand geleden
bovenliggende
commit
49bdd89127

+ 42 - 35
nb-admin/src/main/resources/application-dev.yml

@@ -109,12 +109,12 @@ spring:
               user: root
               password: 123456
               validationQuery: SELECT 1
-#  rabbitmq:
-#    host: 192.168.100.32
-#    port: 5672
-#    username: guest
-#    password: guest
-#    virtual-host: /
+  rabbitmq:
+    host: 192.168.100.32
+    port: 5672
+    username: guest
+    password: guest
+    virtual-host: /
   # redis 配置
   redis:
     # 地址
@@ -137,13 +137,6 @@ spring:
         max-active: 16
         # #连接池最大阻塞等待时间(使用负值表示没有限制)
         max-wait: -1ms
-  flyway:
-    enabled: false
-#  rabbitmq:
-#    password: guest
-#    host: 192.168.100.32
-#    port: 5672
-#    username: guest
 request:
   check:
     enable: false
@@ -156,31 +149,33 @@ request:
 
 
 # 阿里云对接配置
-#aliyun:
-#  accessKey: "LTAI4FhB19MgQuviGxwA3aod"
-#  accessSecret: "cQQVkATR0yv2G9CEtfjAhEGBepPDRs"
-#  consumerGroupId: "nalavzBm4RuVJc0BUij7000100"
-#  aliyunUid: "1177450762772738"
-#  regionId: "cn-shanghai"
-#  # iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
-#  iotInstanceId: ""
-#  server-subscription:
-#    enable: true  # 是否开启阿里云物联网服务端订阅
-#  product:
-#    productKey: a1ALlsBa2ZK
-# 阿里云对接配置
+iot:
+  consumers:
+    - enable: false
+      consumer: com.nb.aliyun.service.consumer.NBAndFourGConsumerGroupService
+      name: NB
+      config:
+        accessKey: "LTAI4G7FA9ytMc76oNkJ45YJ"
+        accessSecret: "R7hOvMfiHb0PYroDqUDXAYgB9htQss"
+        consumerGroupId: "NZKDBbvhxUqtcF5VqDb2000100"
+        aliyunUid: "1238892013759131"
+        regionId: "cn-shanghai"
+        iotInstanceId: "iot-060a0bgd"
+        productKey: he1fACg7ySx
+    - enable: false
+      consumer: com.nb.aliyun.service.consumer.NBAndFourGConsumerGroupService
+      name: 4G
+      config:
+        accessKey: "LTAI5tRaK95AqvWiQ9LspZ3q"
+        accessSecret: "wZUpYGeztNcV4tGbBLfK2OvxnZzZF3"
+        consumerGroupId: "0na0olXpWEmWgMucxIH1000100"
+        aliyunUid: "1777492656465771"
+        regionId: "cn-shanghai"
+        iotInstanceId: "iot-06z00hi2guq5qi3"
+        productKey: k0g9w9xRhhi
 aliyun:
   accessKey: "LTAI4G7FA9ytMc76oNkJ45YJ"
   accessSecret: "R7hOvMfiHb0PYroDqUDXAYgB9htQss"
-  consumerGroupId: "Un44crQ5EQB121syWzFz000100"
-  aliyunUid: "1238892013759131"
-  regionId: "cn-shanghai"
-  # iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
-  iotInstanceId: "iot-060a0bgd"
-  server-subscription:
-    enable: false  # 是否开启阿里云物联网服务端订阅
-  product:
-    productKey: he1fy3RqSr0
 
 oss:
   aliyun:
@@ -188,6 +183,18 @@ oss:
     accessSecret: "R7hOvMfiHb0PYroDqUDXAYgB9htQss"
     endpoint: "oss-cn-hangzhou.aliyuncs.com"
     bucketName: "tuoren-nb-dev"
+mqtt:
+  client:
+    id: nb-netpump-client-dev
+  broker:
+    url: tcp://192.168.100.115:1883
+    username: hospital
+    password: 1qaz!QAZ
+  connect-options:
+    clean-session: true
+    connection-timeout: 30
+    keep-alive-interval: 60
+    automatic-reconnect: true
 notify:
   wechat:
     url: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c3e093fe-5125-47d5-a171-0f4be2f61a78

+ 1 - 1
nb-admin/src/main/resources/application-prod.yml

@@ -223,7 +223,7 @@ mqtt:
   connect-options:
     clean-session: true
     connection-timeout: 30
-    keep-alive-interval: 10
+    keep-alive-interval: 60
     automatic-reconnect: true
 knife4j:
   enable: true

+ 1 - 1
nb-admin/src/main/resources/application.yml

@@ -7,7 +7,7 @@ spring:
   application:
     name: nb
   profiles:
-    active: prod
+    active: dev
   jackson:
     time-zone: GMT+8
   flyway:

+ 13 - 7
nb-core/src/main/java/com/nb/core/config/MqttConfig.java

@@ -1,6 +1,5 @@
 package com.nb.core.config;
 
-import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -17,6 +16,18 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  */
 @Configuration
 public class MqttConfig {
+    
+    /**
+     * 创建MQTT配置属性Bean
+     *
+     * @return MqttProperties
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public MqttProperties mqttProperties() {
+        return new MqttProperties();
+    }
+
     /**
      * 创建默认的MQTT客户端工厂
      * 当容器中没有MqttPahoClientFactory时创建
@@ -25,11 +36,8 @@ public class MqttConfig {
      * @return MqttPahoClientFactory
      */
     @Bean
-    @ConditionalOnBean(MqttProperties.class)
+    @ConditionalOnMissingBean(MqttPahoClientFactory.class)
     public MqttPahoClientFactory mqttClientFactory(MqttProperties mqttProperties) {
-        if(mqttProperties==null||mqttProperties.getBroker()==null){
-            return null;
-        }
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         MqttConnectOptions options = new MqttConnectOptions();
         options.setServerURIs(new String[]{mqttProperties.getBroker().getUrl()});
@@ -44,8 +52,6 @@ public class MqttConfig {
         }
         // 设置自动重连
         options.setAutomaticReconnect(mqttProperties.getConnectOptions().isAutomaticReconnect());
-        // 设置重连相关参数
-        options.setMaxReconnectDelay(mqttProperties.getConnectOptions().getMaxReconnectDelay());
         factory.setConnectionOptions(options);
         return factory;
     }

+ 7 - 33
nb-core/src/main/java/com/nb/core/config/MqttProperties.java

@@ -17,23 +17,23 @@ public class MqttProperties {
     /**
      * 客户端配置
      */
-    private Client client = null;
+    private Client client = new Client();
     
     /**
      * 代理配置
      */
-    private Broker broker = null;
+    private Broker broker = new Broker();
     
     /**
      * 连接选项
      */
-    private ConnectOptions connectOptions = null;
+    private ConnectOptions connectOptions = new ConnectOptions();
     
     public static class Client {
         /**
          * 客户端ID
          */
-        private String id ;
+        private String id = "nb-netpump-client";
         
         public String getId() {
             return id;
@@ -48,17 +48,17 @@ public class MqttProperties {
         /**
          * 代理URL
          */
-        private String url ;
+        private String url = "tcp://192.168.100.115:1883";
         
         /**
          * 用户名
          */
-        private String username;
+        private String username = "hospital";
         
         /**
          * 密码
          */
-        private String password;
+        private String password = "1qaz!QAZ";
         
         public String getUrl() {
             return url;
@@ -106,16 +106,6 @@ public class MqttProperties {
          */
         private boolean automaticReconnect = true;
         
-        /**
-         * 重连间隔(毫秒)
-         */
-        private int reconnectDelay = 5000;
-        
-        /**
-         * 最大重连延迟(毫秒)
-         */
-        private int maxReconnectDelay = 60000;
-        
         public boolean isCleanSession() {
             return cleanSession;
         }
@@ -147,22 +137,6 @@ public class MqttProperties {
         public void setAutomaticReconnect(boolean automaticReconnect) {
             this.automaticReconnect = automaticReconnect;
         }
-        
-        public int getReconnectDelay() {
-            return reconnectDelay;
-        }
-        
-        public void setReconnectDelay(int reconnectDelay) {
-            this.reconnectDelay = reconnectDelay;
-        }
-        
-        public int getMaxReconnectDelay() {
-            return maxReconnectDelay;
-        }
-        
-        public void setMaxReconnectDelay(int maxReconnectDelay) {
-            this.maxReconnectDelay = maxReconnectDelay;
-        }
     }
     
     // Getters and Setters

+ 1 - 7
nb-core/src/main/java/com/nb/core/entity/MqttMessage.java

@@ -1,12 +1,7 @@
 package com.nb.core.entity;
 
-import com.nb.core.handler.MqttMessageHandlerManager;
 import lombok.Data;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
 
-import java.io.Serializable;
 import java.util.Date;
 import java.util.Map;
 
@@ -18,8 +13,7 @@ import java.util.Map;
  * @version 1.0.0
  */
 @Data
-public class MqttMessage implements Serializable {
-    private static final long serialVersionUID = 1L;
+public class MqttMessage {
     /**
      * 主题
      */

+ 95 - 0
nb-core/src/main/java/com/nb/core/service/MqttMessageForwardService.java

@@ -0,0 +1,95 @@
+package com.nb.core.service;
+
+import com.nb.core.entity.MqttMessage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * MQTT消息转发服务
+ * 将MQTT消息通过HTTP转发给其他服务
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Service
+public class MqttMessageForwardService {
+    
+    @Autowired
+    private RestTemplate restTemplate;
+    
+    /**
+     * 转发单个MQTT消息
+     *
+     * @param message MQTT消息
+     * @param targetUrl 目标服务URL
+     * @return 转发结果
+     */
+    public boolean forwardMessage(MqttMessage message, String targetUrl) {
+        try {
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.APPLICATION_JSON);
+            
+            HttpEntity<MqttMessage> request = new HttpEntity<>(message, headers);
+            ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, request, String.class);
+            
+            return response.getStatusCode().is2xxSuccessful();
+        } catch (Exception e) {
+            // 记录日志或处理异常
+            e.printStackTrace();
+            return false;
+        }
+    }
+    
+    /**
+     * 批量转发MQTT消息
+     *
+     * @param messages MQTT消息数组
+     * @param targetUrl 目标服务URL
+     * @return 转发结果
+     */
+    public boolean forwardMessages(MqttMessage[] messages, String targetUrl) {
+        try {
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.APPLICATION_JSON);
+            
+            HttpEntity<MqttMessage[]> request = new HttpEntity<>(messages, headers);
+            ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, request, String.class);
+            
+            return response.getStatusCode().is2xxSuccessful();
+        } catch (Exception e) {
+            // 记录日志或处理异常
+            e.printStackTrace();
+            return false;
+        }
+    }
+    
+    /**
+     * 转发MQTT消息(带自定义头信息)
+     *
+     * @param message MQTT消息
+     * @param targetUrl 目标服务URL
+     * @param customHeaders 自定义头信息
+     * @return 转发结果
+     */
+    public boolean forwardMessageWithHeaders(MqttMessage message, String targetUrl, HttpHeaders customHeaders) {
+        try {
+            if (customHeaders.getContentType() == null) {
+                customHeaders.setContentType(MediaType.APPLICATION_JSON);
+            }
+            
+            HttpEntity<MqttMessage> request = new HttpEntity<>(message, customHeaders);
+            ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, request, String.class);
+            
+            return response.getStatusCode().is2xxSuccessful();
+        } catch (Exception e) {
+            // 记录日志或处理异常
+            e.printStackTrace();
+            return false;
+        }
+    }
+}

+ 187 - 275
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -1,333 +1,245 @@
 package com.nb.core.utils;
 
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-import java.nio.charset.StandardCharsets;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import cn.hutool.json.JSONUtil;
+import com.nb.core.entity.MqttMessage;
+import com.nb.core.service.MqttMessageForwardService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * MQTT客户端工具类
- * 提供MQTT连接、消息发布、订阅等通用功能
+ * 提供MQTT连接、消息发布、订阅等功能
  *
- * @author lingma
+ * @author YourName
  * @version 1.0.0
- * @since 1.0.0
  */
-@Slf4j
-public class MqttClientUtil {
-
-    private MqttClient client;
-    private MqttConnectOptions options;
-    private String serverURI;
-    private String clientId;
-    private String username;
-    private String password;
-
-    /**
-     * 构造函数
-     *
-     * @param serverURI MQTT服务器地址,例如:tcp://localhost:1883
-     * @param clientId  客户端ID,需要唯一
-     */
-    public MqttClientUtil(String serverURI, String clientId) {
-        this.serverURI = serverURI;
-        this.clientId = clientId;
-    }
-
-    /**
-     * 构造函数
-     *
-     * @param serverURI MQTT服务器地址,例如:tcp://localhost:1883
-     * @param clientId  客户端ID,需要唯一
-     * @param username  用户名
-     * @param password  密码
-     */
-    public MqttClientUtil(String serverURI, String clientId, String username, String password) {
-        this(serverURI, clientId);
-        this.username = username;
-        this.password = password;
-    }
-
-    /**
-     * 初始化MQTT客户端
-     *
-     * @throws MqttException MQTT异常
-     */
-    public void init() throws MqttException {
-        if (client == null) {
-            client = new MqttClient(serverURI, clientId, new MemoryPersistence());
-        }
-
-        options = new MqttConnectOptions();
-        options.setCleanSession(true);
-        options.setConnectionTimeout(30);
-        options.setKeepAliveInterval(60);
-
-        if (username != null && password != null) {
-            options.setUserName(username);
-            options.setPassword(password.toCharArray());
-        }
-
-        client.setCallback(new MqttCallbackExtended() {
-            @Override
-            public void connectComplete(boolean reconnect, String serverURI) {
-                log.info("MQTT客户端连接完成 - serverURI: {}, clientId: {}, reconnect: {}", serverURI, clientId, reconnect);
-            }
-
-            @Override
-            public void connectionLost(Throwable cause) {
-                log.error("MQTT客户端连接丢失 - clientId: {}", clientId, cause);
-            }
-
-            @Override
-            public void messageArrived(String topic, MqttMessage message) throws Exception {
-                log.info("MQTT客户端收到消息 - topic: {}, message: {}", topic, new String(message.getPayload(), StandardCharsets.UTF_8));
-            }
+@Component
+public class MqttClientUtil implements ApplicationContextAware {
+
+    private ApplicationContext applicationContext;
+    
+    @Autowired(required = false)
+    private MqttMessageForwardService mqttMessageForwardService;
+    
+    private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
+    
+    private MqttPahoClientFactory mqttClientFactory;
+    
+    private String defaultClientId = "nb-netpump";
+    
+    private MessageChannel mqttInputChannel;
+    
+    @PostConstruct
+    public void init() {
+        // 创建默认的消息通道
+        mqttInputChannel = new DirectChannel();
+        
+        // 尝试从Spring容器中获取MqttPahoClientFactory
+        try {
+            mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
+        } catch (Exception e) {
 
-            @Override
-            public void deliveryComplete(IMqttDeliveryToken token) {
-                log.debug("MQTT消息发送完成 - token: {}", token);
-            }
-        });
-    }
-
-    /**
-     * 连接到MQTT服务器
-     *
-     * @throws MqttException MQTT异常
-     */
-    public void connect() throws MqttException {
-        if (client == null) {
-            init();
-        }
-        if (!client.isConnected()) {
-            client.connect(options);
-            log.info("MQTT客户端连接成功 - serverURI: {}, clientId: {}", serverURI, clientId);
         }
     }
 
-    /**
-     * 异步连接到MQTT服务器
-     *
-     * @return CompletableFuture<Void>
-     */
-    public CompletableFuture<Void> connectAsync() {
-        return CompletableFuture.runAsync(() -> {
+    @PreDestroy
+    public void destroy() {
+        // 清理所有订阅者
+        subscribers.values().forEach(adapter -> {
             try {
-                connect();
-            } catch (MqttException e) {
-                log.error("MQTT客户端连接失败 - serverURI: {}, clientId: {}", serverURI, clientId, e);
-                throw new RuntimeException(e);
+                adapter.stop();
+            } catch (Exception e) {
+                // 忽略停止异常
             }
         });
+        subscribers.clear();
     }
-
+    
     /**
-     * 断开MQTT连接
-     *
-     * @throws MqttException MQTT异常
+     * 设置应用上下文
+     * @param applicationContext 应用上下文
      */
-    public void disconnect() throws MqttException {
-        if (client != null && client.isConnected()) {
-            client.disconnect();
-            log.info("MQTT客户端断开连接 - clientId: {}", clientId);
-        }
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) {
+        this.applicationContext = applicationContext;
     }
-
-    /**
-     * 异步断开MQTT连接
-     *
-     * @return CompletableFuture<Void>
-     */
-    public CompletableFuture<Void> disconnectAsync() {
-        return CompletableFuture.runAsync(() -> {
-            try {
-                disconnect();
-            } catch (MqttException e) {
-                log.error("MQTT客户端断开连接失败 - clientId: {}", clientId, e);
-                throw new RuntimeException(e);
-            }
-        });
-    }
-
-    /**
-     * 关闭MQTT客户端
-     *
-     * @throws MqttException MQTT异常
-     */
-    public void close() throws MqttException {
-        if (client != null) {
-            if (client.isConnected()) {
-                client.disconnect();
-            }
-            client.close();
-            log.info("MQTT客户端已关闭 - clientId: {}", clientId);
-        }
-    }
-
-    /**
-     * 异步关闭MQTT客户端
-     *
-     * @return CompletableFuture<Void>
-     */
-    public CompletableFuture<Void> closeAsync() {
-        return CompletableFuture.runAsync(() -> {
-            try {
-                close();
-            } catch (MqttException e) {
-                log.error("MQTT客户端关闭失败 - clientId: {}", clientId, e);
-                throw new RuntimeException(e);
-            }
-        });
-    }
-
+    
     /**
      * 发布消息
-     *
-     * @param topic   主题
+     * @param topic 主题
      * @param payload 消息内容
-     * @param qos     服务质量等级(0,1,2)
-     * @param retained 是否保留消息
-     * @throws MqttException MQTT异常
      */
-    public void publish(String topic, String payload, int qos, boolean retained) throws MqttException {
-        MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
-        message.setQos(qos);
-        message.setRetained(retained);
-        publish(topic, message);
+    public void publish(String topic, String payload) {
+        publish(topic, payload, 0, false);
     }
-
+    
     /**
      * 发布消息
-     *
-     * @param topic   主题
-     * @param message 消息
-     * @throws MqttException MQTT异常
+     * @param topic 主题
+     * @param payload 消息内容
+     * @param qos 服务质量等级 (0, 1, 2)
+     * @param retained 是否保留消息
      */
-    public void publish(String topic, MqttMessage message) throws MqttException {
-        checkConnected();
-        client.publish(topic, message);
-        log.debug("MQTT消息发布成功 - topic: {}", topic);
+    public void publish(String topic, String payload, int qos, boolean retained) {
+        try {
+            // 创建消息实体
+            MqttMessage message = new MqttMessage(topic, payload);
+            message.setClientId(defaultClientId + "-pub-" + System.currentTimeMillis());
+            // 确保客户端工厂已初始化
+            if (mqttClientFactory == null) {
+                return;
+            }
+            
+            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(defaultClientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
+            messageHandler.setDefaultTopic(topic);
+            messageHandler.setDefaultQos(qos);
+            messageHandler.setDefaultRetained(retained);
+            // 确保处理器已初始化
+            messageHandler.afterPropertiesSet();
+            
+            org.springframework.messaging.Message<String> springMessage = 
+                org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
+            messageHandler.handleMessage(springMessage);
+        } catch (Exception e) {
+            throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
+        }
     }
-
+    
     /**
-     * 异步发布消息
-     *
-     * @param topic   主题
+     * 发布消息到指定客户端
+     * @param clientId 客户端ID
+     * @param topic 主题
      * @param payload 消息内容
-     * @param qos     服务质量等级(0,1,2)
+     * @param qos 服务质量等级
      * @param retained 是否保留消息
-     * @return CompletableFuture<Void>
      */
-    public CompletableFuture<Void> publishAsync(String topic, String payload, int qos, boolean retained) {
-        return CompletableFuture.runAsync(() -> {
-            try {
-                publish(topic, payload, qos, retained);
-            } catch (MqttException e) {
-                log.error("MQTT消息发布失败 - topic: {}", topic, e);
-                throw new RuntimeException(e);
+    public void publish(String clientId, String topic, String payload, int qos, boolean retained) {
+        try {
+            // 创建消息实体
+            MqttMessage message = new MqttMessage(topic, payload);
+            message.setClientId(clientId + "-pub-" + System.currentTimeMillis());
+            
+            // 如果配置了转发服务,则转发消息
+            if (mqttMessageForwardService != null) {
+                // 这里可以配置转发到其他服务的URL
+                // mqttMessageForwardService.forwardMessage(message, "http://other-service/api/mqtt/receive");
             }
-        });
+            if (mqttClientFactory == null) {
+                return;
+            }
+            
+            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
+            messageHandler.setDefaultTopic(topic);
+            messageHandler.setDefaultQos(qos);
+            messageHandler.setDefaultRetained(retained);
+            // 确保处理器已初始化
+            messageHandler.afterPropertiesSet();
+            
+            org.springframework.messaging.Message<String> springMessage = 
+                org.springframework.messaging.support.MessageBuilder.withPayload(payload).build();
+            messageHandler.handleMessage(springMessage);
+        } catch (Exception e) {
+            throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
+        }
     }
-
+    
     /**
      * 订阅主题
-     *
      * @param topic 主题
-     * @param qos   服务质量等级
-     * @throws MqttException MQTT异常
+     * @param messageHandler 消息处理器
      */
-    public void subscribe(String topic, int qos) throws MqttException {
-        checkConnected();
-        client.subscribe(topic, qos);
-        log.info("MQTT订阅主题成功 - topic: {}, qos: {}", topic, qos);
+    public void subscribe(String topic, MessageHandler messageHandler) {
+        subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageHandler);
     }
-
+    
     /**
-     * 异步订阅主题
-     *
+     * 订阅主题
+     * @param clientId 客户端ID
      * @param topic 主题
-     * @param qos   服务质量等级
-     * @return CompletableFuture<Void>
+     * @param messageHandler 消息处理器
      */
-    public CompletableFuture<Void> subscribeAsync(String topic, int qos) {
-        return CompletableFuture.runAsync(() -> {
-            try {
-                subscribe(topic, qos);
-            } catch (MqttException e) {
-                log.error("MQTT订阅主题失败 - topic: {}", topic, e);
-                throw new RuntimeException(e);
+    public void subscribe(String clientId, String topic, MessageHandler messageHandler) {
+        try {
+            if (mqttClientFactory == null) {
+                return;
             }
-        });
+            // 如果已经订阅了该主题,则先取消订阅
+            String key = clientId + ":" + topic;
+            if (subscribers.containsKey(key)) {
+                unsubscribe(clientId, topic);
+            }
+            
+            MqttPahoMessageDrivenChannelAdapter adapter = 
+                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory, topic);
+            adapter.setConverter(new DefaultPahoMessageConverter());
+            adapter.setOutputChannel(mqttInputChannel);
+            
+            // 创建一个带有处理逻辑的通道
+            DirectChannel channel = new DirectChannel();
+            channel.subscribe(messageHandler);
+            adapter.setOutputChannel(channel);
+            
+            adapter.start();
+            
+            subscribers.put(key, adapter);
+        } catch (Exception e) {
+            throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
+        }
     }
-
+    
     /**
-     * 取消订阅主题
-     *
+     * 订阅主题(使用函数式接口处理消息)
      * @param topic 主题
-     * @throws MqttException MQTT异常
+     * @param messageConsumer 消息消费者
      */
-    public void unsubscribe(String topic) throws MqttException {
-        checkConnected();
-        client.unsubscribe(topic);
-        log.info("MQTT取消订阅主题成功 - topic: {}", topic);
+    public void subscribe(String topic, Consumer<String> messageConsumer) {
+        subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageConsumer);
     }
-
+    
     /**
-     * 异步取消订阅主题
-     *
+     * 订阅主题(使用函数式接口处理消息)
+     * @param clientId 客户端ID
      * @param topic 主题
-     * @return CompletableFuture<Void>
+     * @param messageConsumer 消息消费者
      */
-    public CompletableFuture<Void> unsubscribeAsync(String topic) {
-        return CompletableFuture.runAsync(() -> {
-            try {
-                unsubscribe(topic);
-            } catch (MqttException e) {
-                log.error("MQTT取消订阅主题失败 - topic: {}", topic, e);
-                throw new RuntimeException(e);
+    public void subscribe(String clientId, String topic, Consumer<String> messageConsumer) {
+        MessageHandler handler = new MessageHandler() {
+            @Override
+            public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
+                String payload = new String((byte[]) message.getPayload());
+                messageConsumer.accept(payload);
             }
-        });
+        };
+        subscribe(clientId, topic, handler);
     }
-
+    
     /**
-     * 检查客户端是否连接
-     *
-     * @throws MqttException MQTT异常
+     * 取消订阅
+     * @param clientId 客户端ID
+     * @param topic 主题
      */
-    private void checkConnected() throws MqttException {
-        if (client == null || !client.isConnected()) {
-            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
+    public void unsubscribe(String clientId, String topic) {
+        String key = clientId + ":" + topic;
+        MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(key);
+        if (adapter != null) {
+            adapter.stop();
+            subscribers.remove(key);
         }
     }
-
-    /**
-     * 获取客户端ID
-     *
-     * @return 客户端ID
-     */
-    public String getClientId() {
-        return clientId;
-    }
-
-    /**
-     * 获取MQTT客户端
-     *
-     * @return MqttClient
-     */
-    public MqttClient getClient() {
-        return client;
-    }
-
-    /**
-     * 生成随机客户端ID
-     *
-     * @return 随机客户端ID
-     */
-    public static String generateClientId() {
-        return "mqtt_client_" + UUID.randomUUID().toString().replace("-", "");
-    }
 }

+ 58 - 35
nb-core/src/test/java/com/nb/core/utils/MqttClientUtilTest.java

@@ -1,57 +1,80 @@
 package com.nb.core.utils;
 
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
+import cn.hutool.core.util.ReflectUtil;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.springframework.context.ApplicationContext;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
- * MQTT客户端工具类测试
+ * MqttClientUtil测试类
  *
- * @author lingma
+ * @author YourName
  * @version 1.0.0
- * @since 1.0.0
  */
-@Slf4j
 public class MqttClientUtilTest {
 
+    @Mock
+    private ApplicationContext applicationContext;
+
     @Test
-    public void testMqttClient() throws MqttException, ExecutionException, InterruptedException, TimeoutException {
-        // 创建MQTT客户端
-        MqttClientUtil mqttClient = new MqttClientUtil(
-                "tcp://broker.emqx.io:1883",
-                MqttClientUtil.generateClientId()
-        );
+    public void testPublish() {
+        MqttClientUtil mqttClientUtil = new MqttClientUtil();
+        mqttClientUtil.setApplicationContext(applicationContext);
+        ReflectUtil.setFieldValue(mqttClientUtil, "mqttClientFactory", createDefaultClientFactory());
 
+        // 测试发布消息
         try {
-            // 初始化客户端
-            mqttClient.init();
+            mqttClientUtil.publish("testtopic/123", "Hello MQTT");
+        } catch (Exception e) {
+            e.printStackTrace();
+            // 在测试环境中,由于没有实际的MQTT服务器,会抛出异常,这是预期的
+        }
+    }
 
-            // 连接服务器
-            CompletableFuture<Void> connectFuture = mqttClient.connectAsync();
-            connectFuture.get(10, TimeUnit.SECONDS); // 等待连接完成,最多10秒
+    @Test
+    public void testPublishWithParams() {
+        MqttClientUtil mqttClientUtil = new MqttClientUtil();
+        mqttClientUtil.setApplicationContext(applicationContext);
+        
+        // 测试发布消息(带参数)
+        try {
+            mqttClientUtil.publish("test/topic", "Hello MQTT", 1, false);
+        } catch (Exception e) {
+            // 在测试环境中,由于没有实际的MQTT服务器,会抛出异常,这是预期的
+        }
+    }
 
-            // 订阅主题
-            CompletableFuture<Void> subscribeFuture = mqttClient.subscribeAsync("test/topic", 1);
-            subscribeFuture.get(5, TimeUnit.SECONDS); // 等待订阅完成,最多5秒
+    private String defaultClientId = "nb-netpump";
 
-            // 发布消息
-            CompletableFuture<Void> publishFuture = mqttClient.publishAsync("test/topic", "Hello MQTT!", 1, false);
-            publishFuture.get(5, TimeUnit.SECONDS); // 等待发布完成,最多5秒
+    private String defaultBrokerUrl = "tcp://iot.tuoren.com:1883";
 
-            // 等待一段时间以接收消息
-            Thread.sleep(3000);
+    private String defaultUsername = "hospital";
 
-        } catch (Exception e) {
-            log.error("MQTT测试出现异常", e);
-        } finally {
-            // 关闭客户端
-            CompletableFuture<Void> closeFuture = mqttClient.closeAsync();
-            closeFuture.get(5, TimeUnit.SECONDS); // 等待关闭完成,最多5秒
+    private String defaultPassword = "1qaz!QAZ";
+    /**
+     * 创建默认的客户端工厂
+     * @return MqttPahoClientFactory
+     */
+    private MqttPahoClientFactory createDefaultClientFactory() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[]{defaultBrokerUrl});
+        options.setCleanSession(true);
+        options.setConnectionTimeout(30);
+        options.setKeepAliveInterval(60);
+        if (defaultUsername != null && !defaultUsername.isEmpty()) {
+            options.setUserName(defaultUsername);
+        }
+        if (defaultPassword != null && !defaultPassword.isEmpty()) {
+            options.setPassword(defaultPassword.toCharArray());
         }
+        // 设置自动重连
+        options.setAutomaticReconnect(true);
+        factory.setConnectionOptions(options);
+        return factory;
     }
 }

+ 9 - 6
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusEvaluationService.java

@@ -138,12 +138,15 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
     @Override
     public void postSave(BusEvaluationEntity entity) {
         if(StrUtil.isNotEmpty(entity.getClinicId())){
-            clinicService.update(new UpdateWrapper<BusClinicEntity>().lambda()
-                    .eq(BusClinicEntity::getId,entity.getClinicId())
-                    .set(BusClinicEntity::getEvalTime, Optional.ofNullable(entity.getEvaluateTime()).orElse(new Date())));
-            if(CharSequenceUtil.isNotBlank(entity.getPatientId())){
-                wsPublishUtils.publishPatientMonitor(entity.getPatientId(),entity.getTenantId());
-            }
+        String adverseReactions = AdverseReactionUtil.getAdverseReactions(entity);
+        String clinicId = entity.getClinicId();
+        BusClinicEntity clinic = clinicService.getById(clinicId);
+        clinic.setLastBadEval(adverseReactions);
+        clinic.setEvalTime(Optional.ofNullable(entity.getEvaluateTime()).orElse(new Date()));
+        clinicService.updateById(clinic);
+        if(CharSequenceUtil.isNotBlank(entity.getPatientId())){
+            wsPublishUtils.publishPatientMonitor(entity.getPatientId(),entity.getTenantId());
+        }
         }
     }