Sfoglia il codice sorgente

add
设备历史数据、报警数据、修改数据改为批量存储

lifang 4 settimane fa
parent
commit
4d9df78d14

+ 47 - 9
nb-common/delay-queue-common/src/main/java/com/nb/common/queue/delay/RedissonDelayMessageManager.java

@@ -16,6 +16,7 @@ import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.Async;
 
 import javax.annotation.PostConstruct;
 import java.util.*;
@@ -32,34 +33,61 @@ import java.util.concurrent.TimeUnit;
 @AutoConfigureAfter({RedissonClient.class})
 public class RedissonDelayMessageManager implements DelayMessageManager {
 
-    private final RBlockingQueue<DelayMessage> blockingQueue;
+    private RBlockingQueue<DelayMessage> blockingQueue;
 
-    private final RDelayedQueue<DelayMessage> delayedQueue;
+    private RDelayedQueue<DelayMessage> delayedQueue;
 
     private final List<DelayMessageHandler> handlers;
+    
+    private final RedissonClient redissonClient;
 
     private static final String NAME="redisson-delay-message-queue";
+    
     @Autowired
     public RedissonDelayMessageManager(RedissonClient redissonClient,
                                        List<DelayMessageHandler> handlers) {
-        this.blockingQueue = redissonClient.getBlockingQueue(NAME);
-        this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
-        this.handlers=handlers;
+        this.redissonClient = redissonClient;
+        this.handlers = handlers;
         if(CollectionUtil.isEmpty(handlers)){
             log.warn("the size of DelayMessageHandler is zero");
         }
-        //将头结点取出后触发该方法
-        blockingQueue.subscribeOnElements(this::handler);
     }
-
+    
     @PostConstruct
+    @Async
     public void init(){
+        // 延迟初始化Redisson队列,确保在应用启动的最后阶段才初始化
+        initializeRedissonQueues();
+        
         //初始化时并不会监听延迟队列,故先测试数据进入,开启监听
         this.add(new DelayMessage(Value.simple("初始化数据"),"无", DelayMessageProperties.of(TimeUnit.SECONDS,3)));
     }
+    
+    /**
+     * 延迟初始化Redisson队列
+     */
+    private void initializeRedissonQueues() {
+        try {
+            // 可选:添加延迟确保其他组件完全初始化
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        
+        this.blockingQueue = redissonClient.getBlockingQueue(NAME);
+        this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
+        
+        //将头结点取出后触发该方法
+        blockingQueue.subscribeOnElements(this::handler);
+    }
 
     @Override
     public void add(DelayMessage message) {
+        // 确保队列已初始化
+        if (delayedQueue == null) {
+            initializeRedissonQueues();
+        }
+        
         log.info("redisson-delay-queue ,add message = 【{}】",JSONUtil.toJsonStr(message));
         long expire = message.getProperties().getExpire();
         if(expire<=0){
@@ -72,11 +100,21 @@ public class RedissonDelayMessageManager implements DelayMessageManager {
 
     @Override
     public boolean remove(DelayMessage message) {
+        // 确保队列已初始化
+        if (delayedQueue == null) {
+            initializeRedissonQueues();
+        }
+        
         return delayedQueue.remove(message);
     }
 
     @Override
     public void destroy() {
+        // 确保队列已初始化
+        if (delayedQueue == null) {
+            initializeRedissonQueues();
+        }
+        
         delayedQueue.destroy();
     }
 
@@ -105,4 +143,4 @@ public class RedissonDelayMessageManager implements DelayMessageManager {
                     }
                 });
     }
-}
+}

+ 47 - 0
nb-common/delay-queue-common/src/main/java/com/nb/common/queue/delay/config/RedissonLateStartConfig.java

@@ -0,0 +1,47 @@
+package com.nb.common.queue.delay.config;
+
+import org.redisson.api.RedissonClient;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * Redisson延迟启动配置类
+ * 确保Redisson在应用的最后阶段初始化
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Configuration
+@AutoConfigureAfter({
+    // 在这里添加您希望在Redisson之前初始化的所有配置类
+    // 例如:DataSourceAutoConfiguration.class,
+    //      HibernateJpaAutoConfiguration.class,
+    //      RedisAutoConfiguration.class
+})
+@DependsOn({
+    // 在这里添加您希望在Redisson之前初始化的所有Bean
+    // 例如:"dataSource",
+    //      "entityManagerFactory",
+    //      "redisTemplate"
+})
+public class RedissonLateStartConfig {
+
+    /**
+     * 最后执行的初始化方法
+     * 可以在这里添加任何需要在Redisson初始化之前完成的操作
+     */
+    @PostConstruct
+    public void lateInitialization() {
+        // 这里可以添加一些延迟初始化的逻辑
+        // 例如等待其他服务完全启动
+        try {
+            // 可选:添加一个小的延迟确保其他组件完全初始化
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}

+ 0 - 95
nb-core/src/main/java/com/nb/core/service/MqttMessageForwardService.java

@@ -1,95 +0,0 @@
-package com.nb.core.service;
-
-import com.nb.core.entity.MqttMessage;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Service;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * MQTT消息转发服务
- * 将MQTT消息通过HTTP转发给其他服务
- *
- * @author YourName
- * @version 1.0.0
- */
-@Service
-public class MqttMessageForwardService {
-    
-    @Autowired
-    private RestTemplate restTemplate;
-    
-    /**
-     * 转发单个MQTT消息
-     *
-     * @param message MQTT消息
-     * @param targetUrl 目标服务URL
-     * @return 转发结果
-     */
-    public boolean forwardMessage(MqttMessage message, String targetUrl) {
-        try {
-            HttpHeaders headers = new HttpHeaders();
-            headers.setContentType(MediaType.APPLICATION_JSON);
-            
-            HttpEntity<MqttMessage> request = new HttpEntity<>(message, headers);
-            ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, request, String.class);
-            
-            return response.getStatusCode().is2xxSuccessful();
-        } catch (Exception e) {
-            // 记录日志或处理异常
-            e.printStackTrace();
-            return false;
-        }
-    }
-    
-    /**
-     * 批量转发MQTT消息
-     *
-     * @param messages MQTT消息数组
-     * @param targetUrl 目标服务URL
-     * @return 转发结果
-     */
-    public boolean forwardMessages(MqttMessage[] messages, String targetUrl) {
-        try {
-            HttpHeaders headers = new HttpHeaders();
-            headers.setContentType(MediaType.APPLICATION_JSON);
-            
-            HttpEntity<MqttMessage[]> request = new HttpEntity<>(messages, headers);
-            ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, request, String.class);
-            
-            return response.getStatusCode().is2xxSuccessful();
-        } catch (Exception e) {
-            // 记录日志或处理异常
-            e.printStackTrace();
-            return false;
-        }
-    }
-    
-    /**
-     * 转发MQTT消息(带自定义头信息)
-     *
-     * @param message MQTT消息
-     * @param targetUrl 目标服务URL
-     * @param customHeaders 自定义头信息
-     * @return 转发结果
-     */
-    public boolean forwardMessageWithHeaders(MqttMessage message, String targetUrl, HttpHeaders customHeaders) {
-        try {
-            if (customHeaders.getContentType() == null) {
-                customHeaders.setContentType(MediaType.APPLICATION_JSON);
-            }
-            
-            HttpEntity<MqttMessage> request = new HttpEntity<>(message, customHeaders);
-            ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, request, String.class);
-            
-            return response.getStatusCode().is2xxSuccessful();
-        } catch (Exception e) {
-            // 记录日志或处理异常
-            e.printStackTrace();
-            return false;
-        }
-    }
-}

+ 1 - 10
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -2,7 +2,6 @@ package com.nb.core.utils;
 
 import cn.hutool.json.JSONUtil;
 import com.nb.core.entity.MqttMessage;
-import com.nb.core.service.MqttMessageForwardService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@@ -33,10 +32,7 @@ import java.util.function.Consumer;
 public class MqttClientUtil implements ApplicationContextAware {
 
     private ApplicationContext applicationContext;
-    
-    @Autowired(required = false)
-    private MqttMessageForwardService mqttMessageForwardService;
-    
+
     private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
     
     private MqttPahoClientFactory mqttClientFactory;
@@ -135,11 +131,6 @@ public class MqttClientUtil implements ApplicationContextAware {
             MqttMessage message = new MqttMessage(topic, payload);
             message.setClientId(clientId + "-pub-" + System.currentTimeMillis());
             
-            // 如果配置了转发服务,则转发消息
-            if (mqttMessageForwardService != null) {
-                // 这里可以配置转发到其他服务的URL
-                // mqttMessageForwardService.forwardMessage(message, "http://other-service/api/mqtt/receive");
-            }
             if (mqttClientFactory == null) {
                 return;
             }