Przeglądaj źródła

sip 网关接口

18339543638 3 lat temu
rodzic
commit
b560aec47f
21 zmienionych plików z 393 dodań i 299 usunięć
  1. 0 51
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/SipServerConfig.java
  2. 24 17
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java
  3. 45 17
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaGatewayController.java
  4. 6 6
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java
  5. 89 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/SipGateway.java
  6. 17 12
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java
  7. 20 25
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java
  8. 34 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalSipGatewayService.java
  9. 30 11
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipContext.java
  10. 64 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipGatewayHelper.java
  11. 3 3
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRequestProcessorParent.java
  12. 0 113
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipServerHelper.java
  13. 4 4
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java
  14. 12 6
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java
  15. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/DeviceInfoResponseMessageProcessor.java
  16. 7 12
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/SIPRequestHeaderProvider.java
  17. 2 6
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java
  18. 8 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/RedisUtil.java
  19. 7 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/XmlUtil.java
  20. 17 6
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java
  21. 3 7
      jetlinks-standalone/src/test/java/org/jetlinks/community/BridgeTest.java

+ 0 - 51
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/SipServerConfig.java

@@ -1,51 +0,0 @@
-package org.jetlinks.community.media.bean;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.hswebframework.ezorm.rdb.mapping.annotation.Comment;
-import org.hswebframework.web.exception.BusinessException;
-import java.util.*;
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName SipServerConfig.java
- * @Description sip服务器配置
- * @createTime 2022年01月15日 10:52:00
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-@AllArgsConstructor(staticName = "of")
-public class SipServerConfig {
-
-    private String id;
-
-    private String name;
-
-    private String domain;
-
-    @Comment("sip服务地址")
-    private String host;
-
-    @Comment("sip服务端口")
-    private Integer port;
-
-    @Comment("sip服务端口")
-    private String password="12345678";
-
-    @Comment("sip服务端口")
-    private String charset;
-
-    @Comment("通信协议,tcp、udp")
-    private String transport;
-
-    @Comment("产品id")
-    private String productId;
-
-
-
-    public void validateCreate(){
-        if(!"tcp".equals(transport.toLowerCase())&&!"udp".equals(transport.toLowerCase())){
-            throw new BusinessException("不支持该协议[{"+transport+"}],请从tcp、udp中选择");
-        }
-    }
-}

+ 24 - 17
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java

@@ -12,7 +12,9 @@ import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
+import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.enums.DeviceState;
+import org.jetlinks.community.media.enums.PtzType;
 import org.jetlinks.community.media.message.MediaMessage;
 import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
@@ -28,8 +30,6 @@ import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.jetlinks.core.exception.DeviceOperationException;
 import org.jetlinks.core.message.DeviceMessageReply;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -77,9 +77,9 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
     public Mono<Object> play(@PathVariable String deviceId,
                              @PathVariable String channelId) {
         return
-            //获取设备信息
+            //验证并返回设备信息
             mediaDeviceService.findById(deviceId)
-                .switchIfEmpty(Mono.error(new BusinessException("设备未注册")))
+                .switchIfEmpty(Mono.error(new BusinessException("设备已注销")))
                 //获取设备相连的媒体流服务器信息
                 .flatMap(playService::getNewMediaServerItem)
                 .switchIfEmpty(Mono.error(new BusinessException("未找到可用的zlm媒体服务器")))
@@ -161,12 +161,9 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
         }
         String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
         return mediaDeviceService.findById(id)
-            .switchIfEmpty(Mono.error(new BusinessException("设备已离线,无法更新通道最新信息")))
-            .flatMap(device->{
-                if (DeviceState.offline==device.getState()) {
-                    return Mono.error(new BusinessException("设备已离线,无法更新通道最新信息"));
-                }
-                return messageBroker.handleReply(id,key,Duration.ofSeconds(30))
+            .switchIfEmpty(Mono.error(new BusinessException("设备已注销")))
+            .flatMap(device->
+                messageBroker.handleReply(id,key,Duration.ofSeconds(30))
                     .onErrorResume(DeviceOperationException.class,error-> {
                         //设备下线
                         mediaDeviceService.deviceOffline(device);
@@ -181,8 +178,7 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
                     })
                         .then(Mono.empty()))
                     .flatMap(this::convertReply)
-                    .singleOrEmpty();
-                }
+                    .singleOrEmpty()
             );
     }
 
@@ -198,12 +194,22 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
     @Operation(summary = "云台控制")
     @PostMapping({"/{deviceId}/{channelId}/_ptz/{command}/{speed}","/{deviceId}/{channelId}/_ptz/{command}"})
     public Mono<Void> ptz(@PathVariable("deviceId") String deviceId, @PathVariable("channelId") String channelId, @PathVariable(value = "command",required = false) String command, @PathVariable(value = "speed",required = false) Integer speedParameter){
-       return  mediaDeviceService.findById(deviceId)
-           .switchIfEmpty(Mono.error(new BusinessException("设备已注销")))
+        return deviceChannelService.createQuery()
+            .where(MediaDeviceChannel::getDeviceId,deviceId)
+            .where(MediaDeviceChannel::getChannelId,channelId)
+            .fetchOne()
+            .switchIfEmpty(Mono.error(new BusinessException("该通道不存在,请刷新后重试")))
+            .flatMap(channel->{
+                if(channel.getPtzType()== PtzType.UNKNOWN){
+                    return Mono.error(new BusinessException("不支持当前云台类型(UNKNOW)进行云台操作"));
+                }
+                if(channel.getStatus()==DeviceState.offline){
+                    return Mono.error(new BusinessException("该通道已下线"));
+                }
+                return Mono.just(channel);
+            })
+            .flatMap(channel->  mediaDeviceService.findById(deviceId)) .switchIfEmpty(Mono.error(new BusinessException("设备已注销")))
             .flatMap(device->{
-//                if(device.getState()==DeviceState.offline){
-//                    return Mono.error(new BusinessException("设备已下线"));
-//                }
                 Integer speed=Optional.ofNullable(speedParameter).orElse(0);
                 int cmdCode = 0;
                 //orizonSpeed	水平移动速度
@@ -258,6 +264,7 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
                 return cmder.frontEndCmd(device, channelId, cmdCode, horizonSpeed, verticalSpeed, zoomSpeed);
             });
 
+
     }
     private Flux<Object> convertReply(DeviceMessageReply reply){
         if(reply instanceof MediaMessageReply){

+ 45 - 17
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaGatewayController.java

@@ -1,23 +1,28 @@
 package org.jetlinks.community.media.controller;
 
+import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
 import org.hswebframework.web.authorization.annotation.Resource;
+import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.media.bean.DeviceNode;
+import org.jetlinks.community.media.entity.SipGateway;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.jetlinks.community.media.service.LocalSipGatewayService;
+import org.jetlinks.community.media.sip.SipContext;
+import org.jetlinks.community.media.sip.SipGatewayHelper;
 import org.jetlinks.core.device.DeviceOperationBroker;
 import org.jetlinks.core.message.AcknowledgeDeviceMessage;
 import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.server.MessageHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.UnicastProcessor;
 
+import javax.sip.InvalidArgumentException;
+import javax.sip.TransportNotSupportedException;
 import java.time.Duration;
 import java.util.*;
 
@@ -36,20 +41,9 @@ import java.util.*;
 @AllArgsConstructor
 @Tag(name = "设备平台管理")
 public class MediaGatewayController {
-    private final LocalMediaDeviceService mediaDeviceService;
-
-    private final DeviceOperationBroker operationBroker;
-
-    private final MessageHandler messageHandler;
-    @GetMapping
-    public Flux<DeviceMessageReply> test(){
-        AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
-        message.deviceId("123");
-        message.messageId("123");
-        return operationBroker.handleReply("123","123", Duration.ofSeconds(10))
-            .mergeWith(messageHandler.reply(message).then(Mono.empty()));
-    }
 
+    private final LocalSipGatewayService gatewayService;
+    private final SipGatewayHelper gatewayHelper;
     @GetMapping("_query/no-paging")
     public Mono<List> deviceTree(){
         DeviceNode deviceNode = new DeviceNode();
@@ -62,4 +56,38 @@ public class MediaGatewayController {
         deviceNode.setTitle("title");
         return Mono.just(Arrays.asList(deviceNode));
     }
+
+    @PatchMapping
+    @Operation(summary = "创建Sip网关服务器")
+    public Mono<Void> createGateway(@RequestBody SipGateway gateway){
+        return gatewayService.getDefault()
+            .doOnNext(existGateway ->{
+                try {
+                    if(!existGateway.equals(gateway)){
+                        SipContext.updateSipServerConfig(gateway);
+                    }
+                    else {
+                        SipContext.updateSipServerConfig(gateway,false);
+                    }
+                }catch (TransportNotSupportedException e) {
+                    throw new BusinessException("Sip不支持该协议",e.getMessage());
+                } catch (InvalidArgumentException e) {
+                    throw new BusinessException("Sip配置参数无效",e.getMessage());
+                }
+            } )
+            .flatMap(existGateway->gatewayService.updateById(existGateway.getId(),gateway))
+            .switchIfEmpty(
+                //保存网关服务器
+                gatewayService.save(gateway)
+                    .then( gatewayHelper.createSip(gateway))
+                    .then(Mono.empty())
+            )
+            .then();
+    }
+
+    @GetMapping("/gb28181_gateway")
+    @Operation(summary = "获取默认sip网关配置")
+    public Mono<SipGateway> getDefault(@RequestBody SipGateway gateway){
+        return gatewayService.getDefault();
+    }
 }

+ 6 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java

@@ -131,23 +131,23 @@ public class MediaDevice extends GenericEntity<String> implements RecordCreation
 	/**
 	 * 注册时间
 	 */
-    @Column(name = "register_time", updatable = false)
+    @Column(name = "last_register_time")
     @Schema(
-        description = "注册时间"
+        description = "最后注册时间"
         ,accessMode = Schema.AccessMode.READ_ONLY
     )
-	private String registerTime;
+	private Long lastRegisterTime;
 
 
 	/**
 	 * 心跳时间
 	 */
-    @Column(name = "keepalive_time", updatable = false)
+    @Column(name = "last_keepalive_time")
     @Schema(
-        description = "心跳时间"
+        description = "最后心跳时间"
         ,accessMode = Schema.AccessMode.READ_ONLY
     )
-	private String keepaliveTime;
+	private Long lastKeepaliveTime;
 
 	/**
 	 * 通道个数

+ 89 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/SipGateway.java

@@ -0,0 +1,89 @@
+package org.jetlinks.community.media.entity;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hswebframework.ezorm.rdb.mapping.annotation.Comment;
+import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.exception.BusinessException;
+
+import javax.persistence.Column;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.*;
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName SipGateway.java
+ * @Description sip服务器配置
+ * @createTime 2022年01月15日 10:52:00
+ */
+@Data
+@Table(name = "sip_gateway", schema = "sip网关")
+public class SipGateway  extends GenericEntity<String> {
+    @Column
+    @Comment("sip服务名称")
+    private String name;
+
+    @Comment("sipId")
+    @Column(name = "sip_id")
+    private String sipId;
+
+    @Column
+    @Comment("sip服务域名")
+    private String domain;
+
+    @Column
+    @Comment("sip服务地址")
+    private String host;
+
+    @Column
+    @Comment("sip服务端口")
+    @NotNull(message = "端口号不能为空")
+    private Integer port;
+
+    @Column
+    @Comment("sip服务端口")
+    private String password;
+
+    @Column
+    @Comment("sip编码集")
+    @NotNull(message = "sip编码集不能为空")
+    private String charset;
+
+    @Column
+    @Comment("通信协议,tcp、udp")
+    private String transport;
+
+    @Comment("产品id")
+    @Column(name = "product_id")
+    private String productId;
+
+    @Comment("心跳时间")
+    @Column(name = "heart_beat")
+    private Integer heartBeat=30;
+
+    @Comment("唯一标识,仅作为查询条件 false")
+    @Column
+    private boolean remark;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SipGateway that = (SipGateway) o;
+        return getHost().equals(that.getHost()) &&
+            getPort().equals(that.getPort());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(getHost(), getPort());
+    }
+
+    @Override
+    public String toString() {
+        return "SipGateway{" +
+            "host='" + host + '\'' +
+            ", port=" + port +
+            '}';
+    }
+}

+ 17 - 12
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java

@@ -1,11 +1,15 @@
 package org.jetlinks.community.media.service;
 
+import io.vertx.core.spi.launcher.Command;
 import lombok.AllArgsConstructor;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.enums.DeviceState;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.utils.RedisUtil;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import reactor.core.publisher.Mono;
@@ -22,8 +26,10 @@ import java.util.Set;
  */
 @Service
 @AllArgsConstructor
-public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<MediaDeviceChannel, String> {
+public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<MediaDeviceChannel, String> implements CommandLineRunner {
     private final RedisCacheStorageImpl redisCacheStorage;
+    private final RedisUtil redisUtil;
+    private final ClusterManager clusterManager;
     public Mono<Void> startPlay(String deviceId, String channelId, String streamId) {
         return this.createUpdate()
             .set(MediaDeviceChannel::getStreamId,streamId)
@@ -44,17 +50,6 @@ public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<M
             .then();
     }
 
-    /**
-     *  catlog查询结束后完全重写通道信息
-     * @param id
-     * @param channelList
-     * @return
-     */
-    public Mono<Void> resetChannels(String id, List<MediaDeviceChannel> channelList) {
-        Set<MediaDeviceChannel> channelSets = new HashSet<>();
-
-        return Mono.empty();
-    }
 
     public Mono<Void>  stopPlayback(StreamInfo streamInfo) {
         if(streamInfo==null){
@@ -71,4 +66,14 @@ public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<M
             })
             .then(Mono.fromRunnable(()->redisCacheStorage.stopPlayback(streamInfo)));
     }
+
+    @Override
+    public void run(String... args)  {
+        Set<Object> deviceIds = redisUtil.members("session_" + clusterManager.getCurrentServerId());
+        this.createUpdate()
+            .in(MediaDeviceChannel::getDeviceId,deviceIds)
+            .set(MediaDeviceChannel::getStatus,DeviceState.offline)
+            .execute()
+            .subscribe();
+    }
 }

+ 20 - 25
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -1,19 +1,18 @@
 package org.jetlinks.community.media.service;
 
-import cn.hutool.core.date.DateUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
 import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.media.entity.MediaDevice;
-import org.jetlinks.community.media.bean.SipServerConfig;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.enums.DeviceState;
 import org.jetlinks.community.media.session.SipSession;
-import org.jetlinks.community.media.sip.SipServerHelper;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
+import org.jetlinks.community.utils.RedisUtil;
+import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.DeviceOfflineMessage;
@@ -26,8 +25,9 @@ import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
-import java.util.List;
+
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * @author lifang
@@ -47,23 +47,26 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     private final DeviceGatewayMonitor gatewayMonitor;
     private final DeviceSessionManager sessionManager;
     private final LocalMediaDeviceChannelService deviceChannelService;
-
+    private final String serverId;
+    private final RedisUtil redisUtil;
     public LocalMediaDeviceService(DecodedClientMessageHandler messageHandler,
                                    RedisCacheStorageImpl redisCacheStorage,
                                    EventBus eventBus, DeviceRegistry registry,
                                    SipCommander cmder,
                                    DeviceSessionManager sessionManager,
-                                   SipServerHelper sipServerHelper,
-                                   LocalMediaDeviceChannelService deviceChannelService) {
+                                   LocalMediaDeviceChannelService deviceChannelService,
+                                   RedisUtil redisUtil,
+                                   ClusterManager clusterManager) {
         this.messageHandler = messageHandler;
         this.redisCacheStorage = redisCacheStorage;
         this.eventBus = eventBus;
         this.registry = registry;
+        this.serverId=clusterManager.getCurrentServerId();
         this.cmder = cmder;
         this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor("sip");
         this.sessionManager = sessionManager;
-        this.sipServerHelper = sipServerHelper;
         this.deviceChannelService=deviceChannelService;
+        this.redisUtil=redisUtil;
     }
 
     @Subscribe("/media/device/*/*/register")
@@ -170,13 +173,11 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     public void heartBeat(MediaDevice device){
 
         //更新设备心跳时间
-        device.setKeepaliveTime(DateUtil.now());
+        device.setLastKeepaliveTime(System.currentTimeMillis());
         device.setState(DeviceState.online);
-        if (redisCacheStorage.getDevice(device.getId())==null) {
-            //未注册的设备不进行更新
-            updateById(device.getId(),device)
-                .subscribe();
-        }
+        //未注册的设备不进行更新
+        updateById(device.getId(),device)
+            .subscribe();
         //心跳过期 ,设备上线
         redisCacheStorage.updateDevice(device);
         DeviceSession session = sessionManager.getSession(device.getId());
@@ -195,20 +196,14 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             .then();
     }
 
-    private final SipServerHelper sipServerHelper;
     @Override
     public void run(String... args) {
-        //todo 后期改为定时
-        List<String> deviceIds = redisCacheStorage.getOnlineForAll();
-        this.createQuery()
+        Set<Object> deviceIds = redisUtil.members("session_" + serverId);
+        this.createUpdate()
             .where(MediaDevice::getState,DeviceState.online)
-            .notIn(MediaDevice::getId,deviceIds)
-            .fetch()
-            .doOnNext(this::unRegister)
-            .concatWith(sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","", "340200000","192.168.104.244",7001,"12345678","GB2312","udp","1")).then(Mono.empty()))
-            .doOnError(e->{
-                e.printStackTrace();
-            })
+            .in(MediaDevice::getId,deviceIds)
+            .set(MediaDevice::getState,DeviceState.offline)
+            .execute()
             .subscribe();
     }
 }

+ 34 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalSipGatewayService.java

@@ -0,0 +1,34 @@
+package org.jetlinks.community.media.service;
+
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.jetlinks.community.media.entity.SipGateway;
+import org.jetlinks.community.media.sip.SipGatewayHelper;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+/**
+ * Sip网关
+ */
+@Service
+@Slf4j
+@AllArgsConstructor
+public class LocalSipGatewayService extends GenericReactiveCrudService<SipGateway, String> implements CommandLineRunner {
+
+    private final SipGatewayHelper sipGatewayHelper;
+    public Mono<SipGateway> getDefault() {
+         return this.createQuery()
+             .where(SipGateway::isRemark,false)
+             .fetchOne();
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+        this.getDefault()
+            .flatMap(sipGatewayHelper::createSip)
+            .subscribe();
+    }
+}

+ 30 - 11
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipContext.java

@@ -5,11 +5,16 @@ import io.vavr.Tuple;
 import io.vavr.Tuple3;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
-import org.jetlinks.community.media.bean.SipServerConfig;
+import org.jetlinks.community.media.entity.SipGateway;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.message.codec.Transports;
+
 import javax.sip.*;
 import javax.sip.header.ViaHeader;
 import java.util.*;
 
+import static javax.sip.ListeningPoint.UDP;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -21,24 +26,38 @@ import java.util.*;
 public class SipContext {
     private static SipFactory sipFactory=null;
 
-    private static Tuple3<SipServerConfig,SipStack,Map<String,SipProvider>> tuple3Map=Tuple.of(null,null,null);
+    private static Tuple3<SipGateway,SipStack,Map<String,SipProvider>> tuple3Map=Tuple.of(null,null,null);
+
 
+    public static void updateSipServerConfig(SipGateway config) throws TransportNotSupportedException, InvalidArgumentException {
+       updateSipServerConfig(config,true);
+    }
 
-    public static void updateSipServerConfig(SipServerConfig config) throws TransportNotSupportedException, InvalidArgumentException {
+    /**
+     *
+     * @param config
+     * @param reboot 是否重启sip网关
+     * @throws TransportNotSupportedException
+     * @throws InvalidArgumentException
+     */
+    public static void updateSipServerConfig(SipGateway config,boolean reboot) throws TransportNotSupportedException, InvalidArgumentException {
         SipStack sipStack = tuple3Map._2();
         SipProviderImpl provider = (SipProviderImpl) tuple3Map._3();
-        if(provider!=null){
+        if(provider!=null&&reboot){
             provider.removeListeningPoints();
         }
-        if(sipStack!=null){
+        if(sipStack!=null&&reboot){
             sipStack.stop();
         }
         tuple3Map=tuple3Map.update1(config);
-        getSipStack(config);
-        getSipProviders(config);
+        if(reboot){
+            getSipStack(config);
+            getSipProviders(config);
+        }
+
     }
 
-    public static SipServerConfig getConfig(){
+    public static SipGateway getConfig(){
         return tuple3Map._1();
     }
 
@@ -65,20 +84,20 @@ public class SipContext {
         return sipFactory;
     }
 
-    public static SipStack getSipStack(SipServerConfig config){
+    public static SipStack getSipStack(SipGateway config){
         tuple3Map=tuple3Map.update2(createSipStack(config.getHost())) ;
         tuple3Map=tuple3Map.update1(config);
         return getSipStack();
     }
 
 
-    public static Map<String,SipProvider> getSipProviders(SipServerConfig config) throws InvalidArgumentException, TransportNotSupportedException {
+    public static Map<String,SipProvider> getSipProviders(SipGateway config) throws InvalidArgumentException, TransportNotSupportedException {
         SipStack sipStack = getSipStack();
         if(sipStack==null){
             sipStack=getSipStack(config);
         }
         try {
-            listenPoint(sipStack,"0.0.0.0",config.getPort(),config.getTransport());
+            listenPoint(sipStack,"0.0.0.0",config.getPort(), UDP);
         }catch (TransportAlreadySupportedException e){
             e.printStackTrace();
         }catch (ObjectInUseException e){

+ 64 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipGatewayHelper.java

@@ -0,0 +1,64 @@
+package org.jetlinks.community.media.sip;
+
+import gov.nist.javax.sip.SipProviderImpl;
+import lombok.AllArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.media.entity.SipGateway;
+import org.jetlinks.community.media.sip.processor.SipProcessorObserver;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+import javax.sip.*;
+import java.util.*;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName SipServerHelper.java
+ * @Description TODO
+ * @createTime 2022年01月15日 14:05:00
+ */
+@Component
+@Slf4j
+@AllArgsConstructor
+public class SipGatewayHelper {
+
+
+    private final SipProcessorObserver sipProcessorObserver;
+
+
+    public Mono<SipProcessorObserver> createSip(SipGateway sipConfig){
+        try {
+            SipContext.updateSipServerConfig(sipConfig);
+        } catch (TransportNotSupportedException e) {
+
+            return Mono.error( new BusinessException("不支持该协议[{"+sipConfig.getTransport()+"}],请从tcp、udp中选择"));
+        } catch (InvalidArgumentException e) {
+            return Mono.error( new BusinessException(String.format("无法使用 [ {%s}:{%s} ]作为SIP[ UDP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
+                , sipConfig.getHost(), sipConfig.getPort())));
+
+        }
+        return Mono.just(sipProcessorObserver)
+            .doOnNext(observer->{
+                Map<String,SipProvider> sipProviders =  SipContext.getSipProviders();
+                if(sipProviders==null){
+                    log.error("sip启动失败");
+                    throw new BusinessException("sip启动失败");
+                }
+                for (SipProvider provider : sipProviders.values()) {
+                    SipListener sipListener = ((SipProviderImpl)provider).getSipListener();
+                    if(sipListener!=null){
+                        provider.removeSipListener(sipListener);
+                    }
+                    try {
+                        provider.addSipListener(sipProcessorObserver);
+                    } catch (Exception e) {
+                        log.warn("sipProvider 添加 sipListener 失败");
+                    }
+                }
+            });
+    }
+
+}

+ 3 - 3
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRequestProcessorParent.java

@@ -10,7 +10,7 @@ import org.dom4j.DocumentException;
 import org.dom4j.Element;
 import org.dom4j.io.SAXReader;
 import org.jetlinks.community.media.core.DigestServerAuthenticationHelper2016;
-import org.jetlinks.community.media.bean.SipServerConfig;
+import org.jetlinks.community.media.entity.SipGateway;
 
 import javax.sip.*;
 import javax.sip.address.Address;
@@ -195,7 +195,7 @@ public abstract class SipRequestProcessorParent extends AbstractSipProcessor {
      * 在WWW_Authenticate请求头中传输认证参数
      * @return
      */
-    public Response basicResponse(Request request, SipServerConfig sipConfig) throws NoSuchAlgorithmException, ParseException {
+    public Response basicResponse(Request request, SipGateway sipConfig) throws NoSuchAlgorithmException, ParseException {
         Response response=null;
         log.info("[{}] 未携带授权头 回复401", request.getRequestURI());
         response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
@@ -212,7 +212,7 @@ public abstract class SipRequestProcessorParent extends AbstractSipProcessor {
      * @return
      * @throws NoSuchAlgorithmException
      */
-    public boolean authRequest( Request request,SipServerConfig sipConfig) throws NoSuchAlgorithmException {
+    public boolean authRequest(Request request, SipGateway sipConfig) throws NoSuchAlgorithmException {
         return new DigestServerAuthenticationHelper2016()
             .doAuthenticatePlainTextPassword(request,
                 sipConfig.getPassword());

+ 0 - 113
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipServerHelper.java

@@ -1,113 +0,0 @@
-package org.jetlinks.community.media.sip;
-
-import gov.nist.javax.sip.SipProviderImpl;
-import lombok.AllArgsConstructor;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.hswebframework.web.exception.BusinessException;
-import org.jetlinks.community.media.bean.EventResult;
-import org.jetlinks.community.media.bean.SSRCInfo;
-import org.jetlinks.community.media.entity.MediaDevice;
-import org.jetlinks.community.media.bean.SipServerConfig;
-import org.jetlinks.community.media.enums.StreamMode;
-import org.jetlinks.community.media.sip.request.impl.InviteRequestProcessor;
-import org.jetlinks.community.media.sip.request.impl.RegisterRequestProcessor;
-import org.jetlinks.community.media.sip.request.message.MessageRequestProcessor;
-import org.jetlinks.community.media.sip.request.message.notify.KeepaliveNotifyMessageHandler;
-import org.jetlinks.community.media.sip.processor.SipProcessorObserver;
-import org.jetlinks.community.media.transmit.SIPRequestHeaderProvider;
-import org.jetlinks.community.media.transmit.cmd.SipCommander;
-import org.jetlinks.community.media.zlm.entity.MediaServerItem;
-import org.springframework.beans.factory.config.BeanPostProcessor;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import javax.sip.*;
-import java.util.*;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName SipServerHelper.java
- * @Description TODO
- * @createTime 2022年01月15日 14:05:00
- */
-@Component
-@Slf4j
-@AllArgsConstructor
-public class SipServerHelper implements BeanPostProcessor {
-
-
-    private final SipProcessorObserver sipProcessorObserver;
-
-
-    public Mono<SipProcessorObserver> createSip(SipServerConfig sipConfig){
-        sipConfig.validateCreate();
-        try {
-            SipContext.updateSipServerConfig(sipConfig);
-        } catch (TransportNotSupportedException e) {
-            return Mono.error( new BusinessException("不支持该协议[{"+sipConfig.getTransport()+"}],请从tcp、udp中选择"));
-        } catch (InvalidArgumentException e) {
-            return Mono.error( new BusinessException(String.format("无法使用 [ {%s}:{%s} ]作为SIP[ UDP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
-                , sipConfig.getHost(), sipConfig.getPort())));
-
-        }
-        return Mono.just(sipProcessorObserver)
-            .doOnNext(observer->{
-                Map<String,SipProvider> sipProviders =  SipContext.getSipProviders();
-                if(sipProviders==null){
-                    log.error("sip启动失败");
-                    throw new BusinessException("sip启动失败");
-                }
-                for (SipProvider provider : sipProviders.values()) {
-                    SipListener sipListener = ((SipProviderImpl)provider).getSipListener();
-                    if(sipListener!=null){
-                        provider.removeSipListener(sipListener);
-                    }
-                    try {
-                        provider.addSipListener(sipProcessorObserver);
-                    } catch (Exception e) {
-                        log.warn("sipProvider 添加 sipListener 失败");
-                    }
-                }
-            });
-    }
-
-    @SneakyThrows
-    public static void main(String[] args) {
-//        SipProcessorObserver sipProcessorObserver = new SipProcessorObserver();
-//        RegisterRequestProcessor registerRequestProcessor = new RegisterRequestProcessor(null);
-//        registerRequestProcessor.observer=sipProcessorObserver;
-//        registerRequestProcessor.init();
-//        InviteRequestProcessor inviteRequestProcessor = new InviteRequestProcessor(null,null);
-//        inviteRequestProcessor.observer=sipProcessorObserver;
-//        inviteRequestProcessor.init();
-//        MessageRequestProcessor messageRequestProcessor = new MessageRequestProcessor();
-//        messageRequestProcessor.observer=sipProcessorObserver;
-//        messageRequestProcessor.init();
-//        KeepaliveNotifyMessageHandler keepaliveNotifyMessageHandler = new KeepaliveNotifyMessageHandler();
-//        keepaliveNotifyMessageHandler.observer=sipProcessorObserver;
-//        keepaliveNotifyMessageHandler.init();
-//        keepaliveNotifyMessageHandler.messageRequestProcessor=messageRequestProcessor;
-//        keepaliveNotifyMessageHandler.start();
-//        new SipServerHelper(sipProcessorObserver).
-//            createSip( SipServerConfig.of("1","0.0.0.0", 7000,"udp","340200000","utf-8","12345678",10L,""))
-//            .subscribe();
-//        SipCommander sipCommander = new SipCommander(new SIPRequestHeaderProvider(null));
-//        SSRCInfo ssrcInfo = SSRCInfo.of(5003,null,null);
-//        MediaDevice mediaDevice = new MediaDevice();
-//        mediaDevice.setStreamMode(StreamMode.TCP_ACTIVE.name());
-//        mediaDevice.setId("34020000001320000003");
-//        mediaDevice.setHostAddress("192.168.104.244");
-//        mediaDevice.setPort(5063);
-//        mediaDevice.setTransport("udp");
-//        MediaServerItem mediaServerItem = new MediaServerItem();
-//        mediaServerItem.setSdpIp("192.168.104.244");
-//        Flux<EventResult> eventResultFlux = sipCommander.playStreamCmd(mediaServerItem, ssrcInfo,mediaDevice, "34020000001320000003");
-//        eventResultFlux
-//            .doOnNext(s->{
-//                System.out.println(s);
-//            })
-//            .subscribe();
-    }
-}

+ 4 - 4
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java

@@ -5,7 +5,7 @@ import gov.nist.javax.sip.header.Expires;
 import gov.nist.javax.sip.header.SIPDateHeader;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.media.entity.MediaDevice;
-import org.jetlinks.community.media.bean.SipServerConfig;
+import org.jetlinks.community.media.entity.SipGateway;
 import org.jetlinks.community.media.bean.WvpSipDate;
 import org.jetlinks.community.media.enums.StreamMode;
 import org.jetlinks.community.media.sip.SipContext;
@@ -13,7 +13,6 @@ import org.jetlinks.community.media.sip.SipRequestProcessorParent;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.utils.SipUtils;
 import org.jetlinks.core.event.EventBus;
-import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
@@ -73,7 +72,7 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
             int registerFlag = 0;
             String deviceId =  SipUtils.getUserIdFromFromHeader(request);
             //todo 获取设备信息
-            SipServerConfig sipConfig = SipContext.getConfig();
+            SipGateway sipConfig = SipContext.getConfig();
 
 
             MediaDevice device =
@@ -157,6 +156,7 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
             // 保存到redis
             // 下发catelog查询目录
             if (registerFlag == 1 ) {
+                device.setLastRegisterTime(System.currentTimeMillis());
                 log.info("[{}] 注册成功! id:" + device.getId(), requestAddress);
                 registerCommonDevice(device);
                 // 重新注册更新设备和通道,以免设备替换或更新后信息无法更新
@@ -198,7 +198,7 @@ public class RegisterRequestProcessor extends SipRequestProcessorParent {
      * c)3:SIP代理重新向SIP服务器发送Register请求,在请求的Authorization字段给出信任书,包含认证信息;
      * d)4:SIP服务器对请求进行验证,如果检查出SIP代理身份合法,向SIP代理发送成功响应200OK,如果身份不合法则发送拒绝服务应答
      */
-    private Response baseDigest(Request request,SipServerConfig sipConfig,AtomicBoolean passwordCorrect) throws ParseException, NoSuchAlgorithmException {
+    private Response baseDigest(Request request, SipGateway sipConfig, AtomicBoolean passwordCorrect) throws ParseException, NoSuchAlgorithmException {
         AuthorizationHeader authorizationHeader = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
         Response response=null;
         if(authorizationHeader==null){

+ 12 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java

@@ -7,6 +7,7 @@ import org.jetlinks.community.media.bean.ParentPlatform;
 import org.jetlinks.community.media.contanst.CmdType;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.enums.DeviceState;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.message.MediaMessageReply;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
@@ -18,6 +19,7 @@ import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.utils.XmlUtil;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 import reactor.core.publisher.Mono;
 import javax.sip.RequestEvent;
 import javax.sip.message.Response;
@@ -49,6 +51,7 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
     }
 
     @Override
+    @Transactional(rollbackFor = Exception.class)
     public void handleForDevice(RequestEvent evt, MediaDevice device, Element element) {
         Element rootElement = null;
         try {
@@ -61,8 +64,8 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
             }
             int sumNum = Integer.parseInt(sumNumElement.getText());
             Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+            Set<MediaDeviceChannel> channelList = new HashSet<>();
             if (deviceListIterator != null) {
-                Set<MediaDeviceChannel> channelList = new HashSet<>();
                 // 遍历DeviceList
                 while (deviceListIterator.hasNext()) {
                     Element itemDevice = deviceListIterator.next();
@@ -84,11 +87,14 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
                 //更新设备通道
                 messageBroker.reply(reply)
                     .mergeWith(Mono.zip( deviceChannelService.createDelete()
-                        .where(MediaDeviceChannel::getDeviceId,device.getId())
-                        .execute(),deviceChannelService.save(channelList),    mediaDeviceService.createUpdate()
-                        .where(MediaDevice::getId,device.getId())
-                        .set(MediaDevice::getChannelCount,channelList.size())
-                        .execute()).then(Mono.empty()))
+                            .where(MediaDeviceChannel::getDeviceId,device.getId())
+                            .execute()
+                            .flatMap(ignore->deviceChannelService.save(channelList)),
+                        mediaDeviceService.createUpdate()
+                            .where(MediaDevice::getId,device.getId())
+                            .set(MediaDevice::getChannelCount,channelList.size())
+                            .set(MediaDevice::getState, DeviceState.online)
+                            .execute()).then(Mono.empty()))
                     .subscribe();
             }
             // 回复200 OK

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/DeviceInfoResponseMessageProcessor.java

@@ -64,7 +64,7 @@ public class DeviceInfoResponseMessageProcessor extends MessageHandlerAbstract {
         try {
             element = getRootElement(evt, Optional.ofNullable(device.getCharset()).orElse("utf-8"));
             Element deviceIdElement = element.element("DeviceID");
-//            String channelId = deviceIdElement.getTextTrim();
+            String channelId = deviceIdElement.getTextTrim();
 //            String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + device.getId() + channelId;
             device.setName(XmlUtil.getText(element, "DeviceName"));
 

+ 7 - 12
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/SIPRequestHeaderProvider.java

@@ -1,12 +1,10 @@
 package org.jetlinks.community.media.transmit;
 
 
-import gov.nist.javax.sip.header.SIPDateHeader;
 import lombok.AllArgsConstructor;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.entity.MediaDevice;
-import org.jetlinks.community.media.bean.SipServerConfig;
-import org.jetlinks.community.media.bean.WvpSipDate;
+import org.jetlinks.community.media.entity.SipGateway;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.springframework.stereotype.Component;
@@ -21,9 +19,6 @@ import javax.sip.header.*;
 import javax.sip.message.Request;
 import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Locale;
 
 /**
  * @description:摄像头命令request创造器 TODO 冗余代码太多待优化
@@ -40,7 +35,7 @@ public class SIPRequestHeaderProvider {
 //	private VideoStreamSessionManager streamSession;
 
 	public Request createMessageRequest(MediaDevice device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
-        SipServerConfig sipConfig=SipContext.getConfig();
+        SipGateway sipConfig=SipContext.getConfig();
         SipFactory sipFactory=SipContext.getSipFactory();
 		Request request = null;
 		// sipuri
@@ -73,7 +68,7 @@ public class SIPRequestHeaderProvider {
 	}
 
 	public Request createInviteRequest(MediaDevice device, String channelId, String content, String viaTag, String fromTag, String toTag, String ssrc, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
-        SipServerConfig sipConfig=SipContext.getConfig();
+        SipGateway sipConfig=SipContext.getConfig();
         SipFactory sipFactory=SipContext.getSipFactory();
 		Request request = null;
 		//请求行
@@ -131,7 +126,7 @@ public class SIPRequestHeaderProvider {
 	
 
 	public Request createPlaybackInviteRequest(MediaDevice device, String channelId, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader, String ssrc) throws ParseException, InvalidArgumentException, PeerUnavailableException {
-        SipServerConfig sipConfig=SipContext.getConfig();
+        SipGateway sipConfig=SipContext.getConfig();
         SipFactory sipFactory=SipContext.getSipFactory();
 		Request request = null;
 		//请求行
@@ -173,7 +168,7 @@ public class SIPRequestHeaderProvider {
 	}
 
 	public Request createByteRequest(MediaDevice device, String channelId, String viaTag, String fromTag, String toTag, String callId) throws ParseException, InvalidArgumentException, PeerUnavailableException {
-        SipServerConfig sipConfig=SipContext.getConfig();
+        SipGateway sipConfig=SipContext.getConfig();
         SipFactory sipFactory=SipContext.getSipFactory();
 		Request request = null;
 		//请求行
@@ -205,7 +200,7 @@ public class SIPRequestHeaderProvider {
 	}
 
 	public Request createSubscribeRequest(MediaDevice device, String content, String viaTag, String fromTag, String toTag, Integer expires, String event, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
-        SipServerConfig sipConfig=SipContext.getConfig();
+        SipGateway sipConfig=SipContext.getConfig();
         SipFactory sipFactory=SipContext.getSipFactory();
 		Request request = null;
 		// sipuri
@@ -254,7 +249,7 @@ public class SIPRequestHeaderProvider {
 
 	public Request createInfoRequest(MediaDevice device, StreamInfo streamInfo, String content, Long cseq)
 			throws PeerUnavailableException, ParseException, InvalidArgumentException {
-        SipServerConfig sipConfig=SipContext.getConfig();
+        SipGateway sipConfig=SipContext.getConfig();
         SipFactory sipFactory=SipContext.getSipFactory();
 		Request request = null;
 		//todo

+ 2 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -5,7 +5,6 @@ import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.SipStackImpl;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.stack.SIPDialog;
-import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
@@ -13,22 +12,19 @@ import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.entity.MediaDevice;
-import org.jetlinks.community.media.bean.SipServerConfig;
+import org.jetlinks.community.media.entity.SipGateway;
 import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.community.media.message.MediaMessageReply;
-import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.session.VideoStreamSessionManager;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.SIPRequestHeaderProvider;
-import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.utils.ZLMKeyGenerate;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
-import org.jetlinks.core.exception.DeviceOperationException;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.*;
 import javax.sip.*;
@@ -85,7 +81,7 @@ public class SipCommander {
     public Mono<Void> playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, MediaDevice device, String channelId,
                                     ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
         String streamId = ssrcInfo.getStreamId();
-        SipServerConfig sipConfig = SipContext.getConfig();
+        SipGateway sipConfig = SipContext.getConfig();
         try {
             if (device == null){
                 return Mono.error(new BusinessException("设备不存在"));

+ 8 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/RedisUtil.java

@@ -376,6 +376,14 @@ public class RedisUtil {
         }
     }
 
+    public Set<Object> members(String key){
+        try {
+            return redisTemplate.opsForSet().members(key);
+        }catch (Exception e){
+            e.printStackTrace();
+            return new HashSet<>();
+        }
+    }
     /**
      * 移除 set缓存中,值为 value 的
      * @param key 键

+ 7 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/XmlUtil.java

@@ -1,7 +1,9 @@
 package org.jetlinks.community.utils;
 
+import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import org.checkerframework.checker.nullness.Opt;
 import org.dom4j.Attribute;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
@@ -22,8 +24,8 @@ import java.util.*;
 
 /**
  * 基于dom4j的工具包
- * 
- * 
+ *
+ *
  */
 public class XmlUtil {
     /**
@@ -262,6 +264,9 @@ public class XmlUtil {
         } else {
             deviceChannel.setPtzType(PtzType.of(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType"))));
         }
+        if (XmlUtil.getText(itemDevice, "Info")!=null) {
+            deviceChannel.setPtzType(PtzType.of(  Integer.valueOf(Optional.ofNullable(itemDevice.element("Info").element("PTZType").getText()).orElse(deviceChannel.getPtzType().getValue()))));
+        }
         deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC
         return deviceChannel;
     }

+ 17 - 6
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

@@ -1,8 +1,10 @@
 package org.jetlinks.community.standalone.configuration;
 
+import cn.hutool.extra.spring.SpringUtil;
 import lombok.Getter;
 import lombok.Setter;
 import org.jetlinks.community.standalone.configuration.cluster.RedisHaManager;
+import org.jetlinks.community.utils.RedisUtil;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceState;
@@ -36,12 +38,6 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
 
     private final Map<String, Map<String, ChildrenDeviceSession>> children = new ConcurrentHashMap<>(4096);
 
-    /**
-     * 冗余会话信息
-     * 即保存其他节点的会话信息,当其他节点宕机时,判断该会话是否立即断开
-     */
-    private final Map<String,Set<String>> redundancySessions = new ConcurrentHashMap<>(4096);
-
     @Getter
     @Setter
     private Logger log = LoggerFactory.getLogger(DefaultDeviceSessionManager.class);
@@ -162,6 +158,7 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
     }
 
     public void init() {
+
         Objects.requireNonNull(gatewayServerMonitor, "gatewayServerMonitor");
         Objects.requireNonNull(registry, "registry");
         serverId = gatewayServerMonitor.getCurrentServerId();
@@ -276,6 +273,8 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
     @Override
     public DeviceSession register(DeviceSession session) {
         DeviceSession old = repository.put(session.getDeviceId(), session);
+        addSessionCache(session.getDeviceId());
+        session.onClose(()->deleteSessionCache(session.getDeviceId()));
         if (old != null) {
             //清空sessionId不同
             if (!old.getId().equals(old.getDeviceId())) {
@@ -364,4 +363,16 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
         return session;
     }
 
+
+    private void deleteSessionCache(String deviceId){
+        getRedisUtil().setRemove("session_"+serverId,deviceId);
+    }
+
+    private void addSessionCache(String deviceId){
+        getRedisUtil().sSet("session_"+serverId,deviceId);
+    }
+
+    private RedisUtil getRedisUtil(){
+        return SpringUtil.getBean(RedisUtil.class);
+    }
 }

+ 3 - 7
jetlinks-standalone/src/test/java/org/jetlinks/community/BridgeTest.java

@@ -1,13 +1,10 @@
 package org.jetlinks.community;
 
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.media.bean.SSRCInfo;
-import org.jetlinks.community.media.bean.SipServerConfig;
 import org.jetlinks.community.media.controller.PlayController;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
-import org.jetlinks.community.media.sip.SipServerHelper;
+import org.jetlinks.community.media.sip.SipGatewayHelper;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
-import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.standalone.JetLinksApplication;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
@@ -18,7 +15,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-import reactor.core.publisher.Mono;
 
 /**
  * @author lifang
@@ -41,7 +37,7 @@ public class BridgeTest {
     @Autowired
     private SipCommander sipCommander;
     @Autowired
-    SipServerHelper serverHelper;
+    SipGatewayHelper serverHelper;
     @Autowired
     LocalMediaDeviceService mediaDeviceService;
     @Autowired
@@ -49,7 +45,7 @@ public class BridgeTest {
 //
     @Test
     public void init() throws InterruptedException {
-//        serverHelper.createSip( SipServerConfig.of("34020000002000000001","192.168.10.100", 7001,"udp","340200000","utf-8","12345678",10L,"1")).subscribe();
+//        serverHelper.createSip( SipGateway.of("34020000002000000001","192.168.10.100", 7001,"udp","340200000","utf-8","12345678",10L,"1")).subscribe();
 //        Thread.sleep(3000);
 //        System.out.println("开始调用invite方法");
 //        SSRCInfo ssrcInfo = SSRCInfo.of(10000,"1000",null);