Bladeren bron

add 集群

18339543638 4 jaren geleden
bovenliggende
commit
76a246e7a4

+ 7 - 7
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageBrokeMessageBroker.java

@@ -11,8 +11,6 @@ import org.jetlinks.core.message.*;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
-
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -31,7 +29,11 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
 
     private ClusterManager clusterManager;
 
-    private List<ServerNode> allNode;
+
+
+    private List<ServerNode> getAllNode(){
+        return clusterManager.getHaManager().getAllNode();
+    }
 
     public ClusterDeviceMessageBrokeMessageBroker(String serverId, ClusterManager clusterManager) {
         this.serverId = serverId;
@@ -54,7 +56,7 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
             //当前设备
             return super.send(serverId,message);
         }else {
-            Optional<ServerNode> server = allNode.stream().filter(node -> node.getId().equals(serverId)).findFirst();
+            Optional<ServerNode> server = getAllNode().stream().filter(node -> node.getId().equals(serverId)).findFirst();
             if(!server.isPresent()){
                 //设备连接服务器离线,则发送信息失败
                 log.error("服务器{}离线,发送设备信息失败",serverId);
@@ -71,9 +73,7 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
 
     @Override
     public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
-        List<ServerNode> serverNodes = CollectionUtil.newArrayList(allNode);
-        return  Flux.fromStream(serverNodes.stream())
-            .filter(allNode::contains)
+        return  Flux.fromStream(getAllNode().stream())
             .map(ServerNode::getId)
             .flatMap(serverId-> clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
                 .publish(Mono.from(message).doOnNext(msg->msg.addHeader(Headers.serverId,serverId))))

+ 5 - 6
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -43,20 +43,19 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
 
     private ClusterManager clusterManager;
 
-    private List<ServerNode> allNode;
     private volatile  boolean start=false;
 
     Disposable listenTopic=null;
 
+    private List<ServerNode> getAllNode(){
+        return clusterManager.getHaManager().getAllNode();
+    }
     public synchronized void  init(){
         if(start){
             return;
         }
         start=true;
 
-        //映射出所有集群节点信息
-        allNode = clusterManager.getHaManager().getAllNode();
-
         //10秒查看监听流是否存活
         //注册监听队列
         Flux.interval(Duration.ofSeconds(10))
@@ -78,7 +77,7 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
     public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
         message.addHeader(Headers.serverId,serverId);
         if(message instanceof BroadcastMessage){
-           return  Flux.fromStream(allNode.stream())
+           return  Flux.fromStream(getAllNode().stream())
                 .flatMap(node-> clusterManager
                     .getTopic(ClusterMessageType.topicOf(serverId, ClusterMessageType.up))
                         .publish(Mono.just(message))
@@ -92,7 +91,7 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
                     if(serverId.equals(this.serverId)){
                         return super.handleMessage(device,message);
                     }else {
-                        Optional<ServerNode> first = allNode.stream().filter(node -> node.getId().equals(serverId)).findFirst();
+                        Optional<ServerNode> first = getAllNode().stream().filter(node -> node.getId().equals(serverId)).findFirst();
                         if(!first.isPresent()){
                             //设备连接服务器离线,则信息交由当前服务器处理
                             return super.handleMessage(device,message);