18339543638 3 éve
szülő
commit
39096e6374

+ 5 - 0
jetlinks-manager/media-manager/pom.xml

@@ -20,6 +20,11 @@
             <version>1.3.0-91</version>
         </dependency>
 
+        <!--<dependency>-->
+            <!--<groupId>javax.servlet</groupId>-->
+            <!--<artifactId>servlet-api</artifactId>-->
+        <!--</dependency>-->
+
         <dependency>
             <groupId>com.squareup.okhttp3</groupId>
             <artifactId>okhttp</artifactId>

+ 28 - 8
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java

@@ -10,6 +10,7 @@ import org.hswebframework.web.authorization.annotation.Authorize;
 import org.hswebframework.web.authorization.annotation.QueryAction;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.media.gb28181.result.PlayResult;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalPlayService;
@@ -18,6 +19,7 @@ import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.event.TopicPayload;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 import reactor.core.publisher.EmitterProcessor;
@@ -27,10 +29,7 @@ import reactor.core.publisher.Mono;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -90,7 +89,7 @@ public class PlayController {
                 .flatMap(playService::getNewMediaServerItem)
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
                 .flatMap(mediaServerItem ->playService.play(mediaServerItem, deviceId, channelId, null, null))
-                .flatMap(playResult -> deferredResultHandler(playResult.getResult()));
+                .flatMap(this::deferredResultHandler);
     }
 
 //
@@ -345,15 +344,36 @@ public class PlayController {
 //	}
 //
 
-    private Mono<Object> deferredResultHandler(DeferredResult result) {
+    private Mono<Object> deferredResultHandler(PlayResult playResult) {
+        DeferredResult<ResponseEntity<String>> result = playResult.getResult();
         //最长1分钟
         long currentTimeMillis = System.currentTimeMillis();
         while (!result.isSetOrExpired()){
-            if((System.currentTimeMillis()-currentTimeMillis)>3*1000){
+            if((System.currentTimeMillis()-currentTimeMillis)>60*1000){
                 return Mono.error(new TimeoutException("请求超时"));
             }
         }
-        return  Mono.justOrEmpty(result.getResult());
+        Object data = result.getResult();
+        if(data instanceof ResponseEntity){
+            ResponseEntity response= (ResponseEntity) data;
+            Object body = response.getBody();
+            if(body instanceof WVPResult){
+                WVPResult wvpResult= (WVPResult) body;
+                if(wvpResult.getCode()==0){
+                    if(playResult.getComplete()!=null){
+                        return playResult.getComplete().thenReturn(wvpResult.getData()).defaultIfEmpty(new WVPResult<>());
+                    }
+                    return Mono.justOrEmpty(wvpResult.getData());
+                }else {
+                    if(playResult.getTimeout()!=null){
+                        return playResult.getTimeout().then(Mono.error(new BusinessException(wvpResult.getMsg())));
+                    }
+                    return Mono.error(new BusinessException(wvpResult.getMsg()));
+                }
+            }
+            return Mono.error(new BusinessException("返回结果类型不为Response,请联系工作人员查看"));
+        }
+        return Mono.error(new BusinessException("返回结果类型不为WvpResult,请联系工作人员查看"));
     }
 }
 

+ 12 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/gb28181/result/PlayResult.java

@@ -4,6 +4,7 @@ import lombok.Data;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.context.request.async.DeferredResult;
+import reactor.core.publisher.Mono;
 
 @Data
 public class PlayResult {
@@ -12,4 +13,15 @@ public class PlayResult {
     private String uuid;
 
     private MediaDevice device;
+
+    private Mono<Void> complete;
+    private Mono<Void> timeout;
+
+    public void onComplete( Mono<Void> complete){
+        this.complete=complete;
+    }
+
+    public void onTimeout( Mono<Void> timeout){
+        this.timeout=timeout;
+    }
 }

+ 3 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java

@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.enums.DeviceState;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -26,6 +27,7 @@ public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<M
     public Mono<Void> startPlay(String deviceId, String channelId, String streamId) {
         return this.createUpdate()
             .set(MediaDeviceChannel::getStreamId,streamId)
+            .set(MediaDeviceChannel::getStatus, DeviceState.online)
             .where(MediaDeviceChannel::getDeviceId,deviceId)
             .where(MediaDeviceChannel::getChannelId,channelId)
             .execute()
@@ -35,6 +37,7 @@ public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<M
     public Mono<Void> stopPlay(String deviceId, String channelId) {
         return this.createUpdate()
             .set(MediaDeviceChannel::getStreamId,null)
+            .set(MediaDeviceChannel::getStatus, DeviceState.offline)
             .where(MediaDeviceChannel::getDeviceId,deviceId)
             .where(MediaDeviceChannel::getChannelId,channelId)
             .execute()

+ 16 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.media.service;
 
 
 import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSON;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
@@ -374,6 +375,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
                 }
                 serverItem.setStatus(true);
             })
+            .doOnNext(this::refreshServerId)
             //更新zlm服务器信息
             .doOnNext(serverItem->{
                 if (StrUtil.isEmpty(serverItem.getServerId())) {
@@ -428,6 +430,13 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 //    }
 //
 
+    private void refreshServerId(MediaServerItem serverItem){
+        JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem);
+        JSONObject data = JSONUtil.parseObj(JSONUtil.parseArray(mediaServerConfig.get("data")).get(0));
+        serverItem.setServerId(String.valueOf( data.get("general.mediaServerId")));
+    }
+
+
     private void resetOnlineServerItem(MediaServerItem serverItem) {
         // 更新缓存
         String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId;
@@ -511,7 +520,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
         param.put("ffmpeg.cmd","%s -fflags nobuffer -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264  -f flv %s");
         param.put("ffmpeg.bin","/usr/local/ffmpeg/bin/ffmpeg");
-        param.put("ffmpeg.snap","%25s -rtsp_transport tcp -i %25s -y -f mjpeg -t 0.1 %25s");
+        param.put("ffmpeg.snap","%25s -i %25s -y -f mjpeg -t 0.001 %25s");
         param.put("hook.enable","1");
         param.put("hook.on_flow_report","");
         param.put("hook.on_play",String.format("%s/on_play", hookPrex));
@@ -691,7 +700,12 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         if (mediaInfo.getRtmpSSlPort() != 0) {
             streamInfoResult.setRtmps(String.format("rtmps://%s:%s/%s/%s", addr, mediaInfo.getRtmpSSlPort(), app,  stream));
         }
-        streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", addr, mediaInfo.getRtspPort(), app,  stream));
+        if(mediaInfo.getRtspPort()!=80){
+            streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", addr, mediaInfo.getRtspPort(), app,  stream));
+        }else {
+            streamInfoResult.setRtsp(String.format("rtsp://%s/%s/%s", addr, app,  stream));
+        }
+
         if (mediaInfo.getRtspSSLPort() != 0) {
             streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSSLPort(), app,  stream));
         }

+ 19 - 21
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -61,10 +61,6 @@ public class LocalPlayService  {
 
     private final LocalMediaServerItemService mediaServerItemService;
 
-    private final EventBus eventBus;
-
-    private final ClusterEventBus clusterEventBus;
-
     private final DeferredResultHolder resultHolder;
 
     private final VideoStreamSessionManager streamSessionManager;
@@ -81,11 +77,12 @@ public class LocalPlayService  {
         RequestMessage msg = new RequestMessage();
         String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
         msg.setKey(key);
-        String uuid = java.util.UUID.randomUUID().toString();
+        String uuid = UUID.randomUUID().toString();
         msg.setId(uuid);
         playResult.setUuid(uuid);
         DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetup.getPlayTimeout());
         playResult.setResult(result);
+        resultHolder.put(key, uuid, result);
         if (mediaServerItem == null) {
             WVPResult wvpResult = new WVPResult();
             wvpResult.setCode(-1);
@@ -99,7 +96,7 @@ public class LocalPlayService  {
         StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
         playResult.setDevice(device);
         // 超时处理
-        result.onTimeout(()->{
+        playResult.onTimeout(Mono.fromRunnable(()->{
             log.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
             WVPResult wvpResult = new WVPResult();
             wvpResult.setCode(-1);
@@ -119,8 +116,8 @@ public class LocalPlayService  {
                     // 回复之前所有的点播请求
                     resultHolder.invokeAllResult(msg))
                 .subscribe();
-        });
-        result.onCompletion(()->{
+        }));
+        playResult.onComplete(Mono.fromRunnable(()->{
             // 点播结束时调用截图接口
             String fileName =  deviceId + "_" + channelId + ".jpg";
             ResponseEntity responseEntity =  (ResponseEntity)result.getResult();
@@ -146,7 +143,7 @@ public class LocalPlayService  {
                                 if(System.getProperty("os.name").contains("indows")) {
                                     path = path.substring(1);
                                 }
-                                String streamUrl = streamInfoForSuccess.getFlv();
+                                String streamUrl = streamInfoForSuccess.getRtsp();
                                 // 请求截图
                                 log.info("[请求截图]: " + fileName);
                                 zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
@@ -157,7 +154,7 @@ public class LocalPlayService  {
                         .subscribe();
                 }
             }
-        });
+        }));
         if (streamInfo == null) {
             SSRCInfo ssrcInfo;
             String streamId = null;
@@ -243,33 +240,34 @@ public class LocalPlayService  {
         }
     }
 
-
     private Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
         RequestMessage msg = new RequestMessage();
         msg.setId(uuid);
         msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
         return onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid)
+            .doOnNext(streamInfo -> {
+                redisCatchStorage.startPlay(streamInfo);
+                msg.setData(JSON.toJSONString(streamInfo));
+                WVPResult wvpResult = new WVPResult();
+                wvpResult.setCode(0);
+                wvpResult.setMsg("success");
+                wvpResult.setData(streamInfo);
+                msg.setData(wvpResult);
+                resultHolder.invokeAllResult(msg);
+            })
             .flatMap(streamInfo ->
                 deviceChannelService.createQuery()
                     .where(MediaDeviceChannel::getChannelId,channelId)
                     .where(MediaDeviceChannel::getDeviceId,deviceId)
-                    .fetchOne()
-                    .defaultIfEmpty(new MediaDeviceChannel())
+                    .fetch()
                     .flatMap(deviceChannel->{
-                        redisCatchStorage.startPlay(streamInfo);
-                        msg.setData(JSON.toJSONString(streamInfo));
-                        WVPResult wvpResult = new WVPResult();
-                        wvpResult.setCode(0);
-                        wvpResult.setMsg("success");
-                        wvpResult.setData(streamInfo);
-                        msg.setData(wvpResult);
-                        resultHolder.invokeAllResult(msg);
                         if (StrUtil.isNotEmpty(deviceChannel.getId())) {
                             deviceChannel.setStreamId(streamInfo.getStreamId());
                             return deviceChannelService.startPlay(deviceId, channelId, streamInfo.getStreamId());
                         }
                         return Mono.empty();
                     })
+                    .then()
             )
             .switchIfEmpty(Mono.fromRunnable(()->{
                 log.warn("设备预览API调用失败!");

+ 8 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -468,6 +468,14 @@ public class RedisCacheStorageImpl {
         return (MediaDevice)redis.get(key);
     }
 
+    public MediaServerItem getMediaServerItem(String mediaServerId) {
+        if (mediaServerId == null) {
+            return null;
+        }
+        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX +serverId + "_" + mediaServerId;
+        return (MediaServerItem)redis.get(key);
+    }
+
     public List<MediaDevice> getAllOnlineDevice() {
         List<Object> scan = redis.scan(VideoManagerConstants.DEVICE_PREFIX + "*");
         List<MediaDevice> mediaDevices = new ArrayList<>();

+ 34 - 55
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/DeferredResultHolder.java

@@ -1,16 +1,15 @@
 package org.jetlinks.community.media.transmit.callback;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 import org.springframework.web.context.request.async.DeferredResult;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.Flux;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
 
 /**
  * @description: 异步请求处理
@@ -53,51 +52,36 @@ public class DeferredResultHolder {
 
     public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
 
-//    private static final String uniqueTopic="%s/%s/%s/%s";
-//    private static final String matchTopics="%s/%s/%s/**";
-//    /**
-//     * 通过设备id、通道id、msgid获取唯一监听主题
-//     * @param key
-//     * @param requestMessage
-//     * @return
-//     */
-//    public static String getTopicByMsgId(String key,RequestMessage requestMessage){
-//        return String.format(uniqueTopic,key,requestMessage.getDeviceId(),requestMessage.getChannelId(),requestMessage.getMsgId());
-//    }
-//
-//    /**
-//     * 根据设备id和通道id匹配所有监听主题
-//     * @param key
-//     * @param requestMessage
-//     * @return
-//     */
-//    public static String getCmdCallBackTopic(String key, RequestMessage requestMessage){
-//        return String.format(matchTopics,key,requestMessage.getDeviceId(),requestMessage.getChannelId());
-//    }
-    private Map<String, Map<String, EmitterProcessor<ResponseEntity>>> map = new ConcurrentHashMap<>();
+    private Map<String, Map<String, DeferredResult>> map = new ConcurrentHashMap<>();
 
 
-    /**
-     *
-     * @param key 数据类型
-     * @param id 数据信息id
-     * @param processor 数据处理流
-     */
-    public void put(String key, String id, EmitterProcessor<ResponseEntity> processor) {
-        map.computeIfAbsent(key,k->new ConcurrentHashMap<>())
-            .put(id,processor);
+    public void put(String key, String id, DeferredResult result) {
+        Map<String, DeferredResult> deferredResultMap = map.get(key);
+        if (deferredResultMap == null) {
+            deferredResultMap = new ConcurrentHashMap<>();
+            map.put(key, deferredResultMap);
+        }
+        deferredResultMap.put(id, result);
     }
 
-    public Flux<ResponseEntity> get(String key, String id) {
-        Map<String, EmitterProcessor<ResponseEntity>> processorMap = map.getOrDefault(key, new ConcurrentHashMap<>());
-        if (!processorMap.containsKey(id)) {
-            return Flux.empty();
+    public DeferredResult get(String key, String id) {
+        Map<String, DeferredResult> deferredResultMap = map.get(key);
+        if (deferredResultMap == null) {
+            return null;
         }
-        return processorMap.get(id).map(Function.identity());
+        return deferredResultMap.get(id);
     }
 
     public boolean exist(String key, String id){
-        return map.containsKey(key)&&map.get(key).containsKey(id);
+        if (key == null) {
+            return false;
+        }
+        Map<String, DeferredResult> deferredResultMap = map.get(key);
+        if (id == null) {
+            return deferredResultMap != null;
+        }else {
+            return deferredResultMap != null && deferredResultMap.get(id) != null;
+        }
     }
 
     /**
@@ -105,18 +89,15 @@ public class DeferredResultHolder {
      * @param msg
      */
     public void invokeResult(RequestMessage msg) {
-        Map<String, EmitterProcessor<ResponseEntity>> deferredResultMap = map.get(msg.getKey());
+        Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
         if (deferredResultMap == null) {
             return;
         }
-        EmitterProcessor<ResponseEntity> result = deferredResultMap.get(msg.getId());
+        DeferredResult result = deferredResultMap.get(msg.getId());
         if (result == null) {
             return;
         }
-        if(!result.isDisposed()&&result.hasDownstreams()){
-            result.sink().next(ResponseEntity.ok(msg.getData()));
-        }
-
+        result.setResult(new ResponseEntity<>(msg.getData(),HttpStatus.OK));
         deferredResultMap.remove(msg.getId());
         if (deferredResultMap.size() == 0) {
             map.remove(msg.getKey());
@@ -128,19 +109,17 @@ public class DeferredResultHolder {
      * @param msg
      */
     public void invokeAllResult(RequestMessage msg) {
-        Map<String,  EmitterProcessor<ResponseEntity>> deferredResultMap = map.get(msg.getKey());
+        Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
         if (deferredResultMap == null) {
             return;
         }
         Set<String> ids = deferredResultMap.keySet();
         for (String id : ids) {
-            EmitterProcessor<ResponseEntity> result = deferredResultMap.get(id);
+            DeferredResult result = deferredResultMap.get(id);
             if (result == null) {
                 return;
             }
-            if(!result.isDisposed()&&result.hasDownstreams()){
-                result.sink().next(ResponseEntity.ok(msg.getData()));
-            }
+            result.setResult(ResponseEntity.ok().body(msg.getData()));
         }
         map.remove(msg.getKey());
 

+ 84 - 57
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -8,11 +8,10 @@ import gov.nist.javax.sip.stack.SIPDialog;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
-import org.jetlinks.community.media.bean.EventResult;
 import org.jetlinks.community.media.bean.SSRCInfo;
+import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
-import org.jetlinks.community.media.gb28181.bean.SsrcTransaction;
 import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.session.VideoStreamSessionManager;
@@ -20,24 +19,19 @@ import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.transmit.SIPRequestHeaderProvider;
 import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
-import org.jetlinks.community.utils.ZlmSubscribeTopic;
-import org.jetlinks.core.cluster.ClusterEventBus;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.*;
-
 import javax.sip.*;
 import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
-import javax.sip.header.ViaHeader;
 import javax.sip.message.Request;
 import java.lang.reflect.Field;
 import java.text.ParseException;
 import java.time.Duration;
 import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author lifang
@@ -53,15 +47,16 @@ public class SipCommander {
 
     private final SIPRequestHeaderProvider headerProvider;
 
-    private final ClusterEventBus clusterEventBus;
-
     private final EventBus eventBus;
-
+    private final ZLMHttpHookSubscribe subscribe;
     private final LocalMediaServerItemService mediaServerItemService;
     private final VideoStreamSessionManager streamSessionManager;
+    private final UserSetup userSetup;
+
+    private final SipSubscribe sipSubscribe;
 
     public Mono<Void> playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, MediaDevice device, String channelId,
-                                    ZLMHttpHookSubscribe.Event event, SipSubscribe.Event sipEvent) {
+                                    ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
         String streamId = ssrcInfo.getStreamId();
         SipServerConfig sipConfig = SipContext.getConfig();
         try {
@@ -79,6 +74,19 @@ public class SipCommander {
             content.append("s=Play\r\n");
             content.append("c=IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
             content.append("t=0 0\r\n");
+            JSONObject subscribeKey = new JSONObject()
+                .putOpt("app", "rtp")
+                .putOpt("stream", streamId)
+                .putOpt("regist", true)
+                .putOpt("mediaServerId", mediaServerItem.getServerId());
+            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
+                (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+                    if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) {
+                        return;
+                    }
+                    event.accept(mediaServerItemInUse, JSONUtil.parseObj(json));
+                    subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
+                });
 
             //是否需要扩展sdp
 //            if (userSetup.isSeniorSdp()) {
@@ -160,29 +168,15 @@ public class SipCommander {
 
 
             Request request =  headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrcInfo.getSsrc(), callIdHeader);
-            String topic= ZlmSubscribeTopic.getOnStreamChanged("rtp",streamId,mediaServerItem.getServerId());
-
-            return  eventBus.subscribe(Subscription.of("play_stream_cmd"+streamId,topic, Subscription.Feature.local))
-                .flatMap(topicPayload -> Mono.just(topicPayload.bodyToJson(true)))
-                .flatMap(payload->{
-                    Boolean regist = payload.getBoolean("regist");
-                    if(Boolean.TRUE.equals(regist)){
-                        String mediaServerId = payload.getString("mediaServerId");
-                        return Mono.zip(Mono.just(mediaServerItemService.getOneByServerId(mediaServerId)),Mono.justOrEmpty(payload));
-                    }
-                    return Mono.empty();
-                })
-                .doOnNext(tp2->{
-                    try {
-                        event.accept(tp2.getT1(), JSONUtil.parseObj(tp2.getT2()));
-                    } catch (Exception e) {
-                        log.error("回调ZLM流改变函数失败,",e);
-                    }
-                })
-                .mergeWith(transmitRequest(sipProvider,request,null).then(Mono.empty()))
-                .then();
-
 
+            return transmitRequest(sipProvider, request, (e -> {
+                streamSessionManager.remove(device.getId(), channelId);
+                mediaServerItemService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
+                errorEvent.accept(e);
+            }), e ->{
+                streamSessionManager.put(device.getId(), channelId ,ssrcInfo.getSsrc(), streamId, mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction());
+                streamSessionManager.put(device.getId(), channelId , e.dialog);
+            });
         } catch ( SipException | ParseException | InvalidArgumentException e) {
             return Mono.error(new BusinessException("服务器异常"));
         }
@@ -241,11 +235,10 @@ public class SipCommander {
             ClientTransaction clientTransaction = sipProvider.getNewClientTransaction(byeRequest);
 
 
-//            CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME);
-            //todo
-//            if (okEvent != null) {
-//                sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
-//            }
+            CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME);
+            if (okEvent != null) {
+                sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
+            }
 
             dialog.sendRequest(clientTransaction);
 
@@ -286,7 +279,7 @@ public class SipCommander {
 
             Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo-" + tm, "FromDev" + tm, null, callIdHeader);
 
-            return transmitRequest(sipProvider, request,null);
+            return transmitRequest(sipProvider, request);
 
         } catch (SipException | ParseException | InvalidArgumentException e) {
             return Mono.error(new RuntimeException("查询媒体设备出错,",e));
@@ -317,31 +310,65 @@ public class SipCommander {
 
             Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog-" + tm, "FromCat" + tm, null, callIdHeader);
 
-            //todo 失败处理
-            return transmitRequest(sipProvider, request,null);
+            return transmitRequest(sipProvider, request, errorEvent);
         } catch (SipException | ParseException | InvalidArgumentException e) {
             return Mono.error(new RuntimeException("查询设备目录列表失败,",e));
         }
     }
 
-    private Mono<Void> transmitRequest(SipProvider sipProvider, Request request, SipSubscribe.Event event) throws SipException {
+    private  Mono<Void> transmitRequest(SipProvider sipProvider, Request request, SipSubscribe.Event errorEvent) throws SipException {
+        return transmitRequest(sipProvider, request, errorEvent, null);
+    }
+
+    private Mono<Void> transmitRequest(SipProvider sipProvider, Request request) throws SipException {
+        return transmitRequest(sipProvider, request, null, null);
+    }
+
+    private Mono<Void> transmitRequest(SipProvider sipProvider, Request request, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException {
         ClientTransaction clientTransaction = sipProvider.getNewClientTransaction(request);
         CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
-        clientTransaction.sendRequest();
-        return  handleReply(callIdHeader.getCallId(),Duration.ofSeconds(10),event);
+        // 添加错误订阅
+        if (errorEvent != null) {
+            sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> {
+                errorEvent.accept(eventResult);
+                sipSubscribe.removeErrorSubscribe(eventResult.callId);
+            }));
+        }
+        // 添加订阅
+        if (okEvent != null) {
+            sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult ->{
+                okEvent.accept(eventResult);
+                sipSubscribe.removeOkSubscribe(eventResult.callId);
+            });
+        }
+        return Mono.fromRunnable(()->{
+            try {
+                clientTransaction.sendRequest();
+            } catch (SipException e) {
+                log.error("向设备发送sip请求失败,",e);
+            }
+        });
     }
 
-    private Mono<Void> handleReply(String callId, Duration timeout,SipSubscribe.Event event) {
-        return eventBus.subscribe(Subscription.of("sip_reply",callId, Subscription.Feature.local))
-            .single()
-//            .timeout(timeout, Mono.error(() -> new BusinessException("设备响应超时")))
-            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
-            .filter(ignore->event!=null)
-            .doOnNext(jsonObject -> {
-                SipSubscribe.EventResult eventResult = jsonObject.toJavaObject(SipSubscribe.EventResult.class);
-                event.accept(eventResult);
-            })
-            .then();
-    }
+
+//    private Mono<Void> transmitRequest(SipProvider sipProvider, Request request, SipSubscribe.Event event) throws SipException {
+//        ClientTransaction clientTransaction = sipProvider.getNewClientTransaction(request);
+//        CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
+//        clientTransaction.sendRequest();
+//        return  handleReply(callIdHeader.getCallId(),Duration.ofSeconds(10),event);
+//    }
+//
+//    private Mono<Void> handleReply(String callId, Duration timeout,SipSubscribe.Event event) {
+//        return eventBus.subscribe(Subscription.of("sip_reply",callId, Subscription.Feature.local))
+//            .single()
+////            .timeout(timeout, Mono.error(() -> new BusinessException("设备响应超时")))
+//            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
+//            .filter(ignore->event!=null)
+//            .doOnNext(jsonObject -> {
+//                SipSubscribe.EventResult eventResult = jsonObject.toJavaObject(SipSubscribe.EventResult.class);
+//                event.accept(eventResult);
+//            })
+//            .then();
+//    }
 
 }

+ 126 - 103
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -4,9 +4,11 @@ import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
+import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.contanst.VideoManagerConstants;
 import org.jetlinks.community.media.entity.GbStream;
 import org.jetlinks.community.media.bean.StreamInfo;
@@ -51,24 +53,30 @@ public class ZLMHttpHookListener {
     private final LocalGbStreamService gbStreamService;
     private final ClusterEventBus clusterEventBus;
     private final String serverId="";
+    private final UserSetup userSetup;
     /**
      * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
      *
      */
     @ResponseBody
     @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
-    public Mono<ResponseEntity<String>> onServerKeepalive(@RequestBody JSONObject json){
+    public Mono<ResponseEntity<String>> onServerKeepalive(@RequestBody JSONObject json) throws Exception {
 
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString());
         }
+        String mediaServerId = json.getStr("mediaServerId");
 
+        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
+        if (subscribes != null  && subscribes.size() > 0) {
+            for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
+                subscribe.accept(null, json);
+            }
+        }
         JSONObject ret = new JSONObject()
-            .append("code", 0)
-            .append("msg", "success");
-        return eventBus.
-            publish(ZLMHttpHookSubscribe.HookType.on_server_keepalive.name(),json)
-            .thenReturn(ResponseEntity.ok(ret.toString()));
+            .putOpt("code", 0)
+            .putOpt("msg", "success");
+        return Mono.just(ResponseEntity.ok(ret.toString()));
     }
 
 //	/**
@@ -89,52 +97,52 @@ public class ZLMHttpHookListener {
 //		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 //	}
 //
-	/**
-	 * 访问http文件服务器上hls之外的文件时触发。
-	 *
-	 */
-	@ResponseBody
-	@PostMapping(value = "/on_http_access", produces = "application/json;charset=UTF-8")
-	public ResponseEntity<String> onHttpAccess(@RequestBody JSONObject json){
+    /**
+     * 访问http文件服务器上hls之外的文件时触发。
+     *
+     */
+    @ResponseBody
+    @PostMapping(value = "/on_http_access", produces = "application/json;charset=UTF-8")
+    public ResponseEntity<String> onHttpAccess(@RequestBody JSONObject json){
 
-		if (log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_http_access API 调用,参数:" + json.toString());
-		}
-		String mediaServerId = json.getStr("mediaServerId");
-		JSONObject ret = new JSONObject();
-		ret.put("code", 0);
-		ret.put("err", "");
-		ret.put("path", "");
-		ret.put("second", 600);
-		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-	}
+        }
+        String mediaServerId = json.getStr("mediaServerId");
+        JSONObject ret = new JSONObject()
+            .putOpt("code", 0)
+            .putOpt("err", "")
+            .putOpt("path", "")
+            .putOpt("second", 600);
+        return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
+    }
 
-	/**
-	 * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。
-	 *
-	 */
-	@ResponseBody
-	@PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
-	public ResponseEntity<String> onPlay(@RequestBody JSONObject json){
+    /**
+     * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。
+     *
+     */
+    @ResponseBody
+    @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
+    public ResponseEntity<String> onPlay(@RequestBody JSONObject json){
 
-		if (log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_play API调用,参数:" + json.toString());
-		}
-		String mediaServerId = json.getStr("mediaServerId");
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
-		if (subscribe != null ) {
-			MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(mediaServerId);
-			//todo
+        }
+        String mediaServerId = json.getStr("mediaServerId");
+        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
+        if (subscribe != null ) {
+            MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(mediaServerId);
+            //todo
 //			if (mediaInfo != null) {
 //				subscribe.response(mediaInfo, json);
 //			}
 
-		}
-		JSONObject ret = new JSONObject();
-		ret.put("code", 0);
-		ret.put("msg", "success");
-		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-	}
+        }
+        JSONObject ret = new JSONObject()
+            .putOpt("code", 0)
+            .putOpt("msg", "success");
+        return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
+    }
 
     /**
      * rtsp/rtmp/rtp推流鉴权事件。
@@ -145,32 +153,32 @@ public class ZLMHttpHookListener {
     public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
 
         log.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
-        JSONObject ret = new JSONObject();
-        ret.put("code", 0);
-        ret.put("msg", "success");
-        ret.put("enableHls", true);
-//		ret.put("enableMP4", userSetup.isRecordPushLive());
-//		String mediaServerId = json.getString("mediaServerId");
-//		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
-//		if (subscribe != null) {
-//			MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
-//			if (mediaInfo != null) {
-//				subscribe.response(mediaInfo, json);
-//			}else {
-//				ret.put("code", 1);
-//				ret.put("msg", "zlm not register");
-//			}
-//		}
-//	 	String app = json.getString("app");
-//	 	String stream = json.getString("stream");
-//		StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream);
+        JSONObject ret = new JSONObject()
+            .putOpt("code", 0)
+            .putOpt("msg", "success")
+            .putOpt("enableHls", true)
+            .putOpt("enableMP4", userSetup.isRecordPushLive());
+//        String mediaServerId = json.getStr("mediaServerId");
+//        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
+//        if (subscribe != null) {
+//            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
+//            if (mediaInfo != null) {
+//                subscribe.accept(mediaInfo, json);
+//            }else {
+//                ret.put("code", 1);
+//                ret.put("msg", "zlm not register");
+//            }
+//        }
+//        String app = json.getStr("app");
+//        String stream = json.getStr("stream");
+//        StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream);
 //
-//		// 录像回放时不进行录像下载
-//		if (streamInfo != null) {
-//			ret.put("enableMP4", false);
-//		}else {
-//			ret.put("enableMP4", userSetup.isRecordPushLive());
-//		}
+//        // 录像回放时不进行录像下载
+//        if (streamInfo != null) {
+//            ret.put("enableMP4", false);
+//        }else {
+//            ret.put("enableMP4", userSetup.isRecordPushLive());
+//        }
 
         return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
     }
@@ -193,23 +201,23 @@ public class ZLMHttpHookListener {
 //		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 //	}
 
-	/**
-	 * rtsp专用的鉴权事件,先触发on_rtsp_realm事件然后才会触发on_rtsp_auth事件。
-	 *
-	 */
-	@ResponseBody
-	@PostMapping(value = "/on_rtsp_realm", produces = "application/json;charset=UTF-8")
-	public ResponseEntity<String> onRtspRealm(@RequestBody JSONObject json){
+    /**
+     * rtsp专用的鉴权事件,先触发on_rtsp_realm事件然后才会触发on_rtsp_auth事件。
+     *
+     */
+    @ResponseBody
+    @PostMapping(value = "/on_rtsp_realm", produces = "application/json;charset=UTF-8")
+    public ResponseEntity<String> onRtspRealm(@RequestBody JSONObject json){
 
-		if (log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_rtsp_realm API调用,参数:" + json.toString());
-		}
-		String mediaServerId = json.getStr("mediaServerId");
-		JSONObject ret = new JSONObject();
-		ret.put("code", 0);
-		ret.put("realm", "");
-		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-	}
+        }
+        String mediaServerId = json.getStr("mediaServerId");
+        JSONObject ret = new JSONObject()
+            .putOpt("code", 0)
+            .putOpt("realm", "");
+        return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
+    }
 //
 //
 //	/**
@@ -265,7 +273,7 @@ public class ZLMHttpHookListener {
      * rtsp/rtmp流注册或注销时触发此事件;此事件对回复不敏感。
      *
      */
-    @PostMapping(value = "/on_stream_changed")
+    @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
     public Mono<ResponseEntity<String>> onStreamChanged(@RequestBody MediaItem item){
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONUtil.toJsonStr(item));
@@ -278,11 +286,8 @@ public class ZLMHttpHookListener {
         //rtsp或rtmp
         String schema = item.getSchema();
         JSONObject json = JSONUtil.parseObj(item);
-        //todo 这里订阅/发布采用eventBus 模式进行
-        //on_stream_changed+{mediaServerId}+{streamId}+app
-        Mono<Long> result = Mono.fromRunnable(()->clusterEventBus.publish(ZlmSubscribeTopic.getOnStreamChanged(app, streamId, mediaServerId), json)).thenReturn(1L);
-        // 流消失移除redis play
 
+        Mono<Long> result = Mono.fromRunnable(()->{}).thenReturn(1L);
 
         boolean regist = item.isRegist();
         if ("rtmp".equals(schema)){
@@ -369,11 +374,11 @@ public class ZLMHttpHookListener {
                                 if (type != null) {
                                     // 发送流变化redis消息
                                     JSONObject jsonObject = new JSONObject()
-                                        .append("serverId", serverId)
-                                        .append("app", app)
-                                        .append("stream", streamId)
-                                        .append("register", regist)
-                                        .append("mediaServerId", mediaServerId);
+                                        .putOpt("serverId", serverId)
+                                        .putOpt("app", app)
+                                        .putOpt("stream", streamId)
+                                        .putOpt("register", regist)
+                                        .putOpt("mediaServerId", mediaServerId);
                                     if(mono!=null){
                                         String originType=type;
                                         mono=mono.doOnNext(___->clusterEventBus.publish(VideoManagerConstants.WVP_SERVER_STREAM_PREFIX+"_"+originType,jsonObject));
@@ -385,11 +390,25 @@ public class ZLMHttpHookListener {
                 }
             }
         }
+
         JSONObject ret = new JSONObject()
-            .append("code", 0)
-            .append("msg", "success");
+            .putOpt("code", 0)
+            .putOpt("msg", "success");
         return       result
-            .thenReturn(ResponseEntity.ok(ret.toString()));
+            .mergeWith(Mono.fromRunnable(()->{
+                ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
+                if(subscribe!=null){
+                    MediaServerItem mediaServerItem = redisCatchStorage.getMediaServerItem(mediaServerId);
+                    if(mediaServerItem!=null){
+                        try {
+                            subscribe.accept(mediaServerItem,json);
+                        } catch (Exception e) {
+                            log.error("回调失败,",e);
+                        }
+                    }
+                }
+            }))
+            .then(Mono.just(ResponseEntity.ok(ret.toString())));
     }
 //
 //	/**
@@ -504,17 +523,21 @@ public class ZLMHttpHookListener {
      */
     @ResponseBody
     @PostMapping(value = "/on_server_started", produces = "application/json;charset=UTF-8")
-    public Mono<ResponseEntity<String>> onServerStarted(ServerWebExchange exchange, @RequestBody JSONObject jsonObject){
+    public Mono<ResponseEntity<String>> onServerStarted(ServerWebExchange exchange, @RequestBody JSONObject jsonObject) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_server_started API调用,参数:" + jsonObject.toString());
         }
         String remoteAddr = exchange.getRequest().getRemoteAddress().getAddress().toString();
-        jsonObject.append("ip", remoteAddr);
+        jsonObject.putOpt("ip", remoteAddr);
         JSONObject ret = new JSONObject()
-            .append("code", 0)
-            .append("msg", "success");
-        return eventBus.
-            publish(ZLMHttpHookSubscribe.HookType.on_server_started.name(),jsonObject)
-            .thenReturn(ResponseEntity.ok(ret.toString()));
+            .putOpt("code", 0)
+            .putOpt("msg", "success");
+        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
+        if (subscribes != null  && subscribes.size() > 0) {
+            for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
+                subscribe.accept(null, jsonObject);
+            }
+        }
+        return Mono.just(ResponseEntity.ok(ret.toString()));
     }
 }

+ 3 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRESTfulUtils.java

@@ -121,7 +121,7 @@ public class ZLMRESTfulUtils {
         log.info(request.toString());
         try {
             OkHttpClient client = new OkHttpClient.Builder()
-                    .readTimeout(10, TimeUnit.SECONDS)
+                    .readTimeout(30, TimeUnit.SECONDS)
                     .build();
             Response response = client.newCall(request).execute();
             if (response.isSuccessful()) {
@@ -275,9 +275,9 @@ public class ZLMRESTfulUtils {
         sendPost(mediaServerItem, "kick_sessions",param, null);
     }
 
-    public void getSnap(MediaServerItem mediaServerItem, String flvUrl, int timeout_sec, int expire_sec, String targetPath, String fileName) {
+    public void getSnap(MediaServerItem mediaServerItem, String url, int timeout_sec, int expire_sec, String targetPath, String fileName) {
         Map<String, Object> param = new HashMap<>();
-        param.put("url", flvUrl);
+        param.put("url", url);
         param.put("timeout_sec", timeout_sec);
         param.put("expire_sec", expire_sec);
         sendGetForImg(mediaServerItem, "getSnap", param, targetPath, fileName);

+ 23 - 29
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java

@@ -4,7 +4,6 @@ import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
-import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.media.config.MediaConfig;
@@ -42,8 +41,6 @@ public class ZLMRunner implements CommandLineRunner {
 
     private final MediaConfig mediaConfig;
 
-    private final EventBus eventBus;
-
     @Override
     public void run(String... strings)  {
 
@@ -58,10 +55,13 @@ public class ZLMRunner implements CommandLineRunner {
             .flatMap(defaultMedia->{
                 //判断默认服务器是否发生变动
                 MediaServerItem mediaSerItem = mediaConfig.getMediaSerItem();
-                if(mediaSerItem.equals(defaultMedia)){
+                if(!mediaSerItem.equals(defaultMedia)){
                     mediaSerItem.setDefaultServer(true);
                     return mediaServerItemService.deleteById(defaultMedia.getId())
-                        .concatWith(mediaServerItemService.save(mediaSerItem).then(Mono.empty()));
+                        .concatWith(
+                            mediaServerItemService.save(mediaSerItem).
+                                then(Mono.empty()))
+                        .then(Mono.just(mediaSerItem));
                 }
                 return Mono.just(defaultMedia);
             })
@@ -105,20 +105,16 @@ public class ZLMRunner implements CommandLineRunner {
      * @return Mono
      */
     private void subscribeOnServerStarted(){
-        // 订阅 zlm保活事件, 当zlm离线时做业务的处理
-        eventBus.subscribe(Subscription.of("ZLM-started",ZLMHttpHookSubscribe.HookType.on_server_started.name(), Subscription.Feature.local))
-            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
-            .flatMap(response -> {
-                ZLMServerConfig zlmServerConfig = JSONUtil.toBean(response.toJSONString(), ZLMServerConfig.class);
+        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,null,
+            (MediaServerItem mediaServerItem, JSONObject response)->{
+                ZLMServerConfig zlmServerConfig = JSONUtil.toBean(response, ZLMServerConfig.class);
                 if (zlmServerConfig !=null ) {
                     if (startGetMedia != null) {
                         startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
                     }
-                    return mediaServerItemService.zlmServerOnline(zlmServerConfig);
+                    mediaServerItemService.zlmServerOnline(zlmServerConfig).subscribe();
                 }
-                return Mono.empty();
-            })
-            .subscribe();
+            });
     }
 
     /**
@@ -127,17 +123,13 @@ public class ZLMRunner implements CommandLineRunner {
      */
     private void subscribeOnServerKeepalive(){
         // 订阅 zlm保活事件, 当zlm离线时做业务的处理
-        eventBus.subscribe(Subscription.of("ZLM-keepAlive",ZLMHttpHookSubscribe.HookType.on_server_keepalive.name(), Subscription.Feature.local))
-            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
-            .flatMap(response -> {
-                String mediaServerId = response.getString("mediaServerId");
+        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,null,
+            (MediaServerItem mediaServerItem,JSONObject response)->{
+                String mediaServerId = response.getStr("mediaServerId");
                 if (mediaServerId !=null ) {
-                    return mediaServerItemService
-                        .updateMediaServerKeepalive(mediaServerId, JSONUtil.parseObj(response.getJSONObject("data")));
+                    mediaServerItemService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data")).subscribe();
                 }
-                return Mono.empty();
-            })
-            .subscribe();
+            });
     }
 
     /**
@@ -148,13 +140,15 @@ public class ZLMRunner implements CommandLineRunner {
         return  mediaServerItemService
             .createQuery()
             .fetch()
-            .defaultIfEmpty(
-                mediaConfig.getMediaSerItem()
-            )
+//            .defaultIfEmpty(
+//                mediaConfig.getMediaSerItem()
+//            )
             .doOnNext(mediaServerItem ->{
                 //将媒体服务器进行标记
                 startGetMedia=Optional.ofNullable(startGetMedia).orElse(new HashMap<>());
-                startGetMedia.put(mediaServerItem.getId(), true);
+                if(StrUtil.isNotEmpty(mediaServerItem.getId())){
+                    startGetMedia.put(mediaServerItem.getId(), true);
+                }
             })
             .parallel()
             .runOn(Schedulers.parallel())
@@ -191,12 +185,12 @@ public class ZLMRunner implements CommandLineRunner {
             .filter(ignore->Boolean.TRUE.equals(mediaServerItem.isDefaultServer()))
             //给予默认值
             .defaultIfEmpty(mediaServerItem)
-            .flatMap(ignore->Mono.just( zlmresTfulUtils.getMediaServerConfig(mediaServerItem)))
+            .flatMap(ignore->Mono.justOrEmpty( zlmresTfulUtils.getMediaServerConfig(mediaServerItem)))
             .flatMap(responseJson->{
                 ZLMServerConfig zlmServerConfig=null;
                 JSONArray data = responseJson.getJSONArray("data");
                 if (data != null && data.size() > 0) {
-                    zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class);
+                    zlmServerConfig = JSONUtil.toBean(JSONUtil.toJsonStr(data.get(0)), ZLMServerConfig.class);
                     zlmServerConfig.setIp(mediaServerItem.getIp());
                 }
                 return Mono.just(zlmServerConfig);

+ 6 - 3
jetlinks-standalone/src/main/resources/application.yml

@@ -235,16 +235,19 @@ media:
   # [必须修改] zlm服务器的内网IP
   ip: 192.168.100.32
   # [必须修改] zlm服务器的http.port
-  http-port: 8080
+  http-port: 80
   # [可选] zlm服务器的hook.admin_params=secret
   secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
   # 录像辅助服务, 部署此服务可以实现zlm录像的管理与下载, 0 表示不使用
   record-assist-port: 18081
   # 启用多端口模式, 多端口模式使用端口区分每路流,兼容性更好。 单端口使用流的ssrc区分, 点播超时建议使用多端口测试
   # [可选] 是否启用多端口模式, 开启后会在portRange范围内选择端口用于媒体流传输
-  rtp-enable: true
+  rtp-enable: false
   # [可选] 在此范围内选择端口用于媒体流传输,
   send-rtp-port-range: 10000,10000 # 端口范围
   # [可选] 国标级联在此范围内选择端口发送媒体流,
   rtp-port-range: 10000,10000 # 端口范围
-  hook-ip: 3c64674y21.wicp.vip
+  hook-ip: 192.168.104.244
+  rtp-proxy-port: 10000
+  rtmp-port: 80
+  rtsp-port: 80