Browse Source

fix 媒体设备集群下线监听

18339543638 3 years ago
parent
commit
9970b82733

+ 23 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.media.service;
 
+import cn.hutool.core.collection.CollectionUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.gateway.annotation.Subscribe;
@@ -49,6 +50,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     private final LocalMediaDeviceChannelService deviceChannelService;
     private final String serverId;
     private final RedisUtil redisUtil;
+    private final  ClusterManager clusterManager;
     public LocalMediaDeviceService(DecodedClientMessageHandler messageHandler,
                                    RedisCacheStorageImpl redisCacheStorage,
                                    EventBus eventBus, DeviceRegistry registry,
@@ -67,6 +69,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
         this.sessionManager = sessionManager;
         this.deviceChannelService=deviceChannelService;
         this.redisUtil=redisUtil;
+        this.clusterManager=clusterManager;
     }
 
     @Subscribe("/media/device/*/*/register")
@@ -198,6 +201,26 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
 
     @Override
     public void run(String... args) {
+        //监听其他节点信息
+        clusterManager.getHaManager().subscribeServerOffline()
+            .flatMap(node->{
+                Set<Object> members = redisUtil.members("session_" + node.getId());
+                redisUtil.del("session_"+ node.getId());
+                return Mono.just(members);
+            })
+            .flatMap(ids->{
+                if(CollectionUtil.isNotEmpty(ids)){
+                    this.createUpdate()
+                        .where(MediaDevice::getState,DeviceState.online)
+                        .in(MediaDevice::getId,ids)
+                        .set(MediaDevice::getState,DeviceState.offline)
+                        .execute();
+                }
+                return Mono.empty();
+            })
+            .subscribe();
+
+        //将本节点信息下线
         Set<Object> deviceIds = redisUtil.members("session_" + serverId);
         this.createUpdate()
             .where(MediaDevice::getState,DeviceState.online)

+ 2 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java

@@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.SipGateway;
 import org.jetlinks.community.media.bean.WvpSipDate;
+import org.jetlinks.community.media.enums.DeviceState;
 import org.jetlinks.community.media.enums.StreamMode;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.sip.SipRequestProcessorParent;
@@ -155,6 +156,7 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
             // 注册成功
             // 保存到redis
             // 下发catelog查询目录
+            device.setState(DeviceState.online);
             if (registerFlag == 1 ) {
                 device.setLastRegisterTime(System.currentTimeMillis());
                 log.info("[{}] 注册成功! id:" + device.getId(), requestAddress);

+ 3 - 7
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.standalone.configuration;
 
+import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.extra.spring.SpringUtil;
 import lombok.Getter;
 import lombok.Setter;
@@ -67,10 +68,9 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
 
     private final Map<String, LongAdder> transportCounter = new ConcurrentHashMap<>();
 
-    private final HaManager haManager;
 
-    public DefaultDeviceSessionManager(HaManager haManager) {
-        this.haManager = haManager;
+    public DefaultDeviceSessionManager() {
+
     }
 
     @Getter
@@ -166,10 +166,6 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
     }
 
     public void init() {
-        haManager.subscribeServerOffline()
-            .doOnNext(node->{
-                getRedisUtil().del("session_"+ node.getId());
-            }).subscribe();
         Objects.requireNonNull(gatewayServerMonitor, "gatewayServerMonitor");
         Objects.requireNonNull(registry, "registry");
         serverId = gatewayServerMonitor.getCurrentServerId();

+ 2 - 3
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

@@ -184,9 +184,8 @@ public class JetLinksConfiguration {
     @Bean(initMethod = "init", destroyMethod = "shutdown")
     public DefaultDeviceSessionManager deviceSessionManager(JetLinksProperties properties,
                                                             GatewayServerMonitor monitor,
-                                                            DeviceRegistry registry,
-                                                            ClusterManager clusterManager) {
-        DefaultDeviceSessionManager sessionManager = new DefaultDeviceSessionManager(clusterManager.getHaManager());
+                                                            DeviceRegistry registry) {
+        DefaultDeviceSessionManager sessionManager = new DefaultDeviceSessionManager();
         sessionManager.setGatewayServerMonitor(monitor);
         sessionManager.setRegistry(registry);
         Optional.ofNullable(properties.getTransportLimit()).ifPresent(sessionManager::setTransportLimits);