ソースを参照

fixed 网桥设备注册绑定

18339543638 4 年 前
コミット
2ce4f48aea

+ 4 - 12
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java

@@ -5,7 +5,6 @@ import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigManager;
 import com.aliyun.iot.as.bridge.core.exception.BootException;
 import com.aliyun.iot.as.bridge.core.model.PopClientConfiguration;
-import io.netty.util.internal.MacAddressUtil;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 
@@ -18,11 +17,6 @@ import java.util.Map;
 @NoArgsConstructor
 public class DefaultBridgeConfigManager implements BridgeConfigManager {
 
-    /**
-     * 网桥产品id,代替mac地址作为网桥的唯一标识
-     */
-    private String originalProductId;
-
     /**
      * 网桥设备所属产品的ProductKey。
      */
@@ -54,12 +48,11 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
 
     private PopClientConfiguration popConfiguration;
 
-    public static DefaultBridgeConfigManager of(String originalProductId,String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint, Map<String, Object> configMaps,PopClientConfiguration popConfiguration) {
-        return new DefaultBridgeConfigManager(originalProductId,bridgeProductKey,bridgeDeviceName,bridgeDeviceSecret,http2Endpoint,authEndpoint,configMaps,popConfiguration);
+    public static DefaultBridgeConfigManager of(String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint, Map<String, Object> configMaps,PopClientConfiguration popConfiguration) {
+        return new DefaultBridgeConfigManager(bridgeProductKey,bridgeDeviceName,bridgeDeviceSecret,http2Endpoint,authEndpoint,configMaps,popConfiguration);
     }
 
-    private DefaultBridgeConfigManager(String originalProductId,String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint,Map<String, Object> configMaps, PopClientConfiguration popConfiguration) {
-        this.originalProductId=originalProductId;
+    private DefaultBridgeConfigManager(String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint,Map<String, Object> configMaps, PopClientConfiguration popConfiguration) {
         this.bridgeProductKey = bridgeProductKey;
         this.bridgeDeviceName = bridgeDeviceName;
         this.bridgeDeviceSecret = bridgeDeviceSecret;
@@ -126,8 +119,7 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
     @Override
     public String getMacAddress() {
 //        return String.valueOf(HashUtil.rsHash(originalProductId));
-//        return getMac(getInetAddress());
-        return "31-32-33-34-35-36";
+        return getMac(getInetAddress());
     }
     @Override
     public void updateBridgeDeviceSecret(String deviceSecret) {

+ 127 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultIoTServiceClient.java

@@ -0,0 +1,127 @@
+/**
+ * aliyun.com Inc.
+ * Copyright (c) 2004-2018 All Rights Reserved.
+ */
+package org.jetlinks.community.bridge.core;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import com.alibaba.fastjson.JSON;
+
+import com.aliyun.iot.as.bridge.core.client.Http2MessageClientFactory;
+import com.aliyun.iot.as.bridge.core.client.IoTServiceClient;
+import com.aliyun.iot.as.bridge.core.config.BridgeConfigManager;
+import com.aliyun.iot.as.bridge.core.config.ConfigFactory;
+import com.aliyun.iot.as.bridge.core.model.ServiceCallRequest;
+import com.aliyun.iot.as.bridge.core.model.ServiceCallResponse;
+import com.aliyun.openservices.iot.api.message.entity.Message;
+import com.aliyun.openservices.iot.api.message.entity.MessageToken;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Call a remote service and wait for the response by sending a message.
+ * It's like RPC. The invoker will receive a timeout exception if it's too long to receive the response.
+ * 
+ */
+public class DefaultIoTServiceClient {
+	private static Cache<String, ServiceCallResponse> calls = CacheBuilder.newBuilder().maximumSize(5000)
+	        .concurrencyLevel(20).expireAfterWrite(1, TimeUnit.MINUTES).build();
+	
+	private static final Logger logger = LoggerFactory.getLogger(IoTServiceClient.class);
+	
+	private static final String FAKE_METHOD = "fake";
+	
+	private static DefaultIoTServiceClient instance;
+	
+	private BridgeConfigManager bridgeConfigManager;
+	
+	private DefaultIoTServiceClient(BridgeConfigManager bridgeConfigManager) {
+		this.bridgeConfigManager =bridgeConfigManager;
+	}
+	
+	public static DefaultIoTServiceClient getClient(BridgeConfigManager bridgeConfigManager) {
+		if(instance != null) {
+			return instance;
+		}
+		instance = new DefaultIoTServiceClient(bridgeConfigManager);
+		return instance;
+	}
+	
+	public static ServiceCallResponse getResponse(String requestId) {
+		return calls.getIfPresent(requestId);
+	}
+	
+	public ServiceCallResponse call(String topicTemplate, int timeout, Object params) throws TimeoutException {
+		String id = UUID.randomUUID().toString();
+		ServiceCallRequest request = new ServiceCallRequest();
+		request.setParams(params);
+		request.setId(id);
+		request.setMethod(FAKE_METHOD);
+		String requestString = JSON.toJSONString(request);
+		
+		Message message = new Message();
+		message.setQos(0);
+		message.setPayload(requestString.getBytes());
+		
+		String bridgeProductKey = this.bridgeConfigManager.getBridgeProductKey();
+		String bridgeDeviceName = this.bridgeConfigManager.getBridgeDeviceName();
+		String requestTopic = String.format(topicTemplate, 
+				bridgeProductKey, bridgeDeviceName);
+		
+		ServiceCallResponse response = new ServiceCallResponse();
+		response.setStartTime(System.currentTimeMillis());
+		response.setId(id);
+		calls.put(id, response);
+		logger.info("Sending request[{}] to iot at [{}]", requestString, System.currentTimeMillis());
+		MessageToken token = Http2MessageClientFactory.getClient().publish(requestTopic, message);
+		token.getPublishFuture().thenAccept(new Consumer<Message>() {
+			@Override
+			public void accept(Message t) {
+				long sentTime = System.currentTimeMillis();
+				logger.info("Finished actual request message[{}] sending to iot at [{}]", 
+						t.getMessageId(), sentTime);
+				response.setSentTime(sentTime);
+			}
+		});
+		try {
+			response.waitForComplete(timeout);
+		} finally {
+			calls.invalidate(id);
+		}
+		
+		return response;
+	}
+
+	public CompletableFuture<Void> callAsync(String topicTemplate, Object params) {
+		String id = UUID.randomUUID().toString();
+		ServiceCallRequest request = new ServiceCallRequest();
+		request.setParams(params);
+		request.setId(id);
+		request.setMethod(FAKE_METHOD);
+		String requestString = JSON.toJSONString(request);
+
+		Message message = new Message();
+		message.setQos(0);
+		message.setPayload(requestString.getBytes());
+
+		String bridgeProductKey = this.bridgeConfigManager.getBridgeProductKey();
+		String bridgeDeviceName = this.bridgeConfigManager.getBridgeDeviceName();
+		String requestTopic = String.format(topicTemplate,
+			bridgeProductKey, bridgeDeviceName);
+
+		ServiceCallResponse response = new ServiceCallResponse();
+		response.setStartTime(System.currentTimeMillis());
+		response.setId(id);
+		logger.debug("Begin sending request[{}] at [{}]", requestString, System.currentTimeMillis());
+		MessageToken token = Http2MessageClientFactory.getClient().publish(requestTopic, message);
+		return token.getPublishFuture().thenAccept(msg ->
+			logger.debug("Sending request[{}] success at [{}]", requestString, System.currentTimeMillis()));
+	}
+}

+ 2 - 2
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultUplinkChannelHandler.java

@@ -55,7 +55,7 @@ public class DefaultUplinkChannelHandler {
         this.bridgeConfigManager = bridgeConfigManager;
         this.sessionManager = SessionManagerFactory.getInstance();
         this.authProvider = AuthProviderFactory.getInstance();
-        this.servicClient = IoTServiceClient.getClient();
+        this.servicClient = DefaultIoTServiceClient.getClient(bridgeConfigManager);
         this.authClient = DeviceServiceClient.getInstance();
     }
 
@@ -327,7 +327,7 @@ public class DefaultUplinkChannelHandler {
 
     protected SessionManager<? extends Session> sessionManager;
 
-    protected IoTServiceClient servicClient;
+    protected DefaultIoTServiceClient servicClient;
 
     protected DeviceServiceClient authClient;
 

+ 34 - 26
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -21,6 +21,8 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
+
+import javax.annotation.PostConstruct;
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -58,10 +60,10 @@ public class AliBridgeGateway{
         this.messageHandler = messageHandler;
         this.clusterManager=clusterManager;
         this.clusterId=clusterManager.getCurrentServerId();
-        this.clusterTopic=clusterManager.getTopic(String.format("bridge-cluster-%s",clusterManager.getCurrentServerId()));
+        this.clusterTopic=clusterManager.getTopic(String.format(format,clusterManager.getCurrentServerId()));
     }
 
-
+    @PostConstruct
     public void init(){
         clusterTopic
             .subscribePattern()
@@ -119,33 +121,39 @@ public class AliBridgeGateway{
             .switchIfEmpty(aliBridgeServer.unRegister(originalIdentity));
     }
 
+    public static void main(String[] args) {
+        Mono.justOrEmpty(null)
+            .then()
+            .switchIfEmpty(Mono.just("123").doOnNext(s->{
+                System.out.println(s);
+            }).then()).subscribe();
+    }
     public Mono<Void> initBridge(AliIotBridgeEntity bridgeEntity){
         return handleClusterOperation(bridgeEntity.getNodeId(),MessageType.init,bridgeEntity)
             .switchIfEmpty(Mono.justOrEmpty(aliBridgeServer)
-                .filter(Objects::nonNull)
-                .flatMap(ignore->
-                    AliBridgeServer.create(eventBus,registry,bridgeEntity)
-                        .doOnNext(server -> {
-                            if(aliBridgeServer!=null){
-                                aliBridgeServer=server;
-                            }else {
-                                server.stopBridge().subscribe();
-                                throw new BusinessException("网桥正在运行,请勿重复启动");
-                            }
-                        })
-                        .doOnNext(server->{
-                            server.handleReceive()
-                                .parallel()
-                                .runOn(Schedulers.parallel())
-                                .flatMap(this::decodeAndHandleMessage)
-                                .subscribe();
-                        })
-                        .then()
-                        .onErrorResume(error->Mono.error(()->{
-                            this.delBridgeServer(bridgeEntity.getNodeId());
-                            return new BusinessException("网桥配置出错,请确定配置参数是否正确(是否存在空格)");
-                        }))
-                )
+                .then()
+                .switchIfEmpty(AliBridgeServer.create(eventBus,registry,bridgeEntity,clusterId)
+                    .doOnNext(server -> {
+                        if(aliBridgeServer==null){
+                            aliBridgeServer=server;
+                        }else {
+                            server.stopBridge().subscribe();
+                            throw new BusinessException("网桥正在运行,请勿重复启动");
+                        }
+                    })
+                    .doOnNext(server->{
+                        server.handleReceive()
+                            .parallel()
+                            .runOn(Schedulers.parallel())
+                            .flatMap(this::decodeAndHandleMessage)
+                            .subscribe();
+                    })
+                    .then()
+                    .onErrorResume(error->
+                        Mono.error(()->{
+                        this.delBridgeServer(bridgeEntity.getNodeId());
+                        return new BusinessException("网桥配置出错,请确定配置参数是否正确;或请勿重复启动网桥)");
+                    })))
                 .then());
     }
 

+ 11 - 12
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -36,7 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @Slf4j
 public class AliBridgeServer  implements BridgeServer {
-    private String productId;
     private Map<String, Channel> channelMap=new ConcurrentHashMap<>();
 
     private final EventBus eventBus;
@@ -53,14 +52,17 @@ public class AliBridgeServer  implements BridgeServer {
 
     private  DefaultBridgeBootstrap bootstrap;
 
+    private final String clusterId;
+
     private final DeviceRegistry deviceRegistry;
-    public AliBridgeServer(EventBus eventBus,DeviceRegistry deviceRegistry) {
+    private AliBridgeServer(EventBus eventBus,DeviceRegistry deviceRegistry,String clusterId) {
         this.deviceRegistry=deviceRegistry;
         this.eventBus = eventBus;
+        this.clusterId=clusterId;
     }
 
-    public static Mono<AliBridgeServer> create(EventBus eventBus,DeviceRegistry deviceRegistry, AliIotBridgeEntity productMapping) {
-        AliBridgeServer aliBridgeServer = new AliBridgeServer(eventBus,deviceRegistry);
+    public static Mono<AliBridgeServer> create(EventBus eventBus,DeviceRegistry deviceRegistry, AliIotBridgeEntity productMapping,String clusterId) {
+        AliBridgeServer aliBridgeServer = new AliBridgeServer(eventBus,deviceRegistry,clusterId);
         return  Mono.just(aliBridgeServer)
             .flatMap(server->server.initBridge(productMapping))
             .thenReturn(aliBridgeServer);
@@ -70,8 +72,6 @@ public class AliBridgeServer  implements BridgeServer {
         if(start.get()){
             return Mono.empty();
         }
-        String productId = params.getProductId();
-        Assert.notNull(productId,"创建网桥 productId 不能为空, mapping id {%s}",params.getId());
         AliIotBridgeEntity.AccessConfig accessConfig = params.getAccessConfig();
         String productKey = accessConfig.getProductKey();
         Assert.notNull(productKey,"创建网桥 productKey 不能为空, mapping id {%s}",params.getId());
@@ -85,13 +85,12 @@ public class AliBridgeServer  implements BridgeServer {
         Assert.notNull(http2Endpoint,"创建网桥 http2Endpoint 不能为空, mapping id {%s}",params.getId());
         String regionId = accessConfig.getRegionId();
         Assert.notNull(regionId,"创建网桥 regionId 不能为空, mapping id {%s}",params.getId());
-        bridgeConfigManager = DefaultBridgeConfigManager.of(productId,productKey, null, null, http2Endpoint, authEndpoint, null, getPopClientProfile(accessKey, accessSecret, regionId));
+        bridgeConfigManager = DefaultBridgeConfigManager.of(productKey, null, null, http2Endpoint, authEndpoint, null, getPopClientProfile(accessKey, accessSecret, regionId));
         bootstrap=new DefaultBridgeBootstrap(bridgeConfigManager);
         if(start.get()){
             return Mono.empty();
         }
         if (start.compareAndSet(false, true)) {
-            this.productId=productId;
             return Mono.fromRunnable(()->{
                 bootstrap.bootstrap(new DownlinkChannelHandler() {
                     @Override
@@ -130,7 +129,7 @@ public class AliBridgeServer  implements BridgeServer {
 
     @Override
     public String productId() {
-        return productId;
+        return null;
     }
 
     @Override
@@ -148,7 +147,7 @@ public class AliBridgeServer  implements BridgeServer {
         //注册设备信息
         DefaultDeviceConfigManager.register(originalIdentity,productKey,deviceName,deviceSecret);
         DefaultUplinkChannelHandler uplinkChannelHandler = new DefaultUplinkChannelHandler(bridgeConfigManager, DefaultDeviceConfigManager.getInstance());
-        channelMap.putIfAbsent(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productId, productKey, deviceName, uplinkChannelHandler, deviceRegistry, eventBus));
+        channelMap.putIfAbsent(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName, uplinkChannelHandler, deviceRegistry, eventBus));
         return Mono.just(channelMap.get(originalIdentity))
             .doOnNext(Channel::init)
             .flatMap(channel ->
@@ -185,7 +184,7 @@ public class AliBridgeServer  implements BridgeServer {
             if(bootstrap!=null){
                 bootstrap.disconnectBridge();
             }
-            log.info("产品网桥{}关闭",this.productId);
+            log.info("网桥[{}]关闭",clusterId);
         });
     }
 
@@ -196,7 +195,7 @@ public class AliBridgeServer  implements BridgeServer {
                 bootstrap.reconnectBridge();
                 channelMap.values().forEach(Channel::online);
             }
-            log.info("产品网桥{}重启成功",this.productId);
+            log.info("网桥[{}]重启成功",clusterId);
         });
     }
 

+ 11 - 6
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/DefaultAliBridgeChannel.java

@@ -10,6 +10,7 @@ import org.jetlinks.community.bridge.core.DefaultUplinkChannelHandler;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
 import org.jetlinks.community.bridge.server.Channel;
 import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
@@ -32,10 +33,6 @@ public class DefaultAliBridgeChannel implements Channel {
     private String originalIdentity;
 
     private DeviceRegistry deviceRegistry;
-    /**
-     * 产品id
-     */
-    private String productId;
 
     private String productKey;
 
@@ -48,19 +45,19 @@ public class DefaultAliBridgeChannel implements Channel {
     private Disposable disposable ;
 
     private AtomicBoolean start=new AtomicBoolean(false);
+
+    private  String productId="**";
     /**
      * @see DefaultBridgeBootstrap 网桥数据下行
      * @see DefaultUplinkChannelHandler 网桥数据下行
      */
     public DefaultAliBridgeChannel(String originalIdentity,
-                                   String productId,
                                    String productKey,
                                    String deviceName,
                                    DefaultUplinkChannelHandler uplinkChannelHandler,
                                    DeviceRegistry deviceRegistry,
                                    EventBus eventBus) {
         this.originalIdentity = originalIdentity;
-        this.productId=productId;
         this.eventBus=eventBus;
         this.deviceRegistry=deviceRegistry;
         this.productKey=productKey;
@@ -79,6 +76,13 @@ public class DefaultAliBridgeChannel implements Channel {
              *  属性设置,服务调用
              *  上下线
              */
+
+            disposable=deviceRegistry.getDevice(originalIdentity)
+                .flatMap(DeviceOperator::getProduct)
+                .doOnNext(deviceProductOperator -> {
+                    productId = deviceProductOperator.getId();}
+                ).subscribe();
+
             Subscription subscription = Subscription.builder()
                 .subscriberId("alibridge-" + this.originalIdentity)
                 //上下线
@@ -137,6 +141,7 @@ public class DefaultAliBridgeChannel implements Channel {
     @Override
     public void close(){
         disposable.dispose();
+        doOffline();
         start.set(false);
     }
 }

+ 7 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -10,11 +10,13 @@ import org.hswebframework.web.authorization.annotation.DeleteAction;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.crud.service.ReactiveCrudService;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
+import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.entity.AliIotBridgeDeviceConfig;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
 import org.jetlinks.community.bridge.service.AliBridgeDeviceService;
 import org.jetlinks.community.bridge.service.AliBridgeService;
+import org.springframework.dao.DuplicateKeyException;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Mono;
 
@@ -70,7 +72,8 @@ public class AliBridgeServerController implements
             bridgeDeviceService.save(config),
             //注册网桥设备
             bridgeGateway.registerDevice(serverId,config))
-            .then();
+            .then()
+            .onErrorResume(DuplicateKeyException.class,e->Mono.error(new BusinessException("设备不可重复添加")));
     }
 
     @PostMapping("/pause/{serverId}")
@@ -91,7 +94,7 @@ public class AliBridgeServerController implements
     @PostMapping("/unregister/{serverId}")
     @Operation(summary = "取消注册网桥设备")
     @DeleteAction
-    public Mono<Void> unRegister(@PathVariable("id")String serverId,@RequestBody String originalIdentity){
+    public Mono<Void> unRegister(@PathVariable("serverId")String serverId,@RequestBody String originalIdentity){
         return Mono.zip(
             //删除网桥设备
             bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getOriginalIdentity,originalIdentity).execute(),
@@ -114,8 +117,8 @@ public class AliBridgeServerController implements
     public Mono<Void> updateBridge(@PathVariable("serverId")String serverId,@RequestBody AliIotBridgeEntity bridge){
         //删除设备后重新注册设备
         return   Mono.zip(
-            bridgeService.save(bridge),
-            bridgeGateway.replaceBridgeServer(serverId,bridge))
+            bridgeGateway.replaceBridgeServer(serverId,bridge)
+            , bridgeService.save(bridge))
             .flatMap(tp2->bridgeDeviceService.createQuery()
                 .where(AliIotBridgeDeviceConfig::getBridgeId,bridge.getId())
                 .fetch()