|
|
@@ -105,10 +105,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
|
|
|
return Mono.just(this);
|
|
|
}
|
|
|
if (start.compareAndSet(false, true)) {
|
|
|
- if(bootstrap.isBridgeConnected()){
|
|
|
- return Mono.just(this);
|
|
|
- }
|
|
|
- return Mono.just(bootstrap)
|
|
|
+ return Mono.justOrEmpty(bootstrap)
|
|
|
.doOnNext(ignore->{
|
|
|
bootstrap.bootstrap(new DownlinkChannelHandler() {
|
|
|
@Override
|
|
|
@@ -125,11 +122,13 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
|
|
|
});
|
|
|
}) .flatMap(ignore->bridgeService.createUpdate().set(AliIotBridgeEntity::getState,BridgeStatus.running)
|
|
|
.where(AliIotBridgeEntity::getId,id).execute())
|
|
|
- .onErrorResume(RuntimeException.class,
|
|
|
- e->Mono.just(isReplica())
|
|
|
+ .onErrorResume(Exception.class,
|
|
|
+ e->Mono.justOrEmpty(isReplica())
|
|
|
.filter(Boolean.FALSE::equals)
|
|
|
.flatMap(ignore->
|
|
|
- bridgeService.createUpdate().set(AliIotBridgeEntity::getState,BridgeStatus.fail)
|
|
|
+ bridgeService
|
|
|
+ .createUpdate()
|
|
|
+ .set(AliIotBridgeEntity::getState,BridgeStatus.fail)
|
|
|
.set(AliIotBridgeEntity::getErrorReason,e.getMessage())
|
|
|
.where(AliIotBridgeEntity::getId,id)
|
|
|
.execute()))
|
|
|
@@ -238,7 +237,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
|
|
|
})
|
|
|
.thenReturn(channel)
|
|
|
)
|
|
|
- .onErrorResume(RuntimeException.class,
|
|
|
+ .onErrorResume(Exception.class,
|
|
|
e->Mono.just(isReplica())
|
|
|
.filter(Boolean.FALSE::equals)
|
|
|
.flatMap(ignore->
|