Browse Source

changed 改变网络泵解析方式

18339543638 3 years ago
parent
commit
d419c5ed2d

+ 0 - 4
jetlinks-components/notify-component/notify-webhook/src/main/java/org/jetlinks/community/notify/webhook/WebHookNotifier.java

@@ -114,10 +114,6 @@ public class WebHookNotifier extends AbstractNotifier<WebHookTemplate> {
     }
     }
 
 
 
 
-    public static void main(String[] args) {
-        HttpResponse execute = HttpUtil.createGet("https://cn.bing.com/").execute();
-        System.out.println(execute);
-    }
     @Nonnull
     @Nonnull
     @Override
     @Override
     public Mono<Void> close() {
     public Mono<Void> close() {

+ 91 - 103
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.media.controller;
 
 
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONObject;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Maps;
+import gov.nist.javax.sip.stack.SIPDialog;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import io.swagger.v3.oas.annotations.tags.Tag;
@@ -16,18 +17,23 @@ import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.gb28181.result.PlayResult;
 import org.jetlinks.community.media.gb28181.result.PlayResult;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
+import org.jetlinks.community.media.message.MediaMessage;
 import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.service.LocalPlayService;
 import org.jetlinks.community.media.service.LocalPlayService;
+import org.jetlinks.community.media.session.VideoStreamSessionManager;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
+import org.jetlinks.community.utils.SubscribeKeyGenerate;
 import org.jetlinks.core.device.DeviceOperationBroker;
 import org.jetlinks.core.device.DeviceOperationBroker;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
+import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.server.MessageHandler;
 import org.jetlinks.core.server.MessageHandler;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.bind.annotation.*;
@@ -36,6 +42,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple2;
 
 
+import java.time.Duration;
 import java.util.*;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
@@ -56,12 +63,13 @@ import java.util.stream.Collectors;
 public class MediaDeviceController implements ReactiveServiceCrudController<MediaDevice, String>,DeferredController{
 public class MediaDeviceController implements ReactiveServiceCrudController<MediaDevice, String>,DeferredController{
     private final LocalMediaDeviceService mediaDeviceService;
     private final LocalMediaDeviceService mediaDeviceService;
     private final RedisCacheStorageImpl redisCacheStorage;
     private final RedisCacheStorageImpl redisCacheStorage;
-    private final DeferredResultHolder resultHolder;
     private final SipCommander cmder;
     private final SipCommander cmder;
     private final LocalPlayService playService;
     private final LocalPlayService playService;
     private final LocalMediaDeviceChannelService deviceChannelService;
     private final LocalMediaDeviceChannelService deviceChannelService;
     private final LocalMediaServerItemService mediaServerItemService;
     private final LocalMediaServerItemService mediaServerItemService;
     private final DeviceRegistry registry;
     private final DeviceRegistry registry;
+    private final StandaloneDeviceMessageBroker messageBroker;
+    private final VideoStreamSessionManager streamSessionManager;
     @Override
     @Override
     public ReactiveCrudService<MediaDevice, String> getService() {
     public ReactiveCrudService<MediaDevice, String> getService() {
         return mediaDeviceService;
         return mediaDeviceService;
@@ -85,131 +93,111 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
                 //获取设备相连的媒体流服务器信息
                 //获取设备相连的媒体流服务器信息
                 .flatMap(playService::getNewMediaServerItem)
                 .flatMap(playService::getNewMediaServerItem)
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
-                .flatMapMany(mediaServerItem ->playService.play(mediaServerItem, deviceId, channelId, null, null)
-                    .map(MediaMessageReply::getResult));
+                .flatMapMany(mediaServerItem -> {
+                    String key = SubscribeKeyGenerate.getSubscribeKey(DeferredResultHolder.CALLBACK_CMD_PLAY,deviceId,channelId);
+                    return messageBroker.handleReply(deviceId,key,Duration.ofSeconds(10))
+                        .onErrorResume(DeviceOperationException.class,error->
+                            //超时响应处理
+                            Mono.defer(()->{
+                                log.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
+                                MediaMessageReply messageReply = new MediaMessageReply();
+                                messageReply.setDeviceId(deviceId);
+                                messageReply.setMessageId(key);
+                                messageReply.setSuccess(false);
+                                SIPDialog dialog = streamSessionManager.getDialog(deviceId, channelId);
+                                if (dialog != null) {
+                                    messageReply.setErrMsg("收流超时,请稍候重试");
+                                }else {
+                                    messageReply.setErrMsg("点播超时,请稍候重试");
+                                }
+                                // 点播超时回复BYE
+                                return cmder.streamByeCmd(deviceId, channelId)
+                                    .doOnNext(ignore->
+                                        // 释放rtpserver
+                                        mediaServerItemService.closeRTPServer(deviceId, channelId))
+                                    .flatMap(ignore->
+                                        // 回复之前所有的点播请求
+                                        messageBroker.reply(messageReply));
+                            }).then(Mono.error(error))
+                        )
+                        .mergeWith(playService.play(mediaServerItem, deviceId, channelId, null, null).thenMany(Flux.empty()))
+                        .flatMap(this::convertReply);
+                })
+            ;
     }
     }
 
 
 
 
     @QueryAction
     @QueryAction
     @Operation(summary = "停止点播")
     @Operation(summary = "停止点播")
     @GetMapping("/{deviceId}/{channelId}/_stop")
     @GetMapping("/{deviceId}/{channelId}/_stop")
-    public Mono<Object> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
+    public Flux<Object> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
         if(log.isDebugEnabled()){
         if(log.isDebugEnabled()){
             log.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
             log.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
         }
         }
-        String uuid = cn.hutool.core.lang.UUID.randomUUID().toString();
-        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
         // 录像查询以channelId作为deviceId查询
         // 录像查询以channelId作为deviceId查询
         String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
         String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
-        resultHolder.put(key, uuid, result);
-        PlayResult<String> playResult = new PlayResult<String>();
-        playResult.setResult(result);
-        // 超时处理
-        playResult.onTimeout(Mono.fromRunnable(()->{
-            log.warn(String.format("设备预览/回放停止响应超时,deviceId/channelId:%s_%s ", deviceId, channelId));
-            RequestMessage msg = new RequestMessage();
-            msg.setId(uuid);
-            msg.setKey(key);
-            msg.setData("设备预览/回放停止响应超时");
-            resultHolder.invokeAllResult(msg);
-        }));
-
-        return Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId))
-            .flatMap(device->
-                Mono.zip(cmder.streamByeCmd(deviceId, channelId, (event) -> {
-                    StreamInfo streamInfo = redisCacheStorage.queryPlayByDevice(deviceId, channelId);
-                    if (streamInfo == null) {
-                        RequestMessage msg = new RequestMessage();
-                        msg.setId(uuid);
-                        msg.setKey(key);
-                        msg.setData("点播未找到");
-                        resultHolder.invokeAllResult(msg);
-                        deviceChannelService.stopPlay(deviceId, channelId).subscribe();
-                    }else {
-                        redisCacheStorage.stopPlay(streamInfo);
-                        deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()).subscribe();
-                        RequestMessage msg = new RequestMessage();
-                        msg.setId(uuid);
-                        msg.setKey(key);
-                        msg.setData("success");
-                        resultHolder.invokeAllResult(msg);
-                    }
-                    mediaServerItemService.closeRTPServer(device, channelId);
-                })
-                    .mergeWith(Mono.fromRunnable(()->{
-                        if (deviceId != null || channelId != null) {
-                            JSONObject json = new JSONObject()
-                                .putOpt("deviceId", deviceId)
-                                .putOpt("channelId", channelId);
-                            RequestMessage msg = new RequestMessage();
-                            msg.setId(uuid);
-                            msg.setKey(key);
-                            msg.setData(json.toString());
-                            resultHolder.invokeAllResult(msg);
-                        } else {
-                            log.warn("设备预览/回放停止API调用失败!");
-                            RequestMessage msg = new RequestMessage();
-                            msg.setId(uuid);
-                            msg.setKey(key);
-                            msg.setData("streamId null");
-                            resultHolder.invokeAllResult(msg);
-                        }
-                    }))
-                    .then(),Mono.just(deferredResultHandler(playResult)))
-            )
-            .flatMap(Tuple2::getT2);
+        MediaMessage mediaMessage = new MediaMessage();
+        mediaMessage.setDeviceId(deviceId);
+        mediaMessage.setMessageId(key);
+        return messageBroker.handleReply(deviceId,key, Duration.ofSeconds(10))
+            .onErrorResume(DeviceOperationException.class,error->
+                Mono.error(new BusinessException("设备预览/回放停止响应超时")))
+            .mergeWith(
+                Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId))
+                    .flatMap(device->
+                        cmder.streamByeCmd(deviceId, channelId, (event) -> {
+                            MediaMessageReply<String> mediaMessageReply = new MediaMessageReply<>();
+                            mediaMessageReply.setMessageId(key);
+                            mediaMessage.setDeviceId(deviceId);
+                            StreamInfo streamInfo = redisCacheStorage.queryPlayByDevice(deviceId, channelId);
+                            if(streamInfo!=null){
+                                redisCacheStorage.stopPlay(streamInfo);
+                            }
+                            messageBroker.reply(mediaMessageReply)
+                                .then(deviceChannelService.stopPlay(deviceId, channelId))
+                                .doOnNext(ignore -> mediaServerItemService.closeRTPServer(device, channelId))
+                                .subscribe();
+                        }).then(Mono.empty())))
+            .flatMap(this::convertReply);
     }
     }
 
 
     @PostMapping("/{deviceId}/channels/_sync")
     @PostMapping("/{deviceId}/channels/_sync")
     @CreateAction
     @CreateAction
     @Operation(summary = "更新通道")
     @Operation(summary = "更新通道")
-    public Mono<Object> getDeviceDetailInfo(@PathVariable("deviceId") @Parameter(description = "设备ID") String id) {
+    public Flux<Object> getDeviceDetailInfo(@PathVariable("deviceId") @Parameter(description = "设备ID") String id) {
         if (log.isDebugEnabled()) {
         if (log.isDebugEnabled()) {
             log.debug("设备通道信息同步API调用,deviceId:" + id);
             log.debug("设备通道信息同步API调用,deviceId:" + id);
         }
         }
         MediaDevice device = redisCacheStorage.getDevice(id);
         MediaDevice device = redisCacheStorage.getDevice(id);
         if(device==null){
         if(device==null){
-            return Mono.error(new BusinessException("设备已离线,无法更新通道最新信息"));
+            return Flux.error(new BusinessException("设备已离线,无法更新通道最新信息"));
         }
         }
-        PlayResult<MediaDevice> playResult = new PlayResult<>();
         String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
         String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
-        String uuid = UUID.randomUUID().toString();
-        //默认超时时间为30分钟
-        DeferredResult<ResponseEntity<MediaDevice>> result = new DeferredResult<>();
-        playResult.setResult(result);
-        playResult.onTimeout(Mono.fromRunnable(()->{
-            log.warn("设备[{}]通道信息同步超时", id);
-            // 释放rtpserver
-            RequestMessage msg = new RequestMessage();
-            msg.setKey(key);
-            msg.setId(uuid);
-            WVPResult<Object> wvpResult = new WVPResult<>();
-            wvpResult.setCode(-1);
-            wvpResult.setData(device);
-            wvpResult.setMsg("设备响应超时,请检查设备是否在线或网络是否通畅");
-            msg.setData(wvpResult);
-            //设备下线
-            mediaDeviceService.deviceOffline(device);
-            resultHolder.invokeAllResult(msg);
-        }));
-        // 等待其他相同请求返回时一起返回
-        if (resultHolder.exist(key, null)) {
-            return deferredResultHandler(playResult);
-        }
-        resultHolder.put(key, uuid, result);
-        return cmder.catalogQuery(device, event -> {
-            RequestMessage msg = new RequestMessage();
-            msg.setKey(key);
-            msg.setId(uuid);
-            WVPResult<Object> wvpResult = new WVPResult<>();
-            wvpResult.setCode(-1);
-            wvpResult.setData(device);
-            wvpResult.setMsg(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg));
-            msg.setData(wvpResult);
-            resultHolder.invokeAllResult(msg);
-        })
-            .thenReturn(1L)
-            .flatMap(ignore->deferredResultHandler(playResult));
+        return  messageBroker.handleReply(id,key,Duration.ofSeconds(10))
+            .onErrorResume(DeviceOperationException.class,error-> {
+                //设备下线
+                mediaDeviceService.deviceOffline(device);
+                return Mono.error(new BusinessException("设备响应超时"));
+            })
+            .mergeWith(cmder.catalogQuery(device, event -> {
+                MediaMessageReply<String> reply = MediaMessageReply.of(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg), null);
+                reply.setSuccess(false);
+                reply.setMessageId(key);
+                messageBroker.reply(reply).subscribe();
+            })
+                .then(Mono.empty()))
+            .flatMap(this::convertReply);
     }
     }
 
 
+    private Flux<Object> convertReply(DeviceMessageReply reply){
+        if(reply instanceof MediaMessageReply){
+            MediaMessageReply<Object> messageReply= (MediaMessageReply<Object>) reply;
+            if(messageReply.isSuccess()){
+                return  Flux.just(messageReply.getResult());
+            }else {
+                return Flux.error(new BusinessException(Optional.ofNullable(messageReply.getErrMsg()).orElse("响应错误,请重试")));
+            }
+        }
+        return Flux.error(new BusinessException("服务器响应错误"));
+    }
 }
 }

+ 12 - 9
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java

@@ -162,18 +162,21 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 
 
 
 
     public void closeRTPServer(MediaDevice device, String channelId) {
     public void closeRTPServer(MediaDevice device, String channelId) {
-        String mediaServerId = streamSession.getMediaServerId(device.getId(), channelId);
+        closeRTPServer(device.getId(),channelId);
+    }
+
+    public void closeRTPServer(String deviceId, String channelId) {
+        String mediaServerId = streamSession.getMediaServerId(deviceId, channelId);
         MediaServerItem mediaServerItem = this.getOneByServerId(mediaServerId);
         MediaServerItem mediaServerItem = this.getOneByServerId(mediaServerId);
         if (mediaServerItem != null) {
         if (mediaServerItem != null) {
             //更新sstc信息
             //更新sstc信息
-            String streamId = String.format("%s_%s", device.getId(), channelId);
+            String streamId = String.format("%s_%s", deviceId, channelId);
             zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
             zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
-            releaseSsrc(mediaServerItem, streamSession.getSSRC(device.getId(), channelId));
+            releaseSsrc(mediaServerItem, streamSession.getSSRC(deviceId, channelId));
         }
         }
-        streamSession.remove(device.getId(), channelId);
+        streamSession.remove(deviceId, channelId);
     }
     }
 
 
-
     public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) {
     public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) {
         if (mediaServerItem == null || ssrc == null) {
         if (mediaServerItem == null || ssrc == null) {
             return;
             return;
@@ -510,8 +513,8 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm",
         log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm",
             mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
             mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
         String protocol = sslEnabled ? "https" : "http";
         String protocol = sslEnabled ? "https" : "http";
-//        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
-        String hookPrex = "http://192.168.104.244:8848/index/hook";
+        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
+//        String hookPrex = "http://192.168.104.244:8848/index/hook";
         String recordHookPrex = null;
         String recordHookPrex = null;
         if (mediaServerItem.getRecordAssistPort() != 0) {
         if (mediaServerItem.getRecordAssistPort() != 0) {
             recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort());
             recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort());
@@ -709,8 +712,8 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         if (mediaInfo.getRtspSSLPort() != 0) {
         if (mediaInfo.getRtspSSLPort() != 0) {
             streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSSLPort(), app,  stream));
             streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSSLPort(), app,  stream));
         }
         }
-        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
-        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
+        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app,  stream));
+        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setHls(String.format("http://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setHls(String.format("http://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setWs_hls(String.format("ws://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setWs_hls(String.format("ws://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setFmp4(String.format("http://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setFmp4(String.format("http://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app,  stream));

+ 79 - 106
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -62,10 +62,6 @@ public class LocalPlayService  {
 
 
     private final LocalMediaServerItemService mediaServerItemService;
     private final LocalMediaServerItemService mediaServerItemService;
 
 
-    private final DeferredResultHolder resultHolder;
-
-    private final VideoStreamSessionManager streamSessionManager;
-
     private final LocalMediaDeviceChannelService deviceChannelService;
     private final LocalMediaDeviceChannelService deviceChannelService;
 
 
     private final ZLMRESTfulUtils zlmresTfulUtils;
     private final ZLMRESTfulUtils zlmresTfulUtils;
@@ -74,119 +70,95 @@ public class LocalPlayService  {
 
 
     private final StandaloneDeviceMessageBroker messageBroker;
     private final StandaloneDeviceMessageBroker messageBroker;
 
 
-    public Flux<MediaMessageReply> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
+    public Mono<Void> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                                         ZLMHttpHookSubscribe.Event hookEvent,  SipSubscribe.Event errorEvent) {
                                         ZLMHttpHookSubscribe.Event hookEvent,  SipSubscribe.Event errorEvent) {
         String key = SubscribeKeyGenerate.getSubscribeKey(DeferredResultHolder.CALLBACK_CMD_PLAY,deviceId,channelId);
         String key = SubscribeKeyGenerate.getSubscribeKey(DeferredResultHolder.CALLBACK_CMD_PLAY,deviceId,channelId);
         if (mediaServerItem == null) {
         if (mediaServerItem == null) {
-            return Flux.error(new BusinessException("未找到可用的zlm"));
+            return Mono.error(new BusinessException("未找到可用的zlm"));
         }
         }
         MediaDevice device = redisCatchStorage.getDevice(deviceId);
         MediaDevice device = redisCatchStorage.getDevice(deviceId);
-        return  Mono.justOrEmpty(redisCatchStorage.queryPlayByDevice(deviceId, channelId))
-            .switchIfEmpty(Mono.defer(()->{
-                SSRCInfo ssrcInfo;
-                String streamId = null;
-                if (mediaServerItem.isRtpEnable()) {
-                    streamId = String.format("%s_%s", device.getId(), channelId);
+        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
+        return streamInfo==null?
+            Mono.defer(()->{
+            SSRCInfo ssrcInfo;
+            String streamId = null;
+            if (mediaServerItem.isRtpEnable()) {
+                streamId = String.format("%s_%s", device.getId(), channelId);
+            }
+            ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
+            // 发送点播消息
+            return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
+                log.info("收到订阅消息: " + response.toString());
+                onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
+                if (hookEvent != null) {
+                    hookEvent.accept(mediaServerItem, response);
                 }
                 }
-                ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
-                // 发送点播消息
-                return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
-                    log.info("收到订阅消息: " + response.toString());
-                    onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
-                    if (hookEvent != null) {
-                        hookEvent.accept(mediaServerItem, response);
-                    }
-                }, (event) -> {
-                    // 点播返回sip错误
-                    mediaServerItemService.closeRTPServer(device, channelId);
-                    MediaMessageReply messageReply =   MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
-                    messageReply.setSuccess(false);
-                    if (errorEvent != null) {
-                        errorEvent.accept(event);
-                    }
-                    messageBroker.reply(messageReply).subscribe();
-                }).then(Mono.empty());
-            }))
-            .flatMap(streamInfo->{
-                String streamId = streamInfo.getStreamId();
-                if (streamId == null) {
-                    MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
-                    messageReply.setSuccess(false);
-                    return Mono.just(messageReply);
+            }, (event) -> {
+                // 点播返回sip错误
+                mediaServerItemService.closeRTPServer(device, channelId);
+                MediaMessageReply messageReply =   MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
+                messageReply.setSuccess(false);
+                if (errorEvent != null) {
+                    errorEvent.accept(event);
                 }
                 }
-                return mediaServerItemService.findById(streamInfo.getMediaServerId())
-                    .flatMap(mediaInfo->{
-                        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
-                        if (rtpInfo != null && rtpInfo.getBool("exist")) {
-                            MediaMessageReply messageReply = MediaMessageReply.of(null, streamInfo);
-                            messageReply.setSuccess(true);
-                            messageBroker.reply(messageReply).subscribe();
-                            if (hookEvent != null) {
-                                try {
-                                    hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
-                                } catch (Exception e) {
-                                    log.error("点播回调函数失败,",e);
-                                }
-                            }
-                        } else {
-                            // TODO 点播前是否重置状态
-                            redisCatchStorage.stopPlay(streamInfo);
-                            deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
-                            SSRCInfo ssrcInfo;
-                            String streamId2 = null;
-                            if (mediaServerItem.isRtpEnable()) {
-                                streamId2 = String.format("%s_%s", device.getId(), channelId);
-                            }
-                            ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
-
-                            return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
-                                log.info("收到订阅消息: " + response.toString());
-                                onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId).subscribe();
-                            }, (event) -> {
-                                mediaServerItemService.closeRTPServer(device, channelId);
-                                MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
-                                messageBroker.reply(messageReply).subscribe();
-                            });
-                        }
-                        return Mono.empty();
-                    });
-            })
-            .thenMany(messageBroker.handleReply(deviceId,key,Duration.ofSeconds(10))
-                .onErrorResume(DeviceOperationException.class,error->
-                    //超时响应处理
-                    Mono.defer(()->{
-                        log.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
-                        MediaMessageReply messageReply = new MediaMessageReply();
-                        messageReply.setDeviceId(deviceId);
+                messageBroker.reply(messageReply).subscribe();
+            }).then(Mono.empty());
+        }):
+            Mono.defer(()->{
+            String streamId = streamInfo.getStreamId();
+            if (streamId == null) {
+                redisCatchStorage.stopPlay(streamInfo);
+                MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
+                messageReply.setSuccess(false);
+                return Mono.empty();
+            }
+            return mediaServerItemService.findById(streamInfo.getMediaServerId())
+                .flatMap(mediaInfo->{
+                    JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
+                    if (rtpInfo != null && rtpInfo.getBool("exist")) {
+                        MediaMessageReply messageReply = MediaMessageReply.of(null, streamInfo);
                         messageReply.setMessageId(key);
                         messageReply.setMessageId(key);
-                        messageReply.setSuccess(false);
-                        SIPDialog dialog = streamSessionManager.getDialog(deviceId, channelId);
-                        if (dialog != null) {
-                            messageReply.setErrMsg("收流超时,请稍候重试");
-                        }else {
-                            messageReply.setErrMsg("点播超时,请稍候重试");
+                        messageReply.setSuccess(true);
+                        if (hookEvent != null) {
+                            try {
+                                hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
+                            } catch (Exception e) {
+                                log.error("点播回调函数失败,",e);
+                            }
                         }
                         }
-                        // 点播超时回复BYE
-                        return cmder.streamByeCmd(device.getId(), channelId)
-                            .doOnNext(ignore->
-                                // 释放rtpserver
-                                mediaServerItemService.closeRTPServer(device, channelId))
-                            .flatMap(ignore->
-                                // 回复之前所有的点播请求
-                                messageBroker.reply(messageReply));
-                    }).then(Mono.error(error))
-                )
-                .flatMap(reply->{
-                    if(reply instanceof MediaMessageReply){
-                        MediaMessageReply<StreamInfo> messageReply= (MediaMessageReply<StreamInfo>) reply;
-                        if(reply.isSuccess()){
-                            return  Flux.just(messageReply);
-                        }else {
-                            return Flux.error(new BusinessException(Optional.ofNullable(messageReply.getErrMsg()).orElse("响应错误,请重试")));
+                        return  messageBroker.reply(messageReply);
+                    } else {
+                        // TODO 点播前是否重置状态
+                        redisCatchStorage.stopPlay(streamInfo);
+                        deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
+                        SSRCInfo ssrcInfo;
+                        String streamId2 = null;
+                        if (mediaServerItem.isRtpEnable()) {
+                            streamId2 = String.format("%s_%s", device.getId(), channelId);
                         }
                         }
+                        ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
+
+                        return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
+                            log.info("收到订阅消息: " + response.toString());
+                            onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId).subscribe();
+                        }, (event) -> {
+                            mediaServerItemService.closeRTPServer(device, channelId);
+                            MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
+                            messageReply.setMessageId(key);
+                            messageReply.setSuccess(false);
+                            messageBroker.reply(messageReply).subscribe();
+                        })
+                            .thenReturn(1L);
                     }
                     }
-                    return Flux.error(new BusinessException("服务器响应错误"));
-                }));
+                })
+                .switchIfEmpty(Mono.defer(()->{
+                    MediaMessageReply messageReply = MediaMessageReply.of("媒体服务器暂不可用", null);
+                    messageReply.setMessageId(key);
+                    messageReply.setSuccess(false);
+                    return messageBroker.reply(messageReply);
+                }))
+                .then(Mono.empty());
+        });
     }
     }
 
 
     private Mono<Void> getSnap(String deviceId,String channelId,StreamInfo streamInfo){
     private Mono<Void> getSnap(String deviceId,String channelId,StreamInfo streamInfo){
@@ -231,6 +203,7 @@ public class LocalPlayService  {
                 MediaMessageReply<StreamInfo> mediaMessageReply = MediaMessageReply.of(null, streamInfo);
                 MediaMessageReply<StreamInfo> mediaMessageReply = MediaMessageReply.of(null, streamInfo);
                 mediaMessageReply.setMessageId(key);
                 mediaMessageReply.setMessageId(key);
                 mediaMessageReply.setSuccess(true);
                 mediaMessageReply.setSuccess(true);
+                System.out.println("SIP请求回调------------------------------------------" + System.currentTimeMillis() + "---------------------------------------");
                 return messageBroker.reply(mediaMessageReply)
                 return messageBroker.reply(mediaMessageReply)
                     .thenReturn(streamInfo);
                     .thenReturn(streamInfo);
             })
             })

+ 8 - 13
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java

@@ -8,12 +8,14 @@ import org.jetlinks.community.media.contanst.CmdType;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
+import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.sip.request.message.MessageHandlerAbstract;
 import org.jetlinks.community.media.sip.request.message.MessageHandlerAbstract;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.utils.XmlUtil;
 import org.jetlinks.community.utils.XmlUtil;
+import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 import javax.sip.RequestEvent;
 import javax.sip.RequestEvent;
@@ -33,8 +35,8 @@ import java.util.*;
 public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 
 
     private final LocalMediaDeviceChannelService deviceChannelService;
     private final LocalMediaDeviceChannelService deviceChannelService;
-    private final DeferredResultHolder deferredResultHolder;
     private final LocalMediaDeviceService mediaDeviceService;
     private final LocalMediaDeviceService mediaDeviceService;
+    private final StandaloneDeviceMessageBroker messageBroker;
     @Override
     @Override
     public String getMethod() {
     public String getMethod() {
         return "Catalog";
         return "Catalog";
@@ -74,19 +76,12 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
                     }
                     }
                     channelList.add(deviceChannel);
                     channelList.add(deviceChannel);
                 }
                 }
+                String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getId();
+                MediaMessageReply<String> reply = MediaMessageReply.of(null, "更新成功,共" + sumNum + "条通道");
+                reply.setMessageId(key);
+                reply.setDeviceId(device.getId());
                 //更新设备通道
                 //更新设备通道
-                Mono.fromRunnable(()->{
-                    String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getId();
-                    RequestMessage msg = new RequestMessage();
-                    msg.setKey(key);
-                    WVPResult<Object> result = new WVPResult<>();
-                    result.setCode(0);
-                    result.setData(device);
-                    result.setMsg("更新成功,共" + sumNum + "条");
-                    msg.setData(result);
-                    deferredResultHolder.invokeAllResult(msg);
-                })
-                    .thenReturn(1L)
+                messageBroker.reply(reply)
                     .flatMap(__->
                     .flatMap(__->
                         deviceChannelService.createDelete()
                         deviceChannelService.createDelete()
                             .where(MediaDeviceChannel::getDeviceId,device.getId())
                             .where(MediaDeviceChannel::getDeviceId,device.getId())

+ 4 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -145,14 +145,15 @@ public class RedisCacheStorageImpl {
 
 
 
 
     public StreamInfo queryPlayByDevice(String deviceId, String channelId) {
     public StreamInfo queryPlayByDevice(String deviceId, String channelId) {
-        List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
+        List<Object> playLays = redis.scan(String.format("*%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
             serverId,
             serverId,
             deviceId,
             deviceId,
             channelId));
             channelId));
-        if (playLeys == null || playLeys.size() == 0){
+        if (playLays == null || playLays.size() == 0){
             return null;
             return null;
         }
         }
-        return (StreamInfo)redis.get(playLeys.get(0).toString());
+        String key = String.valueOf(playLays.get(0));
+        return (StreamInfo)redis.get(key.substring(key.indexOf(VideoManagerConstants.PLAYER_PREFIX)));
     }
     }
 
 
 
 

+ 0 - 73
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/DeferredResultHolder.java

@@ -51,77 +51,4 @@ public class DeferredResultHolder {
     public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM";
     public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM";
 
 
     public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
     public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
-
-    private Map<String, Map<String, DeferredResult>> map = new ConcurrentHashMap<>();
-
-
-    public void put(String key, String id, DeferredResult result) {
-        Map<String, DeferredResult> deferredResultMap = map.get(key);
-        if (deferredResultMap == null) {
-            deferredResultMap = new ConcurrentHashMap<>();
-            map.put(key, deferredResultMap);
-        }
-        deferredResultMap.put(id, result);
-    }
-
-    public DeferredResult get(String key, String id) {
-        Map<String, DeferredResult> deferredResultMap = map.get(key);
-        if (deferredResultMap == null) {
-            return null;
-        }
-        return deferredResultMap.get(id);
-    }
-
-    public boolean exist(String key, String id){
-        if (key == null) {
-            return false;
-        }
-        Map<String, DeferredResult> deferredResultMap = map.get(key);
-        if (id == null) {
-            return deferredResultMap != null;
-        }else {
-            return deferredResultMap != null && deferredResultMap.get(id) != null;
-        }
-    }
-
-    /**
-     * 释放单个请求
-     * @param msg
-     */
-    public void invokeResult(RequestMessage msg) {
-        Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
-        if (deferredResultMap == null) {
-            return;
-        }
-        DeferredResult result = deferredResultMap.get(msg.getId());
-        if (result == null) {
-            return;
-        }
-        result.setResult(new ResponseEntity<>(msg.getData(),HttpStatus.OK));
-        deferredResultMap.remove(msg.getId());
-        if (deferredResultMap.size() == 0) {
-            map.remove(msg.getKey());
-        }
-    }
-
-    /**
-     * 释放所有的请求
-     * @param msg
-     */
-    public void invokeAllResult(RequestMessage msg) {
-        Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
-        if (deferredResultMap == null) {
-            return;
-        }
-        Set<String> ids = deferredResultMap.keySet();
-        for (String id : ids) {
-            DeferredResult result = deferredResultMap.get(id);
-            if (result == null) {
-                return;
-            }
-            result.setResult(ResponseEntity.ok().body(msg.getData()));
-        }
-        map.remove(msg.getKey());
-
-    }
 }
 }

+ 5 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -187,6 +187,7 @@ public class SipCommander {
             messageBroker.handleReply(device.getId(), ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerItem.getServerId(),"rtp",true,streamId), Duration.ofSeconds(15))
             messageBroker.handleReply(device.getId(), ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerItem.getServerId(),"rtp",true,streamId), Duration.ofSeconds(15))
                 .doOnNext(reply -> {
                 .doOnNext(reply -> {
                     if(reply instanceof MediaMessageReply){
                     if(reply instanceof MediaMessageReply){
+                        System.out.println("收到视频回调------------------------------------------" + System.currentTimeMillis() + "---------------------------------------");
                         MediaMessageReply<MediaItem> itemReply= (MediaMessageReply<MediaItem>) reply;
                         MediaMessageReply<MediaItem> itemReply= (MediaMessageReply<MediaItem>) reply;
                         MediaItem mediaItem = itemReply.getResult();
                         MediaItem mediaItem = itemReply.getResult();
                         if (userSetup.isWaitTrack() &&  mediaItem.getTracks() == null) {
                         if (userSetup.isWaitTrack() &&  mediaItem.getTracks() == null) {
@@ -196,6 +197,10 @@ public class SipCommander {
                         subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
                         subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
                     }
                     }
                 }).subscribe();
                 }).subscribe();
+
+
+            System.out.println("开始请求视频--------------------------streamId:"+streamId+"----------------" + System.currentTimeMillis() + "---------------------------------------");
+
             return  transmitRequest(sipProvider, request, (e -> {
             return  transmitRequest(sipProvider, request, (e -> {
                 streamSessionManager.remove(device.getId(), channelId);
                 streamSessionManager.remove(device.getId(), channelId);
                 mediaServerItemService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
                 mediaServerItemService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());

+ 10 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -304,6 +304,15 @@ public class ZLMHttpHookListener {
         Mono<Long> result = Mono.just(1L);
         Mono<Long> result = Mono.just(1L);
 
 
         boolean regist = item.isRegist();
         boolean regist = item.isRegist();
+
+        if(regist){
+            System.out.println("视频回调开始-------------streamId-------"+streamId+"-----------------------------" + System.currentTimeMillis() + "---------------------------------------");
+        }
+
+        MediaMessageReply<MediaItem> mediaMessageReply = MediaMessageReply.of(null,item);
+        mediaMessageReply.setMessageId(ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerId,app,regist,streamId));
+        mediaMessageReply.setSuccess(true);
+        deviceMessageBroker.reply(mediaMessageReply).subscribe();
         if ("rtmp".equals(schema)){
         if ("rtmp".equals(schema)){
             log.info("on_stream_changed:注册(true)/注销(false)->{}, app->{}, stream->{}", regist, app, streamId);
             log.info("on_stream_changed:注册(true)/注销(false)->{}, app->{}, stream->{}", regist, app, streamId);
             if (regist) {
             if (regist) {
@@ -401,13 +410,8 @@ public class ZLMHttpHookListener {
                             .thenReturn(1L));
                             .thenReturn(1L));
                 }
                 }
             }
             }
-        }
-        MediaMessageReply<MediaItem> mediaMessageReply = MediaMessageReply.of(null,item);
-        mediaMessageReply.setMessageId(ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerId,app,regist,streamId));
-        mediaMessageReply.setSuccess(true);
-       ;
+        };
         return    result
         return    result
-            .mergeWith(deviceMessageBroker.reply(mediaMessageReply).thenReturn(1L))
             .then(Mono.just(ResponseEntity.ok( new JSONObject()
             .then(Mono.just(ResponseEntity.ok( new JSONObject()
                 .putOpt("code", 0)
                 .putOpt("code", 0)
                 .putOpt("msg", "success").toString())));
                 .putOpt("msg", "success").toString())));

+ 1 - 1
jetlinks-standalone/src/main/resources/application.yml

@@ -247,7 +247,7 @@ media:
   send-rtp-port-range: 10000,10000 # 端口范围
   send-rtp-port-range: 10000,10000 # 端口范围
   # [可选] 国标级联在此范围内选择端口发送媒体流,
   # [可选] 国标级联在此范围内选择端口发送媒体流,
   rtp-port-range: 10000,10000 # 端口范围
   rtp-port-range: 10000,10000 # 端口范围
-  hook-ip: 192.168.104.244
+  hook-ip: 192.168.104.114
   rtp-proxy-port: 10000
   rtp-proxy-port: 10000
   rtmp-port: 80
   rtmp-port: 80
   rtsp-port: 80
   rtsp-port: 80