Bladeren bron

add coap网络组件

18339543638 4 jaren geleden
bovenliggende
commit
e9e6970a15
11 gewijzigde bestanden met toevoegingen van 121 en 86 verwijderingen
  1. 1 4
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultCoapServer.java
  2. 0 1
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultServerMessageDeliverer.java
  3. 4 6
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGateway.java
  4. 2 2
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGatewayProvider.java
  5. 6 7
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/AbstractCoapResource.java
  6. 2 2
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/MatchAllResource.java
  7. 3 6
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/CoapServerProperties.java
  8. 5 26
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/CoapServerProvider.java
  9. 29 30
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/NetWorkCoapServer.java
  10. 2 2
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java
  11. 67 0
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/MqttDefaultAuth.java

+ 1 - 4
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultCoapServer.java

@@ -427,11 +427,8 @@ public class DefaultCoapServer implements ServerInterface {
         @Override
         public Resource getChild(String name) {
             Collection<Resource> children = super.getChildren();
-            Optional<Resource> result = children.stream().filter(pattern -> matcher.match(pattern.getName(), name)).findFirst();
+            Optional<Resource> result = children.stream().filter(pattern -> pattern instanceof MatchAllResource).findFirst();
             result.ifPresent(resource->resource.setPath(name));
-            if(result.isPresent()){
-                result = children.stream().filter(resource -> resource instanceof MatchAllResource).findFirst();
-            }
             return result.orElseGet(()->new CoapResource("noAllow"));
         }
     }

+ 0 - 1
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/core/DefaultServerMessageDeliverer.java

@@ -32,7 +32,6 @@ import org.eclipse.californium.core.observe.ObserveRelation;
 import org.eclipse.californium.core.observe.ObservingEndpoint;
 import org.eclipse.californium.core.server.MessageDeliverer;
 import org.eclipse.californium.core.server.resources.Resource;
-import org.jetlinks.community.network.coap.resources.MatchAllResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 4 - 6
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGateway.java

@@ -1,8 +1,6 @@
 package org.jetlinks.community.network.coap.gateway;
 
-import io.vertx.mqtt.MqttServer;
 import javassist.NotFoundException;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.californium.core.coap.CoAP;
@@ -14,7 +12,7 @@ import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.coap.gateway.session.CoapConnectionSession;
-import org.jetlinks.community.network.coap.server.MultiCoapServer;
+import org.jetlinks.community.network.coap.server.NetWorkCoapServer;
 import org.jetlinks.community.network.utils.DeviceGatewayHelper;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.device.DeviceOperator;
@@ -57,7 +55,7 @@ public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDev
 
     private final DeviceSessionManager sessionManager;
 
-    private final MultiCoapServer coapServer;
+    private final NetWorkCoapServer coapServer;
 
     private final DecodedClientMessageHandler messageHandler;
 
@@ -80,7 +78,7 @@ public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDev
     public CoapServerDeviceGateway(String id,
                                    DeviceRegistry registry,
                                    DeviceSessionManager sessionManager,
-                                   MultiCoapServer coapServer,
+                                   NetWorkCoapServer coapServer,
                                    DecodedClientMessageHandler messageHandler,
                                    Mono<ProtocolSupport> customProtocol) {
 
@@ -114,7 +112,7 @@ public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDev
     }
 
     private void doStart() {
-        if(started.compareAndSet(false,true)||disposable!=null){
+        if(!started.compareAndSet(false,true)||disposable!=null){
             return;
         }
         disposable = coapServer

+ 2 - 2
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/gateway/CoapServerDeviceGatewayProvider.java

@@ -7,7 +7,7 @@ import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.NetworkType;
-import org.jetlinks.community.network.coap.server.MultiCoapServer;
+import org.jetlinks.community.network.coap.server.NetWorkCoapServer;
 import org.jetlinks.core.ProtocolSupports;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.server.session.DeviceSessionManager;
@@ -53,7 +53,7 @@ public class CoapServerDeviceGatewayProvider implements DeviceGatewayProvider {
     @Override
     public Mono<DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) {
         return networkManager
-            .<MultiCoapServer>getNetwork(getNetworkType(), properties.getNetworkId())
+            .<NetWorkCoapServer>getNetwork(getNetworkType(), properties.getNetworkId())
             .map(coapServer -> new CoapServerDeviceGateway(
                 properties.getId(),
                 registry,

+ 6 - 7
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/AbstractCoapResource.java

@@ -29,11 +29,11 @@ import java.util.function.Function;
 public abstract class AbstractCoapResource extends CoapResource {
     private final EmitterProcessor<CoapExchangeMessage> processor;
 
-    public static final String prefixTopicName="/{productId}/{deviceId}/%s";
+    public static final String prefixTopicName="{productId}/{deviceId}";
 
     public static final AntPathMatcher matcher = new AntPathMatcher(File.separator);
-    public AbstractCoapResource(EmitterProcessor<CoapExchangeMessage> processor,String name) {
-        super(name);
+    public AbstractCoapResource(EmitterProcessor<CoapExchangeMessage> processor) {
+        super("/receive/all");
         this.processor=processor;
     }
 
@@ -47,14 +47,13 @@ public abstract class AbstractCoapResource extends CoapResource {
         }
     }
 
-    public AbstractCoapResource(EmitterProcessor<CoapExchangeMessage> processor, String name, boolean visible) {
-        super(name, visible);
+    public AbstractCoapResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
+        super("receive/all", visible);
         this.processor=processor;
     }
 
     public void handlePOST(DefaultCoapExchange exchange) {
-        String result = exchange.getRequestText();
-        Map<String, String> map = matcher.extractUriTemplateVariables(this.getName(), this.getPath());
+        Map<String, String> map = matcher.extractUriTemplateVariables(prefixTopicName, this.getPath());
         exchange.setDeviceId(String.valueOf(map.get("deviceId")));
         exchange.setProductId(String.valueOf(map.get("productId")));
         Map<String, String> elseMap = new HashMap<>();

+ 2 - 2
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/MatchAllResource.java

@@ -13,11 +13,11 @@ import reactor.core.publisher.EmitterProcessor;
 public class MatchAllResource extends AbstractCoapResource {
 
     public MatchAllResource(EmitterProcessor<CoapExchangeMessage> processor) {
-        super(processor, String.format(prefixTopicName,"/"));
+        super(processor);
     }
 
     public MatchAllResource(EmitterProcessor<CoapExchangeMessage> processor, boolean visible) {
-        super(processor, String.format(prefixTopicName,"/"), visible);
+        super(processor, visible);
     }
 
 }

+ 3 - 6
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/CoapServerProperties.java

@@ -14,14 +14,11 @@ public class CoapServerProperties {
 
     private String id;
 
-    //服务实例数量(线程数)
-    private int instance = 4;
-
-    private String certId;
+    private NetworkConfig options;
 
-    private boolean ssl;
+    private String address;
 
-    private NetworkConfig options;
+    private boolean enableDtls;
 
     private Integer port;
 

+ 5 - 26
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/MultiCoapServerProvider.java → jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/CoapServerProvider.java

@@ -1,7 +1,6 @@
 package org.jetlinks.community.network.coap.server;
 
 import com.alibaba.fastjson.JSONObject;
-import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.californium.core.CoapServer;
 import org.eclipse.californium.core.network.config.NetworkConfig;
@@ -12,7 +11,6 @@ import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import java.util.ArrayList;
 
 /**
  * @author lifang
@@ -22,9 +20,8 @@ import java.util.ArrayList;
  * @createTime 2021年12月15日 16:27:00
  */
 @Component
-@AllArgsConstructor
 @Slf4j
-public class MultiCoapServerProvider implements NetworkProvider<CoapServerProperties> {
+public class CoapServerProvider implements NetworkProvider<CoapServerProperties> {
 
     @Nonnull
     @Override
@@ -35,24 +32,14 @@ public class MultiCoapServerProvider implements NetworkProvider<CoapServerProper
     @Nonnull
     @Override
     public Network createNetwork(@Nonnull CoapServerProperties properties) {
-        MultiCoapServer coapServer = new MultiCoapServer(properties.getId());
-        initServer(coapServer,properties);
-        return coapServer;
-    }
-
-    private void initServer(MultiCoapServer coapServer, CoapServerProperties properties) {
-        ArrayList<CoapServer> servers = new ArrayList<>(properties.getInstance());
-        for (int i = 0; i < properties.getInstance(); i++) {
-            CoapServer coap = new CoapServer(properties.getOptions(),properties.getPort());
-            servers.add(coap);
-        }
-        coapServer.setCoapServer(servers);
+        NetWorkCoapServer coapServer = new NetWorkCoapServer(properties,properties.getId());
         coapServer.initResoruces();
+        return coapServer;
     }
-
     @Override
     public void reload(@Nonnull Network network, @Nonnull CoapServerProperties properties) {
-        initServer((MultiCoapServer) network, properties);
+        NetWorkCoapServer coapServer= (NetWorkCoapServer) network;
+        coapServer.reload(properties);
     }
 
     @Nullable
@@ -70,14 +57,6 @@ public class MultiCoapServerProvider implements NetworkProvider<CoapServerProper
 
             config.setOptions(new JSONObject(properties.getConfigurations()).toJavaObject(NetworkConfig.class));
 
-//            if (config.isSsl()) {
-//                config.getOptions().setSsl(true);
-//                return certificateManager.getCertificate(config.getCertId())
-//                    .map(VertxKeyCertTrustOptions::new)
-//                    .doOnNext(config.getOptions()::setKeyCertOptions)
-//                    .doOnNext(config.getOptions()::setTrustOptions)
-//                    .thenReturn(config);
-//            }
             return Mono.just(config);
         });
     }

+ 29 - 30
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/MultiCoapServer.java → jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/server/NetWorkCoapServer.java

@@ -1,53 +1,44 @@
 package org.jetlinks.community.network.coap.server;
 
 
-import cn.hutool.core.collection.CollectionUtil;
 import lombok.Getter;
 import org.eclipse.californium.core.CoapServer;
+import org.eclipse.californium.core.network.CoapEndpoint;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.Network;
 import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.coap.core.DefaultCoapServer;
 import org.jetlinks.community.network.coap.resources.*;
 import org.jetlinks.core.message.codec.CoapExchangeMessage;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
-
-import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 /**
  * @author lifang
  * @version 1.0.0
- * @ClassName MultiCoapServer.java
+ * @ClassName NetWorkCoapServer.java
  * @Description TODO
  * @createTime 2021年12月16日 08:57:00
  */
-public class MultiCoapServer extends CoapServer implements Network {
+public class NetWorkCoapServer extends CoapServer implements Network {
 
-    private Collection<CoapServer> coapServers;
+    private DefaultCoapServer coapServer;
+    private AtomicBoolean start=new AtomicBoolean(false);
+    private CoapServerProperties properties;
     @Getter
     private String id;
-    private final String topicFormat="/{productId}/{deviceId}/%s";
     private final EmitterProcessor<CoapExchangeMessage> processor=EmitterProcessor.create();
     private final FluxSink<CoapExchangeMessage> sink=processor.sink(FluxSink.OverflowStrategy.BUFFER);
 
-    public MultiCoapServer(String id){
+    public NetWorkCoapServer( CoapServerProperties properties,String id){
         super();
+        this.properties=properties;
         this.id=id;
     }
 
-    public void setCoapServer( Collection<CoapServer> servers){
-        if (CollectionUtil.isNotEmpty(coapServers)) {
-            shutdown();
-        }
-        this.coapServers=servers;
-        for (org.eclipse.californium.core.CoapServer coapServer : this.coapServers) {
-            coapServer.start();
-        }
-
-    }
-
     @Override
     public NetworkType getType() {
         return DefaultNetworkType.COAP_SERVER;
@@ -55,17 +46,15 @@ public class MultiCoapServer extends CoapServer implements Network {
 
     @Override
     public void shutdown() {
-        if (CollectionUtil.isNotEmpty(coapServers)) {
-            for (org.eclipse.californium.core.CoapServer coapServer : coapServers) {
-                coapServer.destroy();
-            }
-            coapServers.clear();
+        if (coapServer!=null) {
+            coapServer.destroy();
+            coapServer=null;
         }
     }
 
     @Override
     public boolean isAlive() {
-        return CollectionUtil.isNotEmpty(coapServers);
+        return coapServer!=null;
     }
 
     @Override
@@ -73,17 +62,27 @@ public class MultiCoapServer extends CoapServer implements Network {
         return false;
     }
 
+    public void reload(CoapServerProperties properties){
+        this.properties=properties;
+        this.start.set(false);
+        coapServer.destroy();
+        initResoruces();
+    }
 
     public void initResoruces(){
-        coapServers.forEach(this::initResource);
+        if(start.compareAndSet(false,true)){
+            coapServer=new DefaultCoapServer();
+            CoapEndpoint endpoint = new CoapEndpoint.Builder()
+                .setPort(properties.getPort())
+                .build();
+            coapServer.addEndpoint(endpoint);
+            coapServer.add(new MatchAllResource(processor));
+            coapServer.start();
+        }
     }
 
 
     public Flux<CoapExchangeMessage> handleAuthRequest(){
         return processor.map(Function.identity());
     }
-
-    private void initResource(CoapServer coapServer){
-        coapServer.add(new MatchAllResource(processor));
-    }
 }

+ 2 - 2
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java

@@ -57,8 +57,8 @@ public class JetLinksExtendProtocolSupportProvider implements ProtocolSupportPro
             support.setName("JetLinks V2.0");
             support.setDescription("JetLinks Protocol Version 2.0");
 
-            support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
-            support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
+            support.addAuthenticator(DefaultTransport.MQTT, new MqttDefaultAuth());
+            support.addAuthenticator(DefaultTransport.MQTT_TLS, new MqttDefaultAuth());
 
             support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
 

+ 67 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/MqttDefaultAuth.java

@@ -0,0 +1,67 @@
+package org.jetlinks.community.support;
+
+import cn.hutool.core.util.StrUtil;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.core.ProtocolSupport;
+import org.jetlinks.core.Value;
+import org.jetlinks.core.device.*;
+import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.supports.official.JetLinksAuthenticator;
+import reactor.core.publisher.Mono;
+import javax.annotation.Nonnull;
+
+import static org.jetlinks.core.enums.ErrorCode.UNSUPPORTED_MESSAGE;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MqttDefaultAut.java
+ * @Description TODO
+ * @createTime 2021年07月20日 14:52:00
+ */
+public class MqttDefaultAuth extends JetLinksAuthenticator {
+    @Override
+    public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
+        if (request instanceof MqttAuthenticationRequest) {
+            MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
+            Transport transport = request.getTransport();
+            if(!transport.equals(DefaultTransport.MQTT) &&!transport.equals(DefaultTransport.MQTT_TLS)){
+                //非mqtt协议
+                return Mono.just(AuthenticationResponse.error(400, "设备不支持的连接协议类型:" + transport.getName()));
+            }
+            String username = mqtt.getUsername();
+            String password = mqtt.getPassword();
+
+            try {
+                return deviceOperation
+                    .getProduct()
+                    .flatMap(deviceProductOperator -> deviceProductOperator.getProtocol()
+                        .flatMap(protocolSupport -> protocolSupport.authenticate(request,deviceOperation)))
+                    .switchIfEmpty(Mono.error(()->new DeviceOperationException(UNSUPPORTED_MESSAGE)))
+                    .flatMap(ignore->deviceOperation
+                        .getConfigs("secureId", "secureKey")
+                        .map(conf -> {
+                            String secureId = conf.getValue("secureId").map(Value::asString).orElse(null);
+
+                            String secureKey = conf
+                                .getValue("secureKey")
+                                .map(Value::asString)
+                                .orElse(null);
+                            //签名
+                            if ((StrUtil.isEmpty(secureId)||username.equals(secureId))
+                                && (StrUtil.isEmpty(secureKey)||password.equals(secureKey))) {
+                                return AuthenticationResponse.success(deviceOperation.getDeviceId());
+                            } else {
+                                return AuthenticationResponse.error(401, "密钥错误");
+                            }
+                        }));
+            } catch (NumberFormatException e) {
+                return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
+            }
+        }
+        return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
+    }
+}