Bläddra i källkod

changed 改变网络泵解析方式

18339543638 3 år sedan
förälder
incheckning
1d90b0662f

+ 1 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -64,7 +64,7 @@ class VertxMqttConnection implements MqttConnection {
 
     public VertxMqttConnection(MqttEndpoint endpoint) {
         this.endpoint = endpoint;
-        this.keepAliveTimeoutMs = (endpoint.keepAliveTimeSeconds() + 10) * 1000L;
+        this.keepAliveTimeoutMs = (endpoint.keepAliveTimeSeconds() *2) * 1000L;
     }
 
     private final Consumer<MqttConnection> defaultListener = mqttConnection -> {

+ 1 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/netpump/DataUtils.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 
 import java.util.*;
+import java.util.concurrent.atomic.DoubleAccumulator;
 
 /**
  * 数据处理与判断

+ 4 - 6
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceService.java

@@ -57,7 +57,6 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
     private final DeviceRegistry registry;
     private final SipCommander cmder;
     private final DeviceGatewayMonitor gatewayMonitor;
-    private final DecodedClientMessageHandler connector;
     private final DeviceSessionManager sessionManager;
     private final LocalMediaDeviceChannelService deviceChannelService;
 
@@ -65,7 +64,6 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
                                    RedisCacheStorageImpl redisCacheStorage,
                                    EventBus eventBus, DeviceRegistry registry,
                                    SipCommander cmder,
-                                   DecodedClientMessageHandler connector,
                                    DeviceSessionManager sessionManager,
                                    SipServerHelper sipServerHelper,
                                    LocalMediaDeviceChannelService deviceChannelService) {
@@ -75,7 +73,6 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
         this.registry = registry;
         this.cmder = cmder;
         this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor("sip");
-        this.connector = connector;
         this.sessionManager = sessionManager;
         this.sipServerHelper = sipServerHelper;
         this.deviceChannelService=deviceChannelService;
@@ -87,13 +84,14 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
         this.updateById(mediaDevice.getId(),mediaDevice)
             .filter(count->count==0)
             .flatMap(ignore->save(mediaDevice))
+            .doOnNext(ignore->redisCacheStorage.updateDevice(mediaDevice))
             .concatWith(
                 Mono.just(mediaDevice)
                     .flatMap(ignore-> registry.getDevice(mediaDevice.getId()))
                     .doOnNext(operator -> {
                         DeviceSession session = sessionManager.getSession(mediaDevice.getId());
                         if(session==null){
-                            SipSession sipSession = new SipSession(mediaDevice.getId(), operator, connector, Optional.of(mediaDevice.getExpires().longValue()).orElse(30L));
+                            SipSession sipSession = new SipSession(mediaDevice.getId(), operator, messageHandler, Optional.of(mediaDevice.getExpires().longValue()).orElse(30L));
 //                            SipSession sipSession = new SipSession(mediaDevice.getId(), operator, connector, 10L);
                             sipSession.onClose(()->{
                                 //设备下线
@@ -111,10 +109,10 @@ public class LocalMediaDeviceService extends GenericReactiveCrudService<MediaDev
                         registerMessage.setTimestamp(System.currentTimeMillis());
                         registerMessage.addHeader("deviceName","GB28181-2016 "+mediaDevice.getId());
                         registerMessage.addHeader("productId",mediaDevice.getProductId());
+
                         return eventBus.publish("/device/*/*/register",registerMessage)
-                            .mergeWith(eventBus.publish("/media/device/heart-beat/"+mediaDevice.getId(),mediaDevice)).then(Mono.just(1L));
+                            .doOnNext(ignore->heartBeat(mediaDevice));
                     })
-
                     .then(Mono.empty())
             )
             .doOnComplete(()->{

+ 37 - 29
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMHttpHookListener.java

@@ -32,9 +32,13 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.UnicastProcessor;
 
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * @description:针对 ZLMediaServer的hook事件监听
@@ -54,13 +58,22 @@ public class ZLMHttpHookListener {
     private final ZLMMediaListManager zlmMediaListManager;
     private final LocalGbStreamService gbStreamService;
     private final StandaloneDeviceMessageBroker deviceMessageBroker;
-    //todo
-    private final String serverId="";
     private final UserSetup userSetup;
     private final SipCommander sipCommander;
     private final LocalPlayService localPlayService;
     private final LocalMediaDeviceChannelService mediaDeviceChannelService;
 
+    /**
+     * ZLM服务器心跳消息处理
+     */
+    private final UnicastProcessor<JSONObject> keepAliveProcessor=UnicastProcessor.create();
+    private final FluxSink<JSONObject> keepAliveSink=keepAliveProcessor.sink();
+
+    /**
+     * ZLM心跳上线消息处理
+     */
+    private final UnicastProcessor<JSONObject> startProcessor=UnicastProcessor.create();
+    private final FluxSink<JSONObject> startSink=keepAliveProcessor.sink();
     /**
      * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
      *
@@ -72,18 +85,12 @@ public class ZLMHttpHookListener {
         if (log.isDebugEnabled()) {
             log.debug("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString());
         }
-        String mediaServerId = json.getStr("mediaServerId");
-
-        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
-        if (subscribes != null  && subscribes.size() > 0) {
-            for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
-                subscribe.accept(null, json);
-            }
+        if(keepAliveProcessor.hasDownstreams()){
+            keepAliveSink.next(json);
         }
-        JSONObject ret = new JSONObject()
+        return Mono.just(ResponseEntity.ok(new JSONObject()
             .putOpt("code", 0)
-            .putOpt("msg", "success");
-        return Mono.just(ResponseEntity.ok(ret.toString()));
+            .putOpt("msg", "success").toString()));
     }
 
 //	/**
@@ -330,7 +337,7 @@ public class ZLMHttpHookListener {
                 }
             }else {
                 if (!"rtp".equals(app)){
-                    result=result.flatMap(ingore->
+                    result=result.flatMap(ignore->
                         mediaServerItemService.findById(mediaServerId)
                             .doOnNext(mediaServerItem -> {
                                 String type = OriginType.values()[item.getOriginType()].getType();
@@ -379,7 +386,7 @@ public class ZLMHttpHookListener {
                                 if (type != null) {
                                     // 发送流变化redis消息
                                     JSONObject jsonObject = new JSONObject()
-                                        .putOpt("serverId", serverId)
+                                        .putOpt("serverId", userSetup.getServerId())
                                         .putOpt("app", app)
                                         .putOpt("stream", streamId)
                                         .putOpt("register", regist)
@@ -395,18 +402,15 @@ public class ZLMHttpHookListener {
                 }
             }
         }
-
-        JSONObject ret = new JSONObject()
-            .putOpt("code", 0)
-            .putOpt("msg", "success");
-
         MediaMessageReply<MediaItem> mediaMessageReply = MediaMessageReply.of(null,item);
         mediaMessageReply.setMessageId(ZLMKeyGenerate.getStreamChangedKey(ZLMHttpHookSubscribe.HookType.on_stream_changed,mediaServerId,app,regist,streamId));
         mediaMessageReply.setSuccess(true);
        ;
         return    result
             .mergeWith(deviceMessageBroker.reply(mediaMessageReply).thenReturn(1L))
-            .then(Mono.just(ResponseEntity.ok(ret.toString())));
+            .then(Mono.just(ResponseEntity.ok( new JSONObject()
+                .putOpt("code", 0)
+                .putOpt("msg", "success").toString())));
     }
 
     /**
@@ -532,15 +536,19 @@ public class ZLMHttpHookListener {
         }
         String remoteAddr = exchange.getRequest().getRemoteAddress().getAddress().toString();
         jsonObject.putOpt("ip", remoteAddr);
-        JSONObject ret = new JSONObject()
-            .putOpt("code", 0)
-            .putOpt("msg", "success");
-        List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
-        if (subscribes != null  && subscribes.size() > 0) {
-            for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
-                subscribe.accept(null, jsonObject);
-            }
+        if(startProcessor.hasDownstreams()){
+            startSink.next(jsonObject);
         }
-        return Mono.just(ResponseEntity.ok(ret.toString()));
+        return Mono.just(ResponseEntity.ok(new JSONObject()
+            .putOpt("code", 0)
+            .putOpt("msg", "success").toString()));
+    }
+
+    public Flux<JSONObject> handleKeepAlive(){
+        return keepAliveProcessor.map(Function.identity());
+    }
+
+    public Flux<JSONObject> handleStart(){
+        return startProcessor.map(Function.identity());
     }
 }

+ 14 - 18
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/ZLMRunner.java

@@ -2,16 +2,12 @@ package org.jetlinks.community.media.zlm;
 
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
-import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.media.config.MediaConfig;
 import org.jetlinks.community.media.service.LocalMediaServerItemService;
 import org.jetlinks.community.media.zlm.entity.MediaServerItem;
-import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.event.Subscription;
-import org.jetlinks.core.event.TopicPayload;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -31,11 +27,9 @@ public class ZLMRunner implements CommandLineRunner {
 
     private Map<String, Boolean> startGetMedia;
 
-
     private final ZLMRESTfulUtils zlmresTfulUtils;
 
-
-    private final ZLMHttpHookSubscribe hookSubscribe;
+    private final ZLMHttpHookListener httpHookListener;
 
     private final LocalMediaServerItemService mediaServerItemService;
 
@@ -50,7 +44,7 @@ public class ZLMRunner implements CommandLineRunner {
             .switchIfEmpty(mediaServerItemService.save(mediaConfig.getMediaSerItem()).thenReturn(mediaConfig.getMediaSerItem()))
             //清除在线服务器信息
             .mergeWith(mediaServerItemService.clearMediaServerForOnline().then(Mono.empty()))
-            .mergeWith(Mono.delay(Duration.ofSeconds(20)).flatMap(ignore->timeOutHandle()).then(Mono.empty()))
+            .mergeWith(Mono.delay(Duration.ofSeconds(20)).flatMap(ignore-> timeoutHandle()).then(Mono.empty()))
             //更新默认服务器配置
             .flatMap(defaultMedia->{
                 //判断默认服务器是否发生变动
@@ -77,7 +71,7 @@ public class ZLMRunner implements CommandLineRunner {
             //20s进行超时处理
 //            .delayElement(Duration.ofSeconds(20))
 
-//            .flatMap(ignore->timeOutHandle())
+//            .flatMap(ignore->timeoutHandle())
 
             .subscribe();
     }
@@ -86,7 +80,7 @@ public class ZLMRunner implements CommandLineRunner {
      * 连接超时处理
      * @return  Mono
      */
-    private Mono<Void> timeOutHandle(){
+    private Mono<Void> timeoutHandle(){
         if (startGetMedia != null) {
             Set<String> allZlmId = startGetMedia.keySet();
             for (String id : allZlmId) {
@@ -105,16 +99,17 @@ public class ZLMRunner implements CommandLineRunner {
      * @return Mono
      */
     private void subscribeOnServerStarted(){
-        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,null,
-            (MediaServerItem mediaServerItem, JSONObject response)->{
+        httpHookListener.handleStart()
+            .flatMap(response->{
                 ZLMServerConfig zlmServerConfig = JSONUtil.toBean(response, ZLMServerConfig.class);
                 if (zlmServerConfig !=null ) {
                     if (startGetMedia != null) {
                         startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
                     }
-                    mediaServerItemService.zlmServerOnline(zlmServerConfig).subscribe();
+                    return mediaServerItemService.zlmServerOnline(zlmServerConfig);
                 }
-            });
+                return Mono.empty();
+            }).subscribe();
     }
 
     /**
@@ -123,13 +118,14 @@ public class ZLMRunner implements CommandLineRunner {
      */
     private void subscribeOnServerKeepalive(){
         // 订阅 zlm保活事件, 当zlm离线时做业务的处理
-        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,null,
-            (MediaServerItem mediaServerItem,JSONObject response)->{
+        httpHookListener.handleKeepAlive()
+            .flatMap(response->{
                 String mediaServerId = response.getStr("mediaServerId");
                 if (mediaServerId !=null ) {
-                    mediaServerItemService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data")).subscribe();
+                   return  mediaServerItemService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data"));
                 }
-            });
+                return Mono.empty();
+            }).subscribe();
     }
 
     /**

+ 0 - 20
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/zlm/dto/ZLMRunInfo.java

@@ -1,20 +0,0 @@
-package org.jetlinks.community.media.zlm.dto;
-
-import lombok.Data;
-
-/**
- * 记录zlm运行中一些参数
- */
-@Data
-public class ZLMRunInfo {
-
-    /**
-     * zlm当前流数量
-     */
-    private int mediaCount;
-
-    /**
-     * 在线状态
-     */
-    private boolean online;
-}

+ 12 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleInstanceService.java

@@ -56,7 +56,7 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
             }
         };
         clusterServer.handleClusterMsg()
-            .flatMap(this::doStart).subscribe();
+            .flatMap(msg->this.doStart(msg,false)).subscribe();
     }
 
     public Mono<PagerResult<RuleEngineExecuteEventInfo>> queryExecuteEvent(QueryParam queryParam) {
@@ -92,11 +92,21 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
     }
 
     private Mono<Void> doStart(RuleInstanceEntity entity) {
+        return doStart(entity,true);
+    }
+
+    /**
+     * 是否在集群中进行广播
+     * @param entity
+     * @param broadcast
+     * @return
+     */
+    private Mono<Void> doStart(RuleInstanceEntity entity,boolean broadcast) {
         return Mono.defer(() -> {
             RuleModel model = entity.toRule(modelParser);
             return ruleEngine
                 .startRule(entity.getId(), model)
-                .concatWith(clusterServer.sendClusterMsg(entity).then(Mono.empty()))
+                .concatWith(Mono.just(broadcast).filter(Boolean.TRUE::equals).flatMap(ignore->clusterServer.sendClusterMsg(entity).then(Mono.empty())))
                 .then(createUpdate()
                     .set(RuleInstanceEntity::getState, RuleInstanceState.started)
                     .where(entity::getId)