Prechádzať zdrojové kódy

add 设备订阅主题实时更新显示

18339543638 4 rokov pred
rodič
commit
fc2d8887c6

+ 1 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/dashboard/DashBoardSubscriptionProvider.java

@@ -44,7 +44,7 @@ public class DashBoardSubscriptionProvider implements SubscriptionProvider {
                 return dashboardManager.getDashboard(variables.get("dashboard"))
                     .flatMap(dashboard -> dashboard.getObject(variables.get("object")))
                     .flatMap(object -> object.getMeasurement(variables.get("measurement")))
-                    .flatMap(measurement -> measurement.getDimension(variables.get("dimension")))
+                    .flatMap(   measurement -> measurement.getDimension(variables.get("dimension")))
                     .flatMapMany(dimension -> dimension.getValue(MeasurementParameter.of(request.getParameter())))
                     .map(val -> Message.success(request.getId(), request.getTopic(), val));
             } catch (Exception e) {

+ 23 - 16
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -25,10 +25,7 @@ import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.MqttAuthenticationRequest;
 import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.message.CommonDeviceMessage;
-import org.jetlinks.core.message.CommonDeviceMessageReply;
-import org.jetlinks.core.message.DeviceMessage;
-import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.codec.FromDeviceMessageContext;
 import org.jetlinks.core.message.codec.MqttMessage;
@@ -125,8 +122,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             .publishOn(Schedulers.parallel())
             .flatMap(this::handleConnection)
             .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
-            .doOnNext(tp-> handleSubscriptionTopic(tp.getT1(),tp.getT2()))
-            .doOnNext(tp-> handleUnSubscriptionTopic(tp.getT1(),tp.getT2()))
+            .flatMap(this::handleSubscriptionTopic)
+            .flatMap(this::handleUnSubscriptionTopic)
             .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
             .onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
             .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
@@ -217,24 +214,34 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             }));
     }
     //处理已经建立连接的MQTT连接的主题订阅
-    private Mono<Void> handleSubscriptionTopic(MqttConnection connection, DeviceOperator operator) {
-        return connection
-            .handleSubscribe(true)
+    private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
+        return tuple3.getT1()
+            .handleSubscribe(false)
             .doOnNext(topic->{
                 MqttSubscribeMessage message = topic.getMessage();
                 Set<String> topics = message.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet());
-                operator.addTopics(topics);
-            }).then();
+                tuple3.getT2().addTopics(topics);
+            })
+            .flatMap(ignore->
+                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
+                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
+            .collectList()
+            .map(ignore->tuple3);
     }
 
     //取消MQTT连接的主题订阅
-    private Mono<Void> handleUnSubscriptionTopic(MqttConnection connection, DeviceOperator operator) {
-        return connection
-            .handleUnSubscribe(true)
+    private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleUnSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
+        return tuple3.getT1()
+            .handleUnSubscribe(false)
             .doOnNext(topic->{
                 MqttUnsubscribeMessage message = topic.getMessage();
-                operator.removeTopics(message.topics());
-            }).then();
+                tuple3.getT2().removeTopics(message.topics());
+            })
+            .flatMap(ignore->
+                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
+                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
+            .collectList()
+            .map(ignore->tuple3);
     }
 
     //处理已经建立连接的MQTT连接

+ 6 - 0
jetlinks-core/src/main/java/org/jetlinks/core/defaults/DefaultDeviceOperator.java

@@ -20,6 +20,7 @@ import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
 import org.jetlinks.core.metadata.DeviceMetadata;
 import org.jetlinks.core.utils.IdUtils;
 import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.Duration;
@@ -342,6 +343,11 @@ public class DefaultDeviceOperator implements DeviceOperator, StorageConfigurabl
         return Mono.just(this.topics.removeAll(topic));
     }
 
+    @Override
+    public Set<String>  getTopics() {
+        return topics;
+    }
+
     @Override
     public Mono<Boolean> offline() {
         return this

+ 4 - 0
jetlinks-core/src/main/java/org/jetlinks/core/device/DeviceOperator.java

@@ -7,10 +7,12 @@ import org.jetlinks.core.Value;
 import org.jetlinks.core.Values;
 import org.jetlinks.core.config.ConfigKey;
 import org.jetlinks.core.metadata.DeviceMetadata;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -87,6 +89,8 @@ public interface DeviceOperator extends Configurable {
      * @return 删除设备订阅主题
      */
     Mono<Boolean> removeTopics(Collection<String> topic);
+
+    Set<String> getTopics();
     /**
      * 设备上线
      *

+ 2 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceObjectDefinition.java

@@ -8,7 +8,8 @@ import org.jetlinks.community.dashboard.ObjectDefinition;
 @AllArgsConstructor
 public enum DeviceObjectDefinition implements ObjectDefinition {
     status("设备状态"),
-    message("设备消息");
+    message("设备消息"),
+    topics("订阅主题");
 
     @Override
     public String getId() {

+ 90 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/topic/DeviceTopicMeasurement.java

@@ -0,0 +1,90 @@
+package org.jetlinks.community.device.measurements.topic;
+
+import io.vavr.control.Option;
+import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.supports.StaticMeasurement;
+import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.event.TopicPayload;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.core.metadata.DataType;
+import org.jetlinks.core.metadata.types.ArrayType;
+import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.core.utils.TopicUtils;
+import reactor.core.publisher.Flux;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DeviceTopicMeasurement.java
+ * @Description TODO
+ * @createTime 2021年10月12日 14:08:00
+ */
+public class DeviceTopicMeasurement extends StaticMeasurement {
+    private final EventBus eventBus;
+
+    private final TimeSeriesManager timeSeriesManager;
+
+    private final DeviceRegistry deviceRegistry;
+
+    static MeasurementDefinition definition = MeasurementDefinition.of("topics", "设备订阅主题列表");
+
+    public DeviceTopicMeasurement(EventBus eventBus,
+                                  DeviceRegistry registry,
+                                  TimeSeriesManager timeSeriesManager) {
+        super(definition);
+        this.deviceRegistry = registry;
+        this.eventBus = eventBus;
+        this.timeSeriesManager = timeSeriesManager;
+        addDimension(new RealTimeTopicsDimension());
+    }
+
+    class RealTimeTopicsDimension implements MeasurementDimension {
+
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.realTime;
+        }
+
+        @Override
+        public DataType getValueType() {
+            return new ArrayType().elementType(new StringType());
+        }
+
+        @Override
+        public ConfigMetadata getParams() {
+            return null;
+        }
+
+        @Override
+        public boolean isRealTime() {
+            return true;
+        }
+
+        @Override
+        public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
+
+
+            //通过订阅消息来统计设备订阅主题消息
+            return eventBus
+                .subscribe(Subscription.of("real-time-device-topics",
+                    String.format("/dashboard/device/%s/changed/topics",parameter.get("deviceId").orElse("null-")), Subscription.Feature.local))
+                .doOnNext(TopicPayload::release)
+                .map(data->{
+                    String deviceId=String.valueOf( parameter.get("deviceId").orElse("null-"));
+                    return deviceRegistry.getDevice(deviceId)
+                        .map(DeviceOperator::getTopics);
+                })
+                .flatMap(Function.identity())
+                .flatMap(data -> Flux.just(SimpleMeasurementValue.of(
+                    data,
+                    System.currentTimeMillis())));
+        }
+    }
+}

+ 31 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/topic/DeviceTopicMeasurementProvider.java

@@ -0,0 +1,31 @@
+package org.jetlinks.community.device.measurements.topic;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
+import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
+import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
+import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
+import org.jetlinks.community.micrometer.MeterRegistryManager;
+import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DeviceTopicMeasurementProvider extends StaticMeasurementProvider {
+
+    MeterRegistry registry;
+
+    public DeviceTopicMeasurementProvider(EventBus eventBus,
+                                          MeterRegistryManager registryManager,
+                                          DeviceRegistry deviceRegistry,
+                                          TimeSeriesManager timeSeriesManager) {
+        super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.topics);
+
+        registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
+            "target", "msgType", "productId");
+
+        addMeasurement(new DeviceTopicMeasurement(eventBus, deviceRegistry, timeSeriesManager));
+
+    }
+}