Sfoglia il codice sorgente

fixed 设备删除
add 集群消息回复
add 集群id不可重复

18339543638 4 anni fa
parent
commit
9d0587e63d
17 ha cambiato i file con 209 aggiunte e 76 eliminazioni
  1. 1 1
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueWork.java
  2. 1 2
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/RuleClusterUniqueTask.java
  3. 4 0
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterTopic.java
  4. 4 4
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterUniqueTask.java
  5. 29 2
      jetlinks-core/src/main/java/org/jetlinks/core/cluster/ServerNode.java
  6. 65 0
      jetlinks-core/src/main/java/org/jetlinks/core/utils/MacUtils.java
  7. 5 4
      jetlinks-manager/bridge-manager/pom.xml
  8. 0 52
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java
  9. 2 1
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeEntity.java
  10. 2 0
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java
  11. 25 5
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java
  12. 0 3
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java
  13. 2 2
      jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java
  14. 45 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java
  15. 8 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisHaManager.java
  16. 1 0
      jetlinks-standalone/src/main/resources/application.yml
  17. 15 0
      pom.xml

+ 1 - 1
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueWork.java

@@ -64,7 +64,7 @@ public class ClusterUniqueWork implements Worker, Serializable {
             .flatMap(provider -> {
                 DefaultExecutionContext context = createContext(job);
                 return provider.createTask(context)
-                    .map(executor -> new RuleClusterUniqueTask(schedulerId, context,executor,id,clusterManager,redissonClient));
+                    .map(executor -> new RuleClusterUniqueTask(schedulerId, context,executor,clusterManager,redissonClient));
             });
     }
 

+ 1 - 2
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/RuleClusterUniqueTask.java

@@ -75,10 +75,9 @@ class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializa
     RuleClusterUniqueTask(String schedulerId,
                           AbstractExecutionContext context,
                           TaskExecutor executor,
-                          String currentSeverId,
                           ClusterManager clusterManager,
                           RedissonClient redissonClient) {
-        super(context.getInstanceId()+"-"+context.getJob().getNodeId(),currentSeverId,clusterManager,redissonClient);
+        super(context.getInstanceId()+"-"+context.getJob().getNodeId(),clusterManager,redissonClient);
         this.schedulerId = schedulerId;
         this.context = context;
         this.executor = executor;

+ 4 - 0
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterTopic.java

@@ -36,6 +36,10 @@ public interface ClusterTopic<T> {
                 .map(TopicMessage::getMessage);
     }
 
+    Mono<Integer> reply(Publisher<? extends T> publisher, String messageId);
+
+    Flux<TopicMessage<T>> publishAndReceive(Publisher<? extends T> publisher, long timeout, String messageId);
+
     interface TopicMessage<T> {
         String getTopic();
 

+ 4 - 4
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterUniqueTask.java

@@ -67,11 +67,11 @@ public abstract class ClusterUniqueTask<T> implements Serializable {
     private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
         Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
 
-    public ClusterUniqueTask(String id,String currentSeverId,
-                          ClusterManager clusterManager,
-                          RedissonClient redissonClient) {
+    public ClusterUniqueTask(String id,
+                             ClusterManager clusterManager,
+                             RedissonClient redissonClient) {
         this.id=id;
-        this.currentSeverId=currentSeverId;
+        this.currentSeverId=clusterManager.getCurrentServerId();
         this.workerId=currentSeverId;
         this.clusterManager=clusterManager;
         this.pingTopic=String.format(this.pingTopic,this.id);

+ 29 - 2
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ServerNode.java

@@ -1,7 +1,9 @@
 package org.jetlinks.core.cluster;
 
+import io.netty.util.internal.MacAddressUtil;
 import lombok.*;
 import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.core.utils.MacUtils;
 
 import java.io.Serializable;
 import java.util.Map;
@@ -30,20 +32,45 @@ public class ServerNode implements Serializable {
 
     private long lastKeepAlive;
 
+    private String macId;
+
+    private long startTime;
+    {
+        macId= MacUtils.getMac(MacUtils.getInetAddress());
+        startTime=System.currentTimeMillis();
+    }
+
+
     public boolean hasTag(String tag) {
         return tags != null && tags.containsKey(tag);
     }
 
     public Optional<Object> getTag(String tag) {
         return Optional.ofNullable(tags)
-                .map(t -> t.get(tag));
+            .map(t -> t.get(tag));
     }
 
     public boolean isSame(ServerNode another) {
-        return id.equals(another.getId());
+        return id.equals(another.getId())&&macId.equals(another.getMacId());
+    }
+
+    /**
+     * id是否重复
+     * @param another
+     * @return
+     */
+    public boolean duplication(ServerNode another) {
+        return id.equals(another.getId())&&!macId.equals(another.getMacId());
+    }
+
+    public boolean isAfter(ServerNode another){
+        return startTime>another.getStartTime();
     }
 
     public ServerNode copy(){
         return FastBeanCopier.copy(this,new ServerNode());
     }
+
+
+
 }

+ 65 - 0
jetlinks-core/src/main/java/org/jetlinks/core/utils/MacUtils.java

@@ -0,0 +1,65 @@
+package org.jetlinks.core.utils;
+
+import java.net.*;
+import java.util.Enumeration;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MacUtils.java
+ * @Description TODO
+ * @createTime 2021年12月08日 14:50:00
+ */
+public class MacUtils {
+
+    public static String getMac(InetAddress addr) {
+        try {
+            NetworkInterface net = NetworkInterface.getByInetAddress(addr);
+            byte[] macBytes = net.getHardwareAddress();
+            StringBuffer buffer = new StringBuffer();
+            for(int i = 0; i < macBytes.length; i++){
+                if(i != 0) { buffer.append("-"); }
+                int intMac = macBytes[i]&0xff;
+                String str = Integer.toHexString(intMac);
+                if(str.length() == 0){
+                    buffer.append("0");
+                }
+                buffer.append(str);
+            }
+            return buffer.toString().toUpperCase();
+        } catch (SocketException e) {
+            String msg = "Failed to resolve mac address, please check!";
+            throw new RuntimeException(msg);
+        }
+    }
+
+    public static InetAddress getInetAddress() {
+        try {
+            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            while(interfaces.hasMoreElements()){
+                NetworkInterface netInterface= interfaces.nextElement();
+                if(netInterface.isLoopback() || !netInterface.isUp() || netInterface.getHardwareAddress() == null) {continue;}
+                Enumeration<InetAddress> inetAddresses = netInterface.getInetAddresses();
+                while(inetAddresses.hasMoreElements()) {
+                    InetAddress inetAdd = inetAddresses.nextElement();
+                    if (inetAdd instanceof Inet4Address) {
+                        return inetAdd;
+                    }
+                }
+            }
+        } catch (SocketException e) {
+            String msg = "Failed to resolve specified address, please check!";
+            throw new RuntimeException(msg);
+        }
+        return getDefaultAddress();
+    }
+
+    private static InetAddress getDefaultAddress() {
+        try {
+            return InetAddress.getLocalHost();
+        } catch (UnknownHostException e) {
+            String msg = "Failed to get ip address of localhost, please specify it manually!";
+            throw new RuntimeException(msg);
+        }
+    }
+}

+ 5 - 4
jetlinks-manager/bridge-manager/pom.xml

@@ -75,13 +75,14 @@
         </dependency>
 
         <dependency>
-            <groupId>io.r2dbc</groupId>
-            <artifactId>r2dbc-h2</artifactId>
+            <groupId>org.hswebframework</groupId>
+            <artifactId>hsweb-easy-orm-rdb</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>org.hswebframework</groupId>
-            <artifactId>hsweb-easy-orm-rdb</artifactId>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson-spring-boot-starter</artifactId>
+
         </dependency>
     </dependencies>
 </project>

+ 0 - 52
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java

@@ -157,56 +157,4 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
     public void updateBridgeDeviceName(String deviceName) {
         this.bridgeDeviceName=deviceName;
     }
-
-
-    private String getMac(InetAddress addr) {
-        try {
-            NetworkInterface net = NetworkInterface.getByInetAddress(addr);
-            byte[] macBytes = net.getHardwareAddress();
-            StringBuffer buffer = new StringBuffer();
-            for(int i = 0; i < macBytes.length; i++){
-                if(i != 0) { buffer.append("-"); }
-                int intMac = macBytes[i]&0xff;
-                String str = Integer.toHexString(intMac);
-                if(str.length() == 0){
-                    buffer.append("0");
-                }
-                buffer.append(str);
-            }
-            return buffer.toString().toUpperCase();
-        } catch (SocketException e) {
-            String msg = "Failed to resolve mac address, please check!";
-            throw new BootException(msg);
-        }
-    }
-
-    private InetAddress getInetAddress() {
-        try {
-            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
-            while(interfaces.hasMoreElements()){
-                NetworkInterface netInterface= interfaces.nextElement();
-                if(netInterface.isLoopback() || !netInterface.isUp() || netInterface.getHardwareAddress() == null) {continue;}
-                Enumeration<InetAddress> inetAddresses = netInterface.getInetAddresses();
-                while(inetAddresses.hasMoreElements()) {
-                    InetAddress inetAdd = inetAddresses.nextElement();
-                    if (inetAdd instanceof Inet4Address) {
-                        return inetAdd;
-                    }
-                }
-            }
-        } catch (SocketException e) {
-            String msg = "Failed to resolve specified address, please check!";
-            throw new BootException(msg);
-        }
-        return getDefaultAddress();
-    }
-
-    private InetAddress getDefaultAddress() {
-        try {
-            return InetAddress.getLocalHost();
-        } catch (UnknownHostException e) {
-            String msg = "Failed to get ip address of localhost, please specify it manually!";
-            throw new BootException(msg);
-        }
-    }
 }

+ 2 - 1
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeEntity.java

@@ -103,7 +103,8 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
         private String accessKey;
         @NotNull
         private String accessSecret;
-
+        @NotNull
+        private String version;
         @NotNull
         private String iotInstanceId;
     }

+ 2 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -60,6 +60,8 @@ public class AliBridgeGateway{
 
     private final String  clusterId;
 
+
+
     private final ClusterTopic<Object> clusterTopic;
     private final ClusterManager clusterManager;
     private final String format="bridge-cluster-%s";

+ 25 - 5
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -10,7 +10,6 @@ import com.aliyun.iot.as.bridge.core.model.Session;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
-import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.bridge.core.DefaultBridgeBootstrap;
 import org.jetlinks.community.bridge.core.DefaultBridgeConfigManager;
 import org.jetlinks.community.bridge.core.DefaultDeviceConfigManager;
@@ -19,13 +18,12 @@ import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
 import org.jetlinks.community.bridge.server.BridgeServer;
 import org.jetlinks.community.bridge.server.Channel;
+import org.jetlinks.core.cluster.ClusterUniqueTask;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import reactor.core.publisher.*;
-import reactor.util.function.Tuple4;
 import javax.validation.constraints.NotNull;
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @createTime 2021年11月30日 11:01:00
  */
 @Slf4j
-public class AliBridgeServer  implements BridgeServer {
+public class AliBridgeServer  extends ClusterUniqueTask implements BridgeServer {
     private Map<String, Channel> channelMap=new ConcurrentHashMap<>();
 
     private final EventBus eventBus;
@@ -63,7 +61,8 @@ public class AliBridgeServer  implements BridgeServer {
     @Getter
     private final String id;
     private final DeviceRegistry deviceRegistry;
-    private AliBridgeServer(EventBus eventBus,DeviceRegistry deviceRegistry,String clusterId,String id) {
+    private AliBridgeServer(EventBus eventBus, DeviceRegistry deviceRegistry, String clusterId, String id) {
+        super("",null,null);
         this.deviceRegistry=deviceRegistry;
         this.eventBus = eventBus;
         this.clusterId=clusterId;
@@ -206,4 +205,25 @@ public class AliBridgeServer  implements BridgeServer {
     public Flux<AliBridgeMessage> handleReceive() {
         return receive;
     }
+
+
+    @Override
+    public void beMasterPostProcessor() {
+
+    }
+
+    @Override
+    public Mono<?> handleMsg(Object msg) {
+        return null;
+    }
+
+    @Override
+    public void beforeHandleMsg() {
+
+    }
+
+    @Override
+    public Mono<Void> handlePing(ClusterUniqueTask task) {
+        return null;
+    }
 }

+ 0 - 3
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliBridgeService.java

@@ -25,9 +25,6 @@ import reactor.core.scheduler.Schedulers;
 public class AliBridgeService extends GenericReactiveCacheSupportCrudService<AliIotBridgeEntity, String> implements CommandLineRunner {
     private final AliBridgeGateway bridgeGateway;
     private final AliBridgeDeviceService bridgeDeviceService;
-
-
-
     private final ClusterManager clusterManager;
 
     @Override

+ 2 - 2
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -57,10 +57,10 @@ public class AliBridgeServerController implements
      * 2、删除网桥
      * 3、删除网桥挂载设备
      */
-    @DeleteMapping("/delete/{serverId}/{bridgeId}")
+    @DeleteMapping("/delete/{bridgeId}")
     @Operation(summary = "删除网桥")
     @DeleteAction
-    public Mono<Void> deleteBridge(@PathVariable("serverId")String id){
+    public Mono<Void> deleteBridge(@PathVariable("bridgeId")String id){
         return bridgeService.findById(id)
             .flatMap(bridge->Mono.zip(
                 bridgeGateway.delBridgeServer(bridge.getNodeId(),bridge.getId())

+ 45 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java

@@ -1,10 +1,16 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.core.cluster.ClusterTopic;
+import org.jetlinks.core.utils.IdUtils;
 import org.reactivestreams.Publisher;
 import org.springframework.data.redis.core.ReactiveRedisOperations;
 import reactor.core.Disposable;
 import reactor.core.publisher.*;
+
+import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class RedisClusterTopic<T> implements ClusterTopic<T> {
@@ -47,6 +53,7 @@ public class RedisClusterTopic<T> implements ClusterTopic<T> {
                             public T getMessage() {
                                 return data.getMessage();
                             }
+
                         });
                     }
                 });
@@ -66,4 +73,42 @@ public class RedisClusterTopic<T> implements ClusterTopic<T> {
             .last(1L)
             .map(Number::intValue);
     }
+
+
+    /**
+     * 回复消息
+     * @param publisher
+     * @param messageId
+     * @return
+     */
+    @Override
+    public Mono<Integer> reply(Publisher<? extends T> publisher, String messageId){
+        return Flux.from(publisher)
+            .flatMap(data -> operations.convertAndSend("message-reply" + messageId, data))
+            .last(1L)
+            .map(Number::intValue);
+    }
+
+
+    @Override
+    public Flux<TopicMessage<T>> publishAndReceive(Publisher<? extends T> publisher, long timeout, String messageId) {
+        return  Flux.from(publisher)
+            .flatMap(data -> operations.convertAndSend(topicName, data))
+            .flatMap(ignore->
+                operations.listenToPattern("message-reply" + messageId)
+                    .timeout(Duration.ofSeconds(timeout), Mono.error(new BusinessException("响应超时")))
+                    .map(data->
+                        new TopicMessage<T>() {
+                            @Override
+                            public String getTopic() {
+                                return data.getChannel();
+                            }
+
+                            @Override
+                            public T getMessage() {
+                                return data.getMessage();
+                            }
+                        })
+            );
+    }
 }

+ 8 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisHaManager.java

@@ -156,9 +156,17 @@ public class RedisHaManager implements HaManager {
             .subscribe(msg -> {
                 ServerNode serverNode = msg.getMessage();
                 //自己
+
                 if (currentServer().isSame(serverNode)) {
                     return;
                 }
+                if(currentServer().duplication(serverNode)){
+                    if(currentServer().isAfter(serverNode)){
+                        //集群节点id重复且该节点后启动
+                        log.error("集群节点id    [{}]    不可重复,将关闭该应用",serverNode.getId());
+                        System.exit(1);
+                    }
+                }
                 serverNode.setLastKeepAlive(System.currentTimeMillis());
                 allNode.compute(serverNode.getId(), (id, node) -> {
                     if (node != null) {

+ 1 - 0
jetlinks-standalone/src/main/resources/application.yml

@@ -1,5 +1,6 @@
 server:
   port: 8848
+  shutdown: graceful
 
 spring:
   profiles:

+ 15 - 0
pom.xml

@@ -35,6 +35,7 @@
         <aliyun.sdk.version>4.5.6</aliyun.sdk.version>
         <aliyun.iot.sdk.version>7.29.0</aliyun.iot.sdk.version>
         <aliyun.bridge.sdk.version>2.4.1</aliyun.bridge.sdk.version>
+        <redisson.version>3.13.6</redisson.version>
     </properties>
 
     <build>
@@ -357,6 +358,19 @@
                 <artifactId>hsweb-easy-orm-elasticsearch</artifactId>
                 <version>${easyorm.version}</version>
             </dependency>
+
+
+            <dependency>
+                <groupId>org.redisson</groupId>
+                <artifactId>redisson-spring-boot-starter</artifactId>
+                <version>${redisson.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.springframework.boot</groupId>
+                        <artifactId>spring-boot-starter-web</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -368,6 +382,7 @@
             <scope>test</scope>
         </dependency>
 
+
         <dependency>
             <groupId>dev.miku</groupId>
             <artifactId>r2dbc-mysql</artifactId>