Bläddra i källkod

changed 改变网络泵解析方式

18339543638 3 år sedan
förälder
incheckning
1f1d2acdb6
18 ändrade filer med 652 tillägg och 726 borttagningar
  1. 2 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/SsrcConfig.java
  2. 199 219
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/config/MediaConfig.java
  3. 0 146
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/ApiStreamController.java
  4. 0 55
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/DeferredController.java
  5. 15 11
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java
  6. 81 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaServerController.java
  7. 16 14
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java
  8. 31 35
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java
  9. 81 78
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java
  10. 65 36
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java
  11. 8 11
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java
  12. 7 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java
  13. 5 5
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java
  14. 5 4
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java
  15. 19 23
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java
  16. 70 70
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMServerConfig.java
  17. 47 16
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/entity/MediaServerItem.java
  18. 1 0
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/NetworkConfigController.java

+ 2 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/SsrcConfig.java

@@ -31,10 +31,10 @@ public class SsrcConfig implements Serializable {
     public SsrcConfig() {
     }
 
-    public SsrcConfig(String mediaServerId, Set<String> usedSet, String sipDomain) {
+    public SsrcConfig(String mediaServerId, Set<String> usedSet, String ssrcPrefix) {
         this.mediaServerId = mediaServerId;
         this.isUsed = new ArrayList<>();
-        this.ssrcPrefix = sipDomain.substring(3, 8);
+        this.ssrcPrefix = ssrcPrefix.substring(3, 8);
         this.notUsed = new ArrayList<>();
         for (int i = 1; i < MAX_STRTEAM_COUNT; i++) {
             String ssrc;

+ 199 - 219
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/config/MediaConfig.java

@@ -1,219 +1,199 @@
-package org.jetlinks.community.media.config;
-
-import lombok.Data;
-import org.jetlinks.community.media.zlm.entity.MediaServerItem;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.util.StringUtils;
-
-import java.text.SimpleDateFormat;
-
-@Data
-@ConfigurationProperties(prefix = "media")
-public class MediaConfig {
-
-//    //@Value("${media.id:}")
-    private String id;
-
-    //@Value("${media.ip}")
-    private String ip;
-
-    //@Value("${media.hook-ip:${sip.ip}}")
-    private String hookIp;
-
-    //@Value("${sip.ip}")
-    private String sipIp;
-
-    //@Value("${sip.domain}")
-    private String sipDomain;
-
-    //@Value("${media.sdp-ip:${media.ip}}")
-    private String sdpIp;
-
-    //@Value("${media.stream-ip:${media.ip}}")
-    private String streamIp;
-
-    //@Value("${media.http-port}")
-    private Integer httpPort;
-
-    //@Value("${media.http-ssl-port:0}")
-    private Integer httpSSlPort = 0;
-
-    //@Value("${media.rtmp-port:0}")
-    private Integer rtmpPort = 0;
-
-    //@Value("${media.rtmp-ssl-port:0}")
-    private Integer rtmpSSlPort = 0;
-
-    //@Value("${media.rtp-proxy-port:0}")
-    private Integer rtpProxyPort = 0;
-
-    //@Value("${media.rtsp-port:0}")
-    private Integer rtspPort = 0;
-
-    //@Value("${media.rtsp-ssl-port:0}")
-    private Integer rtspSSLPort = 0;
-
-    //@Value("${media.auto-config:true}")
-    private boolean autoConfig = true;
-
-    //@Value("${media.secret}")
-    private String secret;
-
-    //@Value("${media.stream-none-reader-delay-ms:18000}")
-    private int streamNoneReaderDelayMS = 18000;
-
-    //@Value("${media.rtp.enable}")
-    private boolean rtpEnable;
-
-    //@Value("${media.rtp.port-range}")
-    private String rtpPortRange;
-
-
-    //@Value("${media.rtp.send-port-range}")
-    private String sendRtpPortRange;
-
-    //@Value("${media.record-assist-port:0}")
-    private Integer recordAssistPort = 0;
-
-    public String getId() {
-        return id;
-    }
-
-    public String getIp() {
-        return ip;
-    }
-
-    public String getHookIp() {
-        if (StringUtils.isEmpty(hookIp)){
-            return sipIp;
-        }else {
-            return hookIp;
-        }
-
-    }
-
-    public String getSipIp() {
-        if (sipIp == null) {
-            return this.ip;
-        }else {
-            return sipIp;
-        }
-    }
-
-    public int getHttpPort() {
-        return httpPort;
-    }
-
-    public int getHttpSSlPort() {
-        return httpSSlPort;
-    }
-
-    public int getRtmpPort() {
-        return rtmpPort;
-    }
-
-    public int getRtmpSSlPort() {
-        return rtmpSSlPort;
-    }
-
-    public int getRtpProxyPort() {
-        if (rtpProxyPort == null) {
-            return 0;
-        }else {
-            return rtpProxyPort;
-        }
-
-    }
-
-    public int getRtspPort() {
-        return rtspPort;
-    }
-
-    public int getRtspSSLPort() {
-        return rtspSSLPort;
-    }
-
-    public boolean isAutoConfig() {
-        return autoConfig;
-    }
-
-    public String getSecret() {
-        return secret;
-    }
-
-    public int getStreamNoneReaderDelayMS() {
-        return streamNoneReaderDelayMS;
-    }
-
-    public boolean isRtpEnable() {
-        return rtpEnable;
-    }
-
-    public String getRtpPortRange() {
-        return rtpPortRange;
-    }
-
-    public int getRecordAssistPort() {
-        return recordAssistPort;
-    }
-
-    public String getSdpIp() {
-        if (StringUtils.isEmpty(sdpIp)){
-            return ip;
-        }else {
-            return sdpIp;
-        }
-    }
-
-    public String getStreamIp() {
-        if (StringUtils.isEmpty(streamIp)){
-            return ip;
-        }else {
-            return streamIp;
-        }
-    }
-
-    public String getSipDomain() {
-        return sipDomain;
-    }
-
-    public String getSendRtpPortRange() {
-        return sendRtpPortRange;
-    }
-
-    private MediaServerItem mediaServerItem;
-    public MediaServerItem getMediaSerItem(){
-        if(mediaServerItem!=null){
-            return mediaServerItem;
-        }
-        MediaServerItem mediaServerItem = new MediaServerItem();
-        mediaServerItem.setId(id);
-        mediaServerItem.setIp(ip);
-        mediaServerItem.setDefaultServer(true);
-        mediaServerItem.setHookIp(getHookIp());
-        mediaServerItem.setSdpIp(getSdpIp());
-        mediaServerItem.setStreamIp(getStreamIp());
-        mediaServerItem.setHttpPort(httpPort);
-        mediaServerItem.setHttpSSlPort(httpSSlPort);
-        mediaServerItem.setRtmpPort(rtmpPort);
-        mediaServerItem.setRtmpSSlPort(rtmpSSlPort);
-        mediaServerItem.setRtpProxyPort(getRtpProxyPort());
-        mediaServerItem.setRtspPort(rtspPort);
-        mediaServerItem.setRtspSSLPort(rtspSSLPort);
-        mediaServerItem.setAutoConfig(autoConfig);
-        mediaServerItem.setSecret(secret);
-        mediaServerItem.setStreamNoneReaderDelayMS(streamNoneReaderDelayMS);
-        mediaServerItem.setRtpEnable(rtpEnable);
-        mediaServerItem.setRtpPortRange(rtpPortRange);
-        mediaServerItem.setSendRtpPortRange(sendRtpPortRange);
-        mediaServerItem.setRecordAssistPort(recordAssistPort);
-        mediaServerItem.setHookAliveInterval(120);
-
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        mediaServerItem.setCreateTime(format.format(System.currentTimeMillis()));
-        mediaServerItem.setUpdateTime(format.format(System.currentTimeMillis()));
-
-        return mediaServerItem;
-    }
-
-}
+//package org.jetlinks.community.media.config;
+//
+//import lombok.Data;
+//import org.jetlinks.community.media.zlm.entity.MediaServerItem;
+//import org.springframework.boot.context.properties.ConfigurationProperties;
+//import org.springframework.util.StringUtils;
+//
+//import java.text.SimpleDateFormat;
+//
+//@Data
+//@ConfigurationProperties(prefix = "media")
+//public class MediaConfig {
+//
+//    private String id;
+//
+//    private String ip;
+//
+//    private String hookIp;
+//
+//    private String hookPort;
+//
+//    private String sipIp;
+//
+//    private String sipDomain;
+//
+//    private String sdpIp;
+//
+//    private String streamIp;
+//
+//    private Integer httpPort;
+//
+//    private Integer httpSSlPort = 0;
+//
+//    private Integer rtmpPort = 0;
+//
+//    private Integer rtmpSSlPort = 0;
+//
+//    private Integer rtpProxyPort = 0;
+//
+//    private Integer rtspPort = 0;
+//
+//    private Integer rtspSSLPort = 0;
+//
+//    private boolean autoConfig = true;
+//
+//    private String secret;
+//
+//    private int streamNoneReaderDelayMS = 18000;
+//
+//    private boolean rtpEnable;
+//
+//    private String rtpPortRange;
+//
+//    private String sendRtpPortRange;
+//
+//    private Integer recordAssistPort = 0;
+//
+//    public String getId() {
+//        return id;
+//    }
+//
+//    public String getIp() {
+//        return ip;
+//    }
+//
+//    public String getHookIp() {
+//        if (StringUtils.isEmpty(hookIp)){
+//            return sipIp;
+//        }else {
+//            return hookIp;
+//        }
+//
+//    }
+//
+//    public String getSipIp() {
+//        if (sipIp == null) {
+//            return this.ip;
+//        }else {
+//            return sipIp;
+//        }
+//    }
+//
+//    public int getHttpPort() {
+//        return httpPort;
+//    }
+//
+//    public int getHttpSSlPort() {
+//        return httpSSlPort;
+//    }
+//
+//    public int getRtmpPort() {
+//        return rtmpPort;
+//    }
+//
+//    public int getRtmpSSlPort() {
+//        return rtmpSSlPort;
+//    }
+//
+//    public int getRtpProxyPort() {
+//        if (rtpProxyPort == null) {
+//            return 0;
+//        }else {
+//            return rtpProxyPort;
+//        }
+//
+//    }
+//
+//    public int getRtspPort() {
+//        return rtspPort;
+//    }
+//
+//    public int getRtspSSLPort() {
+//        return rtspSSLPort;
+//    }
+//
+//    public boolean isAutoConfig() {
+//        return autoConfig;
+//    }
+//
+//    public String getSecret() {
+//        return secret;
+//    }
+//
+//    public int getStreamNoneReaderDelayMS() {
+//        return streamNoneReaderDelayMS;
+//    }
+//
+//    public boolean isRtpEnable() {
+//        return rtpEnable;
+//    }
+//
+//    public String getRtpPortRange() {
+//        return rtpPortRange;
+//    }
+//
+//    public int getRecordAssistPort() {
+//        return recordAssistPort;
+//    }
+//
+//    public String getSdpIp() {
+//        if (StringUtils.isEmpty(sdpIp)){
+//            return ip;
+//        }else {
+//            return sdpIp;
+//        }
+//    }
+//
+//    public String getStreamIp() {
+//        if (StringUtils.isEmpty(streamIp)){
+//            return ip;
+//        }else {
+//            return streamIp;
+//        }
+//    }
+//
+//    public String getSipDomain() {
+//        return sipDomain;
+//    }
+//
+//    public String getSendRtpPortRange() {
+//        return sendRtpPortRange;
+//    }
+//
+//    private MediaServerItem mediaServerItem;
+//    public MediaServerItem getMediaSerItem(){
+//        if(mediaServerItem!=null){
+//            return mediaServerItem;
+//        }
+//        MediaServerItem mediaServerItem = new MediaServerItem();
+//        mediaServerItem.setId(id);
+//        mediaServerItem.setIp(ip);
+//        mediaServerItem.setDefaultServer(true);
+//        mediaServerItem.setHookIp(getHookIp());
+//        mediaServerItem.setSdpIp(getSdpIp());
+//        mediaServerItem.setStreamIp(getStreamIp());
+//        mediaServerItem.setHttpPort(httpPort);
+//        mediaServerItem.setHttpSSlPort(httpSSlPort);
+//        mediaServerItem.setRtmpPort(rtmpPort);
+//        mediaServerItem.setRtmpSSlPort(rtmpSSlPort);
+//        mediaServerItem.setRtpProxyPort(getRtpProxyPort());
+//        mediaServerItem.setRtspPort(rtspPort);
+//        mediaServerItem.setRtspSSLPort(rtspSSLPort);
+//        mediaServerItem.setAutoConfig(autoConfig);
+//        mediaServerItem.setSecret(secret);
+//        mediaServerItem.setStreamNoneReaderDelayMS(streamNoneReaderDelayMS);
+//        mediaServerItem.setRtpEnable(rtpEnable);
+//        mediaServerItem.setRtpPortRange(rtpPortRange);
+//        mediaServerItem.setSendRtpPortRange(sendRtpPortRange);
+//        mediaServerItem.setRecordAssistPort(recordAssistPort);
+//        mediaServerItem.setHookAliveInterval(120);
+//
+//        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+//        mediaServerItem.setCreateTime(format.format(System.currentTimeMillis()));
+//        mediaServerItem.setUpdateTime(format.format(System.currentTimeMillis()));
+//
+//        return mediaServerItem;
+//    }
+//
+//}

+ 0 - 146
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/ApiStreamController.java

@@ -1,146 +0,0 @@
-//package org.jetlinks.community.media.controller;
-//
-//import com.alibaba.fastjson.JSONObject;
-//import io.swagger.v3.oas.annotations.tags.Tag;
-//import lombok.AllArgsConstructor;
-//import lombok.extern.slf4j.Slf4j;
-//import org.hswebframework.web.authorization.annotation.Authorize;
-//import org.hswebframework.web.authorization.annotation.Resource;
-//import org.jetlinks.community.media.bean.StreamInfo;
-//import org.jetlinks.community.media.entity.DeviceChannel;
-//import org.jetlinks.community.media.entity.MediaDevice;
-//import org.jetlinks.community.media.enums.DeviceState;
-//import org.jetlinks.community.media.gb28181.result.PlayResult;
-//import org.jetlinks.community.media.zlm.entity.MediaServerItem;
-//import org.jetlinks.community.utils.SipUtils;
-//import org.redisson.api.RedissonClient;
-//import org.springframework.web.bind.annotation.GetMapping;
-//import org.springframework.web.bind.annotation.RequestMapping;
-//import org.springframework.web.bind.annotation.RequestParam;
-//import org.springframework.web.bind.annotation.RestController;
-//import org.springframework.web.context.request.async.DeferredResult;
-//import reactor.core.publisher.Mono;
-//
-///**
-// * @author lifang
-// * @version 1.0.0
-// * @ClassName ApiStreamController.java
-// * @Description TODO
-// * @createTime 2022年01月22日 08:26:00
-// */
-//@RestController
-//@RequestMapping("/api/stream")
-//@Slf4j
-//@Authorize
-//@Resource(id="api-stream",name = "媒体流")
-//@AllArgsConstructor
-//@Tag(name = "媒体流")
-//public class ApiStreamController {
-//
-//    private final RedissonClient redissonClient;
-//    /**
-//     * 实时直播 - 开始直播
-//     * @param serial 设备编号
-//     * @param channel 通道序号 默认值: 1
-//     * @param code 通道编号,通过 /api/v1/device/channellist 获取的 ChannelList.ID, 该参数和 channel 二选一传递即可
-//     * @param cdn TODO 转推 CDN 地址, 形如: [rtmp|rtsp]://xxx, encodeURIComponent
-//     * @param audio TODO 是否开启音频, 默认 开启
-//     * @param transport 流传输模式, 默认 UDP
-//     * @param checkchannelstatus TODO 是否检查通道状态, 默认 false, 表示 拉流前不检查通道状态是否在线
-//     * @param transportmode TODO 当 transport=TCP 时有效, 指示流传输主被动模式, 默认被动
-//     * @param timeout TODO 拉流超时(秒),
-//     * @return
-//     */
-//    @GetMapping(value = "/start")
-//    public Mono<?> start(String serial ,
-//                         @RequestParam(required = false)Integer channel ,
-//                         @RequestParam(required = false)String code,
-//                         @RequestParam(required = false)String cdn,
-//                         @RequestParam(required = false)String audio,
-//                         @RequestParam(required = false)String transport,
-//                         @RequestParam(required = false)String checkchannelstatus ,
-//                         @RequestParam(required = false)String transportmode,
-//                         @RequestParam(required = false)String timeout
-//    ){
-//        DeferredResult<JSONObject> resultDeferredResult = new DeferredResult<>(userSetup.getPlayTimeout() + 10);
-//        MediaDevice device  = (MediaDevice) redissonClient.getBucket(SipUtils.keepAliveKey(serial)).get();
-////        MediaDevice device = storager.queryVideoDevice(serial);
-//        if (device == null ) {
-//            JSONObject result = new JSONObject();
-//            result.put("error","device[ " + serial + " ]未找到或已离线");
-//            resultDeferredResult.setResult(result);
-//        }
-//        resultDeferredResult.onTimeout(()->{
-//            log.info("播放等待超时");
-//            JSONObject result = new JSONObject();
-//            result.put("error","timeout");
-//            resultDeferredResult.setResult(result);
-//
-//            // 清理RTP server
-//        });
-//
-//        DeviceChannel deviceChannel = storager.queryChannel(serial, code);
-//        if (deviceChannel == null) {
-//            JSONObject result = new JSONObject();
-//            result.put("error","channel[ " + code + " ]未找到");
-//            resultDeferredResult.setResult(result);
-//        }else if (deviceChannel.getStatus() == 0) {
-//            JSONObject result = new JSONObject();
-//            result.put("error","channel[ " + code + " ]offline");
-//            resultDeferredResult.setResult(result);
-//        }
-//        MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
-//        PlayResult play = playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
-//            StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code);
-//            JSONObject result = new JSONObject();
-//            result.put("StreamID", streamInfo.getStreamId());
-//            result.put("DeviceID", device.getId());
-//            result.put("ChannelID", code);
-//            result.put("ChannelName", deviceChannel.getName());
-//            result.put("ChannelCustomName", "");
-//            result.put("FLV", streamInfo.getFlv());
-//            result.put("WS_FLV", streamInfo.getWs_flv());
-//            result.put("RTMP", streamInfo.getRtmp());
-//            result.put("HLS", streamInfo.getHls());
-//            result.put("RTSP", streamInfo.getRtsp());
-//            result.put("CDN", "");
-//            result.put("SnapURL", "");
-//            result.put("Transport", device.getTransport());
-//            result.put("StartAt", "");
-//            result.put("Duration", "");
-//            result.put("SourceVideoCodecName", "");
-//            result.put("SourceVideoWidth", "");
-//            result.put("SourceVideoHeight", "");
-//            result.put("SourceVideoFrameRate", "");
-//            result.put("SourceAudioCodecName", "");
-//            result.put("SourceAudioSampleRate", "");
-//            result.put("AudioEnable", "");
-//            result.put("Ondemand", "");
-//            result.put("InBytes", "");
-//            result.put("InBitRate", "");
-//            result.put("OutBytes", "");
-//            result.put("NumOutputs", "");
-//            result.put("CascadeSize", "");
-//            result.put("RelaySize", "");
-//            result.put("ChannelPTZType", "0");
-//            resultDeferredResult.setResult(result);
-////            Class<?> aClass = responseEntity.getClass().getSuperclass();
-////            Field body = null;
-////            try {
-////                // 使用反射动态修改返回的body
-////                body = aClass.getDeclaredField("body");
-////                body.setAccessible(true);
-////                body.set(responseEntity, result);
-////            } catch (NoSuchFieldException e) {
-////                e.printStackTrace();
-////            } catch (IllegalAccessException e) {
-////                e.printStackTrace();
-////            }
-//        }, (eventResult) -> {
-//            JSONObject result = new JSONObject();
-//            result.put("error", "channel[ " + code + " ] " + eventResult.msg);
-//            resultDeferredResult.setResult(result);
-//        });
-//        return Mono.just(resultDeferredResult);
-//    }
-//}

+ 0 - 55
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/DeferredController.java

@@ -1,55 +0,0 @@
-package org.jetlinks.community.media.controller;
-
-import org.hswebframework.web.exception.BusinessException;
-import org.jetlinks.community.media.gb28181.result.PlayResult;
-import org.jetlinks.community.media.gb28181.result.WVPResult;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.context.request.async.DeferredResult;
-import reactor.core.publisher.Mono;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName DeferedController.java
- * @Description TODO
- * @createTime 2022年02月19日 10:06:00
- */
-public interface DeferredController {
-    default Mono<Object> deferredResultHandler(PlayResult playResult) {
-        DeferredResult<ResponseEntity<String>> result = playResult.getResult();
-        //最长1分钟
-        long currentTimeMillis = System.currentTimeMillis();
-        while (!result.isSetOrExpired()){
-            if((System.currentTimeMillis()-currentTimeMillis)>10*1000){
-                if (playResult.getTimeout() != null) {
-                    playResult.getTimeout().subscribe();
-                    playResult.setTimeout(null);
-                }
-//                return Mono.error(new TimeoutException("请求超时"));
-            }
-        }
-        Object data = result.getResult();
-        if(data instanceof ResponseEntity){
-            ResponseEntity response= (ResponseEntity) data;
-            Object body = response.getBody();
-            if(body instanceof WVPResult){
-                WVPResult wvpResult= (WVPResult) body;
-                if(wvpResult.getCode()==0){
-                    if(playResult.getComplete()!=null){
-                        return playResult.getComplete().thenReturn(wvpResult.getData()).defaultIfEmpty(new WVPResult<>());
-                    }
-                    return Mono.justOrEmpty(wvpResult.getData());
-                }else {
-                    if(playResult.getTimeout()!=null){
-                        return playResult.getTimeout().then(Mono.error(new BusinessException(wvpResult.getMsg())));
-                    }
-                    return Mono.error(new BusinessException(wvpResult.getMsg()));
-                }
-            }
-            return Mono.error(new BusinessException("返回结果类型不为Response,请联系工作人员查看"));
-        }
-        return Mono.error(new BusinessException("返回结果类型不为WvpResult,请联系工作人员查看"));
-    }
-}

+ 15 - 11
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java

@@ -60,7 +60,7 @@ import java.util.stream.Collectors;
 @Resource(id="media-device",name = "媒体流设备")
 @AllArgsConstructor
 @Tag(name = "媒体视频设备")
-public class MediaDeviceController implements ReactiveServiceCrudController<MediaDevice, String>,DeferredController{
+public class MediaDeviceController implements ReactiveServiceCrudController<MediaDevice, String>{
     private final LocalMediaDeviceService mediaDeviceService;
     private final RedisCacheStorageImpl redisCacheStorage;
     private final SipCommander cmder;
@@ -84,7 +84,7 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
     @QueryAction
     @Operation(summary = "设备点播")
     @PostMapping("/{deviceId}/{channelId}/_start")
-    public Flux<Object> play(@PathVariable String deviceId,
+    public Mono<Object> play(@PathVariable String deviceId,
                              @PathVariable String channelId) {
         return
             //获取设备信息
@@ -95,7 +95,7 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
                 .flatMapMany(mediaServerItem -> {
                     String key = SubscribeKeyGenerate.getSubscribeKey(DeferredResultHolder.CALLBACK_CMD_PLAY,deviceId,channelId);
-                    return messageBroker.handleReply(deviceId,key,Duration.ofSeconds(10))
+                    return messageBroker.handleReply(deviceId,key,Duration.ofSeconds(30))
                         .onErrorResume(DeviceOperationException.class,error->
                             //超时响应处理
                             Mono.defer(()->{
@@ -123,14 +123,14 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
                         .mergeWith(playService.play(mediaServerItem, deviceId, channelId, null, null).thenMany(Flux.empty()))
                         .flatMap(this::convertReply);
                 })
-            ;
+                .singleOrEmpty();
     }
 
 
     @QueryAction
     @Operation(summary = "停止点播")
-    @GetMapping("/{deviceId}/{channelId}/_stop")
-    public Flux<Object> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
+    @PostMapping("/{deviceId}/{channelId}/_stop")
+    public Mono<Object> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
         if(log.isDebugEnabled()){
             log.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
         }
@@ -158,21 +158,23 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
                                 .doOnNext(ignore -> mediaServerItemService.closeRTPServer(device, channelId))
                                 .subscribe();
                         }).then(Mono.empty())))
-            .flatMap(this::convertReply);
+            .flatMap(this::convertReply)
+            .singleOrEmpty();
     }
 
     @PostMapping("/{deviceId}/channels/_sync")
     @CreateAction
     @Operation(summary = "更新通道")
-    public Flux<Object> getDeviceDetailInfo(@PathVariable("deviceId") @Parameter(description = "设备ID") String id) {
+    public Mono<Object> getDeviceDetailInfo(@PathVariable("deviceId") @Parameter(description = "设备ID") String id) {
         if (log.isDebugEnabled()) {
             log.debug("设备通道信息同步API调用,deviceId:" + id);
         }
+        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
         MediaDevice device = redisCacheStorage.getDevice(id);
         if(device==null){
-            return Flux.error(new BusinessException("设备已离线,无法更新通道最新信息"));
+            return Mono.error(new BusinessException("设备已离线,无法更新通道最新信息"));
         }
-        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
+
         return  messageBroker.handleReply(id,key,Duration.ofSeconds(10))
             .onErrorResume(DeviceOperationException.class,error-> {
                 //设备下线
@@ -183,10 +185,12 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
                 MediaMessageReply<String> reply = MediaMessageReply.of(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg), null);
                 reply.setSuccess(false);
                 reply.setMessageId(key);
+                reply.setDeviceId(id);
                 messageBroker.reply(reply).subscribe();
             })
                 .then(Mono.empty()))
-            .flatMap(this::convertReply);
+            .flatMap(this::convertReply)
+            .singleOrEmpty();
     }
 
     private Flux<Object> convertReply(DeviceMessageReply reply){

+ 81 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaServerController.java

@@ -0,0 +1,81 @@
+package org.jetlinks.community.media.controller;
+
+import cn.hutool.json.JSONUtil;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.authorization.annotation.Authorize;
+import org.hswebframework.web.authorization.annotation.QueryAction;
+import org.hswebframework.web.authorization.annotation.Resource;
+import org.hswebframework.web.authorization.annotation.SaveAction;
+import org.jetlinks.community.media.service.LocalMediaServerItemService;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.zlm.ZLMRunner;
+import org.jetlinks.community.media.zlm.entity.MediaServerItem;
+import org.jetlinks.community.utils.RedisUtil;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import java.util.*;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MediaServerController.java
+ * @Description TODO
+ * @createTime 2022年02月28日 10:43:00
+ */
+@RestController
+@RequestMapping("/media/server")
+@Slf4j
+@Authorize(ignore = true)
+@Resource(id="media-server",name = "媒体流服务器")
+@AllArgsConstructor
+@Tag(name = "媒体流服务器")
+public class MediaServerController {
+    private final ZLMRunner zlmRunner;
+    private final RedisCacheStorageImpl cacheStorage;
+    private final RedisUtil redisUtil;
+    private static Map<String, String> providers = new HashMap<>();
+    private LocalMediaServerItemService localMediaServerItemService;
+    static {
+        providers.put("id","zlmedia");
+        providers.put("name","ZLMedia");
+    }
+    @QueryAction
+    @GetMapping("/providers")
+    public Flux<Map> providers(){
+        return Flux.just(providers);
+    }
+
+
+    @QueryAction
+    @GetMapping("/default")
+    public Mono<MediaServerItem> getServer(){
+        return localMediaServerItemService.getDefaultMediaServer();
+    }
+
+    @SaveAction
+    @PatchMapping("/save")
+    public Mono<Void> saveOrUpdate(@RequestBody MediaServerItem mediaServerItem){
+        return localMediaServerItemService.findById(mediaServerItem.getId())
+            .defaultIfEmpty(new MediaServerItem())
+            .filter(old->!old.equals(mediaServerItem))
+            .flatMap(old->
+                //配置发生变化
+                localMediaServerItemService.save(mediaServerItem)
+                    .concatWith(zlmRunner.startAllConnection().then(Mono.empty()))
+                    .then()
+            )
+            .doOnNext(ignore->{
+                //更新缓存信息
+                cacheStorage.setMediaServerItem(mediaServerItem);
+            });
+    }
+    public static void main(String[] args) {
+        String str="{\"name\":\"meit\",\"id\":\"gb28181_MediaServer\",\"provider\":\"zlmedia\",\"httpPort\":80,\"ip\":\"192.168.100.32\",\"hookIp\":\"192.168.104.244\",\"hookPort\":8848,\"hookSslEnabled\":false,\"rtpProxyPort\":10000,\"rtpEnable\":false,\"secret\":\"15645645456\"}";
+        MediaServerItem mediaServerItem = JSONUtil.toBean(str, MediaServerItem.class);
+
+        System.out.println(mediaServerItem);
+    }
+}

+ 16 - 14
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -100,19 +100,21 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
                             sessionManager.register(sipSession);
                         }
                     })
-                    .filter(ignore->mediaDevice.isFirsRegister())
-                    .flatMap(device -> {
-                        //首次注册执行注册设备信息,创建设备会话
-                        DeviceRegisterMessage registerMessage = new DeviceRegisterMessage();
-                        registerMessage.setDeviceId(mediaDevice.getId());
-                        registerMessage.setMessageId(IdUtils.newUUID());
-                        registerMessage.setTimestamp(System.currentTimeMillis());
-                        registerMessage.addHeader("deviceName","GB28181-2016 "+mediaDevice.getId());
-                        registerMessage.addHeader("productId",mediaDevice.getProductId());
-
-                        return eventBus.publish("/device/*/*/register",registerMessage)
-                            .doOnNext(ignore->heartBeat(mediaDevice));
-                    })
+                    .switchIfEmpty(Mono.defer(()->{
+                        if(mediaDevice.isFirsRegister()){
+                            //首次注册执行注册设备信息,创建设备会话
+                            DeviceRegisterMessage registerMessage = new DeviceRegisterMessage();
+                            registerMessage.setDeviceId(mediaDevice.getId());
+                            registerMessage.setMessageId(IdUtils.newUUID());
+                            registerMessage.setTimestamp(System.currentTimeMillis());
+                            registerMessage.addHeader("deviceName","GB28181-2016 "+mediaDevice.getId());
+                            registerMessage.addHeader("productId",mediaDevice.getProductId());
+                            return eventBus.publish("/device/*/*/register",registerMessage)
+                                .doOnNext(ignore->heartBeat(mediaDevice))
+                                .then(Mono.empty());
+                        }
+                        return Mono.empty();
+                    }))
                     .then(Mono.empty())
             )
             .doOnComplete(()->{
@@ -215,7 +217,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             .notIn(MediaDevice::getId,deviceIds)
             .fetch()
             .doOnNext(this::unRegister)
-            .concatWith(sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","192.168.104.244", 7001,"udp","340200000","utf-8","12345678",10L,"1")).then(Mono.empty()))
+            .concatWith(sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","192.168.104.244", 7001,"udp","340200000","GB2312","12345678",10L,"1")).then(Mono.empty()))
             .doOnError(e->{
                 e.printStackTrace();
             })

+ 31 - 35
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java

@@ -191,7 +191,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
     /**
      * zlm 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令
      */
-    private void clearRTPServer(MediaServerItem mediaServerItem) {
+    public void clearRTPServer(MediaServerItem mediaServerItem) {
 //        mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()));
         mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null,"340200000"));
         redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId, mediaServerItem.getServerId(), 0);
@@ -294,7 +294,8 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 
     public Mono<MediaServerItem> getDefaultMediaServer() {
         return this.createQuery()
-            .where(MediaServerItem::isDefaultServer,true)
+            .selectExcludes(MediaServerItem::getSsrcConfig)
+//            .where(MediaServerItem::isDefaultServer,true)
             .fetchOne();
     }
 
@@ -358,20 +359,20 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
                 if (serverItem.getHttpPort() == 0) {
                     serverItem.setHttpPort(zlmServerConfig.getHttpPort());
                 }
-                if (serverItem.getHttpSSlPort() == 0) {
-                    serverItem.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
+                if (serverItem.getHttpSslPort() == 0) {
+                    serverItem.setHttpSslPort(zlmServerConfig.getHttpSSLport());
                 }
                 if (serverItem.getRtmpPort() == 0) {
                     serverItem.setRtmpPort(zlmServerConfig.getRtmpPort());
                 }
-                if (serverItem.getRtmpSSlPort() == 0) {
-                    serverItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
+                if (serverItem.getRtmpSslPort() == 0) {
+                    serverItem.setRtmpSslPort(zlmServerConfig.getRtmpSslPort());
                 }
                 if (serverItem.getRtspPort() == 0) {
                     serverItem.setRtspPort(zlmServerConfig.getRtspPort());
                 }
-                if (serverItem.getRtspSSLPort() == 0) {
-                    serverItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
+                if (serverItem.getRtspSsLPort() == 0) {
+                    serverItem.setRtspSsLPort(zlmServerConfig.getRtspSSlport());
                 }
                 if (serverItem.getRtpProxyPort() == 0) {
                     serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
@@ -433,7 +434,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 //    }
 //
 
-    private void refreshServerId(MediaServerItem serverItem){
+    public void refreshServerId(MediaServerItem serverItem){
         JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem);
         JSONObject data = JSONUtil.parseObj(JSONUtil.parseArray(mediaServerConfig.get("data")).get(0));
         serverItem.setServerId(String.valueOf( data.get("general.mediaServerId")));
@@ -513,7 +514,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm",
             mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
         String protocol = sslEnabled ? "https" : "http";
-        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
+        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), mediaServerItem.getHookPort());
 //        String hookPrex = "http://192.168.104.244:8848/index/hook";
         String recordHookPrex = null;
         if (mediaServerItem.getRecordAssistPort() != 0) {
@@ -525,6 +526,9 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         param.put("ffmpeg.bin","/usr/local/ffmpeg/bin/ffmpeg");
         param.put("ffmpeg.snap","%25s -i %25s -y -f mjpeg -t 0.001 %25s");
         param.put("hook.enable","1");
+        param.put("general.fmp4_demand","1");
+        param.put("general.hls_demand","1");
+        param.put("general.modifyStamp","1");
         param.put("hook.on_flow_report","");
         param.put("hook.on_play",String.format("%s/on_play", hookPrex));
         param.put("hook.on_http_access",String.format("%s/on_http_access", hookPrex));
@@ -540,6 +544,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
         param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
         param.put("hook.timeoutSec","20");
+        param.put("rtmp.modifyStamp","1");
         param.put("general.enable_audio","1");
         param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
 
@@ -701,37 +706,28 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         }
         streamInfoResult.setMediaServerId(mediaInfo.getId());
         streamInfoResult.setRtmp(String.format("rtmp://%s:%s/%s/%s", addr, mediaInfo.getRtmpPort(), app,  stream));
-        if (mediaInfo.getRtmpSSlPort() != 0) {
-            streamInfoResult.setRtmps(String.format("rtmps://%s:%s/%s/%s", addr, mediaInfo.getRtmpSSlPort(), app,  stream));
-        }
-        if(mediaInfo.getRtspPort()!=80){
-            streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", addr, mediaInfo.getRtspPort(), app,  stream));
-        }else {
-            streamInfoResult.setRtsp(String.format("rtsp://%s/%s/%s", addr, app,  stream));
-        }
-
-        if (mediaInfo.getRtspSSLPort() != 0) {
-            streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSSLPort(), app,  stream));
-        }
-        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app,  stream));
-        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app,  stream));
+        streamInfoResult.setRtmps(String.format("rtmps://%s:%s/%s/%s", addr, mediaInfo.getRtmpSslPort(), app,  stream));
+        streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", addr, mediaInfo.getRtspPort(), app,  stream));
+        streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSsLPort(), app,  stream));
+        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
+        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setHls(String.format("http://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setWs_hls(String.format("ws://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setFmp4(String.format("http://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setWs_fmp4(String.format("ws://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setTs(String.format("http://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setWs_ts(String.format("ws://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpPort(), app,  stream));
-        if (mediaInfo.getHttpSSlPort() != 0) {
-            streamInfoResult.setHttps_flv(String.format("https://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setWss_flv(String.format("wss://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setHttps_hls(String.format("https://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setWss_hls(String.format("wss://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setHttps_fmp4(String.format("https://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setWss_fmp4(String.format("wss://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setHttps_ts(String.format("https://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app,  stream));
-            streamInfoResult.setRtc(String.format("https://%s:%s/index/api/webrtc?app=%s&stream=%s&type=play", mediaInfo.getStreamIp(), mediaInfo.getHttpSSlPort(), app,  stream));
+        if (mediaInfo.getHttpSslPort() != 0) {
+            streamInfoResult.setHttps_flv(String.format("https://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setWss_flv(String.format("wss://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setHttps_hls(String.format("https://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setWss_hls(String.format("wss://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setHttps_fmp4(String.format("https://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setWss_fmp4(String.format("wss://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setHttps_ts(String.format("https://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSslPort(), app,  stream));
+            streamInfoResult.setRtc(String.format("https://%s:%s/index/api/webrtc?app=%s&stream=%s&type=play", mediaInfo.getStreamIp(), mediaInfo.getHttpSslPort(), app,  stream));
         }
 
         streamInfoResult.setTracks(tracks);

+ 81 - 78
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -66,7 +66,7 @@ public class LocalPlayService  {
 
     private final ZLMRESTfulUtils zlmresTfulUtils;
 
-    private final UserSetup userSetup;
+    private final LocalMediaDeviceService deviceService;
 
     private final StandaloneDeviceMessageBroker messageBroker;
 
@@ -76,88 +76,91 @@ public class LocalPlayService  {
         if (mediaServerItem == null) {
             return Mono.error(new BusinessException("未找到可用的zlm"));
         }
-        MediaDevice device = redisCatchStorage.getDevice(deviceId);
-        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
-        return streamInfo==null?
-            Mono.defer(()->{
-                SSRCInfo ssrcInfo;
-                String streamId = null;
-                if (mediaServerItem.isRtpEnable()) {
-                    streamId = String.format("%s_%s", device.getId(), channelId);
-                }
-                ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
-                // 发送点播消息
-                return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
-                    log.info("收到订阅消息: " + response.toString());
-                    onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
-                    if (hookEvent != null) {
-                        hookEvent.accept(mediaServerItem, response);
-                    }
-                }, (event) -> {
-                    // 点播返回sip错误
-                    mediaServerItemService.closeRTPServer(device, channelId);
-                    MediaMessageReply messageReply =   MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
-                    messageReply.setSuccess(false);
-                    if (errorEvent != null) {
-                        errorEvent.accept(event);
-                    }
-                    messageBroker.reply(messageReply).subscribe();
-                }).then(Mono.empty());
-            }):
-            Mono.defer(()->{
-                String streamId = streamInfo.getStreamId();
-                if (streamId == null) {
-                    redisCatchStorage.stopPlay(streamInfo);
-                    MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
-                    messageReply.setSuccess(false);
-                    return Mono.empty();
-                }
-                return    Mono.justOrEmpty(redisCatchStorage.getMediaServerItem(streamInfo.getMediaServerId()))
-                    .flatMap(mediaInfo->{
-                        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
-                        if (rtpInfo != null && rtpInfo.getBool("exist")) {
-                            MediaMessageReply messageReply = MediaMessageReply.of(null, streamInfo);
-                            messageReply.setMessageId(key);
-                            messageReply.setSuccess(true);
+       return  deviceService.findById(deviceId)
+           .switchIfEmpty(Mono.error(new BusinessException("设备不存在")))
+            .flatMap(device -> {
+                StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
+                return streamInfo==null?
+                    Mono.defer(()->{
+                        SSRCInfo ssrcInfo;
+                        String streamId = null;
+                        if (mediaServerItem.isRtpEnable()) {
+                            streamId = String.format("%s_%s", device.getId(), channelId);
+                        }
+                        ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
+                        // 发送点播消息
+                        return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
+                            log.info("收到订阅消息: " + response.toString());
+                            onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId).subscribe();
                             if (hookEvent != null) {
-                                try {
-                                    hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
-                                } catch (Exception e) {
-                                    log.error("点播回调函数失败,",e);
-                                }
+                                hookEvent.accept(mediaServerItem, response);
                             }
-                            return  messageBroker.reply(messageReply);
-                        } else {
-                            // TODO 点播前是否重置状态
-                            redisCatchStorage.stopPlay(streamInfo);
-                            deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
-                            SSRCInfo ssrcInfo;
-                            String streamId2 = null;
-                            if (mediaServerItem.isRtpEnable()) {
-                                streamId2 = String.format("%s_%s", device.getId(), channelId);
+                        }, (event) -> {
+                            // 点播返回sip错误
+                            mediaServerItemService.closeRTPServer(device, channelId);
+                            MediaMessageReply messageReply =   MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
+                            messageReply.setSuccess(false);
+                            if (errorEvent != null) {
+                                errorEvent.accept(event);
                             }
-                            ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
-
-                            return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
-                                log.info("收到订阅消息: " + response.toString());
-                                onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId).subscribe();
-                            }, (event) -> {
-                                mediaServerItemService.closeRTPServer(device, channelId);
-                                MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
+                            messageBroker.reply(messageReply).subscribe();
+                        }).then(Mono.empty());
+                    }):
+                    Mono.defer(()->{
+                        String streamId = streamInfo.getStreamId();
+                        if (streamId == null) {
+                            redisCatchStorage.stopPlay(streamInfo);
+                            MediaMessageReply messageReply = MediaMessageReply.of("点播失败, redis缓存streamId等于null",null);
+                            messageReply.setSuccess(false);
+                            return Mono.empty();
+                        }
+                        return    Mono.justOrEmpty(redisCatchStorage.getMediaServerItem(streamInfo.getMediaServerId()))
+                            .flatMap(mediaInfo->{
+                                JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
+                                if (rtpInfo != null && rtpInfo.getBool("exist")) {
+                                    MediaMessageReply messageReply = MediaMessageReply.of(null, streamInfo);
+                                    messageReply.setMessageId(key);
+                                    messageReply.setSuccess(true);
+                                    if (hookEvent != null) {
+                                        try {
+                                            hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
+                                        } catch (Exception e) {
+                                            log.error("点播回调函数失败,",e);
+                                        }
+                                    }
+                                    return  messageBroker.reply(messageReply);
+                                } else {
+                                    // TODO 点播前是否重置状态
+                                    redisCatchStorage.stopPlay(streamInfo);
+                                    deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
+                                    SSRCInfo ssrcInfo;
+                                    String streamId2 = null;
+                                    if (mediaServerItem.isRtpEnable()) {
+                                        streamId2 = String.format("%s_%s", device.getId(), channelId);
+                                    }
+                                    ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
+
+                                    return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
+                                        log.info("收到订阅消息: " + response.toString());
+                                        onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId).subscribe();
+                                    }, (event) -> {
+                                        mediaServerItemService.closeRTPServer(device, channelId);
+                                        MediaMessageReply messageReply = MediaMessageReply.of(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
+                                        messageReply.setMessageId(key);
+                                        messageReply.setSuccess(false);
+                                        messageBroker.reply(messageReply).subscribe();
+                                    })
+                                        .thenReturn(1L);
+                                }
+                            })
+                            .switchIfEmpty(Mono.defer(()->{
+                                MediaMessageReply messageReply = MediaMessageReply.of("媒体服务器暂不可用", null);
                                 messageReply.setMessageId(key);
                                 messageReply.setSuccess(false);
-                                messageBroker.reply(messageReply).subscribe();
-                            })
-                                .thenReturn(1L);
-                        }
-                    })
-                    .switchIfEmpty(Mono.defer(()->{
-                        MediaMessageReply messageReply = MediaMessageReply.of("媒体服务器暂不可用", null);
-                        messageReply.setMessageId(key);
-                        messageReply.setSuccess(false);
-                        return messageBroker.reply(messageReply);
-                    }))
-                    .then(Mono.empty());
+                                return messageBroker.reply(messageReply);
+                            }))
+                            .then(Mono.empty());
+                    });
             });
     }
 

+ 65 - 36
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java

@@ -49,51 +49,80 @@ public class MessageRequestProcessor extends SipRequestProcessorParent {
     private SipSubscribe sipSubscribe;
     @Autowired
     private RedisCacheStorageImpl redisCacheStorage;
+    @Autowired
+    private LocalMediaDeviceService mediaDeviceService;
     @SneakyThrows
     @Override
     public void processRequest(RequestEvent evt) {
         log.debug("接收到消息:" + evt.getRequest());
         String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
         CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
-        MediaDevice device = redisCacheStorage.getDevice(deviceId);
+        Element rootElement = getRootElement(evt, SipContext.getConfig().getCharset());
         // 查询设备是否存在 todo
-        Mono.just(new ParentPlatform())
-            .doOnNext(platform->{
-                try {
-                    if (device == null
-                        && platform == null) {
-                        // 不存在则回复404
-                        responseAck(evt, Response.NOT_FOUND, "device "+ deviceId +" not found");
-                        log.warn("[设备未找到 ]: {}", deviceId);
-                        if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){
-                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new DeviceNotFoundEvent(evt.getDialog()));
-                            sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).accept(eventResult);
-                        };
-                    }else {
-                        Element rootElement = getRootElement(evt, SipContext.getConfig().getCharset());
-                        if (device != null) {
-                            if(deviceProcessor.hasDownstreams()){
-                                deviceFluxSink.next(Tuple.of(evt,device,rootElement));
-                            }
-                        }else if(platform!=null){ // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
-                            if(platProcessor.hasDownstreams()){
-                                platFluxSink.next(Tuple.of(evt,platform,rootElement));
-                            }
-                        }else {
-                            responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
-                        }
+        mediaDeviceService.findById(deviceId)
+            .doOnNext(device->{
+                if(deviceProcessor.hasDownstreams()){
+                    deviceFluxSink.next(Tuple.of(evt,device,rootElement));
+                }
+            })
+            .switchIfEmpty(Mono.just(new ParentPlatform())
+                .doOnNext(platform->{
+                    if(platProcessor.hasDownstreams()){
+                        platFluxSink.next(Tuple.of(evt,platform,rootElement));
                     }
+                })
+                .switchIfEmpty(Mono.fromRunnable(()->{
+                    try {
+                        responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
+                    } catch (SipException e) {
+                        log.warn("SIP 回复错误", e);
+                    } catch (InvalidArgumentException e) {
+                        log.warn("参数无效", e);
+                    } catch (ParseException e) {
+                        log.warn("SIP回复时解析异常", e);
+                    }
+                }))
+                .then(Mono.empty())
+            )
+            .subscribe();
 
-                } catch (SipException e) {
-                    log.warn("SIP 回复错误", e);
-                } catch (InvalidArgumentException e) {
-                    log.warn("参数无效", e);
-                } catch (ParseException e) {
-                    log.warn("SIP回复时解析异常", e);
-                } catch (DocumentException e) {
-                    log.warn("解析XML消息内容异常", e);
-                }
-            }).subscribe();
+//        Mono.just(new ParentPlatform())
+//            .doOnNext(platform->{
+//                try {
+//                    if (device == null
+//                        && platform == null) {
+//                        // 不存在则回复404
+//                        responseAck(evt, Response.NOT_FOUND, "device "+ deviceId +" not found");
+//                        log.warn("[设备未找到 ]: {}", deviceId);
+//                        if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){
+//                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new DeviceNotFoundEvent(evt.getDialog()));
+//                            sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).accept(eventResult);
+//                        };
+//                    }else {
+//                        Element rootElement = getRootElement(evt, SipContext.getConfig().getCharset());
+//                        if (device != null) {
+//                            if(deviceProcessor.hasDownstreams()){
+//                                deviceFluxSink.next(Tuple.of(evt,device,rootElement));
+//                            }
+//                        }else if(platform!=null){ // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
+//                            if(platProcessor.hasDownstreams()){
+//                                platFluxSink.next(Tuple.of(evt,platform,rootElement));
+//                            }
+//                        }else {
+//                            responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
+//                        }
+//                    }
+//
+//                } catch (SipException e) {
+//                    log.warn("SIP 回复错误", e);
+//                } catch (InvalidArgumentException e) {
+//                    log.warn("参数无效", e);
+//                } catch (ParseException e) {
+//                    log.warn("SIP回复时解析异常", e);
+//                } catch (DocumentException e) {
+//                    log.warn("解析XML消息内容异常", e);
+//                }
+//            }).subscribe();
 
         // 查询上级平台是否存在
 //        ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(id);

+ 8 - 11
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java

@@ -11,6 +11,7 @@ import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.sip.request.message.MessageHandlerAbstract;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
@@ -51,7 +52,7 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
     public void handleForDevice(RequestEvent evt, MediaDevice device, Element element) {
         Element rootElement = null;
         try {
-            rootElement = getRootElement(evt, Optional.ofNullable(device.getCharset()).orElse("UTF-8"));
+            rootElement = getRootElement(evt, Optional.ofNullable(SipContext.getConfig().getCharset()).orElse("GB2312"));
             Element deviceListElement = rootElement.element("DeviceList");
             Element sumNumElement = rootElement.element("SumNum");
             if (sumNumElement == null || deviceListElement == null) {
@@ -82,16 +83,12 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
                 reply.setDeviceId(device.getId());
                 //更新设备通道
                 messageBroker.reply(reply)
-                    .flatMap(__->
-                        deviceChannelService.createDelete()
-                            .where(MediaDeviceChannel::getDeviceId,device.getId())
-                            .execute())
-                    .flatMap(__->deviceChannelService.save(channelList))
-                    .flatMap(result->
-                        mediaDeviceService.createUpdate()
-                            .where(MediaDevice::getId,device.getId())
-                            .set(MediaDevice::getChannelCount,channelList.size())
-                            .execute())
+                    .mergeWith(Mono.zip( deviceChannelService.createDelete()
+                        .where(MediaDeviceChannel::getDeviceId,device.getId())
+                        .execute(),deviceChannelService.save(channelList),    mediaDeviceService.createUpdate()
+                        .where(MediaDevice::getId,device.getId())
+                        .set(MediaDevice::getChannelCount,channelList.size())
+                        .execute()).then(Mono.empty()))
                     .subscribe();
             }
             // 回复200 OK

+ 7 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -425,7 +425,8 @@ public class RedisCacheStorageImpl {
 
     public void updateDevice(MediaDevice device) {
         String key = VideoManagerConstants.DEVICE_PREFIX + serverId + "_" + device.getId();
-        redis.set(key, device, Optional.ofNullable(device.getExpires()).orElse(SipContext.getConfig().getTimeout().intValue()));
+//        redis.set(key, device, Optional.ofNullable(SipContext.getConfig().getTimeout()).orElse(120L));
+        redis.set(key, device,120L);
     }
 
 
@@ -440,6 +441,11 @@ public class RedisCacheStorageImpl {
         return (MediaDevice)redis.get(key);
     }
 
+
+    public void setMediaServerItem(MediaServerItem mediaServerItem) {
+        redis.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getServerId(),mediaServerItem);
+    }
+
     public MediaServerItem getMediaServerItem(String mediaServerId) {
         if (mediaServerId == null) {
             return null;

+ 5 - 5
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -93,9 +93,9 @@ public class SipCommander {
 
             StringBuilder content = new StringBuilder(200);
             content.append("v=0\r\n");
-            content.append("o="+ sipConfig.getId()+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
+            content.append("o="+ sipConfig.getId()+" 0 0 IN IP4 "+ mediaServerItem.getIp() +"\r\n");
             content.append("s=Play\r\n");
-            content.append("c=IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
+            content.append("c=IN IP4 "+ mediaServerItem.getIp() +"\r\n");
             content.append("t=0 0\r\n");
             JSONObject subscribeKey = new JSONObject()
                 .putOpt("app", "rtp")
@@ -104,8 +104,8 @@ public class SipCommander {
                 .putOpt("mediaServerId", mediaServerItem.getServerId());
 
             //是否需要扩展sdp
-//            if (userSetup.isSeniorSdp()) {
-            if (false) {
+            if (userSetup.isSeniorSdp()) {
+//            if (false) {
                 if("TCP-PASSIVE".equals(streamMode)) {
                     content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 126 125 99 34 98 97\r\n");
                 }else if ("TCP-ACTIVE".equals(streamMode)) {
@@ -360,7 +360,7 @@ public class SipCommander {
         try {
             StringBuilder cmdXml = new StringBuilder(200);
 //            cmdXml.append("<?xml version=\"1.0\" encoding="+"\"GB2312\""+"?>\r\n");
-            cmdXml.append("<?xml version=\"1.0\" encoding="+ Optional.ofNullable(SipContext.getConfig().getCharset()).orElse("utf-8") +"?>\r\n");
+            cmdXml.append("<?xml version=\"1.0\" encoding="+ Optional.ofNullable(SipContext.getConfig().getCharset()).orElse("GB2312") +"?>\r\n");
             cmdXml.append("<Query>\r\n");
             cmdXml.append("<CmdType>Catalog</CmdType>\r\n");
             cmdXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");

+ 5 - 4
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -308,10 +308,13 @@ public class ZLMHttpHookListener {
         if(regist){
             System.out.println("视频回调开始-------------streamId-------"+streamId+"-----------------------------" + System.currentTimeMillis() + "---------------------------------------");
         }
-
+        StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
         MediaMessageReply<MediaItem> mediaMessageReply = MediaMessageReply.of(null,item);
         mediaMessageReply.setMessageId(ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerId,app,regist,streamId));
         mediaMessageReply.setSuccess(true);
+        if(streamInfo!=null){
+            mediaMessageReply.setDeviceId(streamInfo.getDeviceID());
+        }
         deviceMessageBroker.reply(mediaMessageReply).subscribe();
         if ("rtmp".equals(schema)){
             log.info("on_stream_changed:注册(true)/注销(false)->{}, app->{}, stream->{}", regist, app, streamId);
@@ -327,7 +330,6 @@ public class ZLMHttpHookListener {
             }
             //播放结束
             if ("rtp".equals(app) && !regist ) {
-                StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
                 if (streamInfo!=null){
                     redisCatchStorage.stopPlay(streamInfo);
                     String deviceId = streamInfo.getDeviceID();
@@ -439,7 +441,6 @@ public class ZLMHttpHookListener {
             Mono<Void> result=null;
             if (streamInfoForPlayCatch != null) {
                 // 如果在给上级推流,也不停止。
-
                 if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
                     ret.putOpt("close", false);
                 } else {
@@ -534,7 +535,7 @@ public class ZLMHttpHookListener {
      */
     @ResponseBody
     @PostMapping(value = "/on_server_started", produces = "application/json;charset=UTF-8")
-    public Mono<ResponseEntity<String>> onServerStarted(ServerWebExchange exchange, @RequestBody JSONObject jsonObject) throws Exception {
+    public Mono<ResponseEntity<String>> onServerStarted(ServerWebExchange exchange, @RequestBody JSONObject jsonObject) {
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_server_started API调用,参数:" + jsonObject.toString());
         }

+ 19 - 23
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java

@@ -5,7 +5,7 @@ import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONUtil;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.media.config.MediaConfig;
+//import org.jetlinks.community.media.config.MediaConfig;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.springframework.boot.CommandLineRunner;
@@ -22,7 +22,7 @@ import java.util.*;
 @Slf4j
 @AutoConfigureAfter(LocalMediaServerItemService.class)
 @AllArgsConstructor
-@EnableConfigurationProperties(MediaConfig.class)
+//@EnableConfigurationProperties(MediaConfig.class)
 public class ZLMRunner implements CommandLineRunner {
 
     private Map<String, Boolean> startGetMedia;
@@ -33,7 +33,7 @@ public class ZLMRunner implements CommandLineRunner {
 
     private final LocalMediaServerItemService mediaServerItemService;
 
-    private final MediaConfig mediaConfig;
+//    private final MediaConfig mediaConfig;
 
     @Override
     public void run(String... strings)  {
@@ -41,24 +41,24 @@ public class ZLMRunner implements CommandLineRunner {
         //获取默认服务器配置
         mediaServerItemService.getDefaultMediaServer()
             //添加默认媒体服务器配置
-            .switchIfEmpty(mediaServerItemService.save(mediaConfig.getMediaSerItem()).thenReturn(mediaConfig.getMediaSerItem()))
+//            .switchIfEmpty(mediaServerItemService.save(mediaConfig.getMediaSerItem()).thenReturn(mediaConfig.getMediaSerItem()))
             //清除在线服务器信息
             .mergeWith(mediaServerItemService.clearMediaServerForOnline().then(Mono.empty()))
             .mergeWith(Mono.delay(Duration.ofSeconds(20)).flatMap(ignore-> timeoutHandle()).then(Mono.empty()))
             //更新默认服务器配置
-            .flatMap(defaultMedia->{
-                //判断默认服务器是否发生变动
-                MediaServerItem mediaSerItem = mediaConfig.getMediaSerItem();
-                if(!mediaSerItem.equals(defaultMedia)){
-                    mediaSerItem.setDefaultServer(true);
-                    return mediaServerItemService.deleteById(defaultMedia.getId())
-                        .concatWith(
-                            mediaServerItemService.save(mediaSerItem).
-                                then(Mono.empty()))
-                        .then(Mono.just(mediaSerItem));
-                }
-                return Mono.just(defaultMedia);
-            })
+//            .flatMap(defaultMedia->{
+//                //判断默认服务器是否发生变动
+//                MediaServerItem mediaSerItem = mediaConfig.getMediaSerItem();
+//                if(!mediaSerItem.equals(defaultMedia)){
+//                    mediaSerItem.setDefaultServer(true);
+//                    return mediaServerItemService.deleteById(defaultMedia.getId())
+//                        .concatWith(
+//                            mediaServerItemService.save(mediaSerItem).
+//                                then(Mono.empty()))
+//                        .then(Mono.just(mediaSerItem));
+//                }
+//                return Mono.just(defaultMedia);
+//            })
 
             //订阅 zlm启动事件
             .doOnNext(ignore->subscribeOnServerStarted())
@@ -132,13 +132,10 @@ public class ZLMRunner implements CommandLineRunner {
      * 获取所有zlm 并开启主动连接
      * @return  Mono
      */
-    private Mono<Void> startAllConnection(){
+    public Mono<Void> startAllConnection(){
         return  mediaServerItemService
             .createQuery()
             .fetch()
-//            .defaultIfEmpty(
-//                mediaConfig.getMediaSerItem()
-//            )
             .doOnNext(mediaServerItem ->{
                 //将媒体服务器进行标记
                 startGetMedia=Optional.ofNullable(startGetMedia).orElse(new HashMap<>());
@@ -150,7 +147,6 @@ public class ZLMRunner implements CommandLineRunner {
             .runOn(Schedulers.parallel())
             .flatMap(this::connectZlmServer)
             .then();
-
     }
 
 
@@ -160,7 +156,7 @@ public class ZLMRunner implements CommandLineRunner {
      * @param mediaServerItem
      * @return  Mono
      */
-    private Mono<Void> connectZlmServer(MediaServerItem mediaServerItem){
+    public Mono<Void> connectZlmServer(MediaServerItem mediaServerItem){
         return getMediaServerConfig(mediaServerItem, 1)
             .doOnNext(zlmServerConfig ->{
                 zlmServerConfig.setIp(mediaServerItem.getIp());

+ 70 - 70
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMServerConfig.java

@@ -1,40 +1,40 @@
 package org.jetlinks.community.media.zlm;
 
-import com.alibaba.fastjson.annotation.JSONField;
+import cn.hutool.core.annotation.Alias;
 
 public class ZLMServerConfig {
 
-    @JSONField(name = "api.apiDebug")
+    @Alias( "api.apiDebug")
     private String apiDebug;
 
-    @JSONField(name = "api.secret")
+    @Alias(  "api.secret")
     private String apiSecret;
 
-    @JSONField(name = "ffmpeg.bin")
+    @Alias(  "ffmpeg.bin")
     private String ffmpegBin;
 
-    @JSONField(name = "ffmpeg.cmd")
+    @Alias(  "ffmpeg.cmd")
     private String ffmpegCmd;
 
-    @JSONField(name = "ffmpeg.log")
+    @Alias(  "ffmpeg.log")
     private String ffmpegLog;
 
-    @JSONField(name = "general.enableVhost")
+    @Alias(  "general.enableVhost")
     private String generalEnableVhost;
 
-    @JSONField(name = "general.mediaServerId")
+    @Alias(  "general.mediaServerId")
     private String generalMediaServerId;
 
-    @JSONField(name = "general.flowThreshold")
+    @Alias(  "general.flowThreshold")
     private String generalFlowThreshold;
 
-    @JSONField(name = "general.maxStreamWaitMS")
+    @Alias(  "general.maxStreamWaitMS")
     private String generalMaxStreamWaitMS;
 
-    @JSONField(name = "general.streamNoneReaderDelayMS")
+    @Alias(  "general.streamNoneReaderDelayMS")
     private int generalStreamNoneReaderDelayMS;
 
-    @JSONField(name = "ip")
+    @Alias(  "ip")
     private String ip;
 
     private String sdpIp;
@@ -47,175 +47,175 @@ public class ZLMServerConfig {
 
     private String createTime;
 
-    @JSONField(name = "hls.fileBufSize")
+    @Alias(  "hls.fileBufSize")
     private String hlsFileBufSize;
 
-    @JSONField(name = "hls.filePath")
+    @Alias(  "hls.filePath")
     private String hlsFilePath;
 
-    @JSONField(name = "hls.segDur")
+    @Alias(  "hls.segDur")
     private String hlsSegDur;
 
-    @JSONField(name = "hls.segNum")
+    @Alias(  "hls.segNum")
     private String hlsSegNum;
 
-    @JSONField(name = "hook.access_file_except_hls")
+    @Alias(  "hook.access_file_except_hls")
     private String hookAccessFileExceptHLS;
 
-    @JSONField(name = "hook.admin_params")
+    @Alias(  "hook.admin_params")
     private String hookAdminParams;
 
-    @JSONField(name = "hook.alive_interval")
+    @Alias(  "hook.alive_interval")
     private int hookAliveInterval;
 
-    @JSONField(name = "hook.enable")
+    @Alias(  "hook.enable")
     private String hookEnable;
 
-    @JSONField(name = "hook.on_flow_report")
+    @Alias(  "hook.on_flow_report")
     private String hookOnFlowReport;
 
-    @JSONField(name = "hook.on_http_access")
+    @Alias(  "hook.on_http_access")
     private String hookOnHttpAccess;
 
-    @JSONField(name = "hook.on_play")
+    @Alias(  "hook.on_play")
     private String hookOnPlay;
 
-    @JSONField(name = "hook.on_publish")
+    @Alias(  "hook.on_publish")
     private String hookOnPublish;
 
-    @JSONField(name = "hook.on_record_mp4")
+    @Alias(  "hook.on_record_mp4")
     private String hookOnRecordMp4;
 
-    @JSONField(name = "hook.on_rtsp_auth")
+    @Alias(  "hook.on_rtsp_auth")
     private String hookOnRtspAuth;
 
-    @JSONField(name = "hook.on_rtsp_realm")
+    @Alias(  "hook.on_rtsp_realm")
     private String hookOnRtspRealm;
 
-    @JSONField(name = "hook.on_shell_login")
+    @Alias(  "hook.on_shell_login")
     private String hookOnShellLogin;
 
-    @JSONField(name = "hook.on_stream_changed")
+    @Alias(  "hook.on_stream_changed")
     private String hookOnStreamChanged;
 
-    @JSONField(name = "hook.on_stream_none_reader")
+    @Alias(  "hook.on_stream_none_reader")
     private String hookOnStreamNoneReader;
 
-    @JSONField(name = "hook.on_stream_not_found")
+    @Alias(  "hook.on_stream_not_found")
     private String hookOnStreamNotFound;
 
-    @JSONField(name = "hook.timeoutSec")
+    @Alias(  "hook.timeoutSec")
     private String hookTimeoutSec;
 
-    @JSONField(name = "http.charSet")
+    @Alias(  "http.charSet")
     private String httpCharSet;
 
-    @JSONField(name = "http.keepAliveSecond")
+    @Alias(  "http.keepAliveSecond")
     private String httpKeepAliveSecond;
 
-    @JSONField(name = "http.maxReqCount")
+    @Alias(  "http.maxReqCount")
     private String httpMaxReqCount;
 
-    @JSONField(name = "http.maxReqSize")
+    @Alias(  "http.maxReqSize")
     private String httpMaxReqSize;
 
-    @JSONField(name = "http.notFound")
+    @Alias(  "http.notFound")
     private String httpNotFound;
 
-    @JSONField(name = "http.port")
+    @Alias(  "http.port")
     private int httpPort;
 
-    @JSONField(name = "http.rootPath")
+    @Alias(  "http.rootPath")
     private String httpRootPath;
 
-    @JSONField(name = "http.sendBufSize")
+    @Alias(  "http.sendBufSize")
     private String httpSendBufSize;
 
-    @JSONField(name = "http.sslport")
+    @Alias(  "http.sslport")
     private int httpSSLport;
 
-    @JSONField(name = "multicast.addrMax")
+    @Alias(  "multicast.addrMax")
     private String multicastAddrMax;
 
-    @JSONField(name = "multicast.addrMin")
+    @Alias(  "multicast.addrMin")
     private String multicastAddrMin;
 
-    @JSONField(name = "multicast.udpTTL")
+    @Alias(  "multicast.udpTTL")
     private String multicastUdpTTL;
 
-    @JSONField(name = "record.appName")
+    @Alias(  "record.appName")
     private String recordAppName;
 
-    @JSONField(name = "record.filePath")
+    @Alias(  "record.filePath")
     private String recordFilePath;
 
-    @JSONField(name = "record.fileSecond")
+    @Alias(  "record.fileSecond")
     private String recordFileSecond;
 
-    @JSONField(name = "record.sampleMS")
+    @Alias(  "record.sampleMS")
     private String recordFileSampleMS;
 
-    @JSONField(name = "rtmp.handshakeSecond")
+    @Alias(  "rtmp.handshakeSecond")
     private String rtmpHandshakeSecond;
 
-    @JSONField(name = "rtmp.keepAliveSecond")
+    @Alias(  "rtmp.keepAliveSecond")
     private String rtmpKeepAliveSecond;
 
-    @JSONField(name = "rtmp.modifyStamp")
+    @Alias(  "rtmp.modifyStamp")
     private String rtmpModifyStamp;
 
-    @JSONField(name = "rtmp.port")
+    @Alias(  "rtmp.port")
     private int rtmpPort;
 
-    @JSONField(name = "rtmp.sslport")
+    @Alias(  "rtmp.sslport")
     private int rtmpSslPort;
 
-    @JSONField(name = "rtp.audioMtuSize")
+    @Alias(  "rtp.audioMtuSize")
     private String rtpAudioMtuSize;
 
-    @JSONField(name = "rtp.clearCount")
+    @Alias(  "rtp.clearCount")
     private String rtpClearCount;
 
-    @JSONField(name = "rtp.cycleMS")
+    @Alias(  "rtp.cycleMS")
     private String rtpCycleMS;
 
-    @JSONField(name = "rtp.maxRtpCount")
+    @Alias(  "rtp.maxRtpCount")
     private String rtpMaxRtpCount;
 
-    @JSONField(name = "rtp.videoMtuSize")
+    @Alias(  "rtp.videoMtuSize")
     private String rtpVideoMtuSize;
 
-    @JSONField(name = "rtp_proxy.checkSource")
+    @Alias(  "rtp_proxy.checkSource")
     private String rtpProxyCheckSource;
 
-    @JSONField(name = "rtp_proxy.dumpDir")
+    @Alias(  "rtp_proxy.dumpDir")
     private String rtpProxyDumpDir;
 
-    @JSONField(name = "rtp_proxy.port")
+    @Alias(  "rtp_proxy.port")
     private int rtpProxyPort;
 
-    @JSONField(name = "rtp_proxy.timeoutSec")
+    @Alias(  "rtp_proxy.timeoutSec")
     private String rtpProxyTimeoutSec;
 
-    @JSONField(name = "rtsp.authBasic")
+    @Alias(  "rtsp.authBasic")
     private String rtspAuthBasic;
 
-    @JSONField(name = "rtsp.handshakeSecond")
+    @Alias(  "rtsp.handshakeSecond")
     private String rtspHandshakeSecond;
 
-    @JSONField(name = "rtsp.keepAliveSecond")
+    @Alias(  "rtsp.keepAliveSecond")
     private String rtspKeepAliveSecond;
 
-    @JSONField(name = "rtsp.port")
+    @Alias(  "rtsp.port")
     private int rtspPort;
 
-    @JSONField(name = "rtsp.sslport")
+    @Alias(  "rtsp.sslport")
     private int rtspSSlport;
 
-    @JSONField(name = "shell.maxReqSize")
+    @Alias(  "shell.maxReqSize")
     private String shellMaxReqSize;
 
-    @JSONField(name = "shell.shell")
+    @Alias(  "shell.shell")
     private String shellPhell;
 
 
@@ -600,7 +600,7 @@ public class ZLMServerConfig {
     }
 
     public void setRecordAppName(String recordAppName) {
-        this.recordAppName = recordAppName;
+        this.recordAppName=  recordAppName;
     }
 
     public String getRecordFilePath() {

+ 47 - 16
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/entity/MediaServerItem.java

@@ -20,25 +20,49 @@ import javax.persistence.Table;
 import java.sql.JDBCType;
 import java.util.Objects;
 
+/**
+ * 仅存放唯一一个流媒体信息
+ */
 @Data
-@Table(name = "media_server_item")
+@Table(name = "media_server_item",indexes = {
+//    @Index(name="unique",unique = true,columnList = "default_server")
+})
 public class MediaServerItem extends GenericEntity<String> {
 
     private static final Long serialVersionUID=1L;
+    @Column(name = "name")
+    @Schema(
+        description = "流媒体名称"
+    )
+    private String name;
+
     @Column(name = "server_id")
     @Schema(
         description = "服务器id"
     )
     private String serverId;
 
+
+    /*** 回调函数配置****/
+    @Column(name = "hook_ip")
+    private String hookIp;
+
+    @Column(name = "hook_ssl_enabled")
+    private boolean hookSslEnabled;
+
+    @Column(name = "hook_port")
+    private String hookPort;
+    /*** 回调函数配置****/
+
+
+    /*** 媒体流Ip地址***/
     @Column(name = "ip")
     @Schema(
-        description = "产品id"
+        description = "ip地址"
     )
     private String ip;
+    /*** 媒体流Ip地址***/
 
-    @Column(name = "hook_ip")
-    private String hookIp;
 
     @Column(name = "sdp_ip")
     private String sdpIp;
@@ -46,17 +70,22 @@ public class MediaServerItem extends GenericEntity<String> {
     @Column(name = "stream_ip")
     private String streamIp;
 
+    /*** 媒体流http-flv地址***/
     @Column(name = "http_port")
     private int httpPort;
 
     @Column(name = "http_ssl_port")
-    private int httpSSlPort;
+    private int httpSslPort;
+    /*** 媒体流http-flv地址***/
+
 
+    /*** 媒体流rtmp地址***/
     @Column(name = "rtmp_port")
     private int rtmpPort;
 
     @Column(name = "rtmp_ssl_port")
-    private int rtmpSSlPort;
+    private int rtmpSslPort;
+    /*** 媒体流rtmp地址***/
 
     @Column(name = "rtp_proxy_port")
     private int rtpProxyPort;
@@ -65,14 +94,16 @@ public class MediaServerItem extends GenericEntity<String> {
     private int rtspPort;
 
     @Column(name = "rtsp_ssl_port")
-    private int rtspSSLPort;
+    private int rtspSsLPort;
 
-    @Column(name = "auto_config")
-    private boolean autoConfig;
 
     @Column(name = "secret")
     private String secret;
 
+    @Column(name = "auto_config")
+    private boolean autoConfig;
+
+
     @Column(name = "stream_none_reader_delay_ms")
     private int streamNoneReaderDelayMS;
 
@@ -131,12 +162,12 @@ public class MediaServerItem extends GenericEntity<String> {
         sdpIp = StringUtils.isEmpty(zlmServerConfig.getSdpIp())? zlmServerConfig.getIp(): zlmServerConfig.getSdpIp();
         streamIp = StringUtils.isEmpty(zlmServerConfig.getStreamIp())? zlmServerConfig.getIp(): zlmServerConfig.getStreamIp();
         httpPort = zlmServerConfig.getHttpPort();
-        httpSSlPort = zlmServerConfig.getHttpSSLport();
+        httpSslPort = zlmServerConfig.getHttpSSLport();
         rtmpPort = zlmServerConfig.getRtmpPort();
-        rtmpSSlPort = zlmServerConfig.getRtmpSslPort();
+        rtmpSslPort = zlmServerConfig.getRtmpSslPort();
         rtpProxyPort = zlmServerConfig.getRtpProxyPort();
         rtspPort = zlmServerConfig.getRtspPort();
-        rtspSSLPort = zlmServerConfig.getRtspSSlport();
+        rtspSsLPort = zlmServerConfig.getRtspSSlport();
         autoConfig = true; // 默认值true;
         secret = zlmServerConfig.getApiSecret();
         streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS();
@@ -154,12 +185,12 @@ public class MediaServerItem extends GenericEntity<String> {
         if (o == null || getClass() != o.getClass()) return false;
         MediaServerItem that = (MediaServerItem) o;
         return getHttpPort() == that.getHttpPort() &&
-            getHttpSSlPort() == that.getHttpSSlPort() &&
+            getHttpSslPort() == that.getHttpSslPort() &&
             getRtmpPort() == that.getRtmpPort() &&
-            getRtmpSSlPort() == that.getRtmpSSlPort() &&
+            getRtmpSslPort() == that.getRtmpSslPort() &&
             getRtpProxyPort() == that.getRtpProxyPort() &&
             getRtspPort() == that.getRtspPort() &&
-            getRtspSSLPort() == that.getRtspSSLPort() &&
+            getRtspSsLPort() == that.getRtspSsLPort() &&
             isAutoConfig() == that.isAutoConfig() &&
             getRecordAssistPort() == that.getRecordAssistPort() &&
             Objects.equals(getIp(), that.getIp()) &&
@@ -173,6 +204,6 @@ public class MediaServerItem extends GenericEntity<String> {
 
     @Override
     public int hashCode() {
-        return Objects.hash(getIp(), getHookIp(), getSdpIp(), getStreamIp(), getHttpPort(), getHttpSSlPort(), getRtmpPort(), getRtmpSSlPort(), getRtpProxyPort(), getRtspPort(), getRtspSSLPort(), isAutoConfig(), getSecret(), getRtpPortRange(), getSendRtpPortRange(), getRecordAssistPort());
+        return Objects.hash(getIp(), getHookIp(), getSdpIp(), getStreamIp(), getHttpPort(), getHttpSslPort(), getRtmpPort(), getRtmpSslPort(), getRtpProxyPort(), getRtspPort(), getRtspSsLPort(), isAutoConfig(), getSecret(), getRtpPortRange(), getSendRtpPortRange(), getRecordAssistPort());
     }
 }

+ 1 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/NetworkConfigController.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.network.manager.web;
 
+import cn.hutool.json.JSONUtil;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.media.Schema;