Bläddra i källkod

add catalog 和 deviceInfo

18339543638 3 år sedan
förälder
incheckning
a1e0ed8413

+ 1 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/entity/MediaDeviceChannel.java

@@ -254,7 +254,7 @@ public class MediaDeviceChannel extends GenericEntity<String> {
 	 * <Status>OFF</Status>
 	 * <Status>OFF</Status>
 	 * 遇到过NVR下的IPC下发信令可以推流, 但是 Status 响应 OFF
 	 * 遇到过NVR下的IPC下发信令可以推流, 但是 Status 响应 OFF
 	 */
 	 */
-    @Column(name = "state",length = 16)
+    @Column(name = "status",length = 16)
     @EnumCodec
     @EnumCodec
     @ColumnType(javaType = String.class)
     @ColumnType(javaType = String.class)
     @DefaultValue("offline")
     @DefaultValue("offline")

+ 2 - 1
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaServerItemService.java

@@ -492,7 +492,8 @@ public class LocalMediaServerItemService extends GenericReactiveCrudService<Medi
         log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm",
         log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm",
             mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
             mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
         String protocol = sslEnabled ? "https" : "http";
         String protocol = sslEnabled ? "https" : "http";
-        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
+//        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
+        String hookPrex = String.format("%s://%s/index/hook", protocol, mediaServerItem.getHookIp());
         String recordHookPrex = null;
         String recordHookPrex = null;
         if (mediaServerItem.getRecordAssistPort() != 0) {
         if (mediaServerItem.getRecordAssistPort() != 0) {
             recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort());
             recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort());

+ 26 - 23
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -11,6 +11,8 @@ import org.jetlinks.community.media.storage.impl.RedisCacheStorageImpl;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.dto.MediaItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.media.zlm.dto.OriginType;
 import org.jetlinks.community.media.zlm.dto.OriginType;
+import org.jetlinks.core.event.EventBus;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.server.ServerWebExchange;
 import org.springframework.web.server.ServerWebExchange;
@@ -31,33 +33,33 @@ public class ZLMHttpHookListener {
     private final LocalMediaServerItemService mediaServerItemService;
     private final LocalMediaServerItemService mediaServerItemService;
     private final RedisCacheStorageImpl redisCatchStorage;
     private final RedisCacheStorageImpl redisCatchStorage;
     private final ZLMHttpHookSubscribe subscribe;
     private final ZLMHttpHookSubscribe subscribe;
-//
-//	/**
-//	 * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
-//	 *
-//	 */
-//	@ResponseBody
-//	@PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
-//	public ResponseEntity<String> onServerKeepalive(@RequestBody JSONObject json){
-//
-//		if (logger.isDebugEnabled()) {
-//			logger.debug("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString());
-//		}
-//		String mediaServerId = json.getString("mediaServerId");
-//
+    private final EventBus eventBus;
+    /**
+     * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
+     *
+     */
+    @ResponseBody
+    @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
+    public Mono<ResponseEntity<String>> onServerKeepalive(@RequestBody JSONObject json){
+
+        if (log.isDebugEnabled()) {
+            log.debug("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString());
+        }
 //		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
 //		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
 //		if (subscribes != null  && subscribes.size() > 0) {
 //		if (subscribes != null  && subscribes.size() > 0) {
 //			for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
 //			for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
 //				subscribe.response(null, json);
 //				subscribe.response(null, json);
 //			}
 //			}
 //		}
 //		}
-//
-//		JSONObject ret = new JSONObject();
-//		ret.put("code", 0);
-//		ret.put("msg", "success");
-//		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
-//	}
-//
+
+        JSONObject ret = new JSONObject();
+        ret.put("code", 0);
+        ret.put("msg", "success");
+        return eventBus.
+            publish(ZLMHttpHookSubscribe.HookType.on_server_keepalive.name(),json)
+            .thenReturn(ResponseEntity.ok(ret.toString()));
+    }
+
 //	/**
 //	/**
 //	 * 流量统计事件,播放器或推流器断开时并且耗用流量超过特定阈值时会触发此事件,阈值通过配置文件general.flowThreshold配置;此事件对回复不敏感。
 //	 * 流量统计事件,播放器或推流器断开时并且耗用流量超过特定阈值时会触发此事件,阈值通过配置文件general.flowThreshold配置;此事件对回复不敏感。
 //	 *
 //	 *
@@ -451,10 +453,11 @@ public class ZLMHttpHookListener {
         }
         }
         String remoteAddr = exchange.getRequest().getRemoteAddress().getAddress().toString();
         String remoteAddr = exchange.getRequest().getRemoteAddress().getAddress().toString();
         jsonObject.append("ip", remoteAddr);
         jsonObject.append("ip", remoteAddr);
-        subscribe.publish(ZLMHttpHookSubscribe.HookType.on_server_started,null,jsonObject);
         JSONObject ret = new JSONObject()
         JSONObject ret = new JSONObject()
             .append("code", 0)
             .append("code", 0)
             .append("msg", "success");
             .append("msg", "success");
-        return Mono.just(ResponseEntity.ok(ret.toString()));
+        return eventBus.
+            publish(ZLMHttpHookSubscribe.HookType.on_server_started.name(),jsonObject)
+            .thenReturn(ResponseEntity.ok(ret.toString()));
     }
     }
 }
 }

+ 22 - 13
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java

@@ -9,6 +9,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.media.config.MediaConfig;
 import org.jetlinks.community.media.config.MediaConfig;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.event.TopicPayload;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -41,6 +44,7 @@ public class ZLMRunner implements CommandLineRunner {
 
 
     private final MediaConfig mediaConfig;
     private final MediaConfig mediaConfig;
 
 
+    private final EventBus eventBus;
 
 
     @Override
     @Override
     public void run(String... strings)  {
     public void run(String... strings)  {
@@ -95,17 +99,20 @@ public class ZLMRunner implements CommandLineRunner {
      * @return Mono
      * @return Mono
      */
      */
     private void subscribeOnServerStarted(){
     private void subscribeOnServerStarted(){
-        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,null,
-            (MediaServerItem mediaServerItem, JSONObject response)->{
-                ZLMServerConfig zlmServerConfig = JSONUtil.toBean(response, ZLMServerConfig.class);
+        // 订阅 zlm保活事件, 当zlm离线时做业务的处理
+        eventBus.subscribe(Subscription.of("ZLM-started",ZLMHttpHookSubscribe.HookType.on_server_started.name(), Subscription.Feature.local))
+            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
+            .flatMap(response -> {
+                ZLMServerConfig zlmServerConfig = JSONUtil.toBean(response.toJSONString(), ZLMServerConfig.class);
                 if (zlmServerConfig !=null ) {
                 if (zlmServerConfig !=null ) {
                     if (startGetMedia != null) {
                     if (startGetMedia != null) {
                         startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
                         startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
                     }
                     }
-                    mediaServerItemService.zlmServerOnline(zlmServerConfig)
-                        .subscribe();
+                    return mediaServerItemService.zlmServerOnline(zlmServerConfig);
                 }
                 }
-            });
+                return Mono.empty();
+            })
+            .subscribe();
     }
     }
 
 
     /**
     /**
@@ -114,15 +121,17 @@ public class ZLMRunner implements CommandLineRunner {
      */
      */
     private void subscribeOnServerKeepalive(){
     private void subscribeOnServerKeepalive(){
         // 订阅 zlm保活事件, 当zlm离线时做业务的处理
         // 订阅 zlm保活事件, 当zlm离线时做业务的处理
-        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,null,
-            (MediaServerItem mediaServerItem, JSONObject response)->{
-                String mediaServerId = response.getStr("mediaServerId");
+        eventBus.subscribe(Subscription.of("ZLM-keepAlive",ZLMHttpHookSubscribe.HookType.on_server_keepalive.name(), Subscription.Feature.local))
+            .flatMap(payload -> Mono.just(payload.bodyToJson(true)))
+            .flatMap(response -> {
+                String mediaServerId = response.getString("mediaServerId");
                 if (mediaServerId !=null ) {
                 if (mediaServerId !=null ) {
-                    mediaServerItemService
-                        .updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data"))
-                        .subscribe();
+                    return mediaServerItemService
+                        .updateMediaServerKeepalive(mediaServerId, JSONUtil.parseObj(response.getJSONObject("data")));
                 }
                 }
-            });
+                return Mono.empty();
+            })
+            .subscribe();
     }
     }
 
 
     /**
     /**