瀏覽代碼

fix
慢sql

lifang 2 周之前
父節點
當前提交
5d793a672f
共有 1 個文件被更改,包括 65 次插入29 次删除
  1. 65 29
      nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

+ 65 - 29
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -32,16 +32,19 @@ import java.util.function.Consumer;
  */
 @Component
 public class MqttClientUtil implements ApplicationContextAware {
-    public static final String CLIENT_ID = "nb-netpump"+"-"+System.currentTimeMillis();
+    public static final String CLIENT_ID = "nb-netpump" + "-" + System.currentTimeMillis();
     private ApplicationContext applicationContext;
 
     private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
-    
+
+    // 添加缓存MQTT消息处理器的Map
+    private final Map<String, MqttPahoMessageHandler> handlerCache = new ConcurrentHashMap<>();
+
     private MqttPahoClientFactory mqttClientFactory;
 
     private MessageChannel mqttInputChannel;
 
-    public Boolean isOnline(){
+    public Boolean isOnline() {
         return ObjectUtil.isNotNull(mqttClientFactory);
     }
 
@@ -53,7 +56,7 @@ public class MqttClientUtil implements ApplicationContextAware {
         try {
             mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
         } catch (Exception e) {
-            mqttClientFactory=null;
+            mqttClientFactory = null;
             // 忽略异常,如果获取不到工厂则无法进行MQTT操作
         }
     }
@@ -69,31 +72,67 @@ public class MqttClientUtil implements ApplicationContextAware {
             }
         });
         subscribers.clear();
+
+        // 清理缓存的消息处理器
+        handlerCache.values().forEach(handler -> {
+            try {
+                handler.destroy();
+            } catch (Exception e) {
+                // 忽略销毁异常
+            }
+        });
+        handlerCache.clear();
     }
-    
+
     /**
      * 设置应用上下文
+     *
      * @param applicationContext 应用上下文
      */
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) {
         this.applicationContext = applicationContext;
     }
-    
+
+    /**
+     * 获取或创建MQTT消息处理器
+     *
+     * @param topic 客户端ID
+     * @return MqttPahoMessageHandler
+     */
+    private MqttPahoMessageHandler getOrCreateHandler(String topic) {
+        return handlerCache.computeIfAbsent(CLIENT_ID+topic, key -> {
+            MqttPahoMessageHandler handler = new MqttPahoMessageHandler(key, mqttClientFactory);
+            handler.setAsync(true);
+            handler.setAsyncEvents(true);
+            handler.setDefaultQos(0);
+            handler.setDefaultRetained(false);
+            handler.setDefaultTopic(topic);
+            try {
+                handler.afterPropertiesSet();
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to initialize MQTT handler", e);
+            }
+            return handler;
+        });
+    }
+
     /**
      * 发布消息
-     * @param topic 主题
+     *
+     * @param topic   主题
      * @param payload 消息内容
      */
-    public void publish(String hospitalCode,String topic, Object payload) {
-        publish(hospitalCode,topic, payload, 0, false);
+    public void publish(String hospitalCode, String topic, Object payload) {
+        publish(hospitalCode, topic, payload, 0, false);
     }
-    
+
     /**
      * 发布消息到指定客户端
-     * @param topic 主题
-     * @param payload 消息内容
-     * @param qos 服务质量等级
+     *
+     * @param topic    主题
+     * @param payload  消息内容
+     * @param qos      服务质量等级
      * @param retained 是否保留消息
      */
     public void publish(String hospitalCode, String topic, Object payload, int qos, boolean retained) {
@@ -101,21 +140,16 @@ public class MqttClientUtil implements ApplicationContextAware {
             if (mqttClientFactory == null) {
                 return;
             }
-            
+            String clientTopic="hospitalInfo/" + hospitalCode;
             // 创建消息实体
             MqttMessage message = new MqttMessage(topic, HexUtil.encodeHexStr(JSONUtil.toJsonStr(payload)));
             message.setClientId(CLIENT_ID);
-            
-            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(CLIENT_ID, mqttClientFactory);
-            messageHandler.setDefaultTopic("hospitalInfo/"+hospitalCode);
-            messageHandler.setDefaultQos(qos);
-            messageHandler.setDefaultRetained(retained);
-            // 确保处理器已初始化
-            messageHandler.afterPropertiesSet();
-            
+            // 使用缓存的处理器而不是每次都创建新的
+            MqttPahoMessageHandler messageHandler = getOrCreateHandler(clientTopic);
             org.springframework.messaging.Message<String> springMessage =
-                org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
+                    org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
             messageHandler.handleMessage(springMessage);
+            // 移除destroy调用,让处理器可以被复用
         } catch (Exception e) {
             throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
         }
@@ -189,19 +223,21 @@ public class MqttClientUtil implements ApplicationContextAware {
 
     /**
      * 异步发布消息
-     * @param topic 主题
+     *
+     * @param topic   主题
      * @param payload 消息内容
      */
     @Async
-    public void asyncPublish(String hospitalCode,String topic, Object payload) {
-        publish(hospitalCode,topic, payload);
+    public void asyncPublish(String hospitalCode, String topic, Object payload) {
+        publish(hospitalCode, topic, payload);
     }
 
     /**
      * 异步发布消息到指定客户端
-     * @param topic 主题
-     * @param payload 消息内容
-     * @param qos 服务质量等级
+     *
+     * @param topic    主题
+     * @param payload  消息内容
+     * @param qos      服务质量等级
      * @param retained 是否保留消息
      */
     @Async