18339543638 4 lat temu
rodzic
commit
f6a1640855
31 zmienionych plików z 634 dodań i 113 usunięć
  1. 0 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandlerConfiguration.java
  2. 3 9
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java
  3. 0 1
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGatewayProvider.java
  4. 1 1
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java
  5. 14 0
      jetlinks-core/src/main/java/org/jetlinks/core/defaults/DefaultDeviceOperator.java
  6. 11 0
      jetlinks-core/src/main/java/org/jetlinks/core/device/DeviceOperator.java
  7. 1 0
      jetlinks-core/src/main/java/org/jetlinks/core/device/DeviceState.java
  8. 11 7
      jetlinks-core/src/main/java/org/jetlinks/core/device/StandaloneDeviceMessageBroker.java
  9. 2 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/CommonDeviceMessage.java
  10. 1 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/DeviceMessage.java
  11. 3 2
      jetlinks-core/src/main/java/org/jetlinks/core/message/Message.java
  12. 1 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/DefaultTransport.java
  13. 10 0
      jetlinks-core/src/main/java/org/jetlinks/core/server/session/DeviceSession.java
  14. 6 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java
  15. 3 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceState.java
  16. 3 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java
  17. 0 47
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java
  18. 18 25
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java
  19. 101 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/session/MockSession.java
  20. 87 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMockController.java
  21. 3 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GeoController.java
  22. 16 10
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java
  23. 28 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java
  24. 3 1
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksProperties.java
  25. 84 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageBrokeMessageBroker.java
  26. 95 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java
  27. 27 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterMessageType.java
  28. 47 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/message/ClusterMessage.java
  29. 26 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/message/ClusterMessageReply.java
  30. 2 1
      jetlinks-standalone/src/main/resources/application.yml
  31. 27 3
      jetlinks-standalone/src/test/java/org/jetlinks/community/network/manager/web/GateWayTest.java

+ 0 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandlerConfiguration.java

@@ -3,7 +3,6 @@ package org.jetlinks.community.gateway.external.socket;
 import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
 import org.hswebframework.web.authorization.token.UserTokenManager;
 import org.jetlinks.community.gateway.external.MessagingManager;
-import org.jetlinks.supports.cluster.redis.RedisClusterManager;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;

+ 3 - 9
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -17,8 +17,6 @@ import org.jetlinks.community.network.mqtt.auth.MqttDefaultAuth;
 import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
 import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import org.jetlinks.community.network.mqtt.server.MqttServer;
-import org.jetlinks.community.network.mqtt.server.MqttSubscription;
-import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
 import org.jetlinks.community.network.utils.DeviceGatewayHelper;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.defaults.Authenticator;
@@ -26,7 +24,6 @@ import org.jetlinks.core.device.AuthenticationResponse;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.MqttAuthenticationRequest;
-import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.codec.FromDeviceMessageContext;
@@ -84,15 +81,13 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
 
     private final DeviceGatewayHelper helper;
 
-    private final EventBus eventBus;
     public MqttServerDeviceGateway(String id,
-                                   EventBus eventBus,
                                    DeviceRegistry registry,
                                    DeviceSessionManager sessionManager,
                                    MqttServer mqttServer,
                                    DecodedClientMessageHandler messageHandler,
                                    Mono<ProtocolSupport> customProtocol) {
-        this.eventBus=eventBus;
+
         this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
         this.id = id;
         this.registry = registry;
@@ -146,7 +141,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                     .map(support -> authenticator.authenticate(request, registry))
                     .defaultIfEmpty(Mono.defer(() -> registry
                         .getDevice(connection.getClientId())
-//                        .flatMap(device -> device.authenticate(request))))
                         .flatMap(device -> device.authenticate(request))))
                     .flatMap(Function.identity())
                     .switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)));
@@ -184,6 +178,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                             sessionManager.register(newSession);
                         } else if (session instanceof ReplaceableDeviceSession) {
                             ((ReplaceableDeviceSession) session).replaceWith(newSession);
+                        }else{
+                            sessionManager.register(newSession);
                         }
                         gatewayMonitor.connected();
                         gatewayMonitor.totalConnection(counter.sum());
@@ -205,8 +201,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                             registry.getDevice(conn.getClientId())
                                 .doOnNext(operator -> {
                                     operator.getTopics().clear();
-//                                    eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
-//                                        conn.getClientId()),new TimeSyncMessage());
                                 }).subscribe();
                         });
                         return Tuples.of(connection.accept(), device, newSession);

+ 0 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGatewayProvider.java

@@ -66,7 +66,6 @@ public class MqttServerDeviceGatewayProvider implements DeviceGatewayProvider {
             .<MqttServer>getNetwork(getNetworkType(), properties.getNetworkId())
             .map(mqttServer -> new MqttServerDeviceGateway(
                 properties.getId(),
-                eventBus,
                 registry,
                 sessionManager,
                 mqttServer,

+ 1 - 1
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java

@@ -69,11 +69,11 @@ public class JetLinksExtendProtocolSupportProvider implements ProtocolSupportPro
 
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT));
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
+            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.INNER));
 
             support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
             support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
 
-
             return Mono.just(support);
         });
     }

+ 14 - 0
jetlinks-core/src/main/java/org/jetlinks/core/defaults/DefaultDeviceOperator.java

@@ -302,6 +302,8 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
                         configs.put("onlineTime", System.currentTimeMillis());
                     } else if (newer == DeviceState.offline) {
                         configs.put("offlineTime", System.currentTimeMillis());
+                    }else if(newer == DeviceState.mockOnline ){
+                        configs.put("mockOnlineTime",System.currentTimeMillis());
                     }
                     return this
                         .setConfigs(configs)
@@ -375,6 +377,18 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
             .doOnError(err -> log.error("online device error", err));
     }
 
+    @Override
+    public Mono<Boolean> mockOnline(String serverId, String sessionId, String address) {
+        return this.setConfigs(
+            connectionServerId.value(serverId),
+            DeviceConfigKey.sessionId.value(serverId),
+            ConfigKey.of("address").value(address),
+            ConfigKey.of("mockOnlineTime").value(System.currentTimeMillis()),
+            ConfigKey.of("state").value(DeviceState.mockOnline)
+
+        );
+    }
+
     @Override
     public Mono<Value> getSelfConfig(String key) {
         return getConfig(key, false);

+ 11 - 0
jetlinks-core/src/main/java/org/jetlinks/core/device/DeviceOperator.java

@@ -103,6 +103,8 @@ public interface DeviceOperator extends Configurable {
 
     Mono<Boolean> online(String serverId, String sessionId, String address);
 
+    Mono<Boolean> mockOnline(String serverId,String sessionId, String address);
+
     Mono<Value> getSelfConfig(String key);
 
     Mono<Values> getSelfConfigs(Collection<String> keys);
@@ -129,6 +131,15 @@ public interface DeviceOperator extends Configurable {
                 .defaultIfEmpty(false);
     }
 
+    /**
+     * @return 是否在线
+     */
+    default Mono<Boolean> isMockOnline() {
+        return checkState()
+            .map(state -> state.equals(DeviceState.mockOnline))
+            .defaultIfEmpty(false);
+    }
+
     /**
      * 设置设备离线
      *

+ 1 - 0
jetlinks-core/src/main/java/org/jetlinks/core/device/DeviceState.java

@@ -6,6 +6,7 @@ package org.jetlinks.core.device;
  */
 public interface DeviceState {
 
+    byte mockOnline = 2;
     //未知
     byte unknown = 0;
 

+ 11 - 7
jetlinks-core/src/main/java/org/jetlinks/core/device/StandaloneDeviceMessageBroker.java

@@ -2,6 +2,7 @@ package org.jetlinks.core.device;
 
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.enums.ErrorCode;
 import org.jetlinks.core.exception.DeviceOperationException;
 import org.jetlinks.core.message.BroadcastMessage;
@@ -22,6 +23,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 @Slf4j
+/**
+ * 下行数据
+ */
 public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, MessageHandler {
 
     private final FluxProcessor<Message, Message> messageEmitterProcessor;
@@ -51,7 +55,7 @@ public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, Mes
     @Override
     public Flux<Message> handleSendToDeviceMessage(String serverId) {
         return messageEmitterProcessor
-                .map(Function.identity());
+            .map(Function.identity());
     }
 
     @Override
@@ -63,7 +67,7 @@ public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, Mes
     @Override
     public Flux<DeviceStateInfo> getDeviceState(String serviceId, Collection<String> deviceIdList) {
         return Mono.justOrEmpty(stateHandler.get(serviceId))
-                .flatMapMany(fun -> fun.apply(Flux.fromIterable(deviceIdList)));
+            .flatMapMany(fun -> fun.apply(Flux.fromIterable(deviceIdList)));
     }
 
     @Override
@@ -113,9 +117,9 @@ public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, Mes
     public Flux<DeviceMessageReply> handleReply(String deviceId,String messageId, Duration timeout) {
 
         return replyProcessor
-                .computeIfAbsent(messageId, ignore -> UnicastProcessor.create())
-                .timeout(timeout, Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT)))
-                .doFinally(signal -> replyProcessor.remove(messageId));
+            .computeIfAbsent(messageId, ignore -> UnicastProcessor.create())
+            .timeout(timeout, Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT)))
+            .doFinally(signal -> replyProcessor.remove(messageId));
     }
 
     @Override
@@ -125,8 +129,8 @@ public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, Mes
         }
 
         return Flux.from(message)
-                .doOnNext(sink::next)
-                .then(Mono.just(Long.valueOf(messageEmitterProcessor.downstreamCount()).intValue()));
+            .doOnNext(sink::next)
+            .then(Mono.just(Long.valueOf(messageEmitterProcessor.downstreamCount()).intValue()));
     }
 
     @Override

+ 2 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/CommonDeviceMessage.java

@@ -23,6 +23,8 @@ public class CommonDeviceMessage implements DeviceMessage {
 
     private String deviceId;
 
+    private boolean mock;
+
     private Map<String, Object> headers;
 
     private long timestamp = System.currentTimeMillis();

+ 1 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/DeviceMessage.java

@@ -10,6 +10,7 @@ public interface DeviceMessage extends Message, Jsonable {
 
     String getDeviceId();
 
+    @Override
     long getTimestamp();
 
     @Override

+ 3 - 2
jetlinks-core/src/main/java/org/jetlinks/core/message/Message.java

@@ -2,6 +2,7 @@ package org.jetlinks.core.message;
 
 import com.alibaba.fastjson.parser.ParserConfig;
 import com.alibaba.fastjson.util.TypeUtils;
+import lombok.Data;
 import org.jetlinks.core.metadata.Jsonable;
 
 import javax.annotation.Nullable;
@@ -106,7 +107,7 @@ public interface Message extends Jsonable, Serializable {
     @SuppressWarnings("all")
     default <T> Optional<T> getHeader(HeaderKey<T> key) {
         return getHeader(key.getKey())
-                .map(v -> TypeUtils.cast(v, key.getType(), ParserConfig.global));
+            .map(v -> TypeUtils.cast(v, key.getType(), ParserConfig.global));
     }
 
     default <T> T getHeaderOrDefault(HeaderKey<T> key) {
@@ -115,7 +116,7 @@ public interface Message extends Jsonable, Serializable {
 
     default Optional<Object> getHeader(String header) {
         return Optional.ofNullable(getHeaders())
-                       .map(headers -> headers.get(header));
+            .map(headers -> headers.get(header));
     }
 
     default void validate(){

+ 1 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/DefaultTransport.java

@@ -11,6 +11,7 @@ import java.util.Arrays;
  */
 @AllArgsConstructor
 public enum DefaultTransport implements Transport {
+    INNER("Inner"),
     MQTT("MQTT"),
     MQTT_TLS("MQTT TLS"),
     UDP("UDP"),

+ 10 - 0
jetlinks-core/src/main/java/org/jetlinks/core/server/session/DeviceSession.java

@@ -1,6 +1,8 @@
 package org.jetlinks.core.server.session;
 
 import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.message.CommonDeviceMessage;
+import org.jetlinks.core.message.CommonDeviceMessageReply;
 import org.jetlinks.core.message.codec.EncodedMessage;
 import org.jetlinks.core.message.codec.Transport;
 import reactor.core.publisher.Mono;
@@ -65,6 +67,14 @@ public interface DeviceSession {
      */
     Transport getTransport();
 
+    /**
+     * 该设备是否为模拟登陆设备
+     * @return
+     */
+    default boolean isMock(){
+        return false;
+    }
+
     /**
      * 关闭session
      */

+ 6 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java

@@ -8,7 +8,9 @@ import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.server.MessageHandler;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.supports.cluster.redis.RedisClusterManager;
+import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -18,6 +20,7 @@ import org.springframework.context.annotation.DependsOn;
 public class DeviceManagerConfiguration {
 
     @Bean
+    @ConditionalOnMissingBean(DecodedClientMessageHandler.class)
     public DeviceMessageConnector deviceMessageConnector(EventBus eventBus,
                                                          MessageHandler messageHandler,
                                                          DeviceSessionManager sessionManager,
@@ -25,11 +28,12 @@ public class DeviceManagerConfiguration {
         return new DeviceMessageConnector(eventBus, registry, messageHandler, sessionManager);
     }
 
+
     @Bean
     @ConditionalOnProperty(prefix = "device.message.writer.time-series", name = "enabled", havingValue = "true", matchIfMissing = true)
     public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService dataService
-        , RedisClusterManager redisClusterManager,EventBus eventBus,DeviceRegistry registry) {
-        return new TimeSeriesMessageWriterConnector(dataService,redisClusterManager,eventBus,registry);
+        ,DeviceRegistry registry) {
+        return new TimeSeriesMessageWriterConnector(dataService,registry);
     }
 
 

+ 3 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceState.java

@@ -10,6 +10,7 @@ import org.hswebframework.web.dict.EnumDict;
 @Dict("device-state")
 public enum DeviceState implements EnumDict<String> {
     notActive("未激活"),
+    mockOnline("模拟在线"),
     offline("离线"),
     online("在线");
 
@@ -22,6 +23,8 @@ public enum DeviceState implements EnumDict<String> {
 
     public static DeviceState of(byte state) {
         switch (state) {
+            case org.jetlinks.core.device.DeviceState.mockOnline:
+                return mockOnline;
             case org.jetlinks.core.device.DeviceState.offline:
                 return offline;
             case org.jetlinks.core.device.DeviceState.online:

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -1,6 +1,5 @@
 package org.jetlinks.community.device.message;
 
-import cn.hutool.core.util.EnumUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.PropertyConstants;
 import org.jetlinks.core.Values;
@@ -23,7 +22,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 /**
- * 将设备消息连接到消息网关
+ * 将设备消息连接到消息网关,上行
  *
  * @author zhouhao
  * @since 1.0
@@ -45,7 +44,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
     static {
         fastTopicBuilder = new BiConsumer[MessageType.values().length];
 
-
         //事件
         createFastBuilder(MessageType.EVENT, (message, builder) -> {
             EventMessage event = ((EventMessage) message);
@@ -222,6 +220,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
         }
     }
 
+
     public Mono<Void> onMessage(Message message) {
         if (null == message) {
             return Mono.empty();
@@ -275,6 +274,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
         return handleChildrenDeviceMessage(reply.getChildDeviceMessage());
     }
 
+
     /**
      * 这里才是真正处理消息的地方
      *

+ 0 - 47
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java

@@ -2,17 +2,13 @@ package org.jetlinks.community.device.message.writer;
 
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.device.message.DeviceMessageConnector;
 import org.jetlinks.community.device.service.data.DeviceDataService;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
-import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.TimeSyncReplyMessage;
-import org.jetlinks.supports.cluster.redis.RedisClusterManager;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import static org.jetlinks.core.message.MessageType.*;
@@ -35,8 +31,6 @@ public class TimeSeriesMessageWriterConnector {
     //上行数据
     public static  List<MessageType> upMessages;
     private final DeviceDataService dataService;
-    private final RedisClusterManager clusterManager;
-    private final EventBus eventBus;
     private final DeviceRegistry registry;
 
     static {
@@ -82,42 +76,6 @@ public class TimeSeriesMessageWriterConnector {
     }
 
 
-//    @PostConstruct
-//    public void clusterSubscribe(){
-//        //上行主题数据处理
-//        clusterManager.getTopic("/device/**").subscribePattern()
-//        .doOnNext(message->{
-//            String topic = message.getTopic();
-//            DeviceMessage deviceMessage= (DeviceMessage) message.getMessage();
-//            deviceMessage.addHeader(FIRST,false);
-//            deviceMessage.addHeader(ONLY_READ,false);
-//            eventBus.publish(topic,deviceMessage).subscribe();
-//        }).subscribe();
-//    }
-
-
-//    /**
-//     * 数据下行处理
-//     * @param message
-//     * @return
-//     */
-//    @Subscribe(topics = "/device/**", id = "ts-device-message-writer")
-//    public Mono<Void> writeTsToDevice(DeviceMessage message) {
-//        String deviceId = message.getDeviceId();
-//        String productId = String.valueOf(message.getHeader("productId").get());
-//        if(downMessages.contains(message.getMessageType())){
-//            try {
-//                String deviceMessageTopic = DeviceMessageConnector.createDeviceMessageTopic(productId, deviceId, message);
-//                disconnect(message);
-//
-//            }catch (Exception e){
-//                log.error("message : {} ,cluster share failed",message);
-//            }
-//        }
-//        return Mono.empty();
-//    }
-
-
     //断开连接
     private Mono<?> disconnect(DeviceMessage message){
         Mono<DeviceOperator> device = registry.getDevice(message.getDeviceId());
@@ -129,9 +87,4 @@ public class TimeSeriesMessageWriterConnector {
             .singleOrEmpty();
     }
 
-    //读取数据
-
-
-
-
 }

+ 18 - 25
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -1,7 +1,6 @@
 package org.jetlinks.community.device.service;
 
-import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.json.JSONUtil;
+import lombok.AllArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
@@ -17,35 +16,27 @@ import org.jetlinks.community.device.enums.DeviceFeature;
 import org.jetlinks.community.device.enums.DeviceState;
 import org.jetlinks.community.device.response.DeviceDeployResult;
 import org.jetlinks.community.device.response.DeviceDetail;
-import org.jetlinks.community.device.service.data.DeviceDataService;
-import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.device.session.MockSession;
 import org.jetlinks.community.utils.ErrorUtils;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.enums.ErrorCode;
 import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.exception.DeviceOperationException;
-import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.message.FunctionInvokeMessageSender;
 import org.jetlinks.core.message.WritePropertyMessageSender;
-import org.jetlinks.core.message.event.EventMessage;
+import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
-import org.jetlinks.core.message.property.ReportPropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
 import org.jetlinks.core.metadata.*;
-import org.jetlinks.core.metadata.types.GeoType;
 import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
 import org.reactivestreams.Publisher;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Propagation;
-import org.springframework.transaction.annotation.Transactional;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -59,6 +50,7 @@ import java.util.stream.Collectors;
 
 @Service
 @Slf4j
+@AllArgsConstructor
 public class LocalDeviceInstanceService extends GenericReactiveCrudService<DeviceInstanceEntity, String> {
 
     private final DeviceRegistry registry;
@@ -71,18 +63,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
 
     private final DeviceMetadataCodec metadataCodec=new JetLinksDeviceMetadataCodec();
-    @Autowired
-    private LocalDeviceInstanceService instanceService;
-
-    public LocalDeviceInstanceService(DeviceRegistry registry,
-                                      LocalDeviceProductService deviceProductService,
-                                      DeviceConfigMetadataManager metadataManager,
-                                      ReactiveRepository<DeviceTagEntity, String> tagRepository) {
-        this.registry = registry;
-        this.deviceProductService = deviceProductService;
-        this.metadataManager = metadataManager;
-        this.tagRepository = tagRepository;
-    }
+
+    private final DeviceSessionManager deviceSessionManager;
+
 
 
     @Override
@@ -477,4 +460,14 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     }
 
 
+    public Mono<Void> startMock(String deviceId){
+       return  registry.getDevice(deviceId)
+            .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
+            .doOnNext(operator -> {
+                MockSession mockSession = new MockSession(deviceId,operator, DefaultTransport.INNER,eventBus);
+                deviceSessionManager.register(mockSession);
+            })
+           .then();
+    }
+
 }

+ 101 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/session/MockSession.java

@@ -0,0 +1,101 @@
+package org.jetlinks.community.device.session;
+
+import io.vertx.core.buffer.Buffer;
+import lombok.Data;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.MqttMessage;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.server.session.DeviceSession;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nullable;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MockSession.java
+ * @Description 模拟设备
+ * @createTime 2021年11月03日 16:30:00
+ */
+@Data
+public class MockSession implements DeviceSession {
+    private String id;
+    private DeviceOperator operator;
+    private Transport transport;
+    private final long connectTime = System.currentTimeMillis();
+    private EventBus eventBus;
+    public MockSession(String id,
+                                 DeviceOperator operator,
+                                 Transport transport,
+                       EventBus eventBus) {
+        this.id = id;
+        this.operator = operator;
+        this.transport = transport;
+        this.eventBus=eventBus;
+    }
+
+    @Override
+    public String getDeviceId() {
+        return id;
+    }
+
+    @Nullable
+    @Override
+    public DeviceOperator getOperator() {
+        return operator;
+    }
+
+    @Override
+    public long lastPingTime() {
+        return -1;
+    }
+
+    @Override
+    public long connectTime() {
+        return connectTime;
+    }
+
+    @Override
+    public Mono<Boolean> send(EncodedMessage encodedMessage) {
+        MqttMessage message= (MqttMessage) encodedMessage;
+        String topic = message.getTopic();
+        Buffer buffer = Buffer.buffer(message.getPayload());
+        return eventBus
+            .publish(topic,buffer)
+            .thenReturn(true);
+    }
+
+
+    @Override
+    public Transport getTransport() {
+        return DefaultTransport.INNER;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void ping() {
+
+    }
+
+    @Override
+    public boolean isAlive() {
+        return true;
+    }
+
+    @Override
+    public void onClose(Runnable call) {
+
+    }
+
+    @Override
+    public boolean isMock() {
+        return true;
+    }
+}

+ 87 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMockController.java

@@ -0,0 +1,87 @@
+package org.jetlinks.community.device.web;
+
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.authorization.annotation.Resource;
+import org.jetlinks.community.device.entity.DeviceInstanceEntity;
+import org.jetlinks.community.device.enums.DeviceState;
+import org.jetlinks.community.device.service.LocalDeviceInstanceService;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.message.CommonDeviceMessage;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.server.MessageHandler;
+import org.jetlinks.supports.server.DecodedClientMessageHandler;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Mono;
+
+import java.util.Arrays;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DeviceMockController.java
+ * @Description TODO
+ * @createTime 2021年11月03日 14:33:00
+ */
+@RestController
+@RequestMapping("/device/mock")
+@Resource(id = "device-mock", name = "设备模拟管理")
+@Tag(name = "设备模拟管理")
+@AllArgsConstructor
+public class DeviceMockController {
+    private final LocalDeviceInstanceService deviceInstanceService;
+
+    private final DecodedClientMessageHandler messageHandler;
+
+    /**
+     * 心跳值为3S
+     */
+    private final int heartIdle=3;
+    private final DeviceRegistry deviceRegistry;
+    @PostMapping("/start/{id}")
+    @Operation(summary = "启动模拟器")
+    public Mono<Integer> startMock(@PathVariable String id){
+        return deviceInstanceService
+            .createUpdate()
+            .set(DeviceInstanceEntity::getState, DeviceState.mockOnline)
+            .where(DeviceInstanceEntity::getId,id)
+            .in(DeviceInstanceEntity::getState, Arrays.asList(DeviceState.offline))
+            .execute()
+            .filter(result->!result.equals(0))
+            //启动失败则返回当前设备状态
+            .flatMap(ignore->deviceInstanceService.startMock(id))
+            .thenReturn(heartIdle);
+    }
+
+
+    @PostMapping("/stop/{id}")
+    @Operation(summary = "关闭模拟器")
+    public Mono<Void> stopMock(@PathVariable String id){
+        return deviceInstanceService
+            .createUpdate()
+            .set(DeviceInstanceEntity::getState, DeviceState.offline)
+            .where(DeviceInstanceEntity::getId,id)
+            .where(DeviceInstanceEntity::getState, DeviceState.mockOnline)
+            .execute()
+            .flatMap(ignore->
+                deviceRegistry.getDevice(id)
+                    .flatMap(DeviceOperator::offline)
+                    .then()
+            );
+    }
+
+
+    @PostMapping("/send/message/{id}")
+    @Operation(summary = "向平台发送设备信息")
+    public Mono<Void> sendMessageReplyToPlat(@PathVariable String deviceId,@RequestBody CommonDeviceMessage deviceMessage){
+
+        return deviceRegistry
+            .getDevice(deviceId)
+            .filterWhen(DeviceOperator::isMockOnline)
+            .flatMap(operator ->messageHandler.handleMessage(operator,deviceMessage))
+            .then();
+    }
+
+}

+ 3 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GeoController.java

@@ -4,7 +4,9 @@ import cn.hutool.core.util.StrUtil;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import io.vavr.Tuple;
+import io.vavr.Tuple2;
 import lombok.AllArgsConstructor;
+import org.hswebframework.ezorm.core.param.Term;
 import org.hswebframework.web.api.crud.entity.QueryOperation;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.hswebframework.web.api.crud.entity.TreeSupportEntity;
@@ -42,6 +44,7 @@ import java.util.stream.Collectors;
 public class GeoController  implements ReactiveTreeServiceQueryController<GeoRegionEntity, String>{
     private final LocalGeoRegionService geoService;
 
+
     @GetMapping("/_query/tree/geo.json")
     @QueryAction
     @QueryOperation(summary = "使用GET动态查询并返回树形结构以及geo集合信息")

+ 16 - 10
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

@@ -287,17 +287,23 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
                 .increment();
         }
 
-        //注册中心上线
-        session.getOperator()
-            .online(session.getServerId().orElse(serverId), session.getId(), session.getClientAddress().map(String::valueOf).orElse(null))
-            .doFinally(s -> {
-                //通知
-                if (onDeviceRegister.hasDownstreams()) {
-                    registerListener.next(session);
-                }
-            })
-            .subscribe();
 
+        //注册中心上线
+        if(!session.isMock()){
+            session.getOperator()
+                .online(session.getServerId().orElse(serverId), session.getId(), session.getClientAddress().map(String::valueOf).orElse(null))
+                .doFinally(s -> {
+                    //通知
+                    if (onDeviceRegister.hasDownstreams()) {
+                        registerListener.next(session);
+                    }
+                })
+                .subscribe();
+        }else {
+            session.getOperator()
+                .mockOnline(session.getServerId().orElse(serverId), session.getId(), session.getClientAddress().map(String::valueOf).orElse(null))
+                .subscribe();
+        }
         return old;
     }
 

+ 28 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

@@ -10,9 +10,12 @@ import org.hswebframework.web.authorization.token.UserTokenManager;
 import org.hswebframework.web.authorization.token.redis.RedisUserTokenManager;
 import org.jetlinks.community.device.entity.DeviceInstanceEntity;
 import org.jetlinks.community.device.entity.DeviceProductEntity;
+import org.jetlinks.community.device.message.DeviceMessageConnector;
 import org.jetlinks.community.device.service.AutoDiscoverDeviceRegistry;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
 import org.jetlinks.community.micrometer.MeterRegistryManager;
+import org.jetlinks.community.standalone.configuration.cluster.ClusterDeviceMessageBrokeMessageBroker;
+import org.jetlinks.community.standalone.configuration.cluster.ClusterDeviceMessageConnector;
 import org.jetlinks.core.ProtocolSupports;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.config.ConfigStorageManager;
@@ -39,6 +42,8 @@ import org.jetlinks.supports.server.DefaultSendToDeviceMessageHandler;
 import org.jetlinks.supports.server.monitor.MicrometerGatewayServerMetrics;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -83,11 +88,34 @@ public class JetLinksConfiguration {
         return new BrokerEventBus();
     }
 
+
+    @Bean(initMethod = "init")
+    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
+    public ClusterDeviceMessageConnector clusterDeviceMessageConnector(EventBus eventBus,
+                                                                       MessageHandler messageHandler,
+                                                                       DeviceSessionManager sessionManager,
+                                                                       DeviceRegistry registry,
+                                                                       JetLinksProperties jetLinksProperties,
+                                                                       ClusterManager clusterManager) {
+        return new ClusterDeviceMessageConnector(eventBus, registry, messageHandler, sessionManager,jetLinksProperties.getServerId(),clusterManager);
+    }
+
+
+
+
     @Bean
+    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
+    public ClusterDeviceMessageBrokeMessageBroker clusterDeviceMessageBrokeMessageBroker(JetLinksProperties properties,ClusterManager clusterManager) {
+        return new ClusterDeviceMessageBrokeMessageBroker(properties.getServerId(),clusterManager);
+    }
+
+    @Bean
+    @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "false")
     public StandaloneDeviceMessageBroker standaloneDeviceMessageBroker() {
         return new StandaloneDeviceMessageBroker();
     }
 
+
     @Bean
     public EventBusStorageManager eventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
         return new EventBusStorageManager(clusterManager,

+ 3 - 1
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksProperties.java

@@ -16,7 +16,9 @@ public class JetLinksProperties {
 
     private String serverId;
 
-    private String clusterName ="default";
+    private String clusterName="default" ;
+
+    private boolean cluster=false;
 
     private Map<String, Long> transportLimit;
 

+ 84 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageBrokeMessageBroker.java

@@ -0,0 +1,84 @@
+package org.jetlinks.community.standalone.configuration.cluster;
+
+import cn.hutool.core.collection.CollectionUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.standalone.configuration.cluster.message.ClusterMessage;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.ClusterTopic;
+import org.jetlinks.core.cluster.ServerNode;
+import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
+import org.jetlinks.core.message.BroadcastMessage;
+import org.jetlinks.core.message.Message;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.*;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterDeviceMessageBrokeMessageBroker.java
+ * @Description 上行数据
+ * @createTime 2021年11月05日 08:47:00
+ */
+@Slf4j
+public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMessageBroker {
+
+    private String serverId;
+
+    private ClusterManager clusterManager;
+
+    private List<ServerNode> allNode;
+
+    public ClusterDeviceMessageBrokeMessageBroker(String serverId, ClusterManager clusterManager) {
+        this.serverId = serverId;
+        this.clusterManager = clusterManager;
+    }
+
+    @Override
+    public Flux<Message> handleSendToDeviceMessage(String serverId) {
+        return clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
+            .subscribePattern()
+            .publishOn(Schedulers.boundedElastic())
+            .map(message-> ((ClusterMessage)message.getMessage()).getPayload())
+            .mergeWith(super.handleSendToDeviceMessage(serverId));
+    }
+
+
+    @Override
+    public Mono<Integer> send(String serverId, Publisher<? extends Message> message) {
+        if(this.serverId.equals(serverId)){
+            //当前设备
+            return super.send(serverId,message);
+        }else {
+            Optional<ServerNode> server = allNode.stream().filter(node -> node.getId().equals(serverId)).findFirst();
+            if(!server.isPresent()){
+                //设备连接服务器离线,则发送信息失败
+                log.error("服务器{}离线,发送设备信息失败",serverId);
+                //todo 设备设置为离线状态
+                throw new BusinessException("设备所连接服务器离线,无法发送信息");
+
+            }
+        }
+        return clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
+            .publish(Mono.from(message)
+                .map(msg->new ClusterMessage(msg,serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.down))));
+    }
+
+    @Override
+    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
+        List<ServerNode> serverNodes = CollectionUtil.newArrayList(allNode);
+        return  Flux.fromStream(serverNodes.stream())
+            .filter(allNode::contains)
+            .map(ServerNode::getId)
+            .flatMap(serverId-> clusterManager.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.down))
+                .publish(message))
+            .collect(Collectors.counting())
+            .map(Long::intValue);
+    }
+
+}

+ 95 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -0,0 +1,95 @@
+package org.jetlinks.community.standalone.configuration.cluster;
+
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.device.message.DeviceMessageConnector;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.ServerNode;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.message.Message;
+import org.jetlinks.core.server.MessageHandler;
+import org.jetlinks.core.server.session.DeviceSessionManager;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import javax.annotation.Nonnull;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterDeviceMessageConnector.java
+ * @Description 上行数据
+ * @createTime 2021年11月06日 16:18:00
+ */
+@Slf4j
+public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
+
+
+    public ClusterDeviceMessageConnector(EventBus eventBus, DeviceRegistry registry, MessageHandler messageHandler, DeviceSessionManager sessionManager, String serverId, ClusterManager clusterManager) {
+        super(eventBus, registry, messageHandler, sessionManager);
+        this.serverId = serverId;
+        this.clusterManager = clusterManager;
+    }
+
+    private String serverId;
+
+    private ClusterManager clusterManager;
+
+    private List<ServerNode> allNode;
+    private volatile  boolean start=false;
+
+    Disposable listenTopic=null;
+
+    public synchronized void  init(){
+        if(start){
+            return;
+        }
+        start=true;
+
+        //映射出所有集群节点信息
+        allNode = clusterManager.getHaManager().getAllNode();
+
+        //10秒查看监听流是否存活
+        //注册监听队列
+        Flux.interval(Duration.ofSeconds(10))
+            .doOnNext(ignore->{
+                if(listenTopic==null||listenTopic.isDisposed()){
+                    listenTopic = clusterManager.getTopic(ClusterMessageType.topicOf(serverId, ClusterMessageType.up))
+                        .subscribePattern()
+                        .publishOn(Schedulers.boundedElastic())
+                        .flatMap(message -> super.handleMessage(null,
+                            (Message) message.getMessage()))
+                        .doOnError(e -> log.error("集群信息接收失败,节点关闭{};", serverId,e))
+                        .subscribe();
+                    log.info("集群节点重启{}成功",serverId);
+                }
+            }).subscribe();
+
+    }
+    @Override
+    public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
+       return  device.getConnectionServerId()
+            .flatMap(serverId->{
+                //设备连接当前服务器,不需要通过集群传递信息
+                if(serverId.equals(this.serverId)){
+                    return super.handleMessage(device,message);
+                }else {
+                    Optional<ServerNode> first = allNode.stream().filter(node -> node.getId().equals(serverId)).findFirst();
+                    if(!first.isPresent()){
+                        //设备连接服务器离线,则信息交由当前服务器处理
+                        return super.handleMessage(device,message);
+                    }
+                    return clusterManager
+                        .getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.up))
+                        .publish(Mono.just(message))
+                        .flatMap(result->result==0?Mono.just(false):Mono.just(true));
+                }
+            });
+    }
+}

+ 27 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterMessageType.java

@@ -0,0 +1,27 @@
+package org.jetlinks.community.standalone.configuration.cluster;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterMessageType.java
+ * @Description TODO
+ * @createTime 2021年11月06日 16:37:00
+ */
+public enum  ClusterMessageType {
+    up,
+    down;
+
+    static String pattern_send_down="cluster:%s-message-down";
+    static String pattern_send_up="cluster:%s-message-up";
+    public static String topicOf(String serverId,@NotNull ClusterMessageType messageType){
+        switch (messageType){
+            case down:
+                return String.format(pattern_send_down,serverId);
+            case up:
+                return String.format(pattern_send_up,serverId);
+            default: return null;
+        }
+    }
+}

+ 47 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/message/ClusterMessage.java

@@ -0,0 +1,47 @@
+package org.jetlinks.community.standalone.configuration.cluster.message;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.jetlinks.core.cluster.ClusterTopic;
+import org.jetlinks.core.message.Message;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterMessage.java
+ * @Description TODO
+ * @createTime 2021年11月06日 09:53:00
+ */
+@AllArgsConstructor
+@Data
+public class ClusterMessage implements ClusterTopic.TopicMessage, Serializable {
+    private String messageId;
+
+    private Message payload;
+
+    private String fromServer;
+
+    private String address;
+
+    private long timestamp;
+
+    public ClusterMessage(Message payload, String fromServer, String address) {
+        this.payload = payload;
+        this.fromServer = fromServer;
+        this.address = address;
+        this.messageId = payload.getMessageId();
+    }
+
+    @Override
+    public String getTopic() {
+        return address;
+    }
+
+    @Override
+    public Object getMessage() {
+        return payload;
+    }
+}

+ 26 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/message/ClusterMessageReply.java

@@ -0,0 +1,26 @@
+package org.jetlinks.community.standalone.configuration.cluster.message;
+
+import lombok.Data;
+import org.jetlinks.core.message.Message;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterMessageReply.java
+ * @Description TODO
+ * @createTime 2021年11月06日 09:57:00
+ */
+@Data
+public class ClusterMessageReply {
+    private String message;
+    private String topic;
+    private Message payload;
+    private long timestamp;
+
+    public ClusterMessageReply(String message, String topic, Message payload) {
+        this.message = message;
+        this.topic = topic;
+        this.payload = payload;
+        this.timestamp=System.currentTimeMillis();
+    }
+}

+ 2 - 1
jetlinks-standalone/src/main/resources/application.yml

@@ -115,7 +115,7 @@ hsweb:
     redis:
       local-cache-type: guava
 jetlinks:
-#  server-id: ${spring.application.name}:${server.port} #设备服务网关服务ID,不同服务请设置不同的ID
+  server-id: ${spring.application.name}:${server.port} #设备服务网关服务ID,不同服务请设置不同的ID
   logging:
     system:
       context:
@@ -123,6 +123,7 @@ jetlinks:
   protocol:
     spi:
       enabled: true # 为true时开启自动加载通过依赖引入的协议包
+  cluster: true
 logging:
   level:
     org.jetlinks: error

+ 27 - 3
jetlinks-standalone/src/test/java/org/jetlinks/community/network/manager/web/GateWayTest.java

@@ -1,8 +1,13 @@
 package org.jetlinks.community.network.manager.web;
 
 import io.vertx.core.Vertx;
+import org.hswebframework.ezorm.core.param.QueryParam;
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.hswebframework.web.api.crud.entity.TreeSupportEntity;
+import org.jetlinks.community.device.entity.GeoRegionEntity;
 import org.jetlinks.community.device.service.LocalDeviceFirmwareService;
 import org.jetlinks.community.device.service.LocalDeviceInstanceService;
+import org.jetlinks.community.device.service.LocalGeoRegionService;
 import org.jetlinks.community.gateway.DeviceGatewayManager;
 import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.manager.service.DeviceGatewayService;
@@ -15,6 +20,12 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.List;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -46,11 +57,24 @@ public class GateWayTest {
     @Autowired
     private LocalDeviceFirmwareService firmwareService;
 
+    @Autowired
+    private LocalGeoRegionService geoRegionService;
     @Test
     public void test(){
-        instanceService.createQuery()
-            .fetch()
 
-            .subscribe();
+        QueryParamEntity queryParam = new QueryParamEntity();
+        Mono.zip(geoRegionService.query(queryParam).collectList(),Mono.just( System.currentTimeMillis()))
+            .flatMap(tp2->{
+                long l = System.currentTimeMillis();
+                long s1 = l - tp2.getT2();
+                System.out.println("查询用时》》》》》》》》》》" + s1);
+                 l = System.currentTimeMillis();
+                List<GeoRegionEntity> geoRegionEntities = TreeSupportEntity.list2tree(tp2.getT1(), GeoRegionEntity::setChildren);
+                long s2 = System.currentTimeMillis() - l;
+                System.out.println("转换用时》》》》》》》》》》" + s2);
+                return Mono.empty();
+            }).subscribe();
+
+
     }
 }