Parcourir la source

fixed 集群网桥

18339543638 il y a 4 ans
Parent
commit
cfe83a995b

+ 22 - 0
jetlinks-core/src/main/java/org/jetlinks/core/AbstractClusterServer.java

@@ -0,0 +1,22 @@
+package org.jetlinks.core;
+
+import org.jetlinks.core.cluster.ClusterManager;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName AbstractClusterServer.java
+ * @Description 集群服务方法
+ * @createTime 2021年12月11日 10:15:00
+ */
+public class AbstractClusterServer<T> {
+    private ClusterManager clusterManager;
+    private String topic;
+    private String currentServerId;
+
+
+    class  ServerMessage{
+        T msg;
+        String fromId;
+    }
+}

+ 0 - 1
jetlinks-core/src/main/java/org/jetlinks/core/cluster/AbstractClusterUniqueTask.java

@@ -123,7 +123,6 @@ public abstract class AbstractClusterUniqueTask<T> implements Serializable {
                         this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
                     }
                 }
-
             })
             .then();
     }

+ 4 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java

@@ -74,6 +74,10 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
         this.popConfiguration = popConfiguration;
     }
 
+    public void setBridgeDeviceName(String bridgeDeviceName) {
+        this.bridgeDeviceName = bridgeDeviceName;
+    }
+
     private static String createMacId(String id){
         String result="%s%s-%s%s-%s%s-%s%s-%s%s-%s%s";
         String[] params=new String[12];

+ 4 - 8
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultIoTServiceClient.java

@@ -38,7 +38,7 @@ public class DefaultIoTServiceClient {
 	
 	private static final String FAKE_METHOD = "fake";
 	
-	private static DefaultIoTServiceClient instance;
+//	private static DefaultIoTServiceClient instance;
 	
 	private BridgeConfigManager bridgeConfigManager;
 	
@@ -47,11 +47,7 @@ public class DefaultIoTServiceClient {
 	}
 	
 	public static DefaultIoTServiceClient getClient(BridgeConfigManager bridgeConfigManager) {
-		if(instance != null) {
-			return instance;
-		}
-		instance = new DefaultIoTServiceClient(bridgeConfigManager);
-		return instance;
+		return new DefaultIoTServiceClient(bridgeConfigManager);
 	}
 	
 	public static ServiceCallResponse getResponse(String requestId) {
@@ -71,7 +67,7 @@ public class DefaultIoTServiceClient {
 		message.setPayload(requestString.getBytes());
 		
 		String bridgeProductKey = this.bridgeConfigManager.getBridgeProductKey();
-		String bridgeDeviceName = this.bridgeConfigManager.getBridgeDeviceName();
+		String bridgeDeviceName = this.bridgeConfigManager.getMacAddress();
 		String requestTopic = String.format(topicTemplate, 
 				bridgeProductKey, bridgeDeviceName);
 		
@@ -112,7 +108,7 @@ public class DefaultIoTServiceClient {
 		message.setPayload(requestString.getBytes());
 
 		String bridgeProductKey = this.bridgeConfigManager.getBridgeProductKey();
-		String bridgeDeviceName = this.bridgeConfigManager.getBridgeDeviceName();
+		String bridgeDeviceName = this.bridgeConfigManager.getMacAddress();
 		String requestTopic = String.format(topicTemplate,
 			bridgeProductKey, bridgeDeviceName);
 

+ 0 - 3
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultUplinkChannelHandler.java

@@ -20,10 +20,8 @@ import com.aliyun.iot.as.bridge.core.auth.AuthProvider;
 import com.aliyun.iot.as.bridge.core.auth.AuthProviderFactory;
 import com.aliyun.iot.as.bridge.core.client.DeviceServiceClient;
 import com.aliyun.iot.as.bridge.core.client.Http2MessageClientFactory;
-import com.aliyun.iot.as.bridge.core.client.IoTServiceClient;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigManager;
-import com.aliyun.iot.as.bridge.core.config.ConfigFactory;
 import com.aliyun.iot.as.bridge.core.config.DeviceConfigManager;
 import com.aliyun.iot.as.bridge.core.exception.BridgeHttpException;
 import com.aliyun.iot.as.bridge.core.model.BridgeMessagePayload;
@@ -41,7 +39,6 @@ import com.aliyun.iot.as.bridge.core.session.SessionManagerFactory;
 import com.aliyun.openservices.iot.api.message.entity.Message;
 import com.aliyun.openservices.iot.api.message.entity.MessageToken;
 import com.aliyun.openservices.iot.api.util.StringUtil;
-import jdk.internal.dynalink.DefaultBootstrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 11 - 14
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -86,6 +86,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
         this.status=params.getState();
         verify(params);
         refreshBridgeConfig(params);
+        params.setDeviceName(bridgeConfigManager.getDeviceName());
         bootstrap=new DefaultBridgeBootstrap(params.getId(),bridgeConfigManager);
         if(broadcast){
             //发送广播消息
@@ -93,11 +94,11 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
         }
         //非主节点、非运行状态、已启动
         if(isReplica()||!BridgeStatus.running.equals(status)||start.get()){
-            return Mono.empty();
+            return Mono.just(this);
         }
         if (start.compareAndSet(false, true)) {
             if(bootstrap.isBridgeConnected()){
-                return Mono.empty();
+                return Mono.just(this);
             }
             return Mono.fromRunnable(()->{
                 bootstrap.bootstrap(new DownlinkChannelHandler() {
@@ -115,7 +116,6 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
                 });
             });
         }
-        params.setDeviceName(bridgeConfigManager.getDeviceName());
         return Mono.just(this);
     }
 
@@ -245,7 +245,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
     public Mono<Void> delBridge(boolean broadcast) {
         return Mono.fromRunnable(()->{
-            if(bootstrap!=null&&bootstrap.isBridgeConnected()){
+            if(bootstrap!=null&&isReplica()){
                 bootstrap.disconnectBridge();
                 bootstrap=null;
             }
@@ -257,11 +257,9 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
     public Mono<Void> reconnect(boolean broadcast) {
         return  Mono.fromRunnable(()->{
-            if(bootstrap!=null){
-                if(!isReplica()){
-                    bootstrap.reconnectBridge();
-                    channelMap.values().forEach(DefaultAliBridgeChannel::online);
-                }
+            if(bootstrap!=null&&!isReplica()){
+                bootstrap.reconnectBridge();
+                channelMap.values().forEach(DefaultAliBridgeChannel::online);
             }
             changeStatus(BridgeStatus.running,broadcast);
             log.info("网桥[{}]重启成功",id);
@@ -271,12 +269,11 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
     private void changeStatus(BridgeStatus status,boolean broadcast){
         this.status=status;
-        if(isReplica()){
-            if(broadcast){
-                getClusterOperationTopic()
-                    .publish(Mono.just(status)).subscribe();
-            }
+        if(broadcast){
+            getClusterOperationTopic()
+                .publish(Mono.just(status)).subscribe();
         }
+
     }