18339543638 4 anni fa
parent
commit
b1474bb854

+ 135 - 55
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -1,12 +1,16 @@
 package org.jetlinks.community.bridge.server.aliyun;
 
-import cn.hutool.core.util.StrUtil;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.bridge.core.AliBridgeCodec;
+import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.ClusterTopic;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.CommonDeviceMessage;
@@ -14,16 +18,11 @@ import org.jetlinks.core.message.CommonDeviceMessageReply;
 import org.jetlinks.core.message.Message;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.stereotype.Service;
-import org.springframework.util.Assert;
 import org.springframework.util.StringUtils;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
-
-import java.util.Map;
+import java.io.Serializable;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author lifang
@@ -35,12 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 @Service
 public class AliBridgeGateway{
-
-    /**
-     * 阿里云网桥网关集合,每一个产品的设备归属于同一个网桥
-     */
-    private static final Map<String,AliBridgeServer> serverMap=new ConcurrentHashMap<>();
-
     private volatile AliBridgeServer aliBridgeServer;
     private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
 
@@ -51,62 +44,123 @@ public class AliBridgeGateway{
     private final EventBus eventBus;
     private final DecodedClientMessageHandler messageHandler;
 
+    private final String  clusterId;
+
+    private final ClusterTopic<Object> clusterTopic;
+    private final ClusterManager clusterManager;
+    private final String format="bridge-cluster-%s";
     public AliBridgeGateway(DeviceRegistry registry,
                             EventBus eventBus,
-                            DecodedClientMessageHandler messageHandler) {
+                            DecodedClientMessageHandler messageHandler,
+                            ClusterManager clusterManager) {
         this.registry = registry;
         this.eventBus=eventBus;
         this.messageHandler = messageHandler;
+        this.clusterManager=clusterManager;
+        this.clusterId=clusterManager.getCurrentServerId();
+        this.clusterTopic=clusterManager.getTopic(String.format("bridge-cluster-%s",clusterManager.getCurrentServerId()));
     }
 
-    public Mono<Void> initBridge(AliIotBridgeEntity productMapping){
-        if (aliBridgeServer!=null) {
-            return   AliBridgeServer.create(eventBus,registry,productMapping)
-                .doOnNext(server -> {
-                    if(aliBridgeServer!=null){
-                        aliBridgeServer=server;
-                    }else {
-                        server.stopBridge().subscribe();
-                        throw new BusinessException("网桥正在运行,请勿重复启动");
+
+    public void init(){
+        clusterTopic
+            .subscribePattern()
+            .flatMap(msg->{
+                //暂停、更新、启动、删除
+                if(msg instanceof  BridgeMessage) {
+                    BridgeMessage message = (BridgeMessage) msg;
+                    MessageType messageType = message.getMessageType();
+                    switch (messageType) {
+                        case del:
+                            return delBridgeServer((String) message.getMsg());
+                        case init:
+                            return initBridge((AliIotBridgeEntity) message.getMsg());
+                        case update:
+                            return replaceBridgeServer(clusterId, (AliIotBridgeEntity) message.getMsg());
+                        case register:
+                            return registerDevice(clusterId, (AliIotBridgeDeviceConfig) message.getMsg());
+                        case unregister:
+                            return unregisterDevice(clusterId, (String) message.getMsg());
+                        case restart:
+                            return reconnect(clusterId);
+                        default:
+                            return Mono.empty();
                     }
-                })
-                .doOnNext(server->{
-                    server.handleReceive()
-                        .parallel()
-                        .runOn(Schedulers.parallel())
-                        .flatMap(this::decodeAndHandleMessage)
-                        .subscribe();
-                })
-                .then()
-                .onErrorResume(error->Mono.error(()->{
-                    this.delBridgeServer();
-                    return new BusinessException("网桥配置出错,请确定配置参数是否正确(是否存在空格)");
-                }));
-        }
-        return Mono.empty();
+                }
+                return Mono.empty();
+            }).subscribe();
+    }
+
+
+
+    public Mono<Void> reconnect(String serverId){
+        return handleClusterOperation(serverId,MessageType.restart,serverId)
+            .switchIfEmpty(aliBridgeServer.reconnect());
+    }
+    /**
+     * 注册设备
+     * @param serverId 节点id
+     * @param config 设备配置
+     * @return
+     */
+    public Mono<Void> registerDevice(String serverId, AliIotBridgeDeviceConfig config){
+        return handleClusterOperation(serverId,MessageType.register,config)
+            .switchIfEmpty(this.aliBridgeServer
+                .register(config.getOriginalIdentity(),config.getProductKey(),config.getDeviceName(),config.getDeviceSecret()).then());
+    }
+
+
+    /**
+     * 取消设备注册
+     * @return
+     */
+    public Mono<Void> unregisterDevice(String serverId,String originalIdentity){
+        return handleClusterOperation(serverId,MessageType.unregister,originalIdentity)
+            .switchIfEmpty(aliBridgeServer.unRegister(originalIdentity));
     }
 
-    public Mono<AliBridgeServer> getBridgeServer(){
-        return Mono.justOrEmpty(aliBridgeServer)
-            .switchIfEmpty(Mono.error(new BusinessException("暂未创建网桥")));
+    public Mono<Void> initBridge(AliIotBridgeEntity bridgeEntity){
+        return handleClusterOperation(bridgeEntity.getNodeId(),MessageType.init,bridgeEntity)
+            .switchIfEmpty(Mono.justOrEmpty(aliBridgeServer)
+                .filter(Objects::nonNull)
+                .flatMap(ignore->
+                    AliBridgeServer.create(eventBus,registry,bridgeEntity)
+                        .doOnNext(server -> {
+                            if(aliBridgeServer!=null){
+                                aliBridgeServer=server;
+                            }else {
+                                server.stopBridge().subscribe();
+                                throw new BusinessException("网桥正在运行,请勿重复启动");
+                            }
+                        })
+                        .doOnNext(server->{
+                            server.handleReceive()
+                                .parallel()
+                                .runOn(Schedulers.parallel())
+                                .flatMap(this::decodeAndHandleMessage)
+                                .subscribe();
+                        })
+                        .then()
+                        .onErrorResume(error->Mono.error(()->{
+                            this.delBridgeServer(bridgeEntity.getNodeId());
+                            return new BusinessException("网桥配置出错,请确定配置参数是否正确(是否存在空格)");
+                        }))
+                )
+                .then());
     }
 
-    public Mono<Void> delBridgeServer(){
-        return Mono.justOrEmpty(aliBridgeServer)
-            .switchIfEmpty(Mono.defer(()->Mono.just(new AliBridgeServer(eventBus,registry))))
-            .flatMap(AliBridgeServer::stopBridge)
-            .then();
+    public Mono<Void> delBridgeServer(String serverId){
+        return handleClusterOperation(serverId,MessageType.del,serverId)
+            .switchIfEmpty(Mono.justOrEmpty(aliBridgeServer)
+                .filter(Objects::nonNull)
+                .flatMap(AliBridgeServer::stopBridge));
     }
 
     public Mono<Void> replaceBridgeServer(String serverId,AliIotBridgeEntity bridge){
-        Assert.hasText(serverId,"更新网桥 serverId 不可为空");
-        Assert.notNull(bridge,"更新网桥 配置 AliIotBridgeEntity 不可为空");
-        if(StrUtil.isEmpty(bridge.getProductId())||!bridge.getProductId().equals(serverId)){
-            return Mono.error(new BusinessException("网桥配置有误,网桥产品不可更改"));
-        }
-        return delBridgeServer()
-            .concatWith(initBridge(bridge))
-            .then();
+        return handleClusterOperation(serverId,MessageType.update,bridge)
+            .switchIfEmpty(delBridgeServer(serverId)
+                .concatWith(initBridge(bridge))
+                .then());
     }
 
     //解码消息并处理
@@ -145,4 +199,30 @@ public class AliBridgeGateway{
                     .then()
             );
     }
+
+
+    private Mono<Void> handleClusterOperation(String serverId, MessageType messageType, Object msg){
+        if(!this.clusterId.equals(serverId)){
+            return clusterManager.getTopic(String.format(format,serverId))
+                .publish(Mono.just(BridgeMessage.of(clusterId,messageType,msg))).then();
+        }
+        return Mono.empty();
+    }
+
+    @AllArgsConstructor(staticName = "of")
+    @Data
+    class BridgeMessage implements Serializable {
+        String from;
+        MessageType messageType;
+        Object msg;
+    }
+
+    enum MessageType{
+        restart,
+        init,
+        del,
+        update,
+        register,
+        unregister;
+    }
 }

+ 0 - 5
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java

@@ -1,17 +1,12 @@
 package org.jetlinks.community.bridge.service;
 
 import lombok.AllArgsConstructor;
-import org.checkerframework.checker.units.qual.A;
 import org.hswebframework.web.crud.service.GenericReactiveCacheSupportCrudService;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
-import org.jetlinks.community.bridge.enums.BridgeDeviceStatus;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
 import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.cluster.ClusterUniqueTask;
-import org.jetlinks.supports.cluster.redis.RedisClusterManager;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;

+ 20 - 42
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -13,15 +13,10 @@ import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
-import org.jetlinks.community.bridge.server.aliyun.AliBridgeServer;
 import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
 import org.jetlinks.community.bridge.service.AliBridgeService;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-
-import java.util.HashMap;
-import java.util.function.Function;
 
 /**
  * @author lifang
@@ -55,12 +50,12 @@ public class AliBridgeServerController implements
      * 2、删除网桥
      * 3、删除网桥挂载设备
      */
-    @DeleteMapping("/delete/{productId}/{bridgeId}")
+    @DeleteMapping("/delete/{serverId}/{bridgeId}")
     @Operation(summary = "删除网桥")
     @DeleteAction
-    public Mono<Void> deleteBridge(@PathVariable("productId")String productId,@PathVariable("bridgeId")String bridgeId){
-        return Mono.zip(bridgeGateway.getBridgeServer()
-            .flatMap(AliBridgeServer::stopBridge),bridgeService.deleteById(bridgeId))
+    public Mono<Void> deleteBridge(@PathVariable("serverId")String serverId,@PathVariable("bridgeId")String bridgeId){
+        return Mono.zip(bridgeGateway.delBridgeServer(serverId)
+            ,bridgeService.deleteById(bridgeId))
             .concatWith(bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getBridgeId,bridgeId).execute().then(Mono.empty()))
             .then();
     }
@@ -69,55 +64,39 @@ public class AliBridgeServerController implements
     @PostMapping("/register/{serverId}")
     @Operation(summary = "注册网桥设备")
     @CreateAction
-    public Mono<Void> register(@PathVariable("serverId") String id,@RequestBody Mono<AliIotBridgeDeviceConfig> config){
+    public Mono<Void> register(@PathVariable("serverId") String serverId,@RequestBody AliIotBridgeDeviceConfig config){
         return Mono.zip(
             //保存网桥设备信息
             bridgeDeviceService.save(config),
             //注册网桥设备
-            bridgeGateway.getBridgeServer()
-                .zipWith(config)
-                .flatMap(tp2-> tp2.getT1().register(tp2.getT2().getOriginalIdentity(),tp2.getT2().getProductKey(),tp2.getT2().getDeviceName(),tp2.getT2().getDeviceSecret())))
+            bridgeGateway.registerDevice(serverId,config))
             .then();
     }
 
-    @PostMapping("/pause/{productId}")
+    @PostMapping("/pause/{serverId}")
     @Operation(summary = "暂停网桥")
     @CreateAction
-    public Mono<Void> pause(@PathVariable("productId") String productId){
-        return bridgeGateway.getBridgeServer()
-            .flatMap(AliBridgeServer::stopBridge)
+    public Mono<Void> pause(@PathVariable("serverId") String serverId){
+        return bridgeGateway.delBridgeServer(serverId)
             .then();
     }
 
-    @PostMapping("/start/{productId}")
+    @PostMapping("/start/{serverId}")
     @Operation(summary = "重启网桥")
     @CreateAction
-    /**
-     * 判断是否是重启网桥,若不是重新,则重新注册网桥
-     */
-    public Mono<Void> startBridge(@PathVariable("productId")String productId){
-        return bridgeGateway.getBridgeServer()
-            .flatMap(AliBridgeServer::reconnect)
-            .then();
+    public Mono<Void> startBridge(@PathVariable("serverId")String serverId){
+        return bridgeGateway.reconnect(serverId);
     }
 
-    @PostMapping("/unregister/{productId}")
+    @PostMapping("/unregister/{serverId}")
     @Operation(summary = "取消注册网桥设备")
     @DeleteAction
-    public Mono<Void> unRegister(@PathVariable("id")String productId,@RequestBody String originalIdentity){
+    public Mono<Void> unRegister(@PathVariable("id")String serverId,@RequestBody String originalIdentity){
         return Mono.zip(
             //删除网桥设备
             bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getOriginalIdentity,originalIdentity).execute(),
             //取消注册
-            bridgeGateway.getBridgeServer()
-                .flatMap(server->server.unRegister(originalIdentity))).then();
-    }
-
-    @GetMapping("/lookUp/{productId}")
-    @Operation(summary = "查看网桥设备")
-    //todo
-    public Mono<Void> lookUp(@PathVariable("productId")Mono<String> productId){
-        return Mono.empty();
+            bridgeGateway.unregisterDevice(serverId,originalIdentity)).then();
     }
 
 
@@ -125,24 +104,23 @@ public class AliBridgeServerController implements
      * 1、删除网桥
      * 2、重启网桥
      * 3、注册网桥设备
-     * @param productId
+     * @param serverId
      * @param bridge
      * @return
      */
-    @PutMapping("/update/{productId}")
+    @PutMapping("/update/{serverId}")
     @Operation(summary = "更新网桥信息")
     @CreateAction
-    public Mono<Void> updateBridge(@PathVariable("productId")String productId,@RequestBody AliIotBridgeEntity bridge){
+    public Mono<Void> updateBridge(@PathVariable("serverId")String serverId,@RequestBody AliIotBridgeEntity bridge){
         //删除设备后重新注册设备
         return   Mono.zip(
             bridgeService.save(bridge),
-            bridgeGateway.replaceBridgeServer(productId,bridge))
+            bridgeGateway.replaceBridgeServer(serverId,bridge))
             .flatMap(tp2->bridgeDeviceService.createQuery()
                 .where(AliIotBridgeDeviceConfig::getBridgeId,bridge.getId())
                 .fetch()
                 .flatMap(deviceConfig->
-                    bridgeGateway.getBridgeServer().flatMap(bridgeServe->bridgeServe.register(deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret()))
-                        .then(Mono.empty())
+                    bridgeGateway.registerDevice(serverId,deviceConfig)
                 ).then());
     }