|
|
@@ -17,6 +17,8 @@ import org.jetlinks.community.network.mqtt.auth.MqttDefaultAuth;
|
|
|
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
|
|
|
import org.jetlinks.community.network.mqtt.server.MqttConnection;
|
|
|
import org.jetlinks.community.network.mqtt.server.MqttServer;
|
|
|
+import org.jetlinks.community.network.mqtt.server.MqttSubscription;
|
|
|
+import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
|
|
|
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
|
|
|
import org.jetlinks.core.ProtocolSupport;
|
|
|
import org.jetlinks.core.defaults.Authenticator;
|
|
|
@@ -216,7 +218,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
//处理已经建立连接的MQTT连接的主题订阅
|
|
|
private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
return tuple3.getT1()
|
|
|
- .handleSubscribe(false)
|
|
|
+ .handleSubscribe(true)
|
|
|
.doOnNext(topic->{
|
|
|
MqttSubscribeMessage message = topic.getMessage();
|
|
|
Set<String> topics = message.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet());
|
|
|
@@ -232,7 +234,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
//取消MQTT连接的主题订阅
|
|
|
private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleUnSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
return tuple3.getT1()
|
|
|
- .handleUnSubscribe(false)
|
|
|
+ .handleUnSubscribe(true)
|
|
|
.doOnNext(topic->{
|
|
|
MqttUnsubscribeMessage message = topic.getMessage();
|
|
|
tuple3.getT2().removeTopics(message.topics());
|