ソースを参照

add 通道更新

18339543638 3 年 前
コミット
575ca33a91
15 ファイル変更402 行追加264 行削除
  1. 2 2
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/netpump/NetPumpDeviceMetadataCodec.java
  2. 1 1
      jetlinks-components/notify-component/notify-webhook/src/main/java/org/jetlinks/community/notify/webhook/WebHookNotifier.java
  3. 10 6
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java
  4. 1 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaGatewayController.java
  5. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java
  6. 85 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/message/MediaMessage.java
  7. 27 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/message/MediaMessageReply.java
  8. 157 183
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java
  9. 29 12
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java
  10. 19 32
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java
  11. 2 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookSubscribe.java
  12. 22 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/dto/MediaItem.java
  13. 21 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/SubscribeKeyGenerate.java
  14. 25 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/ZLMKeyGenerate.java
  15. 0 25
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/ZlmSubscribeTopic.java

+ 2 - 2
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/netpump/NetPumpDeviceMetadataCodec.java

@@ -33,8 +33,8 @@ public class NetPumpDeviceMetadataCodec implements DeviceMessageCodec {
     @Override
     public Publisher<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
         byte[] bytes = context.getMessage().getPayload().array();
-//        String value = DataUtils.byteArrToHexString(bytes);
-        String value =new String(bytes);
+        String value = DataUtils.byteArrToHexString(bytes);
+//        String value =new String(bytes);
         MqttMessage message = (MqttMessage) context.getMessage();
         String topic = message.getTopic();
         if (!topic.endsWith("/properties/report")) {

+ 1 - 1
jetlinks-components/notify-component/notify-webhook/src/main/java/org/jetlinks/community/notify/webhook/WebHookNotifier.java

@@ -93,7 +93,7 @@ public class WebHookNotifier extends AbstractNotifier<WebHookTemplate> {
                             break;
                         case "post":
                             request=HttpUtil.createPost(template.getUrl());
-                            request.body(context.toString());
+                            request.body(context.getAllValues().toString());
                             break;
                         default:break;
                     }

+ 10 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java

@@ -16,6 +16,7 @@ import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.gb28181.result.PlayResult;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
+import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
@@ -24,9 +25,13 @@ import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
+import org.jetlinks.core.device.DeviceOperationBroker;
+import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
+import org.jetlinks.core.server.MessageHandler;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 
@@ -63,17 +68,17 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
     @QueryAction
     @Operation(summary = "设备点播")
     @PostMapping("/{deviceId}/{channelId}/_start")
-    public Mono<Object> play(@PathVariable String deviceId,
+    public Flux<Object> play(@PathVariable String deviceId,
                              @PathVariable String channelId) {
-
         return
             //获取设备信息
-            mediaDeviceService.findById(deviceId)
+            Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId))
+                .switchIfEmpty(Mono.error(new BusinessException("设备未注册")))
                 //获取设备相连的媒体流服务器信息
                 .flatMap(playService::getNewMediaServerItem)
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
-                .flatMap(mediaServerItem ->playService.play(mediaServerItem, deviceId, channelId, null, null))
-                .flatMap(this::deferredResultHandler);
+                .flatMapMany(mediaServerItem ->playService.play(mediaServerItem, deviceId, channelId, null, null)
+                    .map(MediaMessageReply::getResult));
     }
 
 
@@ -84,7 +89,6 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
         if(log.isDebugEnabled()){
             log.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
         }
-
         String uuid = cn.hutool.core.lang.UUID.randomUUID().toString();
         DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
         // 录像查询以channelId作为deviceId查询

+ 1 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaGatewayController.java

@@ -16,6 +16,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.UnicastProcessor;
 
 import java.time.Duration;
 import java.util.*;

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java

@@ -35,7 +35,7 @@ import reactor.util.function.Tuple2;
 @Resource(id="gb28181-play",name = "国标设备点播")
 @AllArgsConstructor
 @Tag(name = "GB媒体设备操作")
-public class PlayController implements DeferredController{
+public class PlayController{
 
     private final SipCommander cmder;
 

+ 85 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/message/MediaMessage.java

@@ -0,0 +1,85 @@
+package org.jetlinks.community.media.message;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.MessageType;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MediaMessage.java
+ * @Description TODO
+ * @createTime 2022年02月21日 20:35:00
+ */
+@Data
+public class MediaMessage implements DeviceMessage {
+    private static final long serialVersionUID = -6849794470754667710L;
+
+    private String code;
+
+    private String messageId;
+
+    private String deviceId;
+
+    private Map<String, Object> headers;
+
+    private long timestamp = System.currentTimeMillis();
+
+    @Override
+    public MessageType getMessageType() {
+        return MessageType.UNKNOWN;
+    }
+
+    @Override
+    public synchronized DeviceMessage addHeader(String header, Object value) {
+        if (headers == null) {
+            this.headers = new ConcurrentHashMap<>();
+        }
+        if (header != null && value != null) {
+            this.headers.put(header, value);
+        }
+        return this;
+    }
+
+    @Override
+    public synchronized DeviceMessage addHeaderIfAbsent(String header, Object value) {
+        if (headers == null) {
+            this.headers = new ConcurrentHashMap<>();
+        }
+        if (header != null && value != null) {
+            this.headers.putIfAbsent(header, value);
+        }
+        return this;
+    }
+
+    @Override
+    public DeviceMessage removeHeader(String header) {
+        if (this.headers != null) {
+            this.headers.remove(header);
+        }
+        return this;
+    }
+
+    @Override
+    public JSONObject toJson() {
+        JSONObject json = FastBeanCopier.copy(this, new JSONObject());
+        json.put("messageType", getMessageType().name());
+        return json;
+    }
+
+    @Override
+    public void fromJson(JSONObject jsonObject) {
+        DeviceMessage.super.fromJson(jsonObject);
+    }
+
+    @Override
+    public String toString() {
+        return toJson().toJSONString();
+    }
+}

+ 27 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/message/MediaMessageReply.java

@@ -0,0 +1,27 @@
+package org.jetlinks.community.media.message;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.jetlinks.core.message.*;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MediaMessageReply.java
+ * @Description TODO
+ * @createTime 2022年02月21日 20:36:00
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class MediaMessageReply<T> extends CommonDeviceMessageReply<MediaMessageReply> {
+    private String errMsg;
+    private T result;
+    @Override
+    public MessageType getMessageType() {
+        return MessageType.UNKNOWN;
+    }
+
+}

+ 157 - 183
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -17,6 +17,7 @@ import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
+import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.session.VideoStreamSessionManager;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
@@ -26,9 +27,12 @@ import org.jetlinks.community.media.gb28181.result.PlayResult;
 import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
 import org.jetlinks.community.media.zlm.ZLMRESTfulUtils;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
+import org.jetlinks.community.utils.SubscribeKeyGenerate;
 import org.jetlinks.core.cluster.ClusterEventBus;
+import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.exception.DeviceOperationException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -52,9 +56,6 @@ import java.util.function.Function;
 @Slf4j
 @AllArgsConstructor
 public class LocalPlayService  {
-
-    //    @Autowired
-//    private IVideoManagerStorager storager;
     private final SipCommander cmder;
 
     private final RedisCacheStorageImpl redisCatchStorage;
@@ -70,189 +71,168 @@ public class LocalPlayService  {
     private final ZLMRESTfulUtils zlmresTfulUtils;
 
     private final UserSetup userSetup;
-    public Mono<PlayResult> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
-                                 ZLMHttpHookSubscribe.Event hookEvent,  SipSubscribe.Event errorEvent) {
-        PlayResult playResult = new PlayResult();
-        RequestMessage msg = new RequestMessage();
-        String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
-        msg.setKey(key);
-        String uuid = UUID.randomUUID().toString();
-        msg.setId(uuid);
-        playResult.setUuid(uuid);
-        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetup.getPlayTimeout());
-        playResult.setResult(result);
-        resultHolder.put(key, uuid, result);
+
+    private final StandaloneDeviceMessageBroker messageBroker;
+
+    public Flux<MediaMessageReply> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
+                                        ZLMHttpHookSubscribe.Event hookEvent,  SipSubscribe.Event errorEvent) {
+        String key = SubscribeKeyGenerate.getSubscribeKey(DeferredResultHolder.CALLBACK_CMD_PLAY,deviceId,channelId);
         if (mediaServerItem == null) {
-            WVPResult wvpResult = new WVPResult();
-            wvpResult.setCode(-1);
-            wvpResult.setMsg("未找到可用的zlm");
-            msg.setData(wvpResult);
-            resultHolder.invokeResult(msg);
-            return Mono.just(playResult);
+            return Flux.error(new BusinessException("未找到可用的zlm"));
         }
-
         MediaDevice device = redisCatchStorage.getDevice(deviceId);
-        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
-        playResult.setDevice(device);
-        // 超时处理
-        playResult.onTimeout(Mono.fromRunnable(()->{
-            log.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
-            WVPResult wvpResult = new WVPResult();
-            wvpResult.setCode(-1);
-            SIPDialog dialog = streamSessionManager.getDialog(deviceId, channelId);
-            if (dialog != null) {
-                wvpResult.setMsg("收流超时,请稍候重试");
-            }else {
-                wvpResult.setMsg("点播超时,请稍候重试");
-            }
-            msg.setData(wvpResult);
-            // 点播超时回复BYE
-            cmder.streamByeCmd(device.getId(), channelId)
-                .doOnNext(ignore->
-                    // 释放rtpserver
-                    mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId))
-                .doOnNext(ignore->
-                    // 回复之前所有的点播请求
-                    resultHolder.invokeAllResult(msg))
-                .subscribe();
-        }));
-//        playResult.onComplete(Mono.fromRunnable(()->{
-//            // 点播结束时调用截图接口
-//            String fileName =  deviceId + "_" + channelId + ".jpg";
-//            ResponseEntity responseEntity =  (ResponseEntity)result.getResult();
-//            if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
-//                WVPResult wvpResult = (WVPResult)responseEntity.getBody();
-//                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
-//                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
-//                    mediaServerItemService.findById(streamInfoForSuccess.getMediaServerId())
-//                        .doOnNext(mediaInfo->{
-//                            String classPath = null;
-//                            try {
-//                                classPath = ResourceUtils.getURL("classpath:").getPath();
-//                                // 兼容打包为jar的class路径
-//                                if(classPath.contains("jar")) {
-//                                    classPath = classPath.substring(0, classPath.lastIndexOf("."));
-//                                    classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
-//                                }
-//                                if (classPath.startsWith("file:")) {
-//                                    classPath = classPath.substring(classPath.indexOf(":") + 1);
-//                                }
-//                                String path = classPath + "static/static/snap/";
-//                                // 兼容Windows系统路径(去除前面的“/”)
-//                                if(System.getProperty("os.name").contains("indows")) {
-//                                    path = path.substring(1);
-//                                }
-//                                String streamUrl = streamInfoForSuccess.getRtsp();
-//                                // 请求截图
-//                                log.info("[请求截图]: " + fileName);
-//                                zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
-//                            } catch (FileNotFoundException e) {
-//                                log.error("存放截图文件路径不存在,",e);
-//                            }
-//                        })
-//                        .subscribe();
-//                }
-//            }
-//        }));
-        if (streamInfo == null) {
-            SSRCInfo ssrcInfo;
-            String streamId = null;
-            if (mediaServerItem.isRtpEnable()) {
-                streamId = String.format("%s_%s", device.getId(), channelId);
-            }
-
-            ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
-
-            // 发送点播消息
-            return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
-                log.info("收到订阅消息: " + response.toString());
-                onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid).subscribe();
-                if (hookEvent != null) {
-                    hookEvent.accept(mediaServerItem, response);
+        return  Mono.justOrEmpty(redisCatchStorage.queryPlayByDevice(deviceId, channelId))
+            .switchIfEmpty(Mono.defer(()->{
+                SSRCInfo ssrcInfo;
+                String streamId = null;
+                if (mediaServerItem.isRtpEnable()) {
+                    streamId = String.format("%s_%s", device.getId(), channelId);
                 }
-            }, (event) -> {
-                WVPResult wvpResult = new WVPResult();
-                wvpResult.setCode(-1);
-                // 点播返回sip错误
-                mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
-                msg.setData(wvpResult);
-                resultHolder.invokeAllResult(msg);
-                if (errorEvent != null) {
-                    errorEvent.accept(event);
+                ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
+                // 发送点播消息
+                return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
+                    log.info("收到订阅消息: " + response.toString());
+                    onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
+                    if (hookEvent != null) {
+                        hookEvent.accept(mediaServerItem, response);
+                    }
+                }, (event) -> {
+                    // 点播返回sip错误
+                    mediaServerItemService.closeRTPServer(device, channelId);
+                    MediaMessageReply messageReply =   MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
+                    messageReply.setSuccess(false);
+                    if (errorEvent != null) {
+                        errorEvent.accept(event);
+                    }
+                    messageBroker.reply(messageReply).subscribe();
+                }).then(Mono.empty());
+            }))
+            .flatMap(streamInfo->{
+                String streamId = streamInfo.getStreamId();
+                if (streamId == null) {
+                    MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
+                    messageReply.setSuccess(false);
+                    return Mono.just(messageReply);
                 }
-            })
-                .thenReturn(playResult);
-        } else {
-            String streamId = streamInfo.getStreamId();
-            if (streamId == null) {
-                WVPResult wvpResult = new WVPResult();
-                wvpResult.setCode(-1);
-                wvpResult.setMsg("点播失败, redis缓存streamId等于null");
-                msg.setData(wvpResult);
-                resultHolder.invokeAllResult(msg);
-                return Mono.just(playResult);
-            }
-            return mediaServerItemService.findById(streamInfo.getMediaServerId())
-                .flatMap(mediaInfo->{
-                    JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
-                    if (rtpInfo != null && rtpInfo.getBool("exist")) {
-                        WVPResult wvpResult = new WVPResult();
-                        wvpResult.setCode(0);
-                        wvpResult.setMsg("success");
-                        wvpResult.setData(streamInfo);
-                        msg.setData(wvpResult);
-                        resultHolder.invokeAllResult(msg);
-                        if (hookEvent != null) {
-                            try {
-                                hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
-                            } catch (Exception e) {
-                                log.error("点播回调函数失败,",e);
+                return mediaServerItemService.findById(streamInfo.getMediaServerId())
+                    .flatMap(mediaInfo->{
+                        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
+                        if (rtpInfo != null && rtpInfo.getBool("exist")) {
+                            MediaMessageReply messageReply = MediaMessageReply.of(null, streamInfo);
+                            messageReply.setSuccess(true);
+                            messageBroker.reply(messageReply).subscribe();
+                            if (hookEvent != null) {
+                                try {
+                                    hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
+                                } catch (Exception e) {
+                                    log.error("点播回调函数失败,",e);
+                                }
+                            }
+                        } else {
+                            // TODO 点播前是否重置状态
+                            redisCatchStorage.stopPlay(streamInfo);
+                            deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
+                            SSRCInfo ssrcInfo;
+                            String streamId2 = null;
+                            if (mediaServerItem.isRtpEnable()) {
+                                streamId2 = String.format("%s_%s", device.getId(), channelId);
                             }
+                            ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
+
+                            return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
+                                log.info("收到订阅消息: " + response.toString());
+                                onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId).subscribe();
+                            }, (event) -> {
+                                mediaServerItemService.closeRTPServer(device, channelId);
+                                MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
+                                messageBroker.reply(messageReply).subscribe();
+                            });
                         }
-                    } else {
-                        // TODO 点播前是否重置状态
-                        redisCatchStorage.stopPlay(streamInfo);
-                        deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
-                        SSRCInfo ssrcInfo;
-                        String streamId2 = null;
-                        if (mediaServerItem.isRtpEnable()) {
-                            streamId2 = String.format("%s_%s", device.getId(), channelId);
+                        return Mono.empty();
+                    });
+            })
+            .thenMany(messageBroker.handleReply(deviceId,key,Duration.ofSeconds(10))
+                .onErrorResume(DeviceOperationException.class,error->
+                    //超时响应处理
+                    Mono.defer(()->{
+                        log.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
+                        MediaMessageReply messageReply = new MediaMessageReply();
+                        messageReply.setDeviceId(deviceId);
+                        messageReply.setMessageId(key);
+                        messageReply.setSuccess(false);
+                        SIPDialog dialog = streamSessionManager.getDialog(deviceId, channelId);
+                        if (dialog != null) {
+                            messageReply.setErrMsg("收流超时,请稍候重试");
+                        }else {
+                            messageReply.setErrMsg("点播超时,请稍候重试");
+                        }
+                        // 点播超时回复BYE
+                        return cmder.streamByeCmd(device.getId(), channelId)
+                            .doOnNext(ignore->
+                                // 释放rtpserver
+                                mediaServerItemService.closeRTPServer(device, channelId))
+                            .flatMap(ignore->
+                                // 回复之前所有的点播请求
+                                messageBroker.reply(messageReply));
+                    }).then(Mono.error(error))
+                )
+                .flatMap(reply->{
+                    if(reply instanceof MediaMessageReply){
+                        MediaMessageReply<StreamInfo> messageReply= (MediaMessageReply<StreamInfo>) reply;
+                        if(reply.isSuccess()){
+                            return  Flux.just(messageReply);
+                        }else {
+                            return Flux.error(new BusinessException(Optional.ofNullable(messageReply.getErrMsg()).orElse("响应错误,请重试")));
                         }
-                        ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
-
-                        return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
-                            log.info("收到订阅消息: " + response.toString());
-                            onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid).subscribe();
-                        }, (event) -> {
-                            mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                            WVPResult wvpResult = new WVPResult();
-                            wvpResult.setCode(-1);
-                            wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
-                            msg.setData(wvpResult);
-                            resultHolder.invokeAllResult(msg);
-                        });
                     }
-                    return Mono.empty();
-                })
-                .thenReturn(playResult);
-        }
+                    return Flux.error(new BusinessException("服务器响应错误"));
+                }));
     }
 
-    public Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
-        RequestMessage msg = new RequestMessage();
-        msg.setId(uuid);
-        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
-        return onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid)
-            .doOnNext(streamInfo -> {
+    private Mono<Void> getSnap(String deviceId,String channelId,StreamInfo streamInfo){
+        return  Mono.fromRunnable(()->{
+                // 点播结束时调用截图接口
+                String fileName =  deviceId + "_" + channelId + ".jpg";
+                mediaServerItemService.findById(streamInfo.getMediaServerId())
+                    .doOnNext(mediaInfo->{
+                        String classPath = null;
+                        try {
+                            classPath = ResourceUtils.getURL("classpath:").getPath();
+                            // 兼容打包为jar的class路径
+                            if(classPath.contains("jar")) {
+                                classPath = classPath.substring(0, classPath.lastIndexOf("."));
+                                classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
+                            }
+                            if (classPath.startsWith("file:")) {
+                                classPath = classPath.substring(classPath.indexOf(":") + 1);
+                            }
+                            String path = classPath + "static/static/snap/";
+                            // 兼容Windows系统路径(去除前面的“/”)
+                            if(System.getProperty("os.name").contains("indows")) {
+                                path = path.substring(1);
+                            }
+                            String streamUrl = streamInfo.getRtsp();
+                            // 请求截图
+                            log.info("[请求截图]: " + fileName);
+                            zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
+                        } catch (FileNotFoundException e) {
+                            log.error("存放截图文件路径不存在,",e);
+                        }
+                    })
+                    .subscribe();
+            }
+        );
+    }
+    public Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
+        String key=SubscribeKeyGenerate.getSubscribeKey(DeferredResultHolder.CALLBACK_CMD_PLAY,deviceId,channelId);
+        return onPublishHandler(mediaServerItem, resonse, deviceId, channelId)
+            .flatMap(streamInfo -> {
                 redisCatchStorage.startPlay(streamInfo);
-                msg.setData(JSON.toJSONString(streamInfo));
-                WVPResult wvpResult = new WVPResult();
-                wvpResult.setCode(0);
-                wvpResult.setMsg("success");
-                wvpResult.setData(streamInfo);
-                msg.setData(wvpResult);
-                resultHolder.invokeAllResult(msg);
+                MediaMessageReply<StreamInfo> mediaMessageReply = MediaMessageReply.of(null, streamInfo);
+                mediaMessageReply.setMessageId(key);
+                mediaMessageReply.setSuccess(true);
+                return messageBroker.reply(mediaMessageReply)
+                    .thenReturn(streamInfo);
             })
             .flatMap(streamInfo ->
                 deviceChannelService.createQuery()
@@ -267,12 +247,7 @@ public class LocalPlayService  {
                         return Mono.empty();
                     })
                     .then()
-            )
-            .switchIfEmpty(Mono.fromRunnable(()->{
-                log.warn("设备预览API调用失败!");
-                msg.setData("设备预览API调用失败!");
-                resultHolder.invokeAllResult(msg);
-            }));
+            );
     }
 
 
@@ -288,11 +263,10 @@ public class LocalPlayService  {
         return Mono.justOrEmpty(device.getMediaServerId())
             .flatMap(mediaServerItemService::findById)
             //找不到设备相应的媒体流服务器则根据负载均衡获取相应的媒体流服务器
-            .switchIfEmpty(mediaServerItemService.getMediaServerForMinimumLoad())
-            .switchIfEmpty(Mono.fromRunnable(()-> log.warn("点播时未找到可使用的ZLM...")));
+            .switchIfEmpty(mediaServerItemService.getMediaServerForMinimumLoad());
     }
 
-    private Mono<StreamInfo> onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
+    private Mono<StreamInfo> onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
         String streamId = resonse.getStr("stream");
         JSONArray tracks = resonse.getJSONArray("tracks");
         return  mediaServerItemService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks)

+ 29 - 12
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -15,13 +15,17 @@ import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
 import org.jetlinks.community.media.gb28181.event.SipSubscribe;
+import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.session.VideoStreamSessionManager;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.SIPRequestHeaderProvider;
 import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
+import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
+import org.jetlinks.community.utils.ZLMKeyGenerate;
+import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.*;
 import javax.sip.*;
@@ -30,6 +34,7 @@ import javax.sip.header.CallIdHeader;
 import javax.sip.message.Request;
 import java.lang.reflect.Field;
 import java.text.ParseException;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Optional;
 
@@ -52,8 +57,16 @@ public class SipCommander {
     private final SipSubscribe sipSubscribe;
     private final RedisCacheStorageImpl redisCacheStorage;
     private final DeviceGatewayMonitor gatewayMonitor;
-
-    public SipCommander(SIPRequestHeaderProvider headerProvider, ZLMHttpHookSubscribe subscribe, LocalMediaServerItemService mediaServerItemService, VideoStreamSessionManager streamSessionManager, UserSetup userSetup, SipSubscribe sipSubscribe, RedisCacheStorageImpl redisCacheStorage) {
+    private final StandaloneDeviceMessageBroker messageBroker;
+
+    public SipCommander(SIPRequestHeaderProvider headerProvider,
+                        ZLMHttpHookSubscribe subscribe,
+                        LocalMediaServerItemService mediaServerItemService,
+                        VideoStreamSessionManager streamSessionManager,
+                        UserSetup userSetup,
+                        SipSubscribe sipSubscribe,
+                        RedisCacheStorageImpl redisCacheStorage,
+                        StandaloneDeviceMessageBroker messageBroker) {
         this.headerProvider = headerProvider;
         this.subscribe = subscribe;
         this.mediaServerItemService = mediaServerItemService;
@@ -62,6 +75,7 @@ public class SipCommander {
         this.sipSubscribe = sipSubscribe;
         this.redisCacheStorage = redisCacheStorage;
         this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor("sip");
+        this.messageBroker=messageBroker;
     }
 
     public Mono<Void> playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, MediaDevice device, String channelId,
@@ -88,14 +102,6 @@ public class SipCommander {
                 .putOpt("stream", streamId)
                 .putOpt("regist", true)
                 .putOpt("mediaServerId", mediaServerItem.getServerId());
-            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-                (MediaServerItem mediaServerItemInUse, JSONObject json)->{
-                    if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) {
-                        return;
-                    }
-                    event.accept(mediaServerItemInUse, JSONUtil.parseObj(json));
-                    subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
-                });
 
             //是否需要扩展sdp
 //            if (userSetup.isSeniorSdp()) {
@@ -178,8 +184,19 @@ public class SipCommander {
 
             redisCacheStorage.addRequestStream(streamId,device.getId(),channelId);
             Request request =  headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrcInfo.getSsrc(), callIdHeader);
-
-            return transmitRequest(sipProvider, request, (e -> {
+            messageBroker.handleReply(device.getId(), ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerItem.getServerId(),"rtp",true,streamId), Duration.ofSeconds(15))
+                .doOnNext(reply -> {
+                    if(reply instanceof MediaMessageReply){
+                        MediaMessageReply<MediaItem> itemReply= (MediaMessageReply<MediaItem>) reply;
+                        MediaItem mediaItem = itemReply.getResult();
+                        if (userSetup.isWaitTrack() &&  mediaItem.getTracks() == null) {
+                            return;
+                        }
+                        event.accept(redisCacheStorage.getMediaServerItem(mediaItem.getMediaServerId()), JSONUtil.parseObj(mediaItem));
+                        subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
+                    }
+                }).subscribe();
+            return  transmitRequest(sipProvider, request, (e -> {
                 streamSessionManager.remove(device.getId(), channelId);
                 mediaServerItemService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
                 errorEvent.accept(e);

+ 19 - 32
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -5,17 +5,16 @@ import cn.hutool.core.lang.UUID;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
-import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.contanst.VideoManagerConstants;
-import org.jetlinks.community.media.entity.GbStream;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalGbStreamService;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
@@ -23,20 +22,18 @@ import org.jetlinks.community.media.service.LocalPlayService;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
-import org.jetlinks.community.media.zlm.dto.StreamProxyItem;
 import org.jetlinks.community.media.zlm.dto.StreamPushItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
-import org.jetlinks.community.utils.ZlmSubscribeTopic;
+import org.jetlinks.community.utils.ZLMKeyGenerate;
 import org.jetlinks.core.cluster.ClusterEventBus;
-import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Mono;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -56,7 +53,7 @@ public class ZLMHttpHookListener {
     private final LocalMediaDeviceChannelService channelService;
     private final ZLMMediaListManager zlmMediaListManager;
     private final LocalGbStreamService gbStreamService;
-    private final ClusterEventBus clusterEventBus;
+    private final StandaloneDeviceMessageBroker deviceMessageBroker;
     //todo
     private final String serverId="";
     private final UserSetup userSetup;
@@ -285,7 +282,7 @@ public class ZLMHttpHookListener {
     @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
     public Mono<ResponseEntity<String>> onStreamChanged(@RequestBody MediaItem item){
         if (log.isDebugEnabled()) {
-            log.debug("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONUtil.toJsonStr(item));
+            log.debug("[ ZLM HOOK ]on_stream_changed API调用,参数:" + item.toString());
         }
         String mediaServerId = item.getMediaServerId();
         //获取流应用名
@@ -294,11 +291,10 @@ public class ZLMHttpHookListener {
         String streamId = item.getStream();
         //rtsp或rtmp
         String schema = item.getSchema();
-        JSONObject json = JSONUtil.parseObj(item);
 
-        redisCatchStorage.removeRequestStream(streamId);
+//        redisCatchStorage.removeRequestStream(streamId);
 
-        Mono<Long> result = Mono.fromRunnable(()->{}).thenReturn(1L);
+        Mono<Long> result = Mono.just(1L);
 
         boolean regist = item.isRegist();
         if ("rtmp".equals(schema)){
@@ -368,7 +364,6 @@ public class ZLMHttpHookListener {
 //                                    .doOnNext(streamPush->           eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON))
 //                                    ;
                                     }
-
                                 }else {
                                     // 兼容流注销时类型从redis记录获取
                                     MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId);
@@ -378,7 +373,6 @@ public class ZLMHttpHookListener {
                                     }
                                     mono= gbStreamService.getByAppAndStreamId(app,streamId).then();
                                     //todo
-
 //                                    .doOnNext(gbStream -> eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF))
 //                                zlmMediaListManager.removeMedia(app, streamId);
                                 }
@@ -390,10 +384,10 @@ public class ZLMHttpHookListener {
                                         .putOpt("stream", streamId)
                                         .putOpt("register", regist)
                                         .putOpt("mediaServerId", mediaServerId);
-                                    if(mono!=null){
-                                        String originType=type;
-                                        mono=mono.doOnNext(___->clusterEventBus.publish(VideoManagerConstants.WVP_SERVER_STREAM_PREFIX+"_"+originType,jsonObject));
-                                    }
+//                                    if(mono!=null){
+//                                        String originType=type;
+//                                        mono=mono.doOnNext(___->clusterEventBus.publish(VideoManagerConstants.WVP_SERVER_STREAM_PREFIX+"_"+originType,jsonObject));
+//                                    }
                                 }
                                 return mono;
                             })
@@ -405,20 +399,13 @@ public class ZLMHttpHookListener {
         JSONObject ret = new JSONObject()
             .putOpt("code", 0)
             .putOpt("msg", "success");
-        return       result
-            .mergeWith(Mono.fromRunnable(()->{
-                ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
-                if(subscribe!=null){
-                    MediaServerItem mediaServerItem = redisCatchStorage.getMediaServerItem(mediaServerId);
-                    if(mediaServerItem!=null){
-                        try {
-                            subscribe.accept(mediaServerItem,json);
-                        } catch (Exception e) {
-                            log.error("回调失败,",e);
-                        }
-                    }
-                }
-            }))
+
+        MediaMessageReply<MediaItem> mediaMessageReply = MediaMessageReply.of(null,item);
+        mediaMessageReply.setMessageId(ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerId,app,regist,streamId));
+        mediaMessageReply.setSuccess(true);
+       ;
+        return    result
+            .mergeWith(deviceMessageBroker.reply(mediaMessageReply).thenReturn(1L))
             .then(Mono.just(ResponseEntity.ok(ret.toString())));
     }
 
@@ -519,7 +506,7 @@ public class ZLMHttpHookListener {
                             ssrcInfo = mediaServerItemService.openRTPServer(mediaInfo, streamId2);
                             return sipCommander.playStreamCmd(mediaInfo, ssrcInfo, device, channel.getChannelId(), (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                                 log.info("收到订阅消息: " + response.toString());
-                                localPlayService.onPublishHandlerForPlay(mediaServerItemInuse, response, channel.getDeviceId(), channel.getChannelId(), uuid.toString()).subscribe();
+                                localPlayService.onPublishHandlerForPlay(mediaServerItemInuse, response, channel.getDeviceId(), channel.getChannelId()).subscribe();
                             }, null);
 
 

+ 2 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookSubscribe.java

@@ -38,8 +38,8 @@ public class ZLMHttpHookSubscribe {
         on_server_keepalive
     }
 
-    public interface Event extends BiConsumer<MediaServerItem, JSONObject> {
-
+    public interface Event  {
+        void accept(MediaServerItem mediaServerItem, JSONObject jsonObject);
     }
 
     private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();

+ 22 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/dto/MediaItem.java

@@ -149,6 +149,28 @@ public class MediaItem {
 
     }
 
+    @Override
+    public String toString() {
+        return "MediaItem{" +
+            "regist=" + regist +
+            ", aliveSecond=" + aliveSecond +
+            ", mediaServerId='" + mediaServerId + '\'' +
+            ", app='" + app + '\'' +
+            ", bytesSpeed=" + bytesSpeed +
+            ", createStamp=" + createStamp +
+            ", stream='" + stream + '\'' +
+            ", totalReaderCount='" + totalReaderCount + '\'' +
+            ", schema='" + schema + '\'' +
+            ", originType=" + originType +
+            ", originSock=" + originSock +
+            ", originTypeStr='" + originTypeStr + '\'' +
+            ", originUrl='" + originUrl + '\'' +
+            ", tracks=" + tracks +
+            ", vhost='" + vhost + '\'' +
+            ", docker=" + docker +
+            '}';
+    }
+
     public static class OriginSock{
         private String identifier;
         private String local_ip;

+ 21 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/SubscribeKeyGenerate.java

@@ -0,0 +1,21 @@
+package org.jetlinks.community.utils;
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName SubscribeKeyGenerate.java
+ * @Description TODO
+ * @createTime 2022年02月10日 15:36:00
+ */
+public class SubscribeKeyGenerate {
+
+    /**
+     * 获取
+     * @param prefix
+     * @param deviceId
+     * @param channelId
+     * @return
+     */
+    public static String getSubscribeKey(String prefix,String deviceId,String channelId){
+        return prefix+"/"+deviceId+"/"+channelId;
+    }
+}

+ 25 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/ZLMKeyGenerate.java

@@ -0,0 +1,25 @@
+package org.jetlinks.community.utils;
+
+import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ZLMKeyGenerate.java
+ * @Description TODO
+ * @createTime 2022年02月22日 15:07:00
+ */
+public class ZLMKeyGenerate {
+
+    /**
+     * 获取
+     * @param hookType
+     * @param serverId
+     * @param streamId
+     * @return
+     */
+    public static String getStreamChangedKey(ZLMHttpHookSubscribe.HookType hookType, String serverId,String app,boolean register, String streamId){
+        return hookType.name()+"/"+serverId+"/"+app+"/"+register+"/"+streamId;
+    }
+
+}

+ 0 - 25
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/ZlmSubscribeTopic.java

@@ -1,25 +0,0 @@
-package org.jetlinks.community.utils;
-
-import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName ZlmSubscribeTopic.java
- * @Description TODO
- * @createTime 2022年02月10日 15:36:00
- */
-public class ZlmSubscribeTopic {
-
-    /**
-     * 获取监听媒体流变化主题
-     * @param app
-     * @param streamId
-     * @param mediaServerItemId
-     * @return
-     */
-    public static String getOnStreamChanged(String app,String streamId,String mediaServerItemId){
-        return ZLMHttpHookSubscribe.HookType.on_stream_changed+"/"+app+"/"+streamId
-            +"/"+mediaServerItemId;
-    }
-}