Browse Source

add 网桥设备新增设备

18339543638 4 years ago
parent
commit
6741a4e8cb
16 changed files with 276 additions and 97 deletions
  1. 16 20
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeBootstrap.java
  2. 1 1
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java
  3. 14 4
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultDeviceConfigManager.java
  4. 9 16
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultDeviceServiceClient.java
  5. 167 0
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultHttp2MessageClientFactory.java
  6. 1 1
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultIoTServiceClient.java
  7. 25 0
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultPopClientFactory.java
  8. 10 4
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultUplinkChannelHandler.java
  9. 0 1
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeDeviceConfig.java
  10. 2 6
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeEntity.java
  11. 2 8
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/BridgeServer.java
  12. 8 6
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java
  13. 9 15
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java
  14. 4 0
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java
  15. 8 12
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java
  16. 0 3
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java

+ 16 - 20
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeBootstrap.java

@@ -4,7 +4,6 @@
  */
 package org.jetlinks.community.bridge.core;
 
-import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -13,14 +12,10 @@ import java.util.concurrent.TimeoutException;
 import com.alibaba.fastjson.JSON;
 
 import com.aliyun.iot.as.bridge.core.BridgeBootstrapApi;
-import com.aliyun.iot.as.bridge.core.client.DeviceServiceClient;
-import com.aliyun.iot.as.bridge.core.client.Http2MessageClientFactory;
-import com.aliyun.iot.as.bridge.core.client.PopClientFactory;
 import com.aliyun.iot.as.bridge.core.client.callback.DownlinkMessageCallback;
 import com.aliyun.iot.as.bridge.core.client.callback.IoTServiceCallback;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigManager;
-import com.aliyun.iot.as.bridge.core.config.ConfigFactory;
 import com.aliyun.iot.as.bridge.core.exception.BootException;
 import com.aliyun.iot.as.bridge.core.exception.BridgeHttpException;
 import com.aliyun.iot.as.bridge.core.handler.DownlinkChannelHandler;
@@ -32,13 +27,8 @@ import com.aliyun.iot.as.bridge.core.model.device.DeviceAuthRequest;
 import com.aliyun.iot.as.bridge.core.model.device.DeviceAuthResponse;
 import com.aliyun.iot.as.bridge.core.session.SessionManagerFactory;
 import com.aliyun.iot.as.bridge.core.util.AssertUtil;
-import com.aliyun.openservices.iot.api.Profile;
 import com.aliyun.openservices.iot.api.exception.IotClientException;
-import com.aliyun.openservices.iot.api.message.MessageClientFactory;
 import com.aliyun.openservices.iot.api.message.api.MessageClient;
-import com.aliyun.openservices.iot.api.message.callback.MessageCallback;
-import com.aliyun.openservices.iot.api.message.entity.Message;
-import com.aliyun.openservices.iot.api.message.entity.MessageToken;
 import com.aliyun.openservices.iot.api.util.StringUtil;
 import com.aliyuncs.DefaultAcsClient;
 import com.aliyuncs.exceptions.ClientException;
@@ -47,6 +37,8 @@ import com.aliyuncs.iot.model.v20180120.QueryDeviceDetailResponse;
 import com.aliyuncs.iot.model.v20180120.QueryDeviceDetailResponse.Data;
 import com.aliyuncs.iot.model.v20180120.RegisterDeviceRequest;
 import com.aliyuncs.iot.model.v20180120.RegisterDeviceResponse;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +48,7 @@ import org.slf4j.LoggerFactory;
  * socket server.
  *
  */
+@EqualsAndHashCode(callSuper = false)
 public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
 
     private static final Logger logger = LoggerFactory.getLogger(DefaultBridgeBootstrap.class);
@@ -63,9 +56,11 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
 
     private BridgeConfigManager bridgeConfigManager;
 
-
-    public DefaultBridgeBootstrap( BridgeConfigManager bridgeConfigManager) {
+    @Getter
+    private String id;
+    public DefaultBridgeBootstrap(String id, BridgeConfigManager bridgeConfigManager) {
         this.bridgeConfigManager = bridgeConfigManager;
+        this.id=id;
         SessionManagerFactory.init(new DefaultSessionManager(DefaultDeviceConfigManager.getInstance()));
     }
 
@@ -81,7 +76,7 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
 
     @Override
     public void disconnectBridge() {
-        Http2MessageClientFactory.disconnect();
+        DefaultHttp2MessageClientFactory.disconnect(this);
     }
 
     @Override
@@ -92,7 +87,7 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
 
     @Override
     public boolean isBridgeConnected() {
-        return Http2MessageClientFactory.isConnected();
+        return DefaultHttp2MessageClientFactory.isConnected(this);
     }
 
     @Override
@@ -148,8 +143,8 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
                 deviceName = bridgeConfigManager.getMacAddress();
                 PopClientConfiguration popConfigs = bridgeConfigManager.getPopConfiguration();
                 AssertUtil.isTrue(popConfigs.isComplete(), "bridge deviceName empty && popConfig is not complete!");
-                DefaultAcsClient popClient = PopClientFactory.initClient(popConfigs.getAccessKey(),
-                    popConfigs.getAccessSecret(), popConfigs.getName(), popConfigs.getRegion(), popConfigs.getProduct(),
+                DefaultAcsClient popClient = DefaultPopClientFactory.initClient(popConfigs.getAccessKey(),
+                    popConfigs.getAccessSecret(), popConfigs.getRegion(), popConfigs.getProduct(),
                     popConfigs.getEndpoint());
                 QueryDeviceDetailRequest queryRequest = new QueryDeviceDetailRequest();
                 if(StringUtil.isNotEmpty(instanceId)){
@@ -185,7 +180,8 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
                     } else {
                         logger.error(String.format("Failed to register bridge[request:%s, response:%s]",
                             JSON.toJSONString(registerRequest), JSON.toJSONString(registerResponse)));
-                        throw new BootException("Failed to register bridge " + registerResponse.getRequestId());
+//                        throw new BootException("Failed to register bridge " + registerResponse.getRequestId());
+                        throw new BootException(registerResponse.getErrorMessage());
                     }
                 }
             } catch (ClientException e) {
@@ -216,7 +212,7 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
     }
 
     private void initServiceCall() {
-        MessageClient client = Http2MessageClientFactory.getClient();
+        MessageClient client = DefaultHttp2MessageClientFactory.getClient(this);
         AssertUtil.notNull(client, "client must be connected before initServiceCall");
         String responseTopic = String.format(BridgeConfigConsts.OFFLINE_REPLY_TOPIC_TEMPLATE,
             bridgeConfigManager.getBridgeProductKey(), bridgeConfigManager.getBridgeDeviceName());
@@ -239,7 +235,7 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
         if(handler == null) {
             return;
         }
-        MessageClient client = Http2MessageClientFactory.getClient();
+        MessageClient client = DefaultHttp2MessageClientFactory.getClient(this);
         String responseTopic = String.format(BridgeConfigConsts.SUB_TOPIC_TEMPLATE,
             bridgeConfigManager.getBridgeProductKey(), bridgeConfigManager.getBridgeDeviceName());
         AssertUtil.notNull(callback, "DefaultDownlinkMessageCallback is null");
@@ -270,7 +266,7 @@ public class DefaultBridgeBootstrap implements BridgeBootstrapApi {
         AssertUtil.notBlank(http2EndPoint, "http2 EndPoint is empty");
         logger.info("Initialize the bridge with {} & {}, {}", productKey, deviceName, http2EndPoint);
         try {
-            Http2MessageClientFactory.initClient(productKey, deviceName, deviceSecret, http2EndPoint, isReconnect);
+            DefaultHttp2MessageClientFactory.initClient(this,productKey, deviceName, deviceSecret, http2EndPoint, isReconnect);
         } catch (IotClientException e) {
             String msg = "Failed to initialize HTTP2 client!" + e.getMessage();
             logger.error(msg, e);

+ 1 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java

@@ -141,7 +141,7 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
 
     @Override
     public void updateBridgeIotId(String iotId) {
-        configMaps.put(BridgeConfigConsts.BRIDGE_DEVICE_ID,iotId);
+        configMaps.put(BridgeConfigConsts.BRIDGE_INSTANCEID_KEY,iotId);
     }
     @Override
     public String getMacAddress() {

+ 14 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultDeviceConfigManager.java

@@ -29,16 +29,26 @@ public class DefaultDeviceConfigManager implements DeviceConfigManager{
     public static DefaultDeviceConfigManager getInstance(){
         return defaultDeviceConfigManager;
     }
-    public static void register(String originalIdentity,String productKey,String deviceName,String deviceSecret){
+    public static void register(String bridgeId,String originalIdentity,String productKey,String deviceName,String deviceSecret){
         DeviceIdentity deviceIdentity = new DeviceIdentity(productKey, deviceName, deviceSecret);
-        configCache.putIfAbsent(originalIdentity,deviceIdentity);
+        configCache.putIfAbsent(getConfigId(bridgeId,originalIdentity),deviceIdentity);
         revertCache.putIfAbsent(getMapKey(deviceIdentity), originalIdentity);
     }
 
+    public static String getConfigId(String bridgeId,String originalIdentity){
+        return bridgeId+"|"+originalIdentity;
+    }
+    /**
+     *  设备原始Id+网桥id
+     *
+     *  configId= ${originalIdentity}|${bridgeId}
+     * @param configId
+     * @return
+     */
     @Override
-    public DeviceIdentity getDeviceIdentity(String originalIdentity) {
+    public DeviceIdentity getDeviceIdentity(String configId) {
         // find in cache first
-        return  configCache.get(originalIdentity);
+        return  configCache.get(configId);
     }
 
     @Override

+ 9 - 16
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultDeviceServiceClient.java

@@ -15,6 +15,8 @@ import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
@@ -48,23 +50,15 @@ public class DefaultDeviceServiceClient {
 	private static final String CONTENT_TYPE_HEADER = "Content-Type";
 
 	private static final String CONTENT_TYPE = "application/x-www-form-urlencoded";
-	
-	private static DefaultDeviceServiceClient instance;
+
 
 	private BridgeConfigManager bridgeConfigManager;
 
+	private static Map<BridgeConfigManager,DefaultDeviceServiceClient> clientMap=new ConcurrentHashMap<>();
     public static DefaultDeviceServiceClient getInstance(BridgeConfigManager bridgeConfigManager) {
-        if(instance != null) {
-            return instance;
-        }
-        instance = new DefaultDeviceServiceClient(bridgeConfigManager);
-        return instance;
+        return clientMap.computeIfAbsent(bridgeConfigManager, k->new DefaultDeviceServiceClient(bridgeConfigManager));
     }
 
-	private DefaultDeviceServiceClient() {
-		this.bridgeConfigManager = ConfigFactory.getBridgeConfigManager();
-	}
-
     private DefaultDeviceServiceClient(BridgeConfigManager bridgeConfigManager) {
         this.bridgeConfigManager = bridgeConfigManager==null?ConfigFactory.getBridgeConfigManager():bridgeConfigManager;
     }
@@ -107,11 +101,7 @@ public class DefaultDeviceServiceClient {
 			}
 			HttpsURLConnection con = (HttpsURLConnection) obj.openConnection();
 			con.setSSLSocketFactory(ctx.getSocketFactory());
-			con.setHostnameVerifier(new HostnameVerifier() {
-				public boolean verify(String hostname, SSLSession session) {
-					return true;
-				}
-			});
+			con.setHostnameVerifier((hostName,session)->true);
 			con.setRequestMethod(METHOD);
 			con.setRequestProperty(CONTENT_TYPE_HEADER, CONTENT_TYPE);
 			con.setDoOutput(true);
@@ -164,14 +154,17 @@ public class DefaultDeviceServiceClient {
 	}
 
 	private class DefaultTrustManager implements X509TrustManager {
+        @Override
 		public X509Certificate[] getAcceptedIssuers() {
 			return null;
 		}
 
+        @Override
 		public void checkClientTrusted(X509Certificate[] chain, String authType)
 			throws CertificateException {
 		}
 
+		@Override
 		public void checkServerTrusted(X509Certificate[] chain, String authType)
 			throws CertificateException {
 		}

+ 167 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultHttp2MessageClientFactory.java

@@ -0,0 +1,167 @@
+/**
+ * aliyun.com Inc.
+ * Copyright (c) 2004-2018 All Rights Reserved.
+ */
+package org.jetlinks.community.bridge.core;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
+import com.aliyun.iot.as.bridge.core.config.ConfigFactory;
+import com.aliyun.iot.as.bridge.core.handler.BridgeConnectionStatusHandler;
+import com.aliyun.iot.as.bridge.core.handler.UplinkChannelHandler;
+import com.aliyun.iot.as.bridge.core.model.Session;
+import com.aliyun.iot.as.bridge.core.session.SessionManagerFactory;
+import com.aliyun.openservices.iot.api.Profile;
+import com.aliyun.openservices.iot.api.message.MessageClientFactory;
+import com.aliyun.openservices.iot.api.message.api.MessageClient;
+import com.aliyun.openservices.iot.api.message.callback.ConnectionCallback;
+import com.aliyun.openservices.iot.api.message.callback.MessageCallback;
+import com.aliyun.openservices.iot.api.message.entity.Message;
+import com.aliyun.openservices.iot.api.message.entity.MessageToken;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a singleton factory to initialize a messaging client for Http2 SDK.
+ * <br>
+ * Note that this factory is NOT thread safe.
+ *
+ */
+public class DefaultHttp2MessageClientFactory {
+
+	private static final Logger logger = LoggerFactory.getLogger(DefaultHttp2MessageClientFactory.class);
+
+	private static ExecutorService reConnectExecutorService;
+
+    /**
+     * key: bridgeId
+     * value: client
+     */
+	private static Map<String,MessageClient> clientMap =new ConcurrentHashMap<>();
+
+    private static Map<String,BridgeConnectionStatusHandler> handlerMap =new ConcurrentHashMap<>();
+
+	public static MessageClient initClient(DefaultBridgeBootstrap bridgeBootstrap,String productKey, String deviceName, String deviceSecret, String endpoint) {
+		return initClient(bridgeBootstrap,productKey, deviceName, deviceSecret, endpoint, false);
+	}
+
+	public static MessageClient initClient(DefaultBridgeBootstrap bridgeBootstrap,String productKey, String deviceName, String deviceSecret,
+										   String endpoint, boolean isReconnect) {
+        MessageClient client = clientMap.get(bridgeBootstrap.getId());
+        if (client!= null && !isReconnect) {
+			return client;
+		}
+		Profile profile = Profile.getDeviceProfile(endpoint, productKey, deviceName, deviceSecret,
+				String.format(BridgeConfigConsts.CLIENT_ID_TEMPLATE, productKey, deviceName));
+		if (reConnectExecutorService == null) {
+			reConnectExecutorService = new ThreadPoolExecutor(profile.getCallbackThreadCorePoolSize(),
+				profile.getCallbackThreadMaximumPoolSize(), 60, TimeUnit.SECONDS,
+				new LinkedBlockingQueue<>(35000),
+				new ThreadFactoryBuilder().setDaemon(true).setNameFormat("iot-bridge-reconnect-%d").build(),
+				new ThreadPoolExecutor.AbortPolicy());
+		}
+		client = MessageClientFactory.messageClient(profile);
+		client.connect(new MessageCallback() {
+			@Override
+			public Action consume(MessageToken messageToken) {
+				Message message = messageToken.getMessage();
+				byte[] payload = message.getPayload();
+				String responseString = new String(payload, StandardCharsets.UTF_8);
+				logger.warn("Unexpected message [{}] received: {}", message.getTopic(), responseString);
+				return Action.CommitSuccess;
+			}
+		});
+		client.setConnectionCallback(new ConnectionCallback() {
+			@Override
+			public void onConnectionLost() {
+				logger.error("Connection to http2 gateway [{}] is lost", endpoint);
+                BridgeConnectionStatusHandler bridgeConnectionStatusHandler = handlerMap.get(bridgeBootstrap.getId());
+                if (bridgeConnectionStatusHandler != null) {
+					bridgeConnectionStatusHandler.onConnectionLost();
+				}
+			}
+
+			@Override
+			public void onConnected(boolean isReconnected) {
+				String subDeviceConnectMode = ConfigFactory.getBridgeConfigManager().getSubDeviceConnectMode();
+				logger.info("Bridge onConnected, isReconnected={}, subDeviceConnectMode={}",
+					isReconnected, subDeviceConnectMode);
+                BridgeConnectionStatusHandler bridgeConnectionStatusHandler = handlerMap.get(bridgeBootstrap.getId());
+				if (bridgeConnectionStatusHandler !=  null) {
+					bridgeConnectionStatusHandler.onConnected(isReconnected);
+				}
+				if (isReconnected) {
+					if (!BridgeConfigConsts.SUBDEVICE_CONNECT_MODE_VALUE_3.equals(subDeviceConnectMode)) {
+						reConnectAllSubDevices();
+					}
+				}
+			}
+		});
+        clientMap.put(bridgeBootstrap.getId(),client);
+		return client;
+	}
+
+	private static void reConnectAllSubDevices() {
+		Collection<Session> sessions = SessionManagerFactory.getInstance().getSessions();
+		if (sessions.isEmpty()) {
+			return;
+		}
+		UplinkChannelHandler handler = new UplinkChannelHandler();
+		sessions.forEach(session -> {
+			try {
+				reConnectExecutorService.submit(() -> {
+					logger.info("Re-online device {}", session.getOriginalIdentity());
+					try {
+						handler.doOnline(session.getOriginalIdentity());
+					} catch (Exception e) {
+						logger.error("Unexpected exception occurred while Re-online device:" + session.getOriginalIdentity(), e);
+					}
+				});
+			} catch (RejectedExecutionException e) {
+				logger.error("Re-online task rejected, device:{}", session.getOriginalIdentity());
+			} catch (Throwable e) {
+				logger.error("Re-online unExpected exception, device:" + session.getOriginalIdentity(), e);
+			}
+		});
+	}
+
+    public static MessageClient getClient(String id) {
+        return clientMap.get(id);
+    }
+
+
+    public static MessageClient getClient(DefaultBridgeBootstrap bridgeBootstrap) {
+		return clientMap.get(bridgeBootstrap.getId());
+	}
+
+	public static void setClient(DefaultBridgeBootstrap bridgeBootstrap,MessageClient c) {
+		clientMap.put(bridgeBootstrap.getId(),c);
+	}
+
+	public static void setReConnectExecutorService(ExecutorService executorService) {
+		reConnectExecutorService = executorService;
+	}
+
+	public static void setBridgeConnectionStatusHandler(DefaultBridgeBootstrap bridgeBootstrap,BridgeConnectionStatusHandler handler) {
+		handlerMap.merge(bridgeBootstrap.getId(),handler,(o,n)->n);
+	}
+
+	public static void disconnect(DefaultBridgeBootstrap bridgeBootstrap) {
+        MessageClient client = clientMap.get(bridgeBootstrap.getId());
+        if (client != null) {
+			client.disconnect();
+			client = null;
+		}
+	}
+
+	public static boolean isConnected(DefaultBridgeBootstrap bridgeBootstrap) {
+        MessageClient client = clientMap.get(bridgeBootstrap.getId());
+		return client != null && client.isConnected();
+	}
+
+}

+ 1 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultIoTServiceClient.java

@@ -120,7 +120,7 @@ public class DefaultIoTServiceClient {
 		response.setStartTime(System.currentTimeMillis());
 		response.setId(id);
 		logger.debug("Begin sending request[{}] at [{}]", requestString, System.currentTimeMillis());
-		MessageToken token = Http2MessageClientFactory.getClient().publish(requestTopic, message);
+		MessageToken token = DefaultHttp2MessageClientFactory.getClient(((DefaultBridgeConfigManager)this.bridgeConfigManager).getBridgeId()).publish(requestTopic, message);
 		return token.getPublishFuture().thenAccept(msg ->
 			logger.debug("Sending request[{}] success at [{}]", requestString, System.currentTimeMillis()));
 	}

+ 25 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultPopClientFactory.java

@@ -0,0 +1,25 @@
+/**
+ * aliyun.com Inc.
+ * Copyright (c) 2004-2018 All Rights Reserved.
+ */
+package org.jetlinks.community.bridge.core;
+
+import com.aliyuncs.DefaultAcsClient;
+import com.aliyuncs.exceptions.ClientException;
+import com.aliyuncs.profile.DefaultProfile;
+import com.aliyuncs.profile.IClientProfile;
+
+/**
+ *
+ * Note that this factory is NOT thread safe.
+ * 
+ */
+public class DefaultPopClientFactory {
+
+    public static DefaultAcsClient initClient(String accessKey, String accessSecret, String region,
+                                              String product, String endPoint) throws ClientException {
+        DefaultProfile.addEndpoint(region,product,endPoint);
+        IClientProfile profile = DefaultProfile.getProfile(region, accessKey, accessSecret);
+        return new DefaultAcsClient(profile);
+    }
+}

+ 10 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultUplinkChannelHandler.java

@@ -41,6 +41,7 @@ import com.aliyun.iot.as.bridge.core.session.SessionManagerFactory;
 import com.aliyun.openservices.iot.api.message.entity.Message;
 import com.aliyun.openservices.iot.api.message.entity.MessageToken;
 import com.aliyun.openservices.iot.api.util.StringUtil;
+import jdk.internal.dynalink.DefaultBootstrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +51,11 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultUplinkChannelHandler {
 
+    private String bridgeId;
     public DefaultUplinkChannelHandler( BridgeConfigManager bridgeConfigManager,DeviceConfigManager deviceConfigManager) {
         this.deviceConfigManager = deviceConfigManager;
         this.bridgeConfigManager = bridgeConfigManager;
+        this.bridgeId=((DefaultBridgeConfigManager)bridgeConfigManager).getBridgeId();
         this.sessionManager = SessionManagerFactory.getInstance();
         this.authProvider = AuthProviderFactory.getInstance();
         this.servicClient = DefaultIoTServiceClient.getClient(bridgeConfigManager);
@@ -174,7 +177,7 @@ public class DefaultUplinkChannelHandler {
 
     private CompletableFuture<ProtocolMessage> doPublishAsyncImpl(String originalIdentity, String topic, byte[] payload,
                                                                   int qos) {
-        DeviceIdentity identity = deviceConfigManager.getDeviceIdentity(originalIdentity);
+        DeviceIdentity identity = deviceConfigManager.getDeviceIdentity(DefaultDeviceConfigManager.getConfigId(bridgeId,originalIdentity));
         BridgeMessagePayloadParams bridgePayloadParams = new BridgeMessagePayloadParams();
         bridgePayloadParams.setDeviceName(identity.getDeviceName());
         bridgePayloadParams.setProductKey(identity.getProductKey());
@@ -220,7 +223,7 @@ public class DefaultUplinkChannelHandler {
             return new DeviceOnlineResult(DeviceOnlineResult.PARAM_ERROR, "local auth fail");
         }
 
-        DeviceIdentity iotIdentity = deviceConfigManager.getDeviceIdentity(originalIdentity);
+        DeviceIdentity iotIdentity = deviceConfigManager.getDeviceIdentity(DefaultDeviceConfigManager.getConfigId(bridgeId,originalIdentity));
         if (iotIdentity == null) {
             logger.error("[device-online-fail] Failed to get device identity for device {}", originalIdentity);
             return new DeviceOnlineResult(DeviceOnlineResult.PARAM_ERROR, "no originalIdentity");
@@ -279,8 +282,11 @@ public class DefaultUplinkChannelHandler {
         }
     }
 
+    private String getConfigId(String originalIdentity){
+        return originalIdentity+"|"+((DefaultBridgeConfigManager)this.bridgeConfigManager).getBridgeId();
+    }
     private boolean doOfflineImpl(String originalIdentity) {
-        DeviceIdentity identity = deviceConfigManager.getDeviceIdentity(originalIdentity);
+        DeviceIdentity identity = deviceConfigManager.getDeviceIdentity(DefaultDeviceConfigManager.getConfigId(bridgeId,originalIdentity));
         if (Objects.isNull(identity)
             || StringUtil.isEmpty(identity.getProductKey())
             || StringUtil.isEmpty(identity.getDeviceName())) {
@@ -297,7 +303,7 @@ public class DefaultUplinkChannelHandler {
     }
 
     private boolean doOfflineAndWaitForCloudResponse(String originalIdentity, int timeout) {
-        DeviceIdentity identity = deviceConfigManager.getDeviceIdentity(originalIdentity);
+        DeviceIdentity identity = deviceConfigManager.getDeviceIdentity(DefaultDeviceConfigManager.getConfigId(bridgeId,originalIdentity));
         OfflineParams params = new OfflineParams();
         params.setDeviceName(identity.getDeviceName());
         params.setProductKey(identity.getProductKey());

+ 0 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeDeviceConfig.java

@@ -25,7 +25,6 @@ import javax.validation.constraints.NotNull;
 @Table(name = "ali_bridge_device", indexes = {
     @Index(name = "deviceName", columnList = "device_name",unique = true),
     @Index(name = "originalIdentity", columnList = "original_identity",unique = true),
-
 },
 schema = "阿里云设备网桥配置")
 @EqualsAndHashCode(callSuper = false)

+ 2 - 6
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeEntity.java

@@ -61,11 +61,6 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
     @Deprecated
     private String protocol;
 
-    @Comment("阿里云实例id")
-    @Column(name = "iotInstanceId")
-    @Schema(description = "阿里云实例id")
-    private String iotInstanceId;
-
     @Column(name = "state",length = 16,nullable = false)
     @EnumCodec
     @ColumnType(javaType = String.class)
@@ -109,6 +104,7 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
         @NotNull
         private String accessSecret;
 
-
+        @NotNull
+        private String iotInstanceId;
     }
 }

+ 2 - 8
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/BridgeServer.java

@@ -31,23 +31,17 @@ public interface BridgeServer {
      * @return
      */
     String productId();
-    /**
-     * 批量注册设备
-     * @param params
-     *  *************tp1 originalIdentity tp2 productId tp3 deviceName*************
-     * @return
-     */
-    Flux<Channel> registerBatch(Collection<Tuple4<String, String, String,String>> params);
 
     /**
      * 注册设备通道
+     * @param bridgeId 网桥id
      * @param originalIdentity 设备原始id,即网桥设备id
      * @param productKey 产品id
      * @param deviceName 网桥配置
      * @see   AliBridgeServer
      * @return
      */
-    Mono<Channel> register(@NotNull String originalIdentity, @NotNull String productKey,@NotNull  String deviceName,@NotNull  String deviceSecret);
+    Mono<Channel> register(@NotNull String bridgeId, @NotNull String originalIdentity, @NotNull String productKey, @NotNull String deviceName, @NotNull String deviceSecret);
 
     /**
      * 取消注册设备

+ 8 - 6
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -35,6 +35,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 
 /**
  * @author lifang
@@ -102,7 +103,11 @@ public class AliBridgeGateway{
                     }
                 }
                 return Mono.empty();
-            }).subscribe();
+            })
+            .onErrorContinue((error,object)->{
+                log.error("网桥任务执行失败,",error);
+            })
+            .subscribe();
     }
 
 
@@ -110,9 +115,6 @@ public class AliBridgeGateway{
         return this.bridgeMap.get(serverId);
     }
 
-    private AliBridgeServer putServer(String serverId,AliBridgeServer server){
-        return this.bridgeMap.put(serverId,server);
-    }
     /**
      * 重启网桥
      * @param bridgeId 网桥id
@@ -134,7 +136,7 @@ public class AliBridgeGateway{
         return handleClusterOperation(nodeId,bridgeId,MessageType.register,config)
             .filter(Boolean.FALSE::equals)
             .flatMap(ignore->getServer(bridgeId)
-                .register(config.getOriginalIdentity(),config.getProductKey(),config.getDeviceName(),config.getDeviceSecret())).then();
+                .register(bridgeId,config.getOriginalIdentity(),config.getProductKey(),config.getDeviceName(),config.getDeviceSecret())).then();
     }
 
 
@@ -176,7 +178,7 @@ public class AliBridgeGateway{
                         Mono.error(()->{
                             this.delBridgeServer(bridgeEntity.getNodeId(),
                                 bridgeEntity.getId());
-                            return new BusinessException("网桥配置出错,请确定配置参数是否正确;或请勿重复启动网桥)");
+                            return new BusinessException(error.getMessage());
                         })))
                 .then()).then();
     }

+ 9 - 15
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -1,6 +1,9 @@
 package org.jetlinks.community.bridge.server.aliyun;
 
 import cn.hutool.core.lang.Assert;
+import cn.hutool.core.lang.Pair;
+import cn.hutool.core.map.MapUtil;
+import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.handler.DownlinkChannelHandler;
 import com.aliyun.iot.as.bridge.core.model.PopClientConfiguration;
 import com.aliyun.iot.as.bridge.core.model.Session;
@@ -91,8 +94,8 @@ public class AliBridgeServer  implements BridgeServer {
         Assert.notNull(http2Endpoint,"创建网桥 http2Endpoint 不能为空, mapping id {%s}",params.getId());
         String regionId = accessConfig.getRegionId();
         Assert.notNull(regionId,"创建网桥 regionId 不能为空, mapping id {%s}",params.getId());
-        bridgeConfigManager = DefaultBridgeConfigManager.of(params.getId(),productKey, null, null, http2Endpoint, authEndpoint, null, getPopClientProfile(accessKey, accessSecret, regionId));
-        bootstrap=new DefaultBridgeBootstrap(bridgeConfigManager);
+        bridgeConfigManager = DefaultBridgeConfigManager.of(params.getId(),productKey, null, null, http2Endpoint, authEndpoint, MapUtil.of(Pair.of(BridgeConfigConsts.BRIDGE_INSTANCEID_KEY,params.getAccessConfig().getIotInstanceId())), getPopClientProfile(accessKey, accessSecret, regionId));
+        bootstrap=new DefaultBridgeBootstrap(params.getId(),bridgeConfigManager);
         if(start.get()){
             return Mono.empty();
         }
@@ -140,19 +143,9 @@ public class AliBridgeServer  implements BridgeServer {
     }
 
     @Override
-    public Flux<Channel> registerBatch(Collection<Tuple4<String, String, String,String>> params) {
-        return Flux.fromStream(params.stream())
-            .flatMap(tp4->this.register(tp4.getT1(),tp4.getT2(),tp4.getT3(),tp4.getT4()))
-            .doOnEach(ReactiveLogger.onError(err -> log.error("注册阿里云云云对接网关失败:{}", err)))
-            //发生错误不中断读
-            .onErrorResume((err)->Mono.empty());
-    }
-
-
-    @Override
-    public Mono<Channel> register(@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret) {
+    public Mono<Channel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret) {
         //注册设备信息
-        DefaultDeviceConfigManager.register(originalIdentity,productKey,deviceName,deviceSecret);
+        DefaultDeviceConfigManager.register(bridgeId,originalIdentity,productKey,deviceName,deviceSecret);
         DefaultUplinkChannelHandler uplinkChannelHandler = new DefaultUplinkChannelHandler(bridgeConfigManager, DefaultDeviceConfigManager.getInstance());
         channelMap
             .putIfAbsent(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName, uplinkChannelHandler, deviceRegistry, eventBus));
@@ -170,7 +163,8 @@ public class AliBridgeServer  implements BridgeServer {
                     })
                     .thenReturn(channel)
             )
-            .onErrorResume(error->Mono.error(()->{
+            .onErrorResume(error->
+                Mono.error(()->{
                 this.unRegister(originalIdentity);
                 return new BusinessException("请检查deviceName和deviceSecret是否填写正确");
             }));

+ 4 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java

@@ -10,6 +10,8 @@ import org.jetlinks.core.cluster.ClusterManager;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * @author lifang
@@ -34,6 +36,8 @@ public class AliBridgeService extends GenericReactiveCacheSupportCrudService<Ali
             .where(AliIotBridgeEntity::getState, BridgeStatus.running)
             .where(AliIotBridgeEntity::getNodeId,clusterManager.getCurrentServerId())
             .fetch()
+            .parallel()
+            .runOn(Schedulers.parallel())
             .flatMap(bridgeParam->bridgeGateway.initBridge(bridgeParam)
                 .concatWith(bridgeDeviceService.createQuery()
                     .where(AliIotBridgeDeviceConfig::getBridgeId,bridgeParam.getId())

+ 8 - 12
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -90,18 +90,14 @@ public class AliBridgeServerController implements
                     throw new BusinessException("不可在网桥下绑定网桥设备自身");
                 }
             })
-            .concatWith(bridgeService.findById(id)
-                .flatMap(bridge->
-                    Mono.zip(
-                        //保存网桥设备信息
+            .flatMap(ingore->
+                bridgeService.findById(id)
+                    .flatMap(bridge->
                         bridgeDeviceService.save(config)
-                            .onErrorResume(DuplicateKeyException.class,
-                                e->Mono.error(new BusinessException(
-                                    String.format("deviceName:[{%s}],deviceSecret:[{%s}]或设备id:[{%s}],已在网桥存在,不可重复添加",config.getDeviceName(),config.getDeviceSecret(),config.getOriginalIdentity())))),
-                        //注册网桥设备
-                        bridgeGateway.registerDevice(bridge.getNodeId(),bridge.getId(),config))
-
-                ).then(Mono.empty())).then();
+                            .concatWith(__ -> bridgeGateway.registerDevice(bridge.getNodeId(), bridge.getId(), config))
+                            .onErrorResume(error->Mono.error(new BusinessException(error.getMessage().contains("originalidentity")?"平台设备不可重复绑定":"阿里云设备不可重复绑定")))
+                            .then())
+            ).then();
     }
 
     @PostMapping("/pause/{serverId}")
@@ -150,7 +146,7 @@ public class AliBridgeServerController implements
     @CreateAction
     public Mono<Void> updateBridge(@RequestBody AliIotBridgeEntity bridge,@PathVariable("serverId") String id){
         if(StrUtil.isNullOrUndefined(bridge.getId())){
-            bridge.setId(IdUtils.newUUID());
+            bridge.setId(String.valueOf(System.currentTimeMillis()));
         }
         return bridgeService.findById(id)
             .defaultIfEmpty(bridge)

+ 0 - 3
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java

@@ -1,13 +1,10 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
-import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.core.cluster.ClusterTopic;
 import org.reactivestreams.Publisher;
 import org.springframework.data.redis.core.ReactiveRedisOperations;
 import reactor.core.Disposable;
 import reactor.core.publisher.*;
-
-import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class RedisClusterTopic<T> implements ClusterTopic<T> {