Pārlūkot izejas kodu

add catalog 和 deviceInfo

18339543638 3 gadi atpakaļ
vecāks
revīzija
c9a221820b
22 mainītis faili ar 685 papildinājumiem un 179 dzēšanām
  1. 1 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/contanst/CmdType.java
  2. 35 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceChannelController.java
  3. 8 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java
  4. 15 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDeviceChannel.java
  5. 0 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/enums/StreamMode.java
  6. 17 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java
  7. 68 44
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java
  8. 86 89
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java
  9. 6 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/AbstractSipProcessor.java
  10. 44 11
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/processor/SipProcessorObserver.java
  11. 3 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java
  12. 9 11
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java
  13. 114 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java
  14. 100 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/DeviceInfoResponseMessageProcessor.java
  15. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/KeepaliveNotifyMessageHandler.java
  16. 1 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/response/InviteResponseProcessor.java
  17. 10 3
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java
  18. 8 8
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/SIPRequestHeaderProvider.java
  19. 63 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java
  20. 2 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java
  21. 2 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRTPServerFactory.java
  22. 92 6
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/XmlUtil.java

+ 1 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/contanst/CmdType.java

@@ -14,5 +14,6 @@ public class CmdType {
     public static final String KEEP_ALIVE = "Keepalive";
     public static final String NOTIFY = "Notify";
     public static final String QUERY = "Query";
+    public static final String DEVICE_INFO = "DeviceInfo";
     public static final String RESPONSE = "Response";
 }

+ 35 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceChannelController.java

@@ -0,0 +1,35 @@
+package org.jetlinks.community.media.controller;
+
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.authorization.annotation.Authorize;
+import org.hswebframework.web.authorization.annotation.Resource;
+import org.hswebframework.web.crud.service.ReactiveCrudService;
+import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
+import org.jetlinks.community.media.entity.MediaDevice;
+import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MediaDeviceController.java
+ * @Description TODO
+ * @createTime 2022年02月08日 10:42:00
+ */
+@RestController
+@RequestMapping("/media/channel")
+@Slf4j
+@Authorize
+@Resource(id="media-deivce-channel",name = "媒体流设备通道")
+@AllArgsConstructor
+@Tag(name = "媒体视频设备")
+public class MediaDeviceChannelController implements ReactiveServiceQueryController<MediaDevice, String> {
+    private LocalMediaDeviceService mediaDeviceService;
+    @Override
+    public ReactiveCrudService<MediaDevice, String> getService() {
+        return mediaDeviceService;
+    }
+}

+ 8 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java

@@ -204,8 +204,15 @@ public class MediaDevice extends GenericEntity<String> implements RecordCreation
     )
 	private int subscribeCycleForCatalog ;
 
-	private String creatorId;
 
+	private String creatorId;
 
+    /**
+     * 目录订阅周期,0为不订阅
+     */
+    @Column(name = "create_time")
+    @Schema(
+        description = "创建时间"
+    )
 	private Long createTime;
 }

+ 15 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDeviceChannel.java

@@ -7,6 +7,7 @@ import org.hswebframework.web.api.crud.entity.GenericEntity;
 
 import javax.persistence.Column;
 import javax.persistence.Table;
+import java.util.Objects;
 
 @Data
 @EqualsAndHashCode(callSuper = false)
@@ -320,4 +321,18 @@ public class MediaDeviceChannel extends GenericEntity<String> {
 				break;
 		}
 	}
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        MediaDeviceChannel that = (MediaDeviceChannel) o;
+        return Objects.equals(getChannelId(), that.getChannelId()) &&
+            Objects.equals(getDeviceId(), that.getDeviceId());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(getChannelId(), getDeviceId());
+    }
 }

+ 0 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/enums/StreamMode.java

@@ -11,5 +11,4 @@ public enum  StreamMode {
     TCP_PASSIVE,
     TCP_ACTIVE,
     UDP;
-
 }

+ 17 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java

@@ -4,8 +4,13 @@ import lombok.AllArgsConstructor;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 import reactor.core.publisher.Mono;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 /**
  * @author lifang
  * @projectName jetlinks-community
@@ -32,4 +37,16 @@ public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<M
             .execute()
             .then();
     }
+
+    /**
+     *  catlog查询结束后完全重写通道信息
+     * @param id
+     * @param channelList
+     * @return
+     */
+    public Mono<Void> resetChannels(String id, List<MediaDeviceChannel> channelList) {
+        Set<MediaDeviceChannel> channelSets = new HashSet<>();
+
+        return Mono.empty();
+    }
 }

+ 68 - 44
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -2,14 +2,15 @@ package org.jetlinks.community.media.service;
 
 import cn.hutool.core.date.DateUtil;
 import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
 import org.jetlinks.community.media.enums.DeviceState;
-import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.sip.SipServerHelper;
-import org.jetlinks.community.utils.SipUtils;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.transmit.cmd.SipCommander;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
@@ -18,13 +19,14 @@ import org.jetlinks.core.message.DeviceOnlineMessage;
 import org.jetlinks.core.message.DeviceRegisterMessage;
 import org.jetlinks.core.utils.IdUtils;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
-import org.redisson.api.RBucket;
 import org.redisson.api.RedissonClient;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
-import java.util.concurrent.TimeUnit;
+import java.lang.management.MemoryNotificationInfo;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * @author lifang
@@ -35,19 +37,16 @@ import java.util.concurrent.TimeUnit;
  */
 @Service
 @AllArgsConstructor
+@Slf4j
 public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDevice, String> implements CommandLineRunner {
-    private final RedissonClient redissonClient;
-
     private final DecodedClientMessageHandler messageHandler;
-
+    private final RedisCacheStorageImpl redisCacheStorage;
     private final EventBus eventBus;
     private final DeviceRegistry registry;
+    private final SipCommander cmder;
+
     @Subscribe("/media/device/*/*/register")
     public void register(MediaDevice mediaDevice){
-        //todo 当设备为首次注册时,拉取设备信息
-        if(mediaDevice.isFirsRegister()){
-
-        }
         //注册设备
         this.updateById(mediaDevice.getId(),mediaDevice)
             .filter(count->count==0)
@@ -62,50 +61,57 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
                         registerMessage.addHeader("deviceName","GB28181-2016 "+mediaDevice.getId());
                         registerMessage.addHeader("productId",mediaDevice.getProductId());
                         return eventBus.publish("/device/*/*/register",registerMessage)
-                            .mergeWith(eventBus.publish("/media/device/heart-beat",mediaDevice)).then(Mono.empty());
+                            .mergeWith(eventBus.publish("/media/device/heart-beat/"+mediaDevice.getId(),mediaDevice)).then(Mono.empty());
                     })
                     .then(Mono.empty())
             )
+            .doOnComplete(()->{
+                if(mediaDevice.isFirsRegister()){
+                     cmder.deviceInfoQuery(mediaDevice)
+                         .concatWith(cmder.catalogQuery(mediaDevice, null)).subscribe();
+                }
+            })
             .subscribe();
-
-
     }
 
     @Subscribe("/media/device/*/*/unregister")
     public void unRegister(MediaDevice mediaDevice){
         //取消注册媒体设备
-
-        //通用设备下线
-        registry.getDevice(mediaDevice.getId())
-            .flatMap(operator -> {
-                DeviceOfflineMessage message = new DeviceOfflineMessage();
-                message.setTimestamp(System.currentTimeMillis());
-                message.setDeviceId(mediaDevice.getIp());
-                message.setMessageId(IdUtils.newUUID());
-                return messageHandler.handleMessage(operator,message);
-            }).subscribe();
+        this.createUpdate()
+            .where(MediaDevice::getId,mediaDevice.getId())
+            .set(MediaDevice::getState,DeviceState.offline)
+            .execute()
+            .flatMap(ignore->
+                //通用设备下线
+                registry.getDevice(mediaDevice.getId())
+                    .flatMap(operator -> {
+                        DeviceOfflineMessage message = new DeviceOfflineMessage();
+                        message.setTimestamp(System.currentTimeMillis());
+                        message.setDeviceId(mediaDevice.getIp());
+                        message.setMessageId(IdUtils.newUUID());
+                        return messageHandler.handleMessage(operator,message);
+                    })
+            )
+            .subscribe();
     }
 
 
-    @Subscribe("/media/device/heart-beat")
+    @Subscribe("/media/device/heart-beat/**")
     public void heartBeat(MediaDevice device){
-        SipServerConfig config = SipContext.getConfig();
-        String key = SipUtils.keepAliveKey(device.getId());
+//        SipServerConfig config = SipContext.getConfig();
+//        String key = SipUtils.keepAliveKey(device.getId());
+        Mono<DeviceOperator> operatorMono =
+            registry
+                .getDevice(device.getId());
+        //心跳过期 ,设备上线
+        redisCacheStorage.updateDevice(device);
+        DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
+        onlineMessage.setDeviceId(device.getId());
+        onlineMessage.setTimestamp(System.currentTimeMillis());
+        onlineMessage.setMessageId(IdUtils.newUUID());
+        operatorMono
+            .flatMap(operator -> messageHandler.handleMessage(operator,onlineMessage));
 
-        RBucket<MediaDevice> bucket = redissonClient.getBucket(key);
-        Mono<DeviceOperator> operatorMono = registry.getDevice(device.getId());
-        if (!bucket.isExists()) {
-            //心跳过期 ,设备上线
-            bucket.set(device,config.getTimeout(), TimeUnit.SECONDS);
-            DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
-            onlineMessage.setDeviceId(device.getId());
-            onlineMessage.setTimestamp(System.currentTimeMillis());
-            onlineMessage.setMessageId(IdUtils.newUUID());
-            operatorMono
-                .flatMap(operator -> messageHandler.handleMessage(operator,onlineMessage));
-        }else {
-            bucket.expire(config.getTimeout(), TimeUnit.SECONDS);
-        }
         //更新设备心跳时间
         device.setKeepaliveTime(DateUtil.now());
 
@@ -116,10 +122,28 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             .subscribe();
     }
 
+
+
+    public Mono<Void> updateDeviceAndCache(MediaDevice device){
+        return this.updateById(device.getId(),device)
+            .doOnNext(__->
+                redisCacheStorage.updateDevice(device)
+            )
+            .then();
+    }
+
     private final SipServerHelper sipServerHelper;
     @Override
-    public void run(String... args) throws Exception {
-        sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","0.0.0.0", 7001,"udp","340200000","utf-8","12345678",10L,"1")).subscribe();
-
+    public void run(String... args) {
+        //todo 后期改为定时
+        List<MediaDevice> allOnlineDevice = redisCacheStorage.getAllOnlineDevice();
+        List<String> deviceIds = allOnlineDevice.stream().map(MediaDevice::getId).collect(Collectors.toList());
+        this.createQuery()
+            .where(MediaDevice::getState,DeviceState.online)
+            .notIn(MediaDevice::getId,deviceIds)
+            .fetch()
+            .doOnNext(this::unRegister)
+            .concatWith(sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","0.0.0.0", 7001,"udp","340200000","utf-8","12345678",10L,"1")).then(Mono.empty()))
+            .subscribe();
     }
 }

+ 86 - 89
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -9,11 +9,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.bean.StreamInfo;
-import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.session.VideoStreamSessionManager;
-import org.jetlinks.community.media.storage.impl.RedisCatchStorageImpl;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
@@ -28,7 +27,6 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import org.springframework.util.ResourceUtils;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import java.io.FileNotFoundException;
 import java.time.Duration;
@@ -47,7 +45,7 @@ public class LocalPlayService  {
 //
     private final SipCommander cmder;
 
-    private final RedisCatchStorageImpl redisCatchStorage;
+    private final RedisCacheStorageImpl redisCatchStorage;
 
     private final LocalMediaServerItemService mediaServerItemService;
 
@@ -71,6 +69,9 @@ public class LocalPlayService  {
 
     public Mono<ResponseEntity> play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                                      ZLMHttpHookSubscribe.Event hookEvent, ZLMHttpHookSubscribe.Event errorEvent) {
+        //todo
+        mediaServerItem.setRtpEnable(false);
+        mediaServerItem.setRtpProxyPort(10000);
         PlayResult playResult = new PlayResult();
 
         String msgId=UUID.randomUUID().toString();
@@ -79,12 +80,86 @@ public class LocalPlayService  {
 
         final AtomicBoolean start=new AtomicBoolean(false);
 
-        return eventBus.subscribe(
-            Subscription
-                .builder()
-                .topics(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg))
-                .features(Subscription.Feature.local)
-                .build())
+
+
+
+            return eventBus.subscribe(
+                Subscription.of("media_play",DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg), Subscription.Feature.local))
+                .mergeWith(   Mono.justOrEmpty(redisCatchStorage.queryPlayByDevice(deviceId,channelId))
+                    .flatMap(streamInfo -> {
+                        if(StrUtil.isEmpty(streamInfo.getStreamId())){
+                            return Mono.error(new BusinessException("点播失败, redis缓存streamId等于null"));
+                        }
+                        String mediaServerId = streamInfo.getMediaServerId();
+                        MediaServerItem mediaInfo = mediaServerItemService.getOne(mediaServerId);
+
+                        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamInfo.getStreamId());
+                        if (rtpInfo != null && rtpInfo.getBool("exist")) {
+                            if (hookEvent != null) {
+                                //todo
+//                        hookEvent.response(mediaServerItem, com.alibaba.fastjson.JSONObject.parseObject(JSON.toJSONString(streamInfo)));
+                            }
+                            clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                                ResponseEntity.ok(streamInfo));
+                        } else {
+                            // TODO 点播前是否重置状态
+                            redisCatchStorage.stopPlay(streamInfo);
+                            return deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId())
+                                .flatMap(ignore->{
+                                    SSRCInfo ssrcInfo;
+                                    String streamId2 = null;
+                                    if (mediaServerItem.isRtpEnable()) {
+                                        streamId2 = String.format("%s_%s",deviceId, channelId);
+                                    }
+                                    ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
+
+                                    return cmder.playStreamCmd(mediaServerItem, ssrcInfo, redisCatchStorage.getDevice(deviceId), channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
+                                        log.info("收到订阅消息: " + response.toString());
+                                        onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, msgId)
+                                            .subscribe();
+                                    }, (event) -> {
+                                        mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
+                                        WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
+                                        clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                                            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(wvpResult));
+                                    })
+                                        .then();
+                                });
+                        }
+                        return Mono.empty();
+                    })
+                    .switchIfEmpty(Mono.just(mediaServerItem)
+                        .flatMap(serverItem->{
+                            SSRCInfo ssrcInfo;
+                            String streamId = null;
+                            if (serverItem.isRtpEnable()) {
+                                streamId = String.format("%s_%s", deviceId, channelId);
+                            }
+
+                            ssrcInfo = mediaServerItemService.openRTPServer(serverItem, streamId);
+
+                            // 发送点播消息
+                            MediaDevice device = redisCatchStorage.getDevice(deviceId);
+                            return cmder.playStreamCmd(serverItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
+                                log.info("收到订阅消息: " + response.toString());
+                                onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, msgId).subscribe();
+                                if (hookEvent != null) {
+//                                    hookEvent.response(mediaServerItem, response);
+                                }
+                            }, (event) -> {
+                                WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
+                                // 点播返回sip错误
+                                mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
+                                clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
+                                    ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(wvpResult));
+                                if (errorEvent != null) {
+                                    //todo
+//                    errorEvent.response(event);
+                                }
+                            })
+                                .then();
+                        })).then(Mono.empty()))
+
             //超时报错 todo
             .timeout(Duration.ofSeconds(10), Mono.error(new BusinessException("点播/收流超时,请稍候重试")))
             //报错时,发送bye指令
@@ -139,84 +214,6 @@ public class LocalPlayService  {
             //保证接下来的操作流仅被触发一次
             .filter(ignore->!start.get())
             .doOnNext(ignore->start.set(true))
-
-            .concatWith(
-                Mono.fromRunnable(()->
-                        Mono.just(redisCatchStorage.queryPlayByDevice(deviceId,channelId))
-                            .flatMap(streamInfo -> {
-                                if(StrUtil.isEmpty(streamInfo.getStreamId())){
-                                    return Mono.error(new BusinessException("点播失败, redis缓存streamId等于null"));
-                                }
-                                String mediaServerId = streamInfo.getMediaServerId();
-                                MediaServerItem mediaInfo = mediaServerItemService.getOne(mediaServerId);
-
-                                JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamInfo.getStreamId());
-                                if (rtpInfo != null && rtpInfo.getBool("exist")) {
-                                    if (hookEvent != null) {
-                                        //todo
-//                        hookEvent.response(mediaServerItem, com.alibaba.fastjson.JSONObject.parseObject(JSON.toJSONString(streamInfo)));
-                                    }
-                                    clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                        ResponseEntity.ok(streamInfo));
-                                } else {
-                                    // TODO 点播前是否重置状态
-                                    redisCatchStorage.stopPlay(streamInfo);
-                                    return deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId())
-                                        .flatMap(ignore->{
-                                            SSRCInfo ssrcInfo;
-                                            String streamId2 = null;
-                                            if (mediaServerItem.isRtpEnable()) {
-                                                streamId2 = String.format("%s_%s",deviceId, channelId);
-                                            }
-                                            ssrcInfo = mediaServerItemService.openRTPServer(mediaServerItem, streamId2);
-
-                                            return cmder.playStreamCmd(mediaServerItem, ssrcInfo, redisCatchStorage.getDevice(deviceId), channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
-                                                log.info("收到订阅消息: " + response.toString());
-                                                onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, msgId)
-                                                    .subscribe();
-                                            }, (event) -> {
-                                                mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                                                WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg),null);
-                                                clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                                    ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(wvpResult));
-                                            })
-                                                .then();
-                                        });
-                                }
-                                return Mono.empty();
-                            })
-                            .switchIfEmpty(Mono.just(mediaServerItem)
-                                .flatMap(serverItem->{
-                                    SSRCInfo ssrcInfo;
-                                    String streamId = null;
-                                    if (serverItem.isRtpEnable()) {
-                                        streamId = String.format("%s_%s", deviceId, channelId);
-                                    }
-
-                                    ssrcInfo = mediaServerItemService.openRTPServer(serverItem, streamId);
-
-                                    // 发送点播消息
-                                    return cmder.playStreamCmd(serverItem, ssrcInfo, redisCatchStorage.getDevice(deviceId), channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
-                                        log.info("收到订阅消息: " + response.toString());
-                                        onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, msgId).subscribe();
-                                        if (hookEvent != null) {
-//                                    hookEvent.response(mediaServerItem, response);
-                                        }
-                                    }, (event) -> {
-                                        WVPResult wvpResult = WVPResult.of(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
-                                        // 点播返回sip错误
-                                        mediaServerItemService.closeRTPServer(playResult.getDevice(), channelId);
-                                        clusterEventBus.publish(DeferredResultHolder.getTopicByDeviceIdAndChannelId(DeferredResultHolder.CALLBACK_CMD_PLAY, msg),
-                                            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(wvpResult));
-                                        if (errorEvent != null) {
-                                            //todo
-//                    errorEvent.response(event);
-                                        }
-                                    })
-                                        .then();
-                                }))
-                            .subscribe()
-                ))
             .doOnError(BusinessException.class, e -> cmder.streamByeCmd(deviceId, channelId).subscribe())
             .single();
     }
@@ -252,7 +249,7 @@ public class LocalPlayService  {
         if (device == null) {
             return Mono.empty();
         }
-        return Mono.just(device.getMediaServerId())
+        return Mono.justOrEmpty(device.getMediaServerId())
             .flatMap(mediaServerItemService::findById)
             //找不到设备相应的媒体流服务器则根据负载均衡获取相应的媒体流服务器
             .switchIfEmpty(mediaServerItemService.getMediaServerForMinimumLoad())

+ 6 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/AbstractSipProcessor.java

@@ -5,6 +5,7 @@ import org.jetlinks.community.media.sip.processor.SipProcessorObserver;
 import org.springframework.beans.factory.annotation.Autowired;
 import javax.annotation.PostConstruct;
 import javax.sip.*;
+import javax.sip.header.CSeqHeader;
 
 /**
  * @description: 对SIP事件进行处理,包括request, response, timeout, ioException, transactionTerminated,dialogTerminated
@@ -36,6 +37,11 @@ public abstract class AbstractSipProcessor {
             })
             .subscribe();
         observer.handleResponseEvent()
+            .filter(responseEvent -> {
+                CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
+                String method = cseqHeader.getMethod();
+                return this.getMethod().equals(method);
+            })
             .doOnNext(this::processResponse)
             .onErrorContinue((k,v)->{})
             .subscribe();

+ 44 - 11
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/processor/SipProcessorObserver.java

@@ -1,19 +1,14 @@
 package org.jetlinks.community.media.sip.processor;
 
-import gov.nist.javax.sip.parser.HeaderParser;
-import gov.nist.javax.sip.parser.ParserFactory;
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.event.EventBus;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.UnicastProcessor;
-
 import javax.sip.*;
-import javax.sip.header.AuthorizationHeader;
-import javax.sip.message.Request;
-import java.text.ParseException;
-import java.util.ListIterator;
+import javax.sip.message.Response;
 import java.util.function.Function;
 
 /**
@@ -44,6 +39,8 @@ public class SipProcessorObserver implements ISipProcessObserver {
     private EmitterProcessor<DialogTerminatedEvent> dialogTerminatedProcessor=EmitterProcessor.create();
     private FluxSink<DialogTerminatedEvent> dialogTerminatedSink=dialogTerminatedProcessor.sink();
 
+    @Autowired
+    private EventBus eventBus;
 
 
     /**
@@ -58,13 +55,49 @@ public class SipProcessorObserver implements ISipProcessObserver {
     }
 
     /**
-     * 分发ResponseEvent事件
+     * 分发ResponseEvent事件 todo 处理失败事件
      * @param responseEvent responseEvent事件
      */
     @Override
     public void processResponse(ResponseEvent responseEvent) {
-        if(responseProcessor.hasDownstreams()){
-            responseSink.next(responseEvent);
+        Response response = responseEvent.getResponse();
+        log.debug(responseEvent.getResponse().toString());
+        int status = response.getStatusCode();
+        if (((status >= 200) && (status < 300)) || status == 401) { // Success!
+//            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
+//            String method = cseqHeader.getMethod();
+            if(responseProcessor.hasDownstreams()){
+                responseSink.next(responseEvent);
+            }
+//            if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
+//                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+//                if (callIdHeader != null) {
+//                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
+//                    if (subscribe != null) {
+//                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
+//                        subscribe.response(eventResult);
+//                        sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
+//                    }
+//                }
+//            }
+        } else if ((status >= 100) && (status < 200)) {
+            // 增加其它无需回复的响应,如101、180等
+        } else {
+            log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
+//            if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
+//                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+//                if (callIdHeader != null) {
+//                    SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
+//                    if (subscribe != null) {
+//                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
+//                        subscribe.response(eventResult);
+//                        sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
+//                    }
+//                }
+//            }
+            if (responseEvent.getDialog() != null) {
+                responseEvent.getDialog().delete();
+            }
         }
     }
 

+ 3 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java

@@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
 import org.jetlinks.community.media.bean.WvpSipDate;
+import org.jetlinks.community.media.enums.StreamMode;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.sip.SipRequestProcessorParent;
 import org.jetlinks.community.utils.SipUtils;
@@ -88,6 +89,7 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
             if(StrUtil.isEmpty(device.getId())){
                 //首次注册
                 device.setFirsRegister(true);
+                device.setStreamMode( StreamMode.UDP.name());
             }
             device.setId(deviceId);
             //基于数字摘要的认证
@@ -131,7 +133,7 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
                 String transport = viaHeader.getTransport();
                 String protocol = viaHeader.getProtocol();
                 //todo
-                device.setStreamMode(protocol);
+
                 device.setTransport(transport);
                 device.setIp(received);
                 device.setPort(rPort);

+ 9 - 11
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java

@@ -11,6 +11,7 @@ import org.jetlinks.community.media.bean.ParentPlatform;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.sip.SipRequestProcessorParent;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.utils.SipUtils;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,23 +56,22 @@ public class MessageRequestProcessor extends SipRequestProcessorParent {
     private RedissonClient client;
     @Autowired
     private LocalMediaDeviceService deviceService;
+
+    @Autowired
+    private RedisCacheStorageImpl redisCacheStorage;
     @SneakyThrows
     @Override
     public void processRequest(RequestEvent evt) {
         log.debug("接收到消息:" + evt.getRequest());
         String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
         CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
+        MediaDevice device = redisCacheStorage.getDevice(deviceId);
         // 查询设备是否存在
-        Mono.zip(Mono.justOrEmpty((MediaDevice) (client.getBucket(SipUtils.keepAliveKey(deviceId)).get()))
-            .switchIfEmpty(deviceService.findById(deviceId)),
-            //todo
-            Mono.justOrEmpty(new ParentPlatform()))
-            .doOnNext(tuple2->{
-                MediaDevice device = tuple2.getT1();
-                ParentPlatform parentPlatform = tuple2.getT2();
+        Mono.just(new ParentPlatform())
+            .doOnNext(platform->{
                 try {
                     if (device == null
-                        && parentPlatform == null) {
+                        && platform == null) {
                         // 不存在则回复404
                         responseAck(evt, Response.NOT_FOUND, "device "+ deviceId +" not found");
                         log.warn("[设备未找到 ]: {}", deviceId);
@@ -87,11 +87,9 @@ public class MessageRequestProcessor extends SipRequestProcessorParent {
                             }
                         }else { // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
                             if(platProcessor.hasDownstreams()){
-                                platFluxSink.next(Tuple.of(evt,parentPlatform,rootElement));
+                                platFluxSink.next(Tuple.of(evt,platform,rootElement));
                             }
                         }
-                        // 不支持的message
-                        // 不存在则回复415
                         responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
                     }
 

+ 114 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java

@@ -0,0 +1,114 @@
+package org.jetlinks.community.media.sip.request.message.notify;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.jetlinks.community.media.bean.ParentPlatform;
+import org.jetlinks.community.media.contanst.CmdType;
+import org.jetlinks.community.media.entity.MediaDevice;
+import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
+import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.jetlinks.community.media.sip.request.message.MessageHandlerAbstract;
+import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
+import org.jetlinks.community.utils.XmlUtil;
+import org.springframework.stereotype.Component;
+import javax.sip.RequestEvent;
+import javax.sip.message.Response;
+import java.util.*;
+
+
+/**
+ * @description: 处理设备目录响应,获取设备目录响应
+ * @author: panlinlin
+ * @date: 2021年11月5日 16:40
+ */
+@Component
+@Slf4j
+@AllArgsConstructor
+public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
+
+    private final LocalMediaDeviceService mediaDeviceService;
+    private final LocalMediaDeviceChannelService deviceChannelService;
+    @Override
+    public String getMethod() {
+        return "Catalog";
+    }
+
+    @Override
+    public boolean matchCmd(String requestCmd) {
+        return CmdType.CATALOG.equals(requestCmd);
+    }
+
+    @Override
+    public void handleForDevice(RequestEvent evt, MediaDevice device, Element element) {
+        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getId();
+        Element rootElement = null;
+        try {
+            rootElement = getRootElement(evt, Optional.ofNullable(device.getCharset()).orElse("UTF-8"));
+            Element deviceListElement = rootElement.element("DeviceList");
+            Element sumNumElement = rootElement.element("SumNum");
+            if (sumNumElement == null || deviceListElement == null) {
+                responseAck(evt, Response.BAD_REQUEST, "xml error");
+                return;
+            }
+            int sumNum = Integer.parseInt(sumNumElement.getText());
+            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+            if (deviceListIterator != null) {
+                Set<MediaDeviceChannel> channelList = new HashSet<>();
+                // 遍历DeviceList
+                while (deviceListIterator.hasNext()) {
+                    Element itemDevice = deviceListIterator.next();
+                    Element channelDeviceElement = itemDevice.element("DeviceID");
+                    if (channelDeviceElement == null) {
+                        continue;
+                    }
+                    MediaDeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
+                    deviceChannel.setDeviceId(device.getId());
+                    if(log.isDebugEnabled()){
+                        log.debug("收到来自设备【{}】的通道: {}【{}】", device.getId(), deviceChannel.getName(), deviceChannel.getChannelId());
+                    }
+                    channelList.add(deviceChannel);
+                }
+                //更新设备通道
+                deviceChannelService.createDelete()
+                    .where(MediaDeviceChannel::getDeviceId,device.getId())
+                    .execute()
+                    .flatMap(__->deviceChannelService.save(channelList))
+                    .subscribe();
+//                catalogDataCatch.put(key, sumNum, device, channelList);
+//                if (catalogDataCatch.get(key).size() == sumNum) {
+                // 数据已经完整接收
+//                    boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
+//                    RequestMessage msg = new RequestMessage();
+//                    msg.setKey(key);
+//                    WVPResult<Object> result = new WVPResult<>();
+//                    result.setCode(0);
+//                    result.setData(device);
+//                    if (resetChannelsResult) {
+//                        result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
+//                    }else {
+//                        result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
+//                    }
+//                    msg.setData(result);
+//                    deferredResultHolder.invokeAllResult(msg);
+//                    catalogDataCatch.del(key);
+            }
+
+            // 回复200 OK
+            responseAck(evt, Response.OK);
+//                if (offLineDetector.isOnline(device.getDeviceId())) {
+//                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
+//                }
+//            }
+        } catch (Exception e) {
+            log.error("接受媒体设备目录信息失败,",e);
+        }
+    }
+
+    @Override
+    public void handleForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element element) {
+
+    }
+}

+ 100 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/DeviceInfoResponseMessageProcessor.java

@@ -0,0 +1,100 @@
+package org.jetlinks.community.media.sip.request.message.notify;
+
+import gov.nist.javax.sip.ResponseEventExt;
+import gov.nist.javax.sip.stack.SIPDialog;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.jetlinks.community.media.bean.ParentPlatform;
+import org.jetlinks.community.media.contanst.CmdType;
+import org.jetlinks.community.media.entity.MediaDevice;
+import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.jetlinks.community.media.sip.AbstractSipProcessor;
+import org.jetlinks.community.media.sip.request.message.MessageHandlerAbstract;
+import org.jetlinks.community.media.storage.IVideoManagerStorager;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
+import org.jetlinks.community.utils.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.RequestEvent;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
+import javax.sip.address.SipURI;
+import javax.sip.header.CSeqHeader;
+import javax.sip.message.Request;
+import javax.sip.message.Response;
+import javax.swing.text.html.Option;
+import java.text.ParseException;
+import java.util.Optional;
+
+
+/**
+ * @description: 处理DeviceInfo响应,获取设备信息响应
+ * @author: panlinlin
+ * @date: 2021年11月5日 16:40
+ */
+@Component
+@Slf4j
+@AllArgsConstructor
+public class DeviceInfoResponseMessageProcessor extends MessageHandlerAbstract {
+
+//    private final IVideoManagerStorager storager;
+
+    private final LocalMediaDeviceService mediaDeviceService;
+    @Override
+    public String getMethod() {
+        return "DeviceInfo";
+    }
+
+    @Override
+    public boolean matchCmd(String requestCmd) {
+        return CmdType.DEVICE_INFO.equals(requestCmd);
+    }
+
+    @Override
+    public void handleForDevice(RequestEvent evt, MediaDevice device, Element element) {
+        if(log.isDebugEnabled()){
+            log.debug("接收到DeviceInfo应答消息");
+        }
+        try {
+            element = getRootElement(evt, Optional.ofNullable(device.getCharset()).orElse("utf-8"));
+            Element deviceIdElement = element.element("DeviceID");
+            String channelId = deviceIdElement.getTextTrim();
+//            String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + device.getId() + channelId;
+            device.setName(XmlUtil.getText(element, "DeviceName"));
+
+            device.setManufacturer(XmlUtil.getText(element, "Manufacturer"));
+            device.setModel(XmlUtil.getText(element, "Model"));
+            device.setFirmware(XmlUtil.getText(element, "Firmware"));
+            if (StringUtils.isEmpty(device.getStreamMode())) {
+                device.setStreamMode("UDP");
+            }
+            //更新设备信息
+            mediaDeviceService.updateDeviceAndCache(device).subscribe();
+
+//            RequestMessage msg = new RequestMessage();
+//            msg.setKey(key);
+//            msg.setData(device);
+//            deferredResultHolder.invokeAllResult(msg);
+            // 回复200 OK
+            responseAck(evt, Response.OK);
+//            if (offLineDetector.isOnline(device.getId())) {
+//                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
+//            }
+        } catch (Exception e) {
+            log.error("接受媒体设备信息回复时发生错误,",e);
+        }
+    }
+
+    @Override
+    public void handleForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element element) {
+
+    }
+}

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/KeepaliveNotifyMessageHandler.java

@@ -61,7 +61,7 @@ public class KeepaliveNotifyMessageHandler extends MessageHandlerAbstract {
 //                    redisCatchStorage.updateDevice(device);
                 }
                 //todo 心跳成功,重置过期时间
-                eventBus.publish("/media/device/heart-beat",device).subscribe();
+                eventBus.publish("/media/device/heart-beat/"+device.getId(),device).subscribe();
             }
         } catch (SipException e) {
             e.printStackTrace();

+ 1 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/response/InviteResponseProcessor.java

@@ -36,6 +36,7 @@ public class InviteResponseProcessor extends AbstractSipProcessor {
 			int statusCode = response.getStatusCode();
 			// trying不会回复
 			if (statusCode == Response.TRYING) {
+			    return;
 			}
 			// 成功响应
 			// 下发ack

+ 10 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCatchStorageImpl.java → jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -22,7 +22,7 @@ import java.util.Map;
 @SuppressWarnings("rawtypes")
 @Component
 @Slf4j
-public class RedisCatchStorageImpl {
+public class RedisCacheStorageImpl {
 
     private final RedisUtil redis;
 
@@ -31,7 +31,7 @@ public class RedisCatchStorageImpl {
     private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
     @Autowired
-    public RedisCatchStorageImpl(RedisUtil redis,
+    public RedisCacheStorageImpl(RedisUtil redis,
                                  ClusterManager clusterManager) {
         this.redis = redis;
         this.serverId=clusterManager.getCurrentServerId();
@@ -335,7 +335,7 @@ public class RedisCatchStorageImpl {
     }
 
 
-    public void clearCatchByDeviceId(String deviceId) {
+    public void clearCacheByDeviceId(String deviceId) {
         List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_*", VideoManagerConstants.PLAYER_PREFIX,
                 serverId,
                 deviceId));
@@ -450,6 +450,7 @@ public class RedisCatchStorageImpl {
     public void updateDevice(MediaDevice device) {
         String key = VideoManagerConstants.DEVICE_PREFIX + serverId + "_" + device.getId();
         redis.set(key, device);
+        redis.expire(key,device.getExpires());
     }
 
 
@@ -464,6 +465,12 @@ public class RedisCatchStorageImpl {
         return (MediaDevice)redis.get(key);
     }
 
+    public List<MediaDevice> getAllOnlineDevice() {
+        List<Object> scan = redis.scan(VideoManagerConstants.DEVICE_PREFIX + "*");
+        List<MediaDevice> mediaDevices = new ArrayList<>();
+        scan.stream().filter(result-> result instanceof MediaDevice).forEach(result->mediaDevices.add((MediaDevice) result));
+        return mediaDevices;
+    }
 
 //    public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
 //        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + serverId + "_" + gpsMsgInfo.getId();

+ 8 - 8
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/SIPRequestHeaderProvider.java

@@ -8,7 +8,7 @@ import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
 import org.jetlinks.community.media.bean.WvpSipDate;
 import org.jetlinks.community.media.sip.SipContext;
-import org.jetlinks.community.media.storage.impl.RedisCatchStorageImpl;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.springframework.stereotype.Component;
 
 import javax.sip.Dialog;
@@ -34,7 +34,7 @@ import java.util.Locale;
 @AllArgsConstructor
 public class SIPRequestHeaderProvider {
 
-	private final RedisCatchStorageImpl redisCatchStorage;
+	private final RedisCacheStorageImpl redisCatchStorage;
 
 //	@Autowired
 //	private VideoStreamSessionManager streamSession;
@@ -115,14 +115,14 @@ public class SIPRequestHeaderProvider {
 		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
 
         //ua
-        UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(Arrays.asList("eXosip/4.1.0"));
-        request.setHeader(userAgentHeader);
+//        UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(Arrays.asList("eXosip/4.1.0"));
+//        request.setHeader(userAgentHeader);
 
 		//date
-        WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
-        SIPDateHeader dateHeader = new SIPDateHeader();
-        dateHeader.setDate(wvpSipDate);
-        request.setHeader(dateHeader);
+//        WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
+//        SIPDateHeader dateHeader = new SIPDateHeader();
+//        dateHeader.setDate(wvpSipDate);
+//        request.setHeader(dateHeader);
 
 
 		request.setContent(content, contentTypeHeader);

+ 63 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -245,6 +245,69 @@ public class SipCommander {
 //        }
 //    }
 
+
+    /**
+     * 查询设备信息
+     * @param device
+     * @return
+     */
+    public Mono<Void> deviceInfoQuery(MediaDevice device) {
+        try {
+            StringBuffer catalogXml = new StringBuffer(200);
+            catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
+            catalogXml.append("<Query>\r\n");
+            catalogXml.append("<CmdType>DeviceInfo</CmdType>\r\n");
+            catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
+            catalogXml.append("<DeviceID>" + device.getId() + "</DeviceID>\r\n");
+            catalogXml.append("</Query>\r\n");
+
+            String tm = Long.toString(System.currentTimeMillis());
+
+            SipProvider sipProvider = SipContext.getSipProvider();
+
+            CallIdHeader callIdHeader =sipProvider.getNewCallId();
+
+
+            Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo-" + tm, "FromDev" + tm, null, callIdHeader);
+
+            return transmitRequest(sipProvider, request).then();
+
+        } catch (SipException | ParseException | InvalidArgumentException e) {
+            return Mono.error(new RuntimeException("查询媒体设备出错,",e));
+        }
+    }
+
+    /**
+     * 查询目录列表
+     * @param device
+     * @param errorEvent
+     * @return
+     */
+    public Mono<Void> catalogQuery(MediaDevice device, SipSubscribe.Event errorEvent) {
+        try {
+            StringBuffer catalogXml = new StringBuffer(200);
+            catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
+            catalogXml.append("<Query>\r\n");
+            catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
+            catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
+            catalogXml.append("<DeviceID>" + device.getId() + "</DeviceID>\r\n");
+            catalogXml.append("</Query>\r\n");
+
+            String tm = Long.toString(System.currentTimeMillis());
+
+            SipProvider sipProvider = SipContext.getSipProvider();
+
+            CallIdHeader callIdHeader =sipProvider.getNewCallId();
+
+            Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog-" + tm, "FromCat" + tm, null, callIdHeader);
+
+            //todo 失败处理
+            return transmitRequest(sipProvider, request).then();
+        } catch (SipException | ParseException | InvalidArgumentException e) {
+            return Mono.error(new RuntimeException("查询设备目录列表失败,",e));
+        }
+    }
+
     private Flux<EventResult> transmitRequest(SipProvider sipProvider, Request request) throws SipException {
         ClientTransaction clientTransaction = sipProvider.getNewClientTransaction(request);
         CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);

+ 2 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -7,7 +7,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
-import org.jetlinks.community.media.storage.impl.RedisCatchStorageImpl;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
@@ -29,7 +29,7 @@ import java.util.List;
 @AllArgsConstructor
 public class ZLMHttpHookListener {
     private final LocalMediaServerItemService mediaServerItemService;
-    private final RedisCatchStorageImpl redisCatchStorage;
+    private final RedisCacheStorageImpl redisCatchStorage;
     private final ZLMHttpHookSubscribe subscribe;
 //
 //	/**

+ 2 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRTPServerFactory.java

@@ -19,7 +19,7 @@ public class ZLMRTPServerFactory {
 
     private final ZLMRESTfulUtils zlmresTfulUtils;
 
-    private int[] portRangeArray = new int[2];
+
 
     public int createRTPServer(MediaServerItem mediaServerItem, String streamId) {
         Map<String, Integer> currentStreams = new HashMap<>();
@@ -123,6 +123,7 @@ public class ZLMRTPServerFactory {
     }
 
     private int getPortFromportRange(MediaServerItem mediaServerItem) {
+        int[] portRangeArray = new int[2];
         int currentPort = mediaServerItem.getCurrentPort();
         if (currentPort == 0) {
             String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");

+ 92 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/XmlUtil.java

@@ -7,6 +7,7 @@ import org.dom4j.Document;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
 import org.dom4j.io.SAXReader;
+import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
@@ -30,9 +31,9 @@ public class XmlUtil {
 
     /**
      * 解析XML为Document对象
-     * 
+     *
      * @param xml 被解析的XMl
-     * 
+     *
      * @return Document
      */
     public static Element parseXml(String xml) {
@@ -50,7 +51,7 @@ public class XmlUtil {
 
     /**
      * 获取element对象的text的值
-     * 
+     *
      * @param em  节点的对象
      * @param tag 节点的tag
      * @return 节点
@@ -66,7 +67,7 @@ public class XmlUtil {
 
     /**
      * 递归解析xml节点,适用于 多节点数据
-     * 
+     *
      * @param node     node
      * @param nodeName nodeName
      * @return List<Map<String, Object>>
@@ -105,7 +106,7 @@ public class XmlUtil {
 
     /**
      * xml转json
-     * 
+     *
      * @param element
      * @param json
      */
@@ -159,7 +160,7 @@ public class XmlUtil {
             }
         }
     }
-    public static Element getRootElement(RequestEvent evt) throws DocumentException {
+    public static  Element getRootElement(RequestEvent evt) throws DocumentException {
 
         return getRootElement(evt, "gb2312");
     }
@@ -178,4 +179,89 @@ public class XmlUtil {
         Document xml = reader.read(new ByteArrayInputStream(content));
         return xml.getRootElement();
     }
+
+    public static MediaDeviceChannel channelContentHander(Element itemDevice){
+        Element channdelNameElement = itemDevice.element("Name");
+        String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : "";
+        Element statusElement = itemDevice.element("Status");
+        String status = statusElement != null ? statusElement.getTextTrim() : "ON";
+        MediaDeviceChannel deviceChannel = new MediaDeviceChannel();
+        deviceChannel.setName(channelName);
+        Element channelIdElement = itemDevice.element("DeviceID");
+        String channelId = channelIdElement != null ? channelIdElement.getTextTrim() : "";
+        deviceChannel.setChannelId(channelId);
+        // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
+        if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
+            deviceChannel.setStatus(1);
+        }
+        if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
+            deviceChannel.setStatus(0);
+        }
+
+        deviceChannel.setManufacture(XmlUtil.getText(itemDevice, "Manufacturer"));
+        deviceChannel.setModel(XmlUtil.getText(itemDevice, "Model"));
+        deviceChannel.setOwner(XmlUtil.getText(itemDevice, "Owner"));
+        deviceChannel.setCivilCode(XmlUtil.getText(itemDevice, "CivilCode"));
+        deviceChannel.setBlock(XmlUtil.getText(itemDevice, "Block"));
+        deviceChannel.setAddress(XmlUtil.getText(itemDevice, "Address"));
+        if (XmlUtil.getText(itemDevice, "Parental") == null
+            || XmlUtil.getText(itemDevice, "Parental") == "") {
+            deviceChannel.setParental(0);
+        } else {
+            deviceChannel.setParental(Integer.parseInt(XmlUtil.getText(itemDevice, "Parental")));
+        }
+        deviceChannel.setParentId(XmlUtil.getText(itemDevice, "ParentID"));
+        if (XmlUtil.getText(itemDevice, "SafetyWay") == null
+            || XmlUtil.getText(itemDevice, "SafetyWay") == "") {
+            deviceChannel.setSafetyWay(0);
+        } else {
+            deviceChannel.setSafetyWay(Integer.parseInt(XmlUtil.getText(itemDevice, "SafetyWay")));
+        }
+        if (XmlUtil.getText(itemDevice, "RegisterWay") == null
+            || XmlUtil.getText(itemDevice, "RegisterWay") == "") {
+            deviceChannel.setRegisterWay(1);
+        } else {
+            deviceChannel.setRegisterWay(Integer.parseInt(XmlUtil.getText(itemDevice, "RegisterWay")));
+        }
+        deviceChannel.setCertNum(XmlUtil.getText(itemDevice, "CertNum"));
+        if (XmlUtil.getText(itemDevice, "Certifiable") == null
+            || XmlUtil.getText(itemDevice, "Certifiable") == "") {
+            deviceChannel.setCertifiable(0);
+        } else {
+            deviceChannel.setCertifiable(Integer.parseInt(XmlUtil.getText(itemDevice, "Certifiable")));
+        }
+        if (XmlUtil.getText(itemDevice, "ErrCode") == null
+            || XmlUtil.getText(itemDevice, "ErrCode") == "") {
+            deviceChannel.setErrCode(0);
+        } else {
+            deviceChannel.setErrCode(Integer.parseInt(XmlUtil.getText(itemDevice, "ErrCode")));
+        }
+        deviceChannel.setEndTime(XmlUtil.getText(itemDevice, "EndTime"));
+        deviceChannel.setSecrecy(XmlUtil.getText(itemDevice, "Secrecy"));
+        deviceChannel.setIpAddress(XmlUtil.getText(itemDevice, "IPAddress"));
+        if (XmlUtil.getText(itemDevice, "Port") == null || XmlUtil.getText(itemDevice, "Port") == "") {
+            deviceChannel.setPort(0);
+        } else {
+            deviceChannel.setPort(Integer.parseInt(XmlUtil.getText(itemDevice, "Port")));
+        }
+        deviceChannel.setPassword(XmlUtil.getText(itemDevice, "Password"));
+        if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {
+            deviceChannel.setLongitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Longitude")));
+        } else {
+            deviceChannel.setLongitude(0.00);
+        }
+        if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Latitude"))) {
+            deviceChannel.setLatitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Latitude")));
+        } else {
+            deviceChannel.setLatitude(0.00);
+        }
+        if (XmlUtil.getText(itemDevice, "PTZType") == null
+            || XmlUtil.getText(itemDevice, "PTZType") == "") {
+            deviceChannel.setPTZType(0);
+        } else {
+            deviceChannel.setPTZType(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType")));
+        }
+        deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC
+        return deviceChannel;
+    }
 }