Jelajahi Sumber

add 设置网桥初始值

18339543638 4 tahun lalu
induk
melakukan
2b975deafc

+ 18 - 3
jetlinks-core/src/main/java/org/jetlinks/core/cluster/AbstractClusterUniqueTask.java

@@ -59,6 +59,8 @@ public abstract class AbstractClusterUniqueTask<T> implements Serializable, Clus
     private transient Disposable generatePingMsgDisposable =null;
 
     private transient boolean beMasterFirst =true;
+
+    private transient Disposable start =null;
     /**
      * 持有锁的线程
      */
@@ -77,7 +79,7 @@ public abstract class AbstractClusterUniqueTask<T> implements Serializable, Clus
         this.operationTopic=String.format(this.operationTopic,this.id);
         this.lock = redissonClient.getLock("cluster-unique-"+this.id);
         //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
-        init()
+        start = init()
             .subscribe();
     }
 
@@ -108,7 +110,7 @@ public abstract class AbstractClusterUniqueTask<T> implements Serializable, Clus
                         //副本且占据锁资源,解锁
                         if (this.lock.isHeldByThread(Thread.currentThread().getId())) {
                             this.lock.unlock();
-                        }else {
+                        }else if(!this.lock.isLocked()){
                             fightLock();
                         }
                     });
@@ -135,7 +137,7 @@ public abstract class AbstractClusterUniqueTask<T> implements Serializable, Clus
      */
     private void fightLock(){
         try {
-            if (this.lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
+            if (this.lock.tryLock(1,pingTime/2, TimeUnit.SECONDS)) {
                 this.setWorkerId(currentSeverId);
                 if(beMasterFirst){
                     beMasterFirst =false;
@@ -219,4 +221,17 @@ public abstract class AbstractClusterUniqueTask<T> implements Serializable, Clus
         return !this.currentSeverId.equals(this.workerId);
     }
 
+
+    public void close(){
+        if(handleDisposable!=null){
+            handleDisposable.dispose();
+        }
+        if(generatePingMsgDisposable!=null){
+            generatePingMsgDisposable.dispose();
+        }
+        if(start!=null){
+            start.dispose();
+        }
+    }
+
 }

+ 28 - 24
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -6,7 +6,6 @@ import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.bridge.core.AliBridgeCodec;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
-import org.jetlinks.community.bridge.service.AliBridgeService;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.cluster.ClusterService;
 import org.jetlinks.core.device.DeviceRegistry;
@@ -20,8 +19,11 @@ import org.jetlinks.core.message.property.WritePropertyMessage;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
-import reactor.core.publisher.*;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -63,13 +65,16 @@ public class AliBridgeGateway extends ClusterService<AliIotBridgeEntity> {
 
     }
 
+    public boolean contains(String serverId){
+        return this.bridgeMap.containsKey(serverId);
+    }
 
     public Mono<AliBridgeServer> lookUpServer(String serverId){
         return Mono.justOrEmpty(this.bridgeMap.get(serverId));
     }
 
-    public Mono<AliBridgeServer> refreshServer(String serverId,AliBridgeServer server){
-        return Mono.justOrEmpty(this.bridgeMap.put(serverId,server));
+    public void refreshServer(String serverId,AliBridgeServer server){
+        this.bridgeMap.put(serverId,server);
     }
     /**
      * 重启网桥
@@ -87,27 +92,26 @@ public class AliBridgeGateway extends ClusterService<AliIotBridgeEntity> {
      * @return
      */
     public Mono<Void> initBridge(AliIotBridgeEntity resource,boolean broadcast){
-        return lookUpServer(resource.getId())
+        Mono<AliBridgeServer> mono=null;
+        mono=contains(resource.getId())?
+            lookUpServer(resource.getId()):
+            Mono.just(new AliBridgeServer(eventBus,registry,resource))
+                .doOnNext(server->{
+                    server.handleReceive()
+                        .parallel()
+                        .runOn(Schedulers.parallel())
+                        .flatMap(this::decodeAndHandleMessage)
+                        .subscribe();
+                })
+                .doOnNext(server -> refreshServer(resource.getId(),server))
+                .onErrorResume(error->
+                    Mono.error(()->{
+                        this.delBridgeServer(
+                            resource.getId(),broadcast);
+                        return new BusinessException(error.getMessage());
+                    }));
+        return mono
             .flatMap(server -> server.refreshBootStrap(resource,broadcast))
-            .switchIfEmpty(
-                Mono.just(new AliBridgeServer(eventBus,registry,resource))
-                    .doOnNext(server -> refreshServer(resource.getId(),server))
-                    .flatMap(server -> server.initBridge(resource,broadcast))
-                    .doOnNext(server->{
-                        server.handleReceive()
-                            .parallel()
-                            .runOn(Schedulers.parallel())
-                            .flatMap(this::decodeAndHandleMessage)
-                            .subscribe();
-                    })
-                    .onErrorResume(error->
-                        Mono.error(()->{
-                            this.delBridgeServer(
-                                resource.getId(),broadcast);
-                            return new BusinessException(error.getMessage());
-                        }))
-            )
-
             .flatMap(AliBridgeServer::refreshAllChannel);
     }
 

+ 8 - 7
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -295,6 +295,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
                     bootstrap.disconnectBridge();
                 }catch (Exception e){}
                 bootstrap=null;
+                this.close();
             }
             changeStatus(BridgeStatus.del,broadcast);
             log.info("网桥[{}]关闭",id);
@@ -366,15 +367,15 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
                 return bridgeGateway.delBridgeServer(id,broadcast);
             case stop:
                 return bridgeGateway.pauseBridge(id,broadcast);
-            case starting:
+//            case starting:
+//                return bridgeGateway.initBridge(params,broadcast);
+//            case running:
+//                return bridgeGateway.initBridge(params,broadcast);
+//            case fail:
+//                break;
+            default:
                 return bridgeGateway.initBridge(params,broadcast);
-            case running:
-                return bridgeGateway.initBridge(params,broadcast);
-            case fail:
-                break;
-            default:break;
         }
-        return Mono.empty();
     }
     @Override
     public void beforeHandleMsg() {