18339543638 3 rokov pred
rodič
commit
9efea35700
34 zmenil súbory, kde vykonal 964 pridanie a 531 odobranie
  1. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceFirmwareTaskController.java
  2. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceFirmwareUpgradeHistoryController.java
  3. 0 22
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/GbStream.java
  4. 3 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/StreamInfo.java
  5. 25 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/ThirdPartyGB.java
  6. 137 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/config/UserSetup.java
  7. 34 12
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java
  8. 46 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/GbStream.java
  9. 5 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/gb28181/result/WVPResult.java
  10. 24 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalGbStreamService.java
  11. 19 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java
  12. 37 23
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java
  13. 113 118
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java
  14. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/InviteRequestProcessor.java
  15. 9 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java
  16. 4 3
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java
  17. 13 7
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java
  18. 1 3
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/DeviceInfoResponseMessageProcessor.java
  19. 2 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/NoSupportMessageHandler.java
  20. 16 13
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java
  21. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/DeferredResultHolder.java
  22. 2 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/RequestMessage.java
  23. 123 103
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java
  24. 136 81
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java
  25. 136 112
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMMediaListManager.java
  26. 12 12
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java
  27. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/dto/StreamProxyItem.java
  28. 20 6
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/dto/StreamPushItem.java
  29. 11 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/entity/MediaServerItem.java
  30. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/event/ZLMKeepliveTimeoutListener.java
  31. 25 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/ZlmSubscribeTopic.java
  32. 1 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleSceneController.java
  33. 3 4
      jetlinks-standalone/src/main/resources/application.yml
  34. 1 0
      jetlinks-standalone/src/main/resources/logback-spring.xml

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceFirmwareTaskController.java

@@ -24,7 +24,7 @@ import java.util.*;
 @RestController
 @RequestMapping({"/firmware/upgrade/task","/firmware-upgrade-task"})
 @Authorize
-@Resource(id = "/firmware-upgrade-task", name = "设备固件升级任务")
+@Resource(id = "firmware-upgrade-task", name = "设备固件升级任务")
 @Slf4j
 @AllArgsConstructor
 @Tag(name = "设备固件升级任务接口")

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceFirmwareUpgradeHistoryController.java

@@ -28,7 +28,7 @@ import java.util.ArrayList;
 @RestController
 @RequestMapping({"/firmware/upgrade/history","/firmware-upgrade-history"})
 @Authorize
-@Resource(id = "/firmware-upgrade-history", name = "设备固件升级记录")
+@Resource(id = "firmware-upgrade-history", name = "设备固件升级记录")
 @Slf4j
 @AllArgsConstructor
 @Tag(name = "设备固件升级记录接口")

+ 0 - 22
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/GbStream.java

@@ -1,22 +0,0 @@
-package org.jetlinks.community.media.bean;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * 直播流关联国标上级平台
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class GbStream extends PlatformGbStream {
-
-    private String app;
-    private String stream;
-    private String gbId;
-    private String name;
-    private String mediaServerId;
-    private double longitude;
-    private double latitude;
-    private String streamType;
-    private boolean status;
-}

+ 3 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/StreamInfo.java

@@ -1,8 +1,10 @@
 package org.jetlinks.community.media.bean;
 
 
-public class StreamInfo {
+import java.io.Serializable;
 
+public class StreamInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
     private String app;
     private String streamId;
     private String deviceID;

+ 25 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/ThirdPartyGB.java

@@ -0,0 +1,25 @@
+package org.jetlinks.community.media.bean;
+
+import java.io.Serializable;
+
+public class ThirdPartyGB implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private String name;
+    private String nationalStandardNo;
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getNationalStandardNo() {
+        return nationalStandardNo;
+    }
+
+    public void setNationalStandardNo(String nationalStandardNo) {
+        this.nationalStandardNo = nationalStandardNo;
+    }
+}

+ 137 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/config/UserSetup.java

@@ -0,0 +1,137 @@
+package org.jetlinks.community.media.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+@Component
+@ConfigurationProperties(prefix = "user-settings", ignoreInvalidFields = true)
+public class UserSetup {
+
+    private Boolean savePositionHistory = Boolean.FALSE;
+
+    private Boolean autoApplyPlay = Boolean.FALSE;
+
+    private Boolean seniorSdp = Boolean.FALSE;
+
+    private Long playTimeout = 18000L;
+
+    private Boolean waitTrack = Boolean.FALSE;
+
+    private Boolean interfaceAuthentication = Boolean.TRUE;
+
+    private Boolean recordPushLive = Boolean.FALSE;
+
+    private Boolean logInDatebase = Boolean.TRUE;
+
+    private Boolean redisConfig = Boolean.TRUE;
+
+    private String serverId = "000000";
+
+    private String thirdPartyGBIdReg = "[\\s\\S]*";
+
+    private List<String> interfaceAuthenticationExcludes = new ArrayList<>();
+
+    public Boolean getSavePositionHistory() {
+        return savePositionHistory;
+    }
+
+    public Boolean isSavePositionHistory() {
+        return savePositionHistory;
+    }
+
+    public Boolean isAutoApplyPlay() {
+        return autoApplyPlay;
+    }
+
+    public Boolean isSeniorSdp() {
+        return seniorSdp;
+    }
+
+    public Long getPlayTimeout() {
+        return playTimeout;
+    }
+
+    public Boolean isWaitTrack() {
+        return waitTrack;
+    }
+
+    public Boolean isInterfaceAuthentication() {
+        return interfaceAuthentication;
+    }
+
+    public Boolean isRecordPushLive() {
+        return recordPushLive;
+    }
+
+    public List<String> getInterfaceAuthenticationExcludes() {
+        return interfaceAuthenticationExcludes;
+    }
+
+    public void setSavePositionHistory(Boolean savePositionHistory) {
+        this.savePositionHistory = savePositionHistory;
+    }
+
+    public void setAutoApplyPlay(Boolean autoApplyPlay) {
+        this.autoApplyPlay = autoApplyPlay;
+    }
+
+    public void setSeniorSdp(Boolean seniorSdp) {
+        this.seniorSdp = seniorSdp;
+    }
+
+    public void setPlayTimeout(Long playTimeout) {
+        this.playTimeout = playTimeout;
+    }
+
+    public void setWaitTrack(Boolean waitTrack) {
+        this.waitTrack = waitTrack;
+    }
+
+    public void setInterfaceAuthentication(boolean interfaceAuthentication) {
+        this.interfaceAuthentication = interfaceAuthentication;
+    }
+
+    public void setRecordPushLive(Boolean recordPushLive) {
+        this.recordPushLive = recordPushLive;
+    }
+
+    public void setInterfaceAuthenticationExcludes(List<String> interfaceAuthenticationExcludes) {
+        this.interfaceAuthenticationExcludes = interfaceAuthenticationExcludes;
+    }
+
+    public Boolean getLogInDatebase() {
+        return logInDatebase;
+    }
+
+    public void setLogInDatebase(Boolean logInDatebase) {
+        this.logInDatebase = logInDatebase;
+    }
+
+    public String getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(String serverId) {
+        this.serverId = serverId;
+    }
+
+    public String getThirdPartyGBIdReg() {
+        return thirdPartyGBIdReg;
+    }
+
+    public void setThirdPartyGBIdReg(String thirdPartyGBIdReg) {
+        this.thirdPartyGBIdReg = thirdPartyGBIdReg;
+    }
+
+    public Boolean getRedisConfig() {
+        return redisConfig;
+    }
+
+    public void setRedisConfig(Boolean redisConfig) {
+        this.redisConfig = redisConfig;
+    }
+}

+ 34 - 12
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java

@@ -5,15 +5,34 @@ import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.lucene.search.BooleanClause;
 import org.hswebframework.web.authorization.annotation.Authorize;
 import org.hswebframework.web.authorization.annotation.QueryAction;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalPlayService;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.event.TopicPayload;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
 
 @RestController
 @RequestMapping("/device/directives")
@@ -60,11 +79,8 @@ public class PlayController {
     @QueryAction
     @Operation(summary = "设备点播")
     @GetMapping("/start/{deviceId}/{channelId}")
-//    public Mono<ResponseEntity<String>> play(@PathVariable String deviceId,
-//                                             @PathVariable String channelId) {
-        public Mono<?> play(@PathVariable String deviceId,
-            @PathVariable String channelId) {
-
+    public Mono<Object> play(@PathVariable String deviceId,
+                                @PathVariable String channelId) {
 
         return
             //获取设备信息
@@ -72,12 +88,18 @@ public class PlayController {
                 //获取设备相连的媒体流服务器信息
                 .flatMap(playService::getNewMediaServerItem)
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
-                .flatMap(mediaServerItem ->
-                    playService.play(mediaServerItem, deviceId, channelId, null, null)
-                );
-
-//        return playResult.getResult();
+                .flatMap(mediaServerItem ->{
+                        try {
+                            return playService.play(mediaServerItem, deviceId, channelId, null, null);
+                        } catch (InterruptedException e) {
+                            log.error("设备点播线程中断,请稍后再试,",e);
+                            return Mono.error(new BusinessException("服务器繁忙,请稍后再试"));
+                        }
+                    }
+                )
+                .flatMap(wvpResult -> Mono.justOrEmpty(wvpResult.getData()));
     }
+
 //
 //	@ApiOperation("停止点播")
 //	@ApiImplicitParams({
@@ -167,7 +189,7 @@ public class PlayController {
 //			logger.warn("视频转码API调用失败!, 视频流已经停止!");
 //			return new ResponseEntity<String>("未找到视频流信息, 视频流可能已经停止", HttpStatus.OK);
 //		}
-//		MediaServerItem mediaInfo = mediaServerService.getOne(streamInfo.getMediaServerId());
+//		MediaServerItem mediaInfo = mediaServerService.getOneByServerId(streamInfo.getMediaServerId());
 //		JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
 //		if (!rtpInfo.getBoolean("exist")) {
 //			logger.warn("视频转码API调用失败!, 视频流已停止推流!");
@@ -212,7 +234,7 @@ public class PlayController {
 //			result.put("msg", "mediaServerId is null");
 //			return new ResponseEntity<String>( result.toJSONString(), HttpStatus.BAD_REQUEST);
 //		}
-//		MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
+//		MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
 //		if (mediaInfo == null) {
 //			result.put("code", 0);
 //			result.put("msg", "使用的流媒体已经停止运行");

+ 46 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/GbStream.java

@@ -0,0 +1,46 @@
+package org.jetlinks.community.media.entity;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.jetlinks.community.media.bean.PlatformGbStream;
+
+import javax.persistence.Column;
+import javax.persistence.Table;
+
+/**
+ * 直播流关联国标上级平台
+ */
+@Data
+@Table(name = "gb_stream", schema = "直播流关联国标上级平台")
+public class GbStream  extends GenericEntity<String> {
+    @Column(name = "app")
+    private String app;
+    @Column(name = "stream")
+    private String stream;
+    @Column(name = "platform_id")
+
+    private String platformId;
+    @Column(name = "catalog_id")
+    @Schema(
+        description = "产品id"
+    )
+    private String catalogId;
+    @Column(name = "gb_id")
+    private String gbId;
+    @Column(name = "name")
+    private String name;
+    @Column(name = "media_server_id")
+    private String mediaServerId;
+    @Column(name = "longitude")
+    private double longitude;
+    @Column(name = "latitude")
+    private double latitude;
+    @Column(name = "stream_type")
+    private String streamType;
+    @Column(name = "status")
+    private boolean status;
+    @Column(name = "create_stamp")
+    private Long createStamp;
+}

+ 5 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/gb28181/result/WVPResult.java

@@ -2,10 +2,14 @@ package org.jetlinks.community.media.gb28181.result;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
 
 @Data
 @AllArgsConstructor(staticName = "of")
-public class WVPResult<T> {
+@NoArgsConstructor
+public class WVPResult<T> implements Serializable {
 
     private int code;
     private String msg;

+ 24 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalGbStreamService.java

@@ -0,0 +1,24 @@
+package org.jetlinks.community.media.service;
+
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.jetlinks.community.media.entity.GbStream;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+/**
+ * 媒体服务器节点管理
+ */
+@Service
+@Slf4j
+public class LocalGbStreamService extends GenericReactiveCrudService<GbStream, String>  {
+
+    public Mono<GbStream> getByAppAndStreamId(String app,String streamId){
+        return this.createQuery()
+            .where(GbStream::getApp,app)
+            .where(GbStream::getStream,streamId)
+            .fetchOne();
+    }
+}

+ 19 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java

@@ -2,7 +2,9 @@ package org.jetlinks.community.media.service;
 
 import lombok.AllArgsConstructor;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import reactor.core.publisher.Mono;
@@ -20,6 +22,7 @@ import java.util.Set;
 @Service
 @AllArgsConstructor
 public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<MediaDeviceChannel, String> {
+    private final RedisCacheStorageImpl redisCacheStorage;
     public Mono<Void> startPlay(String deviceId, String channelId, String streamId) {
         return this.createUpdate()
             .set(MediaDeviceChannel::getStreamId,streamId)
@@ -49,4 +52,20 @@ public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<M
 
         return Mono.empty();
     }
+
+    public Mono<Void>  stopPlayback(StreamInfo streamInfo) {
+        if(streamInfo==null){
+            return Mono.empty();
+        }
+        return this.createQuery()
+            .where(MediaDeviceChannel::getDeviceId,streamInfo.getDeviceID())
+            .where(MediaDeviceChannel::getChannelId,streamInfo.getChannelId())
+            .fetchOne()
+            .flatMap(channel->{
+                channel.setStreamId(null);
+                channel.setDeviceId(streamInfo.getDeviceID());
+                return this.updateById(channel.getId(),channel).then();
+            })
+            .then(Mono.fromRunnable(()->redisCacheStorage.stopPlayback(streamInfo)));
+    }
 }

+ 37 - 23
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java

@@ -26,10 +26,7 @@ import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 /**
  * 媒体服务器节点管理
@@ -109,14 +106,22 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         this.createQuery()
             .fetch()
             .filter(item->StrUtil.isNotEmpty(item.getId()))
-            .doOnNext(mediaServerItem->{
+            .flatMap(mediaServerItem->{
                 if (mediaServerItem.getSsrcConfig() == null) {
                     //对ssrc进行更新 todo
                     SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null,"340200000");
                     mediaServerItem.setSsrcConfig(ssrcConfig);
+//                    redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX +serverId + "_" + mediaServerItem.getServerId(), mediaServerItem);
+                    return this.updateById(mediaServerItem.getId(),mediaServerItem).thenReturn(mediaServerItem);
                 }
+                return Mono.just(mediaServerItem);
+            })
+            .doOnNext(mediaServerItem->{
+//                String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getId();
+//                if(!redisUtil.hasKey(key)){
+                redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getServerId(),mediaServerItem);
+//                }
             })
-            .doOnNext(mediaServerItem->redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getId(),mediaServerItem))
             .subscribe();
     }
 
@@ -160,7 +165,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 
     public void closeRTPServer(MediaDevice device, String channelId) {
         String mediaServerId = streamSession.getMediaServerId(device.getId(), channelId);
-        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
+        MediaServerItem mediaServerItem = this.getOneByServerId(mediaServerId);
         if (mediaServerItem != null) {
             //更新sstc信息
             String streamId = String.format("%s_%s", device.getId(), channelId);
@@ -188,14 +193,21 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
     private void clearRTPServer(MediaServerItem mediaServerItem) {
 //        mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()));
         mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null,"340200000"));
-        redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId, mediaServerItem.getId(), 0);
+        redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId, mediaServerItem.getServerId(), 0);
+    }
+
+
+    private Mono<MediaServerItem> getByServerId(String serverId){
+        return this.createQuery()
+            .where(MediaServerItem::getServerId,serverId)
+            .fetchOne();
     }
 //
 //
 //    @Override
 //    public void update(MediaServerItem mediaSerItem) {
 //        mediaServerMapper.update(mediaSerItem);
-//        MediaServerItem mediaServerItemInRedis = getOne(mediaSerItem.getId());
+//        MediaServerItem mediaServerItemInRedis = getOneByServerId(mediaSerItem.getId());
 //        MediaServerItem mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
 //        if (mediaServerItemInRedis != null && mediaServerItemInRedis.getSsrcConfig() != null) {
 //            mediaServerItemInDataBase.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
@@ -265,7 +277,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
      * @param mediaServerId 服务id
      * @return MediaServerItem
      */
-    public MediaServerItem getOne(String mediaServerId) {
+    public MediaServerItem getOneByServerId(String mediaServerId) {
         if (mediaServerId == null) {
             return null;
         }
@@ -279,10 +291,10 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 //    }
 //
 
-    //todo
     public Mono<MediaServerItem> getDefaultMediaServer() {
-//        return mediaServerMapper.queryDefault();
-        return Mono.empty();
+        return this.createQuery()
+            .where(MediaServerItem::isDefaultServer,true)
+            .fetchOne();
     }
 
     public Mono<Void> clearMediaServerForOnline() {
@@ -332,7 +344,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         log.info("[ ZLM:{} ]-[ {}:{} ]已连接",
             zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
         //根据id主键查询
-        return this.findById(zlmServerConfig.getGeneralMediaServerId())
+        return this.findById(Optional.ofNullable(zlmServerConfig.getGeneralMediaServerId()).orElse("-1"))
             .switchIfEmpty(this.createQuery()
                 .where(MediaServerItem::getIp,zlmServerConfig.getIp())
                 .where(MediaServerItem::getHttpPort,zlmServerConfig.getHttpPort())
@@ -367,8 +379,8 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
             })
             //更新zlm服务器信息
             .doOnNext(serverItem->{
-                if (StrUtil.isEmpty(serverItem.getId())) {
-                    serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
+                if (StrUtil.isEmpty(serverItem.getServerId())) {
+                    serverItem.setServerId(zlmServerConfig.getGeneralMediaServerId());
                     this.createUpdate()
                         .set(serverItem)
                         .where(MediaServerItem::getIp,zlmServerConfig.getIp())
@@ -389,7 +401,7 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
                 if (redisUtil.get(key) == null) {
                     //todo
 //                    SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipconfig.getDomain());
-                    SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null," sipconfig.getDomain()");
+                    SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null,"340200000");
                     serverItem.setSsrcConfig(ssrcConfig);
                 }else {
                     MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key);
@@ -423,15 +435,15 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         // 更新缓存
         String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId;
         // 使用zset的分数作为当前并发量, 默认值设置为0
-        if (redisUtil.zScore(key, serverItem.getId()) == null) {  // 不存在则设置默认值 已存在则重置
-            redisUtil.zAdd(key, serverItem.getId(), 0L);
+        if (redisUtil.zScore(key, serverItem.getServerId()) == null) {  // 不存在则设置默认值 已存在则重置
+            redisUtil.zAdd(key, serverItem.getServerId(), 0L);
             // 查询服务流数量
             zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
                 Integer code = mediaList.getInt("code");
                 if (code == 0) {
                     JSONArray data = mediaList.getJSONArray("data");
                     if (data != null) {
-                        redisUtil.zAdd(key, serverItem.getId(), data.size());
+                        redisUtil.zAdd(key, serverItem.getServerId(), data.size());
                     }
                 }
             }));
@@ -477,10 +489,10 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
 
         // 获取分数最低的,及并发最低的
         Set<Object> objects = redisUtil.ZRange(key, 0, -1);
-        ArrayList<Object> mediaServerObjectS = new ArrayList<>(objects);
+        ArrayList<Object> mediaServerObject = new ArrayList<>(objects);
 
-        String mediaServerId = (String)mediaServerObjectS.get(0);
-        return Mono.just(getOne(mediaServerId));
+        String mediaServerId = (String)mediaServerObject.get(0);
+        return Mono.justOrEmpty(getOneByServerId(mediaServerId));
     }
 
     /**
@@ -501,6 +513,8 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         Map<String, Object> param = new HashMap<>();
         param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
         param.put("ffmpeg.cmd","%s -fflags nobuffer -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264  -f flv %s");
+        param.put("ffmpeg.bin","/usr/local/ffmpeg/bin/ffmpeg");
+//        param.put("ffmpeg.snap","%25s -rtsp_transport tcp -i %25s -y -f mjpeg -t 0.001 %25s");
         param.put("hook.enable","1");
         param.put("hook.on_flow_report","");
         param.put("hook.on_play",String.format("%s/on_play", hookPrex));

+ 113 - 118
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -4,6 +4,7 @@ import cn.hutool.core.lang.UUID;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
+import gov.nist.javax.sip.stack.SIPDialog;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
@@ -27,12 +28,18 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import org.springframework.util.ResourceUtils;
-import reactor.core.publisher.Mono;
+import org.springframework.web.context.request.async.DeferredResult;
+import reactor.core.Disposable;
+import reactor.core.publisher.*;
+
 import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 @SuppressWarnings(value = {"rawtypes", "unchecked"})
 @Service
@@ -40,9 +47,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @AllArgsConstructor
 public class LocalPlayService  {
 
-    ////    @Autowired
-////    private IVideoManagerStorager storager;
-//
+    //    @Autowired
+//    private IVideoManagerStorager storager;
     private final SipCommander cmder;
 
     private final RedisCacheStorageImpl redisCatchStorage;
@@ -56,130 +62,121 @@ public class LocalPlayService  {
     private final VideoStreamSessionManager streamSession;
 
     private final LocalMediaDeviceChannelService deviceChannelService;
-    //    @Autowired
-////    private RedisUtil redis;
-////
-    private DeferredResultHolder resultHolder;
 
     private final ZLMRESTfulUtils zlmresTfulUtils;
 
-//
-//    @Autowired
-//    private UserSetup userSetup;
-
-    public Mono<ResponseEntity> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
-                                     ZLMHttpHookSubscribe.Event hookEvent, ZLMHttpHookSubscribe.Event errorEvent) {
+    public Mono<WVPResult> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
+                                ZLMHttpHookSubscribe.Event hookEvent, ZLMHttpHookSubscribe.Event errorEvent) throws InterruptedException {
         //todo
         mediaServerItem.setRtpEnable(false);
         mediaServerItem.setRtpProxyPort(10000);
         PlayResult playResult = new PlayResult();
-
+        LinkedBlockingDeque<WVPResult> result = new LinkedBlockingDeque<>();
         String msgId=UUID.randomUUID().toString();
 
         RequestMessage msg = RequestMessage.of(deviceId,channelId,msgId,null);
 
-        final AtomicBoolean start=new AtomicBoolean(false);
-
-
-
 
-            return eventBus.subscribe(
-                Subscription.of("media_play",DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg), Subscription.Feature.local))
-                .mergeWith(   Mono.justOrEmpty(redisCatchStorage.queryPlayByDevice(deviceId,channelId))
-                    .flatMap(streamInfo -> {
-                        if(StrUtil.isEmpty(streamInfo.getStreamId())){
-                            return Mono.error(new BusinessException("点播失败, redis缓存streamId等于null"));
-                        }
-                        String mediaServerId = streamInfo.getMediaServerId();
-                        MediaServerItem mediaInfo = mediaServerItemService.getOne(mediaServerId);
+        Disposable subscribe = eventBus.subscribe(
+            Subscription.of("media_play", DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg), Subscription.Feature.local))
+            .mergeWith(Mono.justOrEmpty(redisCatchStorage.queryPlayByDevice(deviceId, channelId))
+                .flatMap(streamInfo -> {
+                    if (StrUtil.isEmpty(streamInfo.getStreamId())) {
+                        return Mono.error(new BusinessException("点播失败, redis缓存streamId等于null"));
+                    }
+                    String mediaServerId = streamInfo.getMediaServerId();
+                    MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(mediaServerId);
 
-                        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamInfo.getStreamId());
-                        if (rtpInfo != null && rtpInfo.getBool("exist")) {
-                            if (hookEvent != null) {
-                                //todo
+                    JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamInfo.getStreamId());
+                    if (rtpInfo != null && rtpInfo.getBool("exist")) {
+                        if (hookEvent != null) {
+                            //todo
 //                        hookEvent.response(mediaServerItem, com.alibaba.fastjson.JSONObject.parseObject(JSON.toJSONString(streamInfo)));
-                            }
-                            clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                ResponseEntity.ok(streamInfo));
-                        } else {
-                            // TODO 点播前是否重置状态
-                            redisCatchStorage.stopPlay(streamInfo);
-                            return deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId())
-                                .flatMap(ignore->{
-                                    SSRCInfo ssrcInfo;
-                                    String streamId2 = null;
-                                    if (mediaServerItem.isRtpEnable()) {
-                                        streamId2 = String.format("%s_%s",deviceId, channelId);
-                                    }
-                                    ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
-
-                                    return cmder.playStreamCmd(mediaServerItem, ssrcInfo, redisCatchStorage.getDevice(deviceId), channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
-                                        log.info("收到订阅消息: " + response.toString());
-                                        onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, msgId)
-                                            .subscribe();
-                                    }, (event) -> {
-                                        mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                                        WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
-                                        clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(wvpResult));
-                                    })
-                                        .then();
+                        }
+                        clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                            streamInfo);
+                    } else {
+                        // TODO 点播前是否重置状态
+                        redisCatchStorage.stopPlay(streamInfo);
+                        return deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId())
+                            .flatMap(ignore -> {
+                                SSRCInfo ssrcInfo;
+                                String streamId2 = null;
+                                if (mediaServerItem.isRtpEnable()) {
+                                    streamId2 = String.format("%s_%s", deviceId, channelId);
+                                }
+                                ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
+
+                                return cmder.playStreamCmd(mediaServerItem, ssrcInfo, redisCatchStorage.getDevice(deviceId), channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
+                                    log.info("收到订阅消息: " + response.toString());
+                                    onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, msgId)
+                                        .subscribe();
+                                }, (event) -> {
+                                    mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
+                                    WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
+                                    clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                                        wvpResult);
                                 });
+                            });
+                    }
+                    return Mono.empty();
+                })
+                .switchIfEmpty(Mono.just(mediaServerItem)
+                    .flatMap(serverItem -> {
+                        SSRCInfo ssrcInfo;
+                        String streamId = null;
+                        if (serverItem.isRtpEnable()) {
+                            streamId = String.format("%s_%s", deviceId, channelId);
                         }
-                        return Mono.empty();
-                    })
-                    .switchIfEmpty(Mono.just(mediaServerItem)
-                        .flatMap(serverItem->{
-                            SSRCInfo ssrcInfo;
-                            String streamId = null;
-                            if (serverItem.isRtpEnable()) {
-                                streamId = String.format("%s_%s", deviceId, channelId);
-                            }
 
-                            ssrcInfo = mediaServerItemService.openRTPServer(serverItem, streamId);
+                        ssrcInfo = mediaServerItemService.openRTPServer(serverItem, streamId);
 
-                            // 发送点播消息
-                            MediaDevice device = redisCatchStorage.getDevice(deviceId);
-                            return cmder.playStreamCmd(serverItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
-                                log.info("收到订阅消息: " + response.toString());
-                                onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, msgId).subscribe();
-                                if (hookEvent != null) {
+                        // 发送点播消息
+                        MediaDevice device = redisCatchStorage.getDevice(deviceId);
+                        return cmder.playStreamCmd(serverItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
+                            log.info("收到订阅消息: " + response.toString());
+                            onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, msgId).subscribe();
+                            if (hookEvent != null) {
 //                                    hookEvent.response(mediaServerItem, response);
-                                }
-                            }, (event) -> {
-                                WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
-                                // 点播返回sip错误
-                                mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                                clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                    ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(wvpResult));
-                                if (errorEvent != null) {
-                                    //todo
+                            }
+                        }, (event) -> {
+                            WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
+                            // 点播返回sip错误
+                            mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
+                            clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                                wvpResult);
+                            if (errorEvent != null) {
+                                //todo
 //                    errorEvent.response(event);
-                                }
-                            })
-                                .then();
-                        })).then(Mono.empty()))
+                            }
+                        });
+                    })).then(Mono.empty()))
+            .map(topicPayload -> topicPayload.bodyToJson(false).toJavaObject(WVPResult.class))
+            //返回结果
+            .doOnNext(wvpResult -> {
+                try {
+                    result.put(wvpResult);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            })
+            .subscribe();
 
-            //超时报错 todo
-            .timeout(Duration.ofSeconds(10), Mono.error(new BusinessException("点播/收流超时,请稍候重试")))
-            //报错时,发送bye指令
-            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
+        return  Mono.justOrEmpty(result.pollFirst(60,TimeUnit.SECONDS))
+            .flatMap(wvpResult->{
+                //取消订阅
+                subscribe.dispose();
 
-            .cast(ResponseEntity.class)
-            //判断回复是否有效
-            .flatMap(response->{
-                if(response==null){
+                if(wvpResult==null){
                     return Mono.error(new BusinessException("服务器内部出现问题"));
                 }
-                if(response.getStatusCode()!=HttpStatus.OK){
-                    WVPResult wvpResult = (WVPResult)response.getBody();
-                    return Mono.error(new BusinessException(Optional.ofNullable(wvpResult.getMsg()).orElse("服务器内部出现问题")));
+                if(wvpResult.getCode()!=HttpStatus.OK.value()){
+                    return Mono.error(new BusinessException(Optional.ofNullable(wvpResult.getMsg()).orElse("点播失败")));
                 }
-                return Mono.just(response);
+                return Mono.just(wvpResult);
             })
-
             // 点播结束时调用截图接口
-            .doOnNext(response -> {
+            .doOnNext(wvpResult -> {
                 try {
                     String classPath = ResourceUtils.getURL("classpath:").getPath();
                     // System.out.println(classPath);
@@ -197,10 +194,9 @@ public class LocalPlayService  {
                         path = path.substring(1);
                     }
                     String fileName =  deviceId + "_" + channelId + ".jpg";
-                    WVPResult wvpResult = (WVPResult)response.getBody();
-                    if (Objects.requireNonNull(wvpResult).getCode() == 0) {
+                    if (Objects.requireNonNull(wvpResult).getCode() == HttpStatus.OK.value()) {
                         StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
-                        MediaServerItem mediaInfo = mediaServerItemService.getOne(streamInfoForSuccess.getMediaServerId());
+                        MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(streamInfoForSuccess.getMediaServerId());
                         String streamUrl = streamInfoForSuccess.getFmp4();
                         // 请求截图
                         log.info("[请求截图]: " + fileName);
@@ -211,32 +207,31 @@ public class LocalPlayService  {
                     throw new BusinessException("设备上传视频截图文件找不到");
                 }
             })
+            .switchIfEmpty(Mono.error(new BusinessException("点播超时")))
             //保证接下来的操作流仅被触发一次
-            .filter(ignore->!start.get())
-            .doOnNext(ignore->start.set(true))
-            .doOnError(BusinessException.class, e -> cmder.streamByeCmd(deviceId, channelId).subscribe())
-            .single();
+            .doOnError(BusinessException.class, e -> cmder.streamByeCmd(deviceId, channelId).subscribe());
     }
 
 
-    public Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String msgId) {
+    private Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String msgId) {
         RequestMessage msg = RequestMessage.of(deviceId, channelId, msgId, null);
         return onPublishHandler(mediaServerItem, resonse, deviceId, channelId, msgId)
+            .switchIfEmpty(Mono.fromRunnable(()->{
+                log.warn("设备预览API调用失败!");
+                clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                    WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(),"设备预览API调用失败",null));
+            }))
             .flatMap(streamInfo ->
                 deviceChannelService.startPlay(deviceId, channelId, streamInfo.getStreamId())
-                    .concatWith(Mono.fromRunnable(()->{
+                    .mergeWith(Mono.fromRunnable(()->{
                         redisCatchStorage.startPlay(streamInfo);
                         WVPResult<StreamInfo> wvpResult = WVPResult.of(HttpStatus.OK.value(), "success", streamInfo);
-                        clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                            ResponseEntity.ok(wvpResult));
+                        clusterEventBus.publish(DeferredResultHolder.getCmdCallBackTopic(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                            wvpResult);
                     }))
                     .then()
-            )
-            .switchIfEmpty(Mono.fromRunnable(()->{
-                log.warn("设备预览API调用失败!");
-                clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                    ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(),"设备预览API调用失败",null)));
-            }));
+            );
+
     }
 
 

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/InviteRequestProcessor.java

@@ -170,7 +170,7 @@ public class InviteRequestProcessor extends SipRequestProcessorParent {
 //					responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
 //				}else if(channel == null && gbStream != null){
 //					String mediaServerId = gbStream.getMediaServerId();
-//					mediaServerItem = mediaServerService.getOne(mediaServerId);
+//					mediaServerItem = mediaServerService.getOneByServerId(mediaServerId);
 //					if (mediaServerItem == null) {
 //						log.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
 //						responseAck(evt, Response.GONE, "media server not found");

+ 9 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java

@@ -10,6 +10,7 @@ import org.jetlinks.community.media.bean.WvpSipDate;
 import org.jetlinks.community.media.enums.StreamMode;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.sip.SipRequestProcessorParent;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.utils.SipUtils;
 import org.jetlinks.core.event.EventBus;
 import org.redisson.api.RedissonClient;
@@ -48,6 +49,9 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
 //	private IVideoManagerStorager storager;
 
 
+    @Autowired
+    private RedisCacheStorageImpl redisCacheStorage;
+    @Autowired
     private EventBus eventBus;
 
     @Autowired
@@ -84,12 +88,15 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
             //todo 获取设备信息
             SipServerConfig sipConfig = SipContext.getConfig();
 
-            MediaDevice device = (MediaDevice)
-                Optional.ofNullable( redissonClient.getBucket(SipUtils.keepAliveKey(deviceId)).get()).orElse(new MediaDevice());
+
+            MediaDevice device =
+                Optional.ofNullable(redisCacheStorage.getDevice(deviceId)).orElse(new MediaDevice());
             if(StrUtil.isEmpty(device.getId())){
                 //首次注册
                 device.setFirsRegister(true);
                 device.setStreamMode( StreamMode.UDP.name());
+            }else {
+                device.setFirsRegister(false);
             }
             device.setId(deviceId);
             //基于数字摘要的认证

+ 4 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java

@@ -66,7 +66,7 @@ public class MessageRequestProcessor extends SipRequestProcessorParent {
         String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
         CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
         MediaDevice device = redisCacheStorage.getDevice(deviceId);
-        // 查询设备是否存在
+        // 查询设备是否存在 todo
         Mono.just(new ParentPlatform())
             .doOnNext(platform->{
                 try {
@@ -85,12 +85,13 @@ public class MessageRequestProcessor extends SipRequestProcessorParent {
                             if(deviceProcessor.hasDownstreams()){
                                 deviceFluxSink.next(Tuple.of(evt,device,rootElement));
                             }
-                        }else { // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
+                        }else if(platform!=null){ // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
                             if(platProcessor.hasDownstreams()){
                                 platFluxSink.next(Tuple.of(evt,platform,rootElement));
                             }
+                        }else {
+                            responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
                         }
-                        responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
                     }
 
                 } catch (SipException e) {

+ 13 - 7
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java

@@ -7,12 +7,18 @@ import org.jetlinks.community.media.bean.ParentPlatform;
 import org.jetlinks.community.media.contanst.CmdType;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.sip.request.message.MessageHandlerAbstract;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
+import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.utils.XmlUtil;
+import org.jetlinks.core.cluster.ClusterEventBus;
+import org.springframework.http.HttpStatus;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
 import javax.sip.RequestEvent;
 import javax.sip.message.Response;
 import java.util.*;
@@ -31,6 +37,7 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 
     private final LocalMediaDeviceService mediaDeviceService;
     private final LocalMediaDeviceChannelService deviceChannelService;
+    private final ClusterEventBus clusterEventBus;
     @Override
     public String getMethod() {
         return "Catalog";
@@ -43,7 +50,7 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 
     @Override
     public void handleForDevice(RequestEvent evt, MediaDevice device, Element element) {
-        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getId();
+        String topic = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getId();
         Element rootElement = null;
         try {
             rootElement = getRootElement(evt, Optional.ofNullable(device.getCharset()).orElse("UTF-8"));
@@ -72,9 +79,11 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
                     channelList.add(deviceChannel);
                 }
                 //更新设备通道
-                deviceChannelService.createDelete()
-                    .where(MediaDeviceChannel::getDeviceId,device.getId())
-                    .execute()
+                Mono.fromRunnable(()->clusterEventBus.publish(topic,RequestMessage.of(device.getId(),null,null, WVPResult.of(HttpStatus.OK.value(),"", device))))
+                    .flatMap(__->
+                        deviceChannelService.createDelete()
+                            .where(MediaDeviceChannel::getDeviceId,device.getId())
+                            .execute())
                     .flatMap(__->deviceChannelService.save(channelList))
                     .subscribe();
 //                catalogDataCatch.put(key, sumNum, device, channelList);
@@ -83,9 +92,6 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 //                    boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
 //                    RequestMessage msg = new RequestMessage();
 //                    msg.setKey(key);
-//                    WVPResult<Object> result = new WVPResult<>();
-//                    result.setCode(0);
-//                    result.setData(device);
 //                    if (resetChannelsResult) {
 //                        result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
 //                    }else {

+ 1 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/DeviceInfoResponseMessageProcessor.java

@@ -45,8 +45,6 @@ import java.util.Optional;
 @AllArgsConstructor
 public class DeviceInfoResponseMessageProcessor extends MessageHandlerAbstract {
 
-//    private final IVideoManagerStorager storager;
-
     private final LocalMediaDeviceService mediaDeviceService;
     @Override
     public String getMethod() {
@@ -66,7 +64,7 @@ public class DeviceInfoResponseMessageProcessor extends MessageHandlerAbstract {
         try {
             element = getRootElement(evt, Optional.ofNullable(device.getCharset()).orElse("utf-8"));
             Element deviceIdElement = element.element("DeviceID");
-            String channelId = deviceIdElement.getTextTrim();
+//            String channelId = deviceIdElement.getTextTrim();
 //            String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + device.getId() + channelId;
             device.setName(XmlUtil.getText(element, "DeviceName"));
 

+ 2 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/NoSupportMessageHandler.java

@@ -28,6 +28,8 @@ public class NoSupportMessageHandler extends MessageHandlerAbstract {
     public boolean matchCmd(String requestCmd) {
         return !CmdType.KEEP_ALIVE.equals(requestCmd)
             && !CmdType.NOTIFY.equals(requestCmd)
+            &&!CmdType.DEVICE_INFO.equals(requestCmd)
+            &&!CmdType.CATALOG.equals(requestCmd)
             &&!CmdType.QUERY.equals(requestCmd)
             &&!CmdType.RESPONSE.equals(requestCmd);
     }

+ 16 - 13
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -1,12 +1,15 @@
 package org.jetlinks.community.media.storage.impl;
 
 import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.media.bean.SendRtpItem;
 import org.jetlinks.community.media.bean.StreamInfo;
+import org.jetlinks.community.media.bean.ThirdPartyGB;
 import org.jetlinks.community.media.contanst.VideoManagerConstants;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.ParentPlatform;
+import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.utils.RedisUtil;
@@ -184,20 +187,20 @@ public class RedisCacheStorageImpl {
     }
 
 
-//    public boolean stopPlayback(StreamInfo streamInfo) {
+    public boolean stopPlayback(StreamInfo streamInfo) {
 //        if (streamInfo == null) return false;
-//        DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(streamInfo.getDeviceID(), streamInfo.getChannelId());
+//        MediaDeviceChannel deviceChannel = deviceChannelMapper.queryChannel(streamInfo.getDeviceID(), streamInfo.getChannelId());
 //        if (deviceChannel != null) {
 //            deviceChannel.setStreamId(null);
 //            deviceChannel.setDeviceId(streamInfo.getDeviceID());
 //            deviceChannelMapper.update(deviceChannel);
 //        }
-//        return redis.del(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
-//            serverId,
-//                streamInfo.getStreamId(),
-//                streamInfo.getDeviceID(),
-//                streamInfo.getChannelId()));
-//    }
+        return redis.del(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
+            serverId,
+                streamInfo.getStreamId(),
+                streamInfo.getDeviceID(),
+                streamInfo.getChannelId()));
+    }
 
 
     public StreamInfo queryPlaybackByDevice(String deviceId, String code) {
@@ -419,11 +422,11 @@ public class RedisCacheStorageImpl {
     }
 
 
-//    public ThirdPartyGB queryMemberNoGBId(String queryKey) {
-//        String key = VideoManagerConstants.WVP_STREAM_GB_ID_PREFIX + queryKey;
-//        JSONObject jsonObject = (JSONObject)redis.get(key);
-//        return  JSONObject.toJavaObject(jsonObject, ThirdPartyGB.class);
-//    }
+    public ThirdPartyGB queryMemberNoGBId(String queryKey) {
+        String key = VideoManagerConstants.WVP_STREAM_GB_ID_PREFIX + queryKey;
+        JSONObject jsonObject = (JSONObject)redis.get(key);
+        return  JSONUtil.toBean(jsonObject, ThirdPartyGB.class);
+    }
 
 
     public void removeStream(String mediaServerId, String type) {

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/DeferredResultHolder.java

@@ -71,7 +71,7 @@ public class DeferredResultHolder {
      * @param requestMessage
      * @return
      */
-    public static String getTopicByDeviceIdAndChannelId(String key,RequestMessage requestMessage){
+    public static String getCmdCallBackTopic(String key, RequestMessage requestMessage){
         return String.format(matchTopics,key,requestMessage.getDeviceId(),requestMessage.getChannelId());
     }
 //    private Map<String, Map<String, EmitterProcessor<ResponseEntity>>> map = new ConcurrentHashMap<>();

+ 2 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/callback/RequestMessage.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.media.transmit.callback;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.hswebframework.web.logger.ReactiveLogger;
 
 import javax.persistence.Transient;
@@ -14,6 +15,7 @@ import java.io.Serializable;
  */
 @Data
 @AllArgsConstructor
+@NoArgsConstructor
 public class RequestMessage implements Serializable {
 
 	private String deviceId;

+ 123 - 103
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -1,4 +1,10 @@
 package org.jetlinks.community.media.transmit.cmd;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import gov.nist.javax.sip.SipProviderImpl;
+import gov.nist.javax.sip.SipStackImpl;
+import gov.nist.javax.sip.message.SIPRequest;
+import gov.nist.javax.sip.stack.SIPDialog;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
@@ -6,19 +12,30 @@ import org.jetlinks.community.media.bean.EventResult;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
+import org.jetlinks.community.media.gb28181.bean.SsrcTransaction;
 import org.jetlinks.community.media.gb28181.event.SipSubscribe;
+import org.jetlinks.community.media.service.LocalMediaServerItemService;
+import org.jetlinks.community.media.session.VideoStreamSessionManager;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.transmit.SIPRequestHeaderProvider;
 import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
+import org.jetlinks.community.utils.ZlmSubscribeTopic;
+import org.jetlinks.core.cluster.ClusterEventBus;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.*;
 
 import javax.sip.*;
+import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
+import javax.sip.header.ViaHeader;
 import javax.sip.message.Request;
+import java.lang.reflect.Field;
 import java.text.ParseException;
 import java.time.Duration;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -36,33 +53,26 @@ public class SipCommander {
 
     private final SIPRequestHeaderProvider headerProvider;
 
-    public static final Map<String, FluxProcessor<EventResult,EventResult>> replyProcessor=new ConcurrentHashMap<>(1024);
+    private final ClusterEventBus clusterEventBus;
 
+    private final EventBus eventBus;
 
-    public Flux<EventResult> playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, MediaDevice device, String channelId,
-                                           ZLMHttpHookSubscribe.Event event, SipSubscribe.Event error) {
+    private final LocalMediaServerItemService mediaServerItemService;
+    private final VideoStreamSessionManager streamSessionManager;
+
+    public Mono<Void> playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, MediaDevice device, String channelId,
+                                    ZLMHttpHookSubscribe.Event event, SipSubscribe.Event sipEvent) {
         String streamId = ssrcInfo.getStreamId();
         SipServerConfig sipConfig = SipContext.getConfig();
         try {
             if (device == null){
-                return Flux.error(new BusinessException("设备不存在"));
+                return Mono.error(new BusinessException("设备不存在"));
             }
             String streamMode = device.getStreamMode().toUpperCase();
 
-//            log.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
-//            // 添加订阅
-//            JSONObject subscribeKey = new JSONObject();
-//            subscribeKey.put("app", "rtp");
-//            subscribeKey.put("stream", streamId);
-//            subscribeKey.put("regist", true);
-//            subscribeKey.put("mediaServerId", mediaServerItem.getId());
-//            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-//                (MediaServerItem mediaServerItemInUse, JSONObject json)->{
-//                    if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
-//                    event.response(mediaServerItemInUse, json);
-//                    subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
-//                });
-//            //
+            log.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
+
+
             StringBuffer content = new StringBuffer(200);
             content.append("v=0\r\n");
             content.append("o="+ sipConfig.getId()+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
@@ -150,22 +160,31 @@ public class SipCommander {
 
 
             Request request =  headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrcInfo.getSsrc(), callIdHeader);
-
-            String finalStreamId = streamId;
-            return transmitRequest(sipProvider,request)
-                .flatMap(result->{
-                    if(result.getSuccess()){
-                        //消息处理成功 TODO
-                        return Mono.just(result);
-                    }else {
-                        //消息处理失败 todo
-                        return Mono.error(new BusinessException("请求失败,"+result.getMsg()));
+            String topic= ZlmSubscribeTopic.getOnStreamChanged("rtp",streamId,mediaServerItem.getServerId());
+
+            return  eventBus.subscribe(Subscription.of("play_stream_cmd"+streamId,topic, Subscription.Feature.local))
+                .flatMap(topicPayload -> Mono.just(topicPayload.bodyToJson(true)))
+                .flatMap(payload->{
+                    Boolean regist = payload.getBoolean("regist");
+                    if(Boolean.TRUE.equals(regist)){
+                        String mediaServerId = payload.getString("mediaServerId");
+                        return Mono.zip(Mono.just(mediaServerItemService.getOneByServerId(mediaServerId)),Mono.justOrEmpty(payload));
+                    }
+                    return Mono.empty();
+                })
+                .doOnNext(tp2->{
+                    try {
+                        event.accept(tp2.getT1(), JSONUtil.parseObj(tp2.getT2()));
+                    } catch (Exception e) {
+                        log.error("回调ZLM流改变函数失败,",e);
                     }
-                });
+                })
+                .mergeWith(transmitRequest(sipProvider,request,null).then(Mono.empty()))
+                .then();
 
 
         } catch ( SipException | ParseException | InvalidArgumentException e) {
-            return Flux.error(new BusinessException("服务器异常"));
+            return Mono.error(new BusinessException("服务器异常"));
         }
     }
 
@@ -173,77 +192,74 @@ public class SipCommander {
      * 视频流停止, 不使用回调 todo
      */
     public Mono<Void> streamByeCmd(String deviceId, String channelId) {
-        return Mono.empty() ;
+        return streamByeCmd(deviceId, channelId, null);
     }
 
     /**
      * 视频流停止
      */
-//    public void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent) {
-//        try {
-//            ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId);
-//            if (transaction == null) {
-//                logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
-//                SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
-//                if (okEvent != null) {
-//                    okEvent.response(eventResult);
-//                }
-//                return;
-//            }
-//            SIPDialog dialog = streamSession.getDialog(deviceId, channelId);
-//            if (dialog == null) {
-//                logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId);
-//                return;
-//            }
-//            SipStack sipStack = udpSipProvider.getSipStack();
-//            SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
-//            if (dialog != sipDialog) {
-//                dialog = sipDialog;
-//            }else {
-//                dialog.setSipProvider(udpSipProvider);
-//                try {
-//                    Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
-//                    sipStackField.setAccessible(true);
-//                    sipStackField.set(dialog, sipStack);
-//                    Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
-//                    eventListenersField.setAccessible(true);
-//                    eventListenersField.set(dialog, new HashSet<>());
-//                } catch (NoSuchFieldException | IllegalAccessException e) {
-//                    e.printStackTrace();
-//                }
-//            }
-//
-//            Request byeRequest = dialog.createRequest(Request.BYE);
-//            SipURI byeURI = (SipURI) byeRequest.getRequestURI();
-//            SIPRequest request = (SIPRequest)transaction.getRequest();
-//            byeURI.setHost(request.getRemoteAddress().getHostName());
-//            byeURI.setPort(request.getRemotePort());
-//            ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME);
-//            String protocol = viaHeader.getTransport().toUpperCase();
-//            ClientTransaction clientTransaction = null;
-//            if("TCP".equals(protocol)) {
-//                clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
-//            } else if("UDP".equals(protocol)) {
-//                clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
-//            }
-//
+    public Mono<Void> streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent) {
+        try {
+            ClientTransaction transaction = streamSessionManager.getTransaction(deviceId, channelId);
+            if (transaction == null) {
+                log.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
+                SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
+                if (okEvent != null) {
+                    okEvent.accept(eventResult);
+                }
+                return Mono.empty();
+            }
+            SIPDialog dialog = streamSessionManager.getDialog(deviceId, channelId);
+            if (dialog == null) {
+                log.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId);
+                return Mono.empty();
+            }
+            SipStack sipStack = SipContext.getSipStack();
+            SipProvider sipProvider = SipContext.getSipProvider();
+            SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
+            if (dialog != sipDialog) {
+                dialog = sipDialog;
+            }else {
+                dialog.setSipProvider((SipProviderImpl) sipProvider);
+                try {
+                    Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
+                    sipStackField.setAccessible(true);
+                    sipStackField.set(dialog, sipStack);
+                    Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
+                    eventListenersField.setAccessible(true);
+                    eventListenersField.set(dialog, new HashSet<>());
+                } catch (NoSuchFieldException | IllegalAccessException e) {
+                    e.printStackTrace();
+                }
+            }
+
+            Request byeRequest = dialog.createRequest(Request.BYE);
+            SipURI byeURI = (SipURI) byeRequest.getRequestURI();
+            SIPRequest request = (SIPRequest)transaction.getRequest();
+            byeURI.setHost(request.getRemoteAddress().getHostName());
+            byeURI.setPort(request.getRemotePort());
+            ClientTransaction clientTransaction = sipProvider.getNewClientTransaction(byeRequest);
+
+
 //            CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME);
+            //todo
 //            if (okEvent != null) {
 //                sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
 //            }
-//
-//            dialog.sendRequest(clientTransaction);
-//
-//            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId);
-//            if (ssrcTransaction != null) {
-//                MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
-//                mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc());
-//                streamSession.remove(deviceId, channelId);
-//            }
-//        } catch (SipException | ParseException e) {
-//            e.printStackTrace();
-//        }
-//    }
+
+            dialog.sendRequest(clientTransaction);
+
+            return Mono.justOrEmpty( streamSessionManager.getSsrcTransaction(deviceId, channelId))
+                .doOnNext(ssrcTransaction -> {
+                    MediaServerItem mediaServerItem = mediaServerItemService.getOneByServerId(ssrcTransaction.getMediaServerId());
+                    mediaServerItemService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc());
+                    streamSessionManager.remove(deviceId, channelId);
+                })
+                .then();
+        } catch (SipException | ParseException e) {
+            return Mono.error(new BusinessException("SIP设备断开连接发生错误,",e));
+        }
+    }
 
 
     /**
@@ -270,7 +286,7 @@ public class SipCommander {
 
             Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo-" + tm, "FromDev" + tm, null, callIdHeader);
 
-            return transmitRequest(sipProvider, request).then();
+            return transmitRequest(sipProvider, request,null);
 
         } catch (SipException | ParseException | InvalidArgumentException e) {
             return Mono.error(new RuntimeException("查询媒体设备出错,",e));
@@ -302,26 +318,30 @@ public class SipCommander {
             Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog-" + tm, "FromCat" + tm, null, callIdHeader);
 
             //todo 失败处理
-            return transmitRequest(sipProvider, request).then();
+            return transmitRequest(sipProvider, request,null);
         } catch (SipException | ParseException | InvalidArgumentException e) {
             return Mono.error(new RuntimeException("查询设备目录列表失败,",e));
         }
     }
 
-    private Flux<EventResult> transmitRequest(SipProvider sipProvider, Request request) throws SipException {
+    private Mono<Void> transmitRequest(SipProvider sipProvider, Request request, SipSubscribe.Event event) throws SipException {
         ClientTransaction clientTransaction = sipProvider.getNewClientTransaction(request);
         CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
         clientTransaction.sendRequest();
-        return  handleReply(callIdHeader.getCallId(),Duration.ofSeconds(10));
+        return  handleReply(callIdHeader.getCallId(),Duration.ofSeconds(10),event);
     }
 
-    private Flux<EventResult> handleReply(String callId, Duration timeout) {
-        return replyProcessor
-            .computeIfAbsent(callId, ignore -> UnicastProcessor.create())
-            .timeout(timeout, Mono.error(() -> new BusinessException("设备响应超时")))
-            .doFinally(signal -> {
-                replyProcessor.remove(callId);
-            });
+    private Mono<Void> handleReply(String callId, Duration timeout,SipSubscribe.Event event) {
+        return eventBus.subscribe(Subscription.of("sip_reply",callId, Subscription.Feature.local))
+            .single()
+//            .timeout(timeout, Mono.error(() -> new BusinessException("设备响应超时")))
+            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
+            .filter(ignore->event!=null)
+            .doOnNext(jsonObject -> {
+                SipSubscribe.EventResult eventResult = jsonObject.toJavaObject(SipSubscribe.EventResult.class);
+                event.accept(eventResult);
+            })
+            .then();
     }
 
 }

+ 136 - 81
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -1,22 +1,33 @@
 package org.jetlinks.community.media.zlm;
 
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
+import org.jetlinks.community.media.contanst.VideoManagerConstants;
+import org.jetlinks.community.media.entity.GbStream;
 import org.jetlinks.community.media.bean.StreamInfo;
+import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.service.LocalGbStreamService;
+import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
-import org.jetlinks.community.media.zlm.entity.MediaServerItem;
+import org.jetlinks.community.media.zlm.dto.StreamPushItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
+import org.jetlinks.community.utils.ZlmSubscribeTopic;
+import org.jetlinks.core.cluster.ClusterEventBus;
 import org.jetlinks.core.event.EventBus;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Mono;
+
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -34,6 +45,11 @@ public class ZLMHttpHookListener {
     private final RedisCacheStorageImpl redisCatchStorage;
     private final ZLMHttpHookSubscribe subscribe;
     private final EventBus eventBus;
+    private final LocalMediaDeviceChannelService channelService;
+    private final ZLMMediaListManager zlmMediaListManager;
+    private final LocalGbStreamService gbStreamService;
+    private final ClusterEventBus clusterEventBus;
+    private final String serverId="";
     /**
      * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
      *
@@ -45,16 +61,10 @@ public class ZLMHttpHookListener {
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString());
         }
-//		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
-//		if (subscribes != null  && subscribes.size() > 0) {
-//			for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
-//				subscribe.response(null, json);
-//			}
-//		}
 
-        JSONObject ret = new JSONObject();
-        ret.put("code", 0);
-        ret.put("msg", "success");
+        JSONObject ret = new JSONObject()
+            .append("code", 0)
+            .append("msg", "success");
         return eventBus.
             publish(ZLMHttpHookSubscribe.HookType.on_server_keepalive.name(),json)
             .thenReturn(ResponseEntity.ok(ret.toString()));
@@ -112,7 +122,7 @@ public class ZLMHttpHookListener {
 //		String mediaServerId = json.getString("mediaServerId");
 //		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
 //		if (subscribe != null ) {
-//			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
+//			MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
 //			if (mediaInfo != null) {
 //				subscribe.response(mediaInfo, json);
 //			}
@@ -123,25 +133,25 @@ public class ZLMHttpHookListener {
 //		ret.put("msg", "success");
 //		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 //	}
-//
-//	/**
-//	 * rtsp/rtmp/rtp推流鉴权事件。
-//	 *
-//	 */
-//	@ResponseBody
-//	@PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8")
-//	public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
-//
-//		logger.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
-//		JSONObject ret = new JSONObject();
-//		ret.put("code", 0);
-//		ret.put("msg", "success");
-//		ret.put("enableHls", true);
+
+    /**
+     * rtsp/rtmp/rtp推流鉴权事件。
+     *
+     */
+    @ResponseBody
+    @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8")
+    public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
+
+        log.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
+        JSONObject ret = new JSONObject();
+        ret.put("code", 0);
+        ret.put("msg", "success");
+        ret.put("enableHls", true);
 //		ret.put("enableMP4", userSetup.isRecordPushLive());
 //		String mediaServerId = json.getString("mediaServerId");
 //		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
 //		if (subscribe != null) {
-//			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
+//			MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
 //			if (mediaInfo != null) {
 //				subscribe.response(mediaInfo, json);
 //			}else {
@@ -159,9 +169,9 @@ public class ZLMHttpHookListener {
 //		}else {
 //			ret.put("enableMP4", userSetup.isRecordPushLive());
 //		}
-//
-//		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//	}
+
+        return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
+    }
 //
 //	/**
 //	 * 录制mp4完成后通知事件;此事件对回复不敏感。
@@ -236,7 +246,7 @@ public class ZLMHttpHookListener {
 //		String mediaServerId = json.getString("mediaServerId");
 //		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json);
 //		if (subscribe != null ) {
-//			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
+//			MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
 //			if (mediaInfo != null) {
 //				subscribe.response(mediaInfo, json);
 //			}
@@ -248,91 +258,136 @@ public class ZLMHttpHookListener {
 //		ret.put("msg", "success");
 //		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 //	}
-//
+
     /**
      * rtsp/rtmp流注册或注销时触发此事件;此事件对回复不敏感。
      *
      */
-//	@ResponseBody
-    @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
+    @PostMapping(value = "/on_stream_changed")
     public Mono<ResponseEntity<String>> onStreamChanged(@RequestBody MediaItem item){
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONUtil.toJsonStr(item));
         }
         String mediaServerId = item.getMediaServerId();
-        JSONObject json = JSONUtil.parseObj(item);
-        Mono<ResponseEntity<String>> result=null;
-        //todo 这里订阅/发布采用eventBus 模式进行
-//        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
-//        if (subscribe != null ) {
-//            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
-//            if (mediaInfo != null) {
-//                subscribe.response(mediaInfo, json);
-//            }
-//        }
-        // 流消失移除redis play
         //获取流应用名
         String app = item.getApp();
         //获取流ID
         String streamId = item.getStream();
         //rtsp或rtmp
         String schema = item.getSchema();
-        //音频轨道
-        List<MediaItem.MediaTrack> tracks = item.getTracks();
+        JSONObject json = JSONUtil.parseObj(item);
+        //todo 这里订阅/发布采用eventBus 模式进行
+        //on_stream_changed+{mediaServerId}+{streamId}+app
+        Mono<Long> result = Mono.fromRunnable(()->clusterEventBus.publish(ZlmSubscribeTopic.getOnStreamChanged(app, streamId, mediaServerId), json)).thenReturn(1L);
+        // 流消失移除redis play
+
+
         boolean regist = item.isRegist();
         if ("rtmp".equals(schema)){
             log.info("on_stream_changed:注册(true)/注销(false)->{}, app->{}, stream->{}", regist, app, streamId);
             if (regist) {
-                result=mediaServerItemService.addCount(mediaServerId).then(Mono.empty());
+                result=result.flatMap(__->mediaServerItemService.addCount(mediaServerId).thenReturn(1L));
             }else {
-                result=mediaServerItemService.removeCount(mediaServerId).then(Mono.empty());
+                result=result.flatMap(__->mediaServerItemService.removeCount(mediaServerId).thenReturn(1L));
+            }
+            if (item.getOriginType() == OriginType.PULL.ordinal()
+                || item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) {
+                // 设置拉流代理上线/离线 todo
+//                streamProxyService.updateStatus(regist, app, streamId);
             }
+            //播放结束
             if ("rtp".equals(app) && !regist ) {
                 StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
                 if (streamInfo!=null){
                     redisCatchStorage.stopPlay(streamInfo);
-//                    storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
+                    String deviceId = streamInfo.getDeviceID();
+                    String channelId = streamInfo.getChannelId();
+                    //终止播放
+                    result=result.flatMap(__->channelService.createUpdate()
+                        .set(MediaDeviceChannel::getStreamId,null)
+                        .where(MediaDeviceChannel::getDeviceId,deviceId)
+                        .where(MediaDeviceChannel::getChannelId,channelId)
+                        .execute()
+                        .thenReturn(1L));
                 }else{
                     streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
-//                    redisCatchStorage.stopPlayback(streamInfo);
+                    final StreamInfo paramInfo = BeanUtil.toBean(streamInfo, StreamInfo.class);
+                    result=result.flatMap(__->channelService.stopPlayback(paramInfo).thenReturn(1L));
                 }
             }else {
                 if (!"rtp".equals(app)){
-                    String type = OriginType.values()[item.getOriginType()].getType();
-//                    MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
-                    MediaServerItem mediaServerItem =null;
-                    if (mediaServerItem != null){
-                        if (regist) {
-                            redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item);
-                            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
-                                || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
-                                || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
-//                                zlmMediaListManager.addPush(item);
-                            }
-                        }else {
-                            // 兼容流注销时类型从redis记录获取
-                            MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId);
-                            type = OriginType.values()[mediaItem.getOriginType()].getType();
-//                            zlmMediaListManager.removeMedia(app, streamId);
-                            redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
-                        }
-                        // 发送流变化redis消息
-                        JSONObject jsonObject = new JSONObject()
-                            .append("serverId", mediaServerId)
-                            .append("app", app)
-                            .append("stream", streamId)
-                            .append("register", regist)
-                            .append("mediaServerId", mediaServerId);
-                        redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
-                    }
+                    result=result.flatMap(ingore->
+                        mediaServerItemService.findById(mediaServerId)
+                            .doOnNext(mediaServerItem -> {
+                                String type = OriginType.values()[item.getOriginType()].getType();
+                                if (regist) {
+                                    redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item);
+                                }else {
+                                    redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
+                                }
+                            })
+                            .flatMap(mediaServerItem -> {
+                                //注册
+                                String type = OriginType.values()[item.getOriginType()].getType();
+                                Mono<Void> mono=null;
+
+                                if (regist) {
+                                    StreamPushItem streamPushItem = null;
+                                    if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+                                        || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+                                        || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
+                                        mono= zlmMediaListManager
+                                            .addPush(item)
+                                            .flatMap(streamPush->{
+                                                if(StrUtil.isEmpty(streamPush.getGbId())){
+                                                    return gbStreamService.getByAppAndStreamId(app,streamId);
+                                                }
+                                                return Mono.just(streamPush);
+                                            })
+                                            .switchIfEmpty(gbStreamService.getByAppAndStreamId(app,streamId))
+                                            .then();
+                                        //todo 新的流信息发布
+//                                    .doOnNext(streamPush->           eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON))
+//                                    ;
+                                    }
+
+                                }else {
+                                    // 兼容流注销时类型从redis记录获取
+                                    MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId);
+                                    if (mediaItem != null) {
+                                        type = OriginType.values()[mediaItem.getOriginType()].getType();
+                                        redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
+                                    }
+                                    mono= gbStreamService.getByAppAndStreamId(app,streamId).then();
+                                    //todo
+
+//                                    .doOnNext(gbStream -> eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF))
+//                                zlmMediaListManager.removeMedia(app, streamId);
+                                }
+                                if (type != null) {
+                                    // 发送流变化redis消息
+                                    JSONObject jsonObject = new JSONObject()
+                                        .append("serverId", serverId)
+                                        .append("app", app)
+                                        .append("stream", streamId)
+                                        .append("register", regist)
+                                        .append("mediaServerId", mediaServerId);
+                                    if(mono!=null){
+                                        String originType=type;
+                                        mono=mono.doOnNext(___->clusterEventBus.publish(VideoManagerConstants.WVP_SERVER_STREAM_PREFIX+"_"+originType,jsonObject));
+                                    }
+                                }
+                                return mono;
+                            })
+                            .thenReturn(1L));
                 }
             }
         }
         JSONObject ret = new JSONObject()
             .append("code", 0)
             .append("msg", "success");
-        return result==null?Mono.just(ResponseEntity.ok(ret.toString())):result.thenReturn(ResponseEntity.ok(ret.toString()));
-//        return Mono.just(ResponseEntity.ok(ret.toString()));
+        return       result
+            .thenReturn(ResponseEntity.ok(ret.toString()));
     }
 //
 //	/**
@@ -376,7 +431,7 @@ public class ZLMHttpHookListener {
 //					}
 //				}
 //			}
-//			MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
+//			MediaServerItem mediaServerItem = mediaServerService.getOneByServerId(mediaServerId);
 //			if (mediaServerItem != null && "-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())) {
 //				ret.put("close", false);
 //			}
@@ -406,7 +461,7 @@ public class ZLMHttpHookListener {
 //			logger.debug("[ ZLM HOOK ]on_stream_not_found API调用,参数:" + json.toString());
 //		}
 //		String mediaServerId = json.getString("mediaServerId");
-//		MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
+//		MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
 //		if (userSetup.isAutoApplyPlay() && mediaInfo != null) {
 //			String app = json.getString("app");
 //			String streamId = json.getString("stream");

+ 136 - 112
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMMediaListManager.java

@@ -1,42 +1,40 @@
-//package org.jetlinks.community.media.zlm;
-//
-//import com.alibaba.fastjson.JSONObject;
-//import com.genersoft.iot.vmp.conf.UserSetup;
-//import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-//import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
-//import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-//import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
-//import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-//import com.genersoft.iot.vmp.service.IStreamProxyService;
-//import com.genersoft.iot.vmp.service.IStreamPushService;
-//import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
-//import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-//import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
-//import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-//import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
-//import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.stereotype.Component;
-//import org.springframework.util.StringUtils;
-//
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.regex.Matcher;
-//import java.util.regex.Pattern;
-//
-//@Component
-//public class ZLMMediaListManager {
-//
-//    private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager");
-//
-//    @Autowired
-//    private ZLMRESTfulUtils zlmresTfulUtils;
-//
+package org.jetlinks.community.media.zlm;
+
+import cn.hutool.core.util.StrUtil;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.media.entity.GbStream;
+import org.jetlinks.community.media.bean.ThirdPartyGB;
+import org.jetlinks.community.media.config.UserSetup;
+import org.jetlinks.community.media.service.LocalGbStreamService;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.zlm.dto.MediaItem;
+import org.jetlinks.community.media.zlm.dto.StreamProxyItem;
+import org.jetlinks.community.media.zlm.dto.StreamPushItem;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Component
+@Slf4j
+@AllArgsConstructor
+public class ZLMMediaListManager {
+    //
+
+    private final ZLMRESTfulUtils zlmresTfulUtils;
+
+    private final UserSetup userSetup;
+    //
 //    @Autowired
-//    private IRedisCatchStorage redisCatchStorage;
+    private final RedisCacheStorageImpl redisCatchStorage;
+
+    private final LocalGbStreamService gbStreamService;
 //
 //    @Autowired
 //    private IVideoManagerStorager storager;
@@ -60,7 +58,7 @@
 //    private ZLMHttpHookSubscribe subscribe;
 //
 //    @Autowired
-//    private UserSetup userSetup;
+
 //
 //
 //    public void updateMediaList(MediaServerItem mediaServerItem) {
@@ -106,47 +104,52 @@
 //        //使用异步更新推流
 //        updateMedia(mediaServerItem, app, streamId);
 //    }
-//
-//    public void addPush(MediaItem mediaItem) {
-//        // 查找此直播流是否存在redis预设gbId
-//        StreamPushItem transform = streamPushService.transform(mediaItem);
-//        // 从streamId取出查询关键值
-//        Pattern pattern = Pattern.compile(userSetup.getThirdPartyGBIdReg());
-//        Matcher matcher = pattern.matcher(mediaItem.getStream());// 指定要匹配的字符串
-//        String queryKey = null;
-//        if (matcher.find()) { //此处find()每次被调用后,会偏移到下一个匹配
-//            queryKey = matcher.group();
-//        }
-//        if (queryKey != null) {
-//            ThirdPartyGB thirdPartyGB = redisCatchStorage.queryMemberNoGBId(queryKey);
-//            if (thirdPartyGB != null && !StringUtils.isEmpty(thirdPartyGB.getNationalStandardNo())) {
-//                transform.setGbId(thirdPartyGB.getNationalStandardNo());
-//                transform.setName(thirdPartyGB.getName());
-//            }
-//        }
+
+    public Mono<StreamPushItem> addPush(MediaItem mediaItem) {
+        // 查找此直播流是否存在redis预设gbId
+        StreamPushItem transform = transform(mediaItem);
+        // 从streamId取出查询关键值
+        Pattern pattern = Pattern.compile(userSetup.getThirdPartyGBIdReg());
+        Matcher matcher = pattern.matcher(mediaItem.getStream());// 指定要匹配的字符串
+        String queryKey = null;
+        if (matcher.find()) { //此处find()每次被调用后,会偏移到下一个匹配
+            queryKey = matcher.group();
+        }
+        if (queryKey != null) {
+            ThirdPartyGB thirdPartyGB = redisCatchStorage.queryMemberNoGBId(queryKey);
+            if (thirdPartyGB != null && !StrUtil.isEmpty(thirdPartyGB.getNationalStandardNo())) {
+                transform.setGbId(thirdPartyGB.getNationalStandardNo());
+                transform.setName(thirdPartyGB.getName());
+            }
+        }
+        //todo 更新媒体流
 //        storager.updateMedia(transform);
-//        if (!StringUtils.isEmpty(transform.getGbId())) {
-//            // 如果这个国标ID已经给了其他推流且流已离线,则移除其他推流
-//            List<GbStream> gbStreams = gbStreamMapper.selectByGBId(transform.getGbId());
-//            if (gbStreams.size() > 0) {
-//                for (GbStream gbStream : gbStreams) {
-//                    // 出现使用相同国标Id的视频流时,使用新流替换旧流,
-//                    gbStreamMapper.del(gbStream.getApp(), gbStream.getStream());
-//                    platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream());
-//                    if (!gbStream.isStatus()) {
+        if (!StrUtil.isEmpty(transform.getGbId())) {
+            // 如果这个国标ID已经给了其他推流且流已离线,则移除其他推流
+            return gbStreamService.createQuery()
+                .where(GbStream::getGbId, transform.getGbId())
+                .fetch()
+                .publishOn(Schedulers.parallel())
+                .flatMap(gbStream -> {
+                    if (!gbStream.isStatus()) {
+                        //todo
 //                        streamPushMapper.del(gbStream.getApp(), gbStream.getStream());
-//                    }
-//                }
-//            }
-//            if (gbStreamMapper.selectOne(transform.getApp(), transform.getStream()) != null) {
-//                gbStreamMapper.update(transform);
-//            }else {
-//                gbStreamMapper.add(transform);
-//            }
-//        }
-//    }
-//
-//
+                    }
+                    // 出现使用相同国标Id的视频流时,使用新流替换旧流,
+                    return gbStreamService.createDelete()
+                        .where(GbStream::getApp, gbStream.getApp())
+                        .where(GbStream::getStream, gbStream.getStream())
+                        .execute();
+                })
+                .mergeWith( gbStreamService.getByAppAndStreamId(transform.getApp(),transform.getStream())
+                    .flatMap(gbStream -> gbStreamService.updateById(gbStream.getId(), gbStream))
+                    .switchIfEmpty(gbStreamService.save(transform.toGbStream()).then(Mono.empty())))
+                .then(Mono.just(transform));
+        }
+        return Mono.just(transform);
+    }
+
+
 //    public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) {
 //        //使用异步更新推流
 //        zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId, "rtmp", json->{
@@ -170,8 +173,8 @@
 //            }
 //        });
 //    }
-//
-//
+
+
 //    public int removeMedia(String app, String streamId) {
 //        // 查找是否关联了国标, 关联了不删除, 置为离线
 //        StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId);
@@ -183,35 +186,56 @@
 //        }
 //        return result;
 //    }
-//
-//
-//
-////    public void clearAllSessions() {
-////        logger.info("清空所有国标相关的session");
-////        JSONObject allSessionJSON = zlmresTfulUtils.getAllSession();
-////        ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
-////        HashSet<String> allLocalPorts = new HashSet();
-////        if (allSessionJSON.getInteger("code") == 0) {
-////            JSONArray data = allSessionJSON.getJSONArray("data");
-////            if (data.size() > 0) {
-////                for (int i = 0; i < data.size(); i++) {
-////                    JSONObject sessionJOSN = data.getJSONObject(i);
-////                    Integer local_port = sessionJOSN.getInteger("local_port");
-////                    if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) &&
-////                        !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) &&
-////                        !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) &&
-////                        !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) &&
-////                        !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) &&
-////                        !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){
-////                        allLocalPorts.add(sessionJOSN.getInteger("local_port") + "");
-////                     }
-////                }
-////            }
-////        }
-////        if (allLocalPorts.size() > 0) {
-////            List<String> result = new ArrayList<>(allLocalPorts);
-////            String localPortSStr = String.join(",", result);
-////            zlmresTfulUtils.kickSessions(localPortSStr);
-////        }
-////    }
-//}
+
+
+
+//    public void clearAllSessions() {
+//        logger.info("清空所有国标相关的session");
+//        JSONObject allSessionJSON = zlmresTfulUtils.getAllSession();
+//        ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
+//        HashSet<String> allLocalPorts = new HashSet();
+//        if (allSessionJSON.getInteger("code") == 0) {
+//            JSONArray data = allSessionJSON.getJSONArray("data");
+//            if (data.size() > 0) {
+//                for (int i = 0; i < data.size(); i++) {
+//                    JSONObject sessionJOSN = data.getJSONObject(i);
+//                    Integer local_port = sessionJOSN.getInteger("local_port");
+//                    if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) &&
+//                        !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) &&
+//                        !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) &&
+//                        !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) &&
+//                        !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) &&
+//                        !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){
+//                        allLocalPorts.add(sessionJOSN.getInteger("local_port") + "");
+//                     }
+//                }
+//            }
+//        }
+//        if (allLocalPorts.size() > 0) {
+//            List<String> result = new ArrayList<>(allLocalPorts);
+//            String localPortSStr = String.join(",", result);
+//            zlmresTfulUtils.kickSessions(localPortSStr);
+//        }
+//    }
+
+
+    private StreamPushItem transform(MediaItem item) {
+        StreamPushItem streamPushItem = new StreamPushItem();
+        streamPushItem.setApp(item.getApp());
+        streamPushItem.setMediaServerId(item.getMediaServerId());
+        streamPushItem.setStream(item.getStream());
+        streamPushItem.setAliveSecond(item.getAliveSecond());
+        streamPushItem.setCreateStamp(item.getCreateStamp());
+        streamPushItem.setOriginSock(item.getOriginSock());
+        streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
+        streamPushItem.setOriginType(item.getOriginType());
+        streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
+        streamPushItem.setOriginUrl(item.getOriginUrl());
+        streamPushItem.setCreateStamp(item.getCreateStamp());
+        streamPushItem.setAliveSecond(item.getAliveSecond());
+        streamPushItem.setStatus(true);
+        streamPushItem.setStreamType("push");
+        streamPushItem.setVhost(item.getVhost());
+        return streamPushItem;
+    }
+}

+ 12 - 12
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.media.zlm;
 
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
@@ -48,17 +49,16 @@ public class ZLMRunner implements CommandLineRunner {
 
     @Override
     public void run(String... strings)  {
-        mediaServerItemService.clearMediaServerForOnline()
-            //获取默认服务器配置
-            .flatMap(ignore-> mediaServerItemService.getDefaultMediaServer())
-            //更新默认服务器配置
-            .flatMap(defaultMediaServer->{
-                MediaServerItem mediaSerItem = mediaConfig.getMediaSerItem();
-                mediaSerItem.setId(defaultMediaServer.getId());
-                return mediaServerItemService.save(mediaSerItem);
-            })
+
+        //获取默认服务器配置
+        mediaServerItemService.getDefaultMediaServer()
             //添加默认媒体服务器配置
-            .switchIfEmpty(mediaServerItemService.save(mediaConfig.getMediaSerItem()))
+            .defaultIfEmpty(mediaConfig.getMediaSerItem())
+            //清除在线服务器信息
+            .mergeWith(mediaServerItemService.clearMediaServerForOnline().then(Mono.empty()))
+            .mergeWith(Mono.delay(Duration.ofSeconds(20)).flatMap(ignore->timeOutHandle()).then(Mono.empty()))
+            //更新默认服务器配置
+            .flatMap(mediaServerItemService::save)
 
             //订阅 zlm启动事件
             .doOnNext(ignore->subscribeOnServerStarted())
@@ -69,9 +69,9 @@ public class ZLMRunner implements CommandLineRunner {
             .flatMap(ignore->startAllConnection())
 
             //20s进行超时处理
-            .delayElement(Duration.ofSeconds(20))
+//            .delayElement(Duration.ofSeconds(20))
 
-            .flatMap(ignore->timeOutHandle())
+//            .flatMap(ignore->timeOutHandle())
 
             .subscribe();
     }

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/dto/StreamProxyItem.java

@@ -1,7 +1,7 @@
 package org.jetlinks.community.media.zlm.dto;
 
 import lombok.Data;
-import org.jetlinks.community.media.bean.GbStream;
+import org.jetlinks.community.media.entity.GbStream;
 @Data
 public class StreamProxyItem extends GbStream {
 

+ 20 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/dto/StreamPushItem.java

@@ -1,7 +1,7 @@
 package org.jetlinks.community.media.zlm.dto;
 
 import lombok.Data;
-import org.jetlinks.community.media.bean.GbStream;
+import org.jetlinks.community.media.entity.GbStream;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -57,10 +57,10 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
      */
     private String originUrl;
 
-    /**
-     * GMT unix系统时间戳,单位秒
-     */
-    private Long createStamp;
+//    /**
+//     * GMT unix系统时间戳,单位秒
+//     */
+//    private Long createStamp;
 
     /**
      * 存活时间,单位秒
@@ -91,9 +91,23 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
     }
 
 
+    public GbStream toGbStream(){
+        GbStream gbStream = new GbStream();
+        gbStream.setApp(this.app);
+        gbStream.setStream(this.stream);
+        gbStream.setGbId(this.getGbId());
+        gbStream.setName(this.getName());
+        gbStream.setLongitude(this.getLongitude());
+        gbStream.setLatitude(this.getLatitude());
+        gbStream.setStreamType(this.getStreamType());
+        gbStream.setMediaServerId(this.getMediaServerId());
+        gbStream.setStatus(super.isStatus());
+        gbStream.setCreateStamp(this.getCreateStamp());
+        return gbStream;
+    }
     @Override
     public int compareTo(@NotNull StreamPushItem streamPushItem) {
-        return Long.valueOf(this.createStamp - streamPushItem.getCreateStamp().intValue()).intValue();
+        return Long.valueOf(this.getCreateStamp() - streamPushItem.getCreateStamp().intValue()).intValue();
     }
 
     public static class MediaSchema {

+ 11 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/entity/MediaServerItem.java

@@ -7,6 +7,7 @@ import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
+import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
 import org.jetlinks.community.media.bean.SsrcConfig;
@@ -21,6 +22,14 @@ import java.sql.JDBCType;
 @Data
 @Table(name = "media_server_item")
 public class MediaServerItem extends GenericEntity<String> {
+
+    private static Long serialVersionUID=1L;
+    @Column(name = "server_id")
+    @Schema(
+        description = "服务器id"
+    )
+    private String serverId;
+
     @Column(name = "ip")
     @Schema(
         description = "产品id"
@@ -98,6 +107,7 @@ public class MediaServerItem extends GenericEntity<String> {
 
     @Column(name = "ssrc_config")
     @ColumnType(jdbcType = JDBCType.CLOB)
+    @JsonCodec
     private SsrcConfig ssrcConfig;
 
     @Column(name = "current_port")
@@ -114,7 +124,7 @@ public class MediaServerItem extends GenericEntity<String> {
     }
 
     public MediaServerItem(ZLMServerConfig zlmServerConfig, String sipIp) {
-        this.setId( zlmServerConfig.getGeneralMediaServerId());
+        serverId=zlmServerConfig.getGeneralMediaServerId();
         ip = zlmServerConfig.getIp();
         hookIp = StringUtils.isEmpty(zlmServerConfig.getHookIp())? sipIp: zlmServerConfig.getHookIp();
         sdpIp = StringUtils.isEmpty(zlmServerConfig.getSdpIp())? zlmServerConfig.getIp(): zlmServerConfig.getSdpIp();

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/event/ZLMKeepliveTimeoutListener.java

@@ -59,7 +59,7 @@
 //        String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
 //        logger.info("[zlm心跳到期]:" + mediaServerId);
 //        // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理
-//        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
+//        MediaServerItem mediaServerItem = mediaServerService.getOneByServerId(mediaServerId);
 //        JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
 //        if (mediaServerConfig == null) {
 //            publisher.zlmOfflineEventPublish(mediaServerId);

+ 25 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/ZlmSubscribeTopic.java

@@ -0,0 +1,25 @@
+package org.jetlinks.community.utils;
+
+import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ZlmSubscribeTopic.java
+ * @Description TODO
+ * @createTime 2022年02月10日 15:36:00
+ */
+public class ZlmSubscribeTopic {
+
+    /**
+     * 获取监听媒体流变化主题
+     * @param app
+     * @param streamId
+     * @param mediaServerItemId
+     * @return
+     */
+    public static String getOnStreamChanged(String app,String streamId,String mediaServerItemId){
+        return ZLMHttpHookSubscribe.HookType.on_stream_changed+"/"+app+"/"+streamId
+            +"/"+mediaServerItemId;
+    }
+}

+ 1 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleSceneController.java

@@ -16,7 +16,7 @@ import reactor.core.publisher.Mono;
 @RestController
 @RequestMapping("rule-engine/scene")
 @Authorize
-@Resource(id = "rule-engine/scene", name = "规则引擎-场景联动")
+@Resource(id = "rule-engine-scene", name = "规则引擎-场景联动")
 @AllArgsConstructor
 public class RuleSceneController implements ReactiveServiceCrudController<RuleSceneEntity, String> {
 

+ 3 - 4
jetlinks-standalone/src/main/resources/application.yml

@@ -133,15 +133,15 @@ logging:
   level:
     org.jetlinks: error
     rule.engine: debug
-    org.hswebframework: error
+    org.hswebframework: info
     org.springframework.transaction: error
     org.springframework.data.r2dbc.connectionfactory: error
     io.micrometer: warn
     org.hswebframework.expands: error
-    system: debug
+    system: info
     org.jetlinks.rule.engine: warn
     org.jetlinks.supports.event: warn
-    org.springframework: warn
+    org.springframework: info
     org.jetlinks.community.device.message.writer: warn
     org.jetlinks.community.elastic.search.service: warn
     org.jetlinks.community.elastic.search.service.reactive: warn
@@ -232,7 +232,6 @@ visual:
 
 #zlm 默认服务器配置
 media:
-  id: 123456
   # [必须修改] zlm服务器的内网IP
   ip: 1.15.89.83
   # [必须修改] zlm服务器的http.port

+ 1 - 0
jetlinks-standalone/src/main/resources/logback-spring.xml

@@ -15,6 +15,7 @@
 
         <logger name="system" level="debug">
             <appender-ref ref="LOGEventPublisher"/>
+            <appender-ref ref="CONSOLE"/>
         </logger>
 
         <root level="INFO">