18339543638 пре 4 година
родитељ
комит
32e2af3b09

+ 19 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/TopicConvertorUtil.java

@@ -0,0 +1,19 @@
+package org.jetlinks.community.utils;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName TopicConvertorUtil.java
+ * @Description TODO
+ * @createTime 2021年07月19日 11:42:00
+ */
+public class TopicConvertorUtil {
+    /**
+     * 将外部主题转换为内部通用主题
+     * @param exteriorTopic
+     * @return
+     */
+    public static String exteriorToInterior(String exteriorTopic){
+        return null;
+    }
+}

+ 23 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/auth/MqttDefaultAuth.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.mqtt.auth;
+
+import org.jetlinks.core.device.AuthenticationRequest;
+import org.jetlinks.core.device.AuthenticationResponse;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.supports.official.JetLinksAuthenticator;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MqttDefaultAut.java
+ * @Description TODO
+ * @createTime 2021年07月20日 14:52:00
+ */
+public class MqttDefaultAuth extends JetLinksAuthenticator {
+    @Override
+    public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
+        return Mono.just(AuthenticationResponse.success(deviceOperation.getDeviceId()));
+    }
+}

+ 3 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java

@@ -16,12 +16,15 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
 import org.jetlinks.core.metadata.types.BooleanType;
 import org.jetlinks.core.metadata.types.IntType;
 import org.jetlinks.core.metadata.types.StringType;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
 import java.util.Collections;
 import java.util.Map;
 

+ 18 - 7
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.network.mqtt.client;
 
+import cn.hutool.extra.spring.SpringUtil;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.vertx.core.buffer.Buffer;
 import lombok.Getter;
@@ -7,14 +8,24 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.MapUtils;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.utils.TopicConvertorUtil;
+import org.jetlinks.core.codec.Decoder;
+import org.jetlinks.core.codec.Encoder;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.event.TopicPayload;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
 import org.jetlinks.core.topic.Topic;
+import org.jetlinks.core.utils.TopicUtils;
+import org.jetlinks.supports.event.BrokerEventBus;
+import org.reactivestreams.Publisher;
 import reactor.core.Disposable;
 import reactor.core.Disposables;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
@@ -161,22 +172,22 @@ public class VertxMqttClient implements MqttClient {
                     client.subscribe(convertMqttTopic(topic), qos, result -> {
                         if (!result.succeeded()) {
                             sink.error(result.cause());
-                        }else {
+                        } else {
 
                         }
                     });
-                }else if(isAlive()&&loadSuccessListener.size()!=0){
+                } else if (isAlive() && loadSuccessListener.size() != 0) {
                     client.subscribe(convertMqttTopic(topic), qos, result -> {
                         if (!result.succeeded()) {
                             sink.error(result.cause());
-                        }else {
-                            log.info("sucess subscribe mqtt topic {}",this.client.clientId(), topic);
+                        } else {
+                            log.info("sucess subscribe mqtt topic {}", this.client.clientId(), topic);
                         }
                     });
-                }else if(!isAlive()){
+                } else if (!isAlive()) {
                     loadSuccessListener
                         .add(() ->
-                            subscribe(Collections.singletonList(topic),qos)
+                            subscribe(Collections.singletonList(topic), qos)
                                 .doOnComplete(sink::complete)
                                 .doOnError(sink::error)
                                 .subscribe());
@@ -215,7 +226,7 @@ public class VertxMqttClient implements MqttClient {
                         .doOnSuccess(sink::success)
                         .doOnError(sink::error)
                         .subscribe());
-                });
+            });
         }
         return doPublish(message);
     }

+ 1 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -96,7 +96,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
         disposable
             .add(mqttClient
                      .subscribe(topics)
-                     .filter((msg) -> started.get())
+//                     .filter((msg) -> started.get())
                      .flatMap(mqttMessage -> {
                          AtomicReference<Duration> timeoutRef = new AtomicReference<>();
                          return this

+ 26 - 17
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -10,11 +10,13 @@ import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.mqtt.auth.MqttDefaultAuth;
 import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
 import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import org.jetlinks.community.network.mqtt.server.MqttServer;
 import org.jetlinks.community.network.utils.DeviceGatewayHelper;
 import org.jetlinks.core.ProtocolSupport;
+import org.jetlinks.core.defaults.Authenticator;
 import org.jetlinks.core.device.AuthenticationResponse;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
@@ -40,6 +42,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
+import sun.security.provider.MD5;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
@@ -73,6 +76,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
 
     private Disposable disposable;
 
+    private Authenticator authenticator=new MqttDefaultAuth();
+
     private final DeviceGatewayHelper helper;
 
     public MqttServerDeviceGateway(String id,
@@ -128,9 +133,11 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                     .getPassword(), getTransport());
                 return supportMono
                     //使用自定义协议来认证
-                    .map(support -> support.authenticate(request, registry))
+//                    .map(support -> support.authenticate(request, registry))
+                    .map(support -> authenticator.authenticate(request, registry))
                     .defaultIfEmpty(Mono.defer(() -> registry
                         .getDevice(connection.getClientId())
+//                        .flatMap(device -> device.authenticate(request))))
                         .flatMap(device -> device.authenticate(request))))
                     .flatMap(Function.identity())
                     .switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)));
@@ -152,6 +159,9 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             }));
     }
 
+    public static void main(String[] args) {
+
+    }
     //处理认证结果
     private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator device,
                                                                                                    AuthenticationResponse resp,
@@ -202,7 +212,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
 
     //处理已经建立连接的MQTT连接
     private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, MqttConnectionSession session) {
-
         return connection
             .handleMessage()
             .filter(pb -> started.get())
@@ -213,9 +222,9 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             .publishOn(Schedulers.parallel())
             .doOnNext(msg -> gatewayMonitor.receivedMessage())
             .flatMap(publishing ->
-                         this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
-                             //ack
-                             .doOnSuccess(s -> publishing.acknowledge())
+                this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
+                    //ack
+                    .doOnSuccess(s -> publishing.acknowledge())
             )
             //合并遗言消息
             .mergeWith(
@@ -269,18 +278,18 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                 .then();
         }
         return helper.handleDeviceMessage(message,
-                                          device -> new MqttConnectionSession(device.getDeviceId(),
-                                                                              device,
-                                                                              getTransport(),
-                                                                              connection,
-                                                                              gatewayMonitor),
-                                          session -> {
-
-                                          },
-                                          () -> {
-                                              log.warn("无法从MQTT[{}]消息中获取设备信息:{}", connection.getClientId(), message);
-                                          })
-                     .then();
+            device -> new MqttConnectionSession(device.getDeviceId(),
+                device,
+                getTransport(),
+                connection,
+                gatewayMonitor),
+            session -> {
+
+            },
+            () -> {
+                log.warn("无法从MQTT[{}]消息中获取设备信息:{}", connection.getClientId(), message);
+            })
+            .then();
     }
 
     @Override

+ 3 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -20,10 +20,12 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.network.mqtt.server.*;
 import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.message.DeviceOnlineMessage;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
 import org.jetlinks.core.server.mqtt.MqttAuth;
+import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
@@ -131,15 +133,13 @@ class VertxMqttConnection implements MqttConnection {
         try {
             if (!endpoint.isConnected()) {
 
-
             }
             //心跳检测 todo 改为配置文件配置
             heartIdleHandler(endpoint,100,TimeUnit.SECONDS);
             //放置遗言
             this.willMessage = this.getWillMessage();
             //获取设备id
-            String deviceId=endpoint.clientIdentifier();
-
+//            String deviceId=endpoint.clientIdentifier();
         } catch (Exception e) {
             close().subscribe();
             log.warn(e.getMessage(), e);

+ 49 - 39
jetlinks-components/network-component/mqtt-component/src/test/java/org/jetlinks/community/network/mqtt/client/MqttClientProviderTest.java

@@ -1,27 +1,19 @@
 package org.jetlinks.community.network.mqtt.client;
 
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.mqtt.MqttQoS;
 import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
 import io.vertx.mqtt.MqttClientOptions;
-import io.vertx.mqtt.MqttServer;
-import org.jetlinks.community.network.mqtt.server.vertx.VertxMqttServer;
-import org.jetlinks.core.message.Message;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.message.codec.MqttMessage;
-import org.jetlinks.core.message.codec.SimpleMqttMessage;
-import org.junit.jupiter.api.Test;
+import org.jetlinks.supports.event.BrokerEventBus;
 import org.springframework.mock.env.MockEnvironment;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 
 class MqttClientProviderTest {
@@ -30,10 +22,10 @@ class MqttClientProviderTest {
 
 
     public static void main(String[] args) throws InterruptedException {
-        vertx.setTimer(100,event -> {});
-        MqttServer server = MqttServer.create(vertx);
-        VertxMqttServer vertxMqttServer = new VertxMqttServer("123");
-        vertxMqttServer.setMqttServer(Collections.singleton(server));
+//        vertx.setTimer(100,event -> {});
+//        MqttServer server = MqttServer.create(vertx);
+//        VertxMqttServer vertxMqttServer = new VertxMqttServer("123");
+//        vertxMqttServer.setMqttServer(Collections.singleton(server));
 //
 //        server.endpointHandler(endpoint -> {
 //            endpoint
@@ -42,25 +34,43 @@ class MqttClientProviderTest {
 //        }).listen(11223);
 
         MqttClientProvider sub = new MqttClientProvider(id -> Mono.empty(), vertx,new MockEnvironment());
+        EventBus eventBus =new BrokerEventBus();
 
         MqttClientProperties properties = new MqttClientProperties();
         properties.setHost("123.56.154.53");
         properties.setPort(1883);
         properties.setOptions(new MqttClientOptions());
         properties.setClientId(UUID.randomUUID().toString());
+        eventBus.subscribe(Subscription.of("ceshi","/test",Subscription.DEFAULT_FEATURES))
+            .doOnNext(s->{
+                String topic = s.getTopic();
+//                    System.out.println(topic);
+            })
+            .map(s->{
+                String topic = s.getTopic();
+//                    System.out.println(topic);
+                return topic;
+            }).subscribe();
+
         CompletableFuture.runAsync(()->{
 
             VertxMqttClient client = sub.createNetwork(properties);
             Flux<MqttMessage> subscribe = client.subscribe(Arrays.asList("/test"));
             subscribe
+                .map(mqttMessage -> {
+
+                    Long block = eventBus.publish(mqttMessage.getTopic(), mqttMessage).block();
+                    return mqttMessage;
+                })
                 .map(MqttMessage::getPayload)
                 .map(payload -> payload.toString(StandardCharsets.UTF_8))
                 .doOnNext(string->{
+
                     System.out.println("处理缓存>>>>>>>>>>"+string);
                 })
-            .subscribe(string->{
-                System.out.println("接收到数据>>>>>>>>>>"+string);
-            });
+                .subscribe(string->{
+                    System.out.println("接收到数据>>>>>>>>>>"+string);
+                });
 //                .as(StepVerifier::create)
 //                .expectNext("test")
 //                .verifyComplete();
@@ -74,26 +84,26 @@ class MqttClientProviderTest {
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
-        MqttClientProvider pub = new MqttClientProvider(id -> Mono.empty(), vertx, new MockEnvironment());
-        MqttClientProperties pubProperties = new MqttClientProperties();
-        pubProperties.setHost("123.56.154.53");
-        pubProperties.setPort(1883);
-        pubProperties.setOptions(new MqttClientOptions());
-        pubProperties.setClientId(UUID.randomUUID().toString());
-        VertxMqttClient pubClient = pub.createNetwork(pubProperties);
-        SimpleMqttMessage simpleMqttMessage = new SimpleMqttMessage();
-        simpleMqttMessage.setPayload(Unpooled.wrappedBuffer("123test".getBytes()));
-        simpleMqttMessage.setTopic("/test");
-        simpleMqttMessage.setMessageId(123);
-        simpleMqttMessage.setQosLevel(1);
-        pubClient.publish(simpleMqttMessage).subscribe();
-        simpleMqttMessage.setPayload(Unpooled.wrappedBuffer("123".getBytes()));
-        simpleMqttMessage.setTopic("/test");
-        simpleMqttMessage.setMessageId(456);
-        simpleMqttMessage.setQosLevel(2);
-        simpleMqttMessage.setMessageId(456);
-        pubClient.publish(simpleMqttMessage).subscribe();
-//        });
+//        MqttClientProvider pub = new MqttClientProvider(id -> Mono.empty(), vertx, new MockEnvironment());
+//        MqttClientProperties pubProperties = new MqttClientProperties();
+//        pubProperties.setHost("123.56.154.53");
+//        pubProperties.setPort(1883);
+//        pubProperties.setOptions(new MqttClientOptions());
+//        pubProperties.setClientId(UUID.randomUUID().toString());
+//        VertxMqttClient pubClient = pub.createNetwork(pubProperties);
+//        SimpleMqttMessage simpleMqttMessage = new SimpleMqttMessage();
+//        simpleMqttMessage.setPayload(Unpooled.wrappedBuffer("123test".getBytes()));
+//        simpleMqttMessage.setTopic("/test");
+//        simpleMqttMessage.setMessageId(123);
+//        simpleMqttMessage.setQosLevel(1);
+//        pubClient.publish(simpleMqttMessage).subscribe();
+//        simpleMqttMessage.setPayload(Unpooled.wrappedBuffer("123".getBytes()));
+//        simpleMqttMessage.setTopic("/test");
+//        simpleMqttMessage.setMessageId(456);
+//        simpleMqttMessage.setQosLevel(2);
+//        simpleMqttMessage.setMessageId(456);
+//        pubClient.publish(simpleMqttMessage).subscribe();
+////        });
         while (true){
 
         }

+ 2 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -211,7 +211,8 @@ public class DeviceInstanceController implements
         return Mono
             .zip(payload, Authentication.currentReactive(), this::applyAuthentication)
             .flatMap(entity -> service.insert(Mono.just(entity)).thenReturn(entity))
-            .onErrorMap(DuplicateKeyException.class, err -> new BusinessException("设备ID已存在", err));
+            .onErrorMap(e->new BusinessException("服务器繁忙,请稍后重试",e));
+//            .onErrorMap(DuplicateKeyException.class, err -> new BusinessException("设备ID已存在", err));
     }
 
     /**

+ 3 - 3
jetlinks-standalone/src/main/resources/application.yml

@@ -135,13 +135,13 @@ logging:
     org.jetlinks.supports.event: warn
     org.springframework: warn
     org.jetlinks.community.device.message.writer: warn
-    org.jetlinks.community.elastic.search.service: trace
-    org.jetlinks.community.elastic.search.service.reactive: trace
+    org.jetlinks.community.elastic.search.service: warn
+    org.jetlinks.community.elastic.search.service.reactive: warn
     org.jetlinks.community.network: warn
     io.vertx.mqtt.impl: warn
     org.springframework.data.elasticsearch.client: trace
 #        org.elasticsearch: error
-    org.jetlinks.pro.influx: trace
+    org.jetlinks.pro.influx: warn
     org.elasticsearch: error
   config: classpath:logback-spring.xml
 vertx: