ソースを参照

fixed 集群网桥

18339543638 4 年 前
コミット
af77a82aab
16 ファイル変更184 行追加119 行削除
  1. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/AbstractClusterUniqueTask.java
  2. 0 3
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterTopic.java
  3. 15 0
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/annotation/ClusterReceiveSelfMessage.java
  4. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/message/ClusterMessage.java
  5. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/message/ClusterMessageReply.java
  6. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/message/Message.java
  7. 1 2
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/message/AliBridgeMessage.java
  8. 4 4
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java
  9. 105 25
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java
  10. 2 2
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServerFactory.java
  11. 7 10
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/DefaultAliBridgeChannel.java
  12. 0 1
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeDeviceService.java
  13. 2 4
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java
  14. 3 8
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java
  15. 1 1
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterManager.java
  16. 36 55
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/cluster/AbstractClusterUniqueTask.java

@@ -2,6 +2,7 @@ package org.jetlinks.core.cluster;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.jetlinks.core.cluster.annotation.ClusterReceiveSelfMessage;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import reactor.core.Disposable;
@@ -23,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @AllArgsConstructor
 @Data
-public abstract class AbstractClusterUniqueTask<T> implements Serializable {
+public abstract class AbstractClusterUniqueTask<T> implements Serializable, ClusterReceiveSelfMessage {
     private static final long serialVersionUID=1L;
     private String id;
     /**

+ 0 - 3
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterTopic.java

@@ -36,9 +36,6 @@ public interface ClusterTopic<T> {
                 .map(TopicMessage::getMessage);
     }
 
-    Mono<Integer> reply(Publisher<? extends T> publisher, String messageId);
-
-    Flux<TopicMessage<T>> publishAndReceive(Publisher<? extends T> publisher, long timeout, String messageId);
 
     interface TopicMessage<T> {
         String getTopic();

+ 15 - 0
jetlinks-core/src/main/java/org/jetlinks/core/cluster/annotation/ClusterReceiveSelfMessage.java

@@ -0,0 +1,15 @@
+package org.jetlinks.core.cluster.annotation;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterReceiveSelfMessage.java
+ * @Description 当消息在集群中传递时,若A服务器发出消息AM1后,并接收到了AM1,
+ * 若AM1消息实现了(@link ClusterReceiveSelfMessage)该接口,则消费该消息,
+ * 否则舍弃该消息
+ * @createTime 2021年12月12日 00:20:00
+ * @see org.jetlinks.core.message.Message
+ * @see  org.jetlinks.core.cluster.AbstractClusterUniqueTask
+ */
+public interface ClusterReceiveSelfMessage {
+}

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/cluster/message/ClusterMessage.java

@@ -3,6 +3,7 @@ package org.jetlinks.core.cluster.message;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.jetlinks.core.cluster.ClusterTopic;
+import org.jetlinks.core.cluster.annotation.ClusterReceiveSelfMessage;
 import org.jetlinks.core.message.Message;
 
 import java.io.Serializable;
@@ -16,7 +17,7 @@ import java.io.Serializable;
  */
 @AllArgsConstructor
 @Data
-public class ClusterMessage implements ClusterTopic.TopicMessage, Serializable {
+public class ClusterMessage implements ClusterTopic.TopicMessage, Serializable, ClusterReceiveSelfMessage {
     private String messageId;
 
     private Message payload;

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/cluster/message/ClusterMessageReply.java

@@ -1,6 +1,7 @@
 package org.jetlinks.core.cluster.message;
 
 import lombok.Data;
+import org.jetlinks.core.cluster.annotation.ClusterReceiveSelfMessage;
 import org.jetlinks.core.message.Message;
 
 /**
@@ -11,7 +12,7 @@ import org.jetlinks.core.message.Message;
  * @createTime 2021年11月06日 09:57:00
  */
 @Data
-public class ClusterMessageReply {
+public class ClusterMessageReply implements ClusterReceiveSelfMessage {
     private String message;
     private String topic;
     private Message payload;

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/Message.java

@@ -3,6 +3,7 @@ package org.jetlinks.core.message;
 import com.alibaba.fastjson.parser.ParserConfig;
 import com.alibaba.fastjson.util.TypeUtils;
 import lombok.Data;
+import org.jetlinks.core.cluster.annotation.ClusterReceiveSelfMessage;
 import org.jetlinks.core.metadata.Jsonable;
 
 import javax.annotation.Nullable;
@@ -27,7 +28,7 @@ import static org.jetlinks.core.message.MessageType.UNKNOWN;
  * @see ChildDeviceMessage
  * @see ChildDeviceMessageReply
  */
-public interface Message extends Jsonable, Serializable {
+public interface Message extends Jsonable, Serializable, ClusterReceiveSelfMessage {
 
     default MessageType getMessageType() {
         return UNKNOWN;

+ 1 - 2
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/message/AliBridgeMessage.java

@@ -1,14 +1,13 @@
 package org.jetlinks.community.bridge.message;
 
 import com.aliyun.iot.as.bridge.core.model.Session;
-import lombok.AllArgsConstructor;
 import lombok.Data;
 
 /**
  * @author lifang
  * @version 1.0.0
  * @ClassName AliBridgeMessage.java
- * @Description TODO
+ * @Description
  * @createTime 2021年11月30日 16:32:00
  */
 @Data

+ 4 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -19,6 +19,8 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
+
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -89,7 +91,6 @@ public class AliBridgeGateway{
                             .flatMap(this::decodeAndHandleMessage)
                             .subscribe();
                     })
-                    .flatMap(server -> server.refreshAllChannel().thenReturn(server))
                     .onErrorResume(error->
                         Mono.error(()->{
                             this.delBridgeServer(
@@ -102,9 +103,8 @@ public class AliBridgeGateway{
 
 
 
-    public Mono<AliBridgeServer> replaceBridgeServer(String oldBridgeId,AliIotBridgeEntity bridge,boolean broadcast){
-        return delBridgeServer(oldBridgeId,broadcast)
-            .concatWith(initBridge(bridge,broadcast))
+    public Mono<AliBridgeServer> replaceBridgeServer(AliIotBridgeEntity bridge,boolean broadcast){
+        return initBridge(bridge,broadcast)
             .then(lookUpServer(bridge.getId()));
     }
 

+ 105 - 25
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -8,9 +8,7 @@ import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.handler.DownlinkChannelHandler;
 import com.aliyun.iot.as.bridge.core.model.PopClientConfiguration;
 import com.aliyun.iot.as.bridge.core.model.Session;
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
+import lombok.*;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.bridge.core.DefaultBridgeBootstrap;
@@ -63,8 +61,6 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
     private transient DefaultBridgeBootstrap bootstrap;
 
-    private BridgeStatus status;
-
     private transient AliBridgeGateway bridgeGateway;
 
     private AliIotBridgeEntity params;
@@ -73,21 +69,21 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
     private final String id;
 
 
-    AliBridgeServer(EventBus eventBus, DeviceRegistry deviceRegistry, String id) {
-        super("_cluster_bridge_"+id,SpringUtil.getBean(ClusterManager.class),SpringUtil.getBean(RedissonClient.class));
+    AliBridgeServer(EventBus eventBus, DeviceRegistry deviceRegistry, AliIotBridgeEntity bridge) {
+        super("_cluster_bridge_"+bridge.getId(),SpringUtil.getBean(ClusterManager.class),SpringUtil.getBean(RedissonClient.class));
         this.deviceRegistry=deviceRegistry;
         this.eventBus = eventBus;
-        this.id=id;
+        this.id=bridge.getId();
+        this.params=bridge;
         this.bridgeGateway= SpringUtil.getBean(AliBridgeGateway.class);
     }
 
     public Mono<AliBridgeServer> initBridge(AliIotBridgeEntity params,boolean broadcast){
         this.params=params;
-        this.status=params.getState();
         verify(params);
         refreshBridgeConfig(params);
         params.setDeviceName(bridgeConfigManager.getDeviceName());
-        if(bootstrap!=null&&bootstrap.isBridgeConnected()){
+        if(bootstrap!=null){
             bootstrap.disconnectBridge();
             start.set(false);
         }
@@ -97,7 +93,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
             getClusterOperationTopic().publish(Mono.just(OperationMessage.builder().param(params).init(true).build())).subscribe();
         }
         //非主节点、非运行状态、已启动
-        if(isReplica()||!BridgeStatus.running.equals(status)||start.get()){
+        if(isReplica()||!BridgeStatus.running.equals(params.getState())||start.get()){
             return Mono.just(this);
         }
         if (start.compareAndSet(false, true)) {
@@ -125,7 +121,12 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
     public Mono<AliBridgeServer> refreshBootStrap(AliIotBridgeEntity params,boolean broadcast){
         if (this.bootstrap!=null) {
-            this.bootstrap.disconnectBridge();
+            try {
+                this.bootstrap.disconnectBridge();
+            }catch (Exception e){
+                log.error(e.getMessage());
+            }
+            this.bootstrap=null;
         }
         start.set(false);
         return initBridge(params,broadcast)
@@ -158,7 +159,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
         Collection<DefaultAliBridgeChannel> channels = channelMap.values();
         return Flux.fromStream(channels.stream())
             .flatMap(channel->
-                register(this.getId(),channel.getOriginalIdentity(),channel.getProductKey(),channel.getDeviceName(),channel.getDeviceSecret(),false))
+                register(this.getId(),channel.getOriginalIdentity(),channel.getProductKey(),channel.getDeviceName(),channel.getDeviceSecret(),channel.getProductId(),false))
             .then();
     }
 
@@ -175,14 +176,19 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
     }
 
 
-    public Mono<DefaultAliBridgeChannel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret,
+    public Mono<DefaultAliBridgeChannel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret,@NotNull String productId,
                                                   boolean broadcast) {
         //注册设备信息
         DefaultDeviceConfigManager.register(bridgeId,originalIdentity,productKey,deviceName,deviceSecret);
         DefaultUplinkChannelHandler uplinkChannelHandler = new DefaultUplinkChannelHandler(bridgeConfigManager, DefaultDeviceConfigManager.getInstance());
-        channelMap
-            .put(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName,deviceSecret, uplinkChannelHandler, deviceRegistry, eventBus));
-
+        DefaultAliBridgeChannel oldChannel = channelMap
+            .put(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName, deviceSecret,productId, uplinkChannelHandler, deviceRegistry, eventBus));
+        if(oldChannel!=null){
+            try {
+                oldChannel.close();
+            }catch (Exception e){
+            }
+        }
         if(broadcast){
             getClusterOperationTopic()
                 .publish(Mono.just(OperationMessage.builder()
@@ -191,6 +197,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
                     .productKey(productKey)
                     .deviceName(deviceName)
                     .deviceSecret(deviceSecret)
+                    .productId(productId)
                     .register(true))).then(Mono.empty()).subscribe();
         }
         if(isReplica()){
@@ -225,9 +232,10 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
             DefaultAliBridgeChannel channel = channelMap.remove(originalIdentity);
             if(broadcast){
                 getClusterOperationTopic()
-                    .publish(Mono.just(OperationMessage.builder()
-                        .originalIdentity(originalIdentity)
-                        .unRegister(true))).then(Mono.empty()).subscribe();
+                    .publish(Mono.just(
+                        OperationMessage.builder()
+                            .originalIdentity(originalIdentity)
+                            .unRegister(true))).then(Mono.empty()).subscribe();
             }
             if(!isReplica()&&channel!=null){
                 channel.close();
@@ -272,7 +280,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
 
     private void changeStatus(BridgeStatus status,boolean broadcast){
-        this.status=status;
+        this.params.setState(status);
         if(broadcast){
             getClusterOperationTopic()
                 .publish(Mono.just(status)).subscribe();
@@ -287,7 +295,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
     @Override
     public void beMasterPostProcessor() {
-        handleStatus(status,false).subscribe();
+        handleStatus(this.params.getState(),false).subscribe();
     }
 
     @Override
@@ -300,7 +308,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
             OperationMessage message= (OperationMessage) msg;
             if(message.isRegister()){
                 //设备注册
-                return this.register(message.getBridgeId(),message.getOriginalIdentity(),message.getProductKey(),message.getDeviceName(),message.getDeviceSecret(),false);
+                return this.register(message.getBridgeId(),message.getOriginalIdentity(),message.getProductKey(),message.getDeviceName(),message.getDeviceSecret(),message.getProductId(),false);
             }else if(message.isUnRegister()){
                 return this.unRegister(message.getOriginalIdentity(),false);
             }
@@ -334,14 +342,14 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
     public Mono<Void> handlePing(AbstractClusterUniqueTask task) {
         if(task instanceof  AliBridgeServer){
             AliBridgeServer server= (AliBridgeServer) task;
-            changeStatus(server.status,false);
+            changeStatus(server.params.getState(),false);
         }
         return Mono.empty();
     }
 
 
-    @Builder
     @Data
+    @AllArgsConstructor
     private static class OperationMessage implements Serializable {
         private boolean register;
         private boolean init;
@@ -351,6 +359,78 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
         private String productKey;
         private String deviceName;
         private String deviceSecret;
+        private String productId;
+        private AliIotBridgeEntity param;
+
+        public static OperationMessageBuilder builder() {
+            return new OperationMessageBuilder();
+        }
+    }
+
+    private static class OperationMessageBuilder implements Serializable{
+        private boolean register;
+        private boolean init;
+        private boolean unRegister;
+        private String originalIdentity;
+        private String bridgeId;
+        private String productKey;
+        private String deviceName;
+        private String deviceSecret;
+        private String productId;
         private AliIotBridgeEntity param;
+
+        public OperationMessageBuilder register(boolean register) {
+            this.register = register;
+            return this;
+        }
+
+        public OperationMessageBuilder productId(String productId) {
+            this.productId = productId;
+            return this;
+        }
+
+        public OperationMessageBuilder init(boolean init) {
+            this.init = init;
+            return this;
+        }
+
+        public OperationMessageBuilder unRegister(boolean unRegister) {
+            this.unRegister = unRegister;
+            return this;
+        }
+
+        public OperationMessageBuilder originalIdentity(String originalIdentity) {
+            this.originalIdentity = originalIdentity;
+            return this;
+        }
+
+        public OperationMessageBuilder bridgeId(String bridgeId) {
+            this.bridgeId = bridgeId;
+            return this;
+        }
+
+        public OperationMessageBuilder productKey(String productKey) {
+            this.productKey = productKey;
+            return this;
+        }
+
+        public OperationMessageBuilder deviceName(String deviceName) {
+            this.deviceName = deviceName;
+            return this;
+        }
+
+        public OperationMessageBuilder deviceSecret(String deviceSecret) {
+            this.deviceSecret = deviceSecret;
+            return this;
+        }
+
+        public OperationMessageBuilder param(AliIotBridgeEntity param) {
+            this.param = param;
+            return this;
+        }
+
+        public OperationMessage build(){
+            return new OperationMessage(register,init,unRegister,originalIdentity,bridgeId,productKey,deviceName,deviceSecret,productId,param);
+        }
     }
 }

+ 2 - 2
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServerFactory.java

@@ -9,7 +9,7 @@ import reactor.core.publisher.Mono;
  * @author lifang
  * @version 1.0.0
  * @ClassName AliBridgeServerFactory.java
- * @Description TODO
+ * @Description
  * @createTime 2021年12月10日 08:40:00
  */
 public class AliBridgeServerFactory {
@@ -21,6 +21,6 @@ public class AliBridgeServerFactory {
 
 
     public static Mono<AliBridgeServer> create(EventBus eventBus, DeviceRegistry deviceRegistry, AliIotBridgeEntity bridge,boolean broadcast) {
-        return  Mono.just( new AliBridgeServer(eventBus,deviceRegistry,bridge.getId()));
+        return  Mono.just( new AliBridgeServer(eventBus,deviceRegistry,bridge));
     }
 }

+ 7 - 10
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/DefaultAliBridgeChannel.java

@@ -44,8 +44,8 @@ public class DefaultAliBridgeChannel {
 
     private AtomicBoolean start=new AtomicBoolean(false);
 
-    private String deviceSecret;
-    private  String productId="**";
+    private final String deviceSecret;
+    private  final String productId;
     /**
      * @see DefaultBridgeBootstrap 网桥数据下行
      * @see DefaultUplinkChannelHandler 网桥数据下行
@@ -54,6 +54,7 @@ public class DefaultAliBridgeChannel {
                                    String productKey,
                                    String deviceName,
                                    String deviceSecret,
+                                   String productId,
                                    DefaultUplinkChannelHandler uplinkChannelHandler,
                                    DeviceRegistry deviceRegistry,
                                    EventBus eventBus) {
@@ -62,6 +63,7 @@ public class DefaultAliBridgeChannel {
         this.deviceRegistry=deviceRegistry;
         this.productKey=productKey;
         this.deviceName=deviceName;
+        this.productId=productId;
         this.uplinkChannelHandler = uplinkChannelHandler;
         this.deviceSecret=deviceSecret;
     }
@@ -76,13 +78,6 @@ public class DefaultAliBridgeChannel {
              *  属性设置,服务调用
              *  上下线
              */
-
-            disposable=deviceRegistry.getDevice(originalIdentity)
-                .flatMap(DeviceOperator::getProduct)
-                .doOnNext(deviceProductOperator -> {
-                    productId = deviceProductOperator.getId();}
-                ).subscribe();
-
             Subscription subscription = Subscription.builder()
                 .subscriberId("alibridge-" + this.originalIdentity)
                 //上下线
@@ -139,7 +134,9 @@ public class DefaultAliBridgeChannel {
 
 
     public void close(){
-        disposable.dispose();
+        if(disposable!=null){
+            disposable.dispose();
+        }
         doOffline();
         start.set(false);
     }

+ 0 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeDeviceService.java

@@ -2,7 +2,6 @@ package org.jetlinks.community.bridge.service;
 
 import org.hswebframework.web.crud.service.GenericReactiveCacheSupportCrudService;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
-import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.springframework.stereotype.Component;
 
 /**

+ 2 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java

@@ -4,9 +4,7 @@ import lombok.AllArgsConstructor;
 import org.hswebframework.web.crud.service.GenericReactiveCacheSupportCrudService;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
-import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
-import org.jetlinks.core.cluster.ClusterManager;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 import reactor.core.scheduler.Schedulers;
@@ -15,7 +13,7 @@ import reactor.core.scheduler.Schedulers;
  * @author lifang
  * @version 1.0.0
  * @ClassName AliProductMappingService.java
- * @Description TODO
+ * @Description
  * @createTime 2021年11月27日 09:59:00
  */
 @Component
@@ -36,7 +34,7 @@ public class AliBridgeService extends GenericReactiveCacheSupportCrudService<Ali
                     .fetch()
                     .flatMap(deviceConfig->
                         bridgeGateway.lookUpServer(deviceConfig.getBridgeId()).flatMap(bridgeServer->
-                            bridgeServer.register(deviceConfig.getBridgeId(),deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret(),false)
+                            bridgeServer.register(deviceConfig.getBridgeId(),deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret(),deviceConfig.getProductId(),false)
                                 .then())
                     )))
             .subscribe();

+ 3 - 8
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -17,15 +17,10 @@ import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.enums.BridgeDeviceStatus;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
-import org.jetlinks.community.bridge.server.aliyun.AliBridgeServer;
 import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
 import org.jetlinks.community.bridge.service.AliBridgeService;
-import org.jetlinks.core.utils.IdUtils;
-import org.springframework.dao.DuplicateKeyException;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuple3;
 
 /**
  * @author lifang
@@ -95,7 +90,7 @@ public class AliBridgeServerController implements
                     .flatMap(ignore->bridgeGateway.lookUpServer(id)
                         .flatMap(existBridge->
                             existBridge.register(id,config.getOriginalIdentity(),config.getProductKey(),
-                                config.getDeviceName(),config.getDeviceSecret(),true))
+                                config.getDeviceName(),config.getDeviceSecret(),config.getProductId(),true))
                         .doOnError(BusinessException.class,
                             e->
                                 Mono.just(config)
@@ -157,14 +152,14 @@ public class AliBridgeServerController implements
             bridge.setId(String.valueOf(System.currentTimeMillis()));
         }
         return bridgeService.save(bridge)
-            .flatMap(ignore->bridgeGateway.replaceBridgeServer(bridge.getId(),bridge,true))
+            .flatMap(ignore->bridgeGateway.replaceBridgeServer(bridge,true))
             .concatWith(bridgeDeviceService.createQuery()
                 .where(AliIotBridgeDeviceConfig::getBridgeId,bridge.getId())
                 .fetch()
                 .flatMap(deviceConfig->
                     bridgeGateway.lookUpServer(id)
                         .flatMap(bridgeServer->bridgeServer
-                            .register(id,deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret(),true).then(Mono.empty()))
+                            .register(id,deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret(),deviceConfig.getProductId(),true).then(Mono.empty()))
                 )).then();
     }
 }

+ 1 - 1
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterManager.java

@@ -94,7 +94,7 @@ public class RedisClusterManager implements ClusterManager {
 
     @Override
     public <T> ClusterTopic<T> getTopic(String topic) {
-        return topics.computeIfAbsent(topic, id -> new RedisClusterTopic(id, this.getRedis()));
+        return topics.computeIfAbsent(topic, id -> new RedisClusterTopic(id, serverId,this.getRedis()));
     }
 
     @Override

+ 36 - 55
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java

@@ -1,11 +1,17 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.core.cluster.AbstractClusterUniqueTask;
 import org.jetlinks.core.cluster.ClusterTopic;
+import org.jetlinks.core.cluster.annotation.ClusterReceiveSelfMessage;
 import org.reactivestreams.Publisher;
 import org.springframework.data.redis.core.ReactiveRedisOperations;
 import reactor.core.Disposable;
 import reactor.core.publisher.*;
+
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -13,7 +19,7 @@ public class RedisClusterTopic<T> implements ClusterTopic<T> {
 
     private final String topicName;
 
-    private final ReactiveRedisOperations<Object, T> operations;
+    private final ReactiveRedisOperations<Object, ChannelMessage> operations;
 
     private final FluxProcessor<TopicMessage<T>, TopicMessage<T>> processor;
 
@@ -21,8 +27,10 @@ public class RedisClusterTopic<T> implements ClusterTopic<T> {
 
     private final AtomicBoolean subscribed = new AtomicBoolean();
 
-    public RedisClusterTopic(String topic, ReactiveRedisOperations<Object, T> operations) {
+    private final String fromId;
+    public RedisClusterTopic(String topic,String fromId, ReactiveRedisOperations<Object, ChannelMessage> operations) {
         this.topicName = topic;
+        this.fromId=fromId;
         this.operations = operations;
         processor = EmitterProcessor.create(false);
         sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
@@ -35,22 +43,26 @@ public class RedisClusterTopic<T> implements ClusterTopic<T> {
             disposable = operations
                 .listenToPattern(topicName)
                 .subscribe(data -> {
-                    if (!processor.hasDownstreams()) {
-                        disposable.dispose();
-                        subscribed.compareAndSet(true, false);
-                    } else {
-                        sink.next(new TopicMessage<T>() {
-                            @Override
-                            public String getTopic() {
-                                return data.getChannel();
-                            }
-
-                            @Override
-                            public T getMessage() {
-                                return data.getMessage();
-                            }
-
-                        });
+                    ChannelMessage message = data.getMessage();
+                    Object messageObject = message.getObject();
+                    if (messageObject instanceof ClusterReceiveSelfMessage||!fromId.equals(message.getFromId())) {
+                        if (!processor.hasDownstreams()) {
+                            disposable.dispose();
+                            subscribed.compareAndSet(true, false);
+                        } else {
+                            sink.next(new TopicMessage<T>() {
+                                @Override
+                                public String getTopic() {
+                                    return data.getChannel();
+                                }
+
+                                @Override
+                                public T getMessage() {
+                                    return (T) message.getObject();
+                                }
+
+                            });
+                        }
                     }
                 });
         }
@@ -65,46 +77,15 @@ public class RedisClusterTopic<T> implements ClusterTopic<T> {
     @Override
     public Mono<Integer> publish(Publisher<? extends T> publisher) {
         return Flux.from(publisher)
-            .flatMap(data -> operations.convertAndSend(topicName, data))
-            .last(1L)
-            .map(Number::intValue);
-    }
-
-
-    /**
-     * 回复消息
-     * @param publisher
-     * @param messageId
-     * @return
-     */
-    @Override
-    public Mono<Integer> reply(Publisher<? extends T> publisher, String messageId){
-        return Flux.from(publisher)
-            .flatMap(data -> operations.convertAndSend("message-reply" + messageId, data))
+            .flatMap(data -> operations.convertAndSend(topicName,ChannelMessage.of(data,fromId)))
             .last(1L)
             .map(Number::intValue);
     }
 
-
-    @Override
-    public Flux<TopicMessage<T>> publishAndReceive(Publisher<? extends T> publisher, long timeout, String messageId) {
-        return  Flux.from(publisher)
-            .flatMap(data -> operations.convertAndSend(topicName, data))
-            .flatMap(ignore->
-                operations.listenToPattern("message-reply" + messageId)
-                    .timeout(Duration.ofSeconds(timeout), Mono.error(new BusinessException("响应超时")))
-                    .map(data->
-                        new TopicMessage<T>() {
-                            @Override
-                            public String getTopic() {
-                                return data.getChannel();
-                            }
-
-                            @Override
-                            public T getMessage() {
-                                return data.getMessage();
-                            }
-                        })
-            );
+    @AllArgsConstructor(staticName = "of")
+    @Data
+    static class ChannelMessage implements Serializable {
+        Object object;
+        String fromId;
     }
 }