|
@@ -124,8 +124,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
.publishOn(Schedulers.parallel())
|
|
.publishOn(Schedulers.parallel())
|
|
|
.flatMap(this::handleConnection)
|
|
.flatMap(this::handleConnection)
|
|
|
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
|
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
|
|
- .flatMap(this::handleSubscriptionTopic)
|
|
|
|
|
- .flatMap(this::handleUnSubscriptionTopic)
|
|
|
|
|
|
|
+ .doOnNext(this::handleSubscriptionTopic)
|
|
|
|
|
+ .doOnNext(this::handleUnSubscriptionTopic)
|
|
|
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
|
|
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
|
|
|
.onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
|
|
.onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
|
|
|
.subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
|
|
.subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
|
|
@@ -216,8 +216,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
}));
|
|
}));
|
|
|
}
|
|
}
|
|
|
//处理已经建立连接的MQTT连接的主题订阅
|
|
//处理已经建立连接的MQTT连接的主题订阅
|
|
|
- private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
|
|
- return tuple3.getT1()
|
|
|
|
|
|
|
+ private void handleSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
|
|
+ tuple3.getT1()
|
|
|
.handleSubscribe(true)
|
|
.handleSubscribe(true)
|
|
|
.doOnNext(topic->{
|
|
.doOnNext(topic->{
|
|
|
MqttSubscribeMessage message = topic.getMessage();
|
|
MqttSubscribeMessage message = topic.getMessage();
|
|
@@ -227,13 +227,12 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
.flatMap(ignore->
|
|
.flatMap(ignore->
|
|
|
eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
|
|
eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
|
|
|
tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
|
|
tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
|
|
|
- .collectList()
|
|
|
|
|
- .map(ignore->tuple3);
|
|
|
|
|
|
|
+ .subscribe();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//取消MQTT连接的主题订阅
|
|
//取消MQTT连接的主题订阅
|
|
|
- private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleUnSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
|
|
- return tuple3.getT1()
|
|
|
|
|
|
|
+ private void handleUnSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
|
|
|
|
|
+ tuple3.getT1()
|
|
|
.handleUnSubscribe(true)
|
|
.handleUnSubscribe(true)
|
|
|
.doOnNext(topic->{
|
|
.doOnNext(topic->{
|
|
|
MqttUnsubscribeMessage message = topic.getMessage();
|
|
MqttUnsubscribeMessage message = topic.getMessage();
|
|
@@ -242,8 +241,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
.flatMap(ignore->
|
|
.flatMap(ignore->
|
|
|
eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
|
|
eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
|
|
|
tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
|
|
tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
|
|
|
- .collectList()
|
|
|
|
|
- .map(ignore->tuple3);
|
|
|
|
|
|
|
+ .subscribe();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//处理已经建立连接的MQTT连接
|
|
//处理已经建立连接的MQTT连接
|