Ver código fonte

add 媒体设备刷新通道

18339543638 3 anos atrás
pai
commit
9467021852

+ 10 - 4
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -185,17 +185,23 @@ public class SipCommander {
             redisCacheStorage.addRequestStream(streamId,device.getId(),channelId);
             Request request =  headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrcInfo.getSsrc(), callIdHeader);
             messageBroker.handleReply(device.getId(), ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerItem.getServerId(),"rtp",true,streamId), Duration.ofSeconds(15))
-                .doOnNext(reply -> {
+                .flatMap(reply -> {
                     if(reply instanceof MediaMessageReply){
                         System.out.println("收到视频回调------------------------------------------" + System.currentTimeMillis() + "---------------------------------------");
                         MediaMessageReply<MediaItem> itemReply= (MediaMessageReply<MediaItem>) reply;
                         MediaItem mediaItem = itemReply.getResult();
                         if (userSetup.isWaitTrack() &&  mediaItem.getTracks() == null) {
-                            return;
+                            return Mono.empty();
                         }
-                        event.accept(redisCacheStorage.getMediaServerItem(mediaItem.getMediaServerId()), JSONUtil.parseObj(mediaItem));
-                        subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
+                       return mediaServerItemService.createQuery()
+                            .where(MediaServerItem::getServerId,mediaItem.getMediaServerId())
+                            .fetchOne()
+                            .doOnNext(mediaServer->{
+                                event.accept(mediaServer, JSONUtil.parseObj(mediaItem));
+                                subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
+                            });
                     }
+                    return Mono.empty();
                 }).subscribe();
 
 

+ 5 - 5
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -315,11 +315,11 @@ public class ZLMHttpHookListener {
         deviceMessageBroker.reply(mediaMessageReply).subscribe();
         if ("rtmp".equals(schema)){
             log.info("on_stream_changed:注册(true)/注销(false)->{}, app->{}, stream->{}", regist, app, streamId);
-            if (regist) {
-                result=result.flatMap(__->mediaServerItemService.addCount(mediaServerId).thenReturn(1L));
-            }else {
-                result=result.flatMap(__->mediaServerItemService.removeCount(mediaServerId).thenReturn(1L));
-            }
+//            if (regist) {
+//                result=result.flatMap(__->mediaServerItemService.addCount(mediaServerId).thenReturn(1L));
+//            }else {
+//                result=result.flatMap(__->mediaServerItemService.removeCount(mediaServerId).thenReturn(1L));
+//            }
             if (item.getOriginType() == OriginType.PULL.ordinal()
                 || item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) {
                 // 设置拉流代理上线/离线 todo