18339543638 há 4 anos atrás
pai
commit
05dab4f0b3

+ 1 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java

@@ -76,7 +76,7 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
             if (!result.succeeded()) {
                 log.warn("connect mqtt [{}] error", properties.getId(), result.cause());
             } else {
-                log.debug("connect mqtt [{}] success", properties.getId());
+                log.debug("connect mqtt [{}] success,host:{},port:{}", properties.getId(),properties.getHost(),properties.getPort());
             }
         });
     }

+ 10 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttConnection.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.vertx.mqtt.MqttEndpoint;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.server.mqtt.MqttAuth;
 import reactor.core.publisher.Flux;
@@ -9,6 +10,7 @@ import reactor.core.publisher.Mono;
 import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 /**
@@ -59,6 +61,14 @@ public interface MqttConnection {
      */
     MqttConnection accept();
 
+    /**
+     * 心跳处理
+     * @param endpoint
+     * @param delay  最长空闲时间
+     * @param unit  空闲时间单位
+     */
+    void heartIdleHandler(MqttEndpoint endpoint, long delay, TimeUnit unit);
+
     /**
      * 获取遗言消息
      *

+ 3 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttPublishing.java

@@ -1,10 +1,13 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import org.jetlinks.core.message.codec.MqttMessage;
+import reactor.core.publisher.Flux;
 
 public interface MqttPublishing {
 
     MqttMessage getMessage();
 
     void acknowledge();
+
+    void handlerMessage(Flux publishFlux) ;
 }

+ 3 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttSubscription.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import io.vertx.mqtt.messages.MqttSubscribeMessage;
+import reactor.core.publisher.Flux;
 
 public interface MqttSubscription {
 
@@ -8,4 +9,6 @@ public interface MqttSubscription {
 
     void acknowledge();
 
+    void handlerSubscription(Flux subscriptionFlux) ;
+
 }

+ 2 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttUnSubscription.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
+import reactor.core.publisher.Flux;
 
 public interface MqttUnSubscription {
 
@@ -8,4 +9,5 @@ public interface MqttUnSubscription {
 
     void acknowledge();
 
+    void handlerUnSubscription(Flux publishFlux) ;
 }

+ 100 - 16
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -2,8 +2,8 @@ package org.jetlinks.community.network.mqtt.server.vertx;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
-import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.codec.mqtt.*;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.mqtt.MqttEndpoint;
@@ -30,6 +30,8 @@ import javax.annotation.Nonnull;
 import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -45,11 +47,19 @@ class VertxMqttConnection implements MqttConnection {
 
     private final EmitterProcessor<MqttPublishing> messageProcessor = EmitterProcessor.create(false);
 
+
+    private ScheduledFuture<?> pingRespTimeout;
+
     private final FluxSink<MqttPublishing> publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
 
-    private final EmitterProcessor<MqttSubscription> subscription = EmitterProcessor.create(false);
+    private final EmitterProcessor<MqttSubscription> subscriptionProcessor = EmitterProcessor.create(false);
     private final EmitterProcessor<MqttUnSubscription> unsubscription = EmitterProcessor.create(false);
 
+    private final VertxMqttPublishing publishing = new VertxMqttPublishing(false);
+    private final VertxMqttSubscription subscription = new VertxMqttSubscription( false);
+    private final VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(false);
+    private Optional<MqttMessage> willMessage=null;
+
     private static final MqttAuth emptyAuth = new MqttAuth() {
         @Override
         public String getUsername() {
@@ -69,10 +79,9 @@ class VertxMqttConnection implements MqttConnection {
 
     private final Consumer<MqttConnection> defaultListener = mqttConnection -> {
         log.debug("mqtt client [{}] disconnected", getClientId());
-        subscription.onComplete();
+        subscriptionProcessor.onComplete();
         unsubscription.onComplete();
         messageProcessor.onComplete();
-
     };
 
     private Consumer<MqttConnection> disconnectConsumer = defaultListener;
@@ -113,10 +122,18 @@ class VertxMqttConnection implements MqttConnection {
         if (accepted) {
             return this;
         }
+        //todo 需要验证密码
+        String protocolName = endpoint.protocolName();
+        int protocolVersion = endpoint.protocolVersion();
         log.debug("mqtt client [{}] connected", getClientId());
         accepted = true;
+        ping();
         try {
             if (!endpoint.isConnected()) {
+                //心跳检测 todo 改为配置文件配置
+                heartIdleHandler(endpoint,5,TimeUnit.SECONDS);
+                //放置遗言
+                this.willMessage = this.getWillMessage();
                 endpoint.accept();
             }
         } catch (Exception e) {
@@ -128,6 +145,14 @@ class VertxMqttConnection implements MqttConnection {
         return this;
     }
 
+    @Override
+    public void heartIdleHandler(MqttEndpoint endpoint, long delay, TimeUnit unit) {
+        this.pingRespTimeout= new NioEventLoopGroup(1).schedule(()->{
+            log.error("通道关闭了:{}",endpoint.remoteAddress());
+            endpoint.close();
+        },delay,unit);
+    }
+
     @Override
     public void keepAlive() {
         ping();
@@ -135,24 +160,33 @@ class VertxMqttConnection implements MqttConnection {
 
     void ping() {
         lastPingTime = System.currentTimeMillis();
+        if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
+            this.pingRespTimeout.cancel(true);
+            this.pingRespTimeout = null;
+        }
     }
 
     void init() {
         this.endpoint
+            .exceptionHandler(event -> {
+
+            })
             .disconnectHandler(ignore -> this.complete())
             .closeHandler(ignore -> this.complete())
             .pingHandler(ignore -> {
                 this.ping();
-                if (!endpoint.isAutoKeepAlive()) {
+                if (endpoint.isAutoKeepAlive()) {
                     endpoint.pong();
                 }
             })
             .publishHandler(msg -> {
                 ping();
-                VertxMqttPublishing publishing = new VertxMqttPublishing(msg, false);
+                publishing.setMessage(msg);
+                publishing.handlerMessage(messageProcessor);
                 boolean hasDownstream = this.messageProcessor.hasDownstreams();
                 if (autoAckMsg || !hasDownstream) {
                     publishing.acknowledge();
+                    this.publishingFluxSink.next(publishing);
                 }
                 if (hasDownstream) {
                     this.publishingFluxSink.next(publishing);
@@ -182,18 +216,20 @@ class VertxMqttConnection implements MqttConnection {
             })
             .subscribeHandler(msg -> {
                 ping();
-                VertxMqttSubscription subscription = new VertxMqttSubscription(msg, false);
-                boolean hasDownstream = this.subscription.hasDownstreams();
+                subscription.setMessage(msg);
+                subscription.handlerSubscription(this.subscriptionProcessor);
+                boolean hasDownstream = this.subscriptionProcessor.hasDownstreams();
                 if (autoAckSub || !hasDownstream) {
                     subscription.acknowledge();
                 }
                 if (hasDownstream) {
-                    this.subscription.onNext(subscription);
+                    this.subscriptionProcessor.onNext(subscription);
                 }
             })
             .unsubscribeHandler(msg -> {
                 ping();
-                VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg, false);
+                unSubscription.setMessage(msg);
+                unSubscription.handlerUnSubscription(this.unsubscription);
                 boolean hasDownstream = this.unsubscription.hasDownstreams();
                 if (autoAckUnSub || !hasDownstream) {
                     unSubscription.acknowledge();
@@ -260,9 +296,8 @@ class VertxMqttConnection implements MqttConnection {
 
     @Override
     public Flux<MqttSubscription> handleSubscribe(boolean autoAck) {
-
         autoAckSub = autoAck;
-        return subscription.map(Function.identity());
+        return subscriptionProcessor.map(Function.identity());
     }
 
     @Override
@@ -350,10 +385,22 @@ class VertxMqttConnection implements MqttConnection {
     @AllArgsConstructor
     class VertxMqttPublishing implements MqttPublishing {
 
-        private final MqttPublishMessage message;
+        private MqttPublishMessage message;
 
         private volatile boolean acknowledged;
 
+        public VertxMqttPublishing(boolean acknowledged) {
+            this.acknowledged = acknowledged;
+        }
+
+        public void setMessage(MqttPublishMessage message) {
+            this.message = message;
+        }
+
+        public void setAcknowledged(boolean acknowledged) {
+            this.acknowledged = acknowledged;
+        }
+
         @Override
         public MqttMessage getMessage() {
             return new VertxMqttMessage(message);
@@ -373,15 +420,31 @@ class VertxMqttConnection implements MqttConnection {
                 endpoint.publishReceived(message.messageId());
             }
         }
+
+        @Override
+        public void handlerMessage(Flux publishFlux) {
+            publishFlux.subscribe(publish->{
+               //todo 发布消息
+            });
+
+        }
     }
 
     @AllArgsConstructor
     class VertxMqttSubscription implements MqttSubscription {
 
-        private final MqttSubscribeMessage message;
+        private MqttSubscribeMessage message;
+
 
         private volatile boolean acknowledged;
 
+        public VertxMqttSubscription(boolean acknowledged) {
+            this.acknowledged = acknowledged;
+        }
+
+        public void setMessage(MqttSubscribeMessage message) {
+            this.message = message;
+        }
         @Override
         public MqttSubscribeMessage getMessage() {
             return message;
@@ -396,15 +459,29 @@ class VertxMqttConnection implements MqttConnection {
             endpoint.subscribeAcknowledge(message.messageId(), message.topicSubscriptions().stream()
                 .map(MqttTopicSubscription::qualityOfService).collect(Collectors.toList()));
         }
+
+        @Override
+        public void handlerSubscription(Flux subscriptionFlux) {
+            subscriptionFlux.subscribe(result->{
+                //todo 订阅主题
+            });
+        }
     }
 
     @AllArgsConstructor
     class VertxMqttMqttUnSubscription implements MqttUnSubscription {
 
-        private final MqttUnsubscribeMessage message;
+        private MqttUnsubscribeMessage message;
 
         private volatile boolean acknowledged;
 
+        public VertxMqttMqttUnSubscription(boolean acknowledged) {
+            this.acknowledged = acknowledged;
+        }
+
+        public void setMessage(MqttUnsubscribeMessage message) {
+            this.message = message;
+        }
         @Override
         public MqttUnsubscribeMessage getMessage() {
             return message;
@@ -419,6 +496,13 @@ class VertxMqttConnection implements MqttConnection {
             acknowledged = true;
             endpoint.unsubscribeAcknowledge(message.messageId());
         }
+
+        @Override
+        public void handlerUnSubscription(Flux unSubscriptionFlux) {
+            unSubscriptionFlux.subscribe(result->{
+                //取消订阅主题
+            });
+        }
     }
 
 

+ 18 - 7
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java

@@ -39,18 +39,29 @@ public class VertxMqttServer implements MqttServer {
                     log.error(error.getMessage(), error);
                 })
                 .endpointHandler(endpoint -> {
-                    if (!connectionProcessor.hasDownstreams()) {
-                        log.info("mqtt server no handler for:[{}]", endpoint.clientIdentifier());
-                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
-                        return;
-                    }
+                    // 显示主要连接信息
+                    log.info("MQTT client  {} request to connect, clean session {} " ,endpoint.clientIdentifier(), endpoint.isCleanSession());
+
+//                    if (!connectionProcessor.hasDownstreams()) {
+//                        log.info("mqtt server no handler for:[{}]", endpoint.clientIdentifier());
+//                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+//                        return;
+//                    }
                     if (connectionProcessor.getPending() >= 10240) {
                         log.warn("too many no handle mqtt connection : {}", connectionProcessor.getPending());
                         endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                         return;
                     }
-                    sink.next(new VertxMqttConnection(endpoint));
-                });
+//                    endpoint.accept(false);
+                    sink.next(new VertxMqttConnection(endpoint).accept());
+                }).listen(result->{
+                if (result.succeeded()) {
+                    log.info("MQTT server is listening on port " + result.result().actualPort());
+                } else {
+
+                    log.error("Error on starting the server,",result.cause());
+                }
+            });
         }
     }
 

+ 25 - 15
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServerProvider.java

@@ -5,6 +5,7 @@ import io.vertx.core.Vertx;
 import io.vertx.mqtt.MqttServer;
 import io.vertx.mqtt.MqttServerOptions;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.network.*;
 import org.jetlinks.community.network.security.CertificateManager;
@@ -16,6 +17,7 @@ import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 @Component
@@ -46,21 +48,29 @@ public class VertxMqttServerProvider implements NetworkProvider<VertxMqttServerP
 
     private void initServer(VertxMqttServer server, VertxMqttServerProperties properties) {
         List<MqttServer> instances = new ArrayList<>(properties.getInstance());
-        for (int i = 0; i < properties.getInstance(); i++) {
-            MqttServer mqttServer = MqttServer.create(vertx, properties.getOptions());
-            instances.add(mqttServer);
-
-        }
-        server.setMqttServer(instances);
-        for (MqttServer instance : instances) {
-            instance.listen(result -> {
-                if (result.succeeded()) {
-                    log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result.result().actualPort());
-                } else {
-                    log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause());
-                }
-            });
-        }
+        MqttServer mqttServer = MqttServer.create(vertx, properties.getOptions());
+//        mqttServer.listen(result->{
+//            if (result.failed()) {
+//                log.error("启动失败");
+//            }else {
+//                log.info("启动成功");
+//            }
+//        });
+        server.setMqttServer(Collections.singleton(mqttServer));
+//        for (int i = 0; i < properties.getInstance(); i++) {
+//            MqttServer mqttServer = MqttServer.create(vertx, properties.getOptions());
+//            instances.add(mqttServer);
+//        }
+//        server.setMqttServer(instances);
+//        for (MqttServer instance : instances) {
+//            instance.listen(result -> {
+//                if (result.succeeded()) {
+//                    log.info("startup mqtt server [{}] on port :{} ", properties.getId(), result.result().actualPort());
+//                } else {
+//                    log.error("startup mqtt server [{}] error ", properties.getId(), result.cause());
+//                }
+//            });
+//        }
     }
 
     @Override

+ 42 - 7
jetlinks-standalone/src/test/java/org/jetlinks/community/network/manager/web/GateWayTest.java

@@ -1,8 +1,21 @@
 package org.jetlinks.community.network.manager.web;
 
+import io.vertx.core.Vertx;
+import io.vertx.mqtt.MqttServerOptions;
 import lombok.extern.slf4j.Slf4j;
+import org.checkerframework.checker.units.qual.A;
+import org.hswebframework.web.exception.NotFoundException;
 import org.jetlinks.community.gateway.DeviceGatewayManager;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.manager.entity.DeviceGatewayEntity;
+import org.jetlinks.community.network.manager.entity.NetworkConfigEntity;
+import org.jetlinks.community.network.manager.enums.NetworkConfigState;
 import org.jetlinks.community.network.manager.service.DeviceGatewayService;
+import org.jetlinks.community.network.manager.service.NetworkConfigService;
+import org.jetlinks.community.network.mqtt.server.MqttServer;
+import org.jetlinks.community.network.mqtt.server.MqttSubscription;
+import org.jetlinks.community.network.mqtt.server.vertx.VertxMqttServerProperties;
+import org.jetlinks.community.network.mqtt.server.vertx.VertxMqttServerProvider;
 import org.jetlinks.community.standalone.JetLinksApplication;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -10,6 +23,13 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * @author lifang
@@ -18,18 +38,33 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  * @Description TODO
  * @createTime 2021年06月29日 14:25:00
  */
-@RunWith(SpringJUnit4ClassRunner.class)
-@SpringBootTest(classes = JetLinksApplication.class, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
-@Slf4j
-@EnableAspectJAutoProxy(proxyTargetClass = true,exposeProxy = true)
 public class GateWayTest {
     @Autowired
     private DeviceGatewayService deviceGatewayService;
 
     @Autowired
     private DeviceGatewayManager gatewayManager;
-    @Test
-    public void startUp(){
-//        deviceGatewayService.createQuery().fetch().all(deviceGatewayEntity -> )
+
+    @Autowired
+    private NetworkConfigService networkConfigService;
+    @Autowired
+    private NetworkManager networkManager;
+
+
+    static Vertx vertx = Vertx.vertx();
+
+    static MqttServer mqttServer;
+
+    public static void main(String[] args) throws InterruptedException {
+        VertxMqttServerProvider mqttServerManager = new VertxMqttServerProvider(id -> Mono.empty(), vertx);
+
+        VertxMqttServerProperties properties = new VertxMqttServerProperties();
+        properties.setId("test");
+        properties.setInstance(4);
+        properties.setOptions(new MqttServerOptions().setPort(1800));
+
+        mqttServer = mqttServerManager.createNetwork(properties);
+
+        Thread.currentThread().join();
     }
 }