18339543638 4 роки тому
батько
коміт
942618452e

+ 0 - 8
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttConnection.java

@@ -61,14 +61,6 @@ public interface MqttConnection {
      */
     MqttConnection accept();
 
-    /**
-     * 心跳处理
-     * @param endpoint
-     * @param delay  最长空闲时间
-     * @param unit  空闲时间单位
-     */
-    void heartIdleHandler(MqttEndpoint endpoint, long delay, TimeUnit unit);
-
     /**
      * 获取遗言消息
      *

+ 23 - 97
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -1,13 +1,9 @@
 package org.jetlinks.community.network.mqtt.server.vertx;
 
-import cn.hutool.core.lang.tree.Tree;
-import cn.hutool.core.lang.tree.TreeUtil;
-import cn.hutool.extra.spring.SpringUtil;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttQoS;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.mqtt.MqttEndpoint;
@@ -18,17 +14,13 @@ import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.network.mqtt.server.*;
-import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.message.DeviceOnlineMessage;
-import org.jetlinks.core.message.MessageType;
+import org.jetlinks.community.network.mqtt.server.MqttConnection;
+import org.jetlinks.community.network.mqtt.server.MqttPublishing;
+import org.jetlinks.community.network.mqtt.server.MqttSubscription;
+import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
 import org.jetlinks.core.server.mqtt.MqttAuth;
-import org.jetlinks.core.server.session.DeviceSessionManager;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.util.StringUtils;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -37,15 +29,14 @@ import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 @Slf4j
 class VertxMqttConnection implements MqttConnection {
+
     private final MqttEndpoint endpoint;
     private long keepAliveTimeoutMs;
     @Getter
@@ -54,15 +45,12 @@ class VertxMqttConnection implements MqttConnection {
 
     private final EmitterProcessor<MqttPublishing> messageProcessor = EmitterProcessor.create(false);
 
-    private ScheduledFuture<?> pingRespTimeout;
-
     private final FluxSink<MqttPublishing> publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
 
-    private final EmitterProcessor<MqttSubscription> subscriptionProcessor = EmitterProcessor.create(false);
+    private final EmitterProcessor<MqttSubscription> subscription = EmitterProcessor.create(false);
     private final EmitterProcessor<MqttUnSubscription> unsubscription = EmitterProcessor.create(false);
 
     private static final MqttAuth emptyAuth = new MqttAuth() {
-
         @Override
         public String getUsername() {
             return "";
@@ -81,9 +69,10 @@ class VertxMqttConnection implements MqttConnection {
 
     private final Consumer<MqttConnection> defaultListener = mqttConnection -> {
         log.debug("mqtt client [{}] disconnected", getClientId());
-        subscriptionProcessor.onComplete();
+        subscription.onComplete();
         unsubscription.onComplete();
         messageProcessor.onComplete();
+
     };
 
     private Consumer<MqttConnection> disconnectConsumer = defaultListener;
@@ -124,70 +113,21 @@ class VertxMqttConnection implements MqttConnection {
         if (accepted) {
             return this;
         }
-        accepted = true;
-        //todo 需要验证密码
         log.debug("mqtt client [{}] connected", getClientId());
-        ping();
+        accepted = true;
         try {
             if (!endpoint.isConnected()) {
-
+                endpoint.accept();
             }
-            //心跳检测 todo 改为配置文件配置
-            heartIdleHandler(endpoint,keepAliveTimeoutMs,TimeUnit.SECONDS);
         } catch (Exception e) {
             close().subscribe();
             log.warn(e.getMessage(), e);
             return this;
         }
-//        initSubscript();
         init();
-        endpoint.accept();
         return this;
     }
 
-
-    private static final Map<String, List<MqttEndpoint>> map = new HashMap<>();
-//    /**
-//     * 初始化监听器
-//     */
-//    private void initSubscript() {
-//
-//        //todo 订阅处理
-//        subscriptionProcessor.subscribe(mqttSubscription -> {
-//            MqttSubscribeMessage message = mqttSubscription.getMessage();
-//            List<MqttTopicSubscription> mqttTopicSubscriptions = message.topicSubscriptions();
-//            for (MqttTopicSubscription subscription : mqttTopicSubscriptions) {
-//                TopicHelper.addTopic(subscription.topicName(),endpoint);
-//            }
-//        });
-//        //取消订阅处理 todo
-//        unsubscription.subscribe(mqttUnSubscription -> {
-//            List<String> topics = mqttUnSubscription.getMessage().topics();
-//            for (String topic : topics) {
-//                TopicHelper.deleteNodeEndPoint(topic,endpoint);
-//            }
-//        });
-//
-//        //发布处理 todo
-//        messageProcessor.subscribe(publishing -> {
-//            //  /tuoren/yanjiuyuan
-//            String publishTopic = publishing.getMessage().getTopic();
-//            Set<MqttEndpoint> mqttEndpoints = TopicHelper.findMatchNodes(publishTopic);
-//            // /tuoren/yanjiuyuan /tuoren/#
-//            mqttEndpoints.parallelStream().forEach(mqttEndpoint ->{
-//                mqttEndpoint.publish(publishTopic,Buffer.buffer(publishing.getMessage().payloadAsString(),"utf-8"),MqttQoS.AT_MOST_ONCE,false,false);
-//            });
-//        });
-//    }
-
-    @Override
-    public void heartIdleHandler(MqttEndpoint endpoint, long delay, TimeUnit unit) {
-        this.pingRespTimeout= new NioEventLoopGroup(1).schedule(()->{
-            log.warn("通道关闭了:{}",endpoint.remoteAddress());
-            endpoint.close();
-        },delay,unit);
-    }
-
     @Override
     public void keepAlive() {
         ping();
@@ -195,22 +135,15 @@ class VertxMqttConnection implements MqttConnection {
 
     void ping() {
         lastPingTime = System.currentTimeMillis();
-        if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
-            this.pingRespTimeout.cancel(true);
-            this.pingRespTimeout = null;
-        }
     }
 
     void init() {
         this.endpoint
-            .exceptionHandler(event -> {
-
-            })
             .disconnectHandler(ignore -> this.complete())
             .closeHandler(ignore -> this.complete())
             .pingHandler(ignore -> {
                 this.ping();
-                if (endpoint.isAutoKeepAlive()) {
+                if (!endpoint.isAutoKeepAlive()) {
                     endpoint.pong();
                 }
             })
@@ -249,27 +182,25 @@ class VertxMqttConnection implements MqttConnection {
             })
             .subscribeHandler(msg -> {
                 ping();
-                VertxMqttSubscription subscription = new VertxMqttSubscription( msg,false);
-                boolean hasDownstream = this.subscriptionProcessor.hasDownstreams();
+                VertxMqttSubscription subscription = new VertxMqttSubscription(msg, false);
+                boolean hasDownstream = this.subscription.hasDownstreams();
                 if (autoAckSub || !hasDownstream) {
                     subscription.acknowledge();
                 }
-                this.subscriptionProcessor.onNext(subscription);
                 if (hasDownstream) {
-                    this.subscriptionProcessor.onNext(subscription);
+                    this.subscription.onNext(subscription);
                 }
             })
             .unsubscribeHandler(msg -> {
                 ping();
-                VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg,false);
+                VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg, false);
                 boolean hasDownstream = this.unsubscription.hasDownstreams();
                 if (autoAckUnSub || !hasDownstream) {
                     unSubscription.acknowledge();
                 }
-                this.unsubscription.onNext(unSubscription);
-//                if (hasDownstream) {
-//                    this.unsubscription.onNext(unSubscription);
-//                }
+                if (hasDownstream) {
+                    this.unsubscription.onNext(unSubscription);
+                }
             });
     }
 
@@ -329,8 +260,9 @@ class VertxMqttConnection implements MqttConnection {
 
     @Override
     public Flux<MqttSubscription> handleSubscribe(boolean autoAck) {
+
         autoAckSub = autoAck;
-        return subscriptionProcessor.map(Function.identity());
+        return subscription.map(Function.identity());
     }
 
     @Override
@@ -422,11 +354,6 @@ class VertxMqttConnection implements MqttConnection {
 
         private volatile boolean acknowledged;
 
-
-        public void setAcknowledged(boolean acknowledged) {
-            this.acknowledged = acknowledged;
-        }
-
         @Override
         public MqttMessage getMessage() {
             return new VertxMqttMessage(message);
@@ -453,7 +380,6 @@ class VertxMqttConnection implements MqttConnection {
 
         private final MqttSubscribeMessage message;
 
-
         private volatile boolean acknowledged;
 
         @Override

+ 13 - 24
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.network.mqtt.server.vertx;
 
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
@@ -27,7 +28,6 @@ public class VertxMqttServer implements MqttServer {
         this.id = id;
     }
 
-
     public void setMqttServer(Collection<io.vertx.mqtt.MqttServer> mqttServer) {
         if (this.mqttServer != null && !this.mqttServer.isEmpty()) {
             shutdown();
@@ -39,29 +39,18 @@ public class VertxMqttServer implements MqttServer {
                     log.error(error.getMessage(), error);
                 })
                 .endpointHandler(endpoint -> {
-                    // 显示主要连接信息
-                    log.info("MQTT client  {} request to connect, clean session {} " ,endpoint.clientIdentifier(), endpoint.isCleanSession());
-
-//                    if (!connectionProcessor.hasDownstreams()) {
-//                        log.info("mqtt server no handler for:[{}]", endpoint.clientIdentifier());
-//                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
-//                        return;
-//                    }
-//                    if (connectionProcessor.getPending() >= 10240) {
-//                        log.warn("too many no handle mqtt connection : {}", connectionProcessor.getPending());
-//                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
-//                        return;
-//                    }
-//                    endpoint.accept(false);
-                    sink.next(new VertxMqttConnection(endpoint).accept());
-                }).listen(result->{
-                if (result.succeeded()) {
-                    log.info("MQTT server is listening on port " + result.result().actualPort());
-                } else {
-
-                    log.error("Error on starting the server,",result.cause());
-                }
-            });
+                    if (!connectionProcessor.hasDownstreams()) {
+                        log.info("mqtt server no handler for:[{}]", endpoint.clientIdentifier());
+                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                        return;
+                    }
+                    if (connectionProcessor.getPending() >= 10240) {
+                        log.warn("too many no handle mqtt connection : {}", connectionProcessor.getPending());
+                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                        return;
+                    }
+                    sink.next(new VertxMqttConnection(endpoint));
+                });
         }
     }
 

+ 24 - 35
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServerProvider.java

@@ -4,14 +4,9 @@ import com.alibaba.fastjson.JSONObject;
 import io.vertx.core.Vertx;
 import io.vertx.mqtt.MqttServer;
 import io.vertx.mqtt.MqttServerOptions;
-import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.network.*;
-import org.jetlinks.community.network.mqtt.client.MqttClientProperties;
-import org.jetlinks.community.network.mqtt.client.MqttClientProvider;
-import org.jetlinks.community.network.mqtt.client.VertxMqttClient;
 import org.jetlinks.community.network.security.CertificateManager;
 import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
 import org.jetlinks.core.metadata.ConfigMetadata;
@@ -21,18 +16,20 @@ import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
 
 @Component
 @Slf4j
-@AllArgsConstructor
 public class VertxMqttServerProvider implements NetworkProvider<VertxMqttServerProperties> {
 
     private final CertificateManager certificateManager;
     private final Vertx vertx;
 
+    public VertxMqttServerProvider(CertificateManager certificateManager, Vertx vertx) {
+        this.certificateManager = certificateManager;
+        this.vertx = vertx;
+    }
+
     @Nonnull
     @Override
     public NetworkType getType() {
@@ -49,29 +46,21 @@ public class VertxMqttServerProvider implements NetworkProvider<VertxMqttServerP
 
     private void initServer(VertxMqttServer server, VertxMqttServerProperties properties) {
         List<MqttServer> instances = new ArrayList<>(properties.getInstance());
-        MqttServer mqttServer = MqttServer.create(vertx, properties.getOptions());
-//        mqttServer.listen(result->{
-//            if (result.failed()) {
-//                log.error("启动失败");
-//            }else {
-//                log.info("启动成功");
-//            }
-//        });
-        server.setMqttServer(Collections.singleton(mqttServer));
-//        for (int i = 0; i < properties.getInstance(); i++) {
-//            MqttServer mqttServer = MqttServer.create(vertx, properties.getOptions());
-//            instances.add(mqttServer);
-//        }
-//        server.setMqttServer(instances);
-//        for (MqttServer instance : instances) {
-//            instance.listen(result -> {
-//                if (result.succeeded()) {
-//                    log.info("startup mqtt server [{}] on port :{} ", properties.getId(), result.result().actualPort());
-//                } else {
-//                    log.error("startup mqtt server [{}] error ", properties.getId(), result.cause());
-//                }
-//            });
-//        }
+        for (int i = 0; i < properties.getInstance(); i++) {
+            MqttServer mqttServer = MqttServer.create(vertx, properties.getOptions());
+            instances.add(mqttServer);
+
+        }
+        server.setMqttServer(instances);
+        for (MqttServer instance : instances) {
+            instance.listen(result -> {
+                if (result.succeeded()) {
+                    log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result.result().actualPort());
+                } else {
+                    log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause());
+                }
+            });
+        }
     }
 
     @Override
@@ -98,10 +87,10 @@ public class VertxMqttServerProvider implements NetworkProvider<VertxMqttServerP
             if (config.isSsl()) {
                 config.getOptions().setSsl(true);
                 return certificateManager.getCertificate(config.getCertId())
-                        .map(VertxKeyCertTrustOptions::new)
-                        .doOnNext(config.getOptions()::setKeyCertOptions)
-                        .doOnNext(config.getOptions()::setTrustOptions)
-                        .thenReturn(config);
+                    .map(VertxKeyCertTrustOptions::new)
+                    .doOnNext(config.getOptions()::setKeyCertOptions)
+                    .doOnNext(config.getOptions()::setTrustOptions)
+                    .thenReturn(config);
             }
             return Mono.just(config);
         });

+ 11 - 10
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalFirmwareUpgradeHistoryService.java

@@ -54,7 +54,7 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                 firmwareTaskService.createQuery()
                     .where(DeviceFirmwareTaskEntity::getMode,TaskMode.push)
                     .where(DeviceFirmwareTaskEntity::getProductId, message.getHeader("productId").get()).fetchOne())
-                .doOnNext(tp3 -> {
+                .flatMap(tp3 -> {
                     DeviceOperator operator = tp3.getT1();
                     DeviceFirmwareEntity firmware = tp3.getT2();
                     if (!firmware.getVersion().equals(message.getVersion())) {
@@ -66,9 +66,11 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                         firmwareMessage.setSign(firmware.getSign());
                         firmwareMessage.setSignMethod(firmware.getSignMethod());
                         firmwareMessage.setTimestamp(System.currentTimeMillis());
-                        operator.messageSender().sendAndForget(firmwareMessage).subscribe();
+                        return operator.messageSender().sendAndForget(firmwareMessage);
                     }
-                }).then();
+                    return Mono.empty();
+                })
+                .then();
     }
 
     /**
@@ -79,7 +81,7 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
     @Subscribe("/device/*/*/firmware/progress")
     public Mono<Void> handleProcess(UpgradeFirmwareProgressMessage message) {
         return Mono.just(message)
-            .doOnNext(progress->{
+            .flatMap(progress->{
                 String firmwareId = message.getFirmwareId();
                 String version = message.getVersion();
                 ReactiveUpdate<DeviceUpgradeHistoryEntity> upgrade = this.createUpdate()
@@ -105,7 +107,7 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                 }else {
                     upgrade.set(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.processing);
                 }
-                upgrade.execute().subscribe();
+                return upgrade.execute();
             }).doOnNext(progress->{
                 if(message.getProgress()==0){
                     //设备id+固件id+版本号作为key值
@@ -124,11 +126,11 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                                 .set(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.failed)
                                 .set(DeviceUpgradeHistoryEntity::getErrorReason,"升级超时")
                                 .set(DeviceUpgradeHistoryEntity::getProgress, message.getProgress())
-                                .set(DeviceUpgradeHistoryEntity::getLastUpdateTime,System.currentTimeMillis()).execute().subscribe();
+                                .set(DeviceUpgradeHistoryEntity::getLastUpdateTime,System.currentTimeMillis()).execute()
+                                .subscribe();
                             taskDelayService.remove(key);
                         },
                         System.currentTimeMillis(), 3L, TimeUnit.SECONDS));
-
                 }
             })
             .then();
@@ -160,7 +162,7 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                         .where(DeviceFirmwareTaskEntity::getMode, TaskMode.pull.getValue())
                         .fetchOne());
             })
-            .doOnNext(tp5->{
+            .flatMap(tp5->{
                 DeviceFirmwareEntity firmware = tp5.getT3();
                 DeviceMessageSender sender = tp5.getT2();
                 RequestFirmwareMessageReply reply = new RequestFirmwareMessageReply();
@@ -171,8 +173,7 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                 reply.setSize(10L);
                 reply.setUrl(firmware.getUrl());
                 reply.setMessageId(messages.getMessageId());
-                sender.sendAndForget(reply).
-                    subscribe();
+                return sender.sendAndForget(reply);
             }).then();
     }
 }