|
|
@@ -119,7 +119,8 @@ public class AliBridgeGateway{
|
|
|
*/
|
|
|
public Mono<Void> reconnect(String nodeId,String bridgeId){
|
|
|
return handleClusterOperation(nodeId,bridgeId,MessageType.restart,bridgeId)
|
|
|
- .switchIfEmpty(getServer(bridgeId).reconnect());
|
|
|
+ .filter(Boolean.FALSE::equals)
|
|
|
+ .flatMap(ignore->getServer(bridgeId).reconnect()).then();
|
|
|
}
|
|
|
/**
|
|
|
* 注册设备
|
|
|
@@ -130,8 +131,9 @@ public class AliBridgeGateway{
|
|
|
*/
|
|
|
public Mono<Void> registerDevice(String nodeId,String bridgeId,AliIotBridgeDeviceConfig config){
|
|
|
return handleClusterOperation(nodeId,bridgeId,MessageType.register,config)
|
|
|
- .switchIfEmpty(getServer(bridgeId)
|
|
|
- .register(config.getOriginalIdentity(),config.getProductKey(),config.getDeviceName(),config.getDeviceSecret()).then());
|
|
|
+ .filter(Boolean.FALSE::equals)
|
|
|
+ .flatMap(ignore->getServer(bridgeId)
|
|
|
+ .register(config.getOriginalIdentity(),config.getProductKey(),config.getDeviceName(),config.getDeviceSecret())).then();
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -143,13 +145,15 @@ public class AliBridgeGateway{
|
|
|
*/
|
|
|
public Mono<Void> unregisterDevice(String nodeId,String bridgeId,String originalIdentity){
|
|
|
return handleClusterOperation(nodeId,bridgeId,MessageType.unregister,originalIdentity)
|
|
|
- .switchIfEmpty(getServer(bridgeId).unRegister(originalIdentity));
|
|
|
+ .filter(Boolean.FALSE::equals)
|
|
|
+ .flatMap(ignore->getServer(bridgeId).unRegister(originalIdentity)).then();
|
|
|
}
|
|
|
|
|
|
public Mono<Void> initBridge(AliIotBridgeEntity bridgeEntity){
|
|
|
return handleClusterOperation(bridgeEntity.getNodeId(),
|
|
|
bridgeEntity.getId(),MessageType.init,bridgeEntity)
|
|
|
- .switchIfEmpty(Mono.justOrEmpty(getServer(bridgeEntity.getId()))
|
|
|
+ .filter(Boolean.FALSE::equals)
|
|
|
+ .flatMap(ignore->Mono.justOrEmpty(getServer(bridgeEntity.getId()))
|
|
|
.then()
|
|
|
.switchIfEmpty(AliBridgeServer.create(eventBus,registry,bridgeEntity,clusterId)
|
|
|
.doOnNext(server -> {
|
|
|
@@ -173,14 +177,15 @@ public class AliBridgeGateway{
|
|
|
bridgeEntity.getId());
|
|
|
return new BusinessException("网桥配置出错,请确定配置参数是否正确;或请勿重复启动网桥)");
|
|
|
})))
|
|
|
- .then());
|
|
|
+ .then()).then();
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public Mono<AliBridgeServer> replaceBridgeServer(String oldNodeId,String oldBridgeId,AliIotBridgeEntity bridge){
|
|
|
return handleClusterOperation(oldNodeId,oldBridgeId,MessageType.update,bridge)
|
|
|
- .switchIfEmpty(delBridgeServer(oldNodeId,oldBridgeId))
|
|
|
+ .filter(Boolean.FALSE::equals)
|
|
|
+ .flatMap(ignore->delBridgeServer(oldNodeId,oldBridgeId))
|
|
|
.concatWith(initBridge(bridge))
|
|
|
.then(Mono.just(getServer(bridge.getId())));
|
|
|
}
|
|
|
@@ -188,9 +193,10 @@ public class AliBridgeGateway{
|
|
|
|
|
|
public Mono<Void> delBridgeServer(String nodeId,String bridgeId){
|
|
|
return handleClusterOperation(nodeId,bridgeId,MessageType.del,bridgeId)
|
|
|
- .switchIfEmpty(Mono.justOrEmpty(getServer(bridgeId))
|
|
|
+ .filter(Boolean.FALSE::equals)
|
|
|
+ .flatMap(ignore->Mono.justOrEmpty(getServer(bridgeId))
|
|
|
.filter(Objects::nonNull)
|
|
|
- .flatMap(AliBridgeServer::stopBridge));
|
|
|
+ .flatMap(AliBridgeServer::stopBridge)).then();
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -246,16 +252,17 @@ public class AliBridgeGateway{
|
|
|
* @param msg 消息
|
|
|
* @return
|
|
|
*/
|
|
|
- private Mono<Void> handleClusterOperation(String nodeId,String bridgeId, MessageType messageType, Object msg){
|
|
|
+ private Mono<Boolean> handleClusterOperation(String nodeId,String bridgeId, MessageType messageType, Object msg){
|
|
|
if(StrUtil.isNullOrUndefined(bridgeId)){
|
|
|
log.error("AliBridgeGateway.handleClusterOperation 网桥id不能为空,");
|
|
|
- return Mono.error(new BusinessException("系统内部错误"));
|
|
|
+ return Mono.error(new BusinessException("bridgeId不能为空"));
|
|
|
}
|
|
|
if(!this.clusterId.equals(nodeId)){
|
|
|
return clusterManager.getTopic(String.format(format,nodeId))
|
|
|
- .publish(Mono.just(BridgeMessage.of(clusterId,messageType,bridgeId,msg))).then();
|
|
|
+ .publish(Mono.just(BridgeMessage.of(clusterId,messageType,bridgeId,msg)))
|
|
|
+ .thenReturn(true);
|
|
|
}
|
|
|
- return Mono.empty();
|
|
|
+ return Mono.just(false);
|
|
|
}
|
|
|
|
|
|
@AllArgsConstructor(staticName = "of")
|