浏览代码

fixed 集群网桥

18339543638 4 年之前
父节点
当前提交
64911851ea

+ 5 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java

@@ -71,6 +71,11 @@ public class DeviceGatewayHelper {
             }
         }
         ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId());
+        if (deviceSession != null) {
+            deviceSession.keepAlive();
+            applySessionKeepaliveTimeout(children, () -> null)
+                .accept(deviceSession);
+        }
         //子设备离线或者注销
         if (children instanceof DeviceOfflineMessage || children instanceof DeviceUnRegisterMessage) {
             //注销会话,这里子设备可能会收到多次离线消息

+ 12 - 0
jetlinks-core/src/main/java/org/jetlinks/core/rpc/ClusterRpcConnector.java

@@ -0,0 +1,12 @@
+package org.jetlinks.core.rpc;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterRpcConnector.java
+ * @Description 集群远程调用通信
+ * @createTime 2021年12月09日 10:18:00
+ */
+public class ClusterRpcConnector {
+
+}

+ 22 - 0
jetlinks-core/src/main/java/org/jetlinks/core/rpc/RpcRequest.java

@@ -0,0 +1,22 @@
+package org.jetlinks.core.rpc;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName RpcRequest.java
+ * @Description TODO
+ * @createTime 2021年12月09日 10:20:00
+ */
+@Data
+public class RpcRequest implements Serializable {
+    public static final long serializable=1L;
+    private String requestId;
+    private Object[] params;
+    private long timeout;
+    private Class<?> requestClass;
+    private String methodName;
+}

+ 17 - 10
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -154,18 +154,25 @@ public class AliBridgeGateway{
             .flatMap(ignore->getServer(bridgeId).unRegister(originalIdentity)).then();
     }
 
-    public Mono<Void> initBridge(AliIotBridgeEntity bridgeEntity){
-        return handleClusterOperation(bridgeEntity.getNodeId(),
-            bridgeEntity.getId(),MessageType.init,bridgeEntity)
+    /**
+     * 初始化网桥,创建网桥实体类并启动
+     *
+     * @param resource
+     * @return
+     */
+    public Mono<Void> initBridge(AliIotBridgeEntity resource){
+        return handleClusterOperation(resource.getNodeId(),
+            resource.getId(),MessageType.init,resource)
             .filter(Boolean.FALSE::equals)
-            .flatMap(ignore->Mono.justOrEmpty(getServer(bridgeEntity.getId()))
+            .flatMap(ignore->Mono.justOrEmpty(getServer(resource.getId()))
                 .then()
-                .switchIfEmpty(AliBridgeServer.create(eventBus,registry,bridgeEntity,clusterId)
+                .switchIfEmpty(AliBridgeServer.create(eventBus,registry,resource,clusterId)
                     .doOnNext(server -> {
                         AliBridgeServer oldServer =
-                            this.bridgeMap.putIfAbsent(server.getId(), server);
-                        if(oldServer!=null){
-                            oldServer.stopBridge().subscribe();
+                            this.bridgeMap.put(server.getId(),server);
+                        if(!server.equals(oldServer)&&oldServer!=null){
+                                oldServer.stopBridge()
+                                    .concatWith(server.reconnect()).subscribe();
                         }
                     })
                     .doOnNext(server->{
@@ -178,8 +185,8 @@ public class AliBridgeGateway{
                     .then()
                     .onErrorResume(error->
                         Mono.error(()->{
-                            this.delBridgeServer(bridgeEntity.getNodeId(),
-                                bridgeEntity.getId());
+                            this.delBridgeServer(resource.getNodeId(),
+                                resource.getId());
                             return new BusinessException(error.getMessage());
                         })))
                 .then()).then();

+ 4 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -54,6 +54,7 @@ public class AliBridgeServer implements BridgeServer {
 
     private DefaultBridgeConfigManager bridgeConfigManager;
 
+
     private  DefaultBridgeBootstrap bootstrap;
 
     private final String clusterId;
@@ -75,6 +76,9 @@ public class AliBridgeServer implements BridgeServer {
             .thenReturn(aliBridgeServer);
     }
 
+    public Mono<Void> start(){
+        return Mono.fromRunnable(()->this.bootstrap.reconnectBridge());
+    }
     public Mono<Void> initBridge(AliIotBridgeEntity params){
         if(start.get()){
             return Mono.empty();

+ 1 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -93,7 +93,7 @@ public class AliBridgeServerController implements
                 bridgeService.findById(id)
                     .flatMap(bridge->
                         bridgeDeviceService.save(config)
-                            .concatWith(__ -> bridgeGateway.registerDevice(bridge.getNodeId(), bridge.getId(), config))
+                            .flatMap(ignore->bridgeGateway.registerDevice(bridge.getNodeId(), bridge.getId(), config))
                             .onErrorResume(error->Mono.error(new BusinessException(error.getMessage().contains("originalidentity")?"平台设备不可重复绑定":"阿里云设备不可重复绑定")))
                             .then())
             ).then();

+ 1 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisHaManager.java

@@ -158,6 +158,7 @@ public class RedisHaManager implements HaManager {
                 //自己
 
                 if (currentServer().isSame(serverNode)) {
+                    currentServer().setLastKeepAlive(System.currentTimeMillis());
                     return;
                 }
                 if(currentServer().duplication(serverNode)){

+ 4 - 20
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/doc/SwaggerConfiguration.java

@@ -1,6 +1,5 @@
 package org.jetlinks.community.standalone.configuration.doc;
 
-
 import io.swagger.v3.oas.annotations.OpenAPIDefinition;
 import io.swagger.v3.oas.annotations.enums.SecuritySchemeIn;
 import io.swagger.v3.oas.annotations.enums.SecuritySchemeType;
@@ -20,6 +19,7 @@ import org.springframework.core.ResolvableType;
 import org.springframework.http.ResponseEntity;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.List;
@@ -29,8 +29,8 @@ import java.util.List;
     info = @Info(
         title = "物联网平台",
         description = "物联网平台接口文档",
-        contact = @Contact(name = "admin",url = "http://doc.jetlinks.cn/dev-guide/start.html"),
-        version = "1.0.0"
+        contact = @Contact(name = "admin",url = "https://github.com/jetlinks"),
+        version = "1.12.0-SNAPSHOT"
     )
 )
 @SecuritySchemes(
@@ -47,26 +47,10 @@ import java.util.List;
 @AutoConfigureBefore(SpringDocWebFluxConfiguration.class)
 public class SwaggerConfiguration {
 
-//    @Bean
-//    public Docket docker(){
-//        // 构造函数传入初始化规范,这是swagger2规范
-//        return new Docket(DocumentationType.SWAGGER_2)
-//            //apiInfo: 添加api详情信息,参数为ApiInfo类型的参数,这个参数包含了第二部分的所有信息比如标题、描述、版本之类的,开发中一般都会自定义这些信息
-////            .apiInfo(apiInfo())
-//            .groupName("lifang")
-//            //配置是否启用Swagger,如果是false,在浏览器将无法访问,默认是true
-//            .enable(true)
-//            .select()
-//            //apis: 添加过滤条件,
-////            .apis(RequestHandlerSelectors.basePackage("com.yichun.swagger_boot_demo.controller"))
-//            //paths: 这里是控制哪些路径的api会被显示出来,比如下方的参数就是除了/user以外的其它路径都会生成api文档
-//            .paths(Objects::nonNull)
-//            .build();
-//    }
+
     @Bean
     public ReturnTypeParser operationCustomizer() {
 
-
         return new ReturnTypeParser() {
             @Override
             public Type getReturnType(MethodParameter methodParameter) {

+ 0 - 1
pom.xml

@@ -31,7 +31,6 @@
         <reactor.ql.version>1.0.13</reactor.ql.version>
         <fastjson.version>1.2.75</fastjson.version>
         <hutool.version>5.7.16</hutool.version>
-        <jetlinks.version>1.1.7-SNAPSHOT</jetlinks.version>
         <aliyun.sdk.version>4.5.6</aliyun.sdk.version>
         <aliyun.iot.sdk.version>7.29.0</aliyun.iot.sdk.version>
         <aliyun.bridge.sdk.version>2.4.1</aliyun.bridge.sdk.version>