18339543638 4 jaren geleden
bovenliggende
commit
c037b6c22c

+ 13 - 10
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -202,11 +202,12 @@ class VertxMqttConnection implements MqttConnection {
                 boolean hasDownstream = this.messageProcessor.hasDownstreams();
                 if (autoAckMsg || !hasDownstream) {
                     publishing.acknowledge();
-                    this.publishingFluxSink.next(publishing);
-                }
-                if (hasDownstream) {
-                    this.publishingFluxSink.next(publishing);
+//                    this.publishingFluxSink.next(publishing);
                 }
+                this.publishingFluxSink.next(publishing);
+//                if (hasDownstream) {
+//                    this.publishingFluxSink.next(publishing);
+//                }
             })
             //QoS 1 PUBACK
             .publishAcknowledgeHandler(messageId -> {
@@ -237,9 +238,10 @@ class VertxMqttConnection implements MqttConnection {
                 if (autoAckSub || !hasDownstream) {
                     subscription.acknowledge();
                 }
-                if (hasDownstream) {
-                    this.subscriptionProcessor.onNext(subscription);
-                }
+                this.subscriptionProcessor.onNext(subscription);
+//                if (hasDownstream) {
+//                    this.subscriptionProcessor.onNext(subscription);
+//                }
             })
             .unsubscribeHandler(msg -> {
                 ping();
@@ -248,9 +250,10 @@ class VertxMqttConnection implements MqttConnection {
                 if (autoAckUnSub || !hasDownstream) {
                     unSubscription.acknowledge();
                 }
-                if (hasDownstream) {
-                    this.unsubscription.onNext(unSubscription);
-                }
+                this.unsubscription.onNext(unSubscription);
+//                if (hasDownstream) {
+//                    this.unsubscription.onNext(unSubscription);
+//                }
             });
     }