Ver Fonte

add 集群

18339543638 há 4 anos atrás
pai
commit
3de1e078db

+ 45 - 8
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
+import cn.hutool.core.util.StrUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.device.message.DeviceMessageConnector;
 import org.jetlinks.core.cluster.ClusterManager;
@@ -16,11 +17,11 @@ import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
-
 import javax.annotation.Nonnull;
 import java.time.Duration;
-import java.util.List;
-import java.util.Optional;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.*;
 
 /**
  * @author lifang
@@ -32,6 +33,7 @@ import java.util.Optional;
 @Slf4j
 public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
 
+    private LinkedHashMap<String,Message> messageServerMap=new LinkedHashMap<>(1024,0.75f,true);
 
     public ClusterDeviceMessageConnector(EventBus eventBus, DeviceRegistry registry, MessageHandler messageHandler, DeviceSessionManager sessionManager, String serverId, ClusterManager clusterManager) {
         super(eventBus, registry, messageHandler, sessionManager);
@@ -64,14 +66,40 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
                     listenTopic = clusterManager.getTopic(ClusterMessageType.topicOf(serverId, ClusterMessageType.up))
                         .subscribePattern()
                         .publishOn(Schedulers.boundedElastic())
-                        .flatMap(message -> super.handleMessage(null,
-                            (Message) message.getMessage()))
+                        .flatMap(message -> {
+                            Message msg = (Message) message.getMessage();
+                            messageServerMap.put(msg.getMessageId(),msg);
+                            return super.handleMessage(null,
+                                msg);
+                        })
                         .doOnError(e -> log.error("集群信息接收失败,节点关闭{};", serverId,e))
                         .subscribe();
                     log.info("集群节点重启{}成功",serverId);
                 }
             }).subscribe();
 
+        /**
+         * 清理过期消息
+         */
+        Flux.interval(Duration.ofSeconds(5))
+            .doOnNext(ignore->{
+                LocalDateTime now = LocalDateTime.now();
+                Set<String> keySet = messageServerMap.keySet();
+                for (String key : keySet) {
+                    Message message = messageServerMap.get(key);
+                    if(message!=null){
+                        long messageTime = message.getTimestamp();
+                        if(Duration.between(LocalDateTime.ofEpochSecond(messageTime,0, ZoneOffset.ofHours(8)),now).toMillis()>3000){
+                            messageServerMap.remove(key);
+                        }else {
+                            break;
+                        }
+                    }
+                }
+            })
+            .subscribe();
+
+
     }
     @Override
     public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
@@ -87,11 +115,20 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
         }else {
             return  device.getConnectionServerId()
                 .flatMap(serverId->{
-                    //设备连接当前服务器,不需要通过集群传递信息
-                    if(serverId.equals(this.serverId)){
+                    Message acceptMessage = messageServerMap.get(message.getMessageId());
+                    if(acceptMessage==null){
+                        return super.handleMessage(device, message);
+                    }
+                    //发送消息服务器
+                    String messageServerId = acceptMessage.getHeader(Headers.serverId).orElse(this.serverId);
+                    if(StrUtil.isNotEmpty(messageServerId)&&messageServerId.equals(this.serverId)){
+                        //发送消息服务器即本台服务器
+                        return super.handleMessage(device,message);
+                    }else if(StrUtil.isEmpty(messageServerId)&&serverId.equals(this.serverId)){
+                        //非回复消息,且当前服务器与设备相连,则查询上报消息的服务器,设备连接当前服务器,不需要通过集群传递信息
                         return super.handleMessage(device,message);
                     }else {
-                        Optional<ServerNode> first = getAllNode().stream().filter(node -> node.getId().equals(serverId)).findFirst();
+                        Optional<ServerNode> first = getAllNode().stream().filter(node -> node.getId().equals(messageServerId)).findFirst();
                         if(!first.isPresent()){
                             //设备连接服务器离线,则信息交由当前服务器处理
                             return super.handleMessage(device,message);