소스 검색

add 通道更新

18339543638 3 년 전
부모
커밋
0470f6fb57

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java

@@ -39,7 +39,7 @@ import java.util.UUID;
 @RequestMapping("/media/device")
 @Slf4j
 @Authorize(ignore = true)
-@Resource(id="media-deivce",name = "媒体流设备")
+@Resource(id="media-device",name = "媒体流设备")
 @AllArgsConstructor
 @Tag(name = "媒体视频设备")
 public class MediaDeviceController implements ReactiveServiceCrudController<MediaDevice, String>,DeferredController{

+ 36 - 9
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -9,6 +9,7 @@ import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
 import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
+import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.enums.DeviceState;
 import org.jetlinks.community.media.session.SipSession;
 import org.jetlinks.community.media.sip.SipServerHelper;
@@ -20,6 +21,7 @@ import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.DeviceOfflineMessage;
 import org.jetlinks.core.message.DeviceOnlineMessage;
 import org.jetlinks.core.message.DeviceRegisterMessage;
+import org.jetlinks.core.message.DeviceUnRegisterMessage;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.core.utils.IdUtils;
@@ -54,6 +56,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     private final DeviceGatewayMonitor gatewayMonitor;
     private final DecodedClientMessageHandler connector;
     private final DeviceSessionManager sessionManager;
+    private final LocalMediaDeviceChannelService deviceChannelService;
 
     public LocalMediaDeviceService(DecodedClientMessageHandler messageHandler,
                                    RedisCacheStorageImpl redisCacheStorage,
@@ -61,7 +64,8 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
                                    SipCommander cmder,
                                    DecodedClientMessageHandler connector,
                                    DeviceSessionManager sessionManager,
-                                   SipServerHelper sipServerHelper) {
+                                   SipServerHelper sipServerHelper,
+                                   LocalMediaDeviceChannelService deviceChannelService) {
         this.messageHandler = messageHandler;
         this.redisCacheStorage = redisCacheStorage;
         this.eventBus = eventBus;
@@ -71,6 +75,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
         this.connector = connector;
         this.sessionManager = sessionManager;
         this.sipServerHelper = sipServerHelper;
+        this.deviceChannelService=deviceChannelService;
     }
 
     @Subscribe("/media/device/*/*/register")
@@ -89,8 +94,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
 //                            SipSession sipSession = new SipSession(mediaDevice.getId(), operator, connector, 10L);
                             sipSession.onClose(()->{
                                 //设备下线
-                                unRegister(mediaDevice);
-                                sessionManager.unregister(mediaDevice.getId());
+                                deviceOffline(mediaDevice);
                             });
                             sessionManager.register(sipSession);
                         }
@@ -120,6 +124,31 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             .subscribe();
     }
 
+    /**
+     * 设备下线
+     * @param mediaDevice 设备信息
+     */
+    private void deviceOffline(MediaDevice mediaDevice){
+        //取消注册媒体设备
+        redisCacheStorage.removeDevice(mediaDevice.getId());
+        this.createUpdate()
+            .where(MediaDevice::getId,mediaDevice.getId())
+            .set(MediaDevice::getState,DeviceState.offline)
+            .execute()
+            .flatMap(ignore->
+                //通用设备下线
+                registry.getDevice(mediaDevice.getId())
+                    .flatMap(operator -> {
+                        DeviceOfflineMessage message = new DeviceOfflineMessage();
+                        message.setTimestamp(System.currentTimeMillis());
+                        message.setDeviceId(mediaDevice.getId());
+                        message.setMessageId(IdUtils.newUUID());
+                        return messageHandler.handleMessage(operator,message);
+                    })
+            )
+            .subscribe();
+    }
+
     @Subscribe("/media/device/*/*/unregister")
     public void unRegister(MediaDevice mediaDevice){
         //取消注册媒体设备
@@ -128,21 +157,19 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             session.close();
         }
         redisCacheStorage.removeDevice(mediaDevice.getId());
-        this.createUpdate()
-            .where(MediaDevice::getId,mediaDevice.getId())
-            .set(MediaDevice::getState,DeviceState.offline)
-            .execute()
+        this.deleteById(mediaDevice.getId())
             .flatMap(ignore->
                 //通用设备下线
                 registry.getDevice(mediaDevice.getId())
                     .flatMap(operator -> {
-                        DeviceOfflineMessage message = new DeviceOfflineMessage();
+                        DeviceUnRegisterMessage message = new DeviceUnRegisterMessage();
                         message.setTimestamp(System.currentTimeMillis());
-                        message.setDeviceId(mediaDevice.getIp());
+                        message.setDeviceId(mediaDevice.getId());
                         message.setMessageId(IdUtils.newUUID());
                         return messageHandler.handleMessage(operator,message);
                     })
             )
+            .mergeWith(deviceChannelService.createDelete().where(MediaDeviceChannel::getDeviceId,mediaDevice.getId()).execute().then(Mono.empty()))
             .subscribe();
     }
 

+ 5 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/session/SipSession.java

@@ -119,4 +119,9 @@ public class SipSession implements DeviceSession {
     public void onClose(Runnable call) {
         closeCallBack=call;
     }
+
+    @Override
+    protected void finalize() throws Throwable {
+        close();
+    }
 }

+ 10 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.standalone.configuration;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.jetlinks.community.standalone.configuration.cluster.RedisHaManager;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceState;
@@ -22,6 +23,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
 /**
+ * 集群会话管理
+ * 每个节点保存自身的节点全部信息和其他节点的副本信息,
+ * 对于其他节点的会话信息进行监听,当有节点发生丢失时,对该节点的设备会话信息进行掉线处理
  * @author zhouhao
  * @since 1.0.0
  */
@@ -32,6 +36,12 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
 
     private final Map<String, Map<String, ChildrenDeviceSession>> children = new ConcurrentHashMap<>(4096);
 
+    /**
+     * 冗余会话信息
+     * 即保存其他节点的会话信息,当其他节点宕机时,判断该会话是否立即断开
+     */
+    private final Map<String,Set<String>> redundancySessions = new ConcurrentHashMap<>(4096);
+
     @Getter
     @Setter
     private Logger log = LoggerFactory.getLogger(DefaultDeviceSessionManager.class);