Browse Source

fix id改为deviceId

18339543638 3 years ago
parent
commit
33908be22d
42 changed files with 683 additions and 526 deletions
  1. 1 1
      jetlinks-components/common-component/src/main/java/org/jetlinks/community/annotation/MessageValueCodec.java
  2. 4 4
      jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/AbstractCoapResource.java
  3. 10 10
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetlinksExtendTopicMessageCodec.java
  4. 1 1
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java
  5. 3 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java
  6. 3 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java
  7. 2 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java
  8. 3 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java
  9. 5 5
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java
  10. 3 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java
  11. 5 5
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/topic/DeviceTopicMeasurement.java
  12. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceCurrentStateSubscriptionProvider.java
  13. 3 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendSubscriptionProvider.java
  14. 11 11
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java
  15. 5 5
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java
  16. 7 7
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java
  17. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DeviceLogTimeSeriesMetadata.java
  18. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DevicePropertiesTimeSeriesMetadata.java
  19. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/FixedPropertiesTimeSeriesMetadata.java
  20. 4 4
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java
  21. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/config/UserSetup.java
  22. 4 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/contanst/VideoManagerConstants.java
  23. 93 115
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java
  24. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java
  25. 5 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/gb28181/bean/SsrcTransaction.java
  26. 7 10
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java
  27. 1 1
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java
  28. 4 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRequestProcessorParent.java
  29. 69 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRunner.java
  30. 16 16
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/processor/SipProcessorObserver.java
  31. 0 99
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/MessageRequestProcessor.java
  32. 0 13
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java
  33. 183 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/SubscribeRequestProcessor.java
  34. 58 67
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java
  35. 45 14
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java
  36. 62 51
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java
  37. 2 0
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/RedisUtil.java
  38. 0 5
      jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/SipUtils.java
  39. 2 2
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java
  40. 7 7
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java
  41. 48 48
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/RuleSceneTaskExecutorProvider.java
  42. 1 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/DeviceAlarmEntity.java

+ 1 - 1
jetlinks-components/common-component/src/main/java/org/jetlinks/community/annotation/MessageValueCodec.java

@@ -36,7 +36,7 @@ public class MessageValueCodec implements ValueCodec<Object, Object> {
         Object msgType = jsonObject.get("messageType");
         if(MessageType.CHILD.name().equals(msgType)){
             JSONObject childDeviceMessage = (JSONObject) jsonObject.get("childDeviceMessage");
-            return ChildDeviceMessage.create(String.valueOf(jsonObject.get("id")), (DeviceMessage) this.decode(childDeviceMessage));
+            return ChildDeviceMessage.create(String.valueOf(jsonObject.get("deviceId")), (DeviceMessage) this.decode(childDeviceMessage));
         }
         MessageType messageType = MessageType.of(String.valueOf(msgType)).orElse(MessageType.UNKNOWN);
         return jsonObject.toJavaObject(messageType.getNewInstance().get().getClass());

+ 4 - 4
jetlinks-components/network-component/coap-component/src/main/java/org/jetlinks/community/network/coap/resources/AbstractCoapResource.java

@@ -29,7 +29,7 @@ import java.util.function.Function;
 public abstract class AbstractCoapResource extends CoapResource {
     private final EmitterProcessor<CoapExchangeMessage> processor;
 
-    public static final String prefixTopicName="{productId}/{id}/**";
+    public static final String prefixTopicName="{productId}/{deviceId}/**";
 
     public static final AntPathMatcher matcher = new AntPathMatcher(File.separator);
     public AbstractCoapResource(EmitterProcessor<CoapExchangeMessage> processor) {
@@ -58,21 +58,21 @@ public abstract class AbstractCoapResource extends CoapResource {
         path=path.startsWith("/")?path.substring(1):path;
         String[] split = path.split("/");
         if(split.length<2){
-            log.warn("Coap连接,id:[{}],productId:[{}],不能为空",exchange.getDeviceId(),exchange.getProductId());
+            log.warn("Coap连接,deviceId:[{}],productId:[{}],不能为空",exchange.getDeviceId(),exchange.getProductId());
             exchange.reject();
         }
         exchange.setDeviceId(split[1]);
         exchange.setProductId(split[0]);
         Map<String, String> elseMap = new HashMap<>();
         map.forEach((k,v)->{
-            if(!"id".equals(k)&&!"productId".equals(k)){
+            if(!"deviceId".equals(k)&&!"productId".equals(k)){
                 elseMap.put(k,v);
             }
         });
         exchange.setElseParams(elseMap);
         if (StrUtil.isNullOrUndefined(exchange.getDeviceId())
             || StrUtil.isNullOrUndefined(exchange.getProductId())) {
-            log.warn("Coap连接,id:[{}],productId:[{}],不能为空",exchange.getDeviceId(),exchange.getProductId());
+            log.warn("Coap连接,deviceId:[{}],productId:[{}],不能为空",exchange.getDeviceId(),exchange.getProductId());
             exchange.reject();
         }
         if(processor.hasDownstreams()){

+ 10 - 10
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetlinksExtendTopicMessageCodec.java

@@ -55,7 +55,7 @@ public class JetlinksExtendTopicMessageCodec {
         private boolean log;
         public DecodeResult(String topic) {
             this.topic = topic;
-            args = TopicUtils.getPathVariables("/{productId}/{id}/**", topic);
+            args = TopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic);
             if (topic.contains("child")) {
                 child = true;
                 args.putAll(TopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic));
@@ -93,7 +93,7 @@ public class JetlinksExtendTopicMessageCodec {
         private final String topic;
 
         public String getDeviceId() {
-            return args.get("id");
+            return args.get("deviceId");
         }
 
         public String getChildDeviceId() {
@@ -105,7 +105,7 @@ public class JetlinksExtendTopicMessageCodec {
 
     protected JetlinksExtendTopicMessageCodec.EncodedTopic encode(String deviceId, Message message) {
 
-        Assert.hasText(deviceId, "id can not be null");
+        Assert.hasText(deviceId, "deviceId can not be null");
         Assert.notNull(message, "message can not be null");
 
         if (message instanceof ReadPropertyMessage) {
@@ -113,7 +113,7 @@ public class JetlinksExtendTopicMessageCodec {
             JSONObject mqttData = new JSONObject();
             mqttData.put("messageId", message.getMessageId());
             mqttData.put("properties", ((ReadPropertyMessage) message).getProperties());
-            mqttData.put("id", deviceId);
+            mqttData.put("deviceId", deviceId);
 
             return new JetlinksExtendTopicMessageCodec.EncodedTopic(topic, mqttData);
         } else if (message instanceof WritePropertyMessage) {
@@ -121,7 +121,7 @@ public class JetlinksExtendTopicMessageCodec {
             JSONObject mqttData = new JSONObject();
             mqttData.put("messageId", message.getMessageId());
             mqttData.put("properties", ((WritePropertyMessage) message).getProperties());
-            mqttData.put("id", deviceId);
+            mqttData.put("deviceId", deviceId);
 
             return new JetlinksExtendTopicMessageCodec.EncodedTopic(topic, mqttData);
         } else if (message instanceof FunctionInvokeMessage) {
@@ -131,7 +131,7 @@ public class JetlinksExtendTopicMessageCodec {
             mqttData.put("messageId", message.getMessageId());
             mqttData.put("function", invokeMessage.getFunctionId());
             mqttData.put("inputs", invokeMessage.getInputs());
-            mqttData.put("id", deviceId);
+            mqttData.put("deviceId", deviceId);
 
             return new JetlinksExtendTopicMessageCodec.EncodedTopic(topic, mqttData);
         } else if (message instanceof UpgradeFirmwareMessage) {
@@ -145,14 +145,14 @@ public class JetlinksExtendTopicMessageCodec {
             mqttData.put("taskId",firmwareMessage.getTaskId());
             mqttData.put("signMethod", firmwareMessage.getSignMethod());
             mqttData.put("parameters", firmwareMessage.getParameters());
-            mqttData.put("id", deviceId);
+            mqttData.put("deviceId", deviceId);
 
             return new JetlinksExtendTopicMessageCodec.EncodedTopic(topic, mqttData);
         } else if (message instanceof ReadFirmwareMessage) {
             String topic = "/" .concat(deviceId).concat("/firmware/read");
             JSONObject mqttData = new JSONObject();
             mqttData.put("messageId", message.getMessageId());
-            mqttData.put("id", deviceId);
+            mqttData.put("deviceId", deviceId);
             return new JetlinksExtendTopicMessageCodec.EncodedTopic(topic, mqttData);
         } else if (message instanceof TimeSyncReplyMessage) {
             String topic = "/" .concat(deviceId).concat("/time-sync/reply");
@@ -171,13 +171,13 @@ public class JetlinksExtendTopicMessageCodec {
             mqttData.put("signMethod", firmwareMessage.getSignMethod());
             mqttData.put("parameters", firmwareMessage.getParameters());
             mqttData.put("taskId",firmwareMessage.getTaskId());
-            mqttData.put("id", deviceId);
+            mqttData.put("deviceId", deviceId);
             return new JetlinksExtendTopicMessageCodec.EncodedTopic(topic, mqttData);
         } else if (message instanceof ChildDeviceMessage) {
             ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message);
             JetlinksExtendTopicMessageCodec.EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage());
             String topic = "/" .concat(deviceId).concat("/child").concat(result.topic);
-            result.payload.put("id", childDeviceMessage.getChildDeviceId());
+            result.payload.put("deviceId", childDeviceMessage.getChildDeviceId());
 
             return new JetlinksExtendTopicMessageCodec.EncodedTopic(topic, result.payload);
         }

+ 1 - 1
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java

@@ -115,7 +115,7 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
         public Publisher<DeviceMessageReply> doSend(Map<String, Object> ctx, DeviceOperator device) {
             Map<String, Object> message = new HashMap<>(this.message);
             message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
-            message.put("id", device.getDeviceId());
+            message.put("deviceId", device.getDeviceId());
             return Mono
                 .justOrEmpty(MessageType.convertMessage(message))
                 .cast(RepayableDeviceMessage.class)

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java

@@ -35,7 +35,7 @@ import java.util.Objects;
 public class DeviceTagEntity extends GenericEntity<String> {
 
     @Column(length = 64, nullable = false, updatable = false)
-    @NotBlank(message = "[id]不能为空", groups = CreateGroup.class)
+    @NotBlank(message = "[deviceId]不能为空", groups = CreateGroup.class)
     @Schema(description = "设备ID")
     private String deviceId;
 
@@ -77,7 +77,7 @@ public class DeviceTagEntity extends GenericEntity<String> {
 
 
     public static DeviceTagEntity of(PropertyMetadata property) {
-       DeviceTagEntity entity = new DeviceTagEntity();
+        DeviceTagEntity entity = new DeviceTagEntity();
         entity.setKey(property.getId());
         entity.setName(property.getName());
         entity.setType(property.getValueType().getId());
@@ -107,7 +107,7 @@ public class DeviceTagEntity extends GenericEntity<String> {
         DeviceTagEntity that = (DeviceTagEntity) o;
         return
             Objects.equals(getDeviceId(), that.getDeviceId()) &&
-            Objects.equals(getKey(), that.getKey());
+                Objects.equals(getKey(), that.getKey());
     }
 
     @Override

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java

@@ -40,14 +40,14 @@ class DeviceEventMeasurement extends StaticMeasurement {
     }
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
-        .add("id", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
+        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
         .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10));
 
 
     Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
         return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
             .doPaging(0, history)
-            .where("id", deviceId)
+            .where("deviceId", deviceId)
             .execute(q->deviceDataService.queryEvent(deviceId,eventMetadata.getId(),q,false))
             .map(data -> SimpleMeasurementValue.of(data, data.getTimestamp()))
             .sort(MeasurementValue.sort());
@@ -91,7 +91,7 @@ class DeviceEventMeasurement extends StaticMeasurement {
 
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
-            return Mono.justOrEmpty(parameter.getString("id"))
+            return Mono.justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId -> {
                     int history = parameter.getInt("history").orElse(0);
                     //合并历史数据和实时数据

+ 2 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java

@@ -74,7 +74,7 @@ class DeviceEventsMeasurement extends StaticMeasurement {
     }
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
-        .add("id", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
+        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
 
     /**
      * 实时设备事件
@@ -116,7 +116,7 @@ class DeviceEventsMeasurement extends StaticMeasurement {
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
             return Mono
-                .justOrEmpty(parameter.getString("id"))
+                .justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId -> {
                     int history = parameter.getInt("history").orElse(0);
                     return //合并历史数据和实时数据

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java

@@ -109,7 +109,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
     }
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
-        .add("id", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
+        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
 
     static Set<String> getPropertiesFromParameter(MeasurementParameter parameter) {
         return parameter
@@ -152,7 +152,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
             return Mono
-                .justOrEmpty(parameter.getString("id"))
+                .justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId -> {
                     int history = parameter.getInt("history").orElse(1);
 
@@ -192,7 +192,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
             return Mono
-                .justOrEmpty(parameter.getString("id"))
+                .justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId -> {
                     int history = parameter.getInt("history").orElse(0);
                     //合并历史数据和实时数据

+ 5 - 5
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java

@@ -92,14 +92,14 @@ class DevicePropertyMeasurement extends StaticMeasurement {
     }
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
-        .add("id", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
+        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
         .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10))
         .add("from", "时间从", "", StringType.GLOBAL)
         .add("to", "时间至", "", StringType.GLOBAL);
     ;
 
     static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
-        .add("id", "设备ID", "", StringType.GLOBAL)
+        .add("deviceId", "设备ID", "", StringType.GLOBAL)
         .add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL)
         .add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL)
         .add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL)
@@ -140,7 +140,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
         @Override
         public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
 
-            String deviceId = parameter.getString("id", null);
+            String deviceId = parameter.getString("deviceId", null);
             DeviceDataService.AggregationRequest request = new DeviceDataService.AggregationRequest();
             DeviceDataService.DevicePropertyAggregation aggregation = new DeviceDataService.DevicePropertyAggregation(
                 metadata.getId(), metadata.getId(), parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG)
@@ -205,7 +205,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
 
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
-            return Mono.justOrEmpty(parameter.getString("id"))
+            return Mono.justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId -> {
                     int history = parameter.getInt("history").orElse(1);
                     return  QueryParamEntity.newQuery()
@@ -253,7 +253,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
 
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
-            return Mono.justOrEmpty(parameter.getString("id"))
+            return Mono.justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId -> {
                     int history = parameter.getInt("history").orElse(0);
                     return  //合并历史数据和实时数据

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java

@@ -39,7 +39,7 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
     static MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更");
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
-        .add("id", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
+        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
 
     static DataType type = new EnumType()
         .addElement(EnumType.Element.of(MessageType.OFFLINE.name().toLowerCase(), "离线"))
@@ -148,7 +148,7 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
 
 
-            return Mono.justOrEmpty(parameter.getString("id"))
+            return Mono.justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId ->//从消息网关订阅消息
                     eventBus.subscribe(Subscription.of(
                         "RealTimeDeviceStateDimension"
@@ -165,7 +165,7 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
         Map<String, Object> createStateValue(DeviceMessage message) {
             Map<String, Object> val = new HashMap<>();
             val.put("type", message.getMessageType().name().toLowerCase());
-            val.put("id", message.getDeviceId());
+            val.put("deviceId", message.getDeviceId());
             return val;
         }
     }

+ 5 - 5
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/topic/DeviceTopicMeasurement.java

@@ -75,9 +75,9 @@ public class DeviceTopicMeasurement extends StaticMeasurement {
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
 
-           return  Flux.interval(Duration.ofSeconds(2))
+            return  Flux.interval(Duration.ofSeconds(2))
                 .map(t-> deviceRegistry.
-                    getDevice(String.valueOf( parameter.get("id").orElse("null-")))
+                    getDevice(String.valueOf( parameter.get("deviceId").orElse("null-")))
                     .map(DeviceOperator::getTopics))
                 .flatMap(Function.identity())
                 .flatMap(data -> Flux.just(SimpleMeasurementValue.of(
@@ -87,11 +87,11 @@ public class DeviceTopicMeasurement extends StaticMeasurement {
 
 //            return eventBus
 //                .subscribe(Subscription.of("real-time-device-topics",
-//                    String.format("/dashboard/device/%s/changed/topics",parameter.get("id").orElse("null-")), Subscription.Feature.local))
+//                    String.format("/dashboard/device/%s/changed/topics",parameter.get("deviceId").orElse("null-")), Subscription.Feature.local))
 //                .doOnNext(TopicPayload::release)
 //                .map(data->{
-//                    String id=String.valueOf( parameter.get("id").orElse("null-"));
-//                    return deviceRegistry.getDevice(id)
+//                    String deviceId=String.valueOf( parameter.get("deviceId").orElse("null-"));
+//                    return deviceRegistry.getDevice(deviceId)
 //                        .map(DeviceOperator::getTopics);
 //                })
 //                .flatMap(Function.identity())

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceCurrentStateSubscriptionProvider.java

@@ -38,7 +38,7 @@ public class DeviceCurrentStateSubscriptionProvider implements SubscriptionProvi
     @Override
     @SuppressWarnings("all")
     public Flux<Map<String, Object>> subscribe(SubscribeRequest request) {
-        List<String> deviceId = request.get("id")
+        List<String> deviceId = request.get("deviceId")
             .map(List.class::cast)
             .orElseThrow(() -> new IllegalArgumentException("deviceId不能为空"));
 

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendSubscriptionProvider.java

@@ -51,8 +51,8 @@ public class DeviceMessageSendSubscriptionProvider implements SubscriptionProvid
 
         String topic = request.getTopic();
 
-        Map<String, String> variables = TopicUtils.getPathVariables("/device-message-sender/{productId}/{id}", topic);
-        String deviceId = variables.get("id");
+        Map<String, String> variables = TopicUtils.getPathVariables("/device-message-sender/{productId}/{deviceId}", topic);
+        String deviceId = variables.get("deviceId");
         String productId = variables.get("productId");
 
         //发给所有设备
@@ -72,7 +72,7 @@ public class DeviceMessageSendSubscriptionProvider implements SubscriptionProvid
 
     public Flux<Message> doSend(String requestId, String topic, String deviceId, Map<String, Object> message) {
         message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
-        message.put("id", deviceId);
+        message.put("deviceId", deviceId);
 
         RepayableDeviceMessage<?> msg = MessageType.convertMessage(message)
             .filter(RepayableDeviceMessage.class::isInstance)

+ 11 - 11
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java

@@ -85,7 +85,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     protected abstract Mono<Void> doSaveData(String metric, Flux<TimeSeriesData> data);
 
     /**
-     * 设备消息转换 二元组 {id, tsData}
+     * 设备消息转换 二元组 {deviceId, tsData}
      *
      * @param productId  产品ID
      * @param message    设备属性消息
@@ -160,7 +160,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     }
 
     /**
-     * 设备消息转换成时序数据 二元组 {id, tsData}
+     * 设备消息转换成时序数据 二元组 {deviceId, tsData}
      *
      * @param message 设备消息
      * @return 二元组
@@ -208,7 +208,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     }
 
     /**
-     * 事件消息转换成 二元组{id, tsData}
+     * 事件消息转换成 二元组{deviceId, tsData}
      *
      * @param productId 产品ID
      * @param message   事件消息
@@ -240,7 +240,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
                         data.put("value", tempValue);
                     }
                     data.put("id", createDataId(message));
-                    data.put("id", device.getDeviceId());
+                    data.put("deviceId", device.getDeviceId());
                     data.put("createTime", System.currentTimeMillis());
 
                     return TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data);
@@ -255,7 +255,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
             .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
             .flatMap(productId -> this
                 .doQueryPager(deviceLogMetricId(productId),
-                    entity.and("id", TermType.eq, deviceId),
+                    entity.and("deviceId", TermType.eq, deviceId),
                     data -> data.as(DeviceOperationLogEntity.class)
                 ))
             .defaultIfEmpty(PagerResult.empty());
@@ -273,7 +273,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
             .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
             .flatMapMany(tp2 -> query
                 .toQuery()
-                .where("id", deviceId)
+                .where("deviceId", deviceId)
                 .execute(param -> this
                     .doQuery(deviceEventMetricId(tp2.getT1().getId(), event),
                         param,
@@ -298,7 +298,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
             .log()
             .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
             .flatMap(tp2 -> query.toQuery()
-                .where("id", deviceId)
+                .where("deviceId", deviceId)
                 .execute(param -> this
                     .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
                         param,
@@ -311,7 +311,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
                             return deviceEvent;
                         }))
             );
-}
+    }
 
     protected Flux<DeviceProperty> rowToProperty(TimeSeriesData row, Collection<PropertyMetadata> properties) {
         return Flux
@@ -387,7 +387,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
                             if (newData.isEmpty()) {
                                 return Mono.empty();
                             }
-                            newData.put("id", message.getDeviceId());
+                            newData.put("deviceId", message.getDeviceId());
                             newData.put("productId", productId);
                             newData.put("timestamp", TimestampUtils.toMillis(message.getTimestamp()));
                             newData.put("createTime", System.currentTimeMillis());
@@ -404,7 +404,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     }
 
     /**
-     * 设备消息转换 二元组{id, tsData}
+     * 设备消息转换 二元组{deviceId, tsData}
      *
      * @param productId  产品ID
      * @param message    设备属性消息
@@ -465,7 +465,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
                                                         Object value) {
         Map<String, Object> propertyData = newMap(24);
         propertyData.put("id", DigestUtils.md5Hex(id));
-        propertyData.put("id", deviceId);
+        propertyData.put("deviceId", deviceId);
         propertyData.put("timestamp", timestamp);
         propertyData.put("property", property.getId());
         propertyData.put("createTime", System.currentTimeMillis());

+ 5 - 5
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java

@@ -88,7 +88,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
         return param
             .toQuery()
             .includes(property.keySet().toArray(new String[0]))
-            .where("id", deviceId)
+            .where("deviceId", deviceId)
             .execute(q -> timeSeriesManager.getService(getPropertyTimeSeriesMetric(productId)).query(q))
             .flatMap(data -> rowToProperty(data, property.values()));
     }
@@ -129,7 +129,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
                     return param
                         .toQuery()
                         .includes(property)
-                        .where("id", deviceId)
+                        .where("deviceId", deviceId)
                         .execute(query -> timeSeriesManager
                             .getService(devicePropertyMetric(tp2.getT1().getId()))
                             .queryPager(query,
@@ -162,7 +162,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
 
                     return query
                         .toQuery()
-                        .where("id", deviceId)
+                        .where("deviceId", deviceId)
                         .includes(property)
                         .execute(timeSeriesManager.getService(getPropertyTimeSeriesMetric(tp2.getT1().getId()))::query)
                         .flatMap(data -> Flux
@@ -239,7 +239,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
                                                                @Nonnull DeviceDataService.AggregationRequest request,
                                                                @Nonnull DeviceDataService.DevicePropertyAggregation... properties) {
 
-        request.filter.and("id", "eq", deviceId);
+        request.filter.and("deviceId", "eq", deviceId);
 
         return deviceRegistry
             .getDevice(deviceId)
@@ -249,7 +249,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
     }
 
     /**
-     * 设备消息转换 二元组{id, tsData}
+     * 设备消息转换 二元组{deviceId, tsData}
      *
      * @param productId  产品ID
      * @param message    设备属性消息

+ 7 - 7
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java

@@ -87,7 +87,7 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
         if (property.size() == 1) {
             return param
                 .toQuery()
-                .where("id", deviceId)
+                .where("deviceId", deviceId)
                 .and("property", property.keySet().iterator().next())
                 .execute(timeSeriesManager.getService(devicePropertyMetric(productId))::query)
                 .map(data ->
@@ -105,7 +105,7 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
                 .groupBy(new LimitGroup("property", "property", property.size() * 2)) //按property分组
                 .limit(property.size())
                 .filter(param)
-                .filter(query -> query.where("id", deviceId))
+                .filter(query -> query.where("deviceId", deviceId))
             ).map(data -> DeviceProperty
                 .of(data, data.getString("property").map(property::get).orElse(null))
                 .deviceId(deviceId));
@@ -141,7 +141,7 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
             .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
             .flatMap(tp2 -> param.toQuery()
                 .where("property", property)
-                .and("id", deviceId)
+                .and("deviceId", deviceId)
                 .execute(query -> timeSeriesManager
                     .getService(devicePropertyMetric(tp2.getT1().getId()))
                     .queryPager(query, data -> DeviceProperty.of(data, tp2.getT2().getPropertyOrNull(property)))));
@@ -164,7 +164,7 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
                         .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
 
                     return query.toQuery()
-                        .where("id", deviceId)
+                        .where("deviceId", deviceId)
                         .when(property.length > 0, q -> q.in("property", Arrays.asList(property)))
                         .execute(timeSeriesManager
                             .getService(DeviceTimeSeriesMetric.devicePropertyMetricId(tp2.getT1().getId()))::query)
@@ -194,7 +194,7 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
                         .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
                         .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
                         .filter(query)
-                        .filter(q -> q.where("id", deviceId).in("property", propertiesMap.keySet()))
+                        .filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet()))
                     ).map(data -> DeviceProperty
                         .of(data, data.getString("property").map(propertiesMap::get).orElse(null))
                         .deviceId(deviceId));
@@ -282,7 +282,7 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
                                                                @Nonnull DeviceDataService.AggregationRequest request,
                                                                @Nonnull DeviceDataService.DevicePropertyAggregation... properties) {
 
-        request.filter.and("id", "eq", deviceId);
+        request.filter.and("deviceId", "eq", deviceId);
 
         return deviceRegistry
             .getDevice(deviceId)
@@ -292,7 +292,7 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
     }
 
     /**
-     * 设备消息转换 二元组{id, tsData}
+     * 设备消息转换 二元组{deviceId, tsData}
      *
      * @param productId  产品ID
      * @param message    设备属性消息

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DeviceLogTimeSeriesMetadata.java

@@ -47,7 +47,7 @@ class DeviceLogTimeSeriesMetadata implements TimeSeriesMetadata {
 
         {
             SimplePropertyMetadata property = new SimplePropertyMetadata();
-            property.setId("id");
+            property.setId("deviceId");
             property.setValueType(new StringType());
             property.setName("设备ID");
             metadata.add(property);

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DevicePropertiesTimeSeriesMetadata.java

@@ -79,7 +79,7 @@ class DevicePropertiesTimeSeriesMetadata implements TimeSeriesMetadata {
 
         {
             SimplePropertyMetadata property = new SimplePropertyMetadata();
-            property.setId("id");
+            property.setId("deviceId");
             property.setValueType(new StringType());
             property.setName("设备ID");
             metadata.add(property);

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/FixedPropertiesTimeSeriesMetadata.java

@@ -36,7 +36,7 @@ class FixedPropertiesTimeSeriesMetadata implements TimeSeriesMetadata {
 
         {
             SimplePropertyMetadata property = new SimplePropertyMetadata();
-            property.setId("id");
+            property.setId("deviceId");
             property.setValueType(new StringType());
             property.setName("设备ID");
             metadata.add(property);

+ 4 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -393,7 +393,7 @@ public class DeviceInstanceController implements
     /**
      * 获取设备全部标签
      * <pre>
-     *     GET /device/instance/{id}/tags
+     *     GET /device/instance/{deviceId}/tags
      *
      *     [
      *      {
@@ -653,7 +653,7 @@ public class DeviceInstanceController implements
             .map(DeviceInstanceEntity::getShadow)
             .defaultIfEmpty(new DeviceShadowEntity());
 //        return registry
-//            .getDevice(id)
+//            .getDevice(deviceId)
 //            .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.shadow))
 //            .defaultIfEmpty("{\n}");
     }
@@ -721,7 +721,7 @@ public class DeviceInstanceController implements
                         DeviceMessage message = tp2.getT2();
 
                         Map<String, String> copy = new HashMap<>();
-                        copy.put("id", deviceId);
+                        copy.put("deviceId", deviceId);
                         if (!StringUtils.hasText(message.getMessageId())) {
                             copy.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
                         }
@@ -777,7 +777,7 @@ public class DeviceInstanceController implements
                 return devices
                     .flatMap(device -> {
                         Map<String, Object> copy = new HashMap<>(message);
-                        copy.put("id", device.getDeviceId());
+                        copy.put("deviceId", device.getDeviceId());
                         copy.putIfAbsent("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
                         //复制为新的消息,防止冲突
                         DeviceMessage copiedMessage = MessageType

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/config/UserSetup.java

@@ -13,7 +13,7 @@ public class UserSetup {
 
     private Boolean savePositionHistory = Boolean.FALSE;
 
-    private Boolean autoApplyPlay = Boolean.FALSE;
+    private Boolean autoApplyPlay = Boolean.TRUE;
 
     private Boolean seniorSdp = Boolean.FALSE;
 

+ 4 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/contanst/VideoManagerConstants.java

@@ -33,6 +33,10 @@ public class VideoManagerConstants {
 	public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_";
 
 	// 此处多了一个_,暂不修改
+    /**
+     * 发起点播时,将点播设备信息缓存
+     */
+    public static final String PLAY_REQUEST_PREFIX = "VMP_PLAY_REQUEST_";
     /**
      * 点播流前缀
      */

+ 93 - 115
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/controller/PlayController.java

@@ -1,81 +1,57 @@
 package org.jetlinks.community.media.controller;
 
 
+import cn.hutool.core.lang.UUID;
+import cn.hutool.json.JSONObject;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.lucene.search.BooleanClause;
 import org.hswebframework.web.authorization.annotation.Authorize;
 import org.hswebframework.web.authorization.annotation.QueryAction;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.media.bean.StreamInfo;
 import org.jetlinks.community.media.gb28181.result.PlayResult;
 import org.jetlinks.community.media.gb28181.result.WVPResult;
+import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaDeviceService;
+import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.service.LocalPlayService;
-import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.event.Subscription;
-import org.jetlinks.core.event.TopicPayload;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.transmit.callback.DeferredResultHolder;
+import org.jetlinks.community.media.transmit.callback.RequestMessage;
+import org.jetlinks.community.media.transmit.cmd.SipCommander;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-import java.util.Arrays;
+import reactor.util.function.Tuple2;
 import java.util.concurrent.*;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 
 @RestController
-@RequestMapping("/device/directives")
+@RequestMapping("/media/play")
 @Slf4j
 @Authorize(ignore = true)
 @Resource(id="gb28181-play",name = "国标设备点播")
 @AllArgsConstructor
-@Tag(name = "设备下发指令")
+@Tag(name = "GB媒体设备操作")
 public class PlayController {
-    //
-//	private final static Logger logger = LoggerFactory.getLogger(com.genersoft.iot.vmp.vmanager.gb28181.play.PlayController.class);
-//
-//	@Autowired
-//	private SIPCommander cmder;
-//
-//	@Autowired
-//	private VideoStreamSessionManager streamSession;
-//
-//	@Autowired
-//	private IVideoManagerStorager storager;
-//
-//	@Autowired
-//	private IRedisCatchStorage redisCatchStorage;
-//
-//	@Autowired
-//	private ZLMRESTfulUtils zlmresTfulUtils;
-//
-//	@Autowired
-//	private DeferredResultHolder resultHolder;
-//
-//	@Autowired
-//	private IPlayService playService;
-//
-//	@Autowired
-//	private IMediaService mediaService;
-//
-//	@Autowired
-//	private IMediaServerService mediaServerService;
-//
+
+    private final SipCommander cmder;
+
+    private final RedisCacheStorageImpl redisCacheStorage;
+
+    private final LocalMediaDeviceChannelService deviceChannelService;
+
     private final LocalMediaDeviceService mediaDeviceService;
 
     private final LocalPlayService playService;
 
+    private final DeferredResultHolder resultHolder;
+
+    private final LocalMediaServerItemService mediaServerItemService;
     @QueryAction
     @Operation(summary = "设备点播")
     @GetMapping("/start/{deviceId}/{channelId}")
@@ -92,75 +68,77 @@ public class PlayController {
                 .flatMap(this::deferredResultHandler);
     }
 
-//
-//	@ApiOperation("停止点播")
-//	@ApiImplicitParams({
-//			@ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
-//			@ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
-//	})
-//	@GetMapping("/stop/{deviceId}/{channelId}")
-//	public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
-//
-//		logger.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
-//
-//		String uuid = UUID.randomUUID().toString();
-//		DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
-//
-//		// 录像查询以channelId作为deviceId查询
-//		String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
-//		resultHolder.put(key, uuid, result);
-//		Device device = storager.queryVideoDevice(deviceId);
-//		cmder.streamByeCmd(deviceId, channelId, (event) -> {
-//			StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
-//			if (streamInfo == null) {
-//				RequestMessage msg = new RequestMessage();
-//				msg.setId(uuid);
-//				msg.setKey(key);
-//				msg.setData("点播未找到");
-//				resultHolder.invokeAllResult(msg);
-//				storager.stopPlay(deviceId, channelId);
-//			}else {
-//				redisCatchStorage.stopPlay(streamInfo);
-//				storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
-//				RequestMessage msg = new RequestMessage();
-//				msg.setId(uuid);
-//				msg.setKey(key);
-//				//Response response = event.getResponse();
-//				msg.setData(String.format("success"));
-//				resultHolder.invokeAllResult(msg);
-//			}
-//			mediaServerService.closeRTPServer(device, channelId);
-//		});
-//
-//		if (deviceId != null || channelId != null) {
-//			JSONObject json = new JSONObject();
-//			json.put("deviceId", deviceId);
-//			json.put("channelId", channelId);
-//			RequestMessage msg = new RequestMessage();
-//			msg.setId(uuid);
-//			msg.setKey(key);
-//			msg.setData(json.toString());
-//			resultHolder.invokeAllResult(msg);
-//		} else {
-//			logger.warn("设备预览/回放停止API调用失败!");
-//			RequestMessage msg = new RequestMessage();
-//			msg.setId(uuid);
-//			msg.setKey(key);
-//			msg.setData("streamId null");
-//			resultHolder.invokeAllResult(msg);
-//		}
-//
-//		// 超时处理
-//		result.onTimeout(()->{
-//			logger.warn(String.format("设备预览/回放停止超时,deviceId/channelId:%s_%s ", deviceId, channelId));
-//			RequestMessage msg = new RequestMessage();
-//			msg.setId(uuid);
-//			msg.setKey(key);
-//			msg.setData("Timeout");
-//			resultHolder.invokeAllResult(msg);
-//		});
-//		return result;
-//	}
+
+    @QueryAction
+    @Operation(summary = "停止点播")
+    @GetMapping("/stop/{deviceId}/{channelId}")
+    public Mono<Object> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
+        if(log.isDebugEnabled()){
+            log.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId ));
+        }
+
+        String uuid = UUID.randomUUID().toString();
+        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
+        // 录像查询以channelId作为deviceId查询
+        String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
+        resultHolder.put(key, uuid, result);
+        PlayResult playResult = new PlayResult();
+        playResult.setResult(result);
+        // 超时处理
+        playResult.onTimeout(Mono.fromRunnable(()->{
+            log.warn(String.format("设备预览/回放停止响应超时,deviceId/channelId:%s_%s ", deviceId, channelId));
+            RequestMessage msg = new RequestMessage();
+            msg.setId(uuid);
+            msg.setKey(key);
+            msg.setData("设备预览/回放停止响应超时");
+            resultHolder.invokeAllResult(msg);
+        }));
+
+        return Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId))
+            .flatMap(device->
+                Mono.zip(cmder.streamByeCmd(deviceId, channelId, (event) -> {
+                    StreamInfo streamInfo = redisCacheStorage.queryPlayByDevice(deviceId, channelId);
+                    if (streamInfo == null) {
+                        RequestMessage msg = new RequestMessage();
+                        msg.setId(uuid);
+                        msg.setKey(key);
+                        msg.setData("点播未找到");
+                        resultHolder.invokeAllResult(msg);
+                        deviceChannelService.stopPlay(deviceId, channelId).subscribe();
+                    }else {
+                        redisCacheStorage.stopPlay(streamInfo);
+                        deviceChannelService.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()).subscribe();
+                        RequestMessage msg = new RequestMessage();
+                        msg.setId(uuid);
+                        msg.setKey(key);
+                        msg.setData("success");
+                        resultHolder.invokeAllResult(msg);
+                    }
+                    mediaServerItemService.closeRTPServer(device, channelId);
+                })
+                    .mergeWith(Mono.fromRunnable(()->{
+                        if (deviceId != null || channelId != null) {
+                            JSONObject json = new JSONObject()
+                                .putOpt("deviceId", deviceId)
+                                .putOpt("channelId", channelId);
+                            RequestMessage msg = new RequestMessage();
+                            msg.setId(uuid);
+                            msg.setKey(key);
+                            msg.setData(json.toString());
+                            resultHolder.invokeAllResult(msg);
+                        } else {
+                            log.warn("设备预览/回放停止API调用失败!");
+                            RequestMessage msg = new RequestMessage();
+                            msg.setId(uuid);
+                            msg.setKey(key);
+                            msg.setData("streamId null");
+                            resultHolder.invokeAllResult(msg);
+                        }
+                    }))
+                    .then(),Mono.just(deferredResultHandler(playResult)))
+            )
+            .flatMap(Tuple2::getT2);
+    }
 //
 //	/**
 //	 * 将不是h264的视频通过ffmpeg 转码为h264 + aac

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDevice.java

@@ -167,7 +167,7 @@ public class MediaDevice extends GenericEntity<String> implements RecordCreation
         description = "注册有效期"
         ,accessMode = Schema.AccessMode.READ_ONLY
     )
-	private int expires;
+	private Integer expires;
 
 	/**
 	 * 设备使用的媒体id, 默认为null

+ 5 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/gb28181/bean/SsrcTransaction.java

@@ -1,6 +1,10 @@
 package org.jetlinks.community.media.gb28181.bean;
 
-public class SsrcTransaction {
+import java.io.Serializable;
+
+public class SsrcTransaction implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private String deviceId;
     private String channelId;

+ 7 - 10
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -98,26 +98,21 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
 
     @Subscribe("/media/device/heart-beat/**")
     public void heartBeat(MediaDevice device){
-//        SipServerConfig config = SipContext.getConfig();
-//        String key = SipUtils.keepAliveKey(device.getId());
-        Mono<DeviceOperator> operatorMono =
-            registry
-                .getDevice(device.getId());
         //心跳过期 ,设备上线
         redisCacheStorage.updateDevice(device);
         DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
         onlineMessage.setDeviceId(device.getId());
         onlineMessage.setTimestamp(System.currentTimeMillis());
         onlineMessage.setMessageId(IdUtils.newUUID());
-        operatorMono
-            .flatMap(operator -> messageHandler.handleMessage(operator,onlineMessage));
 
         //更新设备心跳时间
         device.setKeepaliveTime(DateUtil.now());
 
         device.setState(DeviceState.online);
         //未注册的设备不进行更新
-        operatorMono
+        registry
+            .getDevice(device.getId())
+            .flatMap(operator -> messageHandler.handleMessage(operator,onlineMessage))
             .flatMap(ignore->updateById(device.getId(),device))
             .subscribe();
     }
@@ -136,14 +131,16 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     @Override
     public void run(String... args) {
         //todo 后期改为定时
-        List<MediaDevice> allOnlineDevice = redisCacheStorage.getAllOnlineDevice();
-        List<String> deviceIds = allOnlineDevice.stream().map(MediaDevice::getId).collect(Collectors.toList());
+        List<String> deviceIds = redisCacheStorage.getOnlineForAll();
         this.createQuery()
             .where(MediaDevice::getState,DeviceState.online)
             .notIn(MediaDevice::getId,deviceIds)
             .fetch()
             .doOnNext(this::unRegister)
             .concatWith(sipServerHelper.createSip( SipServerConfig.of("34020000002000000001","0.0.0.0", 7001,"udp","340200000","utf-8","12345678",10L,"1")).then(Mono.empty()))
+            .doOnError(e->{
+                e.printStackTrace();
+            })
             .subscribe();
     }
 }

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalPlayService.java

@@ -240,7 +240,7 @@ public class LocalPlayService  {
         }
     }
 
-    private Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
+    public Mono<Void> onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
         RequestMessage msg = new RequestMessage();
         msg.setId(uuid);
         msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);

+ 4 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRequestProcessorParent.java

@@ -174,6 +174,10 @@ public abstract class SipRequestProcessorParent extends AbstractSipProcessor {
         return response;
     }
 
+    public Element getRootElement(RequestEvent evt) throws DocumentException {
+        return getRootElement(evt,"utf-8");
+    }
+
     public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
         if (charset == null) {
             throw new UnknownError("SipRequestProcessorParent.getRootElement(),charset is null");

+ 69 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/SipRunner.java

@@ -0,0 +1,69 @@
+package org.jetlinks.community.media.sip;
+
+import cn.hutool.core.collection.CollectionUtil;
+import io.vertx.core.impl.ConcurrentHashSet;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.transmit.cmd.SipCommander;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import javax.sip.ResponseEvent;
+import java.time.Duration;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName SipRunner.java
+ * @Description TODO
+ * @createTime 2022年02月18日 08:51:00
+ */
+@Component
+@AllArgsConstructor
+@Slf4j
+public class SipRunner implements CommandLineRunner {
+    private final RedisCacheStorageImpl redisCacheStorage;
+    private final SipCommander sipCommander;
+    private Set<String> subscribeDevice=new ConcurrentHashSet<>();
+    private final static BiConsumer<Throwable, Object> doOnError = (error, val) ->
+        log.error(error.getMessage(), error);
+    @Override
+    public void run(String... args) {
+        redisCacheStorage.resetAllCSEQ();
+        //每隔30S发起一次订阅 todo 集群
+        Flux.interval(Duration.ofSeconds(10))
+            //获取所有在线设备信息
+            .flatMap(ignore->Mono.justOrEmpty(redisCacheStorage.getOnlineForAll()))
+            .filter(CollectionUtil::isNotEmpty)
+            .flatMap(deviceIds-> Flux.fromStream(deviceIds.stream()))
+            .publishOn(Schedulers.boundedElastic())
+            .flatMap(deviceId->Mono.justOrEmpty(redisCacheStorage.getDevice(deviceId)))
+            //设备发起订阅且不可重复订阅
+//            .filter(device -> device.getSubscribeCycleForCatalog()>0&&!subscribeDevice.contains(device.getId()))
+            .filter(device -> !subscribeDevice.contains(device.getId()))
+            .flatMap(device ->   sipCommander.catalogSubscribe(device, eventResult -> {
+                ResponseEvent event = (ResponseEvent) eventResult.event;
+                if (event.getResponse().getRawContent() != null) {
+                    // 成功
+                    log.info("[目录订阅]成功: {}", device.getId());
+                }else {
+                    // 成功
+                    log.info("[目录订阅]成功: {}", device.getId());
+                }
+                subscribeDevice.add(device.getId());
+            },eventResult -> {
+                // 失败
+                log.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getId(), eventResult.msg);
+            }))
+            .doOnError(e->{
+                e.printStackTrace();
+            })
+            .onErrorContinue(doOnError)
+            .subscribe();
+    }
+}

+ 16 - 16
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/processor/SipProcessorObserver.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.media.sip.processor;
 
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.core.event.EventBus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -8,6 +9,8 @@ import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import javax.sip.*;
+import javax.sip.header.CSeqHeader;
+import javax.sip.header.CallIdHeader;
 import javax.sip.message.Response;
 import java.util.function.Function;
 
@@ -20,6 +23,8 @@ import java.util.function.Function;
 @Slf4j
 public class SipProcessorObserver implements ISipProcessObserver {
 
+    @Autowired
+    private SipSubscribe sipSubscribe;
 
     private EmitterProcessor<RequestEvent> requestProcessor= EmitterProcessor.create();
     private FluxSink<RequestEvent> requestSink=requestProcessor.sink();
@@ -39,9 +44,6 @@ public class SipProcessorObserver implements ISipProcessObserver {
     private EmitterProcessor<DialogTerminatedEvent> dialogTerminatedProcessor=EmitterProcessor.create();
     private FluxSink<DialogTerminatedEvent> dialogTerminatedSink=dialogTerminatedProcessor.sink();
 
-    @Autowired
-    private EventBus eventBus;
-
 
     /**
      * 分发RequestEvent事件
@@ -64,22 +66,20 @@ public class SipProcessorObserver implements ISipProcessObserver {
         log.debug(responseEvent.getResponse().toString());
         int status = response.getStatusCode();
         if (((status >= 200) && (status < 300)) || status == 401) { // Success!
-//            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
-//            String method = cseqHeader.getMethod();
             if(responseProcessor.hasDownstreams()){
                 responseSink.next(responseEvent);
             }
-//            if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
-//                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
-//                if (callIdHeader != null) {
-//                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
-//                    if (subscribe != null) {
-//                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
-//                        subscribe.response(eventResult);
-//                        sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
-//                    }
-//                }
-//            }
+            if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
+                CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
+                if (callIdHeader != null) {
+                    SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
+                    if (subscribe != null) {
+                        SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
+                        subscribe.accept(eventResult);
+                        sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
+                    }
+                }
+            }
         } else if ((status >= 100) && (status < 200)) {
             // 增加其它无需回复的响应,如101、180等
         } else {

+ 0 - 99
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/MessageRequestProcessor.java

@@ -1,99 +0,0 @@
-//package org.jetlinks.community.media.event.message;
-//
-//import org.dom4j.DocumentException;
-//import org.dom4j.Element;
-//import org.jetlinks.community.media.entity.MediaDevice;
-//import org.jetlinks.community.media.bean.ParentPlatform;
-//import org.jetlinks.community.media.sip.request.IMessageHandler;
-//import org.jetlinks.community.media.sip.request.ISipRequestProcessor;
-//import org.jetlinks.community.media.sip.SipRequestProcessorParent;
-//import org.jetlinks.community.media.sip.processor.SipProcessorObserver;
-//import org.jetlinks.community.utils.SipUtils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.stereotype.Component;
-//
-//import javax.sip.InvalidArgumentException;
-//import javax.sip.RequestEvent;
-//import javax.sip.SipException;
-//import javax.sip.header.CallIdHeader;
-//import javax.sip.message.Request;
-//import javax.sip.message.Response;
-//import java.text.ParseException;
-//import java.util.Map;
-//import java.util.concurrent.ConcurrentHashMap;
-//
-//@Component
-//public class MessageRequestProcessor extends SipRequestProcessorParent {
-//
-//    private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class);
-//
-//    private static Map<String, IMessageHandler> messageHandlerMap = new ConcurrentHashMap<>();
-//
-//    @Autowired
-//    private SipProcessorObserver sipProcessorObserver;
-//
-//
-////    @Autowired
-////    private IVideoManagerStorager storage;
-////
-////    @Autowired
-////    private SipSubscribe sipSubscribe;
-//
-////    @Autowired
-////    private IRedisCatchStorage redisCatchStorage;
-//
-//
-//    @Override
-//    public String getMethod() {
-//        return Request.MESSAGE;
-//    }
-//
-//    @Override
-//    public void processRequest(RequestEvent evt) {
-//        logger.debug("接收到消息:" + evt.getRequest());
-//        String id = SipUtils.getUserIdFromFromHeader(evt.getRequest());
-//        CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
-//        // 查询设备是否存在
-//        MediaDevice device = redisCatchStorage.getDevice(id);
-//        // 查询上级平台是否存在
-//        ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(id);
-//        try {
-//            if (device == null && parentPlatform == null) {
-//                // 不存在则回复404
-//                responseAck(evt, Response.NOT_FOUND, "device "+ id +" not found");
-//                logger.warn("[设备未找到 ]: {}", id);
-//                if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){
-//                    SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new DeviceNotFoundEvent(evt.getDialog()));
-//                    sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).response(eventResult);
-//                };
-//            }else {
-//                Element rootElement = getRootElement(evt);
-//                String name = rootElement.getName();
-//                IMessageHandler messageHandler = messageHandlerMap.get(name);
-//                if (messageHandler != null) {
-//                    if (device != null) {
-//                        messageHandler.handleForDevice(evt, device, rootElement);
-//                    }else { // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
-//                        messageHandler.handleForPlatform(evt, parentPlatform, rootElement);
-//                    }
-//                }else {
-//                    // 不支持的message
-//                    // 不存在则回复415
-//                    responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
-//                }
-//            }
-//        } catch (SipException e) {
-//            logger.warn("SIP 回复错误", e);
-//        } catch (InvalidArgumentException e) {
-//            logger.warn("参数无效", e);
-//        } catch (ParseException e) {
-//            logger.warn("SIP回复时解析异常", e);
-//        } catch (DocumentException e) {
-//            logger.warn("解析XML消息内容异常", e);
-//        }
-//    }
-//
-//
-//}

+ 0 - 13
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/RegisterRequestProcessor.java

@@ -39,23 +39,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Slf4j
 public class RegisterRequestProcessor extends SipRequestProcessorParent {
 
-//	@Autowired
-//	private RegisterLogicHandler handler;
-//
-//	@Autowired
-//	private IRedisCatchStorage redisCatchStorage;
-//
-//	@Autowired
-//	private IVideoManagerStorager storager;
-
-
     @Autowired
     private RedisCacheStorageImpl redisCacheStorage;
     @Autowired
     private EventBus eventBus;
-
-    @Autowired
-    private RedissonClient redissonClient;
     @Autowired
     public RegisterRequestProcessor(EventBus eventBus) {
         this.eventBus = eventBus;

+ 183 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/sip/request/impl/SubscribeRequestProcessor.java

@@ -0,0 +1,183 @@
+//package org.jetlinks.community.media.sip.request.impl;
+//
+//import lombok.AllArgsConstructor;
+//import lombok.extern.slf4j.Slf4j;
+//import org.dom4j.DocumentException;
+//import org.dom4j.Element;
+//import org.jetlinks.community.media.config.UserSetup;
+//import org.jetlinks.community.media.contanst.CmdType;
+//import org.jetlinks.community.media.contanst.VideoManagerConstants;
+//import org.jetlinks.community.media.sip.SipRequestProcessorParent;
+//import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+//import org.jetlinks.community.utils.SipUtils;
+//import org.jetlinks.community.utils.XmlUtil;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.beans.factory.InitializingBean;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//
+//import javax.sip.InvalidArgumentException;
+//import javax.sip.RequestEvent;
+//import javax.sip.ServerTransaction;
+//import javax.sip.SipException;
+//import javax.sip.header.ExpiresHeader;
+//import javax.sip.header.ToHeader;
+//import javax.sip.message.Request;
+//import javax.sip.message.Response;
+//import java.text.ParseException;
+//import java.util.Optional;
+//
+///**
+// * SIP命令类型: SUBSCRIBE请求
+// */
+//@Component
+//@Slf4j
+//public class SubscribeRequestProcessor  extends SipRequestProcessorParent {
+//
+//
+//    @Autowired
+//    private RedisCacheStorageImpl redisCatchStorage;
+//
+//
+//    @Autowired
+//    private UserSetup userSetup;
+//
+//
+//    /**
+//     * 处理SUBSCRIBE请求
+//     *
+//     * @param evt
+//     */
+//    @Override
+//    public void processRequest(RequestEvent evt) {
+//        Request request = evt.getRequest();
+//        try {
+//            Element rootElement = getRootElement(evt);
+//            String cmd = XmlUtil.getText(rootElement, "CmdType");
+//            if (CmdType.MOBILE_POSITION.equals(cmd)) {
+//                processNotifyMobilePosition(evt, rootElement);
+//            } else if (CmdType.ALARM.equals(cmd)) {
+//                log.info("接收到Alarm订阅");
+//                processNotifyAlarm(evt, rootElement);
+//            } else if (CmdType.CATALOG.equals(cmd)) {
+//                processNotifyCatalogList(evt, rootElement);
+//            } else {
+//                log.info("接收到消息:" + cmd);
+//                Response response = null;
+//                response = getMessageFactory().createResponse(200, request);
+//                if (response != null) {
+//                    ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
+//                    response.setExpires(expireHeader);
+//                }
+//                log.info("response : " + response);
+//                ServerTransaction transaction = getServerTransaction(evt);
+//                if (transaction != null) {
+//                    transaction.sendResponse(response);
+//                    transaction.getDialog().delete();
+//                    transaction.terminate();
+//                } else {
+//                    log.info("processRequest serverTransactionId is null.");
+//                }
+//            }
+//        } catch (Exception e) {
+//            log.error("SIP订阅发生错误,",e);
+//        }
+//
+//    }
+//
+//    /**
+//     * 处理移动位置订阅消息
+//     */
+//    private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
+//        String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
+//        String deviceID = XmlUtil.getText(rootElement, "DeviceID");
+//        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
+//        String sn = XmlUtil.getText(rootElement, "SN");
+//        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_MobilePosition_" + platformId;
+//        log.info("接收到{}的MobilePosition订阅", platformId);
+//        StringBuilder resultXml = new StringBuilder(200);
+//        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
+//            .append("<Response>\r\n")
+//            .append("<CmdType>MobilePosition</CmdType>\r\n")
+//            .append("<SN>" + sn + "</SN>\r\n")
+//            .append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
+//            .append("<Result>OK</Result>\r\n")
+//            .append("</Response>\r\n");
+//
+//        if (subscribeInfo.getExpires() > 0) {
+//            if (redisCatchStorage.getSubscribe(key) != null) {
+//                dynamicTask.stopCron(key);
+//            }
+//            String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
+//            dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key), Integer.parseInt(interval));
+//
+//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
+//        }else if (subscribeInfo.getExpires() == 0) {
+//            dynamicTask.stopCron(key);
+//            redisCatchStorage.delSubscribe(key);
+//        }
+//
+//
+//
+//        try {
+//            Response response = responseXmlAck(evt, resultXml.toString());
+//            ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
+//            subscribeInfo.setToTag(toHeader.getTag());
+//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
+//
+//        } catch (SipException e) {
+//            e.printStackTrace();
+//        } catch (InvalidArgumentException e) {
+//            e.printStackTrace();
+//        } catch (ParseException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
+//
+//    }
+//
+//    private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
+//        String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
+//        String deviceID = XmlUtil.getText(rootElement, "DeviceID");
+//        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
+//        String sn = XmlUtil.getText(rootElement, "SN");
+//        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_Catalog_" + platformId;
+//        log.info("接收到{}的Catalog订阅", platformId);
+//        StringBuilder resultXml = new StringBuilder(200);
+//        resultXml.append("<?xml version=\"1.0\" ?>\r\n")
+//            .append("<Response>\r\n")
+//            .append("<CmdType>Catalog</CmdType>\r\n")
+//            .append("<SN>" + sn + "</SN>\r\n")
+//            .append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
+//            .append("<Result>OK</Result>\r\n")
+//            .append("</Response>\r\n");
+//
+//        if (subscribeInfo.getExpires() > 0) {
+//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
+//        }else if (subscribeInfo.getExpires() == 0) {
+//            redisCatchStorage.delSubscribe(key);
+//        }
+//
+//        try {
+//            Response response = responseXmlAck(evt, resultXml.toString());
+//            ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
+//            subscribeInfo.setToTag(toHeader.getTag());
+//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
+//
+//        } catch (SipException e) {
+//            e.printStackTrace();
+//        } catch (InvalidArgumentException e) {
+//            e.printStackTrace();
+//        } catch (ParseException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    @Override
+//    public String getMethod() {
+//        return Request.SUBSCRIBE;
+//    }
+//}

+ 58 - 67
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/storage/impl/RedisCacheStorageImpl.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.media.storage.impl;
 
+import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.extern.slf4j.Slf4j;
@@ -10,6 +11,7 @@ import org.jetlinks.community.media.contanst.VideoManagerConstants;
 import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.bean.ParentPlatform;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
+import org.jetlinks.community.media.sip.SipContext;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.utils.RedisUtil;
@@ -17,10 +19,7 @@ import org.jetlinks.core.cluster.ClusterManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 @SuppressWarnings("rawtypes")
 @Component
@@ -97,7 +96,7 @@ public class RedisCacheStorageImpl {
      */
     public boolean startPlay(StreamInfo stream) {
         return redis.set(String.format("%S_%S_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX,serverId, stream.getStreamId(),stream.getDeviceID(), stream.getChannelId()),
-                stream);
+            stream);
     }
 
     /**
@@ -111,9 +110,9 @@ public class RedisCacheStorageImpl {
         }
         return redis.del(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
             serverId,
-                streamInfo.getStreamId(),
-                streamInfo.getDeviceID(),
-                streamInfo.getChannelId()));
+            streamInfo.getStreamId(),
+            streamInfo.getDeviceID(),
+            streamInfo.getChannelId()));
     }
 
     /**
@@ -123,11 +122,11 @@ public class RedisCacheStorageImpl {
 
     public StreamInfo queryPlay(StreamInfo streamInfo) {
         return (StreamInfo)redis.get(String.format("%S_%s_%s_%s_%s",
-                VideoManagerConstants.PLAYER_PREFIX,
-                serverId,
-                streamInfo.getStreamId(),
-                streamInfo.getDeviceID(),
-                streamInfo.getChannelId()));
+            VideoManagerConstants.PLAYER_PREFIX,
+            serverId,
+            streamInfo.getStreamId(),
+            streamInfo.getDeviceID(),
+            streamInfo.getChannelId()));
     }
 
     public StreamInfo queryPlayByStreamId(String streamId) {
@@ -151,8 +150,8 @@ public class RedisCacheStorageImpl {
     public StreamInfo queryPlayByDevice(String deviceId, String channelId) {
         List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
             serverId,
-                deviceId,
-                channelId));
+            deviceId,
+            channelId));
         if (playLeys == null || playLeys.size() == 0){
             return null;
         }
@@ -177,13 +176,13 @@ public class RedisCacheStorageImpl {
 
     public boolean startPlayback(StreamInfo stream) {
         return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,serverId,stream.getStreamId(),
-                        stream.getDeviceID(), stream.getChannelId()), stream);
+            stream.getDeviceID(), stream.getChannelId()), stream);
     }
 
 
     public boolean startDownload(StreamInfo streamInfo) {
         return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, serverId,streamInfo.getStreamId(),
-                        streamInfo.getDeviceID(), streamInfo.getChannelId()), streamInfo);
+            streamInfo.getDeviceID(), streamInfo.getChannelId()), streamInfo);
     }
 
 
@@ -197,9 +196,9 @@ public class RedisCacheStorageImpl {
 //        }
         return redis.del(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
             serverId,
-                streamInfo.getStreamId(),
-                streamInfo.getDeviceID(),
-                streamInfo.getChannelId()));
+            streamInfo.getStreamId(),
+            streamInfo.getDeviceID(),
+            streamInfo.getChannelId()));
     }
 
 
@@ -209,12 +208,12 @@ public class RedisCacheStorageImpl {
         //         code);
         List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
             serverId,
-                deviceId,
-                code));
+            deviceId,
+            code));
         if (playLeys == null || playLeys.size() == 0) {
             playLeys = redis.scan(String.format("%S_%s_*_*_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
                 serverId,
-                    deviceId));
+                deviceId));
         }
         if (playLeys == null || playLeys.size() == 0) {
             return null;
@@ -338,38 +337,8 @@ public class RedisCacheStorageImpl {
     }
 
 
-    public void clearCacheByDeviceId(String deviceId) {
-        List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_*", VideoManagerConstants.PLAYER_PREFIX,
-                serverId,
-                deviceId));
-        if (playLeys.size() > 0) {
-            for (Object key : playLeys) {
-                redis.del(key.toString());
-            }
-        }
-
-        List<Object> playBackers = redis.scan(String.format("%S_%s_*_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX,
-                serverId,
-                deviceId));
-        if (playBackers.size() > 0) {
-            for (Object key : playBackers) {
-                redis.del(key.toString());
-            }
-        }
-
-        List<Object> deviceCache = redis.scan(String.format("%S%s_%s", VideoManagerConstants.DEVICE_PREFIX,
-                serverId,
-                deviceId));
-        if (deviceCache.size() > 0) {
-            for (Object key : deviceCache) {
-                redis.del(key.toString());
-            }
-        }
-    }
-
-
     public void outlineForAll() {
-        List<Object> onlineDevices = redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX + serverId + "_" + "*" );
+        List<Object> onlineDevices = redis.scan("*"+VideoManagerConstants.DEVICE_PREFIX + serverId + "_" + "*" );
         for (int i = 0; i < onlineDevices.size(); i++) {
             String key = (String) onlineDevices.get(i);
             redis.del(key);
@@ -379,10 +348,16 @@ public class RedisCacheStorageImpl {
 
     public List<String> getOnlineForAll() {
         List<String> result = new ArrayList<>();
-        List<Object> onlineDevices = redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX + serverId + "_"  + "*" );
-        for (int i = 0; i < onlineDevices.size(); i++) {
-            String key = (String) onlineDevices.get(i);
-            result.add((String) redis.get(key));
+        List<Object> onlineDeviceIds = redis.scan("*"+VideoManagerConstants.DEVICE_PREFIX + serverId + "_"  + "*" );
+        if (CollectionUtil.isNotEmpty(onlineDeviceIds)) {
+            onlineDeviceIds.forEach(deviceIdKey->{
+                if(Objects.isNull(deviceIdKey)){
+                    return;
+                }
+                String strKey = String.valueOf(deviceIdKey);
+                String key = strKey.substring(strKey.indexOf(VideoManagerConstants.DEVICE_PREFIX));
+                result.add(key.substring(key.lastIndexOf("_")+1));
+            });
         }
         return result;
     }
@@ -452,8 +427,7 @@ public class RedisCacheStorageImpl {
 
     public void updateDevice(MediaDevice device) {
         String key = VideoManagerConstants.DEVICE_PREFIX + serverId + "_" + device.getId();
-        redis.set(key, device);
-        redis.expire(key,device.getExpires());
+        redis.set(key, device, Optional.ofNullable(device.getExpires()).orElse(SipContext.getConfig().getTimeout().intValue()));
     }
 
 
@@ -476,13 +450,6 @@ public class RedisCacheStorageImpl {
         return (MediaServerItem)redis.get(key);
     }
 
-    public List<MediaDevice> getAllOnlineDevice() {
-        List<Object> scan = redis.scan(VideoManagerConstants.DEVICE_PREFIX + "*");
-        List<MediaDevice> mediaDevices = new ArrayList<>();
-        scan.stream().filter(result-> result instanceof MediaDevice).forEach(result->mediaDevices.add((MediaDevice) result));
-        return mediaDevices;
-    }
-
 //    public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
 //        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + serverId + "_" + gpsMsgInfo.getId();
 //        redis.set(key, gpsMsgInfo, 60); // 默认GPS消息保存1分钟
@@ -539,4 +506,28 @@ public class RedisCacheStorageImpl {
         return result;
     }
 
+    /**
+     * 发起点播请求的设备流信息
+     * @param streamId
+     * @param deviceId
+     * @param channelId
+     */
+    public void addRequestStream(String streamId,String deviceId,String channelId) {
+        MediaDeviceChannel mediaDeviceChannel = new MediaDeviceChannel();
+        mediaDeviceChannel.setDeviceId(deviceId);
+        mediaDeviceChannel.setChannelId(channelId);
+        String key = VideoManagerConstants.PLAY_REQUEST_PREFIX + serverId + "_" + streamId;
+        redis.set(key,mediaDeviceChannel);
+    }
+
+    public void removeRequestStream(String streamId){
+        String key = VideoManagerConstants.PLAY_REQUEST_PREFIX + serverId + "_" + streamId;
+        redis.del(key);
+    }
+
+    public MediaDeviceChannel getRequestStream(String streamId){
+        String key = VideoManagerConstants.PLAY_REQUEST_PREFIX + serverId + "_" + streamId;
+        return (MediaDeviceChannel) redis.get(key);
+    }
+
 }

+ 45 - 14
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/transmit/cmd/SipCommander.java

@@ -16,12 +16,10 @@ import org.jetlinks.community.media.gb28181.event.SipSubscribe;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.session.VideoStreamSessionManager;
 import org.jetlinks.community.media.sip.SipContext;
+import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.transmit.SIPRequestHeaderProvider;
 import org.jetlinks.community.media.zlm.ZLMHttpHookSubscribe;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
-import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.event.Subscription;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.*;
 import javax.sip.*;
@@ -30,7 +28,6 @@ import javax.sip.header.CallIdHeader;
 import javax.sip.message.Request;
 import java.lang.reflect.Field;
 import java.text.ParseException;
-import java.time.Duration;
 import java.util.HashSet;
 
 /**
@@ -46,14 +43,12 @@ import java.util.HashSet;
 public class SipCommander {
 
     private final SIPRequestHeaderProvider headerProvider;
-
-    private final EventBus eventBus;
     private final ZLMHttpHookSubscribe subscribe;
     private final LocalMediaServerItemService mediaServerItemService;
     private final VideoStreamSessionManager streamSessionManager;
     private final UserSetup userSetup;
-
     private final SipSubscribe sipSubscribe;
+    private final RedisCacheStorageImpl redisCacheStorage;
 
     public Mono<Void> playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, MediaDevice device, String channelId,
                                     ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
@@ -68,7 +63,7 @@ public class SipCommander {
             log.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
 
 
-            StringBuffer content = new StringBuffer(200);
+            StringBuilder content = new StringBuilder(200);
             content.append("v=0\r\n");
             content.append("o="+ sipConfig.getId()+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
             content.append("s=Play\r\n");
@@ -167,6 +162,7 @@ public class SipCommander {
             CallIdHeader callIdHeader =sipProvider.getNewCallId();
 
 
+            redisCacheStorage.addRequestStream(streamId,device.getId(),channelId);
             Request request =  headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrcInfo.getSsrc(), callIdHeader);
 
             return transmitRequest(sipProvider, request, (e -> {
@@ -223,15 +219,16 @@ public class SipCommander {
                     eventListenersField.setAccessible(true);
                     eventListenersField.set(dialog, new HashSet<>());
                 } catch (NoSuchFieldException | IllegalAccessException e) {
-                    e.printStackTrace();
+                    log.error("视频流停止时系统出错,",e);
+                    return Mono.error(new BusinessException("系统内部调用出错,Sip配置出现问题"));
                 }
             }
 
             Request byeRequest = dialog.createRequest(Request.BYE);
-            SipURI byeURI = (SipURI) byeRequest.getRequestURI();
+            SipURI byeUri = (SipURI) byeRequest.getRequestURI();
             SIPRequest request = (SIPRequest)transaction.getRequest();
-            byeURI.setHost(request.getRemoteAddress().getHostName());
-            byeURI.setPort(request.getRemotePort());
+            byeUri.setHost(request.getRemoteAddress().getHostAddress());
+            byeUri.setPort(request.getRemotePort());
             ClientTransaction clientTransaction = sipProvider.getNewClientTransaction(byeRequest);
 
 
@@ -262,7 +259,7 @@ public class SipCommander {
      */
     public Mono<Void> deviceInfoQuery(MediaDevice device) {
         try {
-            StringBuffer catalogXml = new StringBuffer(200);
+            StringBuilder catalogXml = new StringBuilder(200);
             catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
             catalogXml.append("<Query>\r\n");
             catalogXml.append("<CmdType>DeviceInfo</CmdType>\r\n");
@@ -294,7 +291,7 @@ public class SipCommander {
      */
     public Mono<Void> catalogQuery(MediaDevice device, SipSubscribe.Event errorEvent) {
         try {
-            StringBuffer catalogXml = new StringBuffer(200);
+            StringBuilder catalogXml = new StringBuilder(200);
             catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
             catalogXml.append("<Query>\r\n");
             catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
@@ -316,6 +313,40 @@ public class SipCommander {
         }
     }
 
+    /**
+     * 订阅目录
+     * @param device
+     * @param okEvent
+     * @param errorEvent
+     * @return
+     */
+    public Mono<Boolean> catalogSubscribe(MediaDevice device, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
+        try {
+            StringBuilder cmdXml = new StringBuilder(200);
+            cmdXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
+            cmdXml.append("<Query>\r\n");
+            cmdXml.append("<CmdType>Catalog</CmdType>\r\n");
+            cmdXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
+            cmdXml.append("<DeviceID>" + device.getId() + "</DeviceID>\r\n");
+            cmdXml.append("</Query>\r\n");
+
+            String tm = Long.toString(System.currentTimeMillis());
+
+            SipProvider sipProvider = SipContext.getSipProvider();
+            CallIdHeader callIdHeader =sipProvider.getNewCallId();
+
+            // 有效时间默认为60秒以上
+            Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
+                "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog() + 60, "Catalog" ,
+                callIdHeader);
+            return  transmitRequest( sipProvider,request, errorEvent, okEvent).thenReturn(true);
+
+        } catch ( NumberFormatException | ParseException | InvalidArgumentException	| SipException e) {
+            log.warn("Sip对设备{}发起订阅时出现错误,",device.getId(),e);
+            return Mono.just(false);
+        }
+    }
+
     private  Mono<Void> transmitRequest(SipProvider sipProvider, Request request, SipSubscribe.Event errorEvent) throws SipException {
         return transmitRequest(sipProvider, request, errorEvent, null);
     }

+ 62 - 51
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.media.zlm;
 
 import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.lang.UUID;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
@@ -8,15 +9,19 @@ import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.annotation.Authorize;
+import org.jetlinks.community.media.bean.SSRCInfo;
 import org.jetlinks.community.media.config.UserSetup;
 import org.jetlinks.community.media.contanst.VideoManagerConstants;
 import org.jetlinks.community.media.entity.GbStream;
 import org.jetlinks.community.media.bean.StreamInfo;
+import org.jetlinks.community.media.entity.MediaDevice;
 import org.jetlinks.community.media.entity.MediaDeviceChannel;
 import org.jetlinks.community.media.service.LocalGbStreamService;
 import org.jetlinks.community.media.service.LocalMediaDeviceChannelService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
+import org.jetlinks.community.media.service.LocalPlayService;
 import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
+import org.jetlinks.community.media.transmit.cmd.SipCommander;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.dto.StreamPushItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
@@ -52,8 +57,11 @@ public class ZLMHttpHookListener {
     private final ZLMMediaListManager zlmMediaListManager;
     private final LocalGbStreamService gbStreamService;
     private final ClusterEventBus clusterEventBus;
+    //todo
     private final String serverId="";
     private final UserSetup userSetup;
+    private final SipCommander sipCommander;
+    private final LocalPlayService localPlayService;
     /**
      * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
      *
@@ -123,7 +131,7 @@ public class ZLMHttpHookListener {
      */
     @ResponseBody
     @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
-    public ResponseEntity<String> onPlay(@RequestBody JSONObject json){
+    public ResponseEntity<String> onPlay(@RequestBody JSONObject json) throws Exception {
 
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_play API调用,参数:" + json.toString());
@@ -132,10 +140,9 @@ public class ZLMHttpHookListener {
         ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
         if (subscribe != null ) {
             MediaServerItem mediaInfo = mediaServerItemService.getOneByServerId(mediaServerId);
-            //todo
-//			if (mediaInfo != null) {
-//				subscribe.response(mediaInfo, json);
-//			}
+            if (mediaInfo != null) {
+                subscribe.accept(mediaInfo, json);
+            }
 
         }
         JSONObject ret = new JSONObject()
@@ -287,6 +294,8 @@ public class ZLMHttpHookListener {
         String schema = item.getSchema();
         JSONObject json = JSONUtil.parseObj(item);
 
+        redisCatchStorage.removeRequestStream(streamId);
+
         Mono<Long> result = Mono.fromRunnable(()->{}).thenReturn(1L);
 
         boolean regist = item.isRegist();
@@ -470,52 +479,54 @@ public class ZLMHttpHookListener {
 //			return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 //		}
 //	}
-//
-//	/**
-//	 * 流未找到事件,用户可以在此事件触发时,立即去拉流,这样可以实现按需拉流;此事件对回复不敏感。
-//	 *
-//	 */
-//	@ResponseBody
-//	@PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8")
-//	public ResponseEntity<String> onStreamNotFound(@RequestBody JSONObject json){
-//		if (logger.isDebugEnabled()) {
-//			logger.debug("[ ZLM HOOK ]on_stream_not_found API调用,参数:" + json.toString());
-//		}
-//		String mediaServerId = json.getString("mediaServerId");
-//		MediaServerItem mediaInfo = mediaServerService.getOneByServerId(mediaServerId);
-//		if (userSetup.isAutoApplyPlay() && mediaInfo != null) {
-//			String app = json.getString("app");
-//			String streamId = json.getString("stream");
-//			if ("rtp".equals(app)) {
-//				String[] s = streamId.split("_");
-//				if (s.length == 2) {
-//					String id = s[0];
-//					String channelId = s[1];
-//					Device device = redisCatchStorage.getDevice(id);
-//					if (device != null) {
-//						UUID uuid = UUID.randomUUID();
-//						SSRCInfo ssrcInfo;
-//						String streamId2 = null;
-//						if (mediaInfo.isRtpEnable()) {
-//							streamId2 = String.format("%s_%s", device.getId(), channelId);
-//						}
-//						ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2);
-//						cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
-//							logger.info("收到订阅消息: " + response.toJSONString());
-//							playService.onPublishHandlerForPlay(mediaServerItemInuse, response, id, channelId, uuid.toString());
-//						}, null);
-//					}
-//
-//				}
-//			}
-//
-//		}
-//
-//		JSONObject ret = new JSONObject();
-//		ret.put("code", 0);
-//		ret.put("msg", "success");
-//		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//	}
+
+    /**
+     * 流未找到事件,用户可以在此事件触发时,立即去拉流,这样可以实现按需拉流;此事件对回复不敏感。
+     *
+     */
+    @ResponseBody
+    @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8")
+    public Mono<ResponseEntity<String>> onStreamNotFound(@RequestBody JSONObject json){
+        if (log.isDebugEnabled()) {
+            log.debug("[ ZLM HOOK ]on_stream_not_found API调用,参数:" + json.toString());
+        }
+        String mediaServerId = json.getStr("mediaServerId");
+
+        return Mono.justOrEmpty( redisCatchStorage.getMediaServerItem(mediaServerId))
+            .flatMap(mediaInfo->{
+                if(userSetup.isAutoApplyPlay()){
+                    String app = json.getStr("app");
+                    String streamId = json.getStr("stream");
+                    if ("rtp".equals(app)) {
+                        MediaDeviceChannel channel = redisCatchStorage.getRequestStream(streamId);
+                        if(channel==null||StrUtil.isEmpty(channel.getDeviceId())||StrUtil.isEmpty(channel.getChannelId())){
+                            return Mono.empty();
+                        }
+                        redisCatchStorage.removeRequestStream(streamId);
+                        MediaDevice device = redisCatchStorage.getDevice(channel.getDeviceId());
+                        if (device != null) {
+                            UUID uuid = UUID.randomUUID();
+                            SSRCInfo ssrcInfo;
+                            String streamId2 = null;
+                            if (mediaInfo.isRtpEnable()) {
+                                streamId2 = String.format("%s_%s", device.getId(), channel.getChannelId());
+                            }
+                            ssrcInfo = mediaServerItemService.openRTPServer(mediaInfo, streamId2);
+                            return sipCommander.playStreamCmd(mediaInfo, ssrcInfo, device, channel.getChannelId(), (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
+                                log.info("收到订阅消息: " + response.toString());
+                                localPlayService.onPublishHandlerForPlay(mediaServerItemInuse, response, channel.getDeviceId(), channel.getChannelId(), uuid.toString()).subscribe();
+                            }, null);
+
+
+                        }
+                    }
+                }
+                return Mono.empty();
+            })
+            .thenReturn(ResponseEntity.ok( new JSONObject()
+                .putOpt("code", 0)
+                .putOpt("msg", "success").toString()));
+    }
 
     /**
      * 服务器启动事件,可以用于监听服务器崩溃重启;此事件对回复不敏感。

+ 2 - 0
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/RedisUtil.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.utils;
 
 import cn.hutool.json.JSONObject;
 import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.*;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
@@ -17,6 +18,7 @@ import java.util.concurrent.TimeUnit;
 @Component
 @AllArgsConstructor
 @SuppressWarnings(value = {"rawtypes", "unchecked"})
+@Slf4j
 public class RedisUtil {
 
     private final RedisTemplate redisTemplate;

+ 0 - 5
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/utils/SipUtils.java

@@ -25,9 +25,4 @@ public class SipUtils {
         SipUri uri = (SipUri) address.getURI();
         return uri.getUser();
     }
-
-    public static String keepAliveKey(String deviceId){
-        return VideoManagerConstants.KEEPLIVEKEY_PREFIX +":"+deviceId;
-    }
-
 }

+ 2 - 2
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java

@@ -40,7 +40,7 @@ public class DeviceAlarmProvider implements SubscriberProvider {
     public ConfigMetadata getConfigMetadata() {
         return new DefaultConfigMetadata()
             .add("productId", "产品ID", "产品ID,支持通配符:*", StringType.GLOBAL)
-            .add("id", "设备ID", "设备ID,支持通配符:*", StringType.GLOBAL)
+            .add("deviceId", "设备ID", "设备ID,支持通配符:*", StringType.GLOBAL)
             .add("productId", "告警ID", "告警ID,支持通配符:*", StringType.GLOBAL)
             ;
     }
@@ -50,7 +50,7 @@ public class DeviceAlarmProvider implements SubscriberProvider {
         ValueObject configs = ValueObject.of(config);
 
         String productId = configs.getString("productId").orElse("*");
-        String deviceId = configs.getString("id").orElse("*");
+        String deviceId = configs.getString("deviceId").orElse("*");
         String alarmId = configs.getString("alarmId").orElse("*");
 
         Flux<Notify> flux = eventBus

+ 7 - 7
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java

@@ -52,7 +52,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
     class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
 
         List<String> default_columns = Arrays.asList(
-            "timestamp", "id", "this.headers.deviceName deviceName"
+            "timestamp", "deviceId", "this.headers.deviceName deviceName"
         );
 
         private DeviceAlarmRule rule;
@@ -138,7 +138,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
             if (CollectionUtils.isNotEmpty(rule.getProperties())) {
                 List<String> newColumns = new ArrayList<>(Arrays.asList(
                     "this.deviceName deviceName",
-                    "this.id id",
+                    "this.deviceId deviceId",
                     "this.timestamp timestamp"));
                 for (DeviceAlarmRule.Property property : rule.getProperties()) {
                     if (StringUtils.isEmpty(property.getProperty())) {
@@ -215,7 +215,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                         StringUtils.hasText(rule.getDeviceId())
                             ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
                             : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
-                            .groupBy(map -> String.valueOf(map.get("id")), Integer.MAX_VALUE)
+                            .groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
                             .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
                     //处理每一组数据
                     .flatMap(group -> group
@@ -240,10 +240,10 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                         map.putIfAbsent("productName", rule.getProductName());
                     }
                     if (StringUtils.hasText(rule.getDeviceId())) {
-                        map.putIfAbsent("id", rule.getDeviceId());
+                        map.putIfAbsent("deviceId", rule.getDeviceId());
                     }
                     if (!map.containsKey("deviceName")) {
-                        map.putIfAbsent("deviceName", map.get("id"));
+                        map.putIfAbsent("deviceName", map.get("deviceId"));
                     }
                     if (!map.containsKey("productName")) {
                         map.putIfAbsent("productName", map.get("productId"));
@@ -252,11 +252,11 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                         log.debug("发生设备告警:{}", map);
                     }
                     // 推送告警信息到消息网关中
-                    // /rule-engine/device/alarm/{productId}/{id}/{ruleId}
+                    // /rule-engine/device/alarm/{productId}/{deviceId}/{ruleId}
                     return eventBus
                         .publish(String.format(
                             "/rule-engine/device/alarm/%s/%s/%s",
-                            rule.getProductId(), map.get("id"), rule.getId()
+                            rule.getProductId(), map.get("deviceId"), rule.getId()
                         ), map)
                         .then(Mono.just(map));
                 });

+ 48 - 48
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/RuleSceneTaskExecutorProvider.java

@@ -58,7 +58,7 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
     class RuleSceneTaskExecutor extends AbstractTaskExecutor {
 
 //        List<String> default_columns = Arrays.asList(
-//            "timestamp", "id"
+//            "timestamp", "deviceId"
 //        );
 
         private RuleSceneEntity rule;
@@ -154,54 +154,54 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
             Flux<Map<String, Object>> result =null;
             List<Object> binds = new ArrayList<>();
             for (RuleSceneEntity.Trigger triggerMix : rule.getTriggers()) {
-            RuleSceneEntity.DeviceTrigger trigger = triggerMix.getDevice();
-            String topic = trigger.getType().getTopic(trigger.getProductId(), trigger.getDeviceId(), trigger.getModelId());
-            binds.addAll(trigger.toFilterBinds());
-            //订阅主题
-            Subscription subscription = Subscription.of(
-                "rule_scene:" + topic,
-                topic,
-                Subscription.Feature.local
-            );
-            ReactorQLContext context = ReactorQLContext
-                .ofDatasource(ignore ->
-                    eventBus
-                        .subscribe(subscription, DeviceMessage.class)
-                        .map(Jsonable::toJson)
-                        .doOnNext(json -> {
-                            json.put("ruleSceneId", rule.getId());
-                            json.put("ruleSceneName", rule.getName());
-                        })
+                RuleSceneEntity.DeviceTrigger trigger = triggerMix.getDevice();
+                String topic = trigger.getType().getTopic(trigger.getProductId(), trigger.getDeviceId(), trigger.getModelId());
+                binds.addAll(trigger.toFilterBinds());
+                //订阅主题
+                Subscription subscription = Subscription.of(
+                    "rule_scene:" + topic,
+                    topic,
+                    Subscription.Feature.local
                 );
-
-            binds.forEach(context::bind);
-            Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
-                .start(context)
-                .map(ReactorQLRecord::asMap);
-
-            ShakeLimit shakeLimit = trigger.getShakeLimit();
-            if (shakeLimit != null
-                && shakeLimit.isEnabled()
-                && shakeLimit.getTime() > 0) {
-                int thresholdNumber = shakeLimit.getThreshold();
-                Duration windowTime = Duration.ofSeconds(shakeLimit.getTime());
-                resultFlux
-                    .as(flux ->
-                        StringUtils.hasText(trigger.getDeviceId())
-                            ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
-                            : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
-                            .groupBy(map -> String.valueOf(map.get("id")), Integer.MAX_VALUE)
-                            .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
-                    //处理每一组数据
-                    .flatMap(group -> group
-                        .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
-                        .filter(tp -> tp.getT1() >= thresholdNumber)//超过阈值告警
-                        .as(flux -> shakeLimit.isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
-                        .map(tp2 -> {
-                            tp2.getT2().put("totalScene", tp2.getT1());
-                            return tp2.getT2();
-                        }));
-            }
+                ReactorQLContext context = ReactorQLContext
+                    .ofDatasource(ignore ->
+                        eventBus
+                            .subscribe(subscription, DeviceMessage.class)
+                            .map(Jsonable::toJson)
+                            .doOnNext(json -> {
+                                json.put("ruleSceneId", rule.getId());
+                                json.put("ruleSceneName", rule.getName());
+                            })
+                    );
+
+                binds.forEach(context::bind);
+                Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
+                    .start(context)
+                    .map(ReactorQLRecord::asMap);
+
+                ShakeLimit shakeLimit = trigger.getShakeLimit();
+                if (shakeLimit != null
+                    && shakeLimit.isEnabled()
+                    && shakeLimit.getTime() > 0) {
+                    int thresholdNumber = shakeLimit.getThreshold();
+                    Duration windowTime = Duration.ofSeconds(shakeLimit.getTime());
+                    resultFlux
+                        .as(flux ->
+                            StringUtils.hasText(trigger.getDeviceId())
+                                ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
+                                : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
+                                .groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
+                                .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
+                        //处理每一组数据
+                        .flatMap(group -> group
+                            .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
+                            .filter(tp -> tp.getT1() >= thresholdNumber)//超过阈值告警
+                            .as(flux -> shakeLimit.isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
+                            .map(tp2 -> {
+                                tp2.getT2().put("totalScene", tp2.getT1());
+                                return tp2.getT2();
+                            }));
+                }
                 result=result==null?resultFlux:result.mergeWith(resultFlux);
             }
             return result;

+ 1 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/DeviceAlarmEntity.java

@@ -34,7 +34,7 @@ public class DeviceAlarmEntity extends GenericEntity<String> {
     @Schema(description = "device或者product")
     private String target;
 
-    //id or productId
+    //deviceId or productId
     @Column(length = 64, nullable = false, updatable = false)
     @NotBlank(message = "[targetId]不能为空")
     @Schema(description = "deviceId或者productId")