|
|
@@ -197,6 +197,17 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
}
|
|
|
gatewayMonitor.disconnected();
|
|
|
gatewayMonitor.totalConnection(counter.sum());
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 清空订阅主题
|
|
|
+ */
|
|
|
+ registry.getDevice(conn.getClientId())
|
|
|
+ .doOnNext(operator -> {
|
|
|
+ operator.getTopics().clear();
|
|
|
+ eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
|
|
|
+ conn.getClientId()),new TimeSyncMessage());
|
|
|
+ }).subscribe();
|
|
|
});
|
|
|
return Tuples.of(connection.accept(), device, newSession);
|
|
|
} else {
|
|
|
@@ -217,7 +228,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
}
|
|
|
//处理已经建立连接的MQTT连接的主题订阅
|
|
|
private void handleSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
- tuple3.getT1()
|
|
|
+ tuple3.getT1()
|
|
|
.handleSubscribe(true)
|
|
|
.doOnNext(topic->{
|
|
|
MqttSubscribeMessage message = topic.getMessage();
|
|
|
@@ -232,7 +243,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
|
|
|
//取消MQTT连接的主题订阅
|
|
|
private void handleUnSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
- tuple3.getT1()
|
|
|
+ tuple3.getT1()
|
|
|
.handleUnSubscribe(true)
|
|
|
.doOnNext(topic->{
|
|
|
MqttUnsubscribeMessage message = topic.getMessage();
|