瀏覽代碼

add
mqtt信息发送

lifang 6 天之前
父節點
當前提交
af8fb92515

+ 28 - 21
nb-service/iot-service/src/main/java/com/nb/aliyun/service/consumer/NBAndFourGConsumerGroupService.java

@@ -122,7 +122,7 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
      */
     private void checkAndReconnect() {
         // 如果当前未连接且队列大小小于阈值,则尝试重新连接
-        if (!isConnected && ((ThreadPoolExecutor) executorService).getQueue().size() < (50/10) && logQueue.size() < 100) {
+        if (!isConnected && ((ThreadPoolExecutor) executorService).getQueue().size() < 50 && logQueue.size() < 100) {
             log.info("队列已清空,准备重新连接阿里云物联网平台");
             reconnect();
         }
@@ -145,32 +145,39 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
      * 断开阿里云连接
      */
     private void disconnect() {
-        if (isConnected) {
-            try {
-                this.getClient().stop();
-                isConnected = false;
-                log.info("已断开阿里云物联网连接");
-            } catch (Exception e) {
-                log.error("断开阿里云物联网连接失败", e);
-            }
-        }
+//        if (isConnected) {
+//            try {
+//                // 在新线程中执行断开连接操作,避免在消息监听器回调线程中执行非法操作
+//                new Thread(() -> {
+//                    try {
+//                        this.getClient().stop();
+//                        isConnected = false;
+//                        log.info("已断开阿里云物联网连接");
+//                    } catch (Exception e) {
+//                        log.error("断开阿里云物联网连接失败", e);
+//                    }
+//                }, "AliIotDisconnectThread").start();
+//            } catch (Exception e) {
+//                log.error("启动断开连接线程失败", e);
+//            }
+//        }
     }
     
     /**
      * 重新连接阿里云
      */
     private void reconnect() {
-        if (!isConnected) {
-            try {
-                this.getClient().restart();
-                isConnected = true;
-                log.info("已重新连接阿里云物联网平台");
-            } catch (Exception e) {
-                log.error("重新连接阿里云物联网平台失败", e);
-                // 安排延时重试
-                scheduledExecutorService.schedule(this::reconnect, RECONNECT_DELAY, TimeUnit.MILLISECONDS);
-            }
-        }
+//        if (!isConnected) {
+//            try {
+//                this.getClient().start(messageListener);
+//                isConnected = true;
+//                log.info("已重新连接阿里云物联网平台");
+//            } catch (Exception e) {
+//                log.error("重新连接阿里云物联网平台失败", e);
+//                // 安排延时重试
+//                scheduledExecutorService.schedule(this::reconnect, RECONNECT_DELAY, TimeUnit.MILLISECONDS);
+//            }
+//        }
     }