|
@@ -54,7 +54,7 @@ public class AliBridgeGateway extends ClusterService<AliIotBridgeEntity> {
|
|
|
this.registry = registry;
|
|
this.registry = registry;
|
|
|
this.eventBus=eventBus;
|
|
this.eventBus=eventBus;
|
|
|
this.messageHandler = messageHandler;
|
|
this.messageHandler = messageHandler;
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
this.handleClusterMsg()
|
|
this.handleClusterMsg()
|
|
|
.flatMap(replaceMsg->
|
|
.flatMap(replaceMsg->
|
|
|
replaceBridgeServer(replaceMsg,false)
|
|
replaceBridgeServer(replaceMsg,false)
|
|
@@ -89,7 +89,7 @@ public class AliBridgeGateway extends ClusterService<AliIotBridgeEntity> {
|
|
|
return lookUpServer(resource.getId())
|
|
return lookUpServer(resource.getId())
|
|
|
.flatMap(server -> server.refreshBootStrap(resource,broadcast))
|
|
.flatMap(server -> server.refreshBootStrap(resource,broadcast))
|
|
|
.switchIfEmpty(
|
|
.switchIfEmpty(
|
|
|
- AliBridgeServerFactory.create(eventBus,registry,resource,broadcast)
|
|
|
|
|
|
|
+ Mono.just(new AliBridgeServer(eventBus,registry,resource))
|
|
|
.doOnNext(server -> refreshServer(resource.getId(),server))
|
|
.doOnNext(server -> refreshServer(resource.getId(),server))
|
|
|
.flatMap(server -> server.initBridge(resource,broadcast))
|
|
.flatMap(server -> server.initBridge(resource,broadcast))
|
|
|
.doOnNext(server->{
|
|
.doOnNext(server->{
|
|
@@ -112,10 +112,13 @@ public class AliBridgeGateway extends ClusterService<AliIotBridgeEntity> {
|
|
|
|
|
|
|
|
|
|
|
|
|
public Mono<AliBridgeServer> replaceBridgeServer(AliIotBridgeEntity bridge,boolean broadcast){
|
|
public Mono<AliBridgeServer> replaceBridgeServer(AliIotBridgeEntity bridge,boolean broadcast){
|
|
|
- return Mono.zip(Mono.just(broadcast)
|
|
|
|
|
- .filter(Boolean.TRUE::equals)
|
|
|
|
|
- .flatMap(ignore->sendClusterMsg(bridge)),initBridge(bridge,broadcast))
|
|
|
|
|
- .then(lookUpServer(bridge.getId()));
|
|
|
|
|
|
|
+ return broadcast?Mono.zip(sendClusterMsg(bridge),initBridge(bridge,broadcast)) .then(lookUpServer(bridge.getId()))
|
|
|
|
|
+ :initBridge(bridge,broadcast).then(lookUpServer(bridge.getId()));
|
|
|
|
|
+// return Mono.zip(Mono.just(broadcast)
|
|
|
|
|
+// .filter(Boolean.TRUE::equals)
|
|
|
|
|
+// .flatMap(ignore->sendClusterMsg(bridge)),
|
|
|
|
|
+// initBridge(bridge,broadcast))
|
|
|
|
|
+// .then(lookUpServer(bridge.getId()));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|