Bladeren bron

add 通道更新

18339543638 3 jaren geleden
bovenliggende
commit
45397d2143

+ 22 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/DeviceNode.java

@@ -0,0 +1,22 @@
+package org.jetlinks.community.media.bean;
+import lombok.Data;
+
+import java.util.*;
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DeviceNode.java
+ * @Description 设备树
+ * @createTime 2022年02月19日 16:30:00
+ */
+@Data
+public class DeviceNode {
+    private String title;
+    private String key;
+    private boolean isLeaf;
+    private String icon;
+    private String deviceId;
+    private String channelId;
+    private List<DeviceNode> children;
+
+}

+ 6 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/DeferredController.java

@@ -22,8 +22,12 @@ public interface DeferredController {
         //最长1分钟
         long currentTimeMillis = System.currentTimeMillis();
         while (!result.isSetOrExpired()){
-            if((System.currentTimeMillis()-currentTimeMillis)>60*1000){
-                return Mono.error(new TimeoutException("请求超时"));
+            if((System.currentTimeMillis()-currentTimeMillis)>10*1000){
+                if (playResult.getTimeout() != null) {
+                    playResult.getTimeout().subscribe();
+                    playResult.setTimeout(null);
+                }
+//                return Mono.error(new TimeoutException("请求超时"));
             }
         }
         Object data = result.getResult();

+ 104 - 8
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java

@@ -1,22 +1,25 @@
 package org.jetlinks.community.media.controller;
 
+import cn.hutool.json.JSONObject;
+import com.google.common.collect.Maps;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.hswebframework.web.authorization.annotation.Authorize;
-import org.hswebframework.web.authorization.annotation.CreateAction;
-import org.hswebframework.web.authorization.annotation.Resource;
-import org.hswebframework.web.authorization.annotation.SaveAction;
+import org.hswebframework.web.authorization.annotation.*;
 import org.hswebframework.web.crud.service.ReactiveCrudService;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
 import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.gb28181.result.PlayResult;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
+import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.jetlinks.community.media.service.LocalMediaServerItemService;
+import org.jetlinks.community.media.service.LocalPlayService;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
@@ -25,8 +28,10 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
-import java.util.UUID;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * @author lifang
@@ -44,16 +49,104 @@ import java.util.UUID;
 @Tag(name = "媒体视频设备")
 public class MediaDeviceController implements ReactiveServiceCrudController<MediaDevice, String>,DeferredController{
     private final LocalMediaDeviceService mediaDeviceService;
-
     private final RedisCacheStorageImpl redisCacheStorage;
     private final DeferredResultHolder resultHolder;
     private final SipCommander cmder;
+    private final LocalPlayService playService;
+    private final LocalMediaDeviceChannelService deviceChannelService;
+    private final LocalMediaServerItemService mediaServerItemService;
     @Override
     public ReactiveCrudService<MediaDevice, String> getService() {
         return mediaDeviceService;
     }
 
+    @QueryAction
+    @Operation(summary = "设备点播")
+    @PostMapping("/{deviceId}/{channelId}/_start")
+    public Mono<Object> play(@PathVariable String deviceId,
+                             @PathVariable String channelId) {
+
+        return
+            //获取设备信息
+            mediaDeviceService.findById(deviceId)
+                //获取设备相连的媒体流服务器信息
+                .flatMap(playService::getNewMediaServerItem)
+                .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
+                .flatMap(mediaServerItem ->playService.play(mediaServerItem, deviceId, channelId, null, null))
+                .flatMap(this::deferredResultHandler);
+    }
+
+
+    @QueryAction
+    @Operation(summary = "停止点播")
+    @GetMapping("/{deviceId}/{channelId}/_stop")
+    public Mono<Object> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
+        if(log.isDebugEnabled()){
+            log.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
+        }
+
+        String uuid = cn.hutool.core.lang.UUID.randomUUID().toString();
+        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
+        // 录像查询以channelId作为deviceId查询
+        String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
+        resultHolder.put(key, uuid, result);
+        PlayResult<String> playResult = new PlayResult<String>();
+        playResult.setResult(result);
+        // 超时处理
+        playResult.onTimeout(Mono.fromRunnable(()->{
+            log.warn(String.format("设备预览/回放停止响应超时,deviceId/channelId:%s_%s ", deviceId, channelId));
+            RequestMessage msg = new RequestMessage();
+            msg.setId(uuid);
+            msg.setKey(key);
+            msg.setData("设备预览/回放停止响应超时");
+            resultHolder.invokeAllResult(msg);
+        }));
 
+        return Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId))
+            .flatMap(device->
+                Mono.zip(cmder.streamByeCmd(deviceId, channelId, (event) -> {
+                    StreamInfo streamInfo = redisCacheStorage.queryPlayByDevice(deviceId, channelId);
+                    if (streamInfo == null) {
+                        RequestMessage msg = new RequestMessage();
+                        msg.setId(uuid);
+                        msg.setKey(key);
+                        msg.setData("点播未找到");
+                        resultHolder.invokeAllResult(msg);
+                        deviceChannelService.stopPlay(deviceId, channelId).subscribe();
+                    }else {
+                        redisCacheStorage.stopPlay(streamInfo);
+                        deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()).subscribe();
+                        RequestMessage msg = new RequestMessage();
+                        msg.setId(uuid);
+                        msg.setKey(key);
+                        msg.setData("success");
+                        resultHolder.invokeAllResult(msg);
+                    }
+                    mediaServerItemService.closeRTPServer(device, channelId);
+                })
+                    .mergeWith(Mono.fromRunnable(()->{
+                        if (deviceId != null || channelId != null) {
+                            JSONObject json = new JSONObject()
+                                .putOpt("deviceId", deviceId)
+                                .putOpt("channelId", channelId);
+                            RequestMessage msg = new RequestMessage();
+                            msg.setId(uuid);
+                            msg.setKey(key);
+                            msg.setData(json.toString());
+                            resultHolder.invokeAllResult(msg);
+                        } else {
+                            log.warn("设备预览/回放停止API调用失败!");
+                            RequestMessage msg = new RequestMessage();
+                            msg.setId(uuid);
+                            msg.setKey(key);
+                            msg.setData("streamId null");
+                            resultHolder.invokeAllResult(msg);
+                        }
+                    }))
+                    .then(),Mono.just(deferredResultHandler(playResult)))
+            )
+            .flatMap(Tuple2::getT2);
+    }
 
     @PostMapping("/{deviceId}/channels/_sync")
     @CreateAction
@@ -70,7 +163,7 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
         String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
         String uuid = UUID.randomUUID().toString();
         //默认超时时间为30分钟
-        DeferredResult<ResponseEntity<MediaDevice>> result = new DeferredResult<>(30*60*1000L);
+        DeferredResult<ResponseEntity<MediaDevice>> result = new DeferredResult<>();
         playResult.setResult(result);
         playResult.onTimeout(Mono.fromRunnable(()->{
             log.warn("设备[{}]通道信息同步超时", id);
@@ -81,8 +174,10 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
             WVPResult<Object> wvpResult = new WVPResult<>();
             wvpResult.setCode(-1);
             wvpResult.setData(device);
-            wvpResult.setMsg("更新超时");
+            wvpResult.setMsg("设备响应超时,请检查设备是否在线或网络是否通畅");
             msg.setData(wvpResult);
+            //设备下线
+            mediaDeviceService.deviceOffline(device);
             resultHolder.invokeAllResult(msg);
         }));
         // 等待其他相同请求返回时一起返回
@@ -101,6 +196,7 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
             msg.setData(wvpResult);
             resultHolder.invokeAllResult(msg);
         })
+            .thenReturn(1L)
             .flatMap(ignore->deferredResultHandler(playResult));
     }
 

+ 64 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaGatewayController.java

@@ -0,0 +1,64 @@
+package org.jetlinks.community.media.controller;
+
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.authorization.annotation.Authorize;
+import org.hswebframework.web.authorization.annotation.Resource;
+import org.jetlinks.community.media.bean.DeviceNode;
+import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.jetlinks.core.device.DeviceOperationBroker;
+import org.jetlinks.core.message.AcknowledgeDeviceMessage;
+import org.jetlinks.core.message.DeviceMessageReply;
+import org.jetlinks.core.server.MessageHandler;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.*;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MediaGatawayController.java
+ * @Description TODO
+ * @createTime 2022年02月19日 16:23:00
+ */
+@RestController
+@RequestMapping("/media/gateway")
+@Slf4j
+@Authorize(ignore = true)
+@Resource(id="media-device",name = "设备平台")
+@AllArgsConstructor
+@Tag(name = "设备平台管理")
+public class MediaGatewayController {
+    private final LocalMediaDeviceService mediaDeviceService;
+
+    private final DeviceOperationBroker operationBroker;
+
+    private final MessageHandler messageHandler;
+    @GetMapping
+    public Flux<DeviceMessageReply> test(){
+        AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
+        message.deviceId("123");
+        message.messageId("123");
+        return operationBroker.handleReply("123","123", Duration.ofSeconds(10))
+            .mergeWith(messageHandler.reply(message).then(Mono.empty()));
+    }
+
+    @GetMapping("_query/no-paging")
+    public Mono<List> deviceTree(){
+        DeviceNode deviceNode = new DeviceNode();
+        deviceNode.setChannelId("channelId");
+        deviceNode.setDeviceId("deviceId");
+        deviceNode.setKey("key");
+        deviceNode.setIcon("icon");
+        deviceNode.setLeaf(false);
+        deviceNode.setChildren(new ArrayList<>());
+        deviceNode.setTitle("title");
+        return Mono.just(Arrays.asList(deviceNode));
+    }
+}

+ 0 - 83
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java

@@ -50,93 +50,10 @@ public class PlayController implements DeferredController{
     private final DeferredResultHolder resultHolder;
 
     private final LocalMediaServerItemService mediaServerItemService;
-    @QueryAction
-    @Operation(summary = "设备点播")
-    @GetMapping("/start/{deviceId}/{channelId}")
-    public Mono<Object> play(@PathVariable String deviceId,
-                             @PathVariable String channelId) {
 
-        return
-            //获取设备信息
-            mediaDeviceService.findById(deviceId)
-                //获取设备相连的媒体流服务器信息
-                .flatMap(playService::getNewMediaServerItem)
-                .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
-                .flatMap(mediaServerItem ->playService.play(mediaServerItem, deviceId, channelId, null, null))
-                .flatMap(this::deferredResultHandler);
-    }
 
 
-    @QueryAction
-    @Operation(summary = "停止点播")
-    @GetMapping("/stop/{deviceId}/{channelId}")
-    public Mono<Object> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
-        if(log.isDebugEnabled()){
-            log.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
-        }
 
-        String uuid = UUID.randomUUID().toString();
-        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
-        // 录像查询以channelId作为deviceId查询
-        String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
-        resultHolder.put(key, uuid, result);
-        PlayResult<String> playResult = new PlayResult<String>();
-        playResult.setResult(result);
-        // 超时处理
-        playResult.onTimeout(Mono.fromRunnable(()->{
-            log.warn(String.format("设备预览/回放停止响应超时,deviceId/channelId:%s_%s ", deviceId, channelId));
-            RequestMessage msg = new RequestMessage();
-            msg.setId(uuid);
-            msg.setKey(key);
-            msg.setData("设备预览/回放停止响应超时");
-            resultHolder.invokeAllResult(msg);
-        }));
-
-        return Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId))
-            .flatMap(device->
-                Mono.zip(cmder.streamByeCmd(deviceId, channelId, (event) -> {
-                    StreamInfo streamInfo = redisCacheStorage.queryPlayByDevice(deviceId, channelId);
-                    if (streamInfo == null) {
-                        RequestMessage msg = new RequestMessage();
-                        msg.setId(uuid);
-                        msg.setKey(key);
-                        msg.setData("点播未找到");
-                        resultHolder.invokeAllResult(msg);
-                        deviceChannelService.stopPlay(deviceId, channelId).subscribe();
-                    }else {
-                        redisCacheStorage.stopPlay(streamInfo);
-                        deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()).subscribe();
-                        RequestMessage msg = new RequestMessage();
-                        msg.setId(uuid);
-                        msg.setKey(key);
-                        msg.setData("success");
-                        resultHolder.invokeAllResult(msg);
-                    }
-                    mediaServerItemService.closeRTPServer(device, channelId);
-                })
-                    .mergeWith(Mono.fromRunnable(()->{
-                        if (deviceId != null || channelId != null) {
-                            JSONObject json = new JSONObject()
-                                .putOpt("deviceId", deviceId)
-                                .putOpt("channelId", channelId);
-                            RequestMessage msg = new RequestMessage();
-                            msg.setId(uuid);
-                            msg.setKey(key);
-                            msg.setData(json.toString());
-                            resultHolder.invokeAllResult(msg);
-                        } else {
-                            log.warn("设备预览/回放停止API调用失败!");
-                            RequestMessage msg = new RequestMessage();
-                            msg.setId(uuid);
-                            msg.setKey(key);
-                            msg.setData("streamId null");
-                            resultHolder.invokeAllResult(msg);
-                        }
-                    }))
-                    .then(),Mono.just(deferredResultHandler(playResult)))
-            )
-            .flatMap(Tuple2::getT2);
-    }
 //
 //	/**
 //	 * 将不是h264的视频通过ffmpeg 转码为h264 + aac

+ 2 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -131,7 +131,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
      * 设备下线
      * @param mediaDevice 设备信息
      */
-    private void deviceOffline(MediaDevice mediaDevice){
+    public void deviceOffline(MediaDevice mediaDevice){
         //取消注册媒体设备
         redisCacheStorage.removeDevice(mediaDevice.getId());
         this.createUpdate()
@@ -150,6 +150,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
                     })
             )
             .subscribe();
+
     }
 
     @Subscribe("/media/device/*/*/unregister")

+ 2 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java

@@ -709,8 +709,8 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         if (mediaInfo.getRtspSSLPort() != 0) {
             streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSSLPort(), app,  stream));
         }
-        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app,  stream));
-        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app,  stream));
+        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
+        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setHls(String.format("http://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setWs_hls(String.format("ws://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
         streamInfoResult.setFmp4(String.format("http://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app,  stream));

+ 38 - 39
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -72,7 +72,6 @@ public class LocalPlayService  {
     private final UserSetup userSetup;
     public Mono<PlayResult> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                                  ZLMHttpHookSubscribe.Event hookEvent,  SipSubscribe.Event errorEvent) {
-        //todo
         PlayResult playResult = new PlayResult();
         RequestMessage msg = new RequestMessage();
         String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
@@ -117,44 +116,44 @@ public class LocalPlayService  {
                     resultHolder.invokeAllResult(msg))
                 .subscribe();
         }));
-        playResult.onComplete(Mono.fromRunnable(()->{
-            // 点播结束时调用截图接口
-            String fileName =  deviceId + "_" + channelId + ".jpg";
-            ResponseEntity responseEntity =  (ResponseEntity)result.getResult();
-            if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
-                WVPResult wvpResult = (WVPResult)responseEntity.getBody();
-                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
-                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
-                    mediaServerItemService.findById(streamInfoForSuccess.getMediaServerId())
-                        .doOnNext(mediaInfo->{
-                            String classPath = null;
-                            try {
-                                classPath = ResourceUtils.getURL("classpath:").getPath();
-                                // 兼容打包为jar的class路径
-                                if(classPath.contains("jar")) {
-                                    classPath = classPath.substring(0, classPath.lastIndexOf("."));
-                                    classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
-                                }
-                                if (classPath.startsWith("file:")) {
-                                    classPath = classPath.substring(classPath.indexOf(":") + 1);
-                                }
-                                String path = classPath + "static/static/snap/";
-                                // 兼容Windows系统路径(去除前面的“/”)
-                                if(System.getProperty("os.name").contains("indows")) {
-                                    path = path.substring(1);
-                                }
-                                String streamUrl = streamInfoForSuccess.getRtsp();
-                                // 请求截图
-                                log.info("[请求截图]: " + fileName);
-                                zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
-                            } catch (FileNotFoundException e) {
-                                log.error("存放截图文件路径不存在,",e);
-                            }
-                        })
-                        .subscribe();
-                }
-            }
-        }));
+//        playResult.onComplete(Mono.fromRunnable(()->{
+//            // 点播结束时调用截图接口
+//            String fileName =  deviceId + "_" + channelId + ".jpg";
+//            ResponseEntity responseEntity =  (ResponseEntity)result.getResult();
+//            if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
+//                WVPResult wvpResult = (WVPResult)responseEntity.getBody();
+//                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
+//                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
+//                    mediaServerItemService.findById(streamInfoForSuccess.getMediaServerId())
+//                        .doOnNext(mediaInfo->{
+//                            String classPath = null;
+//                            try {
+//                                classPath = ResourceUtils.getURL("classpath:").getPath();
+//                                // 兼容打包为jar的class路径
+//                                if(classPath.contains("jar")) {
+//                                    classPath = classPath.substring(0, classPath.lastIndexOf("."));
+//                                    classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
+//                                }
+//                                if (classPath.startsWith("file:")) {
+//                                    classPath = classPath.substring(classPath.indexOf(":") + 1);
+//                                }
+//                                String path = classPath + "static/static/snap/";
+//                                // 兼容Windows系统路径(去除前面的“/”)
+//                                if(System.getProperty("os.name").contains("indows")) {
+//                                    path = path.substring(1);
+//                                }
+//                                String streamUrl = streamInfoForSuccess.getRtsp();
+//                                // 请求截图
+//                                log.info("[请求截图]: " + fileName);
+//                                zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
+//                            } catch (FileNotFoundException e) {
+//                                log.error("存放截图文件路径不存在,",e);
+//                            }
+//                        })
+//                        .subscribe();
+//                }
+//            }
+//        }));
         if (streamInfo == null) {
             SSRCInfo ssrcInfo;
             String streamId = null;

+ 66 - 61
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -23,6 +23,7 @@ import org.jetlinks.community.media.service.LocalPlayService;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
+import org.jetlinks.community.media.zlm.dto.StreamProxyItem;
 import org.jetlinks.community.media.zlm.dto.StreamPushItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
@@ -52,7 +53,6 @@ public class ZLMHttpHookListener {
     private final LocalMediaServerItemService mediaServerItemService;
     private final RedisCacheStorageImpl redisCatchStorage;
     private final ZLMHttpHookSubscribe subscribe;
-    private final EventBus eventBus;
     private final LocalMediaDeviceChannelService channelService;
     private final ZLMMediaListManager zlmMediaListManager;
     private final LocalGbStreamService gbStreamService;
@@ -62,6 +62,8 @@ public class ZLMHttpHookListener {
     private final UserSetup userSetup;
     private final SipCommander sipCommander;
     private final LocalPlayService localPlayService;
+    private final LocalMediaDeviceChannelService mediaDeviceChannelService;
+
     /**
      * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
      *
@@ -419,66 +421,69 @@ public class ZLMHttpHookListener {
             }))
             .then(Mono.just(ResponseEntity.ok(ret.toString())));
     }
-//
-//	/**
-//	 * 流无人观看时事件,用户可以通过此事件选择是否关闭无人看的流。
-//	 *
-//	 */
-//	@ResponseBody
-//	@PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8")
-//	public ResponseEntity<String> onStreamNoneReader(@RequestBody JSONObject json){
-//
-//		if (logger.isDebugEnabled()) {
-//			logger.debug("[ ZLM HOOK ]on_stream_none_reader API调用,参数:" + json.toString());
-//		}
-//		String mediaServerId = json.getString("mediaServerId");
-//		String streamId = json.getString("stream");
-//		String app = json.getString("app");
-//		JSONObject ret = new JSONObject();
-//		ret.put("code", 0);
-//		if ("rtp".equals(app)){
-//			ret.put("close", true);
-//			StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
-//			if (streamInfoForPlayCatch != null) {
-//				// 如果在给上级推流,也不停止。
-//				if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
-//					ret.put("close", false);
-//				} else {
-//					cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
-//					redisCatchStorage.stopPlay(streamInfoForPlayCatch);
-//					storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
-//				}
-//			}else{
-//				StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlaybackByStreamId(streamId);
-//				if (streamInfoForPlayBackCatch != null) {
-//					cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), streamInfoForPlayBackCatch.getChannelId());
-//					redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch);
-//				}else {
-//					StreamInfo streamInfoForDownload = redisCatchStorage.queryDownloadByStreamId(streamId);
-//					// 进行录像下载时无人观看不断流
-//					if (streamInfoForDownload != null) {
-//						ret.put("close", false);
-//					}
-//				}
-//			}
-//			MediaServerItem mediaServerItem = mediaServerService.getOneByServerId(mediaServerId);
-//			if (mediaServerItem != null && "-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())) {
-//				ret.put("close", false);
-//			}
-//			return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//		}else {
-//			StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
-//			if (streamProxyItem != null && streamProxyItem.isEnable_remove_none_reader()) {
-//				ret.put("close", true);
-//				streamProxyService.del(app, streamId);
-//				String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url();
-//				logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除",  app, streamId, url);
-//			}else {
-//				ret.put("close", false);
-//			}
-//			return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//		}
-//	}
+
+    /**
+     * 流无人观看时事件,用户可以通过此事件选择是否关闭无人看的流。
+     *
+     */
+    @ResponseBody
+    @PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8")
+    public Mono<ResponseEntity<String>> onStreamNoneReader(@RequestBody JSONObject json){
+
+        if (log.isDebugEnabled()) {
+            log.debug("[ ZLM HOOK ]on_stream_none_reader API调用,参数:" + json.toString());
+        }
+        String mediaServerId = json.getStr("mediaServerId");
+        String streamId = json.getStr("stream");
+        String app = json.getStr("app");
+        JSONObject ret = new JSONObject()
+            .putOpt("code", 0);
+        if ("rtp".equals(app)){
+            ret.putOpt("close", true);
+            StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
+            Mono<Void> result=null;
+            if (streamInfoForPlayCatch != null) {
+                // 如果在给上级推流,也不停止。
+
+                if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
+                    ret.putOpt("close", false);
+                } else {
+                    result=sipCommander.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId())
+                        .doOnNext(ignore->  redisCatchStorage.stopPlay(streamInfoForPlayCatch))
+                        .flatMap(ignore->  mediaDeviceChannelService.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()));
+                }
+            }else{
+                StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlaybackByStreamId(streamId);
+                if (streamInfoForPlayBackCatch != null) {
+                    result=sipCommander.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), streamInfoForPlayBackCatch.getChannelId())
+                        .doOnNext(ignore->  redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch));
+                }else {
+                    StreamInfo streamInfoForDownload = redisCatchStorage.queryDownloadByStreamId(streamId);
+                    // 进行录像下载时无人观看不断流
+                    if (streamInfoForDownload != null) {
+                        ret.putOpt("close", false);
+                    }
+                }
+            }
+            MediaServerItem mediaServerItem =   redisCatchStorage.getMediaServerItem(mediaServerId);
+            if (mediaServerItem != null && "-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())) {
+                ret.putOpt("close", false);
+            }
+            return result==null?Mono.just(ResponseEntity.ok(ret.toString())):result.thenReturn(ResponseEntity.ok(ret.toString()));
+        }else {
+            //todo
+//            StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
+//            if (streamProxyItem != null && streamProxyItem.isEnable_remove_none_reader()) {
+                ret.putOpt("close", true);
+//                streamProxyService.del(app, streamId);
+//                String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url();
+//                log.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除",  app, streamId, url);
+//            }else {
+//                ret.putOpt("close", false);
+//            }
+            return Mono.just(ResponseEntity.ok(ret.toString()));
+        }
+    }
 
     /**
      * 流未找到事件,用户可以在此事件触发时,立即去拉流,这样可以实现按需拉流;此事件对回复不敏感。

+ 14 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRESTfulUtils.java

@@ -49,8 +49,9 @@ public class ZLMRESTfulUtils {
                 .url(url)
                 .build();
             if (callback == null) {
+                Response response =null;
                 try {
-                    Response response = client.newCall(request).execute();
+                    response = client.newCall(request).execute();
                     if (response.isSuccessful()) {
                         ResponseBody responseBody = response.body();
                         if (responseBody != null) {
@@ -58,7 +59,6 @@ public class ZLMRESTfulUtils {
                             responseJSON = JSONUtil.parseObj(responseStr);
                         }
                     }else {
-                        response.close();
                         Objects.requireNonNull(response.body()).close();
                     }
                 } catch (ConnectException e) {
@@ -66,6 +66,10 @@ public class ZLMRESTfulUtils {
                     log.info("请检查media配置并确认ZLM已启动...");
                 }catch (IOException e) {
                     log.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
+                }finally {
+                    if(response!=null){
+                        response.close();
+                    }
                 }
             }else {
                 client.newCall(request).enqueue(new Callback(){
@@ -78,6 +82,8 @@ public class ZLMRESTfulUtils {
                                 callback.run(JSONUtil.parseObj(responseStr));
                             } catch (IOException e) {
                                 log.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
+                            }finally {
+                                response.close();
                             }
 
                         }else {
@@ -119,11 +125,12 @@ public class ZLMRESTfulUtils {
                 .url(httpBuilder.build())
                 .build();
         log.info(request.toString());
+        Response response=null;
         try {
             OkHttpClient client = new OkHttpClient.Builder()
                     .readTimeout(30, TimeUnit.SECONDS)
                     .build();
-            Response response = client.newCall(request).execute();
+            response = client.newCall(request).execute();
             if (response.isSuccessful()) {
                 log.info("response body contentType: " + Objects.requireNonNull(response.body()).contentType());
                 if (targetPath != null) {
@@ -151,6 +158,10 @@ public class ZLMRESTfulUtils {
             log.info("请检查media配置并确认ZLM已启动...");
         } catch (IOException e) {
             log.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
+        }finally {
+            if(response!=null){
+                response.close();
+            }
         }
     }