Jelajahi Sumber

add 自动启动

18339543638 4 tahun lalu
induk
melakukan
625dda1d04

+ 30 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java

@@ -1,9 +1,11 @@
 package org.jetlinks.community.bridge.core;
 
+import cn.hutool.core.util.HashUtil;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigManager;
 import com.aliyun.iot.as.bridge.core.exception.BootException;
 import com.aliyun.iot.as.bridge.core.model.PopClientConfiguration;
+import io.netty.util.internal.MacAddressUtil;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 
@@ -16,6 +18,11 @@ import java.util.Map;
 @NoArgsConstructor
 public class DefaultBridgeConfigManager implements BridgeConfigManager {
 
+    /**
+     * 网桥产品id,代替mac地址作为网桥的唯一标识
+     */
+    private String originalProductId;
+
     /**
      * 网桥设备所属产品的ProductKey。
      */
@@ -47,11 +54,12 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
 
     private PopClientConfiguration popConfiguration;
 
-    public static DefaultBridgeConfigManager of(String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint, Map<String, Object> configMaps,PopClientConfiguration popConfiguration) {
-        return new DefaultBridgeConfigManager(bridgeProductKey,bridgeDeviceName,bridgeDeviceSecret,http2Endpoint,authEndpoint,configMaps,popConfiguration);
+    public static DefaultBridgeConfigManager of(String originalProductId,String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint, Map<String, Object> configMaps,PopClientConfiguration popConfiguration) {
+        return new DefaultBridgeConfigManager(originalProductId,bridgeProductKey,bridgeDeviceName,bridgeDeviceSecret,http2Endpoint,authEndpoint,configMaps,popConfiguration);
     }
 
-    private DefaultBridgeConfigManager(String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint,Map<String, Object> configMaps, PopClientConfiguration popConfiguration) {
+    private DefaultBridgeConfigManager(String originalProductId,String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint,Map<String, Object> configMaps, PopClientConfiguration popConfiguration) {
+        this.originalProductId=originalProductId;
         this.bridgeProductKey = bridgeProductKey;
         this.bridgeDeviceName = bridgeDeviceName;
         this.bridgeDeviceSecret = bridgeDeviceSecret;
@@ -117,7 +125,9 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
     }
     @Override
     public String getMacAddress() {
-        return getMac(getInetAddress());
+//        return String.valueOf(HashUtil.rsHash(originalProductId));
+//        return getMac(getInetAddress());
+        return "31-32-33-34-35-36";
     }
     @Override
     public void updateBridgeDeviceSecret(String deviceSecret) {
@@ -129,6 +139,7 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
         this.bridgeDeviceName=deviceName;
     }
 
+
     private String getMac(InetAddress addr) {
         try {
             NetworkInterface net = NetworkInterface.getByInetAddress(addr);
@@ -150,6 +161,21 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
         }
     }
 
+    public static void main(String[] args) throws UnknownHostException {
+        byte[] macBytes ="123456".getBytes();
+        StringBuffer buffer = new StringBuffer();
+        for(int i = 0; i < macBytes.length; i++){
+            if(i != 0) { buffer.append("-"); }
+            int intMac = macBytes[i]&0xff;
+            String str = Integer.toHexString(intMac);
+            if(str.length() == 0){
+                buffer.append("0");
+            }
+            buffer.append(str);
+        }
+        System.out.println(buffer.toString().toUpperCase());
+
+    }
     private InetAddress getInetAddress() {
         try {
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();

+ 12 - 3
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeEntity.java

@@ -24,7 +24,8 @@ import java.sql.JDBCType;
  * @createTime 2021年11月26日 09:35:00
  */
 @Table(name = "ali_bridge", indexes = {
-    @Index(name = "product", columnList = "product_id")
+    @Index(name = "product", columnList = "product_id"),
+    @Index(name = "nodeId",columnList = "node_id")
 })
 @Getter
 @Setter
@@ -36,8 +37,9 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
     private String name;
 
     @Comment("网桥产品id")
-    @Column(name = "product_id",nullable = false,updatable = false)
+    @Column(name = "product_id")
     @Schema(description = "网桥产品id")
+    @Deprecated
     private String productId;
 
     @Comment("说明")
@@ -53,8 +55,9 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
     private AccessConfig accessConfig;
 
     @Comment("云云协议")
-    @Column(name = "protocol",nullable = false,updatable = false)
+    @Column(name = "protocol")
     @Schema(description = "云云协议")
+    @Deprecated
     private String protocol;
 
 
@@ -71,8 +74,14 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
     @Comment("失败原因")
     @Column(name = "error_reason")
     @Schema(description = "失败原因")
+    @Deprecated
     private String errorReason;
 
+    @Comment("所在集群节点id")
+    @Column(name = "node_id")
+    @Schema(description = "所在集群节点id")
+    private String nodeId;
+
     @Data
     public static class AccessConfig implements Serializable {
         @NotNull

+ 17 - 9
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -41,6 +41,7 @@ public class AliBridgeGateway{
      */
     private static final Map<String,AliBridgeServer> serverMap=new ConcurrentHashMap<>();
 
+    private volatile AliBridgeServer aliBridgeServer;
     private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
 
     private final FluxSink<Message> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
@@ -59,9 +60,16 @@ public class AliBridgeGateway{
     }
 
     public Mono<Void> initBridge(AliIotBridgeEntity productMapping){
-        if (!serverMap.containsKey(productMapping.getProductId())) {
+        if (aliBridgeServer!=null) {
             return   AliBridgeServer.create(eventBus,registry,productMapping)
-                .doOnNext(server->serverMap.put(productMapping.getProductId(),server))
+                .doOnNext(server -> {
+                    if(aliBridgeServer!=null){
+                        aliBridgeServer=server;
+                    }else {
+                        server.stopBridge().subscribe();
+                        throw new BusinessException("网桥正在运行,请勿重复启动");
+                    }
+                })
                 .doOnNext(server->{
                     server.handleReceive()
                         .parallel()
@@ -71,20 +79,20 @@ public class AliBridgeGateway{
                 })
                 .then()
                 .onErrorResume(error->Mono.error(()->{
-                    this.delBridgeServer(productMapping.getProductId());
+                    this.delBridgeServer();
                     return new BusinessException("网桥配置出错,请确定配置参数是否正确(是否存在空格)");
                 }));
         }
         return Mono.empty();
     }
 
-    public Mono<AliBridgeServer> getBridgeServer(String serverId){
-        return Mono.justOrEmpty(serverMap.get(serverId))
-            .switchIfEmpty(Mono.error(new BusinessException("请先根据该产品{}创建网桥",serverId)));
+    public Mono<AliBridgeServer> getBridgeServer(){
+        return Mono.justOrEmpty(aliBridgeServer)
+            .switchIfEmpty(Mono.error(new BusinessException("暂未创建网桥")));
     }
 
-    public Mono<Void> delBridgeServer(String serverId){
-        return Mono.justOrEmpty(serverMap.remove(serverId))
+    public Mono<Void> delBridgeServer(){
+        return Mono.justOrEmpty(aliBridgeServer)
             .switchIfEmpty(Mono.defer(()->Mono.just(new AliBridgeServer(eventBus,registry))))
             .flatMap(AliBridgeServer::stopBridge)
             .then();
@@ -96,7 +104,7 @@ public class AliBridgeGateway{
         if(StrUtil.isEmpty(bridge.getProductId())||!bridge.getProductId().equals(serverId)){
             return Mono.error(new BusinessException("网桥配置有误,网桥产品不可更改"));
         }
-        return delBridgeServer(serverId)
+        return delBridgeServer()
             .concatWith(initBridge(bridge))
             .then();
     }

+ 1 - 2
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -20,7 +20,6 @@ import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import reactor.core.publisher.*;
 import reactor.util.function.Tuple4;
-
 import javax.validation.constraints.NotNull;
 import java.util.Collection;
 import java.util.Map;
@@ -86,7 +85,7 @@ public class AliBridgeServer  implements BridgeServer {
         Assert.notNull(http2Endpoint,"创建网桥 http2Endpoint 不能为空, mapping id {%s}",params.getId());
         String regionId = accessConfig.getRegionId();
         Assert.notNull(regionId,"创建网桥 regionId 不能为空, mapping id {%s}",params.getId());
-        bridgeConfigManager = DefaultBridgeConfigManager.of(productKey, null, null, http2Endpoint, authEndpoint, null, getPopClientProfile(accessKey, accessSecret, regionId));
+        bridgeConfigManager = DefaultBridgeConfigManager.of(productId,productKey, null, null, http2Endpoint, authEndpoint, null, getPopClientProfile(accessKey, accessSecret, regionId));
         bootstrap=new DefaultBridgeBootstrap(bridgeConfigManager);
         if(start.get()){
             return Mono.empty();

+ 22 - 13
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java

@@ -7,6 +7,8 @@ import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.enums.BridgeDeviceStatus;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.server.aliyun.AliBridgeGateway;
+import org.jetlinks.core.cluster.ClusterUniqueTask;
+import org.jetlinks.supports.cluster.redis.RedisClusterManager;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
@@ -24,19 +26,26 @@ public class AliBridgeService extends GenericReactiveCacheSupportCrudService<Ali
     private final AliBridgeGateway bridgeGateway;
     private final AliBridgeDeviceService bridgeDeviceService;
 
+
+    private RedisClusterManager clusterManager;
+
     @Override
-    public void run(String... args) throws Exception {
-//        this.createQuery()
-//            .where(AliIotBridgeEntity::getState, BridgeStatus.running)
-//            .fetch()
-//            .flatMap(bridgeParam->bridgeGateway.initBridge(bridgeParam)
-//                .concatWith(bridgeDeviceService.createQuery()
-//                    .where(AliIotBridgeDeviceConfig::getBridgeId,bridgeParam.getId())
-//                    .fetch()
-//                    .flatMap(deviceConfig->
-//                        bridgeGateway.getBridgeServer(bridgeParam.getProductId()).flatMap(bridgeServe->bridgeServe.register(deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret()))
-//                            .then(Mono.empty()))
-//                ))
-//            .subscribe();1
+    public void run(String... args) {
+        this.createQuery()
+            .where(AliIotBridgeEntity::getState, BridgeStatus.running)
+            .where(AliIotBridgeEntity::getNodeId,clusterManager.getClusterName())
+            .fetch()
+            .flatMap(bridgeParam->bridgeGateway.initBridge(bridgeParam)
+                .concatWith(bridgeDeviceService.createQuery()
+                    .where(AliIotBridgeDeviceConfig::getBridgeId,bridgeParam.getId())
+                    .fetch()
+                    .flatMap(deviceConfig->
+                        bridgeGateway.getBridgeServer()
+                            .flatMap(bridgeServe->
+                                bridgeServe
+                                    .register(deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret()))
+                            .then(Mono.empty()))
+                ))
+            .subscribe();
     }
 }

+ 6 - 6
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -59,7 +59,7 @@ public class AliBridgeServerController implements
     @Operation(summary = "删除网桥")
     @DeleteAction
     public Mono<Void> deleteBridge(@PathVariable("productId")String productId,@PathVariable("bridgeId")String bridgeId){
-        return Mono.zip(bridgeGateway.getBridgeServer(productId)
+        return Mono.zip(bridgeGateway.getBridgeServer()
             .flatMap(AliBridgeServer::stopBridge),bridgeService.deleteById(bridgeId))
             .concatWith(bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getBridgeId,bridgeId).execute().then(Mono.empty()))
             .then();
@@ -74,7 +74,7 @@ public class AliBridgeServerController implements
             //保存网桥设备信息
             bridgeDeviceService.save(config),
             //注册网桥设备
-            bridgeGateway.getBridgeServer(id)
+            bridgeGateway.getBridgeServer()
                 .zipWith(config)
                 .flatMap(tp2-> tp2.getT1().register(tp2.getT2().getOriginalIdentity(),tp2.getT2().getProductKey(),tp2.getT2().getDeviceName(),tp2.getT2().getDeviceSecret())))
             .then();
@@ -84,7 +84,7 @@ public class AliBridgeServerController implements
     @Operation(summary = "暂停网桥")
     @CreateAction
     public Mono<Void> pause(@PathVariable("productId") String productId){
-        return bridgeGateway.getBridgeServer(productId)
+        return bridgeGateway.getBridgeServer()
             .flatMap(AliBridgeServer::stopBridge)
             .then();
     }
@@ -96,7 +96,7 @@ public class AliBridgeServerController implements
      * 判断是否是重启网桥,若不是重新,则重新注册网桥
      */
     public Mono<Void> startBridge(@PathVariable("productId")String productId){
-        return bridgeGateway.getBridgeServer(productId)
+        return bridgeGateway.getBridgeServer()
             .flatMap(AliBridgeServer::reconnect)
             .then();
     }
@@ -109,7 +109,7 @@ public class AliBridgeServerController implements
             //删除网桥设备
             bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getOriginalIdentity,originalIdentity).execute(),
             //取消注册
-            bridgeGateway.getBridgeServer(productId)
+            bridgeGateway.getBridgeServer()
                 .flatMap(server->server.unRegister(originalIdentity))).then();
     }
 
@@ -141,7 +141,7 @@ public class AliBridgeServerController implements
                 .where(AliIotBridgeDeviceConfig::getBridgeId,bridge.getId())
                 .fetch()
                 .flatMap(deviceConfig->
-                    bridgeGateway.getBridgeServer(productId).flatMap(bridgeServe->bridgeServe.register(deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret()))
+                    bridgeGateway.getBridgeServer().flatMap(bridgeServe->bridgeServe.register(deviceConfig.getOriginalIdentity(),deviceConfig.getProductKey(),deviceConfig.getDeviceName(),deviceConfig.getDeviceSecret()))
                         .then(Mono.empty())
                 ).then());
     }