|
@@ -108,22 +108,22 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
|
|
|
if(bootstrap.isBridgeConnected()){
|
|
if(bootstrap.isBridgeConnected()){
|
|
|
return Mono.just(this);
|
|
return Mono.just(this);
|
|
|
}
|
|
}
|
|
|
- return Mono.fromRunnable(()->{
|
|
|
|
|
- bootstrap.bootstrap(new DownlinkChannelHandler() {
|
|
|
|
|
- @Override
|
|
|
|
|
- public boolean pushToDevice(Session session, String topic, byte[] payload) {
|
|
|
|
|
- if(receive.hasDownstreams()){
|
|
|
|
|
- receiveSink.next(AliBridgeMessage.of(session,topic,payload));
|
|
|
|
|
|
|
+ return Mono.just(bootstrap)
|
|
|
|
|
+ .doOnNext(ignore->{
|
|
|
|
|
+ bootstrap.bootstrap(new DownlinkChannelHandler() {
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public boolean pushToDevice(Session session, String topic, byte[] payload) {
|
|
|
|
|
+ if(receive.hasDownstreams()){
|
|
|
|
|
+ receiveSink.next(AliBridgeMessage.of(session,topic,payload));
|
|
|
|
|
+ }
|
|
|
|
|
+ return false;
|
|
|
}
|
|
}
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- @Override
|
|
|
|
|
- public boolean broadcast(String topic, byte[] payload) {
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- })
|
|
|
|
|
- .flatMap(ignore->bridgeService.createUpdate().set(AliIotBridgeEntity::getState,BridgeStatus.running)
|
|
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public boolean broadcast(String topic, byte[] payload) {
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ }) .flatMap(ignore->bridgeService.createUpdate().set(AliIotBridgeEntity::getState,BridgeStatus.running)
|
|
|
.where(AliIotBridgeEntity::getId,id).execute())
|
|
.where(AliIotBridgeEntity::getId,id).execute())
|
|
|
.onErrorResume(RuntimeException.class,
|
|
.onErrorResume(RuntimeException.class,
|
|
|
e->Mono.just(isReplica())
|
|
e->Mono.just(isReplica())
|
|
@@ -321,6 +321,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
|
|
|
handleStatus(this.params.getState(),false)
|
|
handleStatus(this.params.getState(),false)
|
|
|
.concatWith(bridgeService.createUpdate()
|
|
.concatWith(bridgeService.createUpdate()
|
|
|
.set(AliIotBridgeEntity::getNodeId,this.getCurrentSeverId())
|
|
.set(AliIotBridgeEntity::getNodeId,this.getCurrentSeverId())
|
|
|
|
|
+ .where(AliIotBridgeEntity::getId,id)
|
|
|
.execute()
|
|
.execute()
|
|
|
.then()
|
|
.then()
|
|
|
).subscribe();
|
|
).subscribe();
|