Просмотр исходного кода

ws设备订阅主题监听 改为定时推送

18339543638 4 лет назад
Родитель
Сommit
c30abe7490

+ 8 - 8
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -205,8 +205,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                             registry.getDevice(conn.getClientId())
                                 .doOnNext(operator -> {
                                     operator.getTopics().clear();
-                                    eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
-                                        conn.getClientId()),new TimeSyncMessage());
+//                                    eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
+//                                        conn.getClientId()),new TimeSyncMessage());
                                 }).subscribe();
                         });
                         return Tuples.of(connection.accept(), device, newSession);
@@ -235,9 +235,9 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                 Set<String> topics = message.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet());
                 tuple3.getT2().addTopics(topics);
             })
-            .flatMap(ignore->
-                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
-                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
+//            .flatMap(ignore->
+//                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
+//                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
             .subscribe();
     }
 
@@ -249,9 +249,9 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                 MqttUnsubscribeMessage message = topic.getMessage();
                 tuple3.getT2().removeTopics(message.topics());
             })
-            .flatMap(ignore->
-                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
-                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
+//            .flatMap(ignore->
+//                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
+//                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
             .subscribe();
     }
 

+ 0 - 834
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/TopicHelper.java

@@ -1,834 +0,0 @@
-package org.jetlinks.community.network.mqtt.server;
-
-
-import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Handler;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.net.SocketAddress;
-import io.vertx.mqtt.MqttAuth;
-import io.vertx.mqtt.MqttEndpoint;
-import io.vertx.mqtt.MqttWill;
-import io.vertx.mqtt.messages.MqttPublishMessage;
-import io.vertx.mqtt.messages.MqttSubscribeMessage;
-import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import javax.net.ssl.SSLSession;
-import java.util.*;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName TopicHelper.java
- * @Description TODO
- * @createTime 2021年07月07日 16:35:00
- */
-public class TopicHelper {
-    private static final String ENDPOINTS="endpoints";
-    private static final String CHILDREN="childrenNode";
-
-
-    private static final Tree<MqttEndpoint> ROOT =new Tree<>();
-    //添加订阅主题
-    public static void  addTopic(String topic,MqttEndpoint endpoint){
-        if(topic.startsWith("/")){
-            topic=topic.substring(1);
-        }
-        String[] subTopics = topic.split("/");
-        addTopic(ROOT,subTopics,0,endpoint);
-    }
-
-    /**
-     * 删除相关主题下订阅通道
-     * @param topic
-     * @return
-     */
-    public static Set<MqttEndpoint>  deleteNodeEndPoint(String topic,MqttEndpoint endpoint){
-        if(topic.startsWith("/")){
-            topic=topic.substring(1);
-        }
-        String[] subTopics = topic.split("/");
-        Set<Tree<MqttEndpoint>> mqttEndpoints = new HashSet<>();
-        findMatchNodes(ROOT,subTopics,0,mqttEndpoints);
-        Set<MqttEndpoint> result = new HashSet<>();
-        mqttEndpoints.stream().filter(mqttEndpointTree -> mqttEndpointTree.get(ENDPOINTS)!=null)
-            .forEach(mqttEndpointTree -> result.addAll((Collection<? extends MqttEndpoint>) mqttEndpointTree.get(ENDPOINTS)));
-        return result;
-    }
-    /**
-     * 找到符合发布主题的所有订阅主题
-     * @param topic
-     * @return
-     */
-    public static Set<MqttEndpoint>  findMatchNodes(String topic){
-        if(topic.startsWith("/")){
-            topic=topic.substring(1);
-        }
-        String[] subTopics = topic.split("/");
-        Set<Tree<MqttEndpoint>> mqttEndpoints = new HashSet<>();
-        findMatchNodes(ROOT,subTopics,0,mqttEndpoints);
-        Set<MqttEndpoint> result = new HashSet<>();
-        mqttEndpoints.stream().filter(mqttEndpointTree -> mqttEndpointTree.get(ENDPOINTS)!=null)
-            .forEach(mqttEndpointTree -> result.addAll((Collection<? extends MqttEndpoint>) mqttEndpointTree.get(ENDPOINTS)));
-        return result;
-    }
-    @SuppressWarnings("unchecked")
-    private static synchronized void addTopic(Tree<MqttEndpoint> node,String[] topics,int pos,MqttEndpoint endpoint){
-        if(pos==topics.length){
-            //当到达最后的节点时
-            return;
-        }
-        String topic = topics[pos];
-        Set<Tree<MqttEndpoint>> children =
-            (Set<Tree<MqttEndpoint>>) Optional.ofNullable(node.get(CHILDREN)).orElse(new HashSet<>());
-        Tree<MqttEndpoint> eqTopic=Optional.ofNullable(findNode(children,topic)).
-            orElse(new Tree<>());
-        if(eqTopic.getParent()==null){
-            eqTopic.setParent(node);
-            eqTopic.setName(topic);
-        }
-        children.add(eqTopic);
-        if(pos==topics.length-1){
-            Set<MqttEndpoint> endpointSet = (Set<MqttEndpoint>) Optional.ofNullable(eqTopic.get(ENDPOINTS))
-                .orElse(new HashSet<>());
-            endpointSet.add(endpoint);
-            eqTopic.put(ENDPOINTS,endpointSet);
-        }
-        node.put(CHILDREN,children);
-        addTopic(eqTopic,topics,++pos,endpoint);
-    }
-
-    @SuppressWarnings("unchecked")
-    private static Set<Tree<MqttEndpoint>> findMatchNodes(Tree<MqttEndpoint> root,String[] topics,int pos,Set<Tree<MqttEndpoint>> result){
-        Set<Tree<MqttEndpoint>> children = Optional.ofNullable((Set<Tree<MqttEndpoint>>) Optional.ofNullable(root)
-            .orElse(new Tree<>()).get(CHILDREN)).orElse(new HashSet<>());
-        if(pos==topics.length){
-            //当到达最后的节点时
-            return children;
-        }
-        String topic = topics[pos];
-        for (Tree<MqttEndpoint> child : children) {
-            if(child.getName().equals("#")){
-                result.add(child);
-            }else if(child.getName().equals("+")||child.getName().equals(topic)){
-                if(pos==topics.length-1){
-                    result.add(child);
-                }
-                findMatchNodes(child,topics,pos+1,result);
-
-            }
-        }
-        return children;
-    }
-
-    private static Tree<MqttEndpoint> findNode(Set<Tree<MqttEndpoint>> children,String name){
-        for (Tree<MqttEndpoint> child : children) {
-            if(child.getName().equals(name)){
-                return child;
-            }
-        }
-        return null;
-    }
-
-    @Data
-    @EqualsAndHashCode(of = {"name","parent"},callSuper = false)
-    static class Tree<T> extends LinkedHashMap<String, Object> {
-        private String id;
-        private String name;
-        private Tree<T> parent;
-    }
-
-    public static void main(String[] args) {
-        String[] subTopics = "tuoren/+/ceshi/+".split("/");
-        String[] subTopics2 = "tuoren/+/ceshi/#".split("/");
-        String[] subTopics3 = "tuoren/+/+/#".split("/");
-        addTopic(ROOT, subTopics, 0, new MqttEndpoint() {
-            @Override
-            public void close() {
-
-            }
-
-            @Override
-            public SocketAddress remoteAddress() {
-                return null;
-            }
-
-            @Override
-            public SocketAddress localAddress() {
-                return null;
-            }
-
-            @Override
-            public boolean isSsl() {
-                return false;
-            }
-
-            @Override
-            public SSLSession sslSession() {
-                return null;
-            }
-
-            @Override
-            public String clientIdentifier() {
-                return null;
-            }
-
-            @Override
-            public MqttAuth auth() {
-                return null;
-            }
-
-            @Override
-            public MqttWill will() {
-                return null;
-            }
-
-            @Override
-            public int protocolVersion() {
-                return 0;
-            }
-
-            @Override
-            public String protocolName() {
-                return null;
-            }
-
-            @Override
-            public boolean isCleanSession() {
-                return false;
-            }
-
-            @Override
-            public int keepAliveTimeSeconds() {
-                return 0;
-            }
-
-            @Override
-            public int lastMessageId() {
-                return 0;
-            }
-
-            @Override
-            public void subscriptionAutoAck(boolean b) {
-
-            }
-
-            @Override
-            public boolean isSubscriptionAutoAck() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint publishAutoAck(boolean b) {
-                return null;
-            }
-
-            @Override
-            public boolean isPublishAutoAck() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint autoKeepAlive(boolean b) {
-                return null;
-            }
-
-            @Override
-            public boolean isAutoKeepAlive() {
-                return false;
-            }
-
-            @Override
-            public boolean isConnected() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint setClientIdentifier(String s) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint disconnectHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint subscribeHandler(Handler<MqttSubscribeMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint unsubscribeHandler(Handler<MqttUnsubscribeMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishHandler(Handler<MqttPublishMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishAcknowledgeHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReceivedHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReleaseHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishCompletionHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint pingHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint closeHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint exceptionHandler(Handler<Throwable> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint accept() {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint accept(boolean b) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint reject(MqttConnectReturnCode mqttConnectReturnCode) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint subscribeAcknowledge(int i, List<MqttQoS> list) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint unsubscribeAcknowledge(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishAcknowledge(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReceived(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishRelease(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishComplete(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, Handler<AsyncResult<Integer>> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, int i, Handler<AsyncResult<Integer>> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint pong() {
-                return null;
-            }
-        });
-        addTopic(ROOT, subTopics2, 0, new MqttEndpoint() {
-            @Override
-            public void close() {
-
-            }
-
-            @Override
-            public SocketAddress remoteAddress() {
-                return null;
-            }
-
-            @Override
-            public SocketAddress localAddress() {
-                return null;
-            }
-
-            @Override
-            public boolean isSsl() {
-                return false;
-            }
-
-            @Override
-            public SSLSession sslSession() {
-                return null;
-            }
-
-            @Override
-            public String clientIdentifier() {
-                return null;
-            }
-
-            @Override
-            public MqttAuth auth() {
-                return null;
-            }
-
-            @Override
-            public MqttWill will() {
-                return null;
-            }
-
-            @Override
-            public int protocolVersion() {
-                return 0;
-            }
-
-            @Override
-            public String protocolName() {
-                return null;
-            }
-
-            @Override
-            public boolean isCleanSession() {
-                return false;
-            }
-
-            @Override
-            public int keepAliveTimeSeconds() {
-                return 0;
-            }
-
-            @Override
-            public int lastMessageId() {
-                return 0;
-            }
-
-            @Override
-            public void subscriptionAutoAck(boolean b) {
-
-            }
-
-            @Override
-            public boolean isSubscriptionAutoAck() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint publishAutoAck(boolean b) {
-                return null;
-            }
-
-            @Override
-            public boolean isPublishAutoAck() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint autoKeepAlive(boolean b) {
-                return null;
-            }
-
-            @Override
-            public boolean isAutoKeepAlive() {
-                return false;
-            }
-
-            @Override
-            public boolean isConnected() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint setClientIdentifier(String s) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint disconnectHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint subscribeHandler(Handler<MqttSubscribeMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint unsubscribeHandler(Handler<MqttUnsubscribeMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishHandler(Handler<MqttPublishMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishAcknowledgeHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReceivedHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReleaseHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishCompletionHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint pingHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint closeHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint exceptionHandler(Handler<Throwable> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint accept() {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint accept(boolean b) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint reject(MqttConnectReturnCode mqttConnectReturnCode) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint subscribeAcknowledge(int i, List<MqttQoS> list) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint unsubscribeAcknowledge(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishAcknowledge(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReceived(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishRelease(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishComplete(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, Handler<AsyncResult<Integer>> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, int i, Handler<AsyncResult<Integer>> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint pong() {
-                return null;
-            }
-        });
-        addTopic(ROOT, subTopics3, 0, new MqttEndpoint() {
-            @Override
-            public void close() {
-
-            }
-
-            @Override
-            public SocketAddress remoteAddress() {
-                return null;
-            }
-
-            @Override
-            public SocketAddress localAddress() {
-                return null;
-            }
-
-            @Override
-            public boolean isSsl() {
-                return false;
-            }
-
-            @Override
-            public SSLSession sslSession() {
-                return null;
-            }
-
-            @Override
-            public String clientIdentifier() {
-                return null;
-            }
-
-            @Override
-            public MqttAuth auth() {
-                return null;
-            }
-
-            @Override
-            public MqttWill will() {
-                return null;
-            }
-
-            @Override
-            public int protocolVersion() {
-                return 0;
-            }
-
-            @Override
-            public String protocolName() {
-                return null;
-            }
-
-            @Override
-            public boolean isCleanSession() {
-                return false;
-            }
-
-            @Override
-            public int keepAliveTimeSeconds() {
-                return 0;
-            }
-
-            @Override
-            public int lastMessageId() {
-                return 0;
-            }
-
-            @Override
-            public void subscriptionAutoAck(boolean b) {
-
-            }
-
-            @Override
-            public boolean isSubscriptionAutoAck() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint publishAutoAck(boolean b) {
-                return null;
-            }
-
-            @Override
-            public boolean isPublishAutoAck() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint autoKeepAlive(boolean b) {
-                return null;
-            }
-
-            @Override
-            public boolean isAutoKeepAlive() {
-                return false;
-            }
-
-            @Override
-            public boolean isConnected() {
-                return false;
-            }
-
-            @Override
-            public MqttEndpoint setClientIdentifier(String s) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint disconnectHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint subscribeHandler(Handler<MqttSubscribeMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint unsubscribeHandler(Handler<MqttUnsubscribeMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishHandler(Handler<MqttPublishMessage> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishAcknowledgeHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReceivedHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReleaseHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishCompletionHandler(Handler<Integer> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint pingHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint closeHandler(Handler<Void> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint exceptionHandler(Handler<Throwable> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint accept() {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint accept(boolean b) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint reject(MqttConnectReturnCode mqttConnectReturnCode) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint subscribeAcknowledge(int i, List<MqttQoS> list) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint unsubscribeAcknowledge(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishAcknowledge(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishReceived(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishRelease(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publishComplete(int i) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, Handler<AsyncResult<Integer>> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, int i, Handler<AsyncResult<Integer>> handler) {
-                return null;
-            }
-
-            @Override
-            public MqttEndpoint pong() {
-                return null;
-            }
-        });
-        Set<Tree<MqttEndpoint>> objects = new HashSet<>();
-        long l = System.currentTimeMillis();
-        String[] publish = "tuoren/123/ceshi/123".split("/");
-        findMatchNodes(ROOT,publish,0,objects);
-        System.out.println(System.currentTimeMillis()-l);
-    }
-
-}
-

+ 24 - 11
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/topic/DeviceTopicMeasurement.java

@@ -1,7 +1,9 @@
 package org.jetlinks.community.device.measurements.topic;
 
 import io.vavr.control.Option;
+import org.hswebframework.utils.time.DateFormatter;
 import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.measurements.SystemMemoryMeasurementProvider;
 import org.jetlinks.community.dashboard.supports.StaticMeasurement;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
 import org.jetlinks.core.device.DeviceOperator;
@@ -15,6 +17,9 @@ import org.jetlinks.core.metadata.types.ArrayType;
 import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.core.utils.TopicUtils;
 import reactor.core.publisher.Flux;
+
+import java.time.Duration;
+import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
@@ -70,21 +75,29 @@ public class DeviceTopicMeasurement extends StaticMeasurement {
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
 
-
-            //通过订阅消息来统计设备订阅主题消息
-            return eventBus
-                .subscribe(Subscription.of("real-time-device-topics",
-                    String.format("/dashboard/device/%s/changed/topics",parameter.get("deviceId").orElse("null-")), Subscription.Feature.local))
-                .doOnNext(TopicPayload::release)
-                .map(data->{
-                    String deviceId=String.valueOf( parameter.get("deviceId").orElse("null-"));
-                    return deviceRegistry.getDevice(deviceId)
-                        .map(DeviceOperator::getTopics);
-                })
+           return  Flux.interval(Duration.ofSeconds(2))
+                .map(t-> deviceRegistry.
+                    getDevice(String.valueOf( parameter.get("deviceId").orElse("null-")))
+                    .map(DeviceOperator::getTopics))
                 .flatMap(Function.identity())
                 .flatMap(data -> Flux.just(SimpleMeasurementValue.of(
                     data,
                     System.currentTimeMillis())));
+            //通过订阅消息来统计设备订阅主题消息
+
+//            return eventBus
+//                .subscribe(Subscription.of("real-time-device-topics",
+//                    String.format("/dashboard/device/%s/changed/topics",parameter.get("deviceId").orElse("null-")), Subscription.Feature.local))
+//                .doOnNext(TopicPayload::release)
+//                .map(data->{
+//                    String deviceId=String.valueOf( parameter.get("deviceId").orElse("null-"));
+//                    return deviceRegistry.getDevice(deviceId)
+//                        .map(DeviceOperator::getTopics);
+//                })
+//                .flatMap(Function.identity())
+//                .flatMap(data -> Flux.just(SimpleMeasurementValue.of(
+//                    data,
+//                    System.currentTimeMillis())));
         }
     }
 }