Bläddra i källkod

fixed 集群网桥

18339543638 4 år sedan
förälder
incheckning
fdca67ee6a

+ 47 - 0
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterService.java

@@ -0,0 +1,47 @@
+package org.jetlinks.core.cluster;
+
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Function;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterServer.java
+ * @Description 集群服务,用来在集群件传递服务消息
+ * @createTime 2021年12月13日 08:33:00
+ */
+public abstract class ClusterService<T> {
+
+    private final String listenTopic;
+    private final ClusterTopic<T> clusterTopic;
+
+    private EmitterProcessor<T> msgProcessor=EmitterProcessor.create(false);
+    private FluxSink<T> sink = msgProcessor.sink();
+
+    public ClusterService(ClusterManager clusterManager) {
+        listenTopic="_cluster_service_"+this.getClass().getSimpleName();
+        clusterTopic = clusterManager.getTopic(listenTopic);
+        clusterTopic
+            .subscribePattern()
+            .map(ClusterTopic.TopicMessage::getMessage)
+            .doOnNext(msg->{
+                if(msgProcessor.hasDownstreams()){
+                    sink.next(msg);
+                }
+            }).subscribe();
+    }
+
+
+    public Flux<T> handleClusterMsg(){
+        return  msgProcessor.map(Function.identity());
+    }
+
+
+    public Mono<Void> sendClusterMsg(T msg){
+        return clusterTopic.publish(Mono.just(msg)).then();
+    }
+}

+ 27 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeDeviceConfig.java

@@ -8,6 +8,8 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.Comment;
 import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
+import org.hswebframework.web.crud.generator.Generators;
 import org.jetlinks.community.bridge.enums.BridgeDeviceStatus;
 import javax.persistence.Column;
 import javax.persistence.Index;
@@ -28,7 +30,7 @@ import javax.validation.constraints.NotNull;
 },
 schema = "阿里云设备网桥配置")
 @EqualsAndHashCode(callSuper = false)
-public class AliIotBridgeDeviceConfig extends GenericEntity<String> {
+public class AliIotBridgeDeviceConfig extends GenericEntity<String> implements RecordCreationEntity {
 
     @NotNull
     @Comment("设备所属产品")
@@ -96,4 +98,28 @@ public class AliIotBridgeDeviceConfig extends GenericEntity<String> {
     @Column(name = "error_reason")
     @Schema(description = "失败原因")
     private String errorReason;
+
+
+
+    @Column(name = "creator_id", updatable = false)
+    @Schema(
+        description = "创建者ID(只读)"
+        ,accessMode = Schema.AccessMode.READ_ONLY
+    )
+    private String creatorId;
+
+    @Column(name = "creator_name", updatable = false)
+    @Schema(
+        description = "创建者名称(只读)"
+        ,accessMode = Schema.AccessMode.READ_ONLY
+    )
+    private String creatorName;
+
+    @Column(name = "create_time", updatable = false)
+    @DefaultValue(generator = Generators.CURRENT_TIME)
+    @Schema(
+        description = "创建时间(只读)"
+        , accessMode = Schema.AccessMode.READ_ONLY
+    )
+    private Long createTime;
 }

+ 25 - 13
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeEntity.java

@@ -7,6 +7,8 @@ import lombok.Getter;
 import lombok.Setter;
 import org.hswebframework.ezorm.rdb.mapping.annotation.*;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
+import org.hswebframework.web.crud.generator.Generators;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.core.device.DeviceState;
 
@@ -30,19 +32,13 @@ import java.sql.JDBCType;
 })
 @Getter
 @Setter
-public class AliIotBridgeEntity extends GenericEntity<String> {
+public class AliIotBridgeEntity extends GenericEntity<String>  implements RecordCreationEntity {
 
     @Comment("云云对接名称")
     @Column(name = "name",nullable = false)
     @Schema(description = "云云对接名称")
     private String name;
 
-    @Comment("网桥产品id")
-    @Column(name = "product_id")
-    @Schema(description = "网桥产品id")
-    @Deprecated
-    private String productId;
-
     @Comment("说明")
     @Column(name = "description")
     @Schema(description = "说明")
@@ -55,12 +51,6 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
     @ColumnType(jdbcType = JDBCType.CLOB)
     private AccessConfig accessConfig;
 
-    @Comment("云云协议")
-    @Column(name = "protocol")
-    @Schema(description = "云云协议")
-    @Deprecated
-    private String protocol;
-
     @Column(name = "state",length = 16,nullable = false)
     @EnumCodec
     @ColumnType(javaType = String.class)
@@ -89,6 +79,28 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
     @Schema(description = "所在集群节点id")
     private String nodeId;
 
+
+    @Column(name = "creator_id", updatable = false)
+    @Schema(
+        description = "创建者ID(只读)"
+        ,accessMode = Schema.AccessMode.READ_ONLY
+    )
+    private String creatorId;
+
+    @Column(name = "creator_name", updatable = false)
+    @Schema(
+        description = "创建者名称(只读)"
+        ,accessMode = Schema.AccessMode.READ_ONLY
+    )
+    private String creatorName;
+
+    @Column(name = "create_time", updatable = false)
+    @DefaultValue(generator = Generators.CURRENT_TIME)
+    @Schema(
+        description = "创建时间(只读)"
+        , accessMode = Schema.AccessMode.READ_ONLY
+    )
+    private Long createTime;
     @Data
     public static class AccessConfig implements Serializable {
         @NotNull

+ 14 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -6,6 +6,8 @@ import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.bridge.core.AliBridgeCodec;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.ClusterService;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.CommonDeviceMessage;
@@ -19,8 +21,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
-
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -35,7 +35,7 @@ import java.util.Objects;
  */
 @Slf4j
 @Service
-public class AliBridgeGateway{
+public class AliBridgeGateway extends ClusterService<AliIotBridgeEntity> {
     private Map<String,AliBridgeServer> bridgeMap=new HashMap<>();
     private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
 
@@ -48,10 +48,18 @@ public class AliBridgeGateway{
 
     public AliBridgeGateway(DeviceRegistry registry,
                             EventBus eventBus,
+                            ClusterManager clusterManager,
                             DecodedClientMessageHandler messageHandler) {
+        super(clusterManager);
         this.registry = registry;
         this.eventBus=eventBus;
         this.messageHandler = messageHandler;
+        
+        this.handleClusterMsg()
+            .flatMap(replaceMsg->
+                replaceBridgeServer(replaceMsg,false)
+            ).subscribe();
+
     }
 
 
@@ -104,7 +112,9 @@ public class AliBridgeGateway{
 
 
     public Mono<AliBridgeServer> replaceBridgeServer(AliIotBridgeEntity bridge,boolean broadcast){
-        return initBridge(bridge,broadcast)
+        return Mono.zip(Mono.just(broadcast)
+            .filter(Boolean.TRUE::equals)
+            .flatMap(ignore->sendClusterMsg(bridge)),initBridge(bridge,broadcast))
             .then(lookUpServer(bridge.getId()));
     }
 

+ 4 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -19,6 +19,8 @@ import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
 import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
 import org.jetlinks.community.bridge.service.AliBridgeService;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.ClusterService;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Mono;
 
@@ -37,13 +39,14 @@ import reactor.core.publisher.Mono;
 @AllArgsConstructor
 @Tag(name = "阿里云网桥服务")
 public class AliBridgeServerController implements
-    ReactiveServiceCrudController<AliIotBridgeEntity, String> {
+    ReactiveServiceCrudController<AliIotBridgeEntity, String>  {
 
     private final AliBridgeService bridgeService;
 
     private final AliBridgeGateway bridgeGateway;
 
     private final AliBridgeDeviceService  bridgeDeviceService;
+
     @Override
     public ReactiveCrudService<AliIotBridgeEntity, String> getService() {
         return bridgeService;