فهرست منبع

fixed 集群网桥

18339543638 4 سال پیش
والد
کامیت
fa4004392a

+ 1 - 1
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueWork.java

@@ -64,7 +64,7 @@ public class ClusterUniqueWork implements Worker, Serializable {
             .flatMap(provider -> {
                 DefaultExecutionContext context = createContext(job);
                 return provider.createTask(context)
-                    .map(executor -> new RuleClusterUniqueTask(schedulerId, context,executor,clusterManager,redissonClient));
+                    .map(executor -> new RuleAbstractClusterUniqueTask(schedulerId, context,executor,clusterManager,redissonClient));
             });
     }
 

+ 9 - 14
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/RuleClusterUniqueTask.java → jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/RuleAbstractClusterUniqueTask.java

@@ -3,23 +3,18 @@ package org.jetlinks.community.rule.engine.cluster;
 import lombok.*;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.cluster.ClusterUniqueTask;
+import org.jetlinks.core.cluster.AbstractClusterUniqueTask;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
 import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
 import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.api.task.TaskExecutor;
 import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
-import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
-import reactor.core.Disposable;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
 import java.io.Serializable;
-import java.time.Duration;
-import java.util.concurrent.*;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author lifang
@@ -36,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @Slf4j
 @EqualsAndHashCode(callSuper = false)
-class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializable{
+class RuleAbstractClusterUniqueTask extends AbstractClusterUniqueTask implements Task ,Serializable{
 
     private static final long serialVersionUID=1L;
 
@@ -65,11 +60,11 @@ class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializa
 
     private transient final TaskExecutor executor;
 
-    RuleClusterUniqueTask(String schedulerId,
-                          AbstractExecutionContext context,
-                          TaskExecutor executor,
-                          ClusterManager clusterManager,
-                          RedissonClient redissonClient) {
+    RuleAbstractClusterUniqueTask(String schedulerId,
+                                  AbstractExecutionContext context,
+                                  TaskExecutor executor,
+                                  ClusterManager clusterManager,
+                                  RedissonClient redissonClient) {
         super(context.getInstanceId()+"-"+context.getJob().getNodeId(),clusterManager,redissonClient);
         this.schedulerId = schedulerId;
         this.context = context;
@@ -141,9 +136,9 @@ class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializa
     }
 
     @Override
-    public Mono<Void> handlePing(ClusterUniqueTask task) {
+    public Mono<Void> handlePing(AbstractClusterUniqueTask task) {
         //心跳信息
-        RuleClusterUniqueTask uniqueTask = (RuleClusterUniqueTask) task;
+        RuleAbstractClusterUniqueTask uniqueTask = (RuleAbstractClusterUniqueTask) task;
         if (task.getCurrentSeverId().equals(this.getWorkerId())) {
             return Mono.empty();
         }

+ 41 - 53
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterUniqueTask.java → jetlinks-core/src/main/java/org/jetlinks/core/cluster/AbstractClusterUniqueTask.java

@@ -2,7 +2,6 @@ package org.jetlinks.core.cluster;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import org.redisson.RedissonTopic;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import reactor.core.Disposable;
@@ -12,23 +11,20 @@ import reactor.core.scheduler.Schedulers;
 
 import java.io.Serializable;
 import java.time.Duration;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author lifang
  * @version 1.0.0
- * @ClassName ClusterUniqueTask.java
- * @Description TODO
- * @see  RuleClusterUniqueTask
+ * @ClassName AbstractClusterUniqueTask.java
+ * @Description 集群唯一任务
+ * @see
  * @createTime 2021年11月27日 10:54:00
  */
 @AllArgsConstructor
 @Data
-public abstract class ClusterUniqueTask<T> implements Serializable {
+public abstract class AbstractClusterUniqueTask<T> implements Serializable {
     private static final long serialVersionUID=1L;
     private String id;
     /**
@@ -62,6 +58,7 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
 
     private transient Disposable generatePingMsgDisposable =null;
 
+    private transient boolean beMasterFirst =true;
     /**
      * 持有锁的线程
      */
@@ -69,9 +66,9 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
     private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
         Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
 
-    public ClusterUniqueTask(String id,
-                             ClusterManager clusterManager,
-                             RedissonClient redissonClient) {
+    public AbstractClusterUniqueTask(String id,
+                                     ClusterManager clusterManager,
+                                     RedissonClient redissonClient) {
         this.id=id;
         this.currentSeverId=clusterManager.getCurrentServerId();
         this.workerId=null;
@@ -84,7 +81,9 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
             .subscribe();
     }
 
-
+    public void setWorkerId(String workerId) {
+        this.workerId = workerId;
+    }
 
     private Mono<Void> init(){
 
@@ -103,28 +102,14 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
     private Mono<Void> generatePingMsg() {
         return Flux.interval(Duration.ofSeconds(pingTime/2))
             .filter(ignore->{
+                //是副本则争夺锁资源
                 if (isReplica()) {
                     this.heldLockThread.execute(()->{
+                        //副本且占据锁资源,解锁
                         if (this.lock.isHeldByThread(Thread.currentThread().getId())) {
-                            //非工作节点且占据锁,则解锁
                             this.lock.unlock();
                         }else if(!this.isAlive.get()&&!this.lock.isLocked()){
-                            //争夺锁
-                            try {
-                                //获取锁
-                                if (this.lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
-                                    //获取锁成功后改写工作节点id
-                                    this.workerId=this.currentSeverId;
-                                    this.isAlive.set(true);
-                                }else {
-                                    //失败后停止发送心跳(若存在)
-                                    this.workerId=null;
-                                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
-                                        this.generatePingMsgDisposable.dispose();
-                                    }
-                                }
-                            }catch (InterruptedException e){
-                            }
+                            fightLock();
                         }
                     });
                     return false;
@@ -133,27 +118,8 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
             })
             .doOnNext(ignore->{
                 //锁续约
-                AtomicBoolean finished= new AtomicBoolean(false);
-                this.heldLockThread.execute(()->{
-                    try {
-                        if (this.lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
-                            this.workerId=this.currentSeverId;
-                        }else {
-                            this.workerId=null;
-                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
-                                this.generatePingMsgDisposable.dispose();
-                            }
-                        }
-
-                    } catch (InterruptedException e) {
-                    }finally {
-                        finished.set(true);
-                    }
-                });
-                while (!finished.get()){
-                }
+                this.heldLockThread.execute(this::fightLock);
                 if(!isReplica()){
-                    beMasterPostProcessor();
                     if(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()){
                         this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
                     }
@@ -163,6 +129,28 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
             .then();
     }
 
+
+    /**
+     * 争夺锁资源
+     *
+     */
+    private void fightLock(){
+        try {
+            if (this.lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
+                this.setWorkerId(currentSeverId);
+                if(beMasterFirst){
+                    beMasterFirst =false;
+                    beMasterPostProcessor();
+                }
+            }else {
+                beMasterFirst=true;
+                this.setWorkerId(null);
+                if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                    this.generatePingMsgDisposable.dispose();
+                }
+            }
+        }catch (InterruptedException e){}
+    }
     /**
      * 当该节点变为集群主节点时,调用该方法进行处理操作
      */
@@ -184,7 +172,7 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
     /**
      * 处理心跳信息
      */
-    public abstract Mono<Void> handlePing(ClusterUniqueTask task);
+    public abstract Mono<Void> handlePing(AbstractClusterUniqueTask task);
 
     /**
      * 处理消息
@@ -214,8 +202,8 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
                             }
                             Object message = obj.getMessage();
                             this.isAlive.set(true);
-                            if (message instanceof ClusterUniqueTask) {
-                                return handlePing((ClusterUniqueTask) message);
+                            if (message instanceof AbstractClusterUniqueTask) {
+                                return handlePing((AbstractClusterUniqueTask) message);
                             }
                             return handleMsg(message);
                         })

+ 21 - 25
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -58,6 +58,9 @@ public class AliBridgeGateway{
         return Mono.justOrEmpty(this.bridgeMap.get(serverId));
     }
 
+    public Mono<AliBridgeServer> refreshServer(String serverId,AliBridgeServer server){
+        return Mono.justOrEmpty(this.bridgeMap.put(serverId,server));
+    }
     /**
      * 重启网桥
      * @param bridgeId 网桥id
@@ -75,31 +78,24 @@ public class AliBridgeGateway{
      */
     public Mono<Void> initBridge(AliIotBridgeEntity resource,boolean broadcast){
         return lookUpServer(resource.getId())
-            .then()
-            .switchIfEmpty(AliBridgeServerFactory.create(eventBus,registry,resource,broadcast)
-                .doOnNext(server -> {
-                    AliBridgeServer oldServer =
-                        this.bridgeMap.put(server.getId(),server);
-                    if(!server.equals(oldServer)&&oldServer!=null){
-                        oldServer.pauseBridge(broadcast)
-                            .concatWith(server.reconnect(broadcast)).subscribe();
-                    }
-                })
-                .doOnNext(server->{
-                    server.handleReceive()
-                        .parallel()
-                        .runOn(Schedulers.parallel())
-                        .flatMap(this::decodeAndHandleMessage)
-                        .subscribe();
-                })
-                .then()
-                .onErrorResume(error->
-                    Mono.error(()->{
-                        this.delBridgeServer(
-                            resource.getId(),broadcast);
-                        return new BusinessException(error.getMessage());
-                    })))
-            .then();
+            .flatMap(server -> server.refreshBootStrap(resource,broadcast))
+            .switchIfEmpty(
+                AliBridgeServerFactory.create(eventBus,registry,resource,broadcast)
+                    .doOnNext(server -> refreshServer(resource.getId(),server))
+                    .doOnNext(server->{
+                        server.handleReceive()
+                            .parallel()
+                            .runOn(Schedulers.parallel())
+                            .flatMap(this::decodeAndHandleMessage)
+                            .subscribe();
+                    })
+                    .onErrorResume(error->
+                        Mono.error(()->{
+                            this.delBridgeServer(
+                                resource.getId(),broadcast);
+                            return new BusinessException(error.getMessage());
+                        }))
+            ).flatMap(server -> server.initBridge(resource,broadcast));
     }
 
 

+ 38 - 26
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -21,8 +21,8 @@ import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
 import org.jetlinks.community.bridge.server.Channel;
+import org.jetlinks.core.cluster.AbstractClusterUniqueTask;
 import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.cluster.ClusterUniqueTask;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @createTime 2021年11月30日 11:01:00
  */
 @Slf4j
-public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
+public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer> {
     private transient Map<String, Channel> channelMap=new ConcurrentHashMap<>();
 
     private transient final EventBus eventBus;
@@ -82,34 +82,15 @@ public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
     }
 
     public Mono<Void> initBridge(AliIotBridgeEntity params,boolean broadcast){
-        AliIotBridgeEntity.AccessConfig accessConfig = params.getAccessConfig();
-        String productKey = accessConfig.getProductKey();
-        Assert.notNull(productKey,"创建网桥 productKey 不能为空, mapping id {%s}",params.getId());
-        String accessKey = accessConfig.getAccessKey();
-        Assert.notNull(accessKey,"创建网桥 accessKey 不能为空, mapping id {%s}",params.getId());
-        String accessSecret = accessConfig.getAccessSecret();
-        Assert.notNull(accessSecret,"创建网桥 accessSecret 不能为空, mapping id {%s}",params.getId());
-        String authEndpoint = accessConfig.getAuthEndpoint();
-        Assert.notNull(authEndpoint,"创建网桥 authEndpoint 不能为空, mapping id {%s}",params.getId());
-        String http2Endpoint = accessConfig.getHttp2Endpoint();
-        Assert.notNull(http2Endpoint,"创建网桥 http2Endpoint 不能为空, mapping id {%s}",params.getId());
-        String regionId = accessConfig.getRegionId();
-        Assert.notNull(regionId,"创建网桥 regionId 不能为空, mapping id {%s}",params.getId());
         this.params=params;
+        this.status=params.getState();
+        verify(params);
+        refreshBridgeConfig(params);
+        bootstrap=new DefaultBridgeBootstrap(params.getId(),bridgeConfigManager);
         if(broadcast){
             //发送广播消息
             getClusterOperationTopic().publish(Mono.just(OperationMessage.builder().param(params).init(true).build())).subscribe();
         }
-        if(isReplica()){
-            return Mono.empty();
-        }
-
-        if(start.get()){
-            return Mono.empty();
-        }
-        this.status=params.getState();
-        bridgeConfigManager = DefaultBridgeConfigManager.of(params.getId(),productKey, null, null, http2Endpoint, authEndpoint, MapUtil.of(Pair.of(BridgeConfigConsts.BRIDGE_INSTANCEID_KEY,params.getAccessConfig().getIotInstanceId())), getPopClientProfile(accessKey, accessSecret, regionId));
-        bootstrap=new DefaultBridgeBootstrap(params.getId(),bridgeConfigManager);
         //非主节点、非运行状态、已启动
         if(isReplica()||!BridgeStatus.running.equals(status)||start.get()){
             return Mono.empty();
@@ -138,6 +119,37 @@ public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
         return Mono.empty();
     }
 
+    public Mono<AliBridgeServer> refreshBootStrap(AliIotBridgeEntity params,boolean broadcast){
+        if (this.bootstrap!=null) {
+            this.bootstrap.disconnectBridge();
+        }
+        start.set(false);
+        return initBridge(params,broadcast)
+            .thenReturn(this);
+    }
+
+
+    private void verify(AliIotBridgeEntity params){
+        AliIotBridgeEntity.AccessConfig accessConfig = params.getAccessConfig();
+        Assert.notNull(accessConfig.getProductKey(),"创建网桥 productKey 不能为空, mapping id {%s}",params.getId());
+        Assert.notNull(accessConfig.getAccessKey(),"创建网桥 accessKey 不能为空, mapping id {%s}",params.getId());
+        Assert.notNull(accessConfig.getAccessSecret(),"创建网桥 accessSecret 不能为空, mapping id {%s}",params.getId());
+        Assert.notNull(accessConfig.getAuthEndpoint(),"创建网桥 authEndpoint 不能为空, mapping id {%s}",params.getId());
+        Assert.notNull(accessConfig.getHttp2Endpoint(),"创建网桥 http2Endpoint 不能为空, mapping id {%s}",params.getId());
+        Assert.notNull(accessConfig.getRegionId(),"创建网桥 regionId 不能为空, mapping id {%s}",params.getId());
+    }
+
+    private void refreshBridgeConfig(AliIotBridgeEntity params){
+        AliIotBridgeEntity.AccessConfig accessConfig = params.getAccessConfig();
+        String productKey = accessConfig.getProductKey();
+        String accessKey = accessConfig.getAccessKey();
+        String accessSecret = accessConfig.getAccessSecret();
+        String authEndpoint = accessConfig.getAuthEndpoint();
+        String http2Endpoint = accessConfig.getHttp2Endpoint();
+        String regionId = accessConfig.getRegionId();
+        bridgeConfigManager = DefaultBridgeConfigManager.of(params.getId(),productKey, null, null, http2Endpoint, authEndpoint, MapUtil.of(Pair.of(BridgeConfigConsts.BRIDGE_INSTANCEID_KEY,params.getAccessConfig().getIotInstanceId())), getPopClientProfile(accessKey, accessSecret, regionId));
+    }
+
 
     private PopClientConfiguration getPopClientProfile(@NotNull String accessKey,@NotNull String accessSecret,@NotNull String endpoint){
         PopClientConfiguration popClientConfiguration = new PopClientConfiguration();
@@ -309,7 +321,7 @@ public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
     }
 
     @Override
-    public Mono<Void> handlePing(ClusterUniqueTask task) {
+    public Mono<Void> handlePing(AbstractClusterUniqueTask task) {
         if(task instanceof  AliBridgeServer){
             AliBridgeServer server= (AliBridgeServer) task;
             changeStatus(server.status,false);

+ 3 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServerFactory.java

@@ -21,9 +21,8 @@ public class AliBridgeServerFactory {
 
 
     public static Mono<AliBridgeServer> create(EventBus eventBus, DeviceRegistry deviceRegistry, AliIotBridgeEntity bridge,boolean broadcast) {
-        AliBridgeServer aliBridgeServer = new AliBridgeServer(eventBus,deviceRegistry,bridge.getId());
-        return  Mono.just(aliBridgeServer)
-            .flatMap(server->server.initBridge(bridge,broadcast))
-            .thenReturn(aliBridgeServer);
+        return  Mono.just( new AliBridgeServer(eventBus,deviceRegistry,bridge.getId()));
+//            .flatMap(server->server.initBridge(bridge,broadcast))
+//            .thenReturn(aliBridgeServer);
     }
 }