18339543638 4 lat temu
rodzic
commit
c0a2449fc1

+ 11 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/DeviceOfflineMessage.java

@@ -1,6 +1,16 @@
 package org.jetlinks.core.message;
 
-public class DeviceOfflineMessage extends CommonDeviceMessage{
+public class DeviceOfflineMessage extends CommonDeviceMessage implements BroadcastMessage{
+    @Override
+    public String getAddress() {
+        return null;
+    }
+
+    @Override
+    public Message getMessage() {
+        return this;
+    }
+
     @Override
     public MessageType getMessageType() {
         return MessageType.OFFLINE;

+ 11 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/DeviceOnlineMessage.java

@@ -1,6 +1,16 @@
 package org.jetlinks.core.message;
 
-public class DeviceOnlineMessage extends CommonDeviceMessage {
+public class DeviceOnlineMessage extends CommonDeviceMessage implements BroadcastMessage {
+    @Override
+    public String getAddress() {
+        return null;
+    }
+
+    @Override
+    public Message getMessage() {
+        return this;
+    }
+
     public MessageType getMessageType() {
         return MessageType.ONLINE;
     }

+ 5 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/Headers.java

@@ -118,4 +118,9 @@ public interface Headers {
      * 产品ID
      */
     HeaderKey<String> productId = HeaderKey.of("productId", null, String.class);
+
+    /**
+     * 集群中首先受到消息的服务id,进行日志操作
+     */
+    HeaderKey<String> serverId = HeaderKey.of("serverId", null, String.class);
 }

+ 0 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java

@@ -36,5 +36,4 @@ public class DeviceManagerConfiguration {
         return new TimeSeriesMessageWriterConnector(dataService,registry);
     }
 
-
 }

+ 5 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -130,6 +130,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
     private final EventBus eventBus;
     private final MessageHandler messageHandler;
 
+
     public DeviceMessageConnector(EventBus eventBus,
                                   DeviceRegistry registry,
                                   MessageHandler messageHandler,
@@ -143,7 +144,8 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
                 DeviceOnlineMessage message = new DeviceOnlineMessage();
                 message.setDeviceId(session.getDeviceId());
                 message.setTimestamp(session.connectTime());
-                return onMessage(message);
+                return this.handleMessage(session.getOperator(),message);
+//                return onMessage(message);
             })
             .onErrorContinue(doOnError)
             .subscribe();
@@ -154,7 +156,8 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
                 DeviceOfflineMessage message = new DeviceOfflineMessage();
                 message.setDeviceId(session.getDeviceId());
                 message.setTimestamp(System.currentTimeMillis());
-                return onMessage(message);
+                return this.handleMessage(session.getOperator(),message);
+//                return onMessage(message);
             })
             .onErrorContinue(doOnError)
             .subscribe();

+ 13 - 30
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java

@@ -7,8 +7,10 @@ import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.Headers;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.TimeSyncReplyMessage;
+import org.springframework.beans.factory.annotation.Value;
 import reactor.core.publisher.Mono;
 
 import static org.jetlinks.core.message.MessageType.*;
@@ -22,28 +24,17 @@ import java.util.*;
  * @since 1.0
  */
 @Slf4j
-@AllArgsConstructor
 public class TimeSeriesMessageWriterConnector {
-    private static final String FIRST="first";
-    public static final String ONLY_READ="onlyRead";
-    //下行数据
-    public static  List<MessageType> downMessages;
-    //上行数据
-    public static  List<MessageType> upMessages;
     private final DeviceDataService dataService;
     private final DeviceRegistry registry;
 
-    static {
-        downMessages=Arrays.asList(READ_PROPERTY,WRITE_PROPERTY,INVOKE_FUNCTION,DISCONNECT,CHILD,
-            READ_FIRMWARE,
-            REQUEST_FIRMWARE,
-//            UPGRADE_FIRMWARE,
-            ACKNOWLEDGE,STATE_CHECK);
-        upMessages=Arrays.asList(REPORT_PROPERTY,EVENT,ONLINE,OFFLINE,REGISTER,
-            UN_REGISTER,DERIVED_METADATA,
-//            REPORT_FIRMWARE,
-            UPGRADE_FIRMWARE_PROGRESS,UPDATE_TAG);
+    public TimeSeriesMessageWriterConnector(DeviceDataService dataService, DeviceRegistry registry) {
+        this.dataService = dataService;
+        this.registry = registry;
     }
+
+    @Value("${jetlinks.server-id}")
+    private String serverId;
     /**
      * 订阅设备消息 入库
      *
@@ -52,6 +43,11 @@ public class TimeSeriesMessageWriterConnector {
      */
     @Subscribe(topics = "/device/**", id = "device-message-ts-writer")
     public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {
+        Optional<String> serverId = message.getHeader(Headers.serverId);
+        if(serverId.isPresent()&&!this.serverId.equals(serverId.get())){
+            //不是设备连接的服务器,不进行存储信息
+            return Mono.empty();
+        }
         return dataService.saveDeviceMessage(message);
     }
 
@@ -74,17 +70,4 @@ public class TimeSeriesMessageWriterConnector {
             })
             .then();
     }
-
-
-    //断开连接
-    private Mono<?> disconnect(DeviceMessage message){
-        Mono<DeviceOperator> device = registry.getDevice(message.getDeviceId());
-
-        return registry.getDevice(message.getDeviceId())
-            .switchIfEmpty(Mono.never())
-            .flatMapMany(DeviceOperator::disconnect)
-            .doOnError(e->{})
-            .singleOrEmpty();
-    }
-
 }

+ 1 - 5
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java

@@ -118,7 +118,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     @Nonnull
     @Override
     public Mono<Void> saveDeviceMessage(@Nonnull DeviceMessage message) {
-        return message.getHeader(TimeSeriesMessageWriterConnector.ONLY_READ).isPresent() ?Mono.never(): this
+        return this
             .convertMessageToTimeSeriesData(message)
             .flatMap(tp2 -> doSaveData(tp2.getT1(), tp2.getT2()))
             .then();
@@ -128,7 +128,6 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     @Override
     public Mono<Void> saveDeviceMessage(@Nonnull Publisher<DeviceMessage> message) {
         return Flux.from(message)
-            .filter(msg->msg.getHeader(TimeSeriesMessageWriterConnector.ONLY_READ).isPresent())
             .flatMap(this::convertMessageToTimeSeriesData)
             .groupBy(Tuple2::getT1, Integer.MAX_VALUE)
             .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
@@ -143,9 +142,6 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     protected Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String productId,
                                                                           DeviceMessage message,
                                                                           BiConsumer<DeviceMessage, DeviceOperationLogEntity> logEntityConsumer) {
-        if(message.getHeader(TimeSeriesMessageWriterConnector.ONLY_READ).isPresent()){
-            return Mono.never();
-        }
         DeviceOperationLogEntity operationLog = new DeviceOperationLogEntity();
         operationLog.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
         operationLog.setDeviceId(message.getDeviceId());

+ 50 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/ClusterConfiguration.java

@@ -0,0 +1,50 @@
+package org.jetlinks.community.standalone.configuration;
+
+import org.jetlinks.community.standalone.configuration.cluster.ClusterDeviceMessageBrokeMessageBroker;
+import org.jetlinks.community.standalone.configuration.cluster.ClusterDeviceMessageConnector;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.server.MessageHandler;
+import org.jetlinks.core.server.session.DeviceSessionManager;
+import org.jetlinks.supports.config.ClusterConfigStorageManager;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterConfiguration.java
+ * @Description TODO
+ * @createTime 2021年11月08日 15:35:00
+ */
+@Configuration
+@ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
+public class ClusterConfiguration {
+
+    @Bean
+    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
+    public ClusterDeviceMessageBrokeMessageBroker clusterDeviceMessageBrokeMessageBroker(JetLinksProperties properties, ClusterManager clusterManager) {
+        return new ClusterDeviceMessageBrokeMessageBroker(properties.getServerId(),clusterManager);
+    }
+
+    @Bean
+    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
+    public ClusterConfigStorageManager clusterEventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
+        return new ClusterConfigStorageManager(clusterManager);
+    }
+
+
+    @Bean(initMethod = "init")
+    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
+    public ClusterDeviceMessageConnector clusterDeviceMessageConnector(EventBus eventBus,
+                                                                       MessageHandler messageHandler,
+                                                                       DeviceSessionManager sessionManager,
+                                                                       DeviceRegistry registry,
+                                                                       JetLinksProperties jetLinksProperties,
+                                                                       ClusterManager clusterManager) {
+        return new ClusterDeviceMessageConnector(eventBus, registry, messageHandler, sessionManager,jetLinksProperties.getServerId(),clusterManager);
+    }
+
+}

+ 0 - 28
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

@@ -91,27 +91,6 @@ public class JetLinksConfiguration {
         return new BrokerEventBus();
     }
 
-
-    @Bean(initMethod = "init")
-    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
-    public ClusterDeviceMessageConnector clusterDeviceMessageConnector(EventBus eventBus,
-                                                                       MessageHandler messageHandler,
-                                                                       DeviceSessionManager sessionManager,
-                                                                       DeviceRegistry registry,
-                                                                       JetLinksProperties jetLinksProperties,
-                                                                       ClusterManager clusterManager) {
-        return new ClusterDeviceMessageConnector(eventBus, registry, messageHandler, sessionManager,jetLinksProperties.getServerId(),clusterManager);
-    }
-
-
-
-
-    @Bean
-    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
-    public ClusterDeviceMessageBrokeMessageBroker clusterDeviceMessageBrokeMessageBroker(JetLinksProperties properties,ClusterManager clusterManager) {
-        return new ClusterDeviceMessageBrokeMessageBroker(properties.getServerId(),clusterManager);
-    }
-
     @Bean
     @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "false")
     public StandaloneDeviceMessageBroker standaloneDeviceMessageBroker() {
@@ -119,13 +98,6 @@ public class JetLinksConfiguration {
     }
 
 
-
-    @Bean
-    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
-    public ClusterConfigStorageManager clusterEventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
-        return new ClusterConfigStorageManager(clusterManager);
-    }
-
     @Bean
     @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "false")
     public EventBusStorageManager eventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {

+ 4 - 4
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageBrokeMessageBroker.java

@@ -5,15 +5,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.standalone.configuration.cluster.message.ClusterMessage;
 import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.cluster.ClusterTopic;
 import org.jetlinks.core.cluster.ServerNode;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
-import org.jetlinks.core.message.BroadcastMessage;
-import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.*;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -66,6 +65,7 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
         }
         return clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
             .publish(Mono.from(message)
+                .doOnNext(msg->msg.addHeader(Headers.serverId,serverId))
                 .map(msg->new ClusterMessage(msg,serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.down))));
     }
 
@@ -76,7 +76,7 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
             .filter(allNode::contains)
             .map(ServerNode::getId)
             .flatMap(serverId-> clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
-                .publish(message))
+                .publish(Mono.from(message).doOnNext(msg->msg.addHeader(Headers.serverId,serverId))))
             .collect(Collectors.counting())
             .map(Long::intValue);
     }

+ 28 - 15
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -7,6 +7,8 @@ import org.jetlinks.core.cluster.ServerNode;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.message.BroadcastMessage;
+import org.jetlinks.core.message.Headers;
 import org.jetlinks.core.message.Message;
 import org.jetlinks.core.server.MessageHandler;
 import org.jetlinks.core.server.session.DeviceSessionManager;
@@ -74,22 +76,33 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
     }
     @Override
     public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
-       return  device.getConnectionServerId()
-            .flatMap(serverId->{
-                //设备连接当前服务器,不需要通过集群传递信息
-                if(serverId.equals(this.serverId)){
-                    return super.handleMessage(device,message);
-                }else {
-                    Optional<ServerNode> first = allNode.stream().filter(node -> node.getId().equals(serverId)).findFirst();
-                    if(!first.isPresent()){
-                        //设备连接服务器离线,则信息交由当前服务器处理
+        message.addHeader(Headers.serverId,serverId);
+        if(message instanceof BroadcastMessage){
+           return  Flux.fromStream(allNode.stream())
+                .flatMap(node-> clusterManager
+                    .getTopic(ClusterMessageType.topicOf(serverId, ClusterMessageType.up))
+                        .publish(Mono.just(message))
+                )
+                .flatMap(result->result==0?Mono.just(false):Mono.just(true))
+               .reduce((a1,a2)->a1&&a2);
+        }else {
+            return  device.getConnectionServerId()
+                .flatMap(serverId->{
+                    //设备连接当前服务器,不需要通过集群传递信息
+                    if(serverId.equals(this.serverId)){
                         return super.handleMessage(device,message);
+                    }else {
+                        Optional<ServerNode> first = allNode.stream().filter(node -> node.getId().equals(serverId)).findFirst();
+                        if(!first.isPresent()){
+                            //设备连接服务器离线,则信息交由当前服务器处理
+                            return super.handleMessage(device,message);
+                        }
+                        return clusterManager
+                            .getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.up))
+                            .publish(Mono.just(message))
+                            .flatMap(result->result==0?Mono.just(false):Mono.just(true));
                     }
-                    return clusterManager
-                        .getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.up))
-                        .publish(Mono.just(message))
-                        .flatMap(result->result==0?Mono.just(false):Mono.just(true));
-                }
-            });
+                });
+        }
     }
 }

+ 8 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterMessageType.java

@@ -24,4 +24,12 @@ public enum  ClusterMessageType {
             default: return null;
         }
     }
+
+    /**
+     * 广播消息
+     * @return
+     */
+    public static String broadcastTopic(){
+       return "cluster:*-message-up";
+    }
 }