18339543638 vor 4 Jahren
Ursprung
Commit
29a40088fb

+ 3 - 1
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageBrokeMessageBroker.java

@@ -76,7 +76,9 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
         return  Flux.fromStream(getAllNode().stream())
             .map(ServerNode::getId)
             .flatMap(serverId-> clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
-                .publish(Mono.from(message).doOnNext(msg->msg.addHeader(Headers.serverId,serverId))))
+                .publish(Mono.from(message)
+                    .doOnNext(msg->msg.addHeader(Headers.serverId,serverId))
+                    .map(msg->new ClusterMessage(msg,serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.down)))))
             .collect(Collectors.counting())
             .map(Long::intValue);
     }

+ 15 - 17
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.device.message.DeviceMessageConnector;
@@ -32,7 +33,7 @@ import java.util.*;
 @Slf4j
 public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
 
-    private LinkedHashMap<String,Message> messageServerMap=new LinkedHashMap<>(1024,0.75f,true);
+    private LinkedHashMap<String,ClusterMessage> messageServerMap=new LinkedHashMap<>(1024,0.75f,true);
 
     public ClusterDeviceMessageConnector(EventBus eventBus, DeviceRegistry registry, MessageHandler messageHandler, DeviceSessionManager sessionManager, String serverId, ClusterManager clusterManager) {
         super(eventBus, registry, messageHandler, sessionManager);
@@ -67,22 +68,19 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
                         .mergeWith(clusterManager.getTopic(ClusterMessageType.topicOf(serverId, ClusterMessageType.down)).subscribePattern())
                         .publishOn(Schedulers.boundedElastic())
                         .flatMap(message -> {
-                            Message msg=null;
+                            ClusterMessage msg=null;
                             Object obj = message.getMessage();
                             if(obj instanceof ClusterMessage){
-                                ClusterMessage clusterMessage= (ClusterMessage) obj;
-                                msg=clusterMessage.getPayload();
-                            }else if(obj instanceof Message){
-                                msg= (Message) obj;
+                                msg= (ClusterMessage) obj;
                             }
                             if(msg==null){
                                 return Mono.empty();
                             }
-                            if(StrUtil.isNotEmpty(msg.getMessageId())){
-                                messageServerMap.put(msg.getMessageId(),msg);
+                            if(ObjectUtil.isNotNull(msg)){
+                                messageServerMap.put(msg.getPayload().getMessageId(),msg);
                             }
                             return super.handleMessage(null,
-                                msg);
+                                msg.getPayload());
                         })
                         .doOnError(e -> log.error("集群信息接收失败,节点关闭{};", serverId,e))
                         .subscribe();
@@ -117,30 +115,30 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
     public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
         message.addHeader(Headers.serverId,serverId);
         if(message instanceof BroadcastMessage){
-           return  Flux.fromStream(getAllNode().stream())
+            return  Flux.fromStream(getAllNode().stream())
                 .flatMap(node-> clusterManager
                     .getTopic(ClusterMessageType.topicOf(node.getId(), ClusterMessageType.up))
-                        .publish(Mono.just(message))
+                    .publish(Mono.just(message))
                 )
                 .flatMap(result->result==0?Mono.just(false):Mono.just(true))
-               .reduce((a1,a2)->a1&&a2);
+                .reduce((a1,a2)->a1&&a2);
         }else {
             return  device.getConnectionServerId()
                 .flatMap(serverId->{
-                    Message acceptMessage = messageServerMap.get(message.getMessageId());
+                    ClusterMessage acceptMessage = messageServerMap.get(message.getMessageId());
                     if(acceptMessage==null){
                         return super.handleMessage(device, message);
                     }
                     //发送消息服务器
-                    String messageServerId = acceptMessage.getHeader(Headers.serverId).orElse(this.serverId);
-                    if(StrUtil.isNotEmpty(messageServerId)&&messageServerId.equals(this.serverId)){
+                    String fromServerId = acceptMessage.getPayload().getHeader(Headers.serverId).orElse(this.serverId);
+                    if(StrUtil.isNotEmpty(fromServerId)&&fromServerId.equals(this.serverId)){
                         //发送消息服务器即本台服务器
                         return super.handleMessage(device,message);
-                    }else if(StrUtil.isEmpty(messageServerId)&&serverId.equals(this.serverId)){
+                    }else if(StrUtil.isEmpty(fromServerId)&&serverId.equals(this.serverId)){
                         //非回复消息,且当前服务器与设备相连,则查询上报消息的服务器,设备连接当前服务器,不需要通过集群传递信息
                         return super.handleMessage(device,message);
                     }else {
-                        Optional<ServerNode> first = getAllNode().stream().filter(node -> node.getId().equals(messageServerId)).findFirst();
+                        Optional<ServerNode> first = getAllNode().stream().filter(node -> node.getId().equals(fromServerId)).findFirst();
                         if(!first.isPresent()){
                             //设备连接服务器离线,则信息交由当前服务器处理
                             return super.handleMessage(device,message);