Browse Source

fix 媒体设备集群下线监听

18339543638 3 years ago
parent
commit
87feb54c42

+ 28 - 14
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -130,10 +130,14 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
         //取消注册媒体设备
         redisUtil.setRemove("session_"+this.serverId,mediaDevice.getId());
         redisCacheStorage.removeDevice(mediaDevice.getId());
-        this.createUpdate()
-            .where(MediaDevice::getId,mediaDevice.getId())
-            .set(MediaDevice::getState,DeviceState.offline)
-            .execute()
+        Mono.zip( this.createUpdate()
+                .where(MediaDevice::getId,mediaDevice.getId())
+                .set(MediaDevice::getState,DeviceState.offline)
+                .execute(),
+            deviceChannelService.createUpdate()
+                .where(MediaDeviceChannel::getDeviceId,mediaDevice.getId())
+                .set(MediaDeviceChannel::getStatus,DeviceState.offline)
+                .execute())
             .flatMap(ignore->
                 //通用设备下线
                 registry.getDevice(mediaDevice.getId())
@@ -213,11 +217,16 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             })
             .flatMap(ids->{
                 if(CollectionUtil.isNotEmpty(ids)){
-                    this.createUpdate()
-                        .where(MediaDevice::getState,DeviceState.online)
-                        .in(MediaDevice::getId,ids)
-                        .set(MediaDevice::getState,DeviceState.offline)
-                        .execute();
+                    return Mono.zip(this.createUpdate()
+                            .where(MediaDevice::getState,DeviceState.online)
+                            .in(MediaDevice::getId,ids)
+                            .set(MediaDevice::getState,DeviceState.offline)
+                            .execute(),
+                        deviceChannelService.createUpdate()
+                            .set(MediaDeviceChannel::getStatus,DeviceState.offline)
+                            .in(MediaDeviceChannel::getDeviceId,ids)
+                            .execute()
+                    );
                 }
                 return Mono.empty();
             })
@@ -225,11 +234,16 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
 
         //将本节点信息下线
         Set<Object> deviceIds = redisUtil.members("session_" + serverId);
-        this.createUpdate()
-            .where(MediaDevice::getState,DeviceState.online)
-            .in(MediaDevice::getId,deviceIds)
-            .set(MediaDevice::getState,DeviceState.offline)
-            .execute()
+        Mono.zip(this.createUpdate()
+                .where(MediaDevice::getState,DeviceState.online)
+                .in(MediaDevice::getId,deviceIds)
+                .set(MediaDevice::getState,DeviceState.offline)
+                .execute(),
+            deviceChannelService.createUpdate()
+                .set(MediaDeviceChannel::getStatus,DeviceState.offline)
+                .in(MediaDeviceChannel::getDeviceId,deviceIds)
+                .execute()
+        )
             .subscribe();
     }
 }