18339543638 hace 4 años
padre
commit
9afd9f897e
Se han modificado 19 ficheros con 1095 adiciones y 122 borrados
  1. 1 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java
  2. 4 0
      jetlinks-components/network-component/mqtt-component/pom.xml
  3. 6 1
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java
  4. 30 8
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java
  5. 834 0
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/TopicHelper.java
  6. 45 21
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java
  7. 6 6
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java
  8. 6 5
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServerProvider.java
  9. 47 47
      jetlinks-components/network-component/mqtt-component/src/test/java/org/jetlinks/community/network/mqtt/VertxMqttSslProviderTest.java
  10. 70 19
      jetlinks-components/network-component/mqtt-component/src/test/java/org/jetlinks/community/network/mqtt/client/MqttClientProviderTest.java
  11. 1 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java
  12. 1 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java
  13. 3 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java
  14. 1 1
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/NetworkConfigController.java
  15. 4 0
      jetlinks-standalone/pom.xml
  16. 11 5
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/JetLinksApplication.java
  17. 7 1
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/authorize/LoginEvent.java
  18. 11 8
      jetlinks-standalone/src/main/resources/application.yml
  19. 7 0
      pom.xml

+ 1 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java

@@ -7,6 +7,7 @@ import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
 import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.core.annotation.AnnotatedElementUtils;
 import org.springframework.core.annotation.AnnotationAttributes;

+ 4 - 0
jetlinks-components/network-component/mqtt-component/pom.xml

@@ -48,5 +48,9 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+        </dependency>
     </dependencies>
 </project>

+ 6 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java

@@ -10,6 +10,7 @@ import org.hswebframework.web.utils.ExpressionUtils;
 import org.jetlinks.community.network.*;
 import org.jetlinks.community.network.security.CertificateManager;
 import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
+import org.jetlinks.core.message.DeviceLogMessage;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.DefaultConfigMetadata;
 import org.jetlinks.core.metadata.types.BooleanType;
@@ -67,16 +68,20 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
         initMqttClient(mqttClient, properties);
     }
 
+    public static void main(String[] args) {
+
+    }
     public void initMqttClient(VertxMqttClient mqttClient, MqttClientProperties properties) {
         mqttClient.setLoading(true);
         MqttClient client = MqttClient.create(vertx, properties.getOptions());
         mqttClient.setClient(client);
         client.connect(properties.getPort(), properties.getHost(), result -> {
             mqttClient.setLoading(false);
+            //todo 添加心跳
             if (!result.succeeded()) {
                 log.warn("connect mqtt [{}] error", properties.getId(), result.cause());
             } else {
-                log.debug("connect mqtt [{}] success,host:{},port:{}", properties.getId(),properties.getHost(),properties.getPort());
+                log.debug("connect mqtt [{}] success,clientId:{},host:{},port:{},{}", mqttClient.getClient().clientId(),properties.getId(),properties.getHost(),properties.getPort(),mqttClient);
             }
         });
     }

+ 30 - 8
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java

@@ -18,8 +18,11 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -35,7 +38,7 @@ public class VertxMqttClient implements MqttClient {
 
     private volatile boolean loading;
 
-    private final List<Runnable> loadSuccessListener = new CopyOnWriteArrayList<>();
+    private volatile List<Runnable> loadSuccessListener = new CopyOnWriteArrayList<>();
 
     public void setLoading(boolean loading) {
         this.loading = loading;
@@ -95,6 +98,9 @@ public class VertxMqttClient implements MqttClient {
     }
 
     private void reSubscribe() {
+        if(subscriber.getTopics()==null||subscriber.getTopics().length==0){
+            return;
+        }
         subscriber
             .findTopic("/**")
             .filter(topic -> topic.getSubscribers().size() > 0)
@@ -141,9 +147,9 @@ public class VertxMqttClient implements MqttClient {
                     if (sinkTopic.unsubscribe(topicQos).size() > 0) {
                         client.unsubscribe(convertMqttTopic(topic), result -> {
                             if (result.succeeded()) {
-                                log.debug("unsubscribe mqtt topic {}", topic);
+                                log.info("unsubscribe mqtt topic {}", topic);
                             } else {
-                                log.debug("unsubscribe mqtt topic {} error", topic, result.cause());
+                                log.info("unsubscribe mqtt topic {} error", topic, result.cause());
                             }
                         });
                     }
@@ -151,17 +157,32 @@ public class VertxMqttClient implements MqttClient {
 
                 //首次订阅
                 if (isAlive() && first) {
-                    log.debug("subscribe mqtt topic {}", topic);
+//                    log.debug("subscribe mqtt topic {}",this.client.clientId(), topic);
+                    client.subscribe(convertMqttTopic(topic), qos, result -> {
+                        if (!result.succeeded()) {
+                            sink.error(result.cause());
+                        }else {
+
+                        }
+                    });
+                }else if(isAlive()&&loadSuccessListener.size()!=0){
                     client.subscribe(convertMqttTopic(topic), qos, result -> {
                         if (!result.succeeded()) {
                             sink.error(result.cause());
+                        }else {
+                            log.info("sucess subscribe mqtt topic {}",this.client.clientId(), topic);
                         }
                     });
+                }else if(!isAlive()){
+                    loadSuccessListener
+                        .add(() ->
+                            subscribe(Collections.singletonList(topic),qos)
+                                .doOnComplete(sink::complete)
+                                .doOnError(sink::error)
+                                .subscribe());
                 }
             }
-
             sink.onDispose(composite);
-
         });
     }
 
@@ -188,12 +209,13 @@ public class VertxMqttClient implements MqttClient {
     @Override
     public Mono<Void> publish(MqttMessage message) {
         if (loading) {
-            return Mono.create(sink ->
+            return Mono.create(sink ->{
                 loadSuccessListener
                     .add(() -> doPublish(message)
                         .doOnSuccess(sink::success)
                         .doOnError(sink::error)
-                        .subscribe()));
+                        .subscribe());
+                });
         }
         return doPublish(message);
     }

+ 834 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/TopicHelper.java

@@ -0,0 +1,834 @@
+package org.jetlinks.community.network.mqtt.server;
+
+
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.mqtt.MqttAuth;
+import io.vertx.mqtt.MqttEndpoint;
+import io.vertx.mqtt.MqttWill;
+import io.vertx.mqtt.messages.MqttPublishMessage;
+import io.vertx.mqtt.messages.MqttSubscribeMessage;
+import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import javax.net.ssl.SSLSession;
+import java.util.*;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName TopicHelper.java
+ * @Description TODO
+ * @createTime 2021年07月07日 16:35:00
+ */
+public class TopicHelper {
+    private static final String ENDPOINTS="endpoints";
+    private static final String CHILDREN="childrenNode";
+
+
+    private static final Tree<MqttEndpoint> ROOT =new Tree<>();
+    //添加订阅主题
+    public static void  addTopic(String topic,MqttEndpoint endpoint){
+        if(topic.startsWith("/")){
+            topic=topic.substring(1);
+        }
+        String[] subTopics = topic.split("/");
+        addTopic(ROOT,subTopics,0,endpoint);
+    }
+
+    /**
+     * 删除相关主题下订阅通道
+     * @param topic
+     * @return
+     */
+    public static Set<MqttEndpoint>  deleteNodeEndPoint(String topic,MqttEndpoint endpoint){
+        if(topic.startsWith("/")){
+            topic=topic.substring(1);
+        }
+        String[] subTopics = topic.split("/");
+        Set<Tree<MqttEndpoint>> mqttEndpoints = new HashSet<>();
+        findMatchNodes(ROOT,subTopics,0,mqttEndpoints);
+        Set<MqttEndpoint> result = new HashSet<>();
+        mqttEndpoints.stream().filter(mqttEndpointTree -> mqttEndpointTree.get(ENDPOINTS)!=null)
+            .forEach(mqttEndpointTree -> result.addAll((Collection<? extends MqttEndpoint>) mqttEndpointTree.get(ENDPOINTS)));
+        return result;
+    }
+    /**
+     * 找到符合发布主题的所有订阅主题
+     * @param topic
+     * @return
+     */
+    public static Set<MqttEndpoint>  findMatchNodes(String topic){
+        if(topic.startsWith("/")){
+            topic=topic.substring(1);
+        }
+        String[] subTopics = topic.split("/");
+        Set<Tree<MqttEndpoint>> mqttEndpoints = new HashSet<>();
+        findMatchNodes(ROOT,subTopics,0,mqttEndpoints);
+        Set<MqttEndpoint> result = new HashSet<>();
+        mqttEndpoints.stream().filter(mqttEndpointTree -> mqttEndpointTree.get(ENDPOINTS)!=null)
+            .forEach(mqttEndpointTree -> result.addAll((Collection<? extends MqttEndpoint>) mqttEndpointTree.get(ENDPOINTS)));
+        return result;
+    }
+    @SuppressWarnings("unchecked")
+    private static synchronized void addTopic(Tree<MqttEndpoint> node,String[] topics,int pos,MqttEndpoint endpoint){
+        if(pos==topics.length){
+            //当到达最后的节点时
+            return;
+        }
+        String topic = topics[pos];
+        Set<Tree<MqttEndpoint>> children =
+            (Set<Tree<MqttEndpoint>>) Optional.ofNullable(node.get(CHILDREN)).orElse(new HashSet<>());
+        Tree<MqttEndpoint> eqTopic=Optional.ofNullable(findNode(children,topic)).
+            orElse(new Tree<>());
+        if(eqTopic.getParent()==null){
+            eqTopic.setParent(node);
+            eqTopic.setName(topic);
+        }
+        children.add(eqTopic);
+        if(pos==topics.length-1){
+            Set<MqttEndpoint> endpointSet = (Set<MqttEndpoint>) Optional.ofNullable(eqTopic.get(ENDPOINTS))
+                .orElse(new HashSet<>());
+            endpointSet.add(endpoint);
+            eqTopic.put(ENDPOINTS,endpointSet);
+        }
+        node.put(CHILDREN,children);
+        addTopic(eqTopic,topics,++pos,endpoint);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Set<Tree<MqttEndpoint>> findMatchNodes(Tree<MqttEndpoint> root,String[] topics,int pos,Set<Tree<MqttEndpoint>> result){
+        Set<Tree<MqttEndpoint>> children = Optional.ofNullable((Set<Tree<MqttEndpoint>>) Optional.ofNullable(root)
+            .orElse(new Tree<>()).get(CHILDREN)).orElse(new HashSet<>());
+        if(pos==topics.length){
+            //当到达最后的节点时
+            return children;
+        }
+        String topic = topics[pos];
+        for (Tree<MqttEndpoint> child : children) {
+            if(child.getName().equals("#")){
+                result.add(child);
+            }else if(child.getName().equals("+")||child.getName().equals(topic)){
+                if(pos==topics.length-1){
+                    result.add(child);
+                }
+                findMatchNodes(child,topics,pos+1,result);
+
+            }
+        }
+        return children;
+    }
+
+    private static Tree<MqttEndpoint> findNode(Set<Tree<MqttEndpoint>> children,String name){
+        for (Tree<MqttEndpoint> child : children) {
+            if(child.getName().equals(name)){
+                return child;
+            }
+        }
+        return null;
+    }
+
+    @Data
+    @EqualsAndHashCode(of = {"name","parent"},callSuper = false)
+    static class Tree<T> extends LinkedHashMap<String, Object> {
+        private String id;
+        private String name;
+        private Tree<T> parent;
+    }
+
+    public static void main(String[] args) {
+        String[] subTopics = "tuoren/+/ceshi/+".split("/");
+        String[] subTopics2 = "tuoren/+/ceshi/#".split("/");
+        String[] subTopics3 = "tuoren/+/+/#".split("/");
+        addTopic(ROOT, subTopics, 0, new MqttEndpoint() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public SocketAddress remoteAddress() {
+                return null;
+            }
+
+            @Override
+            public SocketAddress localAddress() {
+                return null;
+            }
+
+            @Override
+            public boolean isSsl() {
+                return false;
+            }
+
+            @Override
+            public SSLSession sslSession() {
+                return null;
+            }
+
+            @Override
+            public String clientIdentifier() {
+                return null;
+            }
+
+            @Override
+            public MqttAuth auth() {
+                return null;
+            }
+
+            @Override
+            public MqttWill will() {
+                return null;
+            }
+
+            @Override
+            public int protocolVersion() {
+                return 0;
+            }
+
+            @Override
+            public String protocolName() {
+                return null;
+            }
+
+            @Override
+            public boolean isCleanSession() {
+                return false;
+            }
+
+            @Override
+            public int keepAliveTimeSeconds() {
+                return 0;
+            }
+
+            @Override
+            public int lastMessageId() {
+                return 0;
+            }
+
+            @Override
+            public void subscriptionAutoAck(boolean b) {
+
+            }
+
+            @Override
+            public boolean isSubscriptionAutoAck() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint publishAutoAck(boolean b) {
+                return null;
+            }
+
+            @Override
+            public boolean isPublishAutoAck() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint autoKeepAlive(boolean b) {
+                return null;
+            }
+
+            @Override
+            public boolean isAutoKeepAlive() {
+                return false;
+            }
+
+            @Override
+            public boolean isConnected() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint setClientIdentifier(String s) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint disconnectHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint subscribeHandler(Handler<MqttSubscribeMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint unsubscribeHandler(Handler<MqttUnsubscribeMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishHandler(Handler<MqttPublishMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishAcknowledgeHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReceivedHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReleaseHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishCompletionHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint pingHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint closeHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint exceptionHandler(Handler<Throwable> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint accept() {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint accept(boolean b) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint reject(MqttConnectReturnCode mqttConnectReturnCode) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint subscribeAcknowledge(int i, List<MqttQoS> list) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint unsubscribeAcknowledge(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishAcknowledge(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReceived(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishRelease(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishComplete(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, Handler<AsyncResult<Integer>> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, int i, Handler<AsyncResult<Integer>> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint pong() {
+                return null;
+            }
+        });
+        addTopic(ROOT, subTopics2, 0, new MqttEndpoint() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public SocketAddress remoteAddress() {
+                return null;
+            }
+
+            @Override
+            public SocketAddress localAddress() {
+                return null;
+            }
+
+            @Override
+            public boolean isSsl() {
+                return false;
+            }
+
+            @Override
+            public SSLSession sslSession() {
+                return null;
+            }
+
+            @Override
+            public String clientIdentifier() {
+                return null;
+            }
+
+            @Override
+            public MqttAuth auth() {
+                return null;
+            }
+
+            @Override
+            public MqttWill will() {
+                return null;
+            }
+
+            @Override
+            public int protocolVersion() {
+                return 0;
+            }
+
+            @Override
+            public String protocolName() {
+                return null;
+            }
+
+            @Override
+            public boolean isCleanSession() {
+                return false;
+            }
+
+            @Override
+            public int keepAliveTimeSeconds() {
+                return 0;
+            }
+
+            @Override
+            public int lastMessageId() {
+                return 0;
+            }
+
+            @Override
+            public void subscriptionAutoAck(boolean b) {
+
+            }
+
+            @Override
+            public boolean isSubscriptionAutoAck() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint publishAutoAck(boolean b) {
+                return null;
+            }
+
+            @Override
+            public boolean isPublishAutoAck() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint autoKeepAlive(boolean b) {
+                return null;
+            }
+
+            @Override
+            public boolean isAutoKeepAlive() {
+                return false;
+            }
+
+            @Override
+            public boolean isConnected() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint setClientIdentifier(String s) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint disconnectHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint subscribeHandler(Handler<MqttSubscribeMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint unsubscribeHandler(Handler<MqttUnsubscribeMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishHandler(Handler<MqttPublishMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishAcknowledgeHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReceivedHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReleaseHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishCompletionHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint pingHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint closeHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint exceptionHandler(Handler<Throwable> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint accept() {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint accept(boolean b) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint reject(MqttConnectReturnCode mqttConnectReturnCode) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint subscribeAcknowledge(int i, List<MqttQoS> list) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint unsubscribeAcknowledge(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishAcknowledge(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReceived(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishRelease(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishComplete(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, Handler<AsyncResult<Integer>> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, int i, Handler<AsyncResult<Integer>> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint pong() {
+                return null;
+            }
+        });
+        addTopic(ROOT, subTopics3, 0, new MqttEndpoint() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public SocketAddress remoteAddress() {
+                return null;
+            }
+
+            @Override
+            public SocketAddress localAddress() {
+                return null;
+            }
+
+            @Override
+            public boolean isSsl() {
+                return false;
+            }
+
+            @Override
+            public SSLSession sslSession() {
+                return null;
+            }
+
+            @Override
+            public String clientIdentifier() {
+                return null;
+            }
+
+            @Override
+            public MqttAuth auth() {
+                return null;
+            }
+
+            @Override
+            public MqttWill will() {
+                return null;
+            }
+
+            @Override
+            public int protocolVersion() {
+                return 0;
+            }
+
+            @Override
+            public String protocolName() {
+                return null;
+            }
+
+            @Override
+            public boolean isCleanSession() {
+                return false;
+            }
+
+            @Override
+            public int keepAliveTimeSeconds() {
+                return 0;
+            }
+
+            @Override
+            public int lastMessageId() {
+                return 0;
+            }
+
+            @Override
+            public void subscriptionAutoAck(boolean b) {
+
+            }
+
+            @Override
+            public boolean isSubscriptionAutoAck() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint publishAutoAck(boolean b) {
+                return null;
+            }
+
+            @Override
+            public boolean isPublishAutoAck() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint autoKeepAlive(boolean b) {
+                return null;
+            }
+
+            @Override
+            public boolean isAutoKeepAlive() {
+                return false;
+            }
+
+            @Override
+            public boolean isConnected() {
+                return false;
+            }
+
+            @Override
+            public MqttEndpoint setClientIdentifier(String s) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint disconnectHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint subscribeHandler(Handler<MqttSubscribeMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint unsubscribeHandler(Handler<MqttUnsubscribeMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishHandler(Handler<MqttPublishMessage> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishAcknowledgeHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReceivedHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReleaseHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishCompletionHandler(Handler<Integer> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint pingHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint closeHandler(Handler<Void> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint exceptionHandler(Handler<Throwable> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint accept() {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint accept(boolean b) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint reject(MqttConnectReturnCode mqttConnectReturnCode) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint subscribeAcknowledge(int i, List<MqttQoS> list) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint unsubscribeAcknowledge(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishAcknowledge(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishReceived(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishRelease(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publishComplete(int i) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, Handler<AsyncResult<Integer>> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint publish(String s, Buffer buffer, MqttQoS mqttQoS, boolean b, boolean b1, int i, Handler<AsyncResult<Integer>> handler) {
+                return null;
+            }
+
+            @Override
+            public MqttEndpoint pong() {
+                return null;
+            }
+        });
+        Set<Tree<MqttEndpoint>> objects = new HashSet<>();
+        long l = System.currentTimeMillis();
+        String[] publish = "tuoren/123/ceshi/123".split("/");
+        findMatchNodes(ROOT,publish,0,objects);
+        System.out.println(System.currentTimeMillis()-l);
+    }
+
+}
+

+ 45 - 21
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -1,8 +1,12 @@
 package org.jetlinks.community.network.mqtt.server.vertx;
 
+import cn.hutool.core.lang.tree.Tree;
+import cn.hutool.core.lang.tree.TreeUtil;
+import cn.hutool.extra.spring.SpringUtil;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.codec.http2.Http2Connection;
 import io.netty.handler.codec.mqtt.*;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.SocketAddress;
@@ -14,13 +18,15 @@ import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.network.mqtt.server.MqttConnection;
-import org.jetlinks.community.network.mqtt.server.MqttPublishing;
-import org.jetlinks.community.network.mqtt.server.MqttSubscription;
-import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
+import org.jetlinks.community.network.mqtt.server.*;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
 import org.jetlinks.core.server.mqtt.MqttAuth;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -29,7 +35,7 @@ import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -38,7 +44,6 @@ import java.util.stream.Collectors;
 
 @Slf4j
 class VertxMqttConnection implements MqttConnection {
-
     private final MqttEndpoint endpoint;
     private long keepAliveTimeoutMs;
     @Getter
@@ -47,7 +52,6 @@ class VertxMqttConnection implements MqttConnection {
 
     private final EmitterProcessor<MqttPublishing> messageProcessor = EmitterProcessor.create(false);
 
-
     private ScheduledFuture<?> pingRespTimeout;
 
     private final FluxSink<MqttPublishing> publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
@@ -55,10 +59,10 @@ class VertxMqttConnection implements MqttConnection {
     private final EmitterProcessor<MqttSubscription> subscriptionProcessor = EmitterProcessor.create(false);
     private final EmitterProcessor<MqttUnSubscription> unsubscription = EmitterProcessor.create(false);
 
-
     private Optional<MqttMessage> willMessage=null;
 
     private static final MqttAuth emptyAuth = new MqttAuth() {
+
         @Override
         public String getUsername() {
             return "";
@@ -120,45 +124,65 @@ class VertxMqttConnection implements MqttConnection {
         if (accepted) {
             return this;
         }
+        accepted = true;
         //todo 需要验证密码
-        String protocolName = endpoint.protocolName();
-        int protocolVersion = endpoint.protocolVersion();
         log.debug("mqtt client [{}] connected", getClientId());
-        accepted = true;
         ping();
         try {
             if (!endpoint.isConnected()) {
-                //心跳检测 todo 改为配置文件配置
-                heartIdleHandler(endpoint,5,TimeUnit.SECONDS);
-                //放置遗言
-                this.willMessage = this.getWillMessage();
-                endpoint.accept();
+
+
             }
-            initSubscript();
+            //心跳检测 todo 改为配置文件配置
+            heartIdleHandler(endpoint,100,TimeUnit.SECONDS);
+            //放置遗言
+            this.willMessage = this.getWillMessage();
+            //获取设备id
+            String deviceId=endpoint.clientIdentifier();
+
         } catch (Exception e) {
             close().subscribe();
             log.warn(e.getMessage(), e);
             return this;
         }
+        initSubscript();
         init();
+        endpoint.accept();
         return this;
     }
 
+
+    private static final Map<String, List<MqttEndpoint>> map = new HashMap<>();
     /**
      * 初始化监听器
      */
     private void initSubscript() {
+
         //todo 订阅处理
         subscriptionProcessor.subscribe(mqttSubscription -> {
-
+            MqttSubscribeMessage message = mqttSubscription.getMessage();
+            List<MqttTopicSubscription> mqttTopicSubscriptions = message.topicSubscriptions();
+            for (MqttTopicSubscription subscription : mqttTopicSubscriptions) {
+                TopicHelper.addTopic(subscription.topicName(),endpoint);
+            }
         });
         //取消订阅处理 todo
         unsubscription.subscribe(mqttUnSubscription -> {
-
+            List<String> topics = mqttUnSubscription.getMessage().topics();
+            for (String topic : topics) {
+                TopicHelper.deleteNodeEndPoint(topic,endpoint);
+            }
         });
+
         //发布处理 todo
         messageProcessor.subscribe(publishing -> {
-
+            //  /tuoren/yanjiuyuan
+            String publishTopic = publishing.getMessage().getTopic();
+            Set<MqttEndpoint> mqttEndpoints = TopicHelper.findMatchNodes(publishTopic);
+            // /tuoren/yanjiuyuan /tuoren/#
+            mqttEndpoints.parallelStream().forEach(mqttEndpoint ->{
+                mqttEndpoint.publish(publishTopic,Buffer.buffer(publishing.getMessage().payloadAsString(),"utf-8"),MqttQoS.AT_MOST_ONCE,false,false);
+            });
         });
     }
 
@@ -233,7 +257,7 @@ class VertxMqttConnection implements MqttConnection {
             })
             .subscribeHandler(msg -> {
                 ping();
-                 VertxMqttSubscription subscription = new VertxMqttSubscription( msg,false);
+                VertxMqttSubscription subscription = new VertxMqttSubscription( msg,false);
                 boolean hasDownstream = this.subscriptionProcessor.hasDownstreams();
                 if (autoAckSub || !hasDownstream) {
                     subscription.acknowledge();

+ 6 - 6
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java

@@ -1,6 +1,5 @@
 package org.jetlinks.community.network.mqtt.server.vertx;
 
-import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
@@ -28,6 +27,7 @@ public class VertxMqttServer implements MqttServer {
         this.id = id;
     }
 
+
     public void setMqttServer(Collection<io.vertx.mqtt.MqttServer> mqttServer) {
         if (this.mqttServer != null && !this.mqttServer.isEmpty()) {
             shutdown();
@@ -47,11 +47,11 @@ public class VertxMqttServer implements MqttServer {
 //                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
 //                        return;
 //                    }
-                    if (connectionProcessor.getPending() >= 10240) {
-                        log.warn("too many no handle mqtt connection : {}", connectionProcessor.getPending());
-                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
-                        return;
-                    }
+//                    if (connectionProcessor.getPending() >= 10240) {
+//                        log.warn("too many no handle mqtt connection : {}", connectionProcessor.getPending());
+//                        endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+//                        return;
+//                    }
 //                    endpoint.accept(false);
                     sink.next(new VertxMqttConnection(endpoint).accept());
                 }).listen(result->{

+ 6 - 5
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServerProvider.java

@@ -4,10 +4,14 @@ import com.alibaba.fastjson.JSONObject;
 import io.vertx.core.Vertx;
 import io.vertx.mqtt.MqttServer;
 import io.vertx.mqtt.MqttServerOptions;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.network.*;
+import org.jetlinks.community.network.mqtt.client.MqttClientProperties;
+import org.jetlinks.community.network.mqtt.client.MqttClientProvider;
+import org.jetlinks.community.network.mqtt.client.VertxMqttClient;
 import org.jetlinks.community.network.security.CertificateManager;
 import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
 import org.jetlinks.core.metadata.ConfigMetadata;
@@ -19,19 +23,16 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
 @Component
 @Slf4j
+@AllArgsConstructor
 public class VertxMqttServerProvider implements NetworkProvider<VertxMqttServerProperties> {
 
     private final CertificateManager certificateManager;
     private final Vertx vertx;
 
-    public VertxMqttServerProvider(CertificateManager certificateManager, Vertx vertx) {
-        this.certificateManager = certificateManager;
-        this.vertx = vertx;
-    }
-
     @Nonnull
     @Override
     public NetworkType getType() {

+ 47 - 47
jetlinks-components/network-component/mqtt-component/src/test/java/org/jetlinks/community/network/mqtt/VertxMqttSslProviderTest.java

@@ -31,52 +31,52 @@ public class VertxMqttSslProviderTest {
 
     static MqttServer mqttServer;
 
-    @Test
-    @SneakyThrows
-    public void test() {
-        VertxMqttServerProvider mqttServerManager = new VertxMqttServerProvider(id -> Mono.empty(), vertx);
-
-        DefaultCertificate serverCert = new DefaultCertificate("server", "test");
-        DefaultCertificate clientCert = new DefaultCertificate("client", "test");
-
-        byte[] serverKs = StreamUtils.copyToByteArray(new ClassPathResource("server.p12").getInputStream());
-
-        byte[] clientKs = StreamUtils.copyToByteArray(new ClassPathResource("client.p12").getInputStream());
-
-        byte[] trust = StreamUtils.copyToByteArray(new ClassPathResource("trustStore.p12").getInputStream());
-
-        serverCert.initPfxKey(serverKs, "endPass").initPfxTrust(trust, "rootPass");
-        clientCert.initPfxKey(clientKs, "endPass").initPfxTrust(trust, "rootPass");
-
-        VertxMqttServerProperties properties = new VertxMqttServerProperties();
-        properties.setId("test");
-        properties.setInstance(4);
-        properties.setSsl(true);
-        properties.setOptions(new MqttServerOptions()
-                .setSsl(true)
-                .setKeyCertOptions(new VertxKeyCertTrustOptions(serverCert))
-                .setTrustOptions(new VertxKeyCertTrustOptions(serverCert))
-                .setPort(1888));
-
-        mqttServer = mqttServerManager.createNetwork(properties);
-
-        MqttClientProperties propertiesClient = new MqttClientProperties();
-        propertiesClient.setHost("127.0.0.1");
-        propertiesClient.setPort(1888);
-        propertiesClient.setOptions(new MqttClientOptions()
-                .setSsl(true)
-                .setKeyCertOptions(new VertxKeyCertTrustOptions(clientCert))
-                .setTrustOptions(new VertxKeyCertTrustOptions(clientCert)));
-
-        MqttClientProvider provider = new MqttClientProvider(id -> Mono.empty(), vertx,new MockEnvironment());
-        VertxMqttClient client = provider.createNetwork(propertiesClient);
-        mqttServer.handleConnection()
-                .map(MqttConnection::getClientId)
-                .doOnNext(System.out::println)
-                .take(1)
-                .as(StepVerifier::create)
-                .expectNextCount(1)
-                .verifyComplete();
-    }
+//    @Test
+//    @SneakyThrows
+//    public void test() {
+//        VertxMqttServerProvider mqttServerManager = new VertxMqttServerProvider(id -> Mono.empty(), vertx);
+//
+//        DefaultCertificate serverCert = new DefaultCertificate("server", "test");
+//        DefaultCertificate clientCert = new DefaultCertificate("client", "test");
+//
+//        byte[] serverKs = StreamUtils.copyToByteArray(new ClassPathResource("server.p12").getInputStream());
+//
+//        byte[] clientKs = StreamUtils.copyToByteArray(new ClassPathResource("client.p12").getInputStream());
+//
+//        byte[] trust = StreamUtils.copyToByteArray(new ClassPathResource("trustStore.p12").getInputStream());
+//
+//        serverCert.initPfxKey(serverKs, "endPass").initPfxTrust(trust, "rootPass");
+//        clientCert.initPfxKey(clientKs, "endPass").initPfxTrust(trust, "rootPass");
+//
+//        VertxMqttServerProperties properties = new VertxMqttServerProperties();
+//        properties.setId("test");
+//        properties.setInstance(4);
+//        properties.setSsl(true);
+//        properties.setOptions(new MqttServerOptions()
+//                .setSsl(true)
+//                .setKeyCertOptions(new VertxKeyCertTrustOptions(serverCert))
+//                .setTrustOptions(new VertxKeyCertTrustOptions(serverCert))
+//                .setPort(1888));
+//
+//        mqttServer = mqttServerManager.createNetwork(properties);
+//
+//        MqttClientProperties propertiesClient = new MqttClientProperties();
+//        propertiesClient.setHost("127.0.0.1");
+//        propertiesClient.setPort(1888);
+//        propertiesClient.setOptions(new MqttClientOptions()
+//                .setSsl(true)
+//                .setKeyCertOptions(new VertxKeyCertTrustOptions(clientCert))
+//                .setTrustOptions(new VertxKeyCertTrustOptions(clientCert)));
+//
+//        MqttClientProvider provider = new MqttClientProvider(id -> Mono.empty(), vertx,new MockEnvironment());
+//        VertxMqttClient client = provider.createNetwork(propertiesClient);
+//        mqttServer.handleConnection()
+//                .map(MqttConnection::getClientId)
+//                .doOnNext(System.out::println)
+//                .take(1)
+//                .as(StepVerifier::create)
+//                .expectNextCount(1)
+//                .verifyComplete();
+//    }
 
 }

+ 70 - 19
jetlinks-components/network-component/mqtt-component/src/test/java/org/jetlinks/community/network/mqtt/client/MqttClientProviderTest.java

@@ -1,51 +1,102 @@
 package org.jetlinks.community.network.mqtt.client;
 
+import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.mqtt.MqttClientOptions;
 import io.vertx.mqtt.MqttServer;
+import org.jetlinks.community.network.mqtt.server.vertx.VertxMqttServer;
+import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.codec.MqttMessage;
+import org.jetlinks.core.message.codec.SimpleMqttMessage;
 import org.junit.jupiter.api.Test;
 import org.springframework.mock.env.MockEnvironment;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 
 class MqttClientProviderTest {
 
-    private Vertx vertx = Vertx.vertx();
+    private static Vertx vertx = Vertx.vertx();
 
-    @Test
-    void test() {
-        MqttServer server = MqttServer.create(vertx);
 
-        server.endpointHandler(endpoint -> {
-            endpoint
-                    .accept()
-                    .publish("/test", Buffer.buffer("test"), MqttQoS.AT_MOST_ONCE, false, false);
-        }).listen(11223);
+    public static void main(String[] args) throws InterruptedException {
+        vertx.setTimer(100,event -> {});
+        MqttServer server = MqttServer.create(vertx);
+        VertxMqttServer vertxMqttServer = new VertxMqttServer("123");
+        vertxMqttServer.setMqttServer(Collections.singleton(server));
+//
+//        server.endpointHandler(endpoint -> {
+//            endpoint
+//                    .accept()
+//                    .publish("/test", Buffer.buffer("test"), MqttQoS.AT_MOST_ONCE, false, false);
+//        }).listen(11223);
 
-        MqttClientProvider provider = new MqttClientProvider(id -> Mono.empty(), vertx,new MockEnvironment());
+        MqttClientProvider sub = new MqttClientProvider(id -> Mono.empty(), vertx,new MockEnvironment());
 
         MqttClientProperties properties = new MqttClientProperties();
-        properties.setHost("127.0.0.1");
-        properties.setPort(11223);
+        properties.setHost("123.56.154.53");
+        properties.setPort(1883);
         properties.setOptions(new MqttClientOptions());
+        properties.setClientId(UUID.randomUUID().toString());
+        CompletableFuture.runAsync(()->{
 
-        VertxMqttClient client = provider.createNetwork(properties);
-
-        client.subscribe(Arrays.asList("/test"))
+            VertxMqttClient client = sub.createNetwork(properties);
+            Flux<MqttMessage> subscribe = client.subscribe(Arrays.asList("/test"));
+            subscribe
                 .map(MqttMessage::getPayload)
                 .map(payload -> payload.toString(StandardCharsets.UTF_8))
-                .take(1)
-                .as(StepVerifier::create)
-                .expectNext("test")
-                .verifyComplete();
+                .doOnNext(string->{
+                    System.out.println("处理缓存>>>>>>>>>>"+string);
+                })
+            .subscribe(string->{
+                System.out.println("接收到数据>>>>>>>>>>"+string);
+            });
+//                .as(StepVerifier::create)
+//                .expectNext("test")
+//                .verifyComplete();
+        }).exceptionally(e->{
+            e.printStackTrace();
+            return null;
+        });
+//        CompletableFuture.runAsync(()-> {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        MqttClientProvider pub = new MqttClientProvider(id -> Mono.empty(), vertx, new MockEnvironment());
+        MqttClientProperties pubProperties = new MqttClientProperties();
+        pubProperties.setHost("123.56.154.53");
+        pubProperties.setPort(1883);
+        pubProperties.setOptions(new MqttClientOptions());
+        pubProperties.setClientId(UUID.randomUUID().toString());
+        VertxMqttClient pubClient = pub.createNetwork(pubProperties);
+        SimpleMqttMessage simpleMqttMessage = new SimpleMqttMessage();
+        simpleMqttMessage.setPayload(Unpooled.wrappedBuffer("123test".getBytes()));
+        simpleMqttMessage.setTopic("/test");
+        simpleMqttMessage.setMessageId(123);
+        simpleMqttMessage.setQosLevel(1);
+        pubClient.publish(simpleMqttMessage).subscribe();
+        simpleMqttMessage.setPayload(Unpooled.wrappedBuffer("123".getBytes()));
+        simpleMqttMessage.setTopic("/test");
+        simpleMqttMessage.setMessageId(456);
+        simpleMqttMessage.setQosLevel(2);
+        simpleMqttMessage.setMessageId(456);
+        pubClient.publish(simpleMqttMessage).subscribe();
+//        });
+        while (true){
 
+        }
 
     }
 

+ 1 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java

@@ -18,6 +18,7 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
 import org.jetlinks.core.metadata.types.DateTimeType;
 import org.jetlinks.core.metadata.types.IntType;
 import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.core.topic.Topic;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 

+ 1 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java

@@ -17,6 +17,7 @@ import org.jetlinks.core.message.property.WritePropertyMessage;
 import org.jetlinks.core.metadata.FunctionMetadata;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.ValidateResult;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;

+ 3 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -20,6 +20,8 @@ import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.enums.ErrorCode;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.exception.DeviceOperationException;
 import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.message.FunctionInvokeMessageSender;
@@ -116,6 +118,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             ;
     }
 
+    private EventBus eventBus;
     /**
      * 发布设备到设备注册中心
      *

+ 1 - 1
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/NetworkConfigController.java

@@ -33,7 +33,7 @@ import reactor.core.publisher.Mono;
 @RestController
 @RequestMapping("/network/config")
 @Resource(id = "network-config", name = "网络组件配置")
-@Authorize
+//@Authorize
 @Tag(name = "网络组件管理")
 public class NetworkConfigController implements ReactiveServiceCrudController<NetworkConfigEntity, String> {
 

+ 4 - 0
jetlinks-standalone/pom.xml

@@ -53,6 +53,10 @@
     </build>
 
     <dependencies>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>com.github.ben-manes.caffeine</groupId>

+ 11 - 5
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/JetLinksApplication.java

@@ -14,13 +14,13 @@ import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.context.annotation.Profile;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
-
+import reactor.core.publisher.Hooks;
 import javax.annotation.PostConstruct;
 
 
 @SpringBootApplication(scanBasePackages = "org.jetlinks.community", exclude = {
-    DataSourceAutoConfiguration.class,
-    ElasticsearchRestClientAutoConfiguration.class
+//    DataSourceAutoConfiguration.class,
+//    ElasticsearchRestClientAutoConfiguration.class
 })
 @EnableCaching
 @EnableEasyormRepository("org.jetlinks.community.**.entity")
@@ -30,7 +30,13 @@ import javax.annotation.PostConstruct;
 public class JetLinksApplication {
 
     public static void main(String[] args) {
-        SpringApplication.run(JetLinksApplication.class, args);
+        try {
+            SpringApplication.run(JetLinksApplication.class, args);
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+
+
     }
 
     @Profile("dev")
@@ -41,7 +47,7 @@ public class JetLinksApplication {
         @PostConstruct
         public void init() {
             // TODO: 2020/1/4 严重影响性能,谨慎开启
-            // Hooks.onOperatorDebug();
+//             Hooks.onOperatorDebug();
         }
 
         @EventListener

+ 7 - 1
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/authorize/LoginEvent.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.standalone.authorize;
 
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.authorization.Dimension;
 import org.hswebframework.web.authorization.Permission;
@@ -8,6 +9,7 @@ import org.jetlinks.community.auth.service.UserDetailService;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -17,9 +19,13 @@ import java.util.stream.Collectors;
  * @since 1.0.0
  */
 @Component
+@Slf4j
 public class LoginEvent {
     private final UserDetailService detailService;
-
+@PostConstruct
+    public void test(){
+      log.info("123");
+    }
     public LoginEvent(UserDetailService detailService) {
         this.detailService = detailService;
     }

+ 11 - 8
jetlinks-standalone/src/main/resources/application.yml

@@ -2,8 +2,8 @@ server:
   port: 8848
 
 spring:
-#  profiles:
-#    active: dev
+  profiles:
+    active: dev
   application:
     name: jetlinks-platform
   jackson:
@@ -27,6 +27,9 @@ spring:
   #        max-wait: 10s
   r2dbc:
     url: r2dbc:postgresql://192.168.104.114:5432/jetlinks
+#    url: r2dbc:mysql://192.168.100.32:3306/jetlinks
+#    username: root
+#    password: 123456
     username: postgres
     password: jetlinks
     pool:
@@ -120,11 +123,11 @@ jetlinks:
       enabled: true # 为true时开启自动加载通过依赖引入的协议包
 logging:
   level:
-    org.jetlinks: debug
+    org.jetlinks: error
     rule.engine: debug
-    org.hswebframework: debug
-    org.springframework.transaction: debug
-    org.springframework.data.r2dbc.connectionfactory: warn
+    org.hswebframework: error
+    org.springframework.transaction: error
+    org.springframework.data.r2dbc.connectionfactory: error
     io.micrometer: warn
     org.hswebframework.expands: error
     system: debug
@@ -136,8 +139,8 @@ logging:
     org.jetlinks.community.elastic.search.service.reactive: trace
     org.jetlinks.community.network: warn
     io.vertx.mqtt.impl: warn
-    #    org.springframework.data.elasticsearch.client: trace
-    #    org.elasticsearch: error
+    org.springframework.data.elasticsearch.client: trace
+#        org.elasticsearch: error
     org.jetlinks.pro.influx: trace
     org.elasticsearch: error
   config: classpath:logback-spring.xml

+ 7 - 0
pom.xml

@@ -30,6 +30,7 @@
         <reactor.excel.version>1.0.0</reactor.excel.version>
         <reactor.ql.version>1.0.11</reactor.ql.version>
         <fastjson.version>1.2.70</fastjson.version>
+        <hutool.version>5.5.8</hutool.version>
     </properties>
 
     <build>
@@ -171,6 +172,12 @@
 
         <dependencies>
 
+            <dependency>
+                <groupId>cn.hutool</groupId>
+                <artifactId>hutool-all</artifactId>
+                <version>${hutool.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-bom</artifactId>