18339543638 3 anos atrás
pai
commit
87e92204f9

+ 15 - 11
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java

@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -80,7 +81,7 @@ public class PlayController {
     @Operation(summary = "设备点播")
     @GetMapping("/start/{deviceId}/{channelId}")
     public Mono<Object> play(@PathVariable String deviceId,
-                                @PathVariable String channelId) {
+                             @PathVariable String channelId) {
 
         return
             //获取设备信息
@@ -88,16 +89,8 @@ public class PlayController {
                 //获取设备相连的媒体流服务器信息
                 .flatMap(playService::getNewMediaServerItem)
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
-                .flatMap(mediaServerItem ->{
-                        try {
-                            return playService.play(mediaServerItem, deviceId, channelId, null, null);
-                        } catch (InterruptedException e) {
-                            log.error("设备点播线程中断,请稍后再试,",e);
-                            return Mono.error(new BusinessException("服务器繁忙,请稍后再试"));
-                        }
-                    }
-                )
-                .flatMap(wvpResult -> Mono.justOrEmpty(wvpResult.getData()));
+                .flatMap(mediaServerItem ->playService.play(mediaServerItem, deviceId, channelId, null, null))
+                .flatMap(playResult -> deferredResultHandler(playResult.getResult()));
     }
 
 //
@@ -351,5 +344,16 @@ public class PlayController {
 //		return result;
 //	}
 //
+
+    private Mono<Object> deferredResultHandler(DeferredResult result) {
+        //最长1分钟
+        long currentTimeMillis = System.currentTimeMillis();
+        while (!result.isSetOrExpired()){
+            if((System.currentTimeMillis()-currentTimeMillis)>3*1000){
+                return Mono.error(new TimeoutException("请求超时"));
+            }
+        }
+        return  Mono.justOrEmpty(result.getResult());
+    }
 }
 

+ 5 - 8
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java

@@ -7,6 +7,7 @@ import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.contanst.VideoManagerConstants;
@@ -111,16 +112,12 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
                     //对ssrc进行更新 todo
                     SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null,"340200000");
                     mediaServerItem.setSsrcConfig(ssrcConfig);
-//                    redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX +serverId + "_" + mediaServerItem.getServerId(), mediaServerItem);
                     return this.updateById(mediaServerItem.getId(),mediaServerItem).thenReturn(mediaServerItem);
                 }
                 return Mono.just(mediaServerItem);
             })
             .doOnNext(mediaServerItem->{
-//                String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getId();
-//                if(!redisUtil.hasKey(key)){
                 redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getServerId(),mediaServerItem);
-//                }
             })
             .subscribe();
     }
@@ -484,7 +481,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 
         if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
             log.info("获取负载最低的节点时无在线节点");
-            return null;
+            return Mono.error(new BusinessException("获取负载最低的节点时无在线节点"));
         }
 
         // 获取分数最低的,及并发最低的
@@ -505,7 +502,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
             mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
         String protocol = sslEnabled ? "https" : "http";
 //        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
-        String hookPrex = String.format("%s://%s/index/hook", protocol, mediaServerItem.getHookIp());
+        String hookPrex = "http://192.168.104.244:8848/index/hook";
         String recordHookPrex = null;
         if (mediaServerItem.getRecordAssistPort() != 0) {
             recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort());
@@ -514,11 +511,11 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
         param.put("ffmpeg.cmd","%s -fflags nobuffer -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264  -f flv %s");
         param.put("ffmpeg.bin","/usr/local/ffmpeg/bin/ffmpeg");
-//        param.put("ffmpeg.snap","%25s -rtsp_transport tcp -i %25s -y -f mjpeg -t 0.001 %25s");
+        param.put("ffmpeg.snap","%25s -rtsp_transport tcp -i %25s -y -f mjpeg -t 0.1 %25s");
         param.put("hook.enable","1");
         param.put("hook.on_flow_report","");
         param.put("hook.on_play",String.format("%s/on_play", hookPrex));
-        param.put("hook.on_http_access","");
+        param.put("hook.on_http_access",String.format("%s/on_http_access", hookPrex));
         param.put("hook.on_publish", String.format("%s/on_publish", hookPrex));
         param.put("hook.on_record_mp4",recordHookPrex != null? String.format("%s/on_record_mp4", recordHookPrex): "");
         param.put("hook.on_record_ts","");

+ 190 - 183
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -4,13 +4,18 @@ import cn.hutool.core.lang.UUID;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSON;
 import gov.nist.javax.sip.stack.SIPDialog;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.bean.StreamInfo;
+import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.entity.MediaDevice;
+import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.session.VideoStreamSessionManager;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
@@ -24,6 +29,7 @@ import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.core.cluster.ClusterEventBus;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
@@ -59,179 +65,217 @@ public class LocalPlayService  {
 
     private final ClusterEventBus clusterEventBus;
 
-    private final VideoStreamSessionManager streamSession;
+    private final DeferredResultHolder resultHolder;
+
+    private final VideoStreamSessionManager streamSessionManager;
 
     private final LocalMediaDeviceChannelService deviceChannelService;
 
     private final ZLMRESTfulUtils zlmresTfulUtils;
 
-    public Mono<WVPResult> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
-                                ZLMHttpHookSubscribe.Event hookEvent, ZLMHttpHookSubscribe.Event errorEvent) throws InterruptedException {
+    private final UserSetup userSetup;
+    public Mono<PlayResult> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
+                                 ZLMHttpHookSubscribe.Event hookEvent,  SipSubscribe.Event errorEvent) {
         //todo
-        mediaServerItem.setRtpEnable(false);
-        mediaServerItem.setRtpProxyPort(10000);
         PlayResult playResult = new PlayResult();
-        LinkedBlockingDeque<WVPResult> result = new LinkedBlockingDeque<>();
-        String msgId=UUID.randomUUID().toString();
-
-        RequestMessage msg = RequestMessage.of(deviceId,channelId,msgId,null);
+        RequestMessage msg = new RequestMessage();
+        String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
+        msg.setKey(key);
+        String uuid = java.util.UUID.randomUUID().toString();
+        msg.setId(uuid);
+        playResult.setUuid(uuid);
+        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetup.getPlayTimeout());
+        playResult.setResult(result);
+        if (mediaServerItem == null) {
+            WVPResult wvpResult = new WVPResult();
+            wvpResult.setCode(-1);
+            wvpResult.setMsg("未找到可用的zlm");
+            msg.setData(wvpResult);
+            resultHolder.invokeResult(msg);
+            return Mono.just(playResult);
+        }
 
+        MediaDevice device = redisCatchStorage.getDevice(deviceId);
+        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
+        playResult.setDevice(device);
+        // 超时处理
+        result.onTimeout(()->{
+            log.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
+            WVPResult wvpResult = new WVPResult();
+            wvpResult.setCode(-1);
+            SIPDialog dialog = streamSessionManager.getDialog(deviceId, channelId);
+            if (dialog != null) {
+                wvpResult.setMsg("收流超时,请稍候重试");
+            }else {
+                wvpResult.setMsg("点播超时,请稍候重试");
+            }
+            msg.setData(wvpResult);
+            // 点播超时回复BYE
+            cmder.streamByeCmd(device.getId(), channelId)
+                .doOnNext(ignore->
+                    // 释放rtpserver
+                    mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId))
+                .doOnNext(ignore->
+                    // 回复之前所有的点播请求
+                    resultHolder.invokeAllResult(msg))
+                .subscribe();
+        });
+        result.onCompletion(()->{
+            // 点播结束时调用截图接口
+            String fileName =  deviceId + "_" + channelId + ".jpg";
+            ResponseEntity responseEntity =  (ResponseEntity)result.getResult();
+            if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
+                WVPResult wvpResult = (WVPResult)responseEntity.getBody();
+                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
+                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
+                    mediaServerItemService.findById(streamInfoForSuccess.getMediaServerId())
+                        .doOnNext(mediaInfo->{
+                            String classPath = null;
+                            try {
+                                classPath = ResourceUtils.getURL("classpath:").getPath();
+                                // 兼容打包为jar的class路径
+                                if(classPath.contains("jar")) {
+                                    classPath = classPath.substring(0, classPath.lastIndexOf("."));
+                                    classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
+                                }
+                                if (classPath.startsWith("file:")) {
+                                    classPath = classPath.substring(classPath.indexOf(":") + 1);
+                                }
+                                String path = classPath + "static/static/snap/";
+                                // 兼容Windows系统路径(去除前面的“/”)
+                                if(System.getProperty("os.name").contains("indows")) {
+                                    path = path.substring(1);
+                                }
+                                String streamUrl = streamInfoForSuccess.getFlv();
+                                // 请求截图
+                                log.info("[请求截图]: " + fileName);
+                                zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
+                            } catch (FileNotFoundException e) {
+                                log.error("存放截图文件路径不存在,",e);
+                            }
+                        })
+                        .subscribe();
+                }
+            }
+        });
+        if (streamInfo == null) {
+            SSRCInfo ssrcInfo;
+            String streamId = null;
+            if (mediaServerItem.isRtpEnable()) {
+                streamId = String.format("%s_%s", device.getId(), channelId);
+            }
 
-        Disposable subscribe = eventBus.subscribe(
-            Subscription.of("media_play", DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg), Subscription.Feature.local))
-            .mergeWith(Mono.justOrEmpty(redisCatchStorage.queryPlayByDevice(deviceId, channelId))
-                .flatMap(streamInfo -> {
-                    if (StrUtil.isEmpty(streamInfo.getStreamId())) {
-                        return Mono.error(new BusinessException("点播失败, redis缓存streamId等于null"));
-                    }
-                    String mediaServerId = streamInfo.getMediaServerId();
-                    MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(mediaServerId);
+            ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId);
 
-                    JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamInfo.getStreamId());
+            // 发送点播消息
+            return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse,JSONObject response) -> {
+                log.info("收到订阅消息: " + response.toString());
+                onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid).subscribe();
+                if (hookEvent != null) {
+                    hookEvent.accept(mediaServerItem, response);
+                }
+            }, (event) -> {
+                WVPResult wvpResult = new WVPResult();
+                wvpResult.setCode(-1);
+                // 点播返回sip错误
+                mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
+                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
+                msg.setData(wvpResult);
+                resultHolder.invokeAllResult(msg);
+                if (errorEvent != null) {
+                    errorEvent.accept(event);
+                }
+            })
+                .thenReturn(playResult);
+        } else {
+            String streamId = streamInfo.getStreamId();
+            if (streamId == null) {
+                WVPResult wvpResult = new WVPResult();
+                wvpResult.setCode(-1);
+                wvpResult.setMsg("点播失败, redis缓存streamId等于null");
+                msg.setData(wvpResult);
+                resultHolder.invokeAllResult(msg);
+                return Mono.just(playResult);
+            }
+            return mediaServerItemService.findById(streamInfo.getMediaServerId())
+                .flatMap(mediaInfo->{
+                    JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
                     if (rtpInfo != null && rtpInfo.getBool("exist")) {
+                        WVPResult wvpResult = new WVPResult();
+                        wvpResult.setCode(0);
+                        wvpResult.setMsg("success");
+                        wvpResult.setData(streamInfo);
+                        msg.setData(wvpResult);
+                        resultHolder.invokeAllResult(msg);
                         if (hookEvent != null) {
-                            //todo
-//                        hookEvent.response(mediaServerItem, com.alibaba.fastjson.JSONObject.parseObject(JSON.toJSONString(streamInfo)));
+                            try {
+                                hookEvent.accept(mediaServerItem, JSONUtil.parseObj(JSON.toJSONString(streamInfo)));
+                            } catch (Exception e) {
+                                log.error("点播回调函数失败,",e);
+                            }
                         }
-                        clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                            streamInfo);
                     } else {
                         // TODO 点播前是否重置状态
                         redisCatchStorage.stopPlay(streamInfo);
-                        return deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId())
-                            .flatMap(ignore -> {
-                                SSRCInfo ssrcInfo;
-                                String streamId2 = null;
-                                if (mediaServerItem.isRtpEnable()) {
-                                    streamId2 = String.format("%s_%s", deviceId, channelId);
-                                }
-                                ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
-
-                                return cmder.playStreamCmd(mediaServerItem, ssrcInfo, redisCatchStorage.getDevice(deviceId), channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
-                                    log.info("收到订阅消息: " + response.toString());
-                                    onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, msgId)
-                                        .subscribe();
-                                }, (event) -> {
-                                    mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                                    WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
-                                    clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                        wvpResult);
-                                });
-                            });
-                    }
-                    return Mono.empty();
-                })
-                .switchIfEmpty(Mono.just(mediaServerItem)
-                    .flatMap(serverItem -> {
+                        deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                         SSRCInfo ssrcInfo;
-                        String streamId = null;
-                        if (serverItem.isRtpEnable()) {
-                            streamId = String.format("%s_%s", deviceId, channelId);
+                        String streamId2 = null;
+                        if (mediaServerItem.isRtpEnable()) {
+                            streamId2 = String.format("%s_%s", device.getId(), channelId);
                         }
+                        ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
 
-                        ssrcInfo = mediaServerItemService.openRTPServer(serverItem, streamId);
-
-                        // 发送点播消息
-                        MediaDevice device = redisCatchStorage.getDevice(deviceId);
-                        return cmder.playStreamCmd(serverItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
+                        return cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                             log.info("收到订阅消息: " + response.toString());
-                            onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, msgId).subscribe();
-                            if (hookEvent != null) {
-//                                    hookEvent.response(mediaServerItem, response);
-                            }
+                            onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid).subscribe();
                         }, (event) -> {
-                            WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
-                            // 点播返回sip错误
                             mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                            clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                wvpResult);
-                            if (errorEvent != null) {
-                                //todo
-//                    errorEvent.response(event);
-                            }
+                            WVPResult wvpResult = new WVPResult();
+                            wvpResult.setCode(-1);
+                            wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
+                            msg.setData(wvpResult);
+                            resultHolder.invokeAllResult(msg);
                         });
-                    })).then(Mono.empty()))
-            .map(topicPayload -> topicPayload.bodyToJson(false).toJavaObject(WVPResult.class))
-            //返回结果
-            .doOnNext(wvpResult -> {
-                try {
-                    result.put(wvpResult);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            })
-            .subscribe();
-
-        return  Mono.justOrEmpty(result.pollFirst(60,TimeUnit.SECONDS))
-            .flatMap(wvpResult->{
-                //取消订阅
-                subscribe.dispose();
-
-                if(wvpResult==null){
-                    return Mono.error(new BusinessException("服务器内部出现问题"));
-                }
-                if(wvpResult.getCode()!=HttpStatus.OK.value()){
-                    return Mono.error(new BusinessException(Optional.ofNullable(wvpResult.getMsg()).orElse("点播失败")));
-                }
-                return Mono.just(wvpResult);
-            })
-            // 点播结束时调用截图接口
-            .doOnNext(wvpResult -> {
-                try {
-                    String classPath = ResourceUtils.getURL("classpath:").getPath();
-                    // System.out.println(classPath);
-                    // 兼容打包为jar的class路径
-                    if (classPath.contains("jar")) {
-                        classPath = classPath.substring(0, classPath.lastIndexOf("."));
-                        classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
-                    }
-                    if (classPath.startsWith("file:")) {
-                        classPath = classPath.substring(classPath.indexOf(":") + 1);
-                    }
-                    String path = classPath + "static/static/snap/";
-                    // 兼容Windows系统路径(去除前面的“/”)
-                    if (System.getProperty("os.name").contains("indows")) {
-                        path = path.substring(1);
                     }
-                    String fileName =  deviceId + "_" + channelId + ".jpg";
-                    if (Objects.requireNonNull(wvpResult).getCode() == HttpStatus.OK.value()) {
-                        StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
-                        MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(streamInfoForSuccess.getMediaServerId());
-                        String streamUrl = streamInfoForSuccess.getFmp4();
-                        // 请求截图
-                        log.info("[请求截图]: " + fileName);
-                        zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
-                    }
-
-                } catch (FileNotFoundException e) {
-                    throw new BusinessException("设备上传视频截图文件找不到");
-                }
-            })
-            .switchIfEmpty(Mono.error(new BusinessException("点播超时")))
-            //保证接下来的操作流仅被触发一次
-            .doOnError(BusinessException.class, e -> cmder.streamByeCmd(deviceId, channelId).subscribe());
+                    return Mono.empty();
+                })
+                .thenReturn(playResult);
+        }
     }
 
 
-    private Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String msgId) {
-        RequestMessage msg = RequestMessage.of(deviceId, channelId, msgId, null);
-        return onPublishHandler(mediaServerItem, resonse, deviceId, channelId, msgId)
-            .switchIfEmpty(Mono.fromRunnable(()->{
-                log.warn("设备预览API调用失败!");
-                clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                    WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(),"设备预览API调用失败",null));
-            }))
+    private Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
+        RequestMessage msg = new RequestMessage();
+        msg.setId(uuid);
+        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
+        return onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid)
             .flatMap(streamInfo ->
-                deviceChannelService.startPlay(deviceId, channelId, streamInfo.getStreamId())
-                    .mergeWith(Mono.fromRunnable(()->{
+                deviceChannelService.createQuery()
+                    .where(MediaDeviceChannel::getChannelId,channelId)
+                    .where(MediaDeviceChannel::getDeviceId,deviceId)
+                    .fetchOne()
+                    .defaultIfEmpty(new MediaDeviceChannel())
+                    .flatMap(deviceChannel->{
                         redisCatchStorage.startPlay(streamInfo);
-                        WVPResult<StreamInfo> wvpResult = WVPResult.of(HttpStatus.OK.value(), "success", streamInfo);
-                        clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                            wvpResult);
-                    }))
-                    .then()
-            );
-
+                        msg.setData(JSON.toJSONString(streamInfo));
+                        WVPResult wvpResult = new WVPResult();
+                        wvpResult.setCode(0);
+                        wvpResult.setMsg("success");
+                        wvpResult.setData(streamInfo);
+                        msg.setData(wvpResult);
+                        resultHolder.invokeAllResult(msg);
+                        if (StrUtil.isNotEmpty(deviceChannel.getId())) {
+                            deviceChannel.setStreamId(streamInfo.getStreamId());
+                            return deviceChannelService.startPlay(deviceId, channelId, streamInfo.getStreamId());
+                        }
+                        return Mono.empty();
+                    })
+            )
+            .switchIfEmpty(Mono.fromRunnable(()->{
+                log.warn("设备预览API调用失败!");
+                msg.setData("设备预览API调用失败!");
+                resultHolder.invokeAllResult(msg);
+            }));
     }
 
 
@@ -250,45 +294,8 @@ public class LocalPlayService  {
             .switchIfEmpty(mediaServerItemService.getMediaServerForMinimumLoad())
             .switchIfEmpty(Mono.fromRunnable(()-> log.warn("点播时未找到可使用的ZLM...")));
     }
-    ////
-////
-////    @Override
-////    public void onPublishHandlerForPlayBack(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
-////        RequestMessage msg = new RequestMessage();
-////        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId);
-////        msg.setId(uuid);
-////        StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid);
-////        if (streamInfo != null) {
-////            redisCatchStorage.startPlayback(streamInfo);
-////            msg.setData(JSON.toJSONString(streamInfo));
-////            resultHolder.invokeResult(msg);
-////        } else {
-////            log.warn("设备回放API调用失败!");
-////            msg.setData("设备回放API调用失败!");
-////            resultHolder.invokeResult(msg);
-////        }
-////    }
-////
-////
-////    @Override
-////    public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
-////        RequestMessage msg = new RequestMessage();
-////        msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
-////        msg.setId(uuid);
-////        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid);
-////        if (streamInfo != null) {
-////            redisCatchStorage.startDownload(streamInfo);
-////            msg.setData(JSON.toJSONString(streamInfo));
-////            resultHolder.invokeResult(msg);
-////        } else {
-////            log.warn("设备预览API调用失败!");
-////            msg.setData("设备预览API调用失败!");
-////            resultHolder.invokeResult(msg);
-////        }
-////    }
-////
-////
-    public Mono<StreamInfo> onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
+
+    private Mono<StreamInfo> onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
         String streamId = resonse.getStr("stream");
         JSONArray tracks = resonse.getJSONArray("tracks");
         return  mediaServerItemService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks)

+ 12 - 22
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java

@@ -35,9 +35,8 @@ import java.util.*;
 @AllArgsConstructor
 public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 
-    private final LocalMediaDeviceService mediaDeviceService;
     private final LocalMediaDeviceChannelService deviceChannelService;
-    private final ClusterEventBus clusterEventBus;
+    private final DeferredResultHolder deferredResultHolder;
     @Override
     public String getMethod() {
         return "Catalog";
@@ -79,35 +78,26 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
                     channelList.add(deviceChannel);
                 }
                 //更新设备通道
-                Mono.fromRunnable(()->clusterEventBus.publish(topic,RequestMessage.of(device.getId(),null,null, WVPResult.of(HttpStatus.OK.value(),"", device))))
+                Mono.fromRunnable(()->{
+                    String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getId();
+                    RequestMessage msg = new RequestMessage();
+                    msg.setKey(key);
+                    WVPResult<Object> result = new WVPResult<>();
+                    result.setCode(0);
+                    result.setData(device);
+                    result.setMsg("更新成功,共" + sumNum + "条");
+                    msg.setData(result);
+                    deferredResultHolder.invokeAllResult(msg);
+                })
                     .flatMap(__->
                         deviceChannelService.createDelete()
                             .where(MediaDeviceChannel::getDeviceId,device.getId())
                             .execute())
                     .flatMap(__->deviceChannelService.save(channelList))
                     .subscribe();
-//                catalogDataCatch.put(key, sumNum, device, channelList);
-//                if (catalogDataCatch.get(key).size() == sumNum) {
-                // 数据已经完整接收
-//                    boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
-//                    RequestMessage msg = new RequestMessage();
-//                    msg.setKey(key);
-//                    if (resetChannelsResult) {
-//                        result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
-//                    }else {
-//                        result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
-//                    }
-//                    msg.setData(result);
-//                    deferredResultHolder.invokeAllResult(msg);
-//                    catalogDataCatch.del(key);
             }
-
             // 回复200 OK
             responseAck(evt, Response.OK);
-//                if (offLineDetector.isOnline(device.getDeviceId())) {
-//                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
-//                }
-//            }
         } catch (Exception e) {
             log.error("接受媒体设备目录信息失败,",e);
         }

+ 83 - 83
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/DeferredResultHolder.java

@@ -53,96 +53,96 @@ public class DeferredResultHolder {
 
     public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
 
-    private static final String uniqueTopic="%s/%s/%s/%s";
-    private static final String matchTopics="%s/%s/%s/**";
+//    private static final String uniqueTopic="%s/%s/%s/%s";
+//    private static final String matchTopics="%s/%s/%s/**";
+//    /**
+//     * 通过设备id、通道id、msgid获取唯一监听主题
+//     * @param key
+//     * @param requestMessage
+//     * @return
+//     */
+//    public static String getTopicByMsgId(String key,RequestMessage requestMessage){
+//        return String.format(uniqueTopic,key,requestMessage.getDeviceId(),requestMessage.getChannelId(),requestMessage.getMsgId());
+//    }
+//
+//    /**
+//     * 根据设备id和通道id匹配所有监听主题
+//     * @param key
+//     * @param requestMessage
+//     * @return
+//     */
+//    public static String getCmdCallBackTopic(String key, RequestMessage requestMessage){
+//        return String.format(matchTopics,key,requestMessage.getDeviceId(),requestMessage.getChannelId());
+//    }
+    private Map<String, Map<String, EmitterProcessor<ResponseEntity>>> map = new ConcurrentHashMap<>();
+
+
     /**
-     * 通过设备id、通道id、msgid获取唯一监听主题
-     * @param key
-     * @param requestMessage
-     * @return
+     *
+     * @param key 数据类型
+     * @param id 数据信息id
+     * @param processor 数据处理流
      */
-    public static String getTopicByMsgId(String key,RequestMessage requestMessage){
-        return String.format(uniqueTopic,key,requestMessage.getDeviceId(),requestMessage.getChannelId(),requestMessage.getMsgId());
+    public void put(String key, String id, EmitterProcessor<ResponseEntity> processor) {
+        map.computeIfAbsent(key,k->new ConcurrentHashMap<>())
+            .put(id,processor);
+    }
+
+    public Flux<ResponseEntity> get(String key, String id) {
+        Map<String, EmitterProcessor<ResponseEntity>> processorMap = map.getOrDefault(key, new ConcurrentHashMap<>());
+        if (!processorMap.containsKey(id)) {
+            return Flux.empty();
+        }
+        return processorMap.get(id).map(Function.identity());
+    }
+
+    public boolean exist(String key, String id){
+        return map.containsKey(key)&&map.get(key).containsKey(id);
     }
 
     /**
-     * 根据设备id和通道id匹配所有监听主题
-     * @param key
-     * @param requestMessage
-     * @return
+     * 释放单个请求
+     * @param msg
      */
-    public static String getCmdCallBackTopic(String key, RequestMessage requestMessage){
-        return String.format(matchTopics,key,requestMessage.getDeviceId(),requestMessage.getChannelId());
+    public void invokeResult(RequestMessage msg) {
+        Map<String, EmitterProcessor<ResponseEntity>> deferredResultMap = map.get(msg.getKey());
+        if (deferredResultMap == null) {
+            return;
+        }
+        EmitterProcessor<ResponseEntity> result = deferredResultMap.get(msg.getId());
+        if (result == null) {
+            return;
+        }
+        if(!result.isDisposed()&&result.hasDownstreams()){
+            result.sink().next(ResponseEntity.ok(msg.getData()));
+        }
+
+        deferredResultMap.remove(msg.getId());
+        if (deferredResultMap.size() == 0) {
+            map.remove(msg.getKey());
+        }
     }
-//    private Map<String, Map<String, EmitterProcessor<ResponseEntity>>> map = new ConcurrentHashMap<>();
 
+    /**
+     * 释放所有的请求
+     * @param msg
+     */
+    public void invokeAllResult(RequestMessage msg) {
+        Map<String,  EmitterProcessor<ResponseEntity>> deferredResultMap = map.get(msg.getKey());
+        if (deferredResultMap == null) {
+            return;
+        }
+        Set<String> ids = deferredResultMap.keySet();
+        for (String id : ids) {
+            EmitterProcessor<ResponseEntity> result = deferredResultMap.get(id);
+            if (result == null) {
+                return;
+            }
+            if(!result.isDisposed()&&result.hasDownstreams()){
+                result.sink().next(ResponseEntity.ok(msg.getData()));
+            }
+        }
+        map.remove(msg.getKey());
 
-//    /**
-//     *
-//     * @param key 数据类型
-//     * @param id 数据信息id
-//     * @param processor 数据处理流
-//     */
-//    public void put(String key, String id, EmitterProcessor<ResponseEntity> processor) {
-//        map.computeIfAbsent(key,k->new ConcurrentHashMap<>())
-//            .put(id,processor);
-//    }
-//
-//    public Flux<ResponseEntity> get(String key, String id) {
-//        Map<String, EmitterProcessor<ResponseEntity>> processorMap = map.getOrDefault(key, new ConcurrentHashMap<>());
-//        if (!processorMap.containsKey(id)) {
-//            return Flux.empty();
-//        }
-//        return processorMap.get(id).map(Function.identity());
-//    }
-//
-//    public boolean exist(String key, String id){
-//        return map.containsKey(key)&&map.get(key).containsKey(id);
-//    }
-//
-//    /**
-//     * 释放单个请求
-//     * @param msg
-//     */
-//    public void invokeResult(RequestMessage msg) {
-//        Map<String, EmitterProcessor<ResponseEntity>> deferredResultMap = map.get(msg.getKey());
-//        if (deferredResultMap == null) {
-//            return;
-//        }
-//        EmitterProcessor<ResponseEntity> result = deferredResultMap.get(msg.getId());
-//        if (result == null) {
-//            return;
-//        }
-//        if(!result.isDisposed()&&result.hasDownstreams()){
-//            result.sink().next(ResponseEntity.ok(msg.getData()));
-//        }
-//
-//        deferredResultMap.remove(msg.getId());
-//        if (deferredResultMap.size() == 0) {
-//            map.remove(msg.getKey());
-//        }
-//    }
-//
-//    /**
-//     * 释放所有的请求
-//     * @param msg
-//     */
-//    public void invokeAllResult(RequestMessage msg) {
-//        Map<String,  EmitterProcessor<ResponseEntity>> deferredResultMap = map.get(msg.getKey());
-//        if (deferredResultMap == null) {
-//            return;
-//        }
-//        Set<String> ids = deferredResultMap.keySet();
-//        for (String id : ids) {
-//            EmitterProcessor<ResponseEntity> result = deferredResultMap.get(id);
-//            if (result == null) {
-//                return;
-//            }
-//            if(!result.isDisposed()&&result.hasDownstreams()){
-//                result.sink().next(ResponseEntity.ok(msg.getData()));
-//            }
-//        }
-//        map.remove(msg.getKey());
-//
-//    }
+    }
 }

+ 5 - 13
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/RequestMessage.java

@@ -14,21 +14,13 @@ import java.io.Serializable;
  * @date:   2020年5月8日 下午1:09:18
  */
 @Data
-@AllArgsConstructor
+@AllArgsConstructor(staticName = "of")
 @NoArgsConstructor
 public class RequestMessage implements Serializable {
 
-	private String deviceId;
-	private String channelId;
-    private String msgId;
-	private Object data;
+    private String id;
 
-    /**
-     * 是否根据设备id和通道id完成该次请求
-     * false:根据设备id+通道id+msgId完成该次请求
-     */
-	private Boolean invoke;
-    public static  RequestMessage of(String deviceId, String channelId, String msgId, Object data) {
-        return new RequestMessage(deviceId,channelId,msgId,data,null);
-    }
+    private String key;
+
+    private Object data;
 }

+ 62 - 60
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -18,6 +18,7 @@ import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.dto.StreamPushItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
+import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.utils.ZlmSubscribeTopic;
 import org.jetlinks.core.cluster.ClusterEventBus;
 import org.jetlinks.core.event.EventBus;
@@ -88,51 +89,52 @@ public class ZLMHttpHookListener {
 //		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 //	}
 //
-//	/**
-//	 * 访问http文件服务器上hls之外的文件时触发。
-//	 *
-//	 */
-//	@ResponseBody
-//	@PostMapping(value = "/on_http_access", produces = "application/json;charset=UTF-8")
-//	public ResponseEntity<String> onHttpAccess(@RequestBody JSONObject json){
-//
-//		if (logger.isDebugEnabled()) {
-//			logger.debug("[ ZLM HOOK ]on_http_access API 调用,参数:" + json.toString());
-//		}
-//		String mediaServerId = json.getString("mediaServerId");
-//		JSONObject ret = new JSONObject();
-//		ret.put("code", 0);
-//		ret.put("err", "");
-//		ret.put("path", "");
-//		ret.put("second", 600);
-//		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//	}
-//
-//	/**
-//	 * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。
-//	 *
-//	 */
-//	@ResponseBody
-//	@PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
-//	public ResponseEntity<String> onPlay(@RequestBody JSONObject json){
-//
-//		if (logger.isDebugEnabled()) {
-//			logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + json.toString());
-//		}
-//		String mediaServerId = json.getString("mediaServerId");
-//		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
-//		if (subscribe != null ) {
-//			MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
+	/**
+	 * 访问http文件服务器上hls之外的文件时触发。
+	 *
+	 */
+	@ResponseBody
+	@PostMapping(value = "/on_http_access", produces = "application/json;charset=UTF-8")
+	public ResponseEntity<String> onHttpAccess(@RequestBody JSONObject json){
+
+		if (log.isDebugEnabled()) {
+            log.debug("[ ZLM HOOK ]on_http_access API 调用,参数:" + json.toString());
+		}
+		String mediaServerId = json.getStr("mediaServerId");
+		JSONObject ret = new JSONObject();
+		ret.put("code", 0);
+		ret.put("err", "");
+		ret.put("path", "");
+		ret.put("second", 600);
+		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
+	}
+
+	/**
+	 * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。
+	 *
+	 */
+	@ResponseBody
+	@PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
+	public ResponseEntity<String> onPlay(@RequestBody JSONObject json){
+
+		if (log.isDebugEnabled()) {
+            log.debug("[ ZLM HOOK ]on_play API调用,参数:" + json.toString());
+		}
+		String mediaServerId = json.getStr("mediaServerId");
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
+		if (subscribe != null ) {
+			MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(mediaServerId);
+			//todo
 //			if (mediaInfo != null) {
 //				subscribe.response(mediaInfo, json);
 //			}
-//
-//		}
-//		JSONObject ret = new JSONObject();
-//		ret.put("code", 0);
-//		ret.put("msg", "success");
-//		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//	}
+
+		}
+		JSONObject ret = new JSONObject();
+		ret.put("code", 0);
+		ret.put("msg", "success");
+		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
+	}
 
     /**
      * rtsp/rtmp/rtp推流鉴权事件。
@@ -190,24 +192,24 @@ public class ZLMHttpHookListener {
 //		ret.put("msg", "success");
 //		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 //	}
-//
-//	/**
-//	 * rtsp专用的鉴权事件,先触发on_rtsp_realm事件然后才会触发on_rtsp_auth事件。
-//	 *
-//	 */
-//	@ResponseBody
-//	@PostMapping(value = "/on_rtsp_realm", produces = "application/json;charset=UTF-8")
-//	public ResponseEntity<String> onRtspRealm(@RequestBody JSONObject json){
-//
-//		if (logger.isDebugEnabled()) {
-//			logger.debug("[ ZLM HOOK ]on_rtsp_realm API调用,参数:" + json.toString());
-//		}
-//		String mediaServerId = json.getString("mediaServerId");
-//		JSONObject ret = new JSONObject();
-//		ret.put("code", 0);
-//		ret.put("realm", "");
-//		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//	}
+
+	/**
+	 * rtsp专用的鉴权事件,先触发on_rtsp_realm事件然后才会触发on_rtsp_auth事件。
+	 *
+	 */
+	@ResponseBody
+	@PostMapping(value = "/on_rtsp_realm", produces = "application/json;charset=UTF-8")
+	public ResponseEntity<String> onRtspRealm(@RequestBody JSONObject json){
+
+		if (log.isDebugEnabled()) {
+            log.debug("[ ZLM HOOK ]on_rtsp_realm API调用,参数:" + json.toString());
+		}
+		String mediaServerId = json.getStr("mediaServerId");
+		JSONObject ret = new JSONObject();
+		ret.put("code", 0);
+		ret.put("realm", "");
+		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
+	}
 //
 //
 //	/**

+ 10 - 4
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java

@@ -38,9 +38,6 @@ public class ZLMRunner implements CommandLineRunner {
 
     private final ZLMHttpHookSubscribe hookSubscribe;
 
-//    @Autowired
-//    private EventPublisher publisher;
-
     private final LocalMediaServerItemService mediaServerItemService;
 
     private final MediaConfig mediaConfig;
@@ -58,7 +55,16 @@ public class ZLMRunner implements CommandLineRunner {
             .mergeWith(mediaServerItemService.clearMediaServerForOnline().then(Mono.empty()))
             .mergeWith(Mono.delay(Duration.ofSeconds(20)).flatMap(ignore->timeOutHandle()).then(Mono.empty()))
             //更新默认服务器配置
-            .flatMap(mediaServerItemService::save)
+            .flatMap(defaultMedia->{
+                //判断默认服务器是否发生变动
+                MediaServerItem mediaSerItem = mediaConfig.getMediaSerItem();
+                if(mediaSerItem.equals(defaultMedia)){
+                    mediaSerItem.setDefaultServer(true);
+                    return mediaServerItemService.deleteById(defaultMedia.getId())
+                        .concatWith(mediaServerItemService.save(mediaSerItem).then(Mono.empty()));
+                }
+                return Mono.just(defaultMedia);
+            })
 
             //订阅 zlm启动事件
             .doOnNext(ignore->subscribeOnServerStarted())

+ 31 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/entity/MediaServerItem.java

@@ -18,12 +18,13 @@ import javax.persistence.Column;
 import javax.persistence.Index;
 import javax.persistence.Table;
 import java.sql.JDBCType;
+import java.util.Objects;
 
 @Data
 @Table(name = "media_server_item")
 public class MediaServerItem extends GenericEntity<String> {
 
-    private static Long serialVersionUID=1L;
+    private static final Long serialVersionUID=1L;
     @Column(name = "server_id")
     @Schema(
         description = "服务器id"
@@ -145,4 +146,33 @@ public class MediaServerItem extends GenericEntity<String> {
         sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
         recordAssistPort = 0; // 默认关闭
     }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        MediaServerItem that = (MediaServerItem) o;
+        return getHttpPort() == that.getHttpPort() &&
+            getHttpSSlPort() == that.getHttpSSlPort() &&
+            getRtmpPort() == that.getRtmpPort() &&
+            getRtmpSSlPort() == that.getRtmpSSlPort() &&
+            getRtpProxyPort() == that.getRtpProxyPort() &&
+            getRtspPort() == that.getRtspPort() &&
+            getRtspSSLPort() == that.getRtspSSLPort() &&
+            isAutoConfig() == that.isAutoConfig() &&
+            getRecordAssistPort() == that.getRecordAssistPort() &&
+            Objects.equals(getIp(), that.getIp()) &&
+            Objects.equals(getHookIp(), that.getHookIp()) &&
+            Objects.equals(getSdpIp(), that.getSdpIp()) &&
+            Objects.equals(getStreamIp(), that.getStreamIp()) &&
+            Objects.equals(getSecret(), that.getSecret()) &&
+            Objects.equals(getRtpPortRange(), that.getRtpPortRange()) &&
+            Objects.equals(getSendRtpPortRange(), that.getSendRtpPortRange());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(getIp(), getHookIp(), getSdpIp(), getStreamIp(), getHttpPort(), getHttpSSlPort(), getRtmpPort(), getRtmpSSlPort(), getRtpProxyPort(), getRtspPort(), getRtspSSLPort(), isAutoConfig(), getSecret(), getRtpPortRange(), getSendRtpPortRange(), getRecordAssistPort());
+    }
 }

+ 1 - 1
jetlinks-standalone/src/main/resources/application.yml

@@ -233,7 +233,7 @@ visual:
 #zlm 默认服务器配置
 media:
   # [必须修改] zlm服务器的内网IP
-  ip: 1.15.89.83
+  ip: 192.168.100.32
   # [必须修改] zlm服务器的http.port
   http-port: 8080
   # [可选] zlm服务器的hook.admin_params=secret