Browse Source

add 媒体设备刷新通道

18339543638 3 years ago
parent
commit
c5f8f7a8ec

+ 4 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -88,12 +88,13 @@ public class LocalPlayService  {
                             streamId = String.format("%s_%s", device.getId(), channelId);
                         }
                         ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
+                        redisCatchStorage.cacheStreamId(ssrcInfo.getStreamId(),device);
                         // 发送点播消息
                         return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
                             log.info("收到订阅消息: " + response.toString());
                             onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
                             if (hookEvent != null) {
-                                hookEvent.accept(mediaServerItem, response);
+                                hookEvent.accept(mediaServerItemInUse, response);
                             }
                         }, (event) -> {
                             // 点播返回sip错误
@@ -110,10 +111,11 @@ public class LocalPlayService  {
                         String streamId = streamInfo.getStreamId();
                         if (streamId == null) {
                             redisCatchStorage.stopPlay(streamInfo);
-                            MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
+                            MediaMessageReply messageReply = MediaMessageReply.of("点播失败, 请求已超时",null);
                             messageReply.setSuccess(false);
                             return Mono.empty();
                         }
+                        redisCatchStorage.cacheStreamId(streamId,device);
                         return    Mono.justOrEmpty(redisCatchStorage.getMediaServerItem(streamInfo.getMediaServerId()))
                             .flatMap(mediaInfo->{
                                 JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);

+ 12 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -127,11 +127,11 @@ public class RedisCacheStorageImpl {
     }
 
     public StreamInfo queryPlayByStreamId(String streamId) {
-        List<Object> playLeys = redis.scan(String.format("%S_%s_%s_*", VideoManagerConstants.PLAYER_PREFIX, serverId, streamId));
-        if (playLeys == null || playLeys.size() == 0) {
+        List<Object> playLays = redis.scan(String.format("%S_%s_%s_*", VideoManagerConstants.PLAYER_PREFIX, serverId, streamId));
+        if (playLays == null || playLays.size() == 0) {
             return null;
         }
-        return (StreamInfo)redis.get(playLeys.get(0).toString());
+        return (StreamInfo)redis.get(playLays.get(0).toString());
     }
 
 
@@ -534,4 +534,13 @@ public class RedisCacheStorageImpl {
         return (MediaDeviceChannel) redis.get(key);
     }
 
+    public void cacheStreamId(String streamId, MediaDevice device) {
+        String key=VideoManagerConstants.MEDIA_STREAM_PREFIX+serverId+"_CACHE_"+streamId;
+        redis.set(key,device,30);
+    }
+
+    public MediaDevice queryDeviceByStreamId(String streamId) {
+        String key=VideoManagerConstants.MEDIA_STREAM_PREFIX+serverId+"_CACHE_"+streamId;
+        return (MediaDevice) redis.get(key);
+    }
 }

+ 3 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -10,7 +10,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.config.UserSetup;
-import org.jetlinks.community.media.contanst.VideoManagerConstants;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
@@ -26,7 +25,6 @@ import org.jetlinks.community.media.zlm.dto.StreamPushItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.utils.ZLMKeyGenerate;
-import org.jetlinks.core.cluster.ClusterEventBus;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -36,8 +34,6 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.UnicastProcessor;
-
-import java.util.List;
 import java.util.function.Function;
 
 /**
@@ -309,11 +305,12 @@ public class ZLMHttpHookListener {
             System.out.println("视频回调开始-------------streamId-------"+streamId+"-----------------------------" + System.currentTimeMillis() + "---------------------------------------");
         }
         StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
+        MediaDevice device = redisCatchStorage.queryDeviceByStreamId(streamId);
         MediaMessageReply<MediaItem> mediaMessageReply = MediaMessageReply.of(null,item);
         mediaMessageReply.setMessageId(ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerId,app,regist,streamId));
         mediaMessageReply.setSuccess(true);
-        if(streamInfo!=null){
-            mediaMessageReply.setDeviceId(streamInfo.getDeviceID());
+        if(device!=null){
+            mediaMessageReply.setDeviceId(device.getId());
         }
         deviceMessageBroker.reply(mediaMessageReply).subscribe();
         if ("rtmp".equals(schema)){