Bladeren bron

fix 规则引擎集群状态变化

18339543638 3 jaren geleden
bovenliggende
commit
88ea0e73d6

+ 12 - 1
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

@@ -5,6 +5,8 @@ import lombok.Getter;
 import lombok.Setter;
 import org.jetlinks.community.standalone.configuration.cluster.RedisHaManager;
 import org.jetlinks.community.utils.RedisUtil;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.HaManager;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceState;
@@ -65,6 +67,12 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
 
     private final Map<String, LongAdder> transportCounter = new ConcurrentHashMap<>();
 
+    private final HaManager haManager;
+
+    public DefaultDeviceSessionManager(HaManager haManager) {
+        this.haManager = haManager;
+    }
+
     @Getter
     @Setter
     private Map<String, Long> transportLimits = new ConcurrentHashMap<>();
@@ -158,7 +166,10 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
     }
 
     public void init() {
-
+        haManager.subscribeServerOffline()
+            .doOnNext(node->{
+                getRedisUtil().del("session_"+ node.getId());
+            }).subscribe();
         Objects.requireNonNull(gatewayServerMonitor, "gatewayServerMonitor");
         Objects.requireNonNull(registry, "registry");
         serverId = gatewayServerMonitor.getCurrentServerId();

+ 4 - 2
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

@@ -19,6 +19,7 @@ import org.jetlinks.community.standalone.configuration.cluster.RedisClusterManag
 import org.jetlinks.core.ProtocolSupports;
 import org.jetlinks.core.cluster.ClusterEventBus;
 import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.HaManager;
 import org.jetlinks.core.config.ConfigStorageManager;
 import org.jetlinks.core.device.DeviceOperationBroker;
 import org.jetlinks.core.device.DeviceRegistry;
@@ -183,8 +184,9 @@ public class JetLinksConfiguration {
     @Bean(initMethod = "init", destroyMethod = "shutdown")
     public DefaultDeviceSessionManager deviceSessionManager(JetLinksProperties properties,
                                                             GatewayServerMonitor monitor,
-                                                            DeviceRegistry registry) {
-        DefaultDeviceSessionManager sessionManager = new DefaultDeviceSessionManager();
+                                                            DeviceRegistry registry,
+                                                            ClusterManager clusterManager) {
+        DefaultDeviceSessionManager sessionManager = new DefaultDeviceSessionManager(clusterManager.getHaManager());
         sessionManager.setGatewayServerMonitor(monitor);
         sessionManager.setRegistry(registry);
         Optional.ofNullable(properties.getTransportLimit()).ifPresent(sessionManager::setTransportLimits);