Bladeren bron

add 集群

18339543638 4 jaren geleden
bovenliggende
commit
aaf85196bf

+ 3 - 3
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageBrokeMessageBroker.java

@@ -1,12 +1,12 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
-import cn.hutool.core.collection.CollectionUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.standalone.configuration.cluster.message.ClusterMessage;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.cluster.ServerNode;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
+import org.jetlinks.core.enums.ErrorCode;
+import org.jetlinks.core.exception.DeviceOperationException;
 import org.jetlinks.core.message.*;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.*;
@@ -61,7 +61,7 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
                 //设备连接服务器离线,则发送信息失败
                 log.error("服务器{}离线,发送设备信息失败",serverId);
                 //todo 设备设置为离线状态
-                throw new BusinessException("设备所连接服务器离线,无法发送信息");
+                throw new DeviceOperationException(ErrorCode.SERVER_NOT_AVAILABLE);
 
             }
         }

+ 34 - 22
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -3,6 +3,7 @@ package org.jetlinks.community.standalone.configuration.cluster;
 import cn.hutool.core.util.StrUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.device.message.DeviceMessageConnector;
+import org.jetlinks.community.standalone.configuration.cluster.message.ClusterMessage;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.cluster.ServerNode;
 import org.jetlinks.core.device.DeviceOperator;
@@ -19,15 +20,13 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import javax.annotation.Nonnull;
 import java.time.Duration;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.*;
 
 /**
  * @author lifang
  * @version 1.0.0
  * @ClassName ClusterDeviceMessageConnector.java
- * @Description 上行数据
+ * @Description 上行数据, 保持服务器时钟一致
  * @createTime 2021年11月06日 16:18:00
  */
 @Slf4j
@@ -65,10 +64,23 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
                 if(listenTopic==null||listenTopic.isDisposed()){
                     listenTopic = clusterManager.getTopic(ClusterMessageType.topicOf(serverId, ClusterMessageType.up))
                         .subscribePattern()
+                        .mergeWith(clusterManager.getTopic(ClusterMessageType.topicOf(serverId, ClusterMessageType.down)).subscribePattern())
                         .publishOn(Schedulers.boundedElastic())
                         .flatMap(message -> {
-                            Message msg = (Message) message.getMessage();
-                            messageServerMap.put(msg.getMessageId(),msg);
+                            Message msg=null;
+                            Object obj = message.getMessage();
+                            if(obj instanceof ClusterMessage){
+                                ClusterMessage clusterMessage= (ClusterMessage) obj;
+                                msg=clusterMessage.getPayload();
+                            }else if(obj instanceof Message){
+                                msg= (Message) obj;
+                            }
+                            if(msg==null){
+                                return Mono.empty();
+                            }
+                            if(StrUtil.isNotEmpty(msg.getMessageId())){
+                                messageServerMap.put(msg.getMessageId(),msg);
+                            }
                             return super.handleMessage(null,
                                 msg);
                         })
@@ -81,23 +93,23 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
         /**
          * 清理过期消息
          */
-        Flux.interval(Duration.ofSeconds(5))
-            .doOnNext(ignore->{
-                LocalDateTime now = LocalDateTime.now();
-                Set<String> keySet = messageServerMap.keySet();
-                for (String key : keySet) {
-                    Message message = messageServerMap.get(key);
-                    if(message!=null){
-                        long messageTime = message.getTimestamp();
-                        if(Duration.between(LocalDateTime.ofEpochSecond(messageTime,0, ZoneOffset.ofHours(8)),now).toMillis()>3000){
-                            messageServerMap.remove(key);
-                        }else {
-                            break;
-                        }
-                    }
-                }
-            })
-            .subscribe();
+//        Flux.interval(Duration.ofSeconds(5))
+//            .doOnNext(ignore->{
+//                LocalDateTime now = LocalDateTime.now();
+//                Set<String> keySet = messageServerMap.keySet();
+//                for (String key : keySet) {
+//                    Message message = messageServerMap.get(key);
+//                    if(message!=null){
+//                        long messageTime = message.getTimestamp();
+//                        if(Duration.between(LocalDateTime.ofEpochSecond(messageTime,0, ZoneOffset.ofHours(8)),now).toMillis()>3000){
+//                            messageServerMap.remove(key);
+//                        }else {
+//                            break;
+//                        }
+//                    }
+//                }
+//            })
+//            .subscribe();
 
 
     }