Prechádzať zdrojové kódy

add 设备版本号

18339543638 4 rokov pred
rodič
commit
5362474e6c
13 zmenil súbory, kde vykonal 496 pridanie a 276 odobranie
  1. 3 0
      jetlinks-components/common-component/src/main/java/org/jetlinks/resources/i18n/messages_en.properties
  2. 3 0
      jetlinks-components/common-component/src/main/java/org/jetlinks/resources/i18n/messages_zh.properties
  3. 183 64
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java
  4. 27 1
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java
  5. 218 203
      jetlinks-core/src/main/java/org/jetlinks/core/defaults/DefaultDeviceOperator.java
  6. 9 0
      jetlinks-core/src/main/java/org/jetlinks/core/device/DeviceOperator.java
  7. 4 0
      jetlinks-manager/device-manager/pom.xml
  8. 5 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java
  9. 8 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProperty.java
  10. 0 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java
  11. 31 4
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMessageController.java
  12. 3 0
      jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/community/visualization/entity/DataVisualizationEntity.java
  13. 2 2
      jetlinks-standalone/src/main/resources/application.yml

+ 3 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/resources/i18n/messages_en.properties

@@ -0,0 +1,3 @@
+message.device_message_handing=Message sent to device, processing...
+
+error.duplicate_key_detail=Duplicate Data:{0}

+ 3 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/resources/i18n/messages_zh.properties

@@ -0,0 +1,3 @@
+message.device_message_handing=消息已发往设备,处理中...
+
+error.duplicate_key_detail=重复的数据:{0}

+ 183 - 64
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java

@@ -1,7 +1,10 @@
 package org.jetlinks.community.elastic.search.service.reactive;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import io.vavr.Function3;
+import lombok.Generated;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.methods.HttpGet;
@@ -69,6 +72,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.tasks.TaskId;
 import org.reactivestreams.Publisher;
 import org.springframework.data.elasticsearch.client.ClientLogger;
@@ -154,8 +158,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Boolean> ping(HttpHeaders headers) {
 
         return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers) //
-            .map(response -> response.statusCode().is2xxSuccessful()) //
-            .onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
+            .map(response -> response
+                .statusCode()
+                .is2xxSuccessful()) //
+            .onErrorResume(NoReachableHostException.class, error -> Mono
+                .just(false))
+            .next();
     }
 
     /*
@@ -204,7 +212,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
 
         return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers) //
-            .map(response -> response.statusCode().is2xxSuccessful()) //
+            .map(response -> response
+                .statusCode()
+                .is2xxSuccessful()) //
             .next();
     }
 
@@ -255,12 +265,26 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         searchRequest.source().trackTotalHits(true);
         searchRequest.source().size(0);
         searchRequest.source().fetchSource(false);
-        return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
-            .map(SearchResponse::getHits) //
-            .map(searchHits -> searchHits.getTotalHits().value) //
+        return sendRequest(searchRequest, this::buildSearchRequest, SearchResponse.class, headers)
+            .map(SearchResponse::getHits)
+            .map(searchHits -> searchHits.getTotalHits().value)
             .next();
     }
 
+    protected Request buildSearchRequest(SearchRequest request) {
+        //兼容6.x版本es
+        if (version.before(Version.V_7_0_0) && request
+            .source()
+            .trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
+            Request req = requestCreator.search().apply(request);
+            JSONObject json = JSON.parseObject(requestBodyToString(req));
+            json.put("track_total_hits", true);
+            req.setJsonEntity(json.toJSONString());
+            return req;
+        }
+        return requestCreator.search().apply(request);
+    }
+
     /*
      * (non-Javadoc)
      * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
@@ -329,24 +353,26 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
             scrollState -> {
 
-                Flux<SearchHit> searchHits = inbound.<SearchResponse>handle((searchResponse, sink) -> {
+                Flux<SearchHit> searchHits = inbound
+                    .<SearchResponse>handle((searchResponse, sink) -> {
 
-                    scrollState.updateScrollId(searchResponse.getScrollId());
-                    if (isEmpty(searchResponse.getHits())) {
+                        scrollState.updateScrollId(searchResponse.getScrollId());
+                        if (isEmpty(searchResponse.getHits())) {
 
-                        inbound.onComplete();
-                        outbound.onComplete();
+                            inbound.onComplete();
+                            outbound.onComplete();
 
-                    } else {
+                        } else {
 
-                        sink.next(searchResponse);
+                            sink.next(searchResponse);
 
-                        SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
-                            .scroll(scrollTimeout);
-                        request.next(searchScrollRequest);
-                    }
+                            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
+                                .scroll(scrollTimeout);
+                            request.next(searchScrollRequest);
+                        }
 
-                }).map(SearchResponse::getHits) //
+                    })
+                    .map(SearchResponse::getHits) //
                     .flatMap(Flux::fromIterable);
 
                 return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
@@ -689,29 +715,46 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
     // -->
 
-    private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request, Function<Req, Request> converter,
-                                                                     Class<Resp> responseType, HttpHeaders headers) {
-        return sendRequest(converter.apply(request), responseType, headers);
+    private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request,
+                                                                     Function<Req, Request> converter,
+                                                                     Class<Resp> responseType,
+                                                                     HttpHeaders headers) {
+        return sendRequest(request, converter, responseType, headers, DefaultReactiveElasticsearchClient::doDecode);
     }
 
-    private <Resp> Flux<Resp> sendRequest(Request request, Class<Resp> responseType, HttpHeaders headers) {
+    private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request,
+                                                                     Function<Req, Request> converter,
+                                                                     Class<Resp> responseType,
+                                                                     HttpHeaders headers,
+                                                                     Function3<ClientResponse, Class<Resp>, String, Mono<Resp>> decoder) {
+        return sendRequest(converter.apply(request), responseType, headers, decoder);
+    }
+
+    private <Resp> Flux<Resp> sendRequest(Request request,
+                                          Class<Resp> responseType,
+                                          HttpHeaders headers,
+                                          Function3<ClientResponse, Class<Resp>, String, Mono<Resp>> decoder) {
 
         String logId = ClientLogger.newLogId();
 
         return execute(webClient -> sendRequest(webClient, logId, request, headers))
-            .flatMapMany(response -> readResponseBody(logId, request, response, responseType));
+            .flatMapMany(response -> readResponseBody(logId, request, response, responseType, decoder));
     }
 
     private Mono<ClientResponse> sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
 
-        WebClient.RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
+        WebClient.RequestBodySpec requestBodySpec = webClient
+            .method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
             .uri(builder -> {
 
                 builder = builder.path(request.getEndpoint());
 
                 if (!ObjectUtils.isEmpty(request.getParameters())) {
-                    for (Map.Entry<String, String> entry : request.getParameters().entrySet()) {
-                        builder = builder.queryParam(entry.getKey(), entry.getValue());
+                    for (Map.Entry<String, String> entry : request
+                        .getParameters()
+                        .entrySet()) {
+                        builder = builder.queryParam(entry.getKey(), entry
+                            .getValue());
                     }
                 }
                 return builder.build();
@@ -725,8 +768,14 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
                 // and now those that might be set on the request.
                 if (request.getOptions() != null) {
 
-                    if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) {
-                        request.getOptions().getHeaders().forEach(it -> theHeaders.add(it.getName(), it.getValue()));
+                    if (!ObjectUtils.isEmpty(request
+                        .getOptions()
+                        .getHeaders())) {
+                        request
+                            .getOptions()
+                            .getHeaders()
+                            .forEach(it -> theHeaders.add(it.getName(), it
+                                .getValue()));
                     }
                 }
 
@@ -741,34 +790,41 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
             Lazy<String> body = bodyExtractor(request);
 
-            ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(),
+            ClientLogger.logRequest(logId, request
+                    .getMethod()
+                    .toUpperCase(), request.getEndpoint(), request.getParameters(),
                 body::get);
 
             requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
             requestBodySpec.body(Mono.fromSupplier(body), String.class);
         } else {
-            ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters());
+            ClientLogger.logRequest(logId, request
+                .getMethod()
+                .toUpperCase(), request.getEndpoint(), request.getParameters());
         }
 
         return requestBodySpec //
             .exchange() //
-            .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());
+            .onErrorReturn(ConnectException.class, ClientResponse
+                .create(HttpStatus.SERVICE_UNAVAILABLE)
+                .build());
     }
 
     private Lazy<String> bodyExtractor(Request request) {
 
-        return Lazy.of(() -> {
+        return Lazy.of(() -> requestBodyToString(request));
+    }
 
-            try {
-                return EntityUtils.toString(request.getEntity());
-            } catch (IOException e) {
-                throw new RequestBodyEncodingException("Error encoding request", e);
-            }
-        });
+    @SneakyThrows
+    private String requestBodyToString(Request request) {
+        return EntityUtils.toString(request.getEntity());
     }
 
-    private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
-                                                        Class<T> responseType) {
+    private <T> Publisher<? extends T> readResponseBody(String logId,
+                                                        Request request,
+                                                        ClientResponse response,
+                                                        Class<T> responseType,
+                                                        Function3<ClientResponse, Class<T>, String, Mono<T>> decoder) {
 
         if (RawActionResponse.class.equals(responseType)) {
 
@@ -791,17 +847,24 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         return response.body(BodyExtractors.toMono(byte[].class)) //
             .map(it -> new String(it, StandardCharsets.UTF_8)) //
             .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
-            .flatMap(content -> doDecode(response, responseType, content));
+            .flatMap(content -> decoder.apply(response, responseType, content));
     }
 
+
     private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) {
 
-        String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
+        String mediaType = response
+            .headers()
+            .contentType()
+            .map(MediaType::toString)
+            .orElse(XContentType.JSON.mediaType());
 
         try {
 
             Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
-
+            if (fromXContent == null) {
+                fromXContent = ReflectionUtils.findMethod(responseType, "fromXContext", XContentParser.class);
+            }
             return Mono.justOrEmpty(responseType
                 .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
 
@@ -810,14 +873,19 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
             try {
                 return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content)));
             } catch (Exception e) {
-
                 return Mono
-                    .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
+                    .error(new ElasticsearchStatusException(content,
+                        RestStatus.fromCode(response.statusCode().value()),
+                        errorParseFailure));
             }
         }
     }
 
     private static XContentParser createParser(String mediaType, String content) throws IOException {
+        XContentType type = XContentType.fromMediaTypeOrFormat(mediaType);
+        if (type == null) {
+            throw new IOException(content);
+        }
         return XContentType.fromMediaTypeOrFormat(mediaType) //
             .xContent() //
             .createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()),
@@ -828,14 +896,28 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
         int statusCode = response.statusCode().value();
         RestStatus status = RestStatus.fromCode(statusCode);
-        String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
-
-        return response.body(BodyExtractors.toMono(byte[].class)) //
+        String mediaType = response
+            .headers()
+            .contentType()
+            .map(MediaType::toString)
+            .orElse(XContentType.JSON.mediaType());
+
+        return response
+            .body(BodyExtractors.toMono(byte[].class)) //
+            .switchIfEmpty(Mono.error(
+                () -> new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
+                    request.getMethod(),
+                    request.getEndpoint(),
+                    statusCode),
+                    status)))
             .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
             .flatMap(content -> contentOrError(content, mediaType, status))
             .flatMap(unused -> Mono
-                .error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.",
-                    request.getMethod(), request.getEndpoint(), statusCode), status)));
+                .error(() -> new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.",
+                    request.getMethod(),
+                    request.getEndpoint(),
+                    statusCode),
+                    status)));
     }
 
     private <T> Publisher<? extends T> handleClientError(String logId, Request request, ClientResponse response,
@@ -843,7 +925,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
         int statusCode = response.statusCode().value();
         RestStatus status = RestStatus.fromCode(statusCode);
-        String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
+        String mediaType = response
+            .headers()
+            .contentType()
+            .map(MediaType::toString)
+            .orElse(XContentType.JSON.mediaType());
 
         return response.body(BodyExtractors.toMono(byte[].class)) //
             .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
@@ -868,6 +954,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         ElasticsearchException exception = getElasticsearchException(content, mediaType, status);
 
         if (exception != null) {
+            if (status == RestStatus.NOT_FOUND) {
+                log.warn(exception.getMessage(), exception);
+                return Mono.empty();
+            }
             StringBuilder sb = new StringBuilder();
             buildExceptionMessages(sb, exception);
             return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception));
@@ -917,26 +1007,51 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     @Override
     public Mono<SearchResponse> searchForPage(SearchRequest request) {
         long startTime = System.currentTimeMillis();
-        return sendRequest(request, requestCreator.search(), SearchResponse.class, HttpHeaders.EMPTY)
+        // if (version.after(Version.V_7_0_0)) {
+        request.source().trackTotalHits(true);
+        // }
+        return this
+            .sendRequest(request, this::buildSearchRequest, SearchResponse.class, HttpHeaders.EMPTY)
             .singleOrEmpty()
-            .doOnNext(res -> {
-                log.trace("execute search {} {}ms : {}", request.indices(), System.currentTimeMillis() - startTime, request.source());
-            })
-            .doOnError(err -> {
-                log.warn("execute search {} error : {}", request.indices(), request.source(), err);
-            });
+            .doOnSuccess(res -> log
+                .trace("execute search {} {}ms : {}", request.indices(), System.currentTimeMillis() - startTime, request.source()))
+            .doOnError(err -> log.warn("execute search {} error : {}", request.indices(), request.source(), err));
     }
 
     @SneakyThrows
     protected Request convertMultiSearchRequest(MultiSearchRequest searchRequest) {
-        return RequestConverters.multiSearch(searchRequest);
+        Request request = RequestConverters.multiSearch(searchRequest);
+        if (log.isTraceEnabled()) {
+            log.trace("execute elasticsearch multi search: {}", requestBodyToString(request));
+        }
+        return request;
     }
 
     @Override
     @SneakyThrows
     public Mono<MultiSearchResponse> multiSearch(MultiSearchRequest request) {
-
-        return sendRequest(request, this::convertMultiSearchRequest, MultiSearchResponse.class, HttpHeaders.EMPTY)
+        Function3<ClientResponse, Class<MultiSearchResponse>, String, Mono<MultiSearchResponse>> decoder;
+        if (version.before(Version.V_7_0_0)) {
+            //适配6.x响应格式
+            decoder = (clientResponse, multiSearchResponseClass, s) -> {
+                JSONObject data = JSON.parseObject(s);
+                int took = data.getJSONArray("responses")
+                    .stream()
+                    .map(JSONObject.class::cast)
+                    .map(json -> json.getIntValue("took"))
+                    .reduce(Math::addExact)
+                    .orElse(0);
+                data.put("took", took);
+                return DefaultReactiveElasticsearchClient.doDecode(clientResponse, multiSearchResponseClass, data.toJSONString());
+            };
+        } else {
+            decoder = DefaultReactiveElasticsearchClient::doDecode;
+        }
+        return sendRequest(request,
+            this::convertMultiSearchRequest,
+            MultiSearchResponse.class,
+            HttpHeaders.EMPTY,
+            decoder)
             .singleOrEmpty();
     }
 
@@ -962,8 +1077,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     }
 
     Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) {
-        return new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
-//        return new Request(HttpGet.METHOD_NAME,  String.join(",", getIndexTemplatesRequest.names()));
+        Request request = new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
+        Params params = new Params(request);
+        params.putParam("include_type_name", "true");
+        return request;
     }
 
     @Override
@@ -1011,7 +1128,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
      *
      * @author Christoph Strobl
      */
-    class ClientStatus implements ReactiveElasticsearchClient.Status {
+    @Generated
+    static class ClientStatus implements ReactiveElasticsearchClient.Status {
 
         private final Collection<ElasticsearchHost> connectedHosts;
 
@@ -1029,6 +1147,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         }
     }
 
+    @Generated
     static class Params {
         private final Request request;
 

+ 27 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -1,6 +1,9 @@
 package org.jetlinks.community.network.mqtt.gateway.device;
 
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.vertx.mqtt.MqttTopicSubscription;
+import io.vertx.mqtt.messages.MqttSubscribeMessage;
+import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.logger.ReactiveLogger;
@@ -43,11 +46,12 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
-import sun.security.provider.MD5;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 @Slf4j
 class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
@@ -121,6 +125,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             .publishOn(Schedulers.parallel())
             .flatMap(this::handleConnection)
             .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
+            .doOnNext(tp-> handleSubscriptionTopic(tp.getT1(),tp.getT2()))
+            .doOnNext(tp-> handleUnSubscriptionTopic(tp.getT1(),tp.getT2()))
             .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
             .onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
             .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
@@ -210,6 +216,26 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                 connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
             }));
     }
+    //处理已经建立连接的MQTT连接的主题订阅
+    private Mono<Void> handleSubscriptionTopic(MqttConnection connection, DeviceOperator operator) {
+        return connection
+            .handleSubscribe(true)
+            .doOnNext(topic->{
+                MqttSubscribeMessage message = topic.getMessage();
+                Set<String> topics = message.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet());
+                operator.addTopics(topics);
+            }).then();
+    }
+
+    //取消MQTT连接的主题订阅
+    private Mono<Void> handleUnSubscriptionTopic(MqttConnection connection, DeviceOperator operator) {
+        return connection
+            .handleUnSubscribe(true)
+            .doOnNext(topic->{
+                MqttUnsubscribeMessage message = topic.getMessage();
+                operator.removeTopics(message.topics());
+            }).then();
+    }
 
     //处理已经建立连接的MQTT连接
     private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, MqttConnectionSession session) {

+ 218 - 203
jetlinks-core/src/main/java/org/jetlinks/core/defaults/DefaultDeviceOperator.java

@@ -36,9 +36,9 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
     private static final ConfigKey<Long> lastMetadataTimeKey = ConfigKey.of("lst_metadata_time");
 
     private static final AtomicReferenceFieldUpdater<DefaultDeviceOperator, DeviceMetadata> METADATA_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(DefaultDeviceOperator.class, DeviceMetadata.class, "metadataCache");
+        AtomicReferenceFieldUpdater.newUpdater(DefaultDeviceOperator.class, DeviceMetadata.class, "metadataCache");
     private static final AtomicLongFieldUpdater<DefaultDeviceOperator> METADATA_TIME_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(DefaultDeviceOperator.class, "lastMetadataTime");
+        AtomicLongFieldUpdater.newUpdater(DefaultDeviceOperator.class, "lastMetadataTime");
 
     private final String id;
 
@@ -60,6 +60,11 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
 
     private volatile DeviceMetadata metadataCache;
 
+    /**
+     * 设备已订阅主题
+     */
+    private final Set<String> topics=new HashSet<>();
+
     public DefaultDeviceOperator(String id,
                                  ProtocolSupports supports,
                                  ConfigStorageManager storageManager,
@@ -95,29 +100,29 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
         this.protocolSupportMono = getProduct().flatMap(DeviceProductOperator::getProtocol);
         this.stateChecker = deviceStateChecker;
         this.metadataMono = this
-                //获取最后更新物模型的时间
-                .getSelfConfig(lastMetadataTimeKey)
-                .flatMap(i -> {
-                    //如果有时间,则表示设备有独立的物模型.
-                    //如果时间一致,则直接返回物模型缓存.
-                    if (i.equals(lastMetadataTime) && metadataCache != null) {
-                        return Mono.just(metadataCache);
-                    }
-                    METADATA_TIME_UPDATER.set(this, i);
-                    //加载真实的物模型
-                    return Mono
-                            .zip(getSelfConfig(metadata),
-                                 protocolSupportMono)
-                            .flatMap(tp2 -> tp2
-                                    .getT2()
-                                    .getMetadataCodec()
-                                    .decode(tp2.getT1())
-                                    .doOnNext(metadata -> METADATA_UPDATER.set(this, metadata)));
-
-                })
-                //如果上游为空,则使用产品的物模型
-                .switchIfEmpty(this.getParent()
-                                   .flatMap(DeviceProductOperator::getMetadata));
+            //获取最后更新物模型的时间
+            .getSelfConfig(lastMetadataTimeKey)
+            .flatMap(i -> {
+                //如果有时间,则表示设备有独立的物模型.
+                //如果时间一致,则直接返回物模型缓存.
+                if (i.equals(lastMetadataTime) && metadataCache != null) {
+                    return Mono.just(metadataCache);
+                }
+                METADATA_TIME_UPDATER.set(this, i);
+                //加载真实的物模型
+                return Mono
+                    .zip(getSelfConfig(metadata),
+                        protocolSupportMono)
+                    .flatMap(tp2 -> tp2
+                        .getT2()
+                        .getMetadataCodec()
+                        .decode(tp2.getT1())
+                        .doOnNext(metadata -> METADATA_UPDATER.set(this, metadata)));
+
+            })
+            //如果上游为空,则使用产品的物模型
+            .switchIfEmpty(this.getParent()
+                .flatMap(DeviceProductOperator::getMetadata));
 
     }
 
@@ -134,25 +139,25 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
     @Override
     public Mono<String> getConnectionServerId() {
         return getSelfConfig(connectionServerId.getKey())
-                .map(Value::asString);
+            .map(Value::asString);
     }
 
     @Override
     public Mono<String> getSessionId() {
         return getSelfConfig(sessionId.getKey())
-                .map(Value::asString);
+            .map(Value::asString);
     }
 
     @Override
     public Mono<String> getAddress() {
         return getConfig("address")
-                .map(Value::asString);
+            .map(Value::asString);
     }
 
     @Override
     public Mono<Void> setAddress(String address) {
         return setConfig("address", address)
-                .then();
+            .then();
     }
 
     @Override
@@ -163,195 +168,205 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
     @Override
     public Mono<Byte> getState() {
         return this
-                .getSelfConfigs(Arrays.asList("state", parentGatewayId.getKey(), selfManageState.getKey()))
+            .getSelfConfigs(Arrays.asList("state", parentGatewayId.getKey(), selfManageState.getKey()))
+            .flatMap(values -> {
+                Byte state = values
+                    .getValue("state")
+                    .map(val -> val.as(Byte.class))
+                    .orElse(DeviceState.unknown);
+
+                boolean isSelfManageState = values
+                    .getValue(selfManageState.getKey())
+                    .map(val -> val.as(Boolean.class))
+                    .orElse(false);
+                if (isSelfManageState) {
+                    return Mono.just(state);
+                }
+                String parentGatewayId = values
+                    .getValue(DeviceConfigKey.parentGatewayId)
+                    .orElse(null);
+                if(getDeviceId().equals(parentGatewayId)){
+                    log.warn("设备[{}]存在循环依赖",parentGatewayId);
+                    return Mono.just(state);
+                }
+                //获取父级设备状态
+                if (!state.equals(DeviceState.online) && StringUtils.hasText(parentGatewayId)) {
+                    return registry
+                        .getDevice(parentGatewayId)
+                        .flatMap(DeviceOperator::getState);
+                }
+                return Mono.just(state);
+            })
+            .defaultIfEmpty(DeviceState.unknown);
+    }
+
+    private Mono<Byte> doCheckState() {
+        return Mono
+            .defer(() -> this
+                .getSelfConfigs(Arrays.asList(
+                    connectionServerId.getKey(),
+                    parentGatewayId.getKey(),
+                    selfManageState.getKey(),
+                    "state"))
                 .flatMap(values -> {
-                    Byte state = values
-                            .getValue("state")
-                            .map(val -> val.as(Byte.class))
-                            .orElse(DeviceState.unknown);
-
-                    boolean isSelfManageState = values
-                            .getValue(selfManageState.getKey())
-                            .map(val -> val.as(Boolean.class))
-                            .orElse(false);
-                    if (isSelfManageState) {
-                        return Mono.just(state);
+
+                    //当前设备连接到的服务器
+                    String server = values
+                        .getValue(connectionServerId)
+                        .orElse(null);
+
+                    //设备缓存的状态
+                    Byte state = values.getValue("state")
+                        .map(val -> val.as(Byte.class))
+                        .orElse(DeviceState.unknown);
+
+
+                    //如果缓存中存储有当前设备所在服务信息则尝试发起状态检查
+                    if (StringUtils.hasText(server)) {
+                        return handler
+                            .getDeviceState(server, Collections.singletonList(id))
+                            .map(DeviceStateInfo::getState)
+                            .singleOrEmpty()
+                            .timeout(Duration.ofSeconds(1), Mono.just(state))
+                            .defaultIfEmpty(state);
                     }
+
+                    //网关设备ID
                     String parentGatewayId = values
-                            .getValue(DeviceConfigKey.parentGatewayId)
-                            .orElse(null);
+                        .getValue(DeviceConfigKey.parentGatewayId)
+                        .orElse(null);
+
                     if(getDeviceId().equals(parentGatewayId)){
                         log.warn("设备[{}]存在循环依赖",parentGatewayId);
                         return Mono.just(state);
                     }
-                    //获取父级设备状态
-                    if (!state.equals(DeviceState.online) && StringUtils.hasText(parentGatewayId)) {
+                    //如果关联了上级网关设备则获取父设备状态
+                    if (StringUtils.hasText(parentGatewayId)) {
                         return registry
-                                .getDevice(parentGatewayId)
-                                .flatMap(DeviceOperator::getState);
-                    }
-                    return Mono.just(state);
-                })
-                .defaultIfEmpty(DeviceState.unknown);
-    }
-
-    private Mono<Byte> doCheckState() {
-        return Mono
-                .defer(() -> this
-                        .getSelfConfigs(Arrays.asList(
-                                connectionServerId.getKey(),
-                                parentGatewayId.getKey(),
-                                selfManageState.getKey(),
-                                "state"))
-                        .flatMap(values -> {
-
-                            //当前设备连接到的服务器
-                            String server = values
-                                    .getValue(connectionServerId)
-                                    .orElse(null);
-
-                            //设备缓存的状态
-                            Byte state = values.getValue("state")
-                                               .map(val -> val.as(Byte.class))
-                                               .orElse(DeviceState.unknown);
-
-
-                            //如果缓存中存储有当前设备所在服务信息则尝试发起状态检查
-                            if (StringUtils.hasText(server)) {
-                                return handler
-                                        .getDeviceState(server, Collections.singletonList(id))
-                                        .map(DeviceStateInfo::getState)
-                                        .singleOrEmpty()
-                                        .timeout(Duration.ofSeconds(1), Mono.just(state))
-                                        .defaultIfEmpty(state);
-                            }
-
-                            //网关设备ID
-                            String parentGatewayId = values
-                                    .getValue(DeviceConfigKey.parentGatewayId)
-                                    .orElse(null);
-
-                            if(getDeviceId().equals(parentGatewayId)){
-                                log.warn("设备[{}]存在循环依赖",parentGatewayId);
-                                return Mono.just(state);
-                            }
-                            //如果关联了上级网关设备则获取父设备状态
-                            if (StringUtils.hasText(parentGatewayId)) {
-                                return registry
-                                        .getDevice(parentGatewayId)
-                                        .flatMap(device -> device
-                                                .messageSender()
-                                                //发送设备状态检查指令给网关设备
-                                                .<ChildDeviceMessageReply>
-                                                        send(ChildDeviceMessage.create(parentGatewayId, DeviceStateCheckMessage.create(getDeviceId())))
-                                                .singleOrEmpty()
-                                                .map(msg -> {
-                                                    if (msg.getChildDeviceMessage() instanceof DeviceStateCheckMessageReply) {
-                                                        return ((DeviceStateCheckMessageReply) msg.getChildDeviceMessage())
-                                                                .getState();
-                                                    }
-                                                    log.warn("子设备状态检查返回消息错误{}", msg);
-                                                    return DeviceState.online;
-                                                })
-                                                .onErrorResume(err ->{
-                                                    //子设备是否自己管理状态
-                                                    if (values.getValue(selfManageState).orElse(false)) {
-                                                        return Mono.just(state);
-                                                    }
-                                                    return device.checkState();
-                                                }));
+                            .getDevice(parentGatewayId)
+                            .flatMap(device -> device
+                                .messageSender()
+                                //发送设备状态检查指令给网关设备
+                                .<ChildDeviceMessageReply>
+                                    send(ChildDeviceMessage.create(parentGatewayId, DeviceStateCheckMessage.create(getDeviceId())))
+                                .singleOrEmpty()
+                                .map(msg -> {
+                                    if (msg.getChildDeviceMessage() instanceof DeviceStateCheckMessageReply) {
+                                        return ((DeviceStateCheckMessageReply) msg.getChildDeviceMessage())
+                                            .getState();
+                                    }
+                                    log.warn("子设备状态检查返回消息错误{}", msg);
+                                    return DeviceState.online;
+                                })
+                                .onErrorResume(err ->{
+                                    //子设备是否自己管理状态
+                                    if (values.getValue(selfManageState).orElse(false)) {
+                                        return Mono.just(state);
+                                    }
+                                    return device.checkState();
+                                }));
 //                                return registry
 //                                        .getDevice(parentGatewayId)
 //                                        .flatMap(DeviceOperator::checkState);
-                            }
+                    }
 
-                            //如果是在线状态,则改为离线,否则保持状态不变
-                            if (state.equals(DeviceState.online)) {
-                                return Mono.just(DeviceState.offline);
-                            } else {
-                                return Mono.just(state);
-                            }
-                        }));
+                    //如果是在线状态,则改为离线,否则保持状态不变
+                    if (state.equals(DeviceState.online)) {
+                        return Mono.just(DeviceState.offline);
+                    } else {
+                        return Mono.just(state);
+                    }
+                }));
     }
 
     @Override
     public Mono<Byte> checkState() {
         return Mono
-                .zip(
-                        stateChecker
-                                .checkState(this)
-                                .switchIfEmpty(Mono.defer(() -> DEFAULT_STATE_CHECKER.checkState(this)))
-                                .defaultIfEmpty(DeviceState.online),
-                        this.getState()
-                )
-                .flatMap(tp2 -> {
-                    byte newer = tp2.getT1();
-                    byte old = tp2.getT2();
-                    //状态不一致?
-                    if (newer != old) {
-                        log.info("device[{}] state changed from {} to {}", this.getDeviceId(), old, newer);
-                        Map<String, Object> configs = new HashMap<>();
-                        configs.put("state", newer);
-                        if (newer == DeviceState.online) {
-                            configs.put("onlineTime", System.currentTimeMillis());
-                        } else if (newer == DeviceState.offline) {
-                            configs.put("offlineTime", System.currentTimeMillis());
-                        }
-                        return this
-                                .setConfigs(configs)
-                                .thenReturn(newer);
+            .zip(
+                stateChecker
+                    .checkState(this)
+                    .switchIfEmpty(Mono.defer(() -> DEFAULT_STATE_CHECKER.checkState(this)))
+                    .defaultIfEmpty(DeviceState.online),
+                this.getState()
+            )
+            .flatMap(tp2 -> {
+                byte newer = tp2.getT1();
+                byte old = tp2.getT2();
+                //状态不一致?
+                if (newer != old) {
+                    log.info("device[{}] state changed from {} to {}", this.getDeviceId(), old, newer);
+                    Map<String, Object> configs = new HashMap<>();
+                    configs.put("state", newer);
+                    if (newer == DeviceState.online) {
+                        configs.put("onlineTime", System.currentTimeMillis());
+                    } else if (newer == DeviceState.offline) {
+                        configs.put("offlineTime", System.currentTimeMillis());
                     }
-                    return Mono.just(newer);
-                });
+                    return this
+                        .setConfigs(configs)
+                        .thenReturn(newer);
+                }
+                return Mono.just(newer);
+            });
     }
 
     @Override
     public Mono<Long> getOnlineTime() {
         return this
-                .getSelfConfig("onlineTime")
-                .map(val -> val.as(Long.class))
-                .switchIfEmpty(Mono.defer(() -> this
-                        .getSelfConfig(parentGatewayId)
-                        .flatMap(registry::getDevice)
-                        .flatMap(DeviceOperator::getOnlineTime)));
+            .getSelfConfig("onlineTime")
+            .map(val -> val.as(Long.class))
+            .switchIfEmpty(Mono.defer(() -> this
+                .getSelfConfig(parentGatewayId)
+                .flatMap(registry::getDevice)
+                .flatMap(DeviceOperator::getOnlineTime)));
     }
 
     @Override
     public Mono<Long> getOfflineTime() {
         return this
-                .getSelfConfig("offlineTime")
-                .map(val -> val.as(Long.class))
-                .switchIfEmpty(Mono.defer(() -> this
-                        .getSelfConfig(parentGatewayId)
-                        .flatMap(registry::getDevice)
-                        .flatMap(DeviceOperator::getOfflineTime)));
+            .getSelfConfig("offlineTime")
+            .map(val -> val.as(Long.class))
+            .switchIfEmpty(Mono.defer(() -> this
+                .getSelfConfig(parentGatewayId)
+                .flatMap(registry::getDevice)
+                .flatMap(DeviceOperator::getOfflineTime)));
+    }
+
+    @Override
+    public Mono<Boolean> addTopics(Collection<String> topic) {
+        return Mono.just(this.topics.addAll(topic));
+    }
+
+    @Override
+    public Mono<Boolean> removeTopics(Collection<String> topic){
+        return Mono.just(this.topics.removeAll(topic));
     }
 
     @Override
     public Mono<Boolean> offline() {
         return this
-                .setConfigs(
-                        //selfManageState.value(true),
-                        connectionServerId.value(""),
-                        sessionId.value(""),
-                        ConfigKey.of("offlineTime").value(System.currentTimeMillis()),
-                        ConfigKey.of("state").value(DeviceState.offline)
-                )
-                .doOnError(err -> log.error("offline device error", err));
+            .setConfigs(
+                //selfManageState.value(true),
+                connectionServerId.value(""),
+                sessionId.value(""),
+                ConfigKey.of("offlineTime").value(System.currentTimeMillis()),
+                ConfigKey.of("state").value(DeviceState.offline)
+            )
+            .doOnError(err -> log.error("offline device error", err));
     }
 
     @Override
     public Mono<Boolean> online(String serverId, String sessionId, String address) {
         return this
-                .setConfigs(
-                      //  selfManageState.value(true),
-                        connectionServerId.value(serverId),
-                        DeviceConfigKey.sessionId.value(sessionId),
-                        ConfigKey.of("address").value(address),
-                        ConfigKey.of("onlineTime").value(System.currentTimeMillis()),
-                        ConfigKey.of("state").value(DeviceState.online)
-                )
-                .doOnError(err -> log.error("online device error", err));
+            .setConfigs(
+                //  selfManageState.value(true),
+                connectionServerId.value(serverId),
+                DeviceConfigKey.sessionId.value(sessionId),
+                ConfigKey.of("address").value(address),
+                ConfigKey.of("onlineTime").value(System.currentTimeMillis()),
+                ConfigKey.of("state").value(DeviceState.online)
+            )
+            .doOnError(err -> log.error("online device error", err));
     }
 
     @Override
@@ -371,15 +386,15 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
         disconnect.setDeviceId(getDeviceId());
         disconnect.setMessageId(IdUtils.newUUID());
         return messageSender()
-                .send(Mono.just(disconnect))
-                .next()
-                .map(DeviceMessageReply::isSuccess);
+            .send(Mono.just(disconnect))
+            .next()
+            .map(DeviceMessageReply::isSuccess);
     }
 
     @Override
     public Mono<AuthenticationResponse> authenticate(AuthenticationRequest request) {
         return getProtocol()
-                .flatMap(protocolSupport -> protocolSupport.authenticate(request, this));
+            .flatMap(protocolSupport -> protocolSupport.authenticate(request, this));
     }
 
     @Override
@@ -391,9 +406,9 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
     @Override
     public Mono<DeviceProductOperator> getParent() {
         return getReactiveStorage()
-                .flatMap(store -> store.getConfig(productId.getKey()))
-                .map(Value::asString)
-                .flatMap(registry::getProduct);
+            .flatMap(store -> store.getConfig(productId.getKey()))
+            .map(Value::asString)
+            .flatMap(registry::getProduct);
     }
 
     @Override
@@ -423,9 +438,9 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
         METADATA_UPDATER.set(this, null);
         METADATA_TIME_UPDATER.set(this, -1);
         return removeConfigs(metadata, lastMetadataTimeKey)
-                .then(this.getProtocol()
-                          .flatMap(support -> support.onDeviceMetadataChanged(this))
-                );
+            .then(this.getProtocol()
+                .flatMap(support -> support.onDeviceMetadataChanged(this))
+            );
     }
 
     @Override
@@ -435,24 +450,24 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
             configs.put(lastMetadataTimeKey.getKey(), lastMetadataTime = System.currentTimeMillis());
 
             return StorageConfigurable.super
-                    .setConfigs(configs)
-                    .doOnNext(suc -> {
-                        this.metadataCache = null;
-                    })
-                    .then(this.getProtocol()
-                              .flatMap(support -> support.onDeviceMetadataChanged(this))
-                    )
-                    .thenReturn(true);
+                .setConfigs(configs)
+                .doOnNext(suc -> {
+                    this.metadataCache = null;
+                })
+                .then(this.getProtocol()
+                    .flatMap(support -> support.onDeviceMetadataChanged(this))
+                )
+                .thenReturn(true);
         }
         return StorageConfigurable.super.setConfigs(configs);
     }
 
     private static Mono<Byte> checkState0(DefaultDeviceOperator operator) {
         return operator
-                .getProtocol()
-                .flatMap(ProtocolSupport::getStateChecker) //协议自定义了状态检查逻辑
-                .flatMap(deviceStateChecker -> deviceStateChecker.checkState(operator))
-                .switchIfEmpty(operator.doCheckState()) //默认的检查
-                ;
+            .getProtocol()
+            .flatMap(ProtocolSupport::getStateChecker) //协议自定义了状态检查逻辑
+            .flatMap(deviceStateChecker -> deviceStateChecker.checkState(operator))
+            .switchIfEmpty(operator.doCheckState()) //默认的检查
+            ;
     }
 }

+ 9 - 0
jetlinks-core/src/main/java/org/jetlinks/core/device/DeviceOperator.java

@@ -78,6 +78,15 @@ public interface DeviceOperator extends Configurable {
      */
     Mono<Long> getOfflineTime();
 
+    /**
+     * @return 增加设备订阅主题
+     */
+    Mono<Boolean> addTopics(Collection<String> topic);
+
+    /**
+     * @return 删除设备订阅主题
+     */
+    Mono<Boolean> removeTopics(Collection<String> topic);
     /**
      * 设备上线
      *

+ 4 - 0
jetlinks-manager/device-manager/pom.xml

@@ -73,6 +73,10 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 5 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java

@@ -135,6 +135,11 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
     @DefaultValue("0")
     private DeviceFeature[] features;
 
+    @Comment("设备实例名称")
+    @Column(name = "current_version")
+    @Schema(description = "设备当前版本")
+    private String currentVersion;
+
     public DeviceInfo toDeviceInfo() {
         DeviceInfo info = org.jetlinks.core.device.DeviceInfo
             .builder()

+ 8 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProperty.java

@@ -57,6 +57,14 @@ public class DeviceProperty implements Serializable {
     @Schema(description = "格式化后的时间,在聚合查询时此字段有值")
     private String formatTime;
 
+    /**
+     * 设备状态值,如果是查询的数据库,此字段可能为{@link null}
+     *
+     * @see ReportPropertyMessage#getPropertyStates()
+     */
+    @Schema(description = "状态值")
+    private String state;
+
     public DeviceProperty deviceId(String deviceId) {
         this.deviceId = deviceId;
         return this;

+ 0 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java

@@ -5,8 +5,6 @@ import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.jetlinks.community.device.entity.DeviceEvent;
 import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
 import org.jetlinks.community.device.entity.DeviceProperty;
-import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector;
-import org.jetlinks.community.gateway.external.socket.WebSocketMessagingHandler;
 import org.jetlinks.community.timeseries.query.AggregationData;
 import org.jetlinks.core.Value;
 import org.jetlinks.core.device.DeviceOperator;

+ 31 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMessageController.java

@@ -19,10 +19,10 @@ import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.enums.ErrorCode;
 import org.jetlinks.core.exception.DeviceOperationException;
-import org.jetlinks.core.message.DeviceMessageReply;
-import org.jetlinks.core.message.FunctionInvokeMessageSender;
-import org.jetlinks.core.message.ReadPropertyMessageSender;
-import org.jetlinks.core.message.WritePropertyMessageSender;
+import org.jetlinks.core.message.*;
+import org.jetlinks.core.message.firmware.ReadFirmwareMessage;
+import org.jetlinks.core.message.firmware.ReadFirmwareMessageReply;
+import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
@@ -101,7 +101,34 @@ public class DeviceMessageController implements
                             .build()
                             .withValue(dataType, value));
                 })));
+    }
+
 
+    //获取设备固件信息
+    @GetMapping("/standard/{deviceId}/firmware/version")
+    @SneakyThrows
+    @Deprecated
+    public Mono<?> getFirmwareVersion(@PathVariable String deviceId) {
+        return registry
+            .getDevice(deviceId)
+            .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
+            .flatMap(operator -> {
+                ReadFirmwareMessage firmwareMessage = new ReadFirmwareMessage();
+                firmwareMessage.setDeviceId(deviceId);
+                firmwareMessage.setTimestamp(System.currentTimeMillis());
+                firmwareMessage.setMessageId(IDGenerator.SNOW_FLAKE_STRING.generate());
+                return operator.messageSender()
+                    .send(Flux.just(firmwareMessage))
+                    .map(mapReply(ReadFirmwareMessageReply::getVersion))
+                    .single();
+            })
+            .doOnNext(version->{
+                service.createUpdate()
+                    .where(DeviceInstanceEntity::getId,deviceId)
+                    .set(DeviceInstanceEntity::getCurrentVersion,version)
+                    .execute()
+                    .subscribe();
+            });
     }
 
     //设置设备属性

+ 3 - 0
jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/community/visualization/entity/DataVisualizationEntity.java

@@ -45,6 +45,9 @@ public class DataVisualizationEntity extends GenericEntity<String> {
     @ColumnType(jdbcType = JDBCType.CLOB)
     private String metadata;
 
+    @Column
+    private String catalogId;
+
     @Column(length = 32, nullable = false)
     @EnumCodec
     @ColumnType(javaType = String.class)

+ 2 - 2
jetlinks-standalone/src/main/resources/application.yml

@@ -206,5 +206,5 @@ springdoc:
 visual:
   base-path: "http://127.0.0.1:8844"
   urls:
-    big-screen-path: "http://192.168.104.115:8000/"
-    vis-configuration: "http://192.168.104.115:8000/"
+    big-screen-path: "http://192.168.104.115:9022/"
+    vis-configuration: "http://192.168.104.115:9022/"