|
|
@@ -71,7 +71,7 @@ public class LocalPlayService {
|
|
|
private final StandaloneDeviceMessageBroker messageBroker;
|
|
|
|
|
|
public Mono<Void> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
|
|
|
- ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
|
|
|
+ ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
|
|
|
String key = SubscribeKeyGenerate.getSubscribeKey(DeferredResultHolder.CALLBACK_CMD_PLAY,deviceId,channelId);
|
|
|
if (mediaServerItem == null) {
|
|
|
return Mono.error(new BusinessException("未找到可用的zlm"));
|
|
|
@@ -80,85 +80,85 @@ public class LocalPlayService {
|
|
|
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
|
|
|
return streamInfo==null?
|
|
|
Mono.defer(()->{
|
|
|
- SSRCInfo ssrcInfo;
|
|
|
- String streamId = null;
|
|
|
- if (mediaServerItem.isRtpEnable()) {
|
|
|
- streamId = String.format("%s_%s", device.getId(), channelId);
|
|
|
- }
|
|
|
- ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
|
|
|
- // 发送点播消息
|
|
|
- return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
|
|
|
- log.info("收到订阅消息: " + response.toString());
|
|
|
- onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
|
|
|
- if (hookEvent != null) {
|
|
|
- hookEvent.accept(mediaServerItem, response);
|
|
|
- }
|
|
|
- }, (event) -> {
|
|
|
- // 点播返回sip错误
|
|
|
- mediaServerItemService.closeRTPServer(device, channelId);
|
|
|
- MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
|
|
|
- messageReply.setSuccess(false);
|
|
|
- if (errorEvent != null) {
|
|
|
- errorEvent.accept(event);
|
|
|
+ SSRCInfo ssrcInfo;
|
|
|
+ String streamId = null;
|
|
|
+ if (mediaServerItem.isRtpEnable()) {
|
|
|
+ streamId = String.format("%s_%s", device.getId(), channelId);
|
|
|
}
|
|
|
- messageBroker.reply(messageReply).subscribe();
|
|
|
- }).then(Mono.empty());
|
|
|
- }):
|
|
|
+ ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
|
|
|
+ // 发送点播消息
|
|
|
+ return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
|
|
|
+ log.info("收到订阅消息: " + response.toString());
|
|
|
+ onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
|
|
|
+ if (hookEvent != null) {
|
|
|
+ hookEvent.accept(mediaServerItem, response);
|
|
|
+ }
|
|
|
+ }, (event) -> {
|
|
|
+ // 点播返回sip错误
|
|
|
+ mediaServerItemService.closeRTPServer(device, channelId);
|
|
|
+ MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
|
|
|
+ messageReply.setSuccess(false);
|
|
|
+ if (errorEvent != null) {
|
|
|
+ errorEvent.accept(event);
|
|
|
+ }
|
|
|
+ messageBroker.reply(messageReply).subscribe();
|
|
|
+ }).then(Mono.empty());
|
|
|
+ }):
|
|
|
Mono.defer(()->{
|
|
|
- String streamId = streamInfo.getStreamId();
|
|
|
- if (streamId == null) {
|
|
|
- redisCatchStorage.stopPlay(streamInfo);
|
|
|
- MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
|
|
|
- messageReply.setSuccess(false);
|
|
|
- return Mono.empty();
|
|
|
- }
|
|
|
- return mediaServerItemService.findById(streamInfo.getMediaServerId())
|
|
|
- .flatMap(mediaInfo->{
|
|
|
- JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
|
|
|
- if (rtpInfo != null && rtpInfo.getBool("exist")) {
|
|
|
- MediaMessageReply messageReply = MediaMessageReply.of(null, streamInfo);
|
|
|
- messageReply.setMessageId(key);
|
|
|
- messageReply.setSuccess(true);
|
|
|
- if (hookEvent != null) {
|
|
|
- try {
|
|
|
- hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("点播回调函数失败,",e);
|
|
|
+ String streamId = streamInfo.getStreamId();
|
|
|
+ if (streamId == null) {
|
|
|
+ redisCatchStorage.stopPlay(streamInfo);
|
|
|
+ MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
|
|
|
+ messageReply.setSuccess(false);
|
|
|
+ return Mono.empty();
|
|
|
+ }
|
|
|
+ return Mono.justOrEmpty(redisCatchStorage.getMediaServerItem(streamInfo.getMediaServerId()))
|
|
|
+ .flatMap(mediaInfo->{
|
|
|
+ JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
|
|
|
+ if (rtpInfo != null && rtpInfo.getBool("exist")) {
|
|
|
+ MediaMessageReply messageReply = MediaMessageReply.of(null, streamInfo);
|
|
|
+ messageReply.setMessageId(key);
|
|
|
+ messageReply.setSuccess(true);
|
|
|
+ if (hookEvent != null) {
|
|
|
+ try {
|
|
|
+ hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("点播回调函数失败,",e);
|
|
|
+ }
|
|
|
}
|
|
|
+ return messageBroker.reply(messageReply);
|
|
|
+ } else {
|
|
|
+ // TODO 点播前是否重置状态
|
|
|
+ redisCatchStorage.stopPlay(streamInfo);
|
|
|
+ deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
|
|
|
+ SSRCInfo ssrcInfo;
|
|
|
+ String streamId2 = null;
|
|
|
+ if (mediaServerItem.isRtpEnable()) {
|
|
|
+ streamId2 = String.format("%s_%s", device.getId(), channelId);
|
|
|
+ }
|
|
|
+ ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
|
|
|
+
|
|
|
+ return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
|
|
|
+ log.info("收到订阅消息: " + response.toString());
|
|
|
+ onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId).subscribe();
|
|
|
+ }, (event) -> {
|
|
|
+ mediaServerItemService.closeRTPServer(device, channelId);
|
|
|
+ MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
|
|
|
+ messageReply.setMessageId(key);
|
|
|
+ messageReply.setSuccess(false);
|
|
|
+ messageBroker.reply(messageReply).subscribe();
|
|
|
+ })
|
|
|
+ .thenReturn(1L);
|
|
|
}
|
|
|
- return messageBroker.reply(messageReply);
|
|
|
- } else {
|
|
|
- // TODO 点播前是否重置状态
|
|
|
- redisCatchStorage.stopPlay(streamInfo);
|
|
|
- deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
|
|
|
- SSRCInfo ssrcInfo;
|
|
|
- String streamId2 = null;
|
|
|
- if (mediaServerItem.isRtpEnable()) {
|
|
|
- streamId2 = String.format("%s_%s", device.getId(), channelId);
|
|
|
- }
|
|
|
- ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
|
|
|
-
|
|
|
- return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
|
|
|
- log.info("收到订阅消息: " + response.toString());
|
|
|
- onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId).subscribe();
|
|
|
- }, (event) -> {
|
|
|
- mediaServerItemService.closeRTPServer(device, channelId);
|
|
|
- MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
|
|
|
- messageReply.setMessageId(key);
|
|
|
- messageReply.setSuccess(false);
|
|
|
- messageBroker.reply(messageReply).subscribe();
|
|
|
- })
|
|
|
- .thenReturn(1L);
|
|
|
- }
|
|
|
- })
|
|
|
- .switchIfEmpty(Mono.defer(()->{
|
|
|
- MediaMessageReply messageReply = MediaMessageReply.of("媒体服务器暂不可用", null);
|
|
|
- messageReply.setMessageId(key);
|
|
|
- messageReply.setSuccess(false);
|
|
|
- return messageBroker.reply(messageReply);
|
|
|
- }))
|
|
|
- .then(Mono.empty());
|
|
|
- });
|
|
|
+ })
|
|
|
+ .switchIfEmpty(Mono.defer(()->{
|
|
|
+ MediaMessageReply messageReply = MediaMessageReply.of("媒体服务器暂不可用", null);
|
|
|
+ messageReply.setMessageId(key);
|
|
|
+ messageReply.setSuccess(false);
|
|
|
+ return messageBroker.reply(messageReply);
|
|
|
+ }))
|
|
|
+ .then(Mono.empty());
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private Mono<Void> getSnap(String deviceId,String channelId,StreamInfo streamInfo){
|