Sfoglia il codice sorgente

fixed 集群网桥

18339543638 4 anni fa
parent
commit
87393ec3f8

+ 98 - 98
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultHttp2MessageClientFactory.java

@@ -34,101 +34,101 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultHttp2MessageClientFactory {
 
-	private static final Logger logger = LoggerFactory.getLogger(DefaultHttp2MessageClientFactory.class);
+    private static final Logger logger = LoggerFactory.getLogger(DefaultHttp2MessageClientFactory.class);
 
-	private static ExecutorService reConnectExecutorService;
+    private static ExecutorService reConnectExecutorService;
 
     /**
      * key: bridgeId
      * value: client
      */
-	private static Map<String,MessageClient> clientMap =new ConcurrentHashMap<>();
+    private static Map<String,MessageClient> clientMap =new ConcurrentHashMap<>();
 
     private static Map<String,BridgeConnectionStatusHandler> handlerMap =new ConcurrentHashMap<>();
 
-	public static MessageClient initClient(DefaultBridgeBootstrap bridgeBootstrap,String productKey, String deviceName, String deviceSecret, String endpoint) {
-		return initClient(bridgeBootstrap,productKey, deviceName, deviceSecret, endpoint, false);
-	}
+    public static MessageClient initClient(DefaultBridgeBootstrap bridgeBootstrap,String productKey, String deviceName, String deviceSecret, String endpoint) {
+        return initClient(bridgeBootstrap,productKey, deviceName, deviceSecret, endpoint, false);
+    }
 
-	public static MessageClient initClient(DefaultBridgeBootstrap bridgeBootstrap,String productKey, String deviceName, String deviceSecret,
-										   String endpoint, boolean isReconnect) {
+    public static MessageClient initClient(DefaultBridgeBootstrap bridgeBootstrap,String productKey, String deviceName, String deviceSecret,
+                                           String endpoint, boolean isReconnect) {
         MessageClient client = clientMap.get(bridgeBootstrap.getId());
         if (client!= null && !isReconnect) {
-			return client;
-		}
-		Profile profile = Profile.getDeviceProfile(endpoint, productKey, deviceName, deviceSecret,
-				String.format(BridgeConfigConsts.CLIENT_ID_TEMPLATE, productKey, deviceName));
-		if (reConnectExecutorService == null) {
-			reConnectExecutorService = new ThreadPoolExecutor(profile.getCallbackThreadCorePoolSize(),
-				profile.getCallbackThreadMaximumPoolSize(), 60, TimeUnit.SECONDS,
-				new LinkedBlockingQueue<>(35000),
-				new ThreadFactoryBuilder().setDaemon(true).setNameFormat("iot-bridge-reconnect-%d").build(),
-				new ThreadPoolExecutor.AbortPolicy());
-		}
-		client = MessageClientFactory.messageClient(profile);
-		client.connect(new MessageCallback() {
-			@Override
-			public Action consume(MessageToken messageToken) {
-				Message message = messageToken.getMessage();
-				byte[] payload = message.getPayload();
-				String responseString = new String(payload, StandardCharsets.UTF_8);
-				logger.warn("Unexpected message [{}] received: {}", message.getTopic(), responseString);
-				return Action.CommitSuccess;
-			}
-		});
-		client.setConnectionCallback(new ConnectionCallback() {
-			@Override
-			public void onConnectionLost() {
-				logger.error("Connection to http2 gateway [{}] is lost", endpoint);
+            return client;
+        }
+        Profile profile = Profile.getDeviceProfile(endpoint, productKey, deviceName, deviceSecret,
+            String.format(BridgeConfigConsts.CLIENT_ID_TEMPLATE, productKey, deviceName));
+        if (reConnectExecutorService == null) {
+            reConnectExecutorService = new ThreadPoolExecutor(profile.getCallbackThreadCorePoolSize(),
+                profile.getCallbackThreadMaximumPoolSize(), 60, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(35000),
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("iot-bridge-reconnect-%d").build(),
+                new ThreadPoolExecutor.AbortPolicy());
+        }
+        client = MessageClientFactory.messageClient(profile);
+        client.connect(new MessageCallback() {
+            @Override
+            public Action consume(MessageToken messageToken) {
+                Message message = messageToken.getMessage();
+                byte[] payload = message.getPayload();
+                String responseString = new String(payload, StandardCharsets.UTF_8);
+                logger.warn("Unexpected message [{}] received: {}", message.getTopic(), responseString);
+                return Action.CommitSuccess;
+            }
+        });
+        client.setConnectionCallback(new ConnectionCallback() {
+            @Override
+            public void onConnectionLost() {
+                logger.error("Connection to http2 gateway [{}] is lost", endpoint);
                 BridgeConnectionStatusHandler bridgeConnectionStatusHandler = handlerMap.get(bridgeBootstrap.getId());
                 if (bridgeConnectionStatusHandler != null) {
-					bridgeConnectionStatusHandler.onConnectionLost();
-				}
-			}
-
-			@Override
-			public void onConnected(boolean isReconnected) {
-				String subDeviceConnectMode = ConfigFactory.getBridgeConfigManager().getSubDeviceConnectMode();
-				logger.info("Bridge onConnected, isReconnected={}, subDeviceConnectMode={}",
-					isReconnected, subDeviceConnectMode);
+                    bridgeConnectionStatusHandler.onConnectionLost();
+                }
+            }
+
+            @Override
+            public void onConnected(boolean isReconnected) {
+                String subDeviceConnectMode = ConfigFactory.getBridgeConfigManager().getSubDeviceConnectMode();
+                logger.info("Bridge onConnected, isReconnected={}, subDeviceConnectMode={}",
+                    isReconnected, subDeviceConnectMode);
                 BridgeConnectionStatusHandler bridgeConnectionStatusHandler = handlerMap.get(bridgeBootstrap.getId());
-				if (bridgeConnectionStatusHandler !=  null) {
-					bridgeConnectionStatusHandler.onConnected(isReconnected);
-				}
-				if (isReconnected) {
-					if (!BridgeConfigConsts.SUBDEVICE_CONNECT_MODE_VALUE_3.equals(subDeviceConnectMode)) {
-						reConnectAllSubDevices();
-					}
-				}
-			}
-		});
+                if (bridgeConnectionStatusHandler !=  null) {
+                    bridgeConnectionStatusHandler.onConnected(isReconnected);
+                }
+                if (isReconnected) {
+                    if (!BridgeConfigConsts.SUBDEVICE_CONNECT_MODE_VALUE_3.equals(subDeviceConnectMode)) {
+                        reConnectAllSubDevices();
+                    }
+                }
+            }
+        });
         clientMap.put(bridgeBootstrap.getId(),client);
-		return client;
-	}
-
-	private static void reConnectAllSubDevices() {
-		Collection<Session> sessions = SessionManagerFactory.getInstance().getSessions();
-		if (sessions.isEmpty()) {
-			return;
-		}
-		UplinkChannelHandler handler = new UplinkChannelHandler();
-		sessions.forEach(session -> {
-			try {
-				reConnectExecutorService.submit(() -> {
-					logger.info("Re-online device {}", session.getOriginalIdentity());
-					try {
-						handler.doOnline(session.getOriginalIdentity());
-					} catch (Exception e) {
-						logger.error("Unexpected exception occurred while Re-online device:" + session.getOriginalIdentity(), e);
-					}
-				});
-			} catch (RejectedExecutionException e) {
-				logger.error("Re-online task rejected, device:{}", session.getOriginalIdentity());
-			} catch (Throwable e) {
-				logger.error("Re-online unExpected exception, device:" + session.getOriginalIdentity(), e);
-			}
-		});
-	}
+        return client;
+    }
+
+    private static void reConnectAllSubDevices() {
+        Collection<Session> sessions = SessionManagerFactory.getInstance().getSessions();
+        if (sessions.isEmpty()) {
+            return;
+        }
+        UplinkChannelHandler handler = new UplinkChannelHandler();
+        sessions.forEach(session -> {
+            try {
+                reConnectExecutorService.submit(() -> {
+                    logger.info("Re-online device {}", session.getOriginalIdentity());
+                    try {
+                        handler.doOnline(session.getOriginalIdentity());
+                    } catch (Exception e) {
+                        logger.error("Unexpected exception occurred while Re-online device:" + session.getOriginalIdentity(), e);
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                logger.error("Re-online task rejected, device:{}", session.getOriginalIdentity());
+            } catch (Throwable e) {
+                logger.error("Re-online unExpected exception, device:" + session.getOriginalIdentity(), e);
+            }
+        });
+    }
 
     public static MessageClient getClient(String id) {
         return clientMap.get(id);
@@ -136,32 +136,32 @@ public class DefaultHttp2MessageClientFactory {
 
 
     public static MessageClient getClient(DefaultBridgeBootstrap bridgeBootstrap) {
-		return clientMap.get(bridgeBootstrap.getId());
-	}
+        return clientMap.get(bridgeBootstrap.getId());
+    }
 
-	public static void setClient(DefaultBridgeBootstrap bridgeBootstrap,MessageClient c) {
-		clientMap.put(bridgeBootstrap.getId(),c);
-	}
+    public static void setClient(DefaultBridgeBootstrap bridgeBootstrap,MessageClient c) {
+        clientMap.put(bridgeBootstrap.getId(),c);
+    }
 
-	public static void setReConnectExecutorService(ExecutorService executorService) {
-		reConnectExecutorService = executorService;
-	}
+    public static void setReConnectExecutorService(ExecutorService executorService) {
+        reConnectExecutorService = executorService;
+    }
 
-	public static void setBridgeConnectionStatusHandler(DefaultBridgeBootstrap bridgeBootstrap,BridgeConnectionStatusHandler handler) {
-		handlerMap.merge(bridgeBootstrap.getId(),handler,(o,n)->n);
-	}
+    public static void setBridgeConnectionStatusHandler(DefaultBridgeBootstrap bridgeBootstrap,BridgeConnectionStatusHandler handler) {
+        handlerMap.merge(bridgeBootstrap.getId(),handler,(o,n)->n);
+    }
 
-	public static void disconnect(DefaultBridgeBootstrap bridgeBootstrap) {
+    public static void disconnect(DefaultBridgeBootstrap bridgeBootstrap) {
         MessageClient client = clientMap.get(bridgeBootstrap.getId());
         if (client != null) {
-			client.disconnect();
-			client = null;
-		}
-	}
+            client.disconnect();
+            client = null;
+        }
+    }
 
-	public static boolean isConnected(DefaultBridgeBootstrap bridgeBootstrap) {
+    public static boolean isConnected(DefaultBridgeBootstrap bridgeBootstrap) {
         MessageClient client = clientMap.get(bridgeBootstrap.getId());
-		return client != null && client.isConnected();
-	}
+        return client != null && client.isConnected();
+    }
 
 }

+ 7 - 7
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -65,8 +65,8 @@ public class AliBridgeGateway{
      * @param bridgeId 网桥id
      * @return
      */
-    public Mono<Void> reconnect(String bridgeId){
-        return lookUpServer(bridgeId).flatMap(AliBridgeServer::reconnect);
+    public Mono<Void> reconnect(String bridgeId,boolean broadcast){
+        return lookUpServer(bridgeId).flatMap(server -> server.reconnect(broadcast));
     }
 
     /**
@@ -78,13 +78,13 @@ public class AliBridgeGateway{
     public Mono<Void> initBridge(AliIotBridgeEntity resource,boolean broadcast){
         return lookUpServer(resource.getId())
             .then()
-            .switchIfEmpty(AliBridgeServerFactory.create(eventBus,registry,resource)
+            .switchIfEmpty(AliBridgeServerFactory.create(eventBus,registry,resource,broadcast)
                 .doOnNext(server -> {
                     AliBridgeServer oldServer =
                         this.bridgeMap.put(server.getId(),server);
                     if(!server.equals(oldServer)&&oldServer!=null){
-                        oldServer.pauseBridge()
-                            .concatWith(server.reconnect()).subscribe();
+                        oldServer.pauseBridge(broadcast)
+                            .concatWith(server.reconnect(broadcast)).subscribe();
                     }
                 })
                 .doOnNext(server->{
@@ -116,7 +116,7 @@ public class AliBridgeGateway{
     public Mono<Void> delBridgeServer(String bridgeId,boolean broadcast){
         return lookUpServer(bridgeId)
             .filter(Objects::nonNull)
-            .flatMap(AliBridgeServer::delBridge);
+            .flatMap(server -> server.delBridge(broadcast));
     }
 
 
@@ -164,6 +164,6 @@ public class AliBridgeGateway{
     }
 
     public Mono<Void> pauseBridge(String id,boolean broadcast) {
-        return lookUpServer(id).flatMap(AliBridgeServer::pauseBridge);
+        return lookUpServer(id).flatMap(server -> server.pauseBridge(broadcast));
     }
 }

+ 13 - 17
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -21,8 +21,6 @@ import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
 import org.jetlinks.community.bridge.server.Channel;
-import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
-import org.jetlinks.community.bridge.service.AliBridgeService;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.cluster.ClusterUniqueTask;
 import org.jetlinks.core.device.DeviceOperator;
@@ -213,30 +211,30 @@ public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
     }
 
 
-    public Mono<Void> pauseBridge() {
+    public Mono<Void> pauseBridge(boolean broadcast ) {
         return Mono.fromRunnable(()->{
             if(bootstrap!=null&&bootstrap.isBridgeConnected()){
                 bootstrap.disconnectBridge();
             }
-            changeStatus(BridgeStatus.stop);
+            changeStatus(BridgeStatus.stop,broadcast);
             log.info("网桥[{}]关闭",id);
         });
     }
 
 
-    public Mono<Void> delBridge() {
+    public Mono<Void> delBridge(boolean broadcast) {
         return Mono.fromRunnable(()->{
             if(bootstrap!=null&&bootstrap.isBridgeConnected()){
                 bootstrap.disconnectBridge();
                 bootstrap=null;
             }
-            changeStatus(BridgeStatus.del);
+            changeStatus(BridgeStatus.del,broadcast);
             log.info("网桥[{}]关闭",id);
         });
     }
 
 
-    public Mono<Void> reconnect() {
+    public Mono<Void> reconnect(boolean broadcast) {
         return  Mono.fromRunnable(()->{
             if(bootstrap!=null){
                 if(!isReplica()){
@@ -244,17 +242,19 @@ public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
                     channelMap.values().forEach(Channel::online);
                 }
             }
-            changeStatus(BridgeStatus.running);
+            changeStatus(BridgeStatus.running,broadcast);
             log.info("网桥[{}]重启成功",id);
         });
     }
 
 
-    private void changeStatus(BridgeStatus status){
+    private void changeStatus(BridgeStatus status,boolean broadcast){
         this.status=status;
         if(isReplica()){
-            getClusterOperationTopic()
-                .publish(Mono.just(status)).subscribe();
+            if(broadcast){
+                getClusterOperationTopic()
+                    .publish(Mono.just(status)).subscribe();
+            }
         }
     }
 
@@ -265,9 +265,7 @@ public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
 
     @Override
     public void beMasterPostProcessor() {
-        if(BridgeStatus.running.equals(status)){
-            bridgeGateway.initBridge(params,false).subscribe();
-        }
+        handleStatus(status,false).subscribe();
     }
 
     @Override
@@ -298,9 +296,7 @@ public class AliBridgeServer extends ClusterUniqueTask<AliBridgeServer> {
             case stop:
                 return bridgeGateway.pauseBridge(id,broadcast);
             case running:
-                if(this.bootstrap==null){
-                    return bridgeGateway.initBridge(params,broadcast);
-                }
+                return bridgeGateway.initBridge(params,broadcast);
             case fail:
                 break;
             default:break;

+ 1 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -124,7 +124,7 @@ public class AliBridgeServerController implements
     @Operation(summary = "重启网桥")
     @CreateAction
     public Mono<Void> startBridge(@PathVariable("serverId")String id){
-        return bridgeGateway.reconnect(id);
+        return bridgeGateway.reconnect(id,true);
     }
 
     @PostMapping("/unregister/{serverId}")