Jelajahi Sumber

fixed 集群网桥

18339543638 4 tahun lalu
induk
melakukan
d52067b856

+ 52 - 53
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/RuleClusterUniqueTask.java

@@ -65,13 +65,6 @@ class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializa
 
     private transient final TaskExecutor executor;
 
-    /**
-     * 持有锁的线程
-     */
-
-    private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
-        Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
-
     RuleClusterUniqueTask(String schedulerId,
                           AbstractExecutionContext context,
                           TaskExecutor executor,
@@ -140,6 +133,7 @@ class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializa
 
     @Override
     public void beforeHandleMsg() {
+        //判断是否为副本节点,若为副本节点,则停止运行
         if(isReplica()&&this.executor.getState().equals(State.running)){
             this.taskState=State.running;
             this.executor.pause();
@@ -215,57 +209,59 @@ class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializa
     private Mono<Void> operation(OperationMessage message) {
         TaskOperation operation=message.operation;
         this.taskState=State.unknown.equals(operation.getState())?this.taskState:operation.getState();
-        if(isReplica()){
+        if(!isReplica()){
+            long currentTimeMillis = System.currentTimeMillis();
+            this.lastStateTime=currentTimeMillis;
+            //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
+            switch (operation){
+                case JOB:
+                    return Mono.fromRunnable(() -> {
+                        ScheduleJob job = (ScheduleJob) message.getParams().get(0);
+                        ScheduleJob old = context.getJob();
+                        context.setJob(job);
+                        try {
+                            executor.validate();
+                        } catch (Throwable e) {
+                            context.setJob(old);
+                            throw e;
+                        }
+                    });
+                case START:
+                    return Mono.<Void>fromRunnable(this.executor::start)
+                        .doOnSuccess((v) ->this. startTime =currentTimeMillis)
+                        .subscribeOn(Schedulers.boundedElastic());
+                case PAUSE:
+                    return Mono.fromRunnable(this.executor::pause);
+                case RELOAD:
+                    return Mono.<Void>fromRunnable(this.executor::reload)
+                        .subscribeOn(Schedulers.boundedElastic());
+                case SHUTDOWN:
+                    this.taskState=State.shutdown;
+                    //解锁
+                    this.getHeldLockThread().execute(()->this.getLock().unlock());
+                    break;
+                case ENABLE_DEBUG:
+                    return Mono.fromRunnable(() ->this. context.setDebug(true));
+                case DISABLE_DEBUG:
+                    return Mono.fromRunnable(() -> this.context.setDebug(false));
+                case EXECUTE:
+                    if(this.executor instanceof ExecutableTaskExecutor){
+                        RuleData data = (RuleData) message.getParams().get(0);
+                        return ((ExecutableTaskExecutor) this.executor).execute(data);
+                    }
+                    return Mono.empty();
+
+                default:break;
+            }
+        }
+        if(message.isNeedSend()){
             //当前为任务副本或重新加载不在此处进行广播操作
             if(!TaskOperation.JOB.equals(message.operation)&&!TaskOperation.RELOAD.equals(message.getOperation())){
                 return  this.getClusterManager().getTopic(this.getOperationTopic())
-                    .publish(Mono.just(message))
+                    .publish(Mono.just(message).doOnNext(operationMessage -> operationMessage.setNeedSend(false)))
                     .then();
             }
         }
-        long currentTimeMillis = System.currentTimeMillis();
-        this.lastStateTime=currentTimeMillis;
-        //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
-        switch (operation){
-            case JOB:
-                return Mono.fromRunnable(() -> {
-                    ScheduleJob job = (ScheduleJob) message.getParams().get(0);
-                    ScheduleJob old = context.getJob();
-                    context.setJob(job);
-                    try {
-                        executor.validate();
-                    } catch (Throwable e) {
-                        context.setJob(old);
-                        throw e;
-                    }
-                });
-            case START:
-                return Mono.<Void>fromRunnable(this.executor::start)
-                    .doOnSuccess((v) ->this. startTime =currentTimeMillis)
-                    .subscribeOn(Schedulers.boundedElastic());
-            case PAUSE:
-                return Mono.fromRunnable(this.executor::pause);
-            case RELOAD:
-                return Mono.<Void>fromRunnable(this.executor::reload)
-                    .subscribeOn(Schedulers.boundedElastic());
-            case SHUTDOWN:
-                this.taskState=State.shutdown;
-                //解锁
-                this.heldLockThread.execute(()->this.getLock().unlock());
-                break;
-            case ENABLE_DEBUG:
-                return Mono.fromRunnable(() ->this. context.setDebug(true));
-            case DISABLE_DEBUG:
-                return Mono.fromRunnable(() -> this.context.setDebug(false));
-            case EXECUTE:
-                if(this.executor instanceof ExecutableTaskExecutor){
-                    RuleData data = (RuleData) message.getParams().get(0);
-                    return ((ExecutableTaskExecutor) this.executor).execute(data);
-                }
-                return Mono.empty();
-
-            default:break;
-        }
         return Mono.empty();
     }
 
@@ -315,7 +311,10 @@ class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializa
         private TaskOperation operation;
         private List<Object> params;
         private String fromServerId;
-
+        /**
+         * 该信息是否需要发送
+         */
+        private boolean needSend=true;
         private OperationMessage(TaskOperation operation, List<Object> params, String fromServerId) {
             this.operation = operation;
             this.params = params;

+ 5 - 0
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterUniqueTask.java

@@ -2,6 +2,7 @@ package org.jetlinks.core.cluster;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.redisson.RedissonTopic;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import reactor.core.Disposable;
@@ -22,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @version 1.0.0
  * @ClassName ClusterUniqueTask.java
  * @Description TODO
+ * @see  RuleClusterUniqueTask
  * @createTime 2021年11月27日 10:54:00
  */
 @AllArgsConstructor
@@ -91,6 +93,9 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
             .then();
     }
 
+    public ClusterTopic<Object>  getClusterOperationTopic(){
+        return getClusterManager().getTopic(getOperationTopic());
+    }
     /**
      * 产生心跳信息
      * @return

+ 0 - 12
jetlinks-core/src/main/java/org/jetlinks/core/rpc/ClusterRpcConnector.java

@@ -1,12 +0,0 @@
-package org.jetlinks.core.rpc;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName ClusterRpcConnector.java
- * @Description 集群远程调用通信
- * @createTime 2021年12月09日 10:18:00
- */
-public class ClusterRpcConnector {
-
-}

+ 0 - 22
jetlinks-core/src/main/java/org/jetlinks/core/rpc/RpcRequest.java

@@ -1,22 +0,0 @@
-package org.jetlinks.core.rpc;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName RpcRequest.java
- * @Description TODO
- * @createTime 2021年12月09日 10:20:00
- */
-@Data
-public class RpcRequest implements Serializable {
-    public static final long serializable=1L;
-    private String requestId;
-    private Object[] params;
-    private long timeout;
-    private Class<?> requestClass;
-    private String methodName;
-}

+ 6 - 0
jetlinks-manager/bridge-manager/pom.xml

@@ -27,6 +27,12 @@
             <version>${aliyun.iot.sdk.version}</version>
         </dependency>
 
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.aliyun.openservices</groupId>
             <artifactId>iot-as-bridge-sdk-core</artifactId>

+ 1 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/enums/BridgeStatus.java

@@ -18,6 +18,7 @@ import org.hswebframework.web.dict.EnumDict;
 public enum BridgeStatus implements EnumDict<String> {
     running("运行中"),
     fail("运行失败"),
+    del("已删除"),
     stop("暂停");
 
     private String text;

+ 0 - 67
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/BridgeServer.java

@@ -1,67 +0,0 @@
-package org.jetlinks.community.bridge.server;
-
-import org.jetlinks.community.bridge.message.AliBridgeMessage;
-import org.jetlinks.community.bridge.server.aliyun.AliBridgeServer;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple4;
-
-import javax.validation.constraints.NotNull;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName Server.java
- * @Description 网桥服务器
- * @see Channel 设备网桥
- * @createTime 2021年11月26日 09:12:00
- */
-public interface BridgeServer {
-    /**
-     * 校验网桥参数
-     * @param params 网桥构造参数
-     * @return
-     */
-    boolean verify(Map<String,Object> params);
-
-    /**
-     * 网桥组id
-     * @return
-     */
-    String productId();
-
-    /**
-     * 注册设备通道
-     * @param bridgeId 网桥id
-     * @param originalIdentity 设备原始id,即网桥设备id
-     * @param productKey 产品id
-     * @param deviceName 网桥配置
-     * @see   AliBridgeServer
-     * @return
-     */
-    Mono<Channel> register(@NotNull String bridgeId, @NotNull String originalIdentity, @NotNull String productKey, @NotNull String deviceName, @NotNull String deviceSecret);
-
-    /**
-     * 取消注册设备
-     * @param originalIdentity
-     */
-    Mono<Void>  unRegister(String originalIdentity);
-
-    /**
-     * 停止 网桥
-     */
-    Mono<Void> stopBridge();
-
-    /**
-     * 重新连接
-     */
-    Mono<Void> reconnect();
-
-    /**
-     * 处理网桥信息
-     * @return
-     */
-    Flux<AliBridgeMessage> handleReceive();
-}

+ 41 - 169
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -1,41 +1,30 @@
 package org.jetlinks.community.bridge.server.aliyun;
 
-import cn.hutool.core.util.StrUtil;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.bridge.core.AliBridgeCodec;
-import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
-import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.cluster.ClusterTopic;
-import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.event.Subscription;
-import org.jetlinks.core.event.TopicPayload;
 import org.jetlinks.core.message.CommonDeviceMessage;
 import org.jetlinks.core.message.CommonDeviceMessageReply;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
-import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.WritePropertyMessage;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
-
-import javax.annotation.PostConstruct;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.BiConsumer;
 
 /**
  * @author lifang
@@ -58,63 +47,17 @@ public class AliBridgeGateway{
     private final EventBus eventBus;
     private final DecodedClientMessageHandler messageHandler;
 
-    private final String  clusterId;
-
-
-
-    private final ClusterTopic<Object> clusterTopic;
-    private final ClusterManager clusterManager;
-    private final String format="bridge-cluster-%s";
     public AliBridgeGateway(DeviceRegistry registry,
                             EventBus eventBus,
-                            DecodedClientMessageHandler messageHandler,
-                            ClusterManager clusterManager) {
+                            DecodedClientMessageHandler messageHandler) {
         this.registry = registry;
         this.eventBus=eventBus;
         this.messageHandler = messageHandler;
-        this.clusterManager=clusterManager;
-        this.clusterId=clusterManager.getCurrentServerId();
-        this.clusterTopic=clusterManager.getTopic(String.format(format,clusterManager.getCurrentServerId()));
-    }
-
-    @PostConstruct
-    public void init(){
-        clusterTopic
-            .subscribePattern()
-            .map(ClusterTopic.TopicMessage::getMessage)
-            .flatMap(msg->{
-                //暂停、更新、启动、删除
-                if(msg instanceof  BridgeMessage) {
-                    BridgeMessage message = (BridgeMessage) msg;
-                    MessageType messageType = message.getMessageType();
-                    switch (messageType) {
-                        case del:
-                            return delBridgeServer(clusterId,message.getBridgeId());
-                        case init:
-                            return initBridge((AliIotBridgeEntity) message.getMsg());
-                        case update:
-                            return replaceBridgeServer(clusterId, message.getBridgeId(),(AliIotBridgeEntity) message.getMsg());
-                        case register:
-                            return registerDevice(clusterId, message.getBridgeId(),(AliIotBridgeDeviceConfig) message.getMsg());
-                        case unregister:
-                            return unregisterDevice(clusterId,message.getBridgeId(), (String) message.getMsg());
-                        case restart:
-                            return reconnect(clusterId,message.getBridgeId());
-                        default:
-                            return Mono.empty();
-                    }
-                }
-                return Mono.empty();
-            })
-            .onErrorContinue((error,object)->{
-                log.error("网桥任务执行失败,",error);
-            })
-            .subscribe();
     }
 
 
-    private AliBridgeServer getServer(String serverId){
-        return this.bridgeMap.get(serverId);
+    public Mono<AliBridgeServer> lookUpServer(String serverId){
+        return Mono.justOrEmpty(this.bridgeMap.get(serverId));
     }
 
     /**
@@ -122,36 +65,8 @@ public class AliBridgeGateway{
      * @param bridgeId 网桥id
      * @return
      */
-    public Mono<Void> reconnect(String nodeId,String bridgeId){
-        return handleClusterOperation(nodeId,bridgeId,MessageType.restart,bridgeId)
-            .filter(Boolean.FALSE::equals)
-            .flatMap(ignore->getServer(bridgeId).reconnect()).then();
-    }
-    /**
-     * 注册设备
-     * @param nodeId 节点id
-     * @param bridgeId 网桥id
-     * @param config 设备配置
-     * @return
-     */
-    public Mono<Void> registerDevice(String nodeId,String bridgeId,AliIotBridgeDeviceConfig config){
-        return handleClusterOperation(nodeId,bridgeId,MessageType.register,config)
-            .filter(Boolean.FALSE::equals)
-            .flatMap(ignore->getServer(bridgeId)
-                .register(bridgeId,config.getOriginalIdentity(),config.getProductKey(),config.getDeviceName(),config.getDeviceSecret())).then();
-    }
-
-
-    /**
-     * 取消设备注册
-     * @param bridgeId 网桥id
-     * @param originalIdentity 设备id
-     * @return
-     */
-    public Mono<Void> unregisterDevice(String nodeId,String bridgeId,String originalIdentity){
-        return handleClusterOperation(nodeId,bridgeId,MessageType.unregister,originalIdentity)
-            .filter(Boolean.FALSE::equals)
-            .flatMap(ignore->getServer(bridgeId).unRegister(originalIdentity)).then();
+    public Mono<Void> reconnect(String bridgeId){
+        return lookUpServer(bridgeId).flatMap(AliBridgeServer::reconnect);
     }
 
     /**
@@ -160,55 +75,48 @@ public class AliBridgeGateway{
      * @param resource
      * @return
      */
-    public Mono<Void> initBridge(AliIotBridgeEntity resource){
-        return handleClusterOperation(resource.getNodeId(),
-            resource.getId(),MessageType.init,resource)
-            .filter(Boolean.FALSE::equals)
-            .flatMap(ignore->Mono.justOrEmpty(getServer(resource.getId()))
+    public Mono<Void> initBridge(AliIotBridgeEntity resource,boolean broadcast){
+        return lookUpServer(resource.getId())
+            .then()
+            .switchIfEmpty(AliBridgeServerFactory.create(eventBus,registry,resource)
+                .doOnNext(server -> {
+                    AliBridgeServer oldServer =
+                        this.bridgeMap.put(server.getId(),server);
+                    if(!server.equals(oldServer)&&oldServer!=null){
+                        oldServer.pauseBridge()
+                            .concatWith(server.reconnect()).subscribe();
+                    }
+                })
+                .doOnNext(server->{
+                    server.handleReceive()
+                        .parallel()
+                        .runOn(Schedulers.parallel())
+                        .flatMap(this::decodeAndHandleMessage)
+                        .subscribe();
+                })
                 .then()
-                .switchIfEmpty(AliBridgeServer.create(eventBus,registry,resource,clusterId)
-                    .doOnNext(server -> {
-                        AliBridgeServer oldServer =
-                            this.bridgeMap.put(server.getId(),server);
-                        if(!server.equals(oldServer)&&oldServer!=null){
-                                oldServer.stopBridge()
-                                    .concatWith(server.reconnect()).subscribe();
-                        }
-                    })
-                    .doOnNext(server->{
-                        server.handleReceive()
-                            .parallel()
-                            .runOn(Schedulers.parallel())
-                            .flatMap(this::decodeAndHandleMessage)
-                            .subscribe();
-                    })
-                    .then()
-                    .onErrorResume(error->
-                        Mono.error(()->{
-                            this.delBridgeServer(resource.getNodeId(),
-                                resource.getId());
-                            return new BusinessException(error.getMessage());
-                        })))
-                .then()).then();
+                .onErrorResume(error->
+                    Mono.error(()->{
+                        this.delBridgeServer(
+                            resource.getId(),broadcast);
+                        return new BusinessException(error.getMessage());
+                    })))
+            .then();
     }
 
 
 
-    public Mono<AliBridgeServer> replaceBridgeServer(String oldNodeId,String oldBridgeId,AliIotBridgeEntity bridge){
-        return handleClusterOperation(oldNodeId,oldBridgeId,MessageType.update,bridge)
-            .filter(Boolean.FALSE::equals)
-            .flatMap(ignore->delBridgeServer(oldNodeId,oldBridgeId))
-            .concatWith(initBridge(bridge))
-            .then(Mono.justOrEmpty(getServer(bridge.getId())));
+    public Mono<AliBridgeServer> replaceBridgeServer(String oldBridgeId,AliIotBridgeEntity bridge,boolean broadcast){
+        return delBridgeServer(oldBridgeId,broadcast)
+            .concatWith(initBridge(bridge,broadcast))
+            .then(lookUpServer(bridge.getId()));
     }
 
 
-    public Mono<Void> delBridgeServer(String nodeId,String bridgeId){
-        return handleClusterOperation(nodeId,bridgeId,MessageType.del,bridgeId)
-            .filter(Boolean.FALSE::equals)
-            .flatMap(ignore->Mono.justOrEmpty(getServer(bridgeId))
-                .filter(Objects::nonNull)
-                .flatMap(AliBridgeServer::stopBridge)).then();
+    public Mono<Void> delBridgeServer(String bridgeId,boolean broadcast){
+        return lookUpServer(bridgeId)
+            .filter(Objects::nonNull)
+            .flatMap(AliBridgeServer::delBridge);
     }
 
 
@@ -255,43 +163,7 @@ public class AliBridgeGateway{
             );
     }
 
-
-    /**
-     *
-     * @param nodeId 网桥所在节点id
-     * @param bridgeId 网桥id
-     * @param messageType  消息类型
-     * @param msg 消息
-     * @return
-     */
-    private Mono<Boolean> handleClusterOperation(String nodeId,String bridgeId, MessageType messageType, Object msg){
-        if(StrUtil.isNullOrUndefined(bridgeId)){
-            log.error("AliBridgeGateway.handleClusterOperation 网桥id不能为空,");
-            return Mono.error(new BusinessException("bridgeId不能为空"));
-        }
-        if(!this.clusterId.equals(nodeId)){
-            return clusterManager.getTopic(String.format(format,nodeId))
-                .publish(Mono.just(BridgeMessage.of(clusterId,messageType,bridgeId,msg)))
-                .thenReturn(true);
-        }
-        return Mono.just(false);
-    }
-
-    @AllArgsConstructor(staticName = "of")
-    @Data
-    private static class BridgeMessage implements Serializable {
-        private String from;
-        private MessageType messageType;
-        private String bridgeId;
-        private Object msg;
-    }
-
-    enum MessageType{
-        restart,
-        init,
-        del,
-        update,
-        register,
-        unregister;
+    public Mono<Void> pauseBridge(String id,boolean broadcast) {
+        return lookUpServer(id).flatMap(AliBridgeServer::pauseBridge);
     }
 }

+ 181 - 55
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -3,10 +3,13 @@ package org.jetlinks.community.bridge.server.aliyun;
 import cn.hutool.core.lang.Assert;
 import cn.hutool.core.lang.Pair;
 import cn.hutool.core.map.MapUtil;
+import cn.hutool.extra.spring.SpringUtil;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.handler.DownlinkChannelHandler;
 import com.aliyun.iot.as.bridge.core.model.PopClientConfiguration;
 import com.aliyun.iot.as.bridge.core.model.Session;
+import lombok.Builder;
+import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
@@ -15,15 +18,20 @@ import org.jetlinks.community.bridge.core.DefaultBridgeConfigManager;
 import org.jetlinks.community.bridge.core.DefaultDeviceConfigManager;
 import org.jetlinks.community.bridge.core.DefaultUplinkChannelHandler;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
+import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
-import org.jetlinks.community.bridge.server.BridgeServer;
 import org.jetlinks.community.bridge.server.Channel;
+import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
+import org.jetlinks.community.bridge.service.AliBridgeService;
+import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.cluster.ClusterUniqueTask;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
+import org.redisson.api.RedissonClient;
 import reactor.core.publisher.*;
 import javax.validation.constraints.NotNull;
+import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,50 +47,43 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @createTime 2021年11月30日 11:01:00
  */
 @Slf4j
-public class AliBridgeServer implements BridgeServer {
-    private Map<String, Channel> channelMap=new ConcurrentHashMap<>();
+public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
+    private transient Map<String, Channel> channelMap=new ConcurrentHashMap<>();
 
-    private final EventBus eventBus;
+    private transient final EventBus eventBus;
 
     /************ 下行数据处理 ***************/
-    private final UnicastProcessor<AliBridgeMessage> receive = UnicastProcessor.create();
+    private transient final UnicastProcessor<AliBridgeMessage> receive = UnicastProcessor.create();
 
-    private final FluxSink<AliBridgeMessage> receiveSink = receive.sink(FluxSink.OverflowStrategy.BUFFER);
+    private transient final FluxSink<AliBridgeMessage> receiveSink = receive.sink(FluxSink.OverflowStrategy.BUFFER);
     /************ 下行数据处理 ***************/
 
+    private transient final DeviceRegistry deviceRegistry;
     private AtomicBoolean start=new AtomicBoolean(false);
 
-    private DefaultBridgeConfigManager bridgeConfigManager;
+    private transient DefaultBridgeConfigManager bridgeConfigManager;
 
+    private transient DefaultBridgeBootstrap bootstrap;
 
-    private  DefaultBridgeBootstrap bootstrap;
+    private BridgeStatus status;
 
-    private final String clusterId;
+    private transient AliBridgeGateway bridgeGateway;
+
+    private AliIotBridgeEntity params;
 
     @Getter
     private final String id;
-    private final DeviceRegistry deviceRegistry;
-    private AliBridgeServer(EventBus eventBus, DeviceRegistry deviceRegistry, String clusterId, String id) {
+
+
+    AliBridgeServer(EventBus eventBus, DeviceRegistry deviceRegistry, String id) {
+        super("_cluster_bridge_"+id,SpringUtil.getBean(ClusterManager.class),SpringUtil.getBean(RedissonClient.class));
         this.deviceRegistry=deviceRegistry;
         this.eventBus = eventBus;
-        this.clusterId=clusterId;
         this.id=id;
+        this.bridgeGateway= SpringUtil.getBean(AliBridgeGateway.class);
     }
 
-    public static Mono<AliBridgeServer> create(EventBus eventBus,DeviceRegistry deviceRegistry, AliIotBridgeEntity bridge,String clusterId) {
-        AliBridgeServer aliBridgeServer = new AliBridgeServer(eventBus,deviceRegistry,clusterId,bridge.getId());
-        return  Mono.just(aliBridgeServer)
-            .flatMap(server->server.initBridge(bridge))
-            .thenReturn(aliBridgeServer);
-    }
-
-    public Mono<Void> start(){
-        return Mono.fromRunnable(()->this.bootstrap.reconnectBridge());
-    }
-    public Mono<Void> initBridge(AliIotBridgeEntity params){
-        if(start.get()){
-            return Mono.empty();
-        }
+    public Mono<Void> initBridge(AliIotBridgeEntity params,boolean broadcast){
         AliIotBridgeEntity.AccessConfig accessConfig = params.getAccessConfig();
         String productKey = accessConfig.getProductKey();
         Assert.notNull(productKey,"创建网桥 productKey 不能为空, mapping id {%s}",params.getId());
@@ -96,12 +97,29 @@ public class AliBridgeServer implements BridgeServer {
         Assert.notNull(http2Endpoint,"创建网桥 http2Endpoint 不能为空, mapping id {%s}",params.getId());
         String regionId = accessConfig.getRegionId();
         Assert.notNull(regionId,"创建网桥 regionId 不能为空, mapping id {%s}",params.getId());
+        if(broadcast){
+            //发送广播消息
+            getClusterOperationTopic().publish(Mono.just(OperationMessage.builder().param(params).init(true).build())).subscribe();
+        }
+        if(isReplica()){
+            this.params=params;
+            return Mono.empty();
+        }
+
+        if(start.get()){
+            return Mono.empty();
+        }
+        this.status=params.getState();
         bridgeConfigManager = DefaultBridgeConfigManager.of(params.getId(),productKey, null, null, http2Endpoint, authEndpoint, MapUtil.of(Pair.of(BridgeConfigConsts.BRIDGE_INSTANCEID_KEY,params.getAccessConfig().getIotInstanceId())), getPopClientProfile(accessKey, accessSecret, regionId));
         bootstrap=new DefaultBridgeBootstrap(params.getId(),bridgeConfigManager);
-        if(start.get()){
+        //非主节点、非运行状态、已启动
+        if(isReplica()||!BridgeStatus.running.equals(status)||start.get()){
             return Mono.empty();
         }
         if (start.compareAndSet(false, true)) {
+            if(bootstrap.isBridgeConnected()){
+                return Mono.empty();
+            }
             return Mono.fromRunnable(()->{
                 bootstrap.bootstrap(new DownlinkChannelHandler() {
                     @Override
@@ -120,9 +138,9 @@ public class AliBridgeServer implements BridgeServer {
         }
         params.setDeviceName(bridgeConfigManager.getDeviceName());
         return Mono.empty();
-
     }
 
+
     private PopClientConfiguration getPopClientProfile(@NotNull String accessKey,@NotNull String accessSecret,@NotNull String endpoint){
         PopClientConfiguration popClientConfiguration = new PopClientConfiguration();
         popClientConfiguration.setAccessKey(accessKey);
@@ -134,23 +152,27 @@ public class AliBridgeServer implements BridgeServer {
         return popClientConfiguration;
     }
 
-    @Override
-    public boolean verify(Map<String, Object> params) {
-        return true;
-    }
-
-    @Override
-    public String productId() {
-        return null;
-    }
 
-    @Override
-    public Mono<Channel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret) {
+    public Mono<Channel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret,
+                                  boolean broadcast) {
         //注册设备信息
         DefaultDeviceConfigManager.register(bridgeId,originalIdentity,productKey,deviceName,deviceSecret);
         DefaultUplinkChannelHandler uplinkChannelHandler = new DefaultUplinkChannelHandler(bridgeConfigManager, DefaultDeviceConfigManager.getInstance());
         channelMap
             .putIfAbsent(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName, uplinkChannelHandler, deviceRegistry, eventBus));
+        if(broadcast){
+            getClusterOperationTopic()
+                .publish(Mono.just(OperationMessage.builder()
+                    .bridgeId(bridgeId)
+                    .originalIdentity(originalIdentity)
+                    .productKey(productKey)
+                    .deviceName(deviceName)
+                    .deviceSecret(deviceSecret)
+                    .register(true))).then(Mono.empty()).subscribe();
+        }
+        if(isReplica()){
+            return Mono.empty();
+        }
         return Mono.just(channelMap.get(originalIdentity))
             .doOnNext(Channel::init)
             .flatMap(channel ->
@@ -160,53 +182,157 @@ public class AliBridgeServer implements BridgeServer {
                         if(online){
                             channel.online();
                         }else {
-                            channel.doOffline();
+                            try {
+                                channel.doOffline();
+                            }catch (Exception e){}
                         }
                     })
                     .thenReturn(channel)
             )
             .onErrorResume(error->
                 Mono.error(()->{
-                this.unRegister(originalIdentity);
-                return new BusinessException("请检查deviceName和deviceSecret是否填写正确");
-            }));
+                    this.unRegister(originalIdentity,broadcast);
+                    return new BusinessException("请检查deviceName和deviceSecret是否填写正确");
+                }));
     }
 
-    @Override
-    public Mono<Void> unRegister(String originalIdentity) {
+
+    public Mono<Void> unRegister(String originalIdentity,boolean broadcast) {
         return Mono.fromRunnable(()->{
             Channel channel = channelMap.remove(originalIdentity);
-            if(channel!=null){
+            if(broadcast){
+                getClusterOperationTopic()
+                    .publish(Mono.just(OperationMessage.builder()
+                        .originalIdentity(originalIdentity)
+                        .unRegister(true))).then(Mono.empty()).subscribe();
+            }
+            if(!isReplica()&&channel!=null){
                 channel.close();
             }
         });
     }
 
-    @Override
-    public Mono<Void> stopBridge() {
+
+    public Mono<Void> pauseBridge() {
         return Mono.fromRunnable(()->{
-            if(bootstrap!=null){
+            if(bootstrap!=null&&bootstrap.isBridgeConnected()){
                 bootstrap.disconnectBridge();
             }
-            log.info("网桥[{}]关闭",clusterId);
+            changeStatus(BridgeStatus.stop);
+            log.info("网桥[{}]关闭",id);
         });
     }
 
-    @Override
+
+    public Mono<Void> delBridge() {
+        return Mono.fromRunnable(()->{
+            if(bootstrap!=null&&bootstrap.isBridgeConnected()){
+                bootstrap.disconnectBridge();
+                bootstrap=null;
+            }
+            changeStatus(BridgeStatus.del);
+            log.info("网桥[{}]关闭",id);
+        });
+    }
+
+
     public Mono<Void> reconnect() {
         return  Mono.fromRunnable(()->{
             if(bootstrap!=null){
-                bootstrap.reconnectBridge();
-                channelMap.values().forEach(Channel::online);
+                if(!isReplica()){
+                    bootstrap.reconnectBridge();
+                    channelMap.values().forEach(Channel::online);
+                }
             }
-            log.info("网桥[{}]重启成功",clusterId);
+            changeStatus(BridgeStatus.running);
+            log.info("网桥[{}]重启成功",id);
         });
     }
 
 
-    @Override
+    private void changeStatus(BridgeStatus status){
+        this.status=status;
+        if(isReplica()){
+            getClusterOperationTopic()
+                .publish(Mono.just(status)).subscribe();
+        }
+    }
+
+
     public Flux<AliBridgeMessage> handleReceive() {
         return receive;
     }
 
+    @Override
+    public void beMasterPostProcessor() {
+        if(BridgeStatus.running.equals(status)){
+            bridgeGateway.initBridge(params,false).subscribe();
+        }
+    }
+
+    @Override
+    public Mono<?> handleMsg(Object msg) {
+        if(msg instanceof BridgeStatus){
+            BridgeStatus status= (BridgeStatus) msg;
+            return handleStatus(status,false);
+        }
+        if(msg instanceof  OperationMessage){
+            OperationMessage message= (OperationMessage) msg;
+            if(message.register){
+                //设备注册
+                return this.register(message.getBridgeId(),message.getOriginalIdentity(),message.getProductKey(),message.getDeviceName(),message.getDeviceSecret(),false);
+            }else if(message.unRegister){
+                return this.unRegister(message.getOriginalIdentity(),false);
+            }
+            else if(message.init){
+                return bridgeGateway.initBridge(params,false);
+            }
+        }
+        return Mono.empty();
+    }
+
+    private Mono<Void> handleStatus(BridgeStatus status,boolean broadcast){
+        switch (status){
+            case del:
+                return bridgeGateway.delBridgeServer(id,broadcast);
+            case stop:
+                return bridgeGateway.pauseBridge(id,broadcast);
+            case running:
+                if(this.bootstrap==null){
+                    return bridgeGateway.initBridge(params,broadcast);
+                }
+            case fail:
+                break;
+            default:break;
+        }
+        return Mono.empty();
+    }
+    @Override
+    public void beforeHandleMsg() {
+
+    }
+
+    @Override
+    public Mono<Void> handlePing(ClusterUniqueTask task) {
+        if(task instanceof  AliBridgeServer){
+            AliBridgeServer server= (AliBridgeServer) task;
+            changeStatus(server.status);
+        }
+        return Mono.empty();
+    }
+
+
+    @Builder
+    @Data
+    private static class OperationMessage implements Serializable {
+        private boolean register;
+        private boolean init;
+        private boolean unRegister;
+        private String originalIdentity;
+        private String bridgeId;
+        private String productKey;
+        private String deviceName;
+        private String deviceSecret;
+        private AliIotBridgeEntity param;
+    }
 }

+ 29 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServerFactory.java

@@ -0,0 +1,29 @@
+package org.jetlinks.community.bridge.server.aliyun;
+
+import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName AliBridgeServerFactory.java
+ * @Description TODO
+ * @createTime 2021年12月10日 08:40:00
+ */
+public class AliBridgeServerFactory {
+
+
+    public static Mono<AliBridgeServer> create(EventBus eventBus, DeviceRegistry deviceRegistry, AliIotBridgeEntity bridge) {
+        return  create(eventBus,deviceRegistry,bridge,true);
+    }
+
+
+    public static Mono<AliBridgeServer> create(EventBus eventBus, DeviceRegistry deviceRegistry, AliIotBridgeEntity bridge,boolean broadcast) {
+        AliBridgeServer aliBridgeServer = new AliBridgeServer(eventBus,deviceRegistry,bridge.getId());
+        return  Mono.just(aliBridgeServer)
+            .flatMap(server->server.initBridge(bridge,broadcast))
+            .thenReturn(aliBridgeServer);
+    }
+}

+ 5 - 6
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java

@@ -9,8 +9,6 @@ import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 /**
@@ -30,18 +28,19 @@ public class AliBridgeService extends GenericReactiveCacheSupportCrudService<Ali
     @Override
     public void run(String... args) {
         this.createQuery()
-            .where(AliIotBridgeEntity::getState, BridgeStatus.running)
             .where(AliIotBridgeEntity::getNodeId,clusterManager.getCurrentServerId())
             .fetch()
             .parallel()
             .runOn(Schedulers.parallel())
-            .flatMap(bridgeParam->bridgeGateway.initBridge(bridgeParam)
+            .flatMap(bridgeParam->bridgeGateway.initBridge(bridgeParam,false)
                 .concatWith(bridgeDeviceService.createQuery()
                     .where(AliIotBridgeDeviceConfig::getBridgeId,bridgeParam.getId())
                     .fetch()
                     .flatMap(deviceConfig->
-                        bridgeGateway.registerDevice(bridgeParam.getNodeId(),bridgeParam.getId(),deviceConfig)
-                )))
+                        bridgeGateway.lookUpServer(deviceConfig.getBridgeId()).flatMap(bridgeServer->
+                            bridgeServer.register(deviceConfig.getBridgeId(),deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret(),false)
+                                .then())
+                    )))
             .subscribe();
     }
 }

+ 42 - 35
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -14,8 +14,10 @@ import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
+import org.jetlinks.community.bridge.enums.BridgeDeviceStatus;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
+import org.jetlinks.community.bridge.server.aliyun.AliBridgeServer;
 import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
 import org.jetlinks.community.bridge.service.AliBridgeService;
 import org.jetlinks.core.utils.IdUtils;
@@ -61,12 +63,11 @@ public class AliBridgeServerController implements
     @Operation(summary = "删除网桥")
     @DeleteAction
     public Mono<Void> deleteBridge(@PathVariable("bridgeId")String id){
-        return bridgeService.findById(id)
-            .concatWith( bridgeService.deleteById(id).then(Mono.empty()))
-            .flatMap(bridge->
-                bridgeGateway.delBridgeServer(bridge.getNodeId(),bridge.getId())
-                .flatMap(ignore->bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getBridgeId,id).execute())
-            ).then();
+        return  Mono.zip(
+            bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getBridgeId,id).execute(),
+            bridgeService.deleteById(id),
+            bridgeGateway.delBridgeServer(id,true))
+            .then();
     }
 
 
@@ -89,13 +90,22 @@ public class AliBridgeServerController implements
                     throw new BusinessException("不可在网桥下绑定网桥设备自身");
                 }
             })
-            .flatMap(ingore->
-                bridgeService.findById(id)
-                    .flatMap(bridge->
-                        bridgeDeviceService.save(config)
-                            .flatMap(ignore->bridgeGateway.registerDevice(bridge.getNodeId(), bridge.getId(), config))
-                            .onErrorResume(error->Mono.error(new BusinessException(error.getMessage().contains("originalidentity")?"平台设备不可重复绑定":"阿里云设备不可重复绑定")))
-                            .then())
+            .flatMap(__->
+                bridgeDeviceService.save(config)
+                    .flatMap(ignore->bridgeGateway.lookUpServer(id)
+                        .flatMap(existBridge->
+                            existBridge.register(id,config.getOriginalIdentity(),config.getProductKey(),
+                                config.getDeviceName(),config.getDeviceSecret(),true))
+                        .doOnError(BusinessException.class,
+                            e->
+                                Mono.just(config)
+                                    .doOnNext(device->{
+                                        device.setErrorReason(e.getMessage());
+                                        device.setState(BridgeDeviceStatus.fail);
+                                    })
+                                    .flatMap(bridgeDeviceService::save)))
+                    .onErrorResume(error->Mono.error(new BusinessException(error.getMessage()
+                        .contains("originalidentity")?"平台设备不可重复绑定":"阿里云设备不可重复绑定")))
             ).then();
     }
 
@@ -103,32 +113,31 @@ public class AliBridgeServerController implements
     @Operation(summary = "暂停网桥")
     @CreateAction
     public Mono<Void> pause(@PathVariable("serverId") String id){
-        return Mono.zip(bridgeService.findById(id),bridgeService.createUpdate().where(AliIotBridgeEntity::getId,id)
-            .set(AliIotBridgeEntity::getState, BridgeStatus.stop).execute().then())
-            .map(Tuple2::getT1)
-            .flatMap(server->bridgeGateway.delBridgeServer(server.getNodeId(),server.getId()));
+        return Mono.zip(
+            bridgeService.createUpdate().where(AliIotBridgeEntity::getId,id)
+                .set(AliIotBridgeEntity::getState, BridgeStatus.stop).execute(),
+            bridgeGateway.pauseBridge(id,true))
+            .then();
     }
 
     @PostMapping("/start/{serverId}")
     @Operation(summary = "重启网桥")
     @CreateAction
     public Mono<Void> startBridge(@PathVariable("serverId")String id){
-        return bridgeService.findById(id)
-            .flatMap(bridge-> bridgeGateway.reconnect(bridge.getNodeId(),id));
+        return bridgeGateway.reconnect(id);
     }
 
     @PostMapping("/unregister/{serverId}")
     @Operation(summary = "取消注册网桥设备")
     @DeleteAction
     public Mono<Void> unRegister(@RequestBody String originalIdentity,@PathVariable("serverId") String id){
-        return bridgeService.findById(id)
-            .flatMap(bridge->
-                Mono.zip(
-                    //删除网桥设备
-                    bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getOriginalIdentity,originalIdentity).execute(),
-                    //取消注册
-                    bridgeGateway.unregisterDevice(bridge.getNodeId(),bridge.getId(),originalIdentity)).then()
-            );
+        return
+            Mono.zip(
+                //删除网桥设备
+                bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getOriginalIdentity,originalIdentity).execute(),
+                //取消注册
+                bridgeGateway.lookUpServer(id).flatMap(server -> server.unRegister(originalIdentity,true))
+            ).then();
     }
 
 
@@ -147,17 +156,15 @@ public class AliBridgeServerController implements
         if(StrUtil.isNullOrUndefined(bridge.getId())){
             bridge.setId(String.valueOf(System.currentTimeMillis()));
         }
-        return bridgeService.findById(id)
-            .defaultIfEmpty(bridge)
-            .flatMap(oldBridge-> bridgeGateway.replaceBridgeServer(oldBridge.getNodeId(),oldBridge.getId(),bridge))
-            .concatWith(  bridgeService.save(bridge).then(Mono.empty()))
+        return bridgeService.save(bridge)
+            .flatMap(ignore->bridgeGateway.replaceBridgeServer(bridge.getId(),bridge,true))
             .concatWith(bridgeDeviceService.createQuery()
                 .where(AliIotBridgeDeviceConfig::getBridgeId,bridge.getId())
                 .fetch()
                 .flatMap(deviceConfig->
-                    bridgeGateway.registerDevice(bridge.getNodeId(),bridge.getId(),deviceConfig).then(Mono.empty())
-                ))
-            .then(Mono.empty())
-            .then();
+                    bridgeGateway.lookUpServer(id)
+                        .flatMap(bridgeServer->bridgeServer
+                            .register(id,deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret(),true).then(Mono.empty()))
+                )).then();
     }
 }

+ 0 - 4
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java

@@ -1,15 +1,11 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.core.cluster.ClusterTopic;
-import org.jetlinks.core.utils.IdUtils;
 import org.reactivestreams.Publisher;
 import org.springframework.data.redis.core.ReactiveRedisOperations;
 import reactor.core.Disposable;
 import reactor.core.publisher.*;
-
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;