Browse Source

add coap网络组件

18339543638 4 years ago
parent
commit
e9fdbc314b

+ 18 - 15
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGateway.java

@@ -1,9 +1,6 @@
 package org.jetlinks.community.network.coap.gateway;
 
-import cn.hutool.core.lang.Pair;
-import cn.hutool.core.map.MapUtil;
-import cn.hutool.json.JSONUtil;
-import io.netty.buffer.ByteBuf;
+import io.vertx.mqtt.MqttServer;
 import javassist.NotFoundException;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -12,6 +9,7 @@ import org.eclipse.californium.core.coap.CoAP;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.gateway.DeviceGateway;
 import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
@@ -19,7 +17,6 @@ import org.jetlinks.community.network.coap.gateway.session.CoapConnectionSession
 import org.jetlinks.community.network.coap.server.MultiCoapServer;
 import org.jetlinks.community.network.utils.DeviceGatewayHelper;
 import org.jetlinks.core.ProtocolSupport;
-import org.jetlinks.core.device.AuthenticationResponse;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.CommonDeviceMessage;
@@ -30,28 +27,19 @@ import org.jetlinks.core.message.codec.FromDeviceMessageContext;
 import org.jetlinks.core.message.codec.CoapExchangeMessage;
 import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.codec.Transport;
-import org.jetlinks.core.message.codec.CoapResponseMessage;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.core.server.session.ReplaceableDeviceSession;
-import org.jetlinks.core.utils.IdUtils;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
-import org.reactivestreams.Publisher;
 import org.springframework.util.StringUtils;
 import reactor.core.Disposable;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple3;
-import reactor.util.function.Tuples;
-import java.nio.charset.Charset;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
-import java.util.function.Function;
 
 /**
  * @author lifang
@@ -60,7 +48,6 @@ import java.util.function.Function;
  * @Description TODO
  * @createTime 2021年12月17日 10:08:00
  */
-@AllArgsConstructor
 @Slf4j
 public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
     @Getter
@@ -90,6 +77,22 @@ public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDev
 
     private final DeviceGatewayHelper helper;
 
+    public CoapServerDeviceGateway(String id,
+                                   DeviceRegistry registry,
+                                   DeviceSessionManager sessionManager,
+                                   MultiCoapServer coapServer,
+                                   DecodedClientMessageHandler messageHandler,
+                                   Mono<ProtocolSupport> customProtocol) {
+
+        this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
+        this.id = id;
+        this.registry = registry;
+        this.sessionManager = sessionManager;
+        this.coapServer = coapServer;
+        this.messageHandler = messageHandler;
+        this.supportMono = customProtocol;
+        this.helper = new DeviceGatewayHelper(registry, sessionManager, messageHandler);
+    }
     @Override
     public Transport getTransport() {
         return DefaultTransport.CoAP;

+ 13 - 1
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGatewayProvider.java

@@ -7,6 +7,7 @@ import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.coap.server.MultiCoapServer;
 import org.jetlinks.core.ProtocolSupports;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.server.session.DeviceSessionManager;
@@ -51,6 +52,17 @@ public class CoapServerDeviceGatewayProvider implements DeviceGatewayProvider {
 
     @Override
     public Mono<DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) {
-        return null;
+        return networkManager
+            .<MultiCoapServer>getNetwork(getNetworkType(), properties.getNetworkId())
+            .map(coapServer -> new CoapServerDeviceGateway(
+                properties.getId(),
+                registry,
+                sessionManager,
+                coapServer,
+                messageHandler,
+                properties.getString("protocol")
+                    .map(id -> Mono.defer(() -> protocolSupports.getProtocol(id)))
+                    .orElse(Mono.empty())
+            ));
     }
 }

+ 19 - 8
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/MultiCoapServerProvider.java

@@ -1,15 +1,14 @@
 package org.jetlinks.community.network.coap.server;
 
+import com.alibaba.fastjson.JSONObject;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.californium.core.CoapServer;
+import org.eclipse.californium.core.network.config.NetworkConfig;
+import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.network.*;
-import org.jetlinks.community.network.security.CertificateManager;
-import org.jetlinks.core.message.codec.CoapExchangeMessage;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.springframework.stereotype.Component;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -26,10 +25,7 @@ import java.util.ArrayList;
 @AllArgsConstructor
 @Slf4j
 public class MultiCoapServerProvider implements NetworkProvider<CoapServerProperties> {
-    private final CertificateManager certificateManager;
 
-    private final EmitterProcessor<CoapExchangeMessage> processor=EmitterProcessor.create();
-    private final FluxSink<CoapExchangeMessage> sink=processor.sink(FluxSink.OverflowStrategy.BUFFER);
     @Nonnull
     @Override
     public NetworkType getType() {
@@ -68,6 +64,21 @@ public class MultiCoapServerProvider implements NetworkProvider<CoapServerProper
     @Nonnull
     @Override
     public Mono<CoapServerProperties> createConfig(@Nonnull NetworkProperties properties) {
-        return null;
+        return Mono.defer(() -> {
+            CoapServerProperties config = FastBeanCopier.copy(properties.getConfigurations(), new CoapServerProperties());
+            config.setId(properties.getId());
+
+            config.setOptions(new JSONObject(properties.getConfigurations()).toJavaObject(NetworkConfig.class));
+
+//            if (config.isSsl()) {
+//                config.getOptions().setSsl(true);
+//                return certificateManager.getCertificate(config.getCertId())
+//                    .map(VertxKeyCertTrustOptions::new)
+//                    .doOnNext(config.getOptions()::setKeyCertOptions)
+//                    .doOnNext(config.getOptions()::setTrustOptions)
+//                    .thenReturn(config);
+//            }
+            return Mono.just(config);
+        });
     }
 }

+ 5 - 0
jetlinks-manager/network-manager/pom.xml

@@ -74,6 +74,11 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.jetlinks.community</groupId>
+            <artifactId>coap-component</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
     </dependencies>