浏览代码

add 集群

18339543638 4 年之前
父节点
当前提交
bbec9d986b

+ 4 - 5
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageBrokeMessageBroker.java

@@ -13,7 +13,6 @@ import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -66,8 +65,8 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
         }
         return clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
             .publish(Mono.from(message)
-                .doOnNext(msg->msg.addHeader(Headers.serverId,serverId))
-                .map(msg->new ClusterMessage(msg,serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.down))));
+                .doOnNext(msg->msg.addHeader(Headers.serverId,this.serverId))
+                .map(msg->new ClusterMessage(msg,this.serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.down))));
     }
 
     @Override
@@ -76,8 +75,8 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
             .map(ServerNode::getId)
             .flatMap(serverId-> clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
                 .publish(Mono.from(message)
-                    .doOnNext(msg->msg.addHeader(Headers.serverId,serverId))
-                    .map(msg->new ClusterMessage(msg,serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.down)))))
+                    .doOnNext(msg->msg.addHeader(Headers.serverId,this.serverId))
+                    .map(msg->new ClusterMessage(msg,this.serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.down)))))
             .collect(Collectors.counting())
             .map(Long::intValue);
     }

+ 4 - 1
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -15,6 +15,7 @@ import org.jetlinks.core.message.Headers;
 import org.jetlinks.core.message.Message;
 import org.jetlinks.core.server.MessageHandler;
 import org.jetlinks.core.server.session.DeviceSessionManager;
+import org.springframework.util.ConcurrentReferenceHashMap;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -101,6 +102,7 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
             .doOnNext(ignore->{
                 LocalDateTime now = LocalDateTime.now();
                 Set<String> keySet = messageServerMap.keySet();
+                List<String> removeKeys = new LinkedList<>();
                 for (String key : keySet) {
                     ClusterMessage message = messageServerMap.get(key);
                     if(message!=null){
@@ -108,12 +110,13 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
                         //超过10s将过期消息删除
                         if(Duration.between(LocalDateTime.ofEpochSecond(messageTime/1000,0, ZoneOffset.ofHours(8)),now)
                             .toMillis()>DEFAULT_TIMEOUT){
-                            messageServerMap.remove(key);
+                            removeKeys.add(key);
                         }else {
                             break;
                         }
                     }
                 }
+                removeKeys.forEach(messageServerMap::remove);
             })
             .subscribe();