Ver código fonte

add
优化定时任务

lifang 3 semanas atrás
pai
commit
6ac51e8be2

+ 2 - 2
nb-auth/src/main/java/com/nb/auth/sa/SaTokenConfig.java

@@ -42,7 +42,6 @@ public class SaTokenConfig {
     private static final List<String> IGNORE_URL = new ArrayList<>();
 
     static {
-        IGNORE_URL.add("/iot/**");
         IGNORE_URL.add("/actuator/**");
         IGNORE_URL.add("/bus/version/page");
         IGNORE_URL.add("/druid/**");
@@ -69,8 +68,9 @@ public class SaTokenConfig {
         IGNORE_URL.add("/system/sysDept/**");
         IGNORE_URL.add("/assist/phone/**");
         IGNORE_URL.add("/bus/device/info/hospitalCode");
+        IGNORE_URL.add("/bus/device/info/hospitalCodeAndAlias");
         IGNORE_URL.add("/iot/**");
-        IGNORE_URL.add("/bus/yixinqian/get/**");
+        IGNORE_URL.add("/mqtt/**");
     }
 
 }

+ 9 - 0
nb-core/pom.xml

@@ -14,6 +14,10 @@
     <description>核心包,包含了一些基础信息</description>
 
     <dependencies>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.aliyun</groupId>
             <artifactId>dysmsapi20170525</artifactId>
@@ -66,6 +70,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-aop</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>cn.hutool</groupId>
             <artifactId>hutool-all</artifactId>

+ 52 - 0
nb-core/src/main/java/com/nb/core/config/MqttConfig.java

@@ -0,0 +1,52 @@
+package com.nb.core.config;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+/**
+ * MQTT配置类
+ * 提供默认的MQTT客户端工厂配置
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Configuration
+public class MqttConfig {
+    /**
+     * 创建默认的MQTT客户端工厂
+     * 当容器中没有MqttPahoClientFactory时创建
+     *
+     * @param mqttProperties MQTT配置属性
+     * @return MqttPahoClientFactory
+     */
+    @Bean
+    @ConditionalOnBean(MqttProperties.class)
+    public MqttPahoClientFactory mqttClientFactory(MqttProperties mqttProperties) {
+        if(mqttProperties==null||mqttProperties.getBroker()==null){
+            return null;
+        }
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[]{mqttProperties.getBroker().getUrl()});
+        options.setCleanSession(mqttProperties.getConnectOptions().isCleanSession());
+        options.setConnectionTimeout(mqttProperties.getConnectOptions().getConnectionTimeout());
+        options.setKeepAliveInterval(mqttProperties.getConnectOptions().getKeepAliveInterval());
+        if (mqttProperties.getBroker().getUsername() != null && !mqttProperties.getBroker().getUsername().isEmpty()) {
+            options.setUserName(mqttProperties.getBroker().getUsername());
+        }
+        if (mqttProperties.getBroker().getPassword() != null && !mqttProperties.getBroker().getPassword().isEmpty()) {
+            options.setPassword(mqttProperties.getBroker().getPassword().toCharArray());
+        }
+        // 设置自动重连
+        options.setAutomaticReconnect(mqttProperties.getConnectOptions().isAutomaticReconnect());
+        // 设置重连相关参数
+        options.setMaxReconnectDelay(mqttProperties.getConnectOptions().getMaxReconnectDelay());
+        factory.setConnectionOptions(options);
+        return factory;
+    }
+}

+ 38 - 0
nb-core/src/main/java/com/nb/core/config/MqttHandlerConfig.java

@@ -0,0 +1,38 @@
+package com.nb.core.config;
+
+import com.nb.core.handler.MqttMessageHandlerManager;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * MQTT处理器配置类
+ * 确保处理器管理器正确初始化
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Configuration
+public class MqttHandlerConfig {
+    
+    private final MqttMessageHandlerManager handlerManager;
+    
+    public MqttHandlerConfig(MqttMessageHandlerManager handlerManager) {
+        this.handlerManager = handlerManager;
+    }
+    
+    @PostConstruct
+    public void init() {
+        // 确保处理器管理器已正确初始化
+        if (MqttMessageHandlerManager.getInstance() == null) {
+            try {
+                // 通过反射设置实例(备用方案)
+                java.lang.reflect.Field instanceField = MqttMessageHandlerManager.class.getDeclaredField("instance");
+                instanceField.setAccessible(true);
+                instanceField.set(null, handlerManager);
+            } catch (Exception e) {
+                // 忽略异常
+            }
+        }
+    }
+}

+ 193 - 0
nb-core/src/main/java/com/nb/core/config/MqttProperties.java

@@ -0,0 +1,193 @@
+package com.nb.core.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * MQTT配置属性类
+ * 用于从配置文件中读取MQTT相关配置
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttProperties {
+    
+    /**
+     * 客户端配置
+     */
+    private Client client = null;
+    
+    /**
+     * 代理配置
+     */
+    private Broker broker = null;
+    
+    /**
+     * 连接选项
+     */
+    private ConnectOptions connectOptions = null;
+    
+    public static class Client {
+        /**
+         * 客户端ID
+         */
+        private String id ;
+        
+        public String getId() {
+            return id;
+        }
+        
+        public void setId(String id) {
+            this.id = id;
+        }
+    }
+    
+    public static class Broker {
+        /**
+         * 代理URL
+         */
+        private String url ;
+        
+        /**
+         * 用户名
+         */
+        private String username;
+        
+        /**
+         * 密码
+         */
+        private String password;
+        
+        public String getUrl() {
+            return url;
+        }
+        
+        public void setUrl(String url) {
+            this.url = url;
+        }
+        
+        public String getUsername() {
+            return username;
+        }
+        
+        public void setUsername(String username) {
+            this.username = username;
+        }
+        
+        public String getPassword() {
+            return password;
+        }
+        
+        public void setPassword(String password) {
+            this.password = password;
+        }
+    }
+    
+    public static class ConnectOptions {
+        /**
+         * 清理会话
+         */
+        private boolean cleanSession = true;
+        
+        /**
+         * 连接超时时间(秒)
+         */
+        private int connectionTimeout = 30;
+        
+        /**
+         * 保持连接间隔(秒)
+         */
+        private int keepAliveInterval = 60;
+        
+        /**
+         * 自动重连
+         */
+        private boolean automaticReconnect = true;
+        
+        /**
+         * 重连间隔(毫秒)
+         */
+        private int reconnectDelay = 5000;
+        
+        /**
+         * 最大重连延迟(毫秒)
+         */
+        private int maxReconnectDelay = 60000;
+        
+        public boolean isCleanSession() {
+            return cleanSession;
+        }
+        
+        public void setCleanSession(boolean cleanSession) {
+            this.cleanSession = cleanSession;
+        }
+        
+        public int getConnectionTimeout() {
+            return connectionTimeout;
+        }
+        
+        public void setConnectionTimeout(int connectionTimeout) {
+            this.connectionTimeout = connectionTimeout;
+        }
+        
+        public int getKeepAliveInterval() {
+            return keepAliveInterval;
+        }
+        
+        public void setKeepAliveInterval(int keepAliveInterval) {
+            this.keepAliveInterval = keepAliveInterval;
+        }
+        
+        public boolean isAutomaticReconnect() {
+            return automaticReconnect;
+        }
+        
+        public void setAutomaticReconnect(boolean automaticReconnect) {
+            this.automaticReconnect = automaticReconnect;
+        }
+        
+        public int getReconnectDelay() {
+            return reconnectDelay;
+        }
+        
+        public void setReconnectDelay(int reconnectDelay) {
+            this.reconnectDelay = reconnectDelay;
+        }
+        
+        public int getMaxReconnectDelay() {
+            return maxReconnectDelay;
+        }
+        
+        public void setMaxReconnectDelay(int maxReconnectDelay) {
+            this.maxReconnectDelay = maxReconnectDelay;
+        }
+    }
+    
+    // Getters and Setters
+    
+    public Client getClient() {
+        return client;
+    }
+    
+    public void setClient(Client client) {
+        this.client = client;
+    }
+    
+    public Broker getBroker() {
+        return broker;
+    }
+    
+    public void setBroker(Broker broker) {
+        this.broker = broker;
+    }
+    
+    public ConnectOptions getConnectOptions() {
+        return connectOptions;
+    }
+    
+    public void setConnectOptions(ConnectOptions connectOptions) {
+        this.connectOptions = connectOptions;
+    }
+}

+ 51 - 0
nb-core/src/main/java/com/nb/core/controller/MqttMessageController.java

@@ -0,0 +1,51 @@
+package com.nb.core.controller;
+
+import com.nb.core.entity.MqttMessage;
+import com.nb.core.result.R;
+import com.nb.core.service.MqttMessageProcessService;
+import com.nb.core.utils.MqttClientUtil;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * MQTT消息控制器
+ * 提供REST API用于测试和管理MQTT消息处理
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@RestController
+@RequestMapping("/mqtt")
+@AllArgsConstructor
+public class MqttMessageController {
+    
+    private final MqttMessageProcessService messageProcessService;
+    private final MqttClientUtil mqttClientUtil;
+
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息
+     * @return 处理结果
+     */
+    @PostMapping("/process")
+    public R<String> processMessage(@RequestBody MqttMessage message) {
+        try {
+            //订阅和发布只能同时存在一个
+            if(mqttClientUtil.isOnline()){
+                //发布存在则就不接受订阅消息
+                return R.fail("本节点是客户端节点,不予处理订阅消息");
+            }
+            String clientId = message.getClientId();
+            if(MqttClientUtil.CLIENT_ID.equals(clientId)){
+                return R.fail("本节点发出的数据不予处理");
+            }
+            messageProcessService.processMessage(message);
+            return R.success("消息处理成功");
+        } catch (Exception e) {
+            return R.fail("处理MQTT消息失败"+ e.getMessage());
+        }
+    }
+}

+ 67 - 0
nb-core/src/main/java/com/nb/core/entity/MqttMessage.java

@@ -0,0 +1,67 @@
+package com.nb.core.entity;
+
+import com.nb.core.handler.MqttMessageHandlerManager;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * MQTT消息实体类
+ * 用于统一消息格式,支持通过HTTP转发给其他服务
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Data
+public class MqttMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
+    /**
+     * 主题
+     */
+    private String topic;
+    
+    /**
+     * 消息内容
+     */
+    private String payload;
+    
+    /**
+     * 客户端ID
+     */
+    private String clientId;
+    
+    /**
+     * 时间戳
+     */
+    private Date timestamp;
+    
+    /**
+     * 消息头信息
+     */
+    private Map<String, Object> headers;
+    
+    /**
+     * 消息类型
+     */
+    private String messageType;
+    
+    /**
+     * 来源服务
+     */
+    private String sourceService;
+    
+    public MqttMessage() {
+        this.timestamp = new Date();
+    }
+    
+    public MqttMessage(String topic, String payload) {
+        this();
+        this.topic = topic;
+        this.payload = payload;
+    }
+}

+ 16 - 0
nb-core/src/main/java/com/nb/core/enums/MqttTopicEnum.java

@@ -0,0 +1,16 @@
+package com.nb.core.enums;
+
+/**
+ * 数据下发时的mqtt主题类
+ */
+public interface MqttTopicEnum {
+    /**
+     * 评价信息下发
+     */
+    String BUS_EVALUATION = "bus/evaluation";
+
+    /**
+     * 撤泵信息下发
+     */
+    String BUS_UNDO = "bus/undo";
+}

+ 44 - 0
nb-core/src/main/java/com/nb/core/handler/AbstractMqttMessageHandler.java

@@ -0,0 +1,44 @@
+package com.nb.core.handler;
+
+import com.nb.core.entity.MqttMessage;
+import org.springframework.util.AntPathMatcher;
+
+/**
+ * 抽象MQTT消息处理器
+ * 提供基于主题模式匹配的基础实现
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+public abstract class AbstractMqttMessageHandler implements MqttMessageHandler {
+    
+    private static final AntPathMatcher pathMatcher = new AntPathMatcher();
+    
+    /**
+     * 定义支持的主题模式
+     * 子类需要实现此方法返回支持的主题模式
+     * @return 支持的主题模式
+     */
+    protected abstract String getSupportedTopicPattern();
+    
+    /**
+     * 判断是否支持处理指定主题的消息
+     * @param topic MQTT主题
+     * @return 是否支持处理
+     */
+    @Override
+    public boolean supports(String topic) {
+        String pattern = getSupportedTopicPattern();
+        if (pattern == null || topic == null) {
+            return false;
+        }
+        return pathMatcher.match(pattern, topic);
+    }
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息实体
+     */
+    @Override
+    public abstract void handle(MqttMessage message);
+}

+ 26 - 0
nb-core/src/main/java/com/nb/core/handler/MqttMessageHandler.java

@@ -0,0 +1,26 @@
+package com.nb.core.handler;
+
+import com.nb.core.entity.MqttMessage;
+
+/**
+ * MQTT消息处理器接口
+ * 定义处理MQTT消息的标准方法
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+public interface MqttMessageHandler {
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息实体
+     */
+    void handle(MqttMessage message);
+    
+    /**
+     * 判断是否支持处理指定主题的消息
+     * @param topic MQTT主题
+     * @return 是否支持处理
+     */
+    boolean supports(String topic);
+}

+ 130 - 0
nb-core/src/main/java/com/nb/core/handler/MqttMessageHandlerManager.java

@@ -0,0 +1,130 @@
+package com.nb.core.handler;
+
+import com.nb.core.entity.MqttMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * MQTT消息处理器管理器
+ * 负责管理和调度不同的消息处理器
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@Component
+public class MqttMessageHandlerManager {
+    
+    // 添加单例实例引用
+    private static MqttMessageHandlerManager instance;
+    
+    @Autowired(required = false)
+    private List<MqttMessageHandler> handlers = new CopyOnWriteArrayList<>();
+    
+    private final Map<String, MqttMessageHandler> topicHandlerMap = new ConcurrentHashMap<>();
+    
+    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
+    
+    @PostConstruct
+    public void init() {
+        instance = this;
+        log.info("MQTT消息处理器管理器初始化完成,已注册{}个处理器", handlers.size());
+    }
+    
+    /**
+     * 获取单例实例
+     * @return MqttMessageHandlerManager实例
+     */
+    public static MqttMessageHandlerManager getInstance() {
+        return instance;
+    }
+    
+    /**
+     * 处理MQTT消息
+     * 根据主题找到对应的处理器并处理消息
+     * @param message MQTT消息实体
+     */
+    public void handleMessage(MqttMessage message) {
+        if (message == null || message.getTopic() == null) {
+            log.warn("接收到无效的MQTT消息: {}", message);
+            return;
+        }
+        
+        String topic = message.getTopic();
+        
+        // 先从缓存中查找处理器
+        MqttMessageHandler handler = topicHandlerMap.get(topic);
+        if (handler != null) {
+            executeHandler(handler, message);
+            return;
+        }
+        
+        // 遍历所有处理器查找支持该主题的处理器
+        for (MqttMessageHandler messageHandler : handlers) {
+            if (messageHandler.supports(topic)) {
+                topicHandlerMap.put(topic, messageHandler);
+                executeHandler(messageHandler, message);
+                return;
+            }
+        }
+        
+        log.warn("未找到处理主题[{}]的处理器", topic);
+    }
+    
+    /**
+     * 异步执行处理器
+     * @param handler 消息处理器
+     * @param message MQTT消息
+     */
+    private void executeHandler(MqttMessageHandler handler, MqttMessage message) {
+        executorService.submit(() -> {
+            try {
+                handler.handle(message);
+            } catch (Exception e) {
+                log.error("处理MQTT消息时发生错误,主题: {}, 内容: {}", 
+                         message.getTopic(), message.getPayload(), e);
+            }
+        });
+    }
+    
+    /**
+     * 注册消息处理器
+     * @param handler 消息处理器
+     */
+    public void registerHandler(MqttMessageHandler handler) {
+        if (handler != null) {
+            handlers.add(handler);
+            log.info("注册MQTT消息处理器: {}", handler.getClass().getSimpleName());
+        }
+    }
+    
+    /**
+     * 移除消息处理器
+     * @param handler 消息处理器
+     */
+    public void unregisterHandler(MqttMessageHandler handler) {
+        if (handler != null) {
+            handlers.remove(handler);
+            // 清理缓存中的映射
+            topicHandlerMap.entrySet().removeIf(entry -> entry.getValue() == handler);
+            log.info("移除MQTT消息处理器: {}", handler.getClass().getSimpleName());
+        }
+    }
+    
+    /**
+     * 获取当前注册的处理器数量
+     * @return 处理器数量
+     */
+    public int getHandlerCount() {
+        return handlers.size();
+    }
+}

+ 64 - 0
nb-core/src/main/java/com/nb/core/service/MqttMessageProcessService.java

@@ -0,0 +1,64 @@
+package com.nb.core.service;
+
+import com.nb.core.entity.MqttMessage;
+import com.nb.core.handler.MqttMessageHandlerManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * MQTT消息处理服务
+ * 提供统一的消息处理接口
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@Service
+public class MqttMessageProcessService {
+    
+    private final MqttMessageHandlerManager handlerManager;
+    
+    public MqttMessageProcessService(MqttMessageHandlerManager handlerManager) {
+        this.handlerManager = handlerManager;
+    }
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息
+     */
+    public void processMessage(MqttMessage message) {
+        if (message == null) {
+            log.warn("尝试处理空的MQTT消息");
+            return;
+        }
+        
+        try {
+            handlerManager.handleMessage(message);
+        } catch (Exception e) {
+            log.error("处理MQTT消息时发生异常,主题: {}, 内容: {}", 
+                     message.getTopic(), message.getPayload(), e);
+        }
+    }
+    
+    /**
+     * 根据主题和内容创建并处理MQTT消息
+     * @param topic 主题
+     * @param payload 消息内容
+     */
+    public void processMessage(String topic, String payload) {
+        MqttMessage message = new MqttMessage(topic, payload);
+        processMessage(message);
+    }
+    
+    /**
+     * 根据主题和内容创建并处理MQTT消息
+     * @param topic 主题
+     * @param payload 消息内容
+     * @param clientId 客户端ID
+     */
+    public void processMessage(String topic, String payload, String clientId) {
+        MqttMessage message = new MqttMessage(topic, payload);
+        message.setClientId(clientId);
+        processMessage(message);
+    }
+}

+ 2 - 2
nb-core/src/main/java/com/nb/core/sms/AliSmsClientConfig.java

@@ -12,13 +12,13 @@ import com.aliyun.teaopenapi.models.*;
  * @Description TODO
  * @createTime 2022年10月12日 10:52:00
  */
-//@Configuration
+@Configuration
 public class AliSmsClientConfig {
     @Value("${aliyun.accessKey}")
     private String accessKeyId;
     @Value("${aliyun.accessSecret}")
     private String accessKeySecret;
-//    @Bean
+    @Bean
     public Client creatClient() throws Exception {
         com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                 // 您的 AccessKey ID

+ 1 - 1
nb-core/src/main/java/com/nb/core/sms/SmsHelper.java

@@ -17,7 +17,7 @@ import java.util.Map;
  * @Description TODO
  * @createTime 2022年10月13日 09:28:00
  */
-//@Configuration
+@Configuration
 @AllArgsConstructor
 public class SmsHelper {
 

+ 1 - 1
nb-core/src/main/java/com/nb/core/utils/ExceptionUtil.java

@@ -8,7 +8,7 @@ package com.nb.core.utils;
  * @createTime 2022年07月04日 13:49:00
  */
 public class ExceptionUtil {
-    public static final int dept=50;
+    public static final int dept=10;
     public static String getExceptionMsg(Throwable e){
         StringBuffer result=new StringBuffer(e.toString()).append("\n");
         StackTraceElement[] stackTrace = e.getStackTrace();

+ 210 - 0
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -0,0 +1,210 @@
+package com.nb.core.utils;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSONUtil;
+import com.nb.core.entity.MqttMessage;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+import org.springframework.scheduling.annotation.Async;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * MQTT客户端工具类
+ * 提供MQTT连接、消息发布、订阅等功能
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Component
+public class MqttClientUtil implements ApplicationContextAware {
+    public static final String CLIENT_ID = "nb-netpump"+"-"+System.currentTimeMillis();
+    private ApplicationContext applicationContext;
+
+    private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
+    
+    private MqttPahoClientFactory mqttClientFactory;
+
+    private MessageChannel mqttInputChannel;
+
+    public Boolean isOnline(){
+        return ObjectUtil.isNotNull(mqttClientFactory);
+    }
+
+    @PostConstruct
+    public void init() {
+        // 创建默认的消息通道
+        mqttInputChannel = new DirectChannel();
+        // 尝试从Spring容器中获取MqttPahoClientFactory
+        try {
+            mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
+        } catch (Exception e) {
+            mqttClientFactory=null;
+            // 忽略异常,如果获取不到工厂则无法进行MQTT操作
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        // 清理所有订阅者
+        subscribers.values().forEach(adapter -> {
+            try {
+                adapter.stop();
+            } catch (Exception e) {
+                // 忽略停止异常
+            }
+        });
+        subscribers.clear();
+    }
+    
+    /**
+     * 设置应用上下文
+     * @param applicationContext 应用上下文
+     */
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) {
+        this.applicationContext = applicationContext;
+    }
+    
+    /**
+     * 发布消息
+     * @param topic 主题
+     * @param payload 消息内容
+     */
+    public void publish(String hospitalCode,String topic, Object payload) {
+        publish(hospitalCode,topic, payload, 0, false);
+    }
+    
+    /**
+     * 发布消息到指定客户端
+     * @param topic 主题
+     * @param payload 消息内容
+     * @param qos 服务质量等级
+     * @param retained 是否保留消息
+     */
+    public void publish(String hospitalCode, String topic, Object payload, int qos, boolean retained) {
+        try {
+            if (mqttClientFactory == null) {
+                return;
+            }
+            
+            // 创建消息实体
+            MqttMessage message = new MqttMessage(topic, JSONUtil.toJsonStr(payload));
+            message.setClientId(CLIENT_ID);
+            
+            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(CLIENT_ID, mqttClientFactory);
+            messageHandler.setDefaultTopic("hospitalInfo/"+hospitalCode);
+            messageHandler.setDefaultQos(qos);
+            messageHandler.setDefaultRetained(retained);
+            // 确保处理器已初始化
+            messageHandler.afterPropertiesSet();
+            
+            org.springframework.messaging.Message<String> springMessage =
+                org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
+            messageHandler.handleMessage(springMessage);
+        } catch (Exception e) {
+            throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
+        }
+    }
+
+    
+    /**
+     * 订阅主题
+     * @param topic 主题
+     * @param messageHandler 消息处理器
+     */
+    public void subscribe(String topic, MessageHandler messageHandler) {
+        try {
+            if (mqttClientFactory == null) {
+                return;
+            }
+            // 如果已经订阅了该主题,则先取消订阅
+            if (subscribers.containsKey(topic)) {
+                unsubscribe(topic);
+            }
+            
+            MqttPahoMessageDrivenChannelAdapter adapter = 
+                new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID, mqttClientFactory, topic);
+            adapter.setConverter(new DefaultPahoMessageConverter());
+            adapter.setOutputChannel(mqttInputChannel);
+            
+            // 创建一个带有处理逻辑的通道
+            DirectChannel channel = new DirectChannel();
+            channel.subscribe(messageHandler);
+            adapter.setOutputChannel(channel);
+            
+            adapter.start();
+            
+            subscribers.put(topic, adapter);
+        } catch (Exception e) {
+            throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 订阅主题(使用函数式接口处理消息)
+     * @param topic 主题
+     * @param messageConsumer 消息消费者
+     */
+    public void subscribe(String topic, Consumer<String> messageConsumer) {
+        MessageHandler handler = new MessageHandler() {
+            @Override
+            public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
+                String payload = new String((byte[]) message.getPayload());
+                messageConsumer.accept(payload);
+            }
+        };
+        subscribe(topic, handler);
+    }
+    
+    /**
+     * 取消订阅
+     * @param topic 主题
+     */
+    public void unsubscribe(String topic) {
+        MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(topic);
+        if (adapter != null) {
+            try {
+                adapter.stop();
+            } catch (Exception e) {
+                // 忽略停止异常
+            }
+            subscribers.remove(topic);
+        }
+    }
+
+    /**
+     * 异步发布消息
+     * @param topic 主题
+     * @param payload 消息内容
+     */
+    @Async
+    public void asyncPublish(String hospitalCode,String topic, Object payload) {
+        publish(hospitalCode,topic, payload);
+    }
+
+    /**
+     * 异步发布消息到指定客户端
+     * @param topic 主题
+     * @param payload 消息内容
+     * @param qos 服务质量等级
+     * @param retained 是否保留消息
+     */
+    @Async
+    public void asyncPublish(String hospitalCode, String topic, String payload, int qos, boolean retained) {
+        publish(hospitalCode, topic, payload, qos, retained);
+    }
+}