فهرست منبع

fix 服务器重启时阿里云订阅延迟启动,给ws重连时间

18339543638 3 سال پیش
والد
کامیت
4d78a1f64a

+ 0 - 1
nb-common/ws-common/src/main/java/com/nb/common/websocket/DefaultWebSocketMsgHandler.java

@@ -69,7 +69,6 @@ public class DefaultWebSocketMsgHandler implements IWsMsgHandler {
 
     @Override
     public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
-        messageHandlers.forEach(wsHandler -> wsHandler.close(channelContext));
         return null;
     }
 

+ 20 - 7
nb-service/iot-service/src/main/java/com/nb/aliyun/service/AliyunConsumerGroupService.java

@@ -16,6 +16,7 @@ import com.nb.web.api.utils.EnumUtils;
 import com.nb.core.utils.ExceptionUtil;
 import com.nb.web.api.feign.IDeviceClient;
 import com.nb.web.api.utils.Items;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -58,9 +59,16 @@ public class AliyunConsumerGroupService {
     @Autowired
     private IIotMsgHandler iotMsgHandler;
 
+    /**
+     * 阿里云订阅延迟启动,给ws重连时间
+     */
+    @Value("${tio.websocket.server.heartbeat-timeout:0}")
+    private Long delayTime;
+
     @Autowired
     @Lazy
     private IHospitalLogClient hospitalLogService;
+
     @Value("${aliyun.server-subscription.enable:false}")
     private boolean isEnable;
 
@@ -72,13 +80,18 @@ public class AliyunConsumerGroupService {
             return;
         }
         log.info("允许开启订阅");
-        try {
-            // 开启订阅
-            client.start(messageListener);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        log.info("阿里云物联网订阅成功。。。。。。。。。。。");
+        // 开启订阅
+        Executors.newSingleThreadScheduledExecutor()
+                .schedule(()->{
+                            try {
+                                client.start(messageListener);
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                            }
+                            log.info("阿里云物联网订阅成功。。。。。。。。。。。");
+                        },
+                        Optional.ofNullable(delayTime).orElse(0L)*2,TimeUnit.SECONDS ) ;
+
     }
 
 

+ 15 - 4
nb-service/web-service/src/main/java/com/nb/web/service/bus/listener/DeviceInfoListener.java

@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 /**
@@ -200,10 +201,11 @@ public class DeviceInfoListener implements IIotMsgHandler {
     private void handleIntelligent(BusDeviceRunningEntity device, DeviceOperator deviceOperator, List<Supplier<?>> cacheOperation) {
         //开机数据不进行加减档判定
         if (!DeviceStatusEnum.StartUp.equals(device.getRunState())) {
-            boolean flowLimit = deviceOperator.isFlowLimit();
+            AtomicBoolean flowLimit=new AtomicBoolean(deviceOperator.isFlowLimit());
+            BigDecimal lastContinueDose = deviceOperator.getContinueDose();
             if(FlowStatusEnum.None==device.getWarnFlow()
                     && Boolean.FALSE.equals(device.isNewInfusion())){
-                BigDecimal lastContinueDose = deviceOperator.getContinueDose();
+
                 if(lastContinueDose!=null){
                     if(CompareUtil.compare(device.getContinueDose(),lastContinueDose)>0){
                         log.info("消息【{}】,设备【{}】,当前流速【{}】,上一状态流速【{}】,加档",device.getMsgId(),device.getDeviceId(),device.getContinueDose(),lastContinueDose);
@@ -214,12 +216,21 @@ public class DeviceInfoListener implements IIotMsgHandler {
                     }
                 }
                 if(!FlowStatusEnum.Down.equals(device.getWarnFlow())){
-                    device.setWarnFlow(flowLimit?FlowStatusEnum.Limited:device.getWarnFlow());
+                    device.setWarnFlow(flowLimit.get()?FlowStatusEnum.Limited:device.getWarnFlow());
                 }
             }
+
+            //上限值调整
+            if (FlowStatusEnum.Limited.equals(device.getWarnFlow())) {
+                if(CompareUtil.compare(device.getContinueDose(),device.getFlowUpLimit())<0){
+                    flowLimit.set(false);
+                    device.setWarnFlow(FlowStatusEnum.None);
+                }
+            }
+
             cacheOperation.add(()->{
                 deviceOperator.setContinueDose(device.getContinueDose());
-                if(flowLimit&&Boolean.FALSE.equals(device.isNewInfusion())){
+                if(flowLimit.get()&&Boolean.FALSE.equals(device.isNewInfusion())){
                     //同一输注过程中,只有减档才能够消除加档受限
                     if(FlowStatusEnum.Down.equals(device.getWarnFlow())){
                         deviceOperator.setFlowLimit(false);