فهرست منبع

add 通道更新

18339543638 3 سال پیش
والد
کامیت
732a091bc9
19فایلهای تغییر یافته به همراه646 افزوده شده و 298 حذف شده
  1. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java
  2. 81 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/SubscribeInfo.java
  3. 51 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/DeferredController.java
  4. 77 5
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java
  5. 2 36
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java
  6. 2 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java
  7. 2 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/gb28181/result/PlayResult.java
  8. 67 18
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java
  9. 122 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/session/SipSession.java
  10. 28 31
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRunner.java
  11. 14 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/processor/SipProcessorObserver.java
  12. 137 140
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/SubscribeRequestProcessor.java
  13. 7 17
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java
  14. 7 4
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java
  15. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/KeepaliveNotifyMessageHandler.java
  16. 18 21
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java
  17. 18 2
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java
  18. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java
  19. 10 15
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java

@@ -116,7 +116,7 @@ public class DeviceMessageBusinessHandler {
             }))
             .then();
     }
-
+    
     /**
      * 通过订阅子设备注册消息,自动绑定子设备到网关设备
      *

+ 81 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/bean/SubscribeInfo.java

@@ -0,0 +1,81 @@
+package org.jetlinks.community.media.bean;
+
+import javax.sip.RequestEvent;
+import javax.sip.header.CallIdHeader;
+import javax.sip.header.EventHeader;
+import javax.sip.header.ExpiresHeader;
+import javax.sip.header.FromHeader;
+import javax.sip.message.Request;
+
+public class SubscribeInfo {
+
+    public SubscribeInfo() {
+    }
+
+    public SubscribeInfo(RequestEvent evt, String id) {
+        this.id = id;
+        Request request = evt.getRequest();
+        CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
+        this.callId = callIdHeader.getCallId();
+        FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
+        this.fromTag = fromHeader.getTag();
+        ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
+        this.expires = expiresHeader.getExpires();
+        this.event = ((EventHeader)request.getHeader(EventHeader.NAME)).getName();
+    }
+
+    private String id;
+    private int expires;
+    private String callId;
+    private String event;
+    private String fromTag;
+    private String toTag;
+
+    public String getId() {
+        return id;
+    }
+
+    public int getExpires() {
+        return expires;
+    }
+
+    public String getCallId() {
+        return callId;
+    }
+
+    public String getFromTag() {
+        return fromTag;
+    }
+
+    public void setToTag(String toTag) {
+        this.toTag = toTag;
+    }
+
+    public String getToTag() {
+        return toTag;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setExpires(int expires) {
+        this.expires = expires;
+    }
+
+    public void setCallId(String callId) {
+        this.callId = callId;
+    }
+
+    public void setFromTag(String fromTag) {
+        this.fromTag = fromTag;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public void setEvent(String event) {
+        this.event = event;
+    }
+}

+ 51 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/DeferredController.java

@@ -0,0 +1,51 @@
+package org.jetlinks.community.media.controller;
+
+import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.media.gb28181.result.PlayResult;
+import org.jetlinks.community.media.gb28181.result.WVPResult;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.context.request.async.DeferredResult;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DeferedController.java
+ * @Description TODO
+ * @createTime 2022年02月19日 10:06:00
+ */
+public interface DeferredController {
+    default Mono<Object> deferredResultHandler(PlayResult playResult) {
+        DeferredResult<ResponseEntity<String>> result = playResult.getResult();
+        //最长1分钟
+        long currentTimeMillis = System.currentTimeMillis();
+        while (!result.isSetOrExpired()){
+            if((System.currentTimeMillis()-currentTimeMillis)>60*1000){
+                return Mono.error(new TimeoutException("请求超时"));
+            }
+        }
+        Object data = result.getResult();
+        if(data instanceof ResponseEntity){
+            ResponseEntity response= (ResponseEntity) data;
+            Object body = response.getBody();
+            if(body instanceof WVPResult){
+                WVPResult wvpResult= (WVPResult) body;
+                if(wvpResult.getCode()==0){
+                    if(playResult.getComplete()!=null){
+                        return playResult.getComplete().thenReturn(wvpResult.getData()).defaultIfEmpty(new WVPResult<>());
+                    }
+                    return Mono.justOrEmpty(wvpResult.getData());
+                }else {
+                    if(playResult.getTimeout()!=null){
+                        return playResult.getTimeout().then(Mono.error(new BusinessException(wvpResult.getMsg())));
+                    }
+                    return Mono.error(new BusinessException(wvpResult.getMsg()));
+                }
+            }
+            return Mono.error(new BusinessException("返回结果类型不为Response,请联系工作人员查看"));
+        }
+        return Mono.error(new BusinessException("返回结果类型不为WvpResult,请联系工作人员查看"));
+    }
+}

+ 77 - 5
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/MediaDeviceController.java

@@ -1,16 +1,32 @@
 package org.jetlinks.community.media.controller;
 
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
+import org.hswebframework.web.authorization.annotation.CreateAction;
 import org.hswebframework.web.authorization.annotation.Resource;
+import org.hswebframework.web.authorization.annotation.SaveAction;
 import org.hswebframework.web.crud.service.ReactiveCrudService;
+import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
+import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.media.entity.MediaDevice;
+import org.jetlinks.community.media.gb28181.result.PlayResult;
+import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
+import org.jetlinks.community.media.transmit.callback.RequestMessage;
+import org.jetlinks.community.media.transmit.cmd.SipCommander;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
 
 /**
  * @author lifang
@@ -22,14 +38,70 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 @RequestMapping("/media/device")
 @Slf4j
-@Authorize
+@Authorize(ignore = true)
 @Resource(id="media-deivce",name = "媒体流设备")
 @AllArgsConstructor
 @Tag(name = "媒体视频设备")
-public class MediaDeviceController implements ReactiveServiceQueryController<MediaDevice, String> {
-    private LocalMediaDeviceService mediaDeviceService;
+public class MediaDeviceController implements ReactiveServiceCrudController<MediaDevice, String>,DeferredController{
+    private final LocalMediaDeviceService mediaDeviceService;
+
+    private final RedisCacheStorageImpl redisCacheStorage;
+    private final DeferredResultHolder resultHolder;
+    private final SipCommander cmder;
     @Override
     public ReactiveCrudService<MediaDevice, String> getService() {
         return mediaDeviceService;
     }
+
+
+
+    @PostMapping("/{deviceId}/channels/sync")
+    @CreateAction
+    @Operation(summary = "更新通道")
+    public Mono<Object> getDeviceDetailInfo(@PathVariable("deviceId") @Parameter(description = "设备ID") String id) {
+        if (log.isDebugEnabled()) {
+            log.debug("设备通道信息同步API调用,deviceId:" + id);
+        }
+        MediaDevice device = redisCacheStorage.getDevice(id);
+        if(device==null){
+            return Mono.error(new BusinessException("设备已离线,无法更新通道最新信息"));
+        }
+        PlayResult<MediaDevice> playResult = new PlayResult<>();
+        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
+        String uuid = UUID.randomUUID().toString();
+        //默认超时时间为30分钟
+        DeferredResult<ResponseEntity<MediaDevice>> result = new DeferredResult<>(30*60*1000L);
+        playResult.setResult(result);
+        playResult.onTimeout(Mono.fromRunnable(()->{
+            log.warn("设备[{}]通道信息同步超时", id);
+            // 释放rtpserver
+            RequestMessage msg = new RequestMessage();
+            msg.setKey(key);
+            msg.setId(uuid);
+            WVPResult<Object> wvpResult = new WVPResult<>();
+            wvpResult.setCode(-1);
+            wvpResult.setData(device);
+            wvpResult.setMsg("更新超时");
+            msg.setData(wvpResult);
+            resultHolder.invokeAllResult(msg);
+        }));
+        // 等待其他相同请求返回时一起返回
+        if (resultHolder.exist(key, null)) {
+            return deferredResultHandler(playResult);
+        }
+        resultHolder.put(key, uuid, result);
+        return cmder.catalogQuery(device, event -> {
+            RequestMessage msg = new RequestMessage();
+            msg.setKey(key);
+            msg.setId(uuid);
+            WVPResult<Object> wvpResult = new WVPResult<>();
+            wvpResult.setCode(-1);
+            wvpResult.setData(device);
+            wvpResult.setMsg(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg));
+            msg.setData(wvpResult);
+            resultHolder.invokeAllResult(msg);
+        })
+            .flatMap(ignore->deferredResultHandler(playResult));
+    }
+
 }

+ 2 - 36
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java

@@ -13,7 +13,6 @@ import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.gb28181.result.PlayResult;
-import org.jetlinks.community.media.gb28181.result.WVPResult;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
@@ -27,7 +26,6 @@ import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
-import java.util.concurrent.*;
 
 
 @RestController
@@ -37,7 +35,7 @@ import java.util.concurrent.*;
 @Resource(id="gb28181-play",name = "国标设备点播")
 @AllArgsConstructor
 @Tag(name = "GB媒体设备操作")
-public class PlayController {
+public class PlayController implements DeferredController{
 
     private final SipCommander cmder;
 
@@ -82,7 +80,7 @@ public class PlayController {
         // 录像查询以channelId作为deviceId查询
         String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
         resultHolder.put(key, uuid, result);
-        PlayResult playResult = new PlayResult();
+        PlayResult<String> playResult = new PlayResult<String>();
         playResult.setResult(result);
         // 超时处理
         playResult.onTimeout(Mono.fromRunnable(()->{
@@ -321,37 +319,5 @@ public class PlayController {
 //		return result;
 //	}
 //
-
-    private Mono<Object> deferredResultHandler(PlayResult playResult) {
-        DeferredResult<ResponseEntity<String>> result = playResult.getResult();
-        //最长1分钟
-        long currentTimeMillis = System.currentTimeMillis();
-        while (!result.isSetOrExpired()){
-            if((System.currentTimeMillis()-currentTimeMillis)>60*1000){
-                return Mono.error(new TimeoutException("请求超时"));
-            }
-        }
-        Object data = result.getResult();
-        if(data instanceof ResponseEntity){
-            ResponseEntity response= (ResponseEntity) data;
-            Object body = response.getBody();
-            if(body instanceof WVPResult){
-                WVPResult wvpResult= (WVPResult) body;
-                if(wvpResult.getCode()==0){
-                    if(playResult.getComplete()!=null){
-                        return playResult.getComplete().thenReturn(wvpResult.getData()).defaultIfEmpty(new WVPResult<>());
-                    }
-                    return Mono.justOrEmpty(wvpResult.getData());
-                }else {
-                    if(playResult.getTimeout()!=null){
-                        return playResult.getTimeout().then(Mono.error(new BusinessException(wvpResult.getMsg())));
-                    }
-                    return Mono.error(new BusinessException(wvpResult.getMsg()));
-                }
-            }
-            return Mono.error(new BusinessException("返回结果类型不为Response,请联系工作人员查看"));
-        }
-        return Mono.error(new BusinessException("返回结果类型不为WvpResult,请联系工作人员查看"));
-    }
 }
 

+ 2 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java

@@ -152,7 +152,7 @@ public class MediaDevice extends GenericEntity<String> implements RecordCreation
 	/**
 	 * 通道个数
 	 */
-    @Column(name = "channel_count", updatable = false)
+    @Column(name = "channel_count")
     @Schema(
         description = "通道个数"
         ,accessMode = Schema.AccessMode.READ_ONLY
@@ -162,7 +162,7 @@ public class MediaDevice extends GenericEntity<String> implements RecordCreation
 	/**
 	 * 注册有效期
 	 */
-    @Column(name = "expires", updatable = false)
+    @Column(name = "expires")
     @Schema(
         description = "注册有效期"
         ,accessMode = Schema.AccessMode.READ_ONLY

+ 2 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/gb28181/result/PlayResult.java

@@ -7,9 +7,9 @@ import org.springframework.web.context.request.async.DeferredResult;
 import reactor.core.publisher.Mono;
 
 @Data
-public class PlayResult {
+public class PlayResult<T> {
 
-    private DeferredResult<ResponseEntity<String>> result;
+    private DeferredResult<ResponseEntity<T>> result;
     private String uuid;
 
     private MediaDevice device;

+ 67 - 18
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -5,9 +5,12 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.SipServerConfig;
 import org.jetlinks.community.media.enums.DeviceState;
+import org.jetlinks.community.media.session.SipSession;
 import org.jetlinks.community.media.sip.SipServerHelper;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.cmd.SipCommander;
@@ -17,15 +20,20 @@ import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.DeviceOfflineMessage;
 import org.jetlinks.core.message.DeviceOnlineMessage;
 import org.jetlinks.core.message.DeviceRegisterMessage;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.core.utils.IdUtils;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
 import java.lang.management.MemoryNotificationInfo;
+import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -36,7 +44,6 @@ import java.util.stream.Collectors;
  * @createTime 2022年01月21日 14:59:00
  */
 @Service
-@AllArgsConstructor
 @Slf4j
 public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDevice, String> implements CommandLineRunner {
     private final DecodedClientMessageHandler messageHandler;
@@ -44,6 +51,27 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     private final EventBus eventBus;
     private final DeviceRegistry registry;
     private final SipCommander cmder;
+    private final DeviceGatewayMonitor gatewayMonitor;
+    private final DecodedClientMessageHandler connector;
+    private final DeviceSessionManager sessionManager;
+
+    public LocalMediaDeviceService(DecodedClientMessageHandler messageHandler,
+                                   RedisCacheStorageImpl redisCacheStorage,
+                                   EventBus eventBus, DeviceRegistry registry,
+                                   SipCommander cmder,
+                                   DecodedClientMessageHandler connector,
+                                   DeviceSessionManager sessionManager,
+                                   SipServerHelper sipServerHelper) {
+        this.messageHandler = messageHandler;
+        this.redisCacheStorage = redisCacheStorage;
+        this.eventBus = eventBus;
+        this.registry = registry;
+        this.cmder = cmder;
+        this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor("sip");
+        this.connector = connector;
+        this.sessionManager = sessionManager;
+        this.sipServerHelper = sipServerHelper;
+    }
 
     @Subscribe("/media/device/*/*/register")
     public void register(MediaDevice mediaDevice){
@@ -53,7 +81,23 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             .flatMap(ignore->save(mediaDevice))
             .concatWith(
                 Mono.just(mediaDevice)
+                    .flatMap(ignore-> registry.getDevice(mediaDevice.getId()))
+                    .doOnNext(operator -> {
+                        DeviceSession session = sessionManager.getSession(mediaDevice.getId());
+                        if(session==null){
+                            SipSession sipSession = new SipSession(mediaDevice.getId(), operator, connector, Optional.of(mediaDevice.getExpires().longValue()).orElse(30L));
+//                            SipSession sipSession = new SipSession(mediaDevice.getId(), operator, connector, 10L);
+                            sipSession.onClose(()->{
+                                //设备下线
+                                unRegister(mediaDevice);
+                                sessionManager.unregister(mediaDevice.getId());
+                            });
+                            sessionManager.register(sipSession);
+                        }
+                    })
+                    .filter(ignore->mediaDevice.isFirsRegister())
                     .flatMap(device -> {
+                        //首次注册执行注册设备信息,创建设备会话
                         DeviceRegisterMessage registerMessage = new DeviceRegisterMessage();
                         registerMessage.setDeviceId(mediaDevice.getId());
                         registerMessage.setMessageId(IdUtils.newUUID());
@@ -61,14 +105,16 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
                         registerMessage.addHeader("deviceName","GB28181-2016 "+mediaDevice.getId());
                         registerMessage.addHeader("productId",mediaDevice.getProductId());
                         return eventBus.publish("/device/*/*/register",registerMessage)
-                            .mergeWith(eventBus.publish("/media/device/heart-beat/"+mediaDevice.getId(),mediaDevice)).then(Mono.empty());
+                            .mergeWith(eventBus.publish("/media/device/heart-beat/"+mediaDevice.getId(),mediaDevice)).then(Mono.just(1L));
                     })
+
                     .then(Mono.empty())
             )
             .doOnComplete(()->{
                 if(mediaDevice.isFirsRegister()){
-                     cmder.deviceInfoQuery(mediaDevice)
-                         .concatWith(cmder.catalogQuery(mediaDevice, null)).subscribe();
+                    gatewayMonitor.connected();
+                    cmder.deviceInfoQuery(mediaDevice)
+                        .concatWith(cmder.catalogQuery(mediaDevice, null)).subscribe();
                 }
             })
             .subscribe();
@@ -77,6 +123,11 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     @Subscribe("/media/device/*/*/unregister")
     public void unRegister(MediaDevice mediaDevice){
         //取消注册媒体设备
+        DeviceSession session = sessionManager.unregister(mediaDevice.getId());
+        if(session!=null){
+            session.close();
+        }
+        redisCacheStorage.removeDevice(mediaDevice.getId());
         this.createUpdate()
             .where(MediaDevice::getId,mediaDevice.getId())
             .set(MediaDevice::getState,DeviceState.offline)
@@ -98,23 +149,21 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
 
     @Subscribe("/media/device/heart-beat/**")
     public void heartBeat(MediaDevice device){
-        //心跳过期 ,设备上线
-        redisCacheStorage.updateDevice(device);
-        DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
-        onlineMessage.setDeviceId(device.getId());
-        onlineMessage.setTimestamp(System.currentTimeMillis());
-        onlineMessage.setMessageId(IdUtils.newUUID());
 
         //更新设备心跳时间
         device.setKeepaliveTime(DateUtil.now());
-
         device.setState(DeviceState.online);
-        //未注册的设备不进行更新
-        registry
-            .getDevice(device.getId())
-            .flatMap(operator -> messageHandler.handleMessage(operator,onlineMessage))
-            .flatMap(ignore->updateById(device.getId(),device))
-            .subscribe();
+        if (redisCacheStorage.getDevice(device.getId())==null) {
+            //未注册的设备不进行更新
+            updateById(device.getId(),device)
+                .subscribe();
+        }
+        //心跳过期 ,设备上线
+        redisCacheStorage.updateDevice(device);
+        DeviceSession session = sessionManager.getSession(device.getId());
+        if(session!=null){
+            session.ping();
+        }
     }
 
 
@@ -137,7 +186,7 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
             .notIn(MediaDevice::getId,deviceIds)
             .fetch()
             .doOnNext(this::unRegister)
-            .concatWith(sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","0.0.0.0", 7001,"udp","340200000","utf-8","12345678",10L,"1")).then(Mono.empty()))
+            .concatWith(sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","192.168.104.244", 7001,"udp","340200000","utf-8","12345678",10L,"1")).then(Mono.empty()))
             .doOnError(e->{
                 e.printStackTrace();
             })

+ 122 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/session/SipSession.java

@@ -0,0 +1,122 @@
+package org.jetlinks.community.media.session;
+
+import lombok.Data;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.message.DeviceOnlineMessage;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.core.utils.IdUtils;
+import org.jetlinks.supports.server.DecodedClientMessageHandler;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName SipSession.java
+ * @Description GB28181 sip协议会话
+ * @createTime 2022年02月18日 11:35:00
+ */
+@Data
+public class SipSession implements DeviceSession {
+    private final String deviceId;
+    private final DeviceOperator operator;
+
+    //心跳时间
+    private long heartBeat;
+
+    private AtomicBoolean alive=new AtomicBoolean(true);
+    private Disposable heartDisposable=null;
+    private DecodedClientMessageHandler connector;
+    private final long connectTime = System.currentTimeMillis();
+    private long lastPingTime = System.currentTimeMillis();
+    private Runnable closeCallBack=null;
+
+    public SipSession(String deviceId, DeviceOperator operator,DecodedClientMessageHandler connector,long heartBeat) {
+        this.deviceId = deviceId;
+        this.operator = operator;
+        this.heartBeat=heartBeat;
+        this.connector=connector;
+        if(heartBeat>0){
+            handleHeart();
+        }
+        connector.handleMessage(operator,online(deviceId)).subscribe();
+    }
+
+    /**
+     * 设备上线
+     */
+    private DeviceOnlineMessage online(String deviceId){
+        DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
+        onlineMessage.setDeviceId(deviceId);
+        onlineMessage.setTimestamp(System.currentTimeMillis());
+        onlineMessage.setMessageId(IdUtils.newUUID());
+        return onlineMessage;
+    }
+
+    private void handleHeart(){
+        if(heartDisposable!=null&&!heartDisposable.isDisposed()){
+            heartDisposable.dispose();
+        }
+        this.alive.set(true);
+        heartDisposable=Mono.delay(Duration.ofSeconds(heartBeat)).subscribe(ignore->close());
+    }
+
+
+    @Override
+    public String getId() {
+        return deviceId;
+    }
+
+    @Override
+    public long lastPingTime() {
+        return lastPingTime;
+    }
+
+    @Override
+    public long connectTime() {
+        return connectTime;
+    }
+
+    @Override
+    public Mono<Boolean> send(EncodedMessage encodedMessage) {
+        return Mono.empty();
+    }
+
+    @Override
+    public Transport getTransport() {
+        return DefaultTransport.UDP;
+    }
+
+    @Override
+    public void close() {
+        if(this.alive.get()){
+            this.alive.set(false);
+            if(closeCallBack!=null){
+                Mono.fromRunnable(closeCallBack).subscribe();
+            };
+        }
+    }
+
+    @Override
+    public void ping() {
+        lastPingTime=System.currentTimeMillis();
+        if(heartBeat>0){
+            handleHeart();
+        }
+    }
+
+    @Override
+    public boolean isAlive() {
+        return alive.get();
+    }
+
+    @Override
+    public void onClose(Runnable call) {
+        closeCallBack=call;
+    }
+}

+ 28 - 31
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRunner.java

@@ -34,36 +34,33 @@ public class SipRunner implements CommandLineRunner {
         log.error(error.getMessage(), error);
     @Override
     public void run(String... args) {
-        redisCacheStorage.resetAllCSEQ();
-        //每隔30S发起一次订阅 todo 集群
-        Flux.interval(Duration.ofSeconds(10))
-            //获取所有在线设备信息
-            .flatMap(ignore->Mono.justOrEmpty(redisCacheStorage.getOnlineForAll()))
-            .filter(CollectionUtil::isNotEmpty)
-            .flatMap(deviceIds-> Flux.fromStream(deviceIds.stream()))
-            .publishOn(Schedulers.boundedElastic())
-            .flatMap(deviceId->Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId)))
-            //设备发起订阅且不可重复订阅
-//            .filter(device -> device.getSubscribeCycleForCatalog()>0&&!subscribeDevice.contains(device.getId()))
-            .filter(device -> !subscribeDevice.contains(device.getId()))
-            .flatMap(device ->   sipCommander.catalogSubscribe(device, eventResult -> {
-                ResponseEvent event = (ResponseEvent) eventResult.event;
-                if (event.getResponse().getRawContent() != null) {
-                    // 成功
-                    log.info("[目录订阅]成功: {}", device.getId());
-                }else {
-                    // 成功
-                    log.info("[目录订阅]成功: {}", device.getId());
-                }
-                subscribeDevice.add(device.getId());
-            },eventResult -> {
-                // 失败
-                log.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getId(), eventResult.msg);
-            }))
-            .doOnError(e->{
-                e.printStackTrace();
-            })
-            .onErrorContinue(doOnError)
-            .subscribe();
+//        redisCacheStorage.resetAllCSEQ();
+//        //每隔30S发起一次订阅 todo 集群
+//        Flux.interval(Duration.ofSeconds(10))
+//            //获取所有在线设备信息
+//            .flatMap(ignore->Mono.justOrEmpty(redisCacheStorage.getOnlineForAll()))
+//            .filter(CollectionUtil::isNotEmpty)
+//            .flatMap(deviceIds-> Flux.fromStream(deviceIds.stream()))
+//            .publishOn(Schedulers.boundedElastic())
+//            .flatMap(deviceId->Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId)))
+//            //设备发起订阅且不可重复订阅
+////            .filter(device -> device.getSubscribeCycleForCatalog()>0&&!subscribeDevice.contains(device.getId()))
+////            .filter(device -> !subscribeDevice.contains(device.getId()))
+////            .doOnNext(device ->     subscribeDevice.add(device.getId()))
+//            .flatMap(device ->   sipCommander.catalogSubscribe(device, eventResult -> {
+//                ResponseEvent event = (ResponseEvent) eventResult.event;
+//                if (event.getResponse().getRawContent() != null) {
+//                    // 成功
+//                    log.info("[目录订阅]成功: {}", device.getId());
+//                }else {
+//                    // 成功
+//                    log.info("[目录订阅]成功: {}", device.getId());
+//                }
+//            },eventResult -> {
+//                // 失败
+//                log.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getId(), eventResult.msg);
+//            }))
+//            .onErrorContinue(doOnError)
+//            .subscribe();
     }
 }

+ 14 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/processor/SipProcessorObserver.java

@@ -1,6 +1,9 @@
 package org.jetlinks.community.media.sip.processor;
 
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.core.event.EventBus;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -23,8 +26,16 @@ import java.util.function.Function;
 @Slf4j
 public class SipProcessorObserver implements ISipProcessObserver {
 
-    @Autowired
-    private SipSubscribe sipSubscribe;
+
+    private final SipSubscribe sipSubscribe;
+
+
+    private final DeviceGatewayMonitor gatewayMonitor;
+
+    public SipProcessorObserver(SipSubscribe sipSubscribe) {
+        this.sipSubscribe = sipSubscribe;
+        this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor("sip");
+    }
 
     private EmitterProcessor<RequestEvent> requestProcessor= EmitterProcessor.create();
     private FluxSink<RequestEvent> requestSink=requestProcessor.sink();
@@ -51,6 +62,7 @@ public class SipProcessorObserver implements ISipProcessObserver {
      */
     @Override
     public void processRequest(RequestEvent requestEvent) {
+        gatewayMonitor.receivedMessage();
         if(requestProcessor.hasDownstreams()){
             requestSink.next(requestEvent);
         }

+ 137 - 140
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/SubscribeRequestProcessor.java

@@ -1,95 +1,96 @@
-//package org.jetlinks.community.media.sip.request.impl;
-//
-//import lombok.AllArgsConstructor;
-//import lombok.extern.slf4j.Slf4j;
-//import org.dom4j.DocumentException;
-//import org.dom4j.Element;
-//import org.jetlinks.community.media.config.UserSetup;
-//import org.jetlinks.community.media.contanst.CmdType;
-//import org.jetlinks.community.media.contanst.VideoManagerConstants;
-//import org.jetlinks.community.media.sip.SipRequestProcessorParent;
-//import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
-//import org.jetlinks.community.utils.SipUtils;
-//import org.jetlinks.community.utils.XmlUtil;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.springframework.beans.factory.InitializingBean;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.stereotype.Component;
-//
-//import javax.sip.InvalidArgumentException;
-//import javax.sip.RequestEvent;
-//import javax.sip.ServerTransaction;
-//import javax.sip.SipException;
-//import javax.sip.header.ExpiresHeader;
-//import javax.sip.header.ToHeader;
-//import javax.sip.message.Request;
-//import javax.sip.message.Response;
-//import java.text.ParseException;
-//import java.util.Optional;
-//
-///**
-// * SIP命令类型: SUBSCRIBE请求
-// */
-//@Component
-//@Slf4j
-//public class SubscribeRequestProcessor  extends SipRequestProcessorParent {
-//
-//
-//    @Autowired
-//    private RedisCacheStorageImpl redisCatchStorage;
-//
-//
-//    @Autowired
-//    private UserSetup userSetup;
-//
-//
-//    /**
-//     * 处理SUBSCRIBE请求
-//     *
-//     * @param evt
-//     */
-//    @Override
-//    public void processRequest(RequestEvent evt) {
-//        Request request = evt.getRequest();
-//        try {
-//            Element rootElement = getRootElement(evt);
-//            String cmd = XmlUtil.getText(rootElement, "CmdType");
-//            if (CmdType.MOBILE_POSITION.equals(cmd)) {
-//                processNotifyMobilePosition(evt, rootElement);
-//            } else if (CmdType.ALARM.equals(cmd)) {
-//                log.info("接收到Alarm订阅");
-//                processNotifyAlarm(evt, rootElement);
-//            } else if (CmdType.CATALOG.equals(cmd)) {
-//                processNotifyCatalogList(evt, rootElement);
-//            } else {
-//                log.info("接收到消息:" + cmd);
-//                Response response = null;
-//                response = getMessageFactory().createResponse(200, request);
-//                if (response != null) {
-//                    ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
-//                    response.setExpires(expireHeader);
-//                }
-//                log.info("response : " + response);
-//                ServerTransaction transaction = getServerTransaction(evt);
-//                if (transaction != null) {
-//                    transaction.sendResponse(response);
-//                    transaction.getDialog().delete();
-//                    transaction.terminate();
-//                } else {
-//                    log.info("processRequest serverTransactionId is null.");
-//                }
-//            }
-//        } catch (Exception e) {
-//            log.error("SIP订阅发生错误,",e);
-//        }
-//
-//    }
-//
-//    /**
-//     * 处理移动位置订阅消息
-//     */
-//    private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
+package org.jetlinks.community.media.sip.request.impl;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.jetlinks.community.media.bean.SubscribeInfo;
+import org.jetlinks.community.media.config.UserSetup;
+import org.jetlinks.community.media.contanst.CmdType;
+import org.jetlinks.community.media.contanst.VideoManagerConstants;
+import org.jetlinks.community.media.sip.SipRequestProcessorParent;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.utils.SipUtils;
+import org.jetlinks.community.utils.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.RequestEvent;
+import javax.sip.ServerTransaction;
+import javax.sip.SipException;
+import javax.sip.header.ExpiresHeader;
+import javax.sip.header.ToHeader;
+import javax.sip.message.Request;
+import javax.sip.message.Response;
+import java.text.ParseException;
+import java.util.Optional;
+
+/**
+ * SIP命令类型: SUBSCRIBE请求
+ */
+@Component
+@Slf4j
+public class SubscribeRequestProcessor  extends SipRequestProcessorParent {
+
+
+    @Autowired
+    private RedisCacheStorageImpl redisCatchStorage;
+
+
+    @Autowired
+    private UserSetup userSetup;
+
+
+    /**
+     * 处理SUBSCRIBE请求
+     *
+     * @param evt
+     */
+    @Override
+    public void processRequest(RequestEvent evt) {
+        Request request = evt.getRequest();
+        try {
+            Element rootElement = getRootElement(evt);
+            String cmd = XmlUtil.getText(rootElement, "CmdType");
+            if (CmdType.MOBILE_POSITION.equals(cmd)) {
+                processNotifyMobilePosition(evt, rootElement);
+            } else if (CmdType.ALARM.equals(cmd)) {
+                log.info("接收到Alarm订阅");
+                processNotifyAlarm(evt, rootElement);
+            } else if (CmdType.CATALOG.equals(cmd)) {
+                processNotifyCatalogList(evt, rootElement);
+            } else {
+                log.info("接收到消息:" + cmd);
+                Response response = null;
+                response = getMessageFactory().createResponse(200, request);
+                if (response != null) {
+                    ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
+                    response.setExpires(expireHeader);
+                }
+                log.info("response : " + response);
+                ServerTransaction transaction = getServerTransaction(evt);
+                if (transaction != null) {
+                    transaction.sendResponse(response);
+                    transaction.getDialog().delete();
+                    transaction.terminate();
+                } else {
+                    log.info("processRequest serverTransactionId is null.");
+                }
+            }
+        } catch (Exception e) {
+            log.error("SIP订阅发生错误,",e);
+        }
+
+    }
+
+    /**
+     * 处理移动位置订阅消息
+     */
+    private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
 //        String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
 //        String deviceID = XmlUtil.getText(rootElement, "DeviceID");
 //        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
@@ -133,51 +134,47 @@
 //        } catch (ParseException e) {
 //            e.printStackTrace();
 //        }
-//    }
-//
-//    private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
-//
-//    }
-//
-//    private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
-//        String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
-//        String deviceID = XmlUtil.getText(rootElement, "DeviceID");
-//        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
-//        String sn = XmlUtil.getText(rootElement, "SN");
-//        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_Catalog_" + platformId;
-//        log.info("接收到{}的Catalog订阅", platformId);
-//        StringBuilder resultXml = new StringBuilder(200);
-//        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
-//            .append("<Response>\r\n")
-//            .append("<CmdType>Catalog</CmdType>\r\n")
-//            .append("<SN>" + sn + "</SN>\r\n")
-//            .append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
-//            .append("<Result>OK</Result>\r\n")
-//            .append("</Response>\r\n");
-//
-//        if (subscribeInfo.getExpires() > 0) {
-//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
-//        }else if (subscribeInfo.getExpires() == 0) {
-//            redisCatchStorage.delSubscribe(key);
-//        }
-//
-//        try {
-//            Response response = responseXmlAck(evt, resultXml.toString());
-//            ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
-//            subscribeInfo.setToTag(toHeader.getTag());
-//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
-//
-//        } catch (SipException e) {
-//            e.printStackTrace();
-//        } catch (InvalidArgumentException e) {
-//            e.printStackTrace();
-//        } catch (ParseException e) {
-//            e.printStackTrace();
-//        }
-//    }
-//
-//    @Override
-//    public String getMethod() {
-//        return Request.SUBSCRIBE;
-//    }
-//}
+    }
+
+    private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
+
+    }
+
+    private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
+        String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
+        String deviceID = XmlUtil.getText(rootElement, "DeviceID");
+        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
+        String sn = XmlUtil.getText(rootElement, "SN");
+        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_Catalog_" + platformId;
+        log.info("接收到{}的Catalog订阅", platformId);
+        StringBuilder resultXml = new StringBuilder(200);
+        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
+            .append("<Response>\r\n")
+            .append("<CmdType>Catalog</CmdType>\r\n")
+            .append("<SN>" + sn + "</SN>\r\n")
+            .append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
+            .append("<Result>OK</Result>\r\n")
+            .append("</Response>\r\n");
+
+        if (subscribeInfo.getExpires() > 0) {
+            redisCatchStorage.updateSubscribe(key, subscribeInfo);
+        }else if (subscribeInfo.getExpires() == 0) {
+            redisCatchStorage.delSubscribe(key);
+        }
+
+        try {
+            Response response = responseXmlAck(evt, resultXml.toString());
+            ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
+            subscribeInfo.setToTag(toHeader.getTag());
+            redisCatchStorage.updateSubscribe(key, subscribeInfo);
+
+        } catch (Exception e) {
+            log.error("设备目录订阅发生错误,",e);
+        }
+    }
+
+    @Override
+    public String getMethod() {
+        return Request.SUBSCRIBE;
+    }
+}

+ 7 - 17
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/MessageRequestProcessor.java

@@ -6,8 +6,10 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
+import org.jetlinks.community.media.bean.DeviceNotFoundEvent;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.ParentPlatform;
+import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.sip.SipRequestProcessorParent;
@@ -43,20 +45,8 @@ public class MessageRequestProcessor extends SipRequestProcessorParent {
     private EmitterProcessor<Tuple3<RequestEvent,ParentPlatform,Element>> platProcessor=EmitterProcessor.create();
     private FluxSink<Tuple3<RequestEvent,ParentPlatform,Element>> platFluxSink=platProcessor.sink();
 
-//    @Autowired
-//    private IVideoManagerStorager storage;
-//
-//    @Autowired
-//    private SipSubscribe sipSubscribe;
-//
-//    @Autowired
-//    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private RedissonClient client;
     @Autowired
-    private LocalMediaDeviceService deviceService;
-
+    private SipSubscribe sipSubscribe;
     @Autowired
     private RedisCacheStorageImpl redisCacheStorage;
     @SneakyThrows
@@ -75,10 +65,10 @@ public class MessageRequestProcessor extends SipRequestProcessorParent {
                         // 不存在则回复404
                         responseAck(evt, Response.NOT_FOUND, "device "+ deviceId +" not found");
                         log.warn("[设备未找到 ]: {}", deviceId);
-//                if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){
-//                    SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new DeviceNotFoundEvent(evt.getDialog()));
-//                    sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).response(eventResult);
-//                };
+                        if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){
+                            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new DeviceNotFoundEvent(evt.getDialog()));
+                            sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).accept(eventResult);
+                        };
                     }else {
                         Element rootElement = getRootElement(evt, SipContext.getConfig().getCharset());
                         if (device != null) {

+ 7 - 4
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/CatalogResponseMessageProcessor.java

@@ -14,11 +14,8 @@ import org.jetlinks.community.media.sip.request.message.MessageHandlerAbstract;
 import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
 import org.jetlinks.community.media.transmit.callback.RequestMessage;
 import org.jetlinks.community.utils.XmlUtil;
-import org.jetlinks.core.cluster.ClusterEventBus;
-import org.springframework.http.HttpStatus;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
-
 import javax.sip.RequestEvent;
 import javax.sip.message.Response;
 import java.util.*;
@@ -37,6 +34,7 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 
     private final LocalMediaDeviceChannelService deviceChannelService;
     private final DeferredResultHolder deferredResultHolder;
+    private final LocalMediaDeviceService mediaDeviceService;
     @Override
     public String getMethod() {
         return "Catalog";
@@ -49,7 +47,6 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
 
     @Override
     public void handleForDevice(RequestEvent evt, MediaDevice device, Element element) {
-        String topic = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getId();
         Element rootElement = null;
         try {
             rootElement = getRootElement(evt, Optional.ofNullable(device.getCharset()).orElse("UTF-8"));
@@ -89,11 +86,17 @@ public class CatalogResponseMessageProcessor extends MessageHandlerAbstract {
                     msg.setData(result);
                     deferredResultHolder.invokeAllResult(msg);
                 })
+                    .thenReturn(1L)
                     .flatMap(__->
                         deviceChannelService.createDelete()
                             .where(MediaDeviceChannel::getDeviceId,device.getId())
                             .execute())
                     .flatMap(__->deviceChannelService.save(channelList))
+                    .flatMap(result->
+                        mediaDeviceService.createUpdate()
+                            .where(MediaDevice::getId,device.getId())
+                            .set(MediaDevice::getChannelCount,channelList.size())
+                            .execute())
                     .subscribe();
             }
             // 回复200 OK

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/message/notify/KeepaliveNotifyMessageHandler.java

@@ -64,7 +64,7 @@ public class KeepaliveNotifyMessageHandler extends MessageHandlerAbstract {
                 eventBus.publish("/media/device/heart-beat/"+device.getId(),device).subscribe();
             }
         } catch (SipException e) {
-            e.printStackTrace();
+            log.error("sip 配置,",e);
         } catch (InvalidArgumentException e) {
             log.error("sip 参数错误,",e);
         } catch (ParseException e) {

+ 18 - 21
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -4,12 +4,9 @@ import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.media.bean.SendRtpItem;
-import org.jetlinks.community.media.bean.StreamInfo;
-import org.jetlinks.community.media.bean.ThirdPartyGB;
+import org.jetlinks.community.media.bean.*;
 import org.jetlinks.community.media.contanst.VideoManagerConstants;
 import org.jetlinks.community.media.entity.MediaDevice;
-import org.jetlinks.community.media.bean.ParentPlatform;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
@@ -460,23 +457,23 @@ public class RedisCacheStorageImpl {
 //        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + serverId + "_" + gbId;
 //        return (GPSMsgInfo)redis.get(key);
 //    }
-//
-//
-//    public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
-//        redis.set(key, subscribeInfo, subscribeInfo.getExpires());
-//    }
-//
-//
-//    public SubscribeInfo getSubscribe(String key) {
-//        return (SubscribeInfo)redis.get(key);
-//    }
-//
-//
-//    public void delSubscribe(String key) {
-//        redis.del(key);
-//    }
-//
-//
+
+
+    public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
+        redis.set(key, subscribeInfo, subscribeInfo.getExpires());
+    }
+
+
+    public SubscribeInfo getSubscribe(String key) {
+        return (SubscribeInfo)redis.get(key);
+    }
+
+
+    public void delSubscribe(String key) {
+        redis.del(key);
+    }
+
+
 //    public List<GPSMsgInfo> getAllGpsMsgInfo() {
 //        String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + serverId + "_*";
 //        List<GPSMsgInfo> result = new ArrayList<>();

+ 18 - 2
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -8,6 +8,8 @@ import gov.nist.javax.sip.stack.SIPDialog;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.entity.MediaDevice;
@@ -29,6 +31,7 @@ import javax.sip.message.Request;
 import java.lang.reflect.Field;
 import java.text.ParseException;
 import java.util.HashSet;
+import java.util.Optional;
 
 /**
  * @author lifang
@@ -39,7 +42,6 @@ import java.util.HashSet;
  */
 @Service
 @Slf4j
-@AllArgsConstructor
 public class SipCommander {
 
     private final SIPRequestHeaderProvider headerProvider;
@@ -49,6 +51,18 @@ public class SipCommander {
     private final UserSetup userSetup;
     private final SipSubscribe sipSubscribe;
     private final RedisCacheStorageImpl redisCacheStorage;
+    private final DeviceGatewayMonitor gatewayMonitor;
+
+    public SipCommander(SIPRequestHeaderProvider headerProvider, ZLMHttpHookSubscribe subscribe, LocalMediaServerItemService mediaServerItemService, VideoStreamSessionManager streamSessionManager, UserSetup userSetup, SipSubscribe sipSubscribe, RedisCacheStorageImpl redisCacheStorage) {
+        this.headerProvider = headerProvider;
+        this.subscribe = subscribe;
+        this.mediaServerItemService = mediaServerItemService;
+        this.streamSessionManager = streamSessionManager;
+        this.userSetup = userSetup;
+        this.sipSubscribe = sipSubscribe;
+        this.redisCacheStorage = redisCacheStorage;
+        this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor("sip");
+    }
 
     public Mono<Void> playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, MediaDevice device, String channelId,
                                     ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
@@ -323,7 +337,8 @@ public class SipCommander {
     public Mono<Boolean> catalogSubscribe(MediaDevice device, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
         try {
             StringBuilder cmdXml = new StringBuilder(200);
-            cmdXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
+//            cmdXml.append("<?xml version=\"1.0\" encoding="+"\"GB2312\""+"?>\r\n");
+            cmdXml.append("<?xml version=\"1.0\" encoding="+ Optional.ofNullable(SipContext.getConfig().getCharset()).orElse("utf-8") +"?>\r\n");
             cmdXml.append("<Query>\r\n");
             cmdXml.append("<CmdType>Catalog</CmdType>\r\n");
             cmdXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
@@ -375,6 +390,7 @@ public class SipCommander {
         return Mono.fromRunnable(()->{
             try {
                 clientTransaction.sendRequest();
+                gatewayMonitor.sentMessage();
             } catch (SipException e) {
                 log.error("向设备发送sip请求失败,",e);
             }

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java

@@ -47,7 +47,7 @@ public class ZLMRunner implements CommandLineRunner {
         //获取默认服务器配置
         mediaServerItemService.getDefaultMediaServer()
             //添加默认媒体服务器配置
-            .defaultIfEmpty(mediaConfig.getMediaSerItem())
+            .switchIfEmpty(mediaServerItemService.save(mediaConfig.getMediaSerItem()).thenReturn(mediaConfig.getMediaSerItem()))
             //清除在线服务器信息
             .mergeWith(mediaServerItemService.clearMediaServerForOnline().then(Mono.empty()))
             .mergeWith(Mono.delay(Duration.ofSeconds(20)).flatMap(ignore->timeOutHandle()).then(Mono.empty()))

+ 10 - 15
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

@@ -289,21 +289,16 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
 
 
         //注册中心上线
-        if(!session.isMock()){
-            session.getOperator()
-                .online(session.getServerId().orElse(serverId), session.getId(), session.getClientAddress().map(String::valueOf).orElse(null))
-                .doFinally(s -> {
-                    //通知
-                    if (onDeviceRegister.hasDownstreams()) {
-                        registerListener.next(session);
-                    }
-                })
-                .subscribe();
-        }else {
-            session.getOperator()
-                .mockOnline(session.getServerId().orElse(serverId), session.getId(), session.getClientAddress().map(String::valueOf).orElse(null))
-                .subscribe();
-        }
+        session.getOperator()
+            .online(session.getServerId().orElse(serverId), session.getId(), session.getClientAddress().map(String::valueOf).orElse(null))
+            .doFinally(s -> {
+                //通知
+                if (onDeviceRegister.hasDownstreams()) {
+                    registerListener.next(session);
+                }
+            })
+            .subscribe();
+
         return old;
     }