Browse Source

add coap网络组件

18339543638 4 years ago
parent
commit
cf2a481972
51 changed files with 2149 additions and 163 deletions
  1. 59 0
      jetlinks-components/network-component/coap-component/pom.xml
  2. 438 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultCoapServer.java
  3. 278 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultServerMessageDeliverer.java
  4. 35 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapAuthRequest.java
  5. 221 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGateway.java
  6. 56 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGatewayProvider.java
  7. 95 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/session/CoapConnectionSession.java
  8. 85 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/AbstractCoapResource.java
  9. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/CoordinateResource.java
  10. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/DirectResource.java
  11. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/EventCoapResource.java
  12. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/FirmwareReportResource.java
  13. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/LogResource.java
  14. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/MatchAllResource.java
  15. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/MetadataDerivedCoapResource.java
  16. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/PropertiesReportCoapResource.java
  17. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/RegisterCoapResource.java
  18. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/TagResource.java
  19. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/TimeSyncResource.java
  20. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/UnRegisterCoapResource.java
  21. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/UpgradeProgressResource.java
  22. 23 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/UpgradePullResource.java
  23. 28 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/CoapServerProperties.java
  24. 34 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/DefaultCoapResource.java
  25. 89 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/MultiCoapServer.java
  26. 73 0
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/MultiCoapServerProvider.java
  27. 0 67
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/auth/MqttDefaultAuth.java
  28. 2 50
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java
  29. 0 2
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGatewayProvider.java
  30. 0 2
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttConnection.java
  31. 0 15
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java
  32. 8 0
      jetlinks-components/network-component/network-core/pom.xml
  33. 123 0
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendCoapDTLSDeviceMessageCodec.java
  34. 87 0
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendCoapDeviceMessageCodec.java
  35. 2 4
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java
  36. 1 0
      jetlinks-components/network-component/pom.xml
  37. 1 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/DeviceMessageDecoder.java
  38. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/EncodedMessage.java
  39. 2 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/MessageDecodeContext.java
  40. 19 7
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/CoapExchangeMessage.java
  41. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/CoapMessage.java
  42. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/CoapResponseMessage.java
  43. 62 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/DefaultCoapExchange.java
  44. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/DefaultCoapMessage.java
  45. 2 1
      jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/DefaultCoapResponseMessage.java
  46. 1 0
      jetlinks-core/src/test/java/org/jetlinks/core/message/codec/DefaultCoapMessageTest.java
  47. 1 0
      jetlinks-core/src/test/java/org/jetlinks/core/message/codec/DefaultCoapResponseMessageTest.java
  48. 2 2
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java
  49. 0 9
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeDeviceController.java
  50. 1 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/ProtocolDecodePayload.java
  51. 14 0
      pom.xml

+ 59 - 0
jetlinks-components/network-component/coap-component/pom.xml

@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>network-component</artifactId>
+        <groupId>org.jetlinks.community</groupId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>coap-component</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-mqtt</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jetlinks</groupId>
+            <artifactId>jetlinks-core</artifactId>
+            <version>${jetlinks.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>network-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>gateway-component</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.californium</groupId>
+            <artifactId>californium-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.californium</groupId>
+            <artifactId>element-connector</artifactId>
+        </dependency>
+    </dependencies>
+</project>

+ 438 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultCoapServer.java

@@ -0,0 +1,438 @@
+package org.jetlinks.community.network.coap.core;
+
+import org.eclipse.californium.core.CoapResource;
+import org.eclipse.californium.core.CoapServer;
+import org.eclipse.californium.core.coap.CoAP;
+import org.eclipse.californium.core.network.CoapEndpoint;
+import org.eclipse.californium.core.network.Endpoint;
+import org.eclipse.californium.core.network.config.NetworkConfig;
+import org.eclipse.californium.core.server.MessageDeliverer;
+import org.eclipse.californium.core.server.ServerInterface;
+import org.eclipse.californium.core.server.resources.CoapExchange;
+import org.eclipse.californium.core.server.resources.DiscoveryResource;
+import org.eclipse.californium.core.server.resources.Resource;
+import org.eclipse.californium.elements.util.ExecutorsUtil;
+import org.eclipse.californium.elements.util.NamedThreadFactory;
+import org.eclipse.californium.elements.util.StringUtil;
+import org.jetlinks.community.network.coap.resources.MatchAllResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.AntPathMatcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DefaultCoapServer.java
+ * @Description TODO
+ * @createTime 2021年12月16日 16:43:00
+ */
+public class DefaultCoapServer implements ServerInterface {
+
+    /** The logger. */
+    private static final Logger LOGGER = LoggerFactory.getLogger(CoapServer.class);
+
+    /** The root resource. */
+    private final Resource root;
+
+    /** The network configuration used by this server. */
+    private final NetworkConfig config;
+
+    /** The message deliverer. */
+    private MessageDeliverer deliverer;
+
+    /** The list of endpoints the server connects to the network. */
+    private final List<Endpoint> endpoints;
+
+    /** The executor of the server for its endpoints (can be null). */
+    private ScheduledExecutorService executor;
+
+    /** Scheduled executor intended to be used for rare executing timers (e.g. cleanup tasks). */
+    private ScheduledExecutorService secondaryExecutor;
+    /**
+     * Indicate, it the server-specific executor service is detached, or
+     * shutdown with this server.
+     */
+    private boolean detachExecutor;
+
+    private boolean running;
+
+    /**
+     * Constructs a default server. The server starts after the method
+     * {@link #start()} is called. If a server starts and has no specific ports
+     * assigned, it will bind to CoAP's default port 5683.
+     */
+    public DefaultCoapServer() {
+        this(NetworkConfig.getStandard());
+    }
+
+    /**
+     * Constructs a server that listens to the specified port(s) after method
+     * {@link #start()} is called.
+     *
+     * @param ports the ports to bind to. If empty or {@code null} and no
+     *            endpoints are added with {@link #addEndpoint(Endpoint)}, it
+     *            will bind to CoAP's default port 5683 on {@link #start()}.
+     */
+    public DefaultCoapServer(final int... ports) {
+        this(NetworkConfig.getStandard(), ports);
+    }
+
+    /**
+     * Constructs a server with the specified configuration that listens to the
+     * specified ports after method {@link #start()} is called.
+     *
+     * @param config the configuration, if {@code null} the configuration
+     *            returned by {@link NetworkConfig#getStandard()} is used.
+     * @param ports the ports to bind to. If empty or {@code null} and no
+     *            endpoints are added with {@link #addEndpoint(Endpoint)}, it
+     *            will bind to CoAP's default port 5683 on {@link #start()}.
+     */
+    public DefaultCoapServer(final NetworkConfig config, final int... ports) {
+        // global configuration that is passed down (can be observed for changes)
+        if (config != null) {
+            this.config = config;
+        } else {
+            this.config = NetworkConfig.getStandard();
+        }
+
+        // resources
+        this.root = createRoot();
+        this.deliverer = new DefaultServerMessageDeliverer(root);
+
+        CoapResource wellKnown = new CoapResource(".well-known");
+        wellKnown.setVisible(false);
+        wellKnown.add(new DiscoveryResource(root));
+        root.add(wellKnown);
+
+        // endpoints
+        this.endpoints = new ArrayList<>();
+        // create endpoint for each port
+        if (ports != null) {
+            for (int port : ports) {
+                CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
+                builder.setPort(port);
+                builder.setNetworkConfig(config);
+                addEndpoint(builder.build());
+            }
+        }
+    }
+
+    public synchronized void setExecutors(final ScheduledExecutorService mainExecutor,
+                                          final ScheduledExecutorService secondaryExecutor, final boolean detach) {
+        if (mainExecutor == null || secondaryExecutor == null) {
+            throw new NullPointerException("executors must not be null");
+        }
+        if (this.executor == mainExecutor && this.secondaryExecutor == secondaryExecutor) {
+            return;
+        }
+        if (running) {
+            throw new IllegalStateException("executor service can not be set on running server");
+        }
+
+        if (!this.detachExecutor) {
+            if (this.executor != null) {
+                this.executor.shutdownNow();
+            }
+            if (this.secondaryExecutor != null) {
+                this.secondaryExecutor.shutdownNow();
+            }
+        }
+        this.executor = mainExecutor;
+        this.secondaryExecutor = secondaryExecutor;
+        this.detachExecutor = detach;
+        for (Endpoint ep : endpoints) {
+            ep.setExecutors(this.executor, this.secondaryExecutor);
+        }
+    }
+
+    /**
+     * Starts the server by starting all endpoints this server is assigned to.
+     * Each endpoint binds to its port. If no endpoint is assigned to the
+     * server, an endpoint is started on the port defined in the config.
+     */
+    @Override
+    public synchronized void start() {
+
+        if (running) {
+            return;
+        }
+
+        LOGGER.info("Starting server");
+
+        if (executor == null) {
+            // sets the central thread pool for the protocol stage over all
+            // endpoints
+            setExecutors(ExecutorsUtil.newScheduledThreadPool(//
+                this.config.getInt(NetworkConfig.Keys.PROTOCOL_STAGE_THREAD_COUNT),
+                new NamedThreadFactory("CoapServer(main)#")), //$NON-NLS-1$
+                ExecutorsUtil.newDefaultSecondaryScheduler("CoapServer(secondary)#"), false);
+        }
+
+        if (endpoints.isEmpty()) {
+            // servers should bind to the configured port (while clients should use an ephemeral port through the default endpoint)
+            int port = config.getInt(NetworkConfig.Keys.COAP_PORT);
+            LOGGER.info("no endpoints have been defined for server, setting up server endpoint on default port {}", port);
+            CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
+            builder.setPort(port);
+            builder.setNetworkConfig(config);
+            addEndpoint(builder.build());
+        }
+
+        int started = 0;
+        for (Endpoint ep : endpoints) {
+            try {
+                ep.start();
+                // only reached on success
+                ++started;
+            } catch (IOException e) {
+                LOGGER.error("cannot start server endpoint [{}]", ep.getAddress(), e);
+            }
+        }
+        if (started == 0) {
+            throw new IllegalStateException("None of the server endpoints could be started");
+        } else {
+            running = true;
+        }
+    }
+
+    /**
+     * Stops the server, i.e., unbinds it from all ports. Frees as much system
+     * resources as possible to still be able to be re-started with the previous binds.
+     * To free all system resources {@link #destroy()} must be called!
+     */
+    @Override
+    public synchronized void stop() {
+
+        if (running) {
+            LOGGER.info("Stopping server");
+            for (Endpoint ep : endpoints) {
+                ep.stop();
+            }
+            running = false;
+        }
+    }
+
+    /**
+     * Destroys the server, i.e., unbinds from all ports and frees all system resources.
+     */
+    @Override
+    public synchronized void destroy() {
+        LOGGER.info("Destroying server");
+        // prevent new tasks from being submitted
+        try {
+            if (!detachExecutor)
+                if (running) {
+                    ExecutorsUtil.shutdownExecutorGracefully(2000, executor, secondaryExecutor);
+                } else {
+                    if (executor !=null) {
+                        executor.shutdownNow();
+                    }
+                    if (secondaryExecutor != null) {
+                        secondaryExecutor.shutdownNow();
+                    }
+                }
+        } finally {
+            for (Endpoint ep : endpoints) {
+                ep.destroy();
+            }
+            LOGGER.info("CoAP server has been destroyed");
+            running = false;
+        }
+    }
+
+    /**
+     * Sets the message deliverer.
+     *
+     * @param deliverer the new message deliverer
+     */
+    public void setMessageDeliverer(final MessageDeliverer deliverer) {
+        this.deliverer = deliverer;
+        for (Endpoint endpoint : endpoints) {
+            endpoint.setMessageDeliverer(deliverer);
+        }
+    }
+
+    /**
+     * Gets the message deliverer.
+     *
+     * @return the message deliverer
+     */
+    public MessageDeliverer getMessageDeliverer() {
+        return deliverer;
+    }
+
+    /**
+     * Adds an Endpoint to the server. WARNING: It automatically configures the
+     * default executor of the server. Endpoints that should use their own
+     * executor (e.g., to prioritize or balance request handling) either set it
+     * afterwards before starting the server or override the setExecutor()
+     * method of the special Endpoint.
+     *
+     * @param endpoint the endpoint to add
+     */
+    @Override
+    public void addEndpoint(final Endpoint endpoint) {
+        endpoint.setMessageDeliverer(deliverer);
+        if (executor != null && secondaryExecutor != null) {
+            endpoint.setExecutors(executor, secondaryExecutor);
+        }
+        endpoints.add(endpoint);
+    }
+
+    /**
+     * Gets the list of endpoints this server is connected to.
+     *
+     * @return the endpoints
+     */
+    @Override
+    public List<Endpoint> getEndpoints() {
+        return endpoints;
+    }
+
+    /**
+     * Returns the endpoint with a specific port.
+     * @param port the port
+     * @return the endpoint
+     */
+    @Override
+    public Endpoint getEndpoint(int port) {
+        Endpoint endpoint = null;
+
+        for (Endpoint ep : endpoints) {
+            if (ep.getAddress().getPort() == port) {
+                endpoint = ep;
+            }
+        }
+        return endpoint;
+    }
+
+    /**
+     * Returns the endpoint with a specific socket address.
+     * @param address the socket address
+     * @return the endpoint
+     */
+    @Override
+    public Endpoint getEndpoint(InetSocketAddress address) {
+        Endpoint endpoint = null;
+
+        for (Endpoint ep : endpoints) {
+            if (ep.getAddress().equals(address)) {
+                endpoint = ep;
+                break;
+            }
+        }
+
+        return endpoint;
+    }
+
+    /**
+     * Add a resource to the server.
+     * @param resources the resource(s)
+     * @return the server
+     */
+    @Override
+    public DefaultCoapServer add(Resource... resources) {
+        for (Resource r:resources)
+            root.add(r);
+        return this;
+    }
+
+    @Override
+    public boolean remove(Resource resource) {
+        return root.delete(resource);
+    }
+
+    /**
+     * Gets the root of this server.
+     *
+     * @return the root
+     */
+    public Resource getRoot() {
+        return root;
+    }
+
+    /**
+     * Get the network configuration of this server.
+     *
+     * @return the network configuration
+     * @since 2.1
+     */
+    public NetworkConfig getConfig() {
+        return config;
+    }
+
+    /**
+     * Creates a root for this server. Can be overridden to create another root.
+     *
+     * @return the resource
+     */
+    protected Resource createRoot() {
+        return new DefaultCoapServer.RootResource();
+    }
+
+    /**
+     * Represents the root of a resource tree.
+     */
+    private class RootResource extends CoapResource {
+
+        // get version from Maven package
+        private final String msg;
+        private final AntPathMatcher matcher = new AntPathMatcher(File.separator);
+        public RootResource() {
+            super("");
+            String nodeId = config.getString(NetworkConfig.Keys.DTLS_CONNECTION_ID_NODE_ID);
+            String title = "CoAP RFC 7252";
+            if (StringUtil.CALIFORNIUM_VERSION != null) {
+                String version = "Cf " + StringUtil.CALIFORNIUM_VERSION;
+                title = String.format("%s %50s", title, version);
+            }
+            StringBuilder builder = new StringBuilder()
+                .append("****************************************************************\n")
+                .append(title).append("\n")
+                .append("****************************************************************\n")
+                .append("This server is using the Eclipse Californium (Cf) CoAP framework\n")
+                .append("published under EPL+EDL: http://www.eclipse.org/californium/\n\n");
+            if (nodeId != null && !nodeId.isEmpty()) {
+                builder.append("node id = ").append(nodeId).append("\n\n");
+            }
+            builder.append("(c) 2014-2020 Institute for Pervasive Computing, ETH Zurich and others\n");
+            String master = StringUtil.getConfiguration("COAP_ROOT_RESOURCE_FOOTER");
+            if (master != null) {
+                builder.append(master).append("\n");
+            }
+            builder.append("****************************************************************");
+            msg = builder.toString();
+        }
+
+        @Override
+        public void handleGET(CoapExchange exchange) {
+            exchange.respond(CoAP.ResponseCode.CONTENT, msg);
+        }
+
+        @Override
+        public List<Endpoint> getEndpoints() {
+            return DefaultCoapServer.this.getEndpoints();
+        }
+
+
+        @Override
+        public Resource getChild(String name) {
+            Collection<Resource> children = super.getChildren();
+            Optional<Resource> result = children.stream().filter(pattern -> matcher.match(pattern.getName(), name)).findFirst();
+            result.ifPresent(resource->resource.setPath(name));
+            if(result.isPresent()){
+                result = children.stream().filter(resource -> resource instanceof MatchAllResource).findFirst();
+            }
+            return result.orElseGet(()->new CoapResource("noAllow"));
+        }
+    }
+}

+ 278 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultServerMessageDeliverer.java

@@ -0,0 +1,278 @@
+/*******************************************************************************
+ * Copyright (c) 2015, 2016 Institute for Pervasive Computing, ETH Zurich and others.
+ * 
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v2.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * 
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v20.html
+ * and the Eclipse Distribution License is available at
+ *    http://www.eclipse.org/org/documents/edl-v10.html.
+ * 
+ * Contributors:
+ *    Matthias Kovatsch - creator and main architect
+ *    Martin Lanter - architect and re-implementation
+ *    Dominique Im Obersteg - parsers and initial implementation
+ *    Daniel Pauli - parsers and initial implementation
+ *    Kai Hudalla - logging
+ *    Kai Hudalla (Bosch Software Innovations GmbH) - use Logger's message formatting instead of
+ *                                                    explicit String concatenation
+ *    Achim Kraus (Bosch Software Innovations GmbH) - replace byte array token by Token
+ ******************************************************************************/
+package org.jetlinks.community.network.coap.core;
+
+import org.eclipse.californium.core.coap.CoAP;
+import org.eclipse.californium.core.coap.CoAP.ResponseCode;
+import org.eclipse.californium.core.coap.Request;
+import org.eclipse.californium.core.coap.Response;
+import org.eclipse.californium.core.network.Exchange;
+import org.eclipse.californium.core.observe.ObserveManager;
+import org.eclipse.californium.core.observe.ObserveRelation;
+import org.eclipse.californium.core.observe.ObservingEndpoint;
+import org.eclipse.californium.core.server.MessageDeliverer;
+import org.eclipse.californium.core.server.resources.Resource;
+import org.jetlinks.community.network.coap.resources.MatchAllResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+/**
+ * The DefaultServerMessageDeliverer delivers requests to corresponding resources and
+ * responses to corresponding requests.
+ */
+public class DefaultServerMessageDeliverer implements MessageDeliverer {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServerMessageDeliverer.class);
+
+	/* The root of all resources */
+	private final Resource root;
+
+	/* The manager of the observe mechanism for this server */
+	private final ObserveManager observeManager = new ObserveManager();
+
+	/**
+	 * Constructs a default message deliverer that delivers requests to the
+	 * resources rooted at the specified root.
+	 * 
+	 * @param root the root resource
+	 */
+	public DefaultServerMessageDeliverer(final Resource root) {
+		this.root = root;
+	}
+
+	/**
+	 * Delivers an inbound CoAP request to an appropriate resource.
+	 * <p>
+	 * This method first invokes {@link #preDeliverRequest(Exchange)}. The
+	 * request is considered <em>processed</em> if the
+	 * <em>preDeliverRequest</em> method returned {@code true}.
+	 * <p>
+	 * Otherwise, this method
+	 * <ol>
+	 * <li>tries to {@linkplain #findResource(List) find a matching
+	 * resource},</li>
+	 * <li>handle a GET request's observe option and</li>
+	 * <li>deliver the request to the resource for processing.</li>
+	 * </ol>
+	 * 
+	 * @param exchange The exchange containing the inbound request.
+	 * @throws NullPointerException if exchange is {@code null}.
+	 */
+	@Override
+	public final void deliverRequest(final Exchange exchange) {
+		if (exchange == null) {
+			throw new NullPointerException("exchange must not be null");
+		}
+		boolean processed = preDeliverRequest(exchange);
+		if (!processed) {
+			final Resource resource = findResource(exchange);
+			if (resource != null) {
+				checkForObserveOption(exchange, resource);
+
+				// Get the executor and let it process the request
+				Executor executor = resource.getExecutor();
+				if (executor != null) {
+					executor.execute(new Runnable() {
+
+						public void run() {
+							resource.handleRequest(exchange);
+						}
+					});
+				} else {
+					resource.handleRequest(exchange);
+				}
+			} else {
+				if (LOGGER.isInfoEnabled()) {
+					Request request = exchange.getRequest();
+					LOGGER.info("did not find resource /{} requested by {}", request.getOptions().getUriPathString(),
+							request.getSourceContext().getPeerAddress());
+				}
+				exchange.sendResponse(new Response(ResponseCode.NOT_FOUND));
+			}
+		}
+	}
+
+	/**
+	 * Invoked by the <em>deliverRequest</em> before the request gets processed.
+	 * <p>
+	 * Subclasses may override this method in order to replace the default
+	 * request handling logic or to modify or add headers etc before the request
+	 * gets processed.
+	 * <p>
+	 * This default implementation returns {@code false}.
+	 * 
+	 * @param exchange The exchange for the incoming request.
+	 * @return {@code true} if the request has already been processed by this
+	 *         method and thus should not be delivered to a matching resource
+	 *         anymore.
+	 */
+	protected boolean preDeliverRequest(final Exchange exchange) {
+		return false;
+	}
+
+	/**
+	 * Checks whether an observe relationship has to be established or canceled.
+	 * <p>
+	 * This is done here to have a server-global observeManager that holds the
+	 * set of remote endpoints for all resources. This global knowledge is
+	 * required for efficient orphan handling.
+	 * 
+	 * @param exchange the exchange of the current request
+	 * @param resource the target resource
+	 */
+	protected final void checkForObserveOption(final Exchange exchange, final Resource resource) {
+
+		Request request = exchange.getRequest();
+		if (CoAP.isObservable(request.getCode()) && request.getOptions().hasObserve() && resource.isObservable()) {
+
+			InetSocketAddress source = request.getSourceContext().getPeerAddress();
+
+			if (request.isObserve()) {
+				// Requests wants to observe and resource allows it :-)
+				LOGGER.debug("initiating an observe relation between {} and resource {}, {}", source, resource.getURI(), exchange);
+				ObservingEndpoint remote = observeManager.findObservingEndpoint(source);
+				ObserveRelation relation = new ObserveRelation(remote, resource, exchange);
+				remote.addObserveRelation(relation);
+				exchange.setRelation(relation);
+				request.setProtectFromOffload();
+				// all that's left is to add the relation to the resource which
+				// the resource must do itself if the response is successful
+
+			} else if (request.isObserveCancel()) {
+				// Observe defines 1 for canceling
+				ObserveRelation relation = observeManager.getRelation(source, request.getToken());
+				if (relation != null) {
+					relation.cancel();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Return root resource.
+	 * 
+	 * Intended to be used by custom {@link #findResource(List)}.
+	 * 
+	 * @return root resources
+	 * @see #root
+	 */
+	protected Resource getRootResource() {
+		return root;
+	}
+
+	/**
+	 * Searches in the resource tree for the specified path. A parent resource
+	 * may accept requests to subresources, e.g., to allow addresses with
+	 * wildcards like <code>coap://example.com:5683/devices/*</code>
+	 * 
+	 * @param exchange The exchange containing the inbound request including the
+	 *            path of resource names
+	 * @return the resource or {@code null}, if not found
+	 * @since 2.1
+	 */
+	protected Resource findResource(Exchange exchange) {
+		return findResource(exchange.getRequest().getOptions().getUriPath());
+	}
+
+	/**
+	 * Searches in the resource tree for the specified path. A parent resource
+	 * may accept requests to subresources, e.g., to allow addresses with
+	 * wildcards like <code>coap://example.com:5683/devices/*</code>
+	 * 
+	 * @param list the path as list of resource names
+	 * @return the resource or {@code null}, if not found
+	 */
+	protected Resource findResource(final List<String> list) {
+//		Deque<String> path = new LinkedList<String>(list);
+		StringBuffer path=new StringBuffer();
+		list.forEach(block->path.append("/").append(block));
+		Resource current = getRootResource();
+//		while (!path.isEmpty() && current != null) {
+//			String name = path.removeFirst();
+//			current = current.getChild(name);
+//}
+        current=current.getChild(path.toString());
+		return current;
+	}
+
+	/**
+	 * Delivers an inbound CoAP response message to its corresponding request.
+	 * <p>
+	 * This method first invokes
+	 * {@link #preDeliverResponse(Exchange, Response)}. The response is
+	 * considered <em>processed</em> if the <em>preDeliverResponse</em> method
+	 * returned {@code true}.
+	 * <p>
+	 * * Otherwise, this method delivers the response to the corresponding
+	 * request.
+	 * 
+	 * @param exchange The exchange containing the originating CoAP request.
+	 * @param response The inbound CoAP response message.
+	 * @throws NullPointerException if exchange or response are {@code null}.
+	 * @throws IllegalArgumentException if the exchange does not contain a
+	 *             request.
+	 */
+	@Override
+	public final void deliverResponse(final Exchange exchange, final Response response) {
+		if (response == null) {
+			throw new NullPointerException("Response must not be null");
+		} else if (exchange == null) {
+			throw new NullPointerException("Exchange must not be null");
+		} else if (exchange.getRequest() == null) {
+			throw new IllegalArgumentException("Exchange does not contain request");
+		} else {
+			boolean processed = preDeliverResponse(exchange, response);
+			if (!processed) {
+				exchange.getRequest().setResponse(response);
+			}
+		}
+	}
+
+	/**
+	 * Invoked by the <em>deliverResponse</em> method before the response is
+	 * delivered to the corresponding request.
+	 * <p>
+	 * Subclasses may override this method in order to replace the default
+	 * response handling logic or to modify or add headers etc before the
+	 * response is delivered to the request.
+	 * <p>
+	 * The response is delivered to the request if and only if the exchange's
+	 * request does not contain a <em>response</em> when this method returns.
+	 * <p>
+	 * This default implementation returns {@code false}.
+	 * 
+	 * @param exchange The exchange containing the request that the incoming
+	 *            response belongs to.
+	 * @param response The incoming response.
+	 * @return {@code true} if the response has been processed by this method
+	 *         and thus should not be delivered to the corresponding request
+	 *         anymore.
+	 */
+	protected boolean preDeliverResponse(final Exchange exchange, final Response response) {
+		return false;
+	}
+}

+ 35 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapAuthRequest.java

@@ -0,0 +1,35 @@
+package org.jetlinks.community.network.coap.gateway;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.jetlinks.core.device.AuthenticationRequest;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.Transport;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName CoapAuthRequest.java
+ * @Description TODO
+ * @createTime 2021年12月17日 10:48:00
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class CoapAuthRequest implements AuthenticationRequest {
+    private String timestamp;
+    private String secureKey;
+
+    @JsonIgnore
+    @JSONField(serialize = false,deserialize = false)
+    private CoapExchangeMessage exchangeMessage;
+
+    @Override
+    public Transport getTransport() {
+        return DefaultTransport.CoAP;
+    }
+}

+ 221 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGateway.java

@@ -0,0 +1,221 @@
+package org.jetlinks.community.network.coap.gateway;
+
+import cn.hutool.core.lang.Pair;
+import cn.hutool.core.map.MapUtil;
+import cn.hutool.json.JSONUtil;
+import io.netty.buffer.ByteBuf;
+import javassist.NotFoundException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.californium.core.coap.CoAP;
+import org.hswebframework.web.logger.ReactiveLogger;
+import org.jetlinks.community.gateway.DeviceGateway;
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.coap.gateway.session.CoapConnectionSession;
+import org.jetlinks.community.network.coap.server.MultiCoapServer;
+import org.jetlinks.community.network.utils.DeviceGatewayHelper;
+import org.jetlinks.core.ProtocolSupport;
+import org.jetlinks.core.device.AuthenticationResponse;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.message.CommonDeviceMessage;
+import org.jetlinks.core.message.CommonDeviceMessageReply;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.codec.FromDeviceMessageContext;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.message.codec.coap.CoapResponseMessage;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.core.server.session.DeviceSessionManager;
+import org.jetlinks.core.server.session.ReplaceableDeviceSession;
+import org.jetlinks.core.utils.IdUtils;
+import org.jetlinks.supports.server.DecodedClientMessageHandler;
+import org.reactivestreams.Publisher;
+import org.springframework.util.StringUtils;
+import reactor.core.Disposable;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple3;
+import reactor.util.function.Tuples;
+import java.nio.charset.Charset;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName CoapServerDeviceGateway.java
+ * @Description TODO
+ * @createTime 2021年12月17日 10:08:00
+ */
+@AllArgsConstructor
+@Slf4j
+public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
+    @Getter
+    private final String id;
+
+    private final DeviceRegistry registry;
+
+    private final DeviceSessionManager sessionManager;
+
+    private final MultiCoapServer coapServer;
+
+    private final DecodedClientMessageHandler messageHandler;
+
+    private final DeviceGatewayMonitor gatewayMonitor;
+
+    private final LongAdder counter = new LongAdder();
+
+    private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
+
+    private final FluxSink<Message> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    private final Mono<ProtocolSupport> supportMono;
+
+    private Disposable disposable;
+
+    private final DeviceGatewayHelper helper;
+
+    @Override
+    public Transport getTransport() {
+        return DefaultTransport.CoAP;
+    }
+
+    @Override
+    public NetworkType getNetworkType() {
+        return DefaultNetworkType.COAP_SERVER;
+    }
+
+    @Override
+    public Flux<Message> onMessage() {
+        return messageProcessor;
+    }
+
+    @Override
+    public Mono<Void> startup() {
+        return Mono.fromRunnable(this::doStart);
+    }
+
+    private void doStart() {
+        if(started.compareAndSet(false,true)||disposable!=null){
+            return;
+        }
+        disposable = coapServer
+            .handleAuthRequest()
+            .filter(exchangeMessage -> {
+                if (!started.get()) {
+                    exchangeMessage.response(CoAP.ResponseCode.BAD_GATEWAY);
+                    gatewayMonitor.rejected();
+                }
+                return started.get();
+            })
+            .publishOn(Schedulers.parallel())
+            .flatMap(this::handleMessage)
+            .onErrorContinue((err, obj) -> log.error("处理Coap连接失败", err))
+            .subscriberContext(ReactiveLogger.start("network", coapServer.getId()))
+            .subscribe();
+
+    }
+
+    private Mono<Void> handleMessage(CoapExchangeMessage exchangeMessage) {
+        return  registry.getDevice(exchangeMessage.getDeviceId())
+            .switchIfEmpty(Mono.fromRunnable(exchangeMessage::reject)
+                .then(Mono.error(new NotFoundException("设备"+exchangeMessage.getDeviceId()+"不存在"))))
+            .filter(ignore->started.get())
+            .publishOn(Schedulers.parallel())
+            .doOnNext(operator->gatewayMonitor.receivedMessage())
+            .flatMap(operator->
+                this.decodeAndHandleMessage(operator,exchangeMessage)
+                    .doOnNext(ignore->{
+                            counter.increment();
+                            CoapConnectionSession newSession = new CoapConnectionSession(exchangeMessage.getDeviceId(),operator,DefaultTransport.CoAP,gatewayMonitor);
+                            DeviceSession session = sessionManager.getSession(operator.getDeviceId());
+                            if (null == session) {
+                                sessionManager.register(newSession);
+                            } else if (session instanceof ReplaceableDeviceSession) {
+                                ((ReplaceableDeviceSession) session).replaceWith(newSession);
+                            }else{
+                                sessionManager.register(newSession);
+                            }
+                            gatewayMonitor.connected();
+                            gatewayMonitor.totalConnection(counter.sum());
+                        }
+                    )
+            )
+            .doOnSuccess(s->{
+                exchangeMessage.response(CoAP.ResponseCode.CONTENT);
+                ;
+            })
+            .subscriberContext(ReactiveLogger.start("network", coapServer.getId()))
+            .then()
+            ;
+    }
+
+    private Mono<?> decodeAndHandleMessage(DeviceOperator operator, CoapExchangeMessage exchangeMessage) {
+        return operator
+            .getProtocol()
+            .flatMap(protocol -> protocol.getMessageCodec(getTransport()))
+            .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(null, exchangeMessage, registry)))
+            .cast(DeviceMessage.class)
+            .flatMap(msg -> {
+                if (messageProcessor.hasDownstreams()) {
+                    sink.next(msg);
+                }
+                if (msg instanceof CommonDeviceMessage) {
+                    CommonDeviceMessage _msg = ((CommonDeviceMessage) msg);
+                    if (StringUtils.isEmpty(_msg.getDeviceId())) {
+                        _msg.setDeviceId(operator.getDeviceId());
+                    }
+                }
+                if (msg instanceof CommonDeviceMessageReply) {
+                    CommonDeviceMessageReply<?> _msg = ((CommonDeviceMessageReply<?>) msg);
+                    if (StringUtils.isEmpty(_msg.getDeviceId())) {
+                        _msg.setDeviceId(operator.getDeviceId());
+                    }
+                }
+                return handleMessage(operator, msg);
+            })
+            .then()
+            .doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), exchangeMessage, err)))
+            .onErrorResume((err) -> Mono.empty());
+    }
+
+
+    private Mono<Void> handleMessage(DeviceOperator operator,DeviceMessage message) {
+        return messageHandler.handleMessage(operator,message).then();
+    }
+    @Override
+    public Mono<Void> pause() {
+        return Mono.fromRunnable(() -> started.set(false));
+    }
+
+    @Override
+    public Mono<Void> shutdown() {
+        return Mono.fromRunnable(() -> {
+            started.set(false);
+            if (disposable != null && !disposable.isDisposed()) {
+                disposable.dispose();
+            }
+            disposable = null;
+        });
+    }
+
+    @Override
+    public long totalConnection() {
+        return counter.sum();
+    }
+}

+ 56 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGatewayProvider.java

@@ -0,0 +1,56 @@
+package org.jetlinks.community.network.coap.gateway;
+
+import lombok.AllArgsConstructor;
+import org.jetlinks.community.gateway.DeviceGateway;
+import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
+import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.core.ProtocolSupports;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.server.session.DeviceSessionManager;
+import org.jetlinks.supports.server.DecodedClientMessageHandler;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName CoapServerDeviceGatewayProvider.java
+ * @Description coap服务器网关
+ * @createTime 2021年12月17日 10:03:00
+ */
+@Component
+@AllArgsConstructor
+public class CoapServerDeviceGatewayProvider implements DeviceGatewayProvider {
+    private final NetworkManager networkManager;
+
+    private final DeviceRegistry registry;
+
+    private final DeviceSessionManager sessionManager;
+
+    private final DecodedClientMessageHandler messageHandler;
+
+    private final ProtocolSupports protocolSupports;
+
+    @Override
+    public String getId() {
+        return "coap-server-gataway";
+    }
+
+    @Override
+    public String getName() {
+        return "Coap服务器接入";
+    }
+
+    @Override
+    public NetworkType getNetworkType() {
+        return DefaultNetworkType.COAP_SERVER;
+    }
+
+    @Override
+    public Mono<DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) {
+        return null;
+    }
+}

+ 95 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/session/CoapConnectionSession.java

@@ -0,0 +1,95 @@
+package org.jetlinks.community.network.coap.gateway.session;
+
+import lombok.Getter;
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.core.server.session.ReplaceableDeviceSession;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nullable;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName CoapSession.java
+ * @Description TODO
+ * @createTime 2021年12月17日 11:52:00
+ */
+public class CoapConnectionSession implements DeviceSession, ReplaceableDeviceSession {
+
+    @Getter
+    private final String id;
+
+
+    @Getter
+    private final DeviceOperator operator;
+
+    @Getter
+    private final String deviceId;
+    @Getter
+    private final Transport transport;
+
+    private final DeviceGatewayMonitor monitor;
+
+    @Getter
+    private long connectTime = System.currentTimeMillis();
+    public CoapConnectionSession(
+                                 String deviceId,
+                                 DeviceOperator operator,
+                                 Transport transport,
+                                 DeviceGatewayMonitor monitor) {
+        this.id = deviceId;
+        this.deviceId=deviceId;
+        this.operator = operator;
+        this.monitor = monitor;
+        this.transport=transport;
+    }
+
+
+    @Override
+    public long lastPingTime() {
+        return -1;
+    }
+
+    @Override
+    public long connectTime() {
+        return connectTime;
+    }
+
+    @Override
+    public Mono<Boolean> send(EncodedMessage encodedMessage) {
+        throw new UnsupportedOperationException("Coap暂不支持下行操作");
+//        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void ping() {
+
+    }
+
+    @Override
+    public boolean isAlive() {
+        return true;
+    }
+
+    @Override
+    public void onClose(Runnable call) {
+
+    }
+
+    @Override
+    public void replaceWith(DeviceSession session) {
+        if(session instanceof  CoapConnectionSession){
+            this.connectTime=((CoapConnectionSession) session).getConnectTime();
+        }
+    }
+}

+ 85 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/AbstractCoapResource.java

@@ -0,0 +1,85 @@
+package org.jetlinks.community.network.coap.resources;
+
+import cn.hutool.core.util.StrUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.californium.core.CoapResource;
+import org.eclipse.californium.core.coap.CoAP;
+import org.eclipse.californium.core.coap.Response;
+import org.eclipse.californium.core.network.Exchange;
+import org.eclipse.californium.core.server.resources.Resource;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import org.jetlinks.core.message.codec.coap.DefaultCoapExchange;
+import org.springframework.util.AntPathMatcher;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName AbstractCoapResource.java
+ * @Description TODO
+ * @createTime 2021年12月16日 16:27:00
+ */
+@Slf4j
+public abstract class AbstractCoapResource extends CoapResource {
+    private final EmitterProcessor<CoapExchangeMessage> processor;
+
+    public static final String prefixTopicName="/{productId}/{deviceId}/%s";
+
+    public static final AntPathMatcher matcher = new AntPathMatcher(File.separator);
+    public AbstractCoapResource(EmitterProcessor<CoapExchangeMessage> processor,String name) {
+        super(name);
+        this.processor=processor;
+    }
+
+    @Override
+    public void handleRequest(Exchange exchange) {
+        CoAP.Code code = exchange.getRequest().getCode();
+        if (code.equals(CoAP.Code.POST)) {
+            handlePOST(new DefaultCoapExchange(exchange, this));
+        }else {
+            exchange.sendResponse(new Response(CoAP.ResponseCode.METHOD_NOT_ALLOWED));
+        }
+    }
+
+    public AbstractCoapResource(EmitterProcessor<CoapExchangeMessage> processor, String name, boolean visible) {
+        super(name, visible);
+        this.processor=processor;
+    }
+
+    public void handlePOST(DefaultCoapExchange exchange) {
+        String result = exchange.getRequestText();
+        Map<String, String> map = matcher.extractUriTemplateVariables(this.getName(), this.getPath());
+        exchange.setDeviceId(String.valueOf(map.get("deviceId")));
+        exchange.setProductId(String.valueOf(map.get("productId")));
+        Map<String, String> elseMap = new HashMap<>();
+        map.forEach((k,v)->{
+            if(!"deviceId".equals(k)&&!"productId".equals(k)){
+                elseMap.put(k,v);
+            }
+        });
+        exchange.setElseParams(elseMap);
+        if (StrUtil.isNullOrUndefined(exchange.getDeviceId())
+            || StrUtil.isNullOrUndefined(exchange.getProductId())) {
+            log.warn("Coap连接,deviceId:[{}],productId:[{}],不能为空",exchange.getDeviceId(),exchange.getProductId());
+            exchange.reject();
+        }
+        if(processor.hasDownstreams()){
+            processor.onNext(new CoapExchangeMessage(exchange));
+        }
+    }
+
+    @Override
+    public Resource getChild(String name) {
+        return super.getChild(name);
+    }
+
+    public Flux<CoapExchangeMessage> handleMessage(){
+        return processor.map(Function.identity());
+    }
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/CoordinateResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class CoordinateResource extends AbstractCoapResource {
+
+    public CoordinateResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/coordinate"));
+    }
+
+    public CoordinateResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/coordinate"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/DirectResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class DirectResource extends AbstractCoapResource {
+
+    public DirectResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/direct"));
+    }
+
+    public DirectResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/direct"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/EventCoapResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 事件上报资源
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class EventCoapResource extends AbstractCoapResource {
+
+    public EventCoapResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/event/{eventId}"));
+    }
+
+    public EventCoapResource(EmitterProcessor<CoapExchangeMessage> processor,  boolean visible) {
+        super(processor,  String.format(prefixTopicName,"/event/{eventId}"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/FirmwareReportResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class FirmwareReportResource extends AbstractCoapResource {
+
+    public FirmwareReportResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/firmware/report"));
+    }
+
+    public FirmwareReportResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/firmware/report"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/LogResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class LogResource extends AbstractCoapResource {
+
+    public LogResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/log"));
+    }
+
+    public LogResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/log"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/MatchAllResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName authResource.java
+ * @Description 授权资源
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class MatchAllResource extends AbstractCoapResource {
+
+    public MatchAllResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/"));
+    }
+
+    public MatchAllResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/MetadataDerivedCoapResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 事件上报资源
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class MetadataDerivedCoapResource extends AbstractCoapResource {
+
+    public MetadataDerivedCoapResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/metadata/derived"));
+    }
+
+    public MetadataDerivedCoapResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor,  String.format(prefixTopicName,"/metadata/derived"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/PropertiesReportCoapResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 事件上报资源
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class PropertiesReportCoapResource extends AbstractCoapResource {
+
+    public PropertiesReportCoapResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/properties/report"));
+    }
+
+    public PropertiesReportCoapResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor,  String.format(prefixTopicName,"/properties/report"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/RegisterCoapResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 子设备注册
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class RegisterCoapResource extends AbstractCoapResource {
+
+    public RegisterCoapResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/child/{childDeviceId}/register"));
+    }
+
+    public RegisterCoapResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/child/{childDeviceId}/register"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/TagResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class TagResource extends AbstractCoapResource {
+
+    public TagResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/tag"));
+    }
+
+    public TagResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/tag"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/TimeSyncResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class TimeSyncResource extends AbstractCoapResource {
+
+    public TimeSyncResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/time-sync"));
+    }
+
+    public TimeSyncResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/time-sync"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/UnRegisterCoapResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 子设备注销
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class UnRegisterCoapResource extends AbstractCoapResource {
+
+    public UnRegisterCoapResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/child/{childDeviceId}/unregister"));
+    }
+
+    public UnRegisterCoapResource(EmitterProcessor<CoapExchangeMessage> processor,boolean visible) {
+        super(processor, String.format(prefixTopicName,"/child/{childDeviceId}/unregister"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/UpgradeProgressResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class UpgradeProgressResource extends AbstractCoapResource {
+
+    public UpgradeProgressResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/firmware/upgrade/progress"));
+    }
+
+    public UpgradeProgressResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/firmware/upgrade/progress"), visible);
+    }
+
+}

+ 23 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/UpgradePullResource.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.network.coap.resources;
+
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName EventResouce.java
+ * @Description 固件更新进程
+ * @createTime 2021年12月16日 16:24:00
+ */
+public class UpgradePullResource extends AbstractCoapResource {
+
+    public UpgradePullResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super(processor, String.format(prefixTopicName,"/firmware/pull"));
+    }
+
+    public UpgradePullResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super(processor, String.format(prefixTopicName,"/firmware/pull"), visible);
+    }
+
+}

+ 28 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/CoapServerProperties.java

@@ -0,0 +1,28 @@
+package org.jetlinks.community.network.coap.server;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.eclipse.californium.core.network.config.NetworkConfig;
+import java.util.*;
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CoapServerProperties {
+
+    private String id;
+
+    //服务实例数量(线程数)
+    private int instance = 4;
+
+    private String certId;
+
+    private boolean ssl;
+
+    private NetworkConfig options;
+
+    private Integer port;
+
+}

+ 34 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/DefaultCoapResource.java

@@ -0,0 +1,34 @@
+package org.jetlinks.community.network.coap.server;
+
+import org.eclipse.californium.core.CoapResource;
+import org.eclipse.californium.core.server.resources.CoapExchange;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DefaultCoapResource.java
+ * @Description  //仅支持post方法
+ * @createTime 2021年12月16日 10:33:00
+ */
+public class DefaultCoapResource extends CoapResource {
+
+    private EmitterProcessor<CoapExchangeMessage> processor;
+    public DefaultCoapResource(EmitterProcessor<CoapExchangeMessage> processor,String name) {
+        super(name);
+        this.processor=processor;
+    }
+
+    public DefaultCoapResource(String name, boolean visible) {
+        super(name, visible);
+    }
+
+
+    @Override
+    public void handlePOST(CoapExchange exchange) {
+        if(processor.hasDownstreams()){
+            processor.onNext(new CoapExchangeMessage(exchange));
+        }
+    }
+}

+ 89 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/MultiCoapServer.java

@@ -0,0 +1,89 @@
+package org.jetlinks.community.network.coap.server;
+
+
+import cn.hutool.core.collection.CollectionUtil;
+import lombok.Getter;
+import org.eclipse.californium.core.CoapServer;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.Network;
+import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.coap.resources.*;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MultiCoapServer.java
+ * @Description TODO
+ * @createTime 2021年12月16日 08:57:00
+ */
+public class MultiCoapServer extends CoapServer implements Network {
+
+    private Collection<CoapServer> coapServers;
+    @Getter
+    private String id;
+    private final String topicFormat="/{productId}/{deviceId}/%s";
+    private final EmitterProcessor<CoapExchangeMessage> processor=EmitterProcessor.create();
+    private final FluxSink<CoapExchangeMessage> sink=processor.sink(FluxSink.OverflowStrategy.BUFFER);
+
+    public MultiCoapServer(String id){
+        super();
+        this.id=id;
+    }
+
+    public void setCoapServer( Collection<CoapServer> servers){
+        if (CollectionUtil.isNotEmpty(coapServers)) {
+            shutdown();
+        }
+        this.coapServers=servers;
+        for (org.eclipse.californium.core.CoapServer coapServer : this.coapServers) {
+            coapServer.start();
+        }
+
+    }
+
+    @Override
+    public NetworkType getType() {
+        return DefaultNetworkType.COAP_SERVER;
+    }
+
+    @Override
+    public void shutdown() {
+        if (CollectionUtil.isNotEmpty(coapServers)) {
+            for (org.eclipse.californium.core.CoapServer coapServer : coapServers) {
+                coapServer.destroy();
+            }
+            coapServers.clear();
+        }
+    }
+
+    @Override
+    public boolean isAlive() {
+        return CollectionUtil.isNotEmpty(coapServers);
+    }
+
+    @Override
+    public boolean isAutoReload() {
+        return false;
+    }
+
+
+    public void initResoruces(){
+        coapServers.forEach(this::initResource);
+    }
+
+
+    public Flux<CoapExchangeMessage> handleAuthRequest(){
+        return processor.map(Function.identity());
+    }
+
+    private void initResource(CoapServer coapServer){
+        coapServer.add(new MatchAllResource(processor));
+    }
+}

+ 73 - 0
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/MultiCoapServerProvider.java

@@ -0,0 +1,73 @@
+package org.jetlinks.community.network.coap.server;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.californium.core.CoapServer;
+import org.jetlinks.community.network.*;
+import org.jetlinks.community.network.security.CertificateManager;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName VertxCoapServerProvider.java
+ * @Description TODO
+ * @createTime 2021年12月15日 16:27:00
+ */
+@Component
+@AllArgsConstructor
+@Slf4j
+public class MultiCoapServerProvider implements NetworkProvider<CoapServerProperties> {
+    private final CertificateManager certificateManager;
+
+    private final EmitterProcessor<CoapExchangeMessage> processor=EmitterProcessor.create();
+    private final FluxSink<CoapExchangeMessage> sink=processor.sink(FluxSink.OverflowStrategy.BUFFER);
+    @Nonnull
+    @Override
+    public NetworkType getType() {
+        return DefaultNetworkType.COAP_SERVER;
+    }
+
+    @Nonnull
+    @Override
+    public Network createNetwork(@Nonnull CoapServerProperties properties) {
+        MultiCoapServer coapServer = new MultiCoapServer(properties.getId());
+        initServer(coapServer,properties);
+        return coapServer;
+    }
+
+    private void initServer(MultiCoapServer coapServer, CoapServerProperties properties) {
+        ArrayList<CoapServer> servers = new ArrayList<>(properties.getInstance());
+        for (int i = 0; i < properties.getInstance(); i++) {
+            CoapServer coap = new CoapServer(properties.getOptions(),properties.getPort());
+            servers.add(coap);
+        }
+        coapServer.setCoapServer(servers);
+        coapServer.initResoruces();
+    }
+
+    @Override
+    public void reload(@Nonnull Network network, @Nonnull CoapServerProperties properties) {
+        initServer((MultiCoapServer) network, properties);
+    }
+
+    @Nullable
+    @Override
+    public ConfigMetadata getConfigMetadata() {
+        return null;
+    }
+
+    @Nonnull
+    @Override
+    public Mono<CoapServerProperties> createConfig(@Nonnull NetworkProperties properties) {
+        return null;
+    }
+}

+ 0 - 67
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/auth/MqttDefaultAuth.java

@@ -1,67 +0,0 @@
-package org.jetlinks.community.network.mqtt.auth;
-
-import cn.hutool.core.util.StrUtil;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.hswebframework.web.exception.BusinessException;
-import org.jetlinks.core.ProtocolSupport;
-import org.jetlinks.core.Value;
-import org.jetlinks.core.device.*;
-import org.jetlinks.core.exception.DeviceOperationException;
-import org.jetlinks.core.message.codec.DefaultTransport;
-import org.jetlinks.core.message.codec.Transport;
-import org.jetlinks.supports.official.JetLinksAuthenticator;
-import reactor.core.publisher.Mono;
-import javax.annotation.Nonnull;
-
-import static org.jetlinks.core.enums.ErrorCode.UNSUPPORTED_MESSAGE;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName MqttDefaultAut.java
- * @Description TODO
- * @createTime 2021年07月20日 14:52:00
- */
-public class MqttDefaultAuth extends JetLinksAuthenticator {
-    @Override
-    public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
-        if (request instanceof MqttAuthenticationRequest) {
-            MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
-            Transport transport = request.getTransport();
-            if(!transport.equals(DefaultTransport.MQTT) &&!transport.equals(DefaultTransport.MQTT_TLS)){
-                //非mqtt协议
-                return Mono.just(AuthenticationResponse.error(400, "设备不支持的连接协议类型:" + transport.getName()));
-            }
-            String username = mqtt.getUsername();
-            String password = mqtt.getPassword();
-
-            try {
-                return deviceOperation
-                    .getProduct()
-                    .flatMap(deviceProductOperator -> deviceProductOperator.getProtocol()
-                        .flatMap(protocolSupport -> protocolSupport.authenticate(request,deviceOperation)))
-                    .switchIfEmpty(Mono.error(()->new DeviceOperationException(UNSUPPORTED_MESSAGE)))
-                    .flatMap(ignore->deviceOperation
-                        .getConfigs("secureId", "secureKey")
-                        .map(conf -> {
-                            String secureId = conf.getValue("secureId").map(Value::asString).orElse(null);
-
-                            String secureKey = conf
-                                .getValue("secureKey")
-                                .map(Value::asString)
-                                .orElse(null);
-                            //签名
-                            if ((StrUtil.isEmpty(secureId)||username.equals(secureId))
-                                && (StrUtil.isEmpty(secureKey)||password.equals(secureKey))) {
-                                return AuthenticationResponse.success(deviceOperation.getDeviceId());
-                            } else {
-                                return AuthenticationResponse.error(401, "密钥错误");
-                            }
-                        }));
-            } catch (NumberFormatException e) {
-                return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
-            }
-        }
-        return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
-    }
-}

+ 2 - 50
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -1,9 +1,6 @@
 package org.jetlinks.community.network.mqtt.gateway.device;
 package org.jetlinks.community.network.mqtt.gateway.device;
 
 
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
-import io.vertx.mqtt.MqttTopicSubscription;
-import io.vertx.mqtt.messages.MqttSubscribeMessage;
-import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
 import lombok.Getter;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.hswebframework.web.logger.ReactiveLogger;
@@ -13,13 +10,11 @@ import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
 import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.NetworkType;
-import org.jetlinks.community.network.mqtt.auth.MqttDefaultAuth;
 import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
 import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
 import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import org.jetlinks.community.network.mqtt.server.MqttServer;
 import org.jetlinks.community.network.mqtt.server.MqttServer;
 import org.jetlinks.community.network.utils.DeviceGatewayHelper;
 import org.jetlinks.community.network.utils.DeviceGatewayHelper;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.ProtocolSupport;
-import org.jetlinks.core.defaults.Authenticator;
 import org.jetlinks.core.device.AuthenticationResponse;
 import org.jetlinks.core.device.AuthenticationResponse;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceRegistry;
@@ -42,12 +37,9 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 import reactor.util.function.Tuples;
-
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 
 @Slf4j
 @Slf4j
 class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
 class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
@@ -77,8 +69,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
 
 
     private Disposable disposable;
     private Disposable disposable;
 
 
-    private Authenticator authenticator=new MqttDefaultAuth();
-
     private final DeviceGatewayHelper helper;
     private final DeviceGatewayHelper helper;
 
 
     public MqttServerDeviceGateway(String id,
     public MqttServerDeviceGateway(String id,
@@ -119,8 +109,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             .publishOn(Schedulers.parallel())
             .publishOn(Schedulers.parallel())
             .flatMap(this::handleConnection)
             .flatMap(this::handleConnection)
             .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
             .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
-//            .doOnNext(this::handleSubscriptionTopic)
-//            .doOnNext(this::handleUnSubscriptionTopic)
             .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
             .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
             .onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
             .onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
             .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
             .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
@@ -137,8 +125,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                     .getPassword(), getTransport());
                     .getPassword(), getTransport());
                 return supportMono
                 return supportMono
                     //使用自定义协议来认证
                     //使用自定义协议来认证
-//                    .map(support -> support.authenticate(request, registry))
-                    .map(support -> authenticator.authenticate(request, registry))
+                    .map(support -> support.authenticate(request, registry))
+//                    .map(support -> authenticator.authenticate(request, registry))
                     .defaultIfEmpty(Mono.defer(() -> registry
                     .defaultIfEmpty(Mono.defer(() -> registry
                         .getDevice(connection.getClientId())
                         .getDevice(connection.getClientId())
                         .flatMap(device -> device.authenticate(request))))
                         .flatMap(device -> device.authenticate(request))))
@@ -194,14 +182,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                             gatewayMonitor.disconnected();
                             gatewayMonitor.disconnected();
                             gatewayMonitor.totalConnection(counter.sum());
                             gatewayMonitor.totalConnection(counter.sum());
 
 
-
-                            /**
-                             * 清空订阅主题
-                             */
-                            registry.getDevice(conn.getClientId())
-                                .doOnNext(operator -> {
-                                    operator.getTopics().clear();
-                                }).subscribe();
                         });
                         });
                         return Tuples.of(connection.accept(), device, newSession);
                         return Tuples.of(connection.accept(), device, newSession);
                     } else {
                     } else {
@@ -220,34 +200,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                 connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                 connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
             }));
             }));
     }
     }
-    //处理已经建立连接的MQTT连接的主题订阅
-    private void handleSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
-        tuple3.getT1()
-            .handleSubscribe(true)
-            .doOnNext(topic->{
-                MqttSubscribeMessage message = topic.getMessage();
-                Set<String> topics = message.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet());
-                tuple3.getT2().addTopics(topics);
-            })
-//            .flatMap(ignore->
-//                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
-//                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
-            .subscribe();
-    }
-
-    //取消MQTT连接的主题订阅
-    private void handleUnSubscriptionTopic(Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession> tuple3) {
-        tuple3.getT1()
-            .handleUnSubscribe(true)
-            .doOnNext(topic->{
-                MqttUnsubscribeMessage message = topic.getMessage();
-                tuple3.getT2().removeTopics(message.topics());
-            })
-//            .flatMap(ignore->
-//                eventBus.publish(String.format("/dashboard/device/%s/changed/topics",
-//                    tuple3.getT2().getDeviceId()),new TimeSyncMessage()))
-            .subscribe();
-    }
 
 
     //处理已经建立连接的MQTT连接
     //处理已经建立连接的MQTT连接
     private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, MqttConnectionSession session) {
     private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, MqttConnectionSession session) {

+ 0 - 2
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGatewayProvider.java

@@ -29,8 +29,6 @@ public class MqttServerDeviceGatewayProvider implements DeviceGatewayProvider {
     private final DecodedClientMessageHandler messageHandler;
     private final DecodedClientMessageHandler messageHandler;
 
 
     private final ProtocolSupports protocolSupports;
     private final ProtocolSupports protocolSupports;
-    @Autowired
-    private EventBus eventBus;
 
 
     public MqttServerDeviceGatewayProvider(NetworkManager networkManager,
     public MqttServerDeviceGatewayProvider(NetworkManager networkManager,
                                            DeviceRegistry registry,
                                            DeviceRegistry registry,

+ 0 - 2
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttConnection.java

@@ -1,7 +1,6 @@
 package org.jetlinks.community.network.mqtt.server;
 package org.jetlinks.community.network.mqtt.server;
 
 
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
-import io.vertx.mqtt.MqttEndpoint;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.server.mqtt.MqttAuth;
 import org.jetlinks.core.server.mqtt.MqttAuth;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
@@ -10,7 +9,6 @@ import reactor.core.publisher.Mono;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
 
 
 /**
 /**

+ 0 - 15
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java

@@ -98,19 +98,4 @@ public class VertxMqttServer implements MqttServer {
         }
         }
 
 
     }
     }
-
-
-    public static void main(String[] args) {
-        EmitterProcessor<String> connectionProcessor = EmitterProcessor.create(false);
-        FluxSink<String> sink = connectionProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
-
-
-        connectionProcessor.doOnNext(s->{
-            System.out.println(s);
-        }).subscribe();
-
-        for (int i = 0; i < 10; i++) {
-            sink.next(String.valueOf(i));
-        }
-    }
 }
 }

+ 8 - 0
jetlinks-components/network-component/network-core/pom.xml

@@ -35,6 +35,14 @@
             <version>1.64</version>
             <version>1.64</version>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>org.eclipse.californium</groupId>
+            <artifactId>californium-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.californium</groupId>
+            <artifactId>element-connector</artifactId>
+        </dependency>
         <dependency>
         <dependency>
             <groupId>io.vertx</groupId>
             <groupId>io.vertx</groupId>
             <artifactId>vertx-core</artifactId>
             <artifactId>vertx-core</artifactId>

+ 123 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendCoapDTLSDeviceMessageCodec.java

@@ -0,0 +1,123 @@
+package org.jetlinks.community.support;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.eclipse.californium.core.coap.CoAP;
+import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.codec.*;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import org.jetlinks.core.message.codec.coap.CoapMessage;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+import java.nio.charset.StandardCharsets;
+import java.util.function.Consumer;
+
+@Slf4j
+public class JetLinksExtendCoapDTLSDeviceMessageCodec extends JetlinksExtendTopicMessageCodec implements DeviceMessageCodec {
+
+    @Override
+    public Transport getSupportTransport() {
+        return DefaultTransport.CoAP_DTLS;
+    }
+
+
+    public Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
+        return Mono.defer(() -> {
+            String path = message.getPath();
+            String sign = message.getStringOption(2110).orElse(null);
+            String token = message.getStringOption(2111).orElse(null);
+            String payload = message.getPayload().toString(StandardCharsets.UTF_8);
+            if ("/auth".equals(path)) {
+                //认证
+                return context.getDevice()
+                        .getConfig("secureKey")
+                        .flatMap(sk -> {
+                            String secureKey = sk.asString();
+                            if (!verifySign(secureKey, context.getDevice().getDeviceId(), payload, sign)) {
+                                response.accept(CoAP.ResponseCode.BAD_REQUEST);
+                                return Mono.empty();
+                            }
+                            String newToken = IDGenerator.MD5.generate();
+                            return context.getDevice()
+                                    .setConfig("coap-token", newToken)
+                                    .doOnSuccess(success -> {
+                                        JSONObject json = new JSONObject();
+                                        json.put("token", newToken);
+                                        response.accept(json.toJSONString());
+                                    });
+                        })
+                        .then(Mono.empty());
+            }
+            if (StringUtils.isEmpty(token)) {
+                response.accept(CoAP.ResponseCode.UNAUTHORIZED);
+                return Mono.empty();
+            }
+            return context.getDevice()
+                    .getConfig("coap-token")
+                    .switchIfEmpty(Mono.fromRunnable(() -> {
+                        response.accept(CoAP.ResponseCode.UNAUTHORIZED);
+                    }))
+                    .flatMap(value -> {
+                        String tk = value.asString();
+                        if (!token.equals(tk)) {
+                            response.accept(CoAP.ResponseCode.UNAUTHORIZED);
+                            return Mono.empty();
+                        }
+                        return Mono
+                                .just(decode(path, JSON.parseObject(payload)).getMessage())
+                                .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST)));
+                    })
+                    .doOnSuccess(msg -> {
+                        response.accept(CoAP.ResponseCode.CREATED);
+                    })
+                    .doOnError(error -> {
+                        log.error("decode coap message error", error);
+                        response.accept(CoAP.ResponseCode.BAD_REQUEST);
+                    });
+        });
+
+    }
+
+    @Nonnull
+    @Override
+    public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
+        if (context.getMessage() instanceof CoapExchangeMessage) {
+            CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
+            return decode(exchangeMessage, context, resp -> {
+                if (resp instanceof CoAP.ResponseCode) {
+                    exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
+                }
+                if (resp instanceof String) {
+                    exchangeMessage.getExchange().respond(((String) resp));
+                }
+            });
+        }
+        if (context.getMessage() instanceof CoapMessage) {
+            return decode(((CoapMessage) context.getMessage()), context, resp -> {
+                log.info("skip response coap request:{}", resp);
+            });
+        }
+
+        return Mono.empty();
+    }
+
+    protected boolean verifySign(String secureKey, String deviceId, String payload, String sign) {
+        //验证签名
+        if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(payload.concat(secureKey)).equalsIgnoreCase(sign)) {
+            log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload);
+            return false;
+        }
+        return true;
+    }
+
+    @Nonnull
+    @Override
+    public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
+        return Mono.empty();
+    }
+}

+ 87 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendCoapDeviceMessageCodec.java

@@ -0,0 +1,87 @@
+package org.jetlinks.community.support;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.netty.buffer.ByteBuf;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.californium.core.coap.CoAP;
+import org.eclipse.californium.core.server.resources.CoapExchange;
+import org.jetlinks.core.Value;
+import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.codec.*;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+import org.jetlinks.core.message.codec.coap.CoapMessage;
+import org.jetlinks.supports.official.cipher.Ciphers;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+
+@Slf4j
+public class JetLinksExtendCoapDeviceMessageCodec extends JetlinksExtendTopicMessageCodec implements DeviceMessageCodec {
+
+
+    @Override
+    public Transport getSupportTransport() {
+        return DefaultTransport.CoAP;
+    }
+
+    protected JSONObject decode(String text) {
+        return JSON.parseObject(text);
+    }
+
+    protected Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context) {
+        String path = message.getPath();
+        return context
+                .getDevice()
+                .getConfigs("encAlg", "secureKey")
+                .flatMap(configs -> {
+                    Ciphers ciphers = configs.getValue("encAlg").map(Value::asString).flatMap(Ciphers::of).orElse(Ciphers.AES);
+                    String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
+                    ByteBuf byteBuf = message.getPayload();
+                    byte[] req = new byte[byteBuf.readableBytes()];
+                    byteBuf.readBytes(req);
+                    byteBuf.resetReaderIndex();
+                    String payload = new String(ciphers.decrypt(req, secureKey));
+                    //解码
+                    return Mono.just(decode(path, decode(payload)).getMessage());
+                });
+    }
+
+    protected Mono<? extends Message> decode(CoapExchangeMessage message, MessageDecodeContext context) {
+        CoapExchange exchange = message.getExchange();
+        return decode((CoapMessage) message, context)
+                .doOnSuccess(msg -> {
+                    exchange.respond(CoAP.ResponseCode.CREATED);
+                    exchange.accept();
+                })
+                .switchIfEmpty(Mono.fromRunnable(() -> {
+                    exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
+                }))
+                .doOnError(error -> {
+                    log.error("decode coap message error", error);
+                    exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
+                });
+    }
+
+    @Nonnull
+    @Override
+    public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
+        return Mono.defer(() -> {
+            log.debug("handle coap message:\n{}", context.getMessage());
+            if (context.getMessage() instanceof CoapExchangeMessage) {
+                return decode(((CoapExchangeMessage) context.getMessage()), context);
+            }
+            if (context.getMessage() instanceof CoapMessage) {
+                return decode(((CoapMessage) context.getMessage()), context);
+            }
+
+            return Mono.empty();
+        });
+    }
+
+    @Nonnull
+    @Override
+    public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
+        return Mono.empty();
+    }
+}

+ 2 - 4
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java

@@ -70,15 +70,13 @@ public class JetLinksExtendProtocolSupportProvider implements ProtocolSupportPro
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT));
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT));
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
 
 
-            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.INNER));
-
 //            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.TCP));
 //            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.TCP));
 //            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.TCP_TLS));
 //            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.TCP_TLS));
 
 
 
 
 
 
-            support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
-            support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
+            support.addMessageCodecSupport(new JetLinksExtendCoapDeviceMessageCodec());
+            support.addMessageCodecSupport(new JetLinksExtendCoapDTLSDeviceMessageCodec());
 
 
             return Mono.just(support);
             return Mono.just(support);
         });
         });

+ 1 - 0
jetlinks-components/network-component/pom.xml

@@ -15,6 +15,7 @@
         <module>network-core</module>
         <module>network-core</module>
         <module>mqtt-component</module>
         <module>mqtt-component</module>
         <module>tcp-component</module>
         <module>tcp-component</module>
+        <module>coap-component</module>
     </modules>
     </modules>
 
 
     <artifactId>network-component</artifactId>
     <artifactId>network-component</artifactId>

+ 1 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/DeviceMessageDecoder.java

@@ -1,6 +1,7 @@
 package org.jetlinks.core.message.codec;
 package org.jetlinks.core.message.codec;
 
 
 import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Publisher;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/EncodedMessage.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.ByteBufUtil;
+import org.jetlinks.core.message.codec.coap.CoapMessage;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
@@ -13,7 +14,7 @@ import java.nio.charset.StandardCharsets;
 /**
 /**
  * 已编码的消息,通常为来自设备或者发向设备的原始报文.
  * 已编码的消息,通常为来自设备或者发向设备的原始报文.
  *
  *
- * @author zhouhao
+ * @author zhouhao0
  * @see MqttMessage
  * @see MqttMessage
  * @see CoapMessage
  * @see CoapMessage
  * @see org.jetlinks.core.message.codec.http.HttpExchangeMessage
  * @see org.jetlinks.core.message.codec.http.HttpExchangeMessage

+ 2 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/MessageDecodeContext.java

@@ -1,6 +1,8 @@
 package org.jetlinks.core.message.codec;
 package org.jetlinks.core.message.codec;
 
 
 
 
+import org.jetlinks.core.message.codec.coap.CoapExchangeMessage;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
 
 
 /**
 /**

+ 19 - 7
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/CoapExchangeMessage.java → jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/CoapExchangeMessage.java

@@ -1,4 +1,4 @@
-package org.jetlinks.core.message.codec;
+package org.jetlinks.core.message.codec.coap;
 
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.Unpooled;
@@ -6,7 +6,6 @@ import lombok.Getter;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.Response;
 import org.eclipse.californium.core.coap.Response;
-import org.eclipse.californium.core.server.resources.CoapExchange;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.CollectionUtils;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
@@ -19,16 +18,24 @@ import java.util.List;
 public class CoapExchangeMessage implements CoapMessage {
 public class CoapExchangeMessage implements CoapMessage {
 
 
     @Getter
     @Getter
-    protected CoapExchange exchange;
-
+    protected DefaultCoapExchange exchange;
+    @Getter
+    private final String deviceId;
+    @Getter
+    private final  String productId;
+    @Getter
+    private final String clientId;
     @Nonnull
     @Nonnull
     @Override
     @Override
     public CoAP.Code getCode() {
     public CoAP.Code getCode() {
         return exchange.getRequestCode();
         return exchange.getRequestCode();
     }
     }
 
 
-    public CoapExchangeMessage(CoapExchange exchange) {
+    public CoapExchangeMessage(DefaultCoapExchange exchange) {
         this.exchange = exchange;
         this.exchange = exchange;
+        this.deviceId=exchange.getDeviceId();
+        this.productId=exchange.getProductId();
+        this.clientId=productId+"|"+deviceId;
     }
     }
 
 
     static byte[] empty = new byte[0];
     static byte[] empty = new byte[0];
@@ -59,6 +66,11 @@ public class CoapExchangeMessage implements CoapMessage {
 
 
     }
     }
 
 
+    public void reject(){
+        if(exchange!=null){
+            exchange.reject();
+        }
+    }
     @Nonnull
     @Nonnull
     @Override
     @Override
     public ByteBuf getPayload() {
     public ByteBuf getPayload() {
@@ -83,7 +95,7 @@ public class CoapExchangeMessage implements CoapMessage {
     @Nonnull
     @Nonnull
     public List<Option> getOptions() {
     public List<Option> getOptions() {
         return exchange
         return exchange
-                .getRequestOptions()
-                .asSortedList();
+            .getRequestOptions()
+            .asSortedList();
     }
     }
 }
 }

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/CoapMessage.java → jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/CoapMessage.java

@@ -1,9 +1,10 @@
-package org.jetlinks.core.message.codec;
+package org.jetlinks.core.message.codec.coap;
 
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.ByteBufUtil;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.codec.binary.Hex;
 import org.eclipse.californium.core.coap.*;
 import org.eclipse.californium.core.coap.*;
+import org.jetlinks.core.message.codec.EncodedMessage;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
 import java.math.BigDecimal;
 import java.math.BigDecimal;

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/CoapResponseMessage.java → jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/CoapResponseMessage.java

@@ -1,4 +1,4 @@
-package org.jetlinks.core.message.codec;
+package org.jetlinks.core.message.codec.coap;
 
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.ByteBufUtil;
@@ -7,6 +7,7 @@ import org.eclipse.californium.core.CoapResponse;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.OptionNumberRegistry;
 import org.eclipse.californium.core.coap.OptionNumberRegistry;
+import org.jetlinks.core.message.codec.EncodedMessage;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.CollectionUtils;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;

+ 62 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/DefaultCoapExchange.java

@@ -0,0 +1,62 @@
+package org.jetlinks.core.message.codec.coap;
+
+import org.eclipse.californium.core.CoapResource;
+import org.eclipse.californium.core.network.Exchange;
+import org.eclipse.californium.core.server.resources.CoapExchange;
+import java.util.*;
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DefaultCoapExchange.java
+ * @Description TODO
+ * @createTime 2021年12月17日 11:20:00
+ */
+
+public class DefaultCoapExchange extends CoapExchange {
+    private String productId;
+    private String deviceId;
+    private String token;
+    private Map<String,String> elseParams;
+    /**
+     * Creates a new CoAP Exchange object for an exchange and resource.
+     *
+     * @param exchange The message exchange.
+     * @param resource The resource.
+     * @throws NullPointerException if any of the parameters is {@code null}.
+     */
+    public DefaultCoapExchange(Exchange exchange, CoapResource resource) {
+        super(exchange, resource);
+    }
+
+    public DefaultCoapExchange(Exchange exchange, CoapResource resource, String productId, String deviceId, Map<String, String> elseParams) {
+        super(exchange, resource);
+        this.productId = productId;
+        this.deviceId = deviceId;
+        this.elseParams = elseParams;
+    }
+
+
+    public String getProductId() {
+        return productId;
+    }
+
+    public void setProductId(String productId) {
+        this.productId = productId;
+    }
+
+    public String getDeviceId() {
+        return deviceId;
+    }
+
+    public void setDeviceId(String deviceId) {
+        this.deviceId = deviceId;
+    }
+
+    public Map<String, String> getElseParams() {
+        return elseParams;
+    }
+
+    public void setElseParams(Map<String, String> elseParams) {
+        this.elseParams = elseParams;
+    }
+}

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/DefaultCoapMessage.java → jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/DefaultCoapMessage.java

@@ -1,10 +1,11 @@
-package org.jetlinks.core.message.codec;
+package org.jetlinks.core.message.codec.coap;
 
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.Unpooled;
 import lombok.*;
 import lombok.*;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.Option;
+import org.jetlinks.core.message.codec.TextMessageParser;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.ArrayList;

+ 2 - 1
jetlinks-core/src/main/java/org/jetlinks/core/message/codec/DefaultCoapResponseMessage.java → jetlinks-core/src/main/java/org/jetlinks/core/message/codec/coap/DefaultCoapResponseMessage.java

@@ -1,4 +1,4 @@
-package org.jetlinks.core.message.codec;
+package org.jetlinks.core.message.codec.coap;
 
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.Unpooled;
@@ -7,6 +7,7 @@ import lombok.Setter;
 import org.eclipse.californium.core.CoapResponse;
 import org.eclipse.californium.core.CoapResponse;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.Option;
+import org.jetlinks.core.message.codec.TextMessageParser;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.ArrayList;

+ 1 - 0
jetlinks-core/src/test/java/org/jetlinks/core/message/codec/DefaultCoapMessageTest.java

@@ -5,6 +5,7 @@ import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.MediaTypeRegistry;
 import org.eclipse.californium.core.coap.MediaTypeRegistry;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.OptionNumberRegistry;
 import org.eclipse.californium.core.coap.OptionNumberRegistry;
+import org.jetlinks.core.message.codec.coap.DefaultCoapMessage;
 import org.junit.Test;
 import org.junit.Test;
 
 
 import java.util.Objects;
 import java.util.Objects;

+ 1 - 0
jetlinks-core/src/test/java/org/jetlinks/core/message/codec/DefaultCoapResponseMessageTest.java

@@ -1,6 +1,7 @@
 package org.jetlinks.core.message.codec;
 package org.jetlinks.core.message.codec;
 
 
 import org.eclipse.californium.core.coap.CoAP;
 import org.eclipse.californium.core.coap.CoAP;
+import org.jetlinks.core.message.codec.coap.DefaultCoapResponseMessage;
 import org.junit.Test;
 import org.junit.Test;
 
 
 import static org.junit.Assert.*;
 import static org.junit.Assert.*;

+ 2 - 2
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -197,8 +197,6 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
 
     public Mono<DefaultAliBridgeChannel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret,@NotNull String productId,
     public Mono<DefaultAliBridgeChannel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret,@NotNull String productId,
                                                   boolean broadcast) {
                                                   boolean broadcast) {
-        //注册设备信息
-        DefaultDeviceConfigManager.register(bridgeId,originalIdentity,productKey,deviceName,deviceSecret);
         DefaultUplinkChannelHandler uplinkChannelHandler = new DefaultUplinkChannelHandler(bridgeConfigManager, DefaultDeviceConfigManager.getInstance());
         DefaultUplinkChannelHandler uplinkChannelHandler = new DefaultUplinkChannelHandler(bridgeConfigManager, DefaultDeviceConfigManager.getInstance());
         DefaultAliBridgeChannel oldChannel = channelMap
         DefaultAliBridgeChannel oldChannel = channelMap
             .put(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName, deviceSecret,productId, uplinkChannelHandler, deviceRegistry, eventBus));
             .put(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName, deviceSecret,productId, uplinkChannelHandler, deviceRegistry, eventBus));
@@ -209,6 +207,8 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
                 log.warn(e.getMessage());
                 log.warn(e.getMessage());
             }
             }
         }
         }
+        //注册设备信息
+        DefaultDeviceConfigManager.register(bridgeId,originalIdentity,productKey,deviceName,deviceSecret);
         if(broadcast){
         if(broadcast){
             getClusterOperationTopic()
             getClusterOperationTopic()
                 .publish(Mono.just(OperationMessage.builder()
                 .publish(Mono.just(OperationMessage.builder()

+ 0 - 9
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeDeviceController.java

@@ -1,24 +1,15 @@
 package org.jetlinks.community.bridge.web;
 package org.jetlinks.community.bridge.web;
 
 
-import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
 import org.hswebframework.web.authorization.annotation.Authorize;
-import org.hswebframework.web.authorization.annotation.CreateAction;
-import org.hswebframework.web.authorization.annotation.DeleteAction;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.crud.service.ReactiveCrudService;
 import org.hswebframework.web.crud.service.ReactiveCrudService;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
-import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
-import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
-import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
 import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
 import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
-import org.jetlinks.community.bridge.service.AliBridgeService;
-import org.springframework.dao.DuplicateKeyException;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.bind.annotation.*;
-import reactor.core.publisher.Mono;
 
 
 /**
 /**
  * @author lifang
  * @author lifang

+ 1 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/ProtocolDecodePayload.java

@@ -8,6 +8,7 @@ import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.codec.*;
 import org.jetlinks.core.message.codec.*;
+import org.jetlinks.core.message.codec.coap.DefaultCoapMessage;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.rule.engine.executor.PayloadType;
 import org.jetlinks.rule.engine.executor.PayloadType;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Publisher;

+ 14 - 0
pom.xml

@@ -35,6 +35,7 @@
         <aliyun.iot.sdk.version>7.29.0</aliyun.iot.sdk.version>
         <aliyun.iot.sdk.version>7.29.0</aliyun.iot.sdk.version>
         <aliyun.bridge.sdk.version>2.4.1</aliyun.bridge.sdk.version>
         <aliyun.bridge.sdk.version>2.4.1</aliyun.bridge.sdk.version>
         <redisson.version>3.13.6</redisson.version>
         <redisson.version>3.13.6</redisson.version>
+        <californium.version>2.2.3</californium.version>
     </properties>
     </properties>
 
 
     <build>
     <build>
@@ -182,6 +183,19 @@
                 <version>${aliyun.sdk.version}</version>
                 <version>${aliyun.sdk.version}</version>
             </dependency>
             </dependency>
 
 
+
+            <dependency>
+                <groupId>org.eclipse.californium</groupId>
+                <artifactId>californium-core</artifactId>
+                <version>${californium.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.eclipse.californium</groupId>
+                <artifactId>element-connector</artifactId>
+                <version>${californium.version}</version>
+            </dependency>
+
             <dependency>
             <dependency>
                 <groupId>cn.hutool</groupId>
                 <groupId>cn.hutool</groupId>
                 <artifactId>hutool-all</artifactId>
                 <artifactId>hutool-all</artifactId>