18339543638 4 rokov pred
rodič
commit
d5b3079617

+ 0 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttPublishing.java

@@ -1,13 +1,10 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import org.jetlinks.core.message.codec.MqttMessage;
-import reactor.core.publisher.Flux;
 
 public interface MqttPublishing {
 
     MqttMessage getMessage();
 
     void acknowledge();
-
-    void handlerMessage(Flux publishFlux) ;
 }

+ 0 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttSubscription.java

@@ -1,7 +1,6 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import io.vertx.mqtt.messages.MqttSubscribeMessage;
-import reactor.core.publisher.Flux;
 
 public interface MqttSubscription {
 
@@ -9,6 +8,4 @@ public interface MqttSubscription {
 
     void acknowledge();
 
-    void handlerSubscription(Flux subscriptionFlux) ;
-
 }

+ 0 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttUnSubscription.java

@@ -1,13 +1,10 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
-import reactor.core.publisher.Flux;
 
 public interface MqttUnSubscription {
 
     MqttUnsubscribeMessage getMessage();
 
     void acknowledge();
-
-    void handlerUnSubscription(Flux publishFlux) ;
 }

+ 26 - 55
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -55,9 +55,7 @@ class VertxMqttConnection implements MqttConnection {
     private final EmitterProcessor<MqttSubscription> subscriptionProcessor = EmitterProcessor.create(false);
     private final EmitterProcessor<MqttUnSubscription> unsubscription = EmitterProcessor.create(false);
 
-    private final VertxMqttPublishing publishing = new VertxMqttPublishing(false);
-    private final VertxMqttSubscription subscription = new VertxMqttSubscription( false);
-    private final VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(false);
+
     private Optional<MqttMessage> willMessage=null;
 
     private static final MqttAuth emptyAuth = new MqttAuth() {
@@ -136,6 +134,7 @@ class VertxMqttConnection implements MqttConnection {
                 this.willMessage = this.getWillMessage();
                 endpoint.accept();
             }
+            initSubscript();
         } catch (Exception e) {
             close().subscribe();
             log.warn(e.getMessage(), e);
@@ -145,6 +144,24 @@ class VertxMqttConnection implements MqttConnection {
         return this;
     }
 
+    /**
+     * 初始化监听器
+     */
+    private void initSubscript() {
+        //todo 订阅处理
+        subscriptionProcessor.subscribe(mqttSubscription -> {
+
+        });
+        //取消订阅处理 todo
+        unsubscription.subscribe(mqttUnSubscription -> {
+
+        });
+        //发布处理 todo
+        messageProcessor.subscribe(publishing -> {
+
+        });
+    }
+
     @Override
     public void heartIdleHandler(MqttEndpoint endpoint, long delay, TimeUnit unit) {
         this.pingRespTimeout= new NioEventLoopGroup(1).schedule(()->{
@@ -181,8 +198,7 @@ class VertxMqttConnection implements MqttConnection {
             })
             .publishHandler(msg -> {
                 ping();
-                publishing.setMessage(msg);
-                publishing.handlerMessage(messageProcessor);
+                VertxMqttPublishing publishing = new VertxMqttPublishing(msg,false);
                 boolean hasDownstream = this.messageProcessor.hasDownstreams();
                 if (autoAckMsg || !hasDownstream) {
                     publishing.acknowledge();
@@ -216,8 +232,7 @@ class VertxMqttConnection implements MqttConnection {
             })
             .subscribeHandler(msg -> {
                 ping();
-                subscription.setMessage(msg);
-                subscription.handlerSubscription(this.subscriptionProcessor);
+                 VertxMqttSubscription subscription = new VertxMqttSubscription( msg,false);
                 boolean hasDownstream = this.subscriptionProcessor.hasDownstreams();
                 if (autoAckSub || !hasDownstream) {
                     subscription.acknowledge();
@@ -228,8 +243,7 @@ class VertxMqttConnection implements MqttConnection {
             })
             .unsubscribeHandler(msg -> {
                 ping();
-                unSubscription.setMessage(msg);
-                unSubscription.handlerUnSubscription(this.unsubscription);
+                VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg,false);
                 boolean hasDownstream = this.unsubscription.hasDownstreams();
                 if (autoAckUnSub || !hasDownstream) {
                     unSubscription.acknowledge();
@@ -385,17 +399,10 @@ class VertxMqttConnection implements MqttConnection {
     @AllArgsConstructor
     class VertxMqttPublishing implements MqttPublishing {
 
-        private MqttPublishMessage message;
+        private final MqttPublishMessage message;
 
         private volatile boolean acknowledged;
 
-        public VertxMqttPublishing(boolean acknowledged) {
-            this.acknowledged = acknowledged;
-        }
-
-        public void setMessage(MqttPublishMessage message) {
-            this.message = message;
-        }
 
         public void setAcknowledged(boolean acknowledged) {
             this.acknowledged = acknowledged;
@@ -420,31 +427,16 @@ class VertxMqttConnection implements MqttConnection {
                 endpoint.publishReceived(message.messageId());
             }
         }
-
-        @Override
-        public void handlerMessage(Flux publishFlux) {
-            publishFlux.subscribe(publish->{
-               //todo 发布消息
-            });
-
-        }
     }
 
     @AllArgsConstructor
     class VertxMqttSubscription implements MqttSubscription {
 
-        private MqttSubscribeMessage message;
+        private final MqttSubscribeMessage message;
 
 
         private volatile boolean acknowledged;
 
-        public VertxMqttSubscription(boolean acknowledged) {
-            this.acknowledged = acknowledged;
-        }
-
-        public void setMessage(MqttSubscribeMessage message) {
-            this.message = message;
-        }
         @Override
         public MqttSubscribeMessage getMessage() {
             return message;
@@ -459,29 +451,15 @@ class VertxMqttConnection implements MqttConnection {
             endpoint.subscribeAcknowledge(message.messageId(), message.topicSubscriptions().stream()
                 .map(MqttTopicSubscription::qualityOfService).collect(Collectors.toList()));
         }
-
-        @Override
-        public void handlerSubscription(Flux subscriptionFlux) {
-            subscriptionFlux.subscribe(result->{
-                //todo 订阅主题
-            });
-        }
     }
 
     @AllArgsConstructor
     class VertxMqttMqttUnSubscription implements MqttUnSubscription {
 
-        private MqttUnsubscribeMessage message;
+        private final MqttUnsubscribeMessage message;
 
         private volatile boolean acknowledged;
 
-        public VertxMqttMqttUnSubscription(boolean acknowledged) {
-            this.acknowledged = acknowledged;
-        }
-
-        public void setMessage(MqttUnsubscribeMessage message) {
-            this.message = message;
-        }
         @Override
         public MqttUnsubscribeMessage getMessage() {
             return message;
@@ -496,13 +474,6 @@ class VertxMqttConnection implements MqttConnection {
             acknowledged = true;
             endpoint.unsubscribeAcknowledge(message.messageId());
         }
-
-        @Override
-        public void handlerUnSubscription(Flux unSubscriptionFlux) {
-            unSubscriptionFlux.subscribe(result->{
-                //取消订阅主题
-            });
-        }
     }