lifang преди 3 седмици
родител
ревизия
eb19b75135

+ 1 - 1
nb-admin/src/main/resources/application-prod.yml

@@ -223,7 +223,7 @@ mqtt:
   connect-options:
     clean-session: true
     connection-timeout: 30
-    keep-alive-interval: 60
+    keep-alive-interval: 10
     automatic-reconnect: true
 knife4j:
   enable: true

+ 2 - 13
nb-core/src/main/java/com/nb/core/config/MqttConfig.java

@@ -1,5 +1,6 @@
 package com.nb.core.config;
 
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -16,18 +17,6 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  */
 @Configuration
 public class MqttConfig {
-    
-    /**
-     * 创建MQTT配置属性Bean
-     *
-     * @return MqttProperties
-     */
-    @Bean
-    @ConditionalOnMissingBean
-    public MqttProperties mqttProperties() {
-        return new MqttProperties();
-    }
-
     /**
      * 创建默认的MQTT客户端工厂
      * 当容器中没有MqttPahoClientFactory时创建
@@ -36,7 +25,7 @@ public class MqttConfig {
      * @return MqttPahoClientFactory
      */
     @Bean
-    @ConditionalOnMissingBean(MqttPahoClientFactory.class)
+    @ConditionalOnBean(MqttProperties.class)
     public MqttPahoClientFactory mqttClientFactory(MqttProperties mqttProperties) {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         MqttConnectOptions options = new MqttConnectOptions();

+ 7 - 7
nb-core/src/main/java/com/nb/core/config/MqttProperties.java

@@ -17,23 +17,23 @@ public class MqttProperties {
     /**
      * 客户端配置
      */
-    private Client client = new Client();
+    private Client client = null;
     
     /**
      * 代理配置
      */
-    private Broker broker = new Broker();
+    private Broker broker = null;
     
     /**
      * 连接选项
      */
-    private ConnectOptions connectOptions = new ConnectOptions();
+    private ConnectOptions connectOptions = null;
     
     public static class Client {
         /**
          * 客户端ID
          */
-        private String id = "nb-netpump-client";
+        private String id ;
         
         public String getId() {
             return id;
@@ -48,17 +48,17 @@ public class MqttProperties {
         /**
          * 代理URL
          */
-        private String url = "tcp://iot.tuoren.com:1883";
+        private String url ;
         
         /**
          * 用户名
          */
-        private String username = "hospital";
+        private String username;
         
         /**
          * 密码
          */
-        private String password = "1qaz!QAZ";
+        private String password;
         
         public String getUrl() {
             return url;

+ 1 - 0
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -52,6 +52,7 @@ public class MqttClientUtil implements ApplicationContextAware {
         try {
             mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
         } catch (Exception e) {
+            mqttClientFactory=null;
             // 忽略异常,如果获取不到工厂则无法进行MQTT操作
         }
     }

+ 1 - 1
nb-service-api/web-service-api/src/main/java/com/nb/web/api/feign/result/AppHospitalVO.java

@@ -13,5 +13,5 @@ public class AppHospitalVO implements Serializable {
     @ApiModelProperty("医院名称")
     private String name;
 
-    private String uerId;
+    private String userId;
 }

+ 83 - 12
nb-service/iot-service/src/main/java/com/nb/aliyun/service/consumer/NBAndFourGConsumerGroupService.java

@@ -7,6 +7,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.nb.aliyun.service.pojo.AliIotConsumerPojo;
+import com.nb.core.exception.CustomException;
 import com.nb.web.api.bean.AliIotConfig;
 import com.nb.web.api.entity.BusDeviceEntity;
 import com.nb.web.api.entity.BusHospitalLogEntity;
@@ -19,8 +20,12 @@ import com.nb.core.utils.ExceptionUtil;
 import com.nb.web.api.feign.IDeviceClient;
 import com.nb.web.api.utils.Items;
 import lombok.extern.slf4j.Slf4j;
+
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -46,6 +51,12 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
     private static final String MESSAGEID = "messageId";
     private static final String STATUS = "status";
 
+    // 批量提交日志的定时任务间隔(毫秒)
+    private static final long BATCH_COMMIT_INTERVAL = 5000; // 5秒
+    
+    // 批量提交日志的最大数量
+    private static final int BATCH_MAX_SIZE = 100;
+
 
 
     private IDeviceClient deviceService;
@@ -54,12 +65,47 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
 
 
     private IHospitalLogClient hospitalLogService;
+    
+    // 日志批量存储队列
+    private final BlockingQueue<BusHospitalLogEntity> logQueue = new LinkedBlockingQueue<>(10000);
+    
+    // 批量提交日志的调度器
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 
     public NBAndFourGConsumerGroupService(AliIotSubscribeClient client,AliIotConsumerPojo consumer) {
         super(client,consumer);
         this.deviceService= SpringUtil.getBean(IDeviceClient.class);
         this.iotMsgHandler= SpringUtil.getBean(IIotMsgHandler.class);
         this.hospitalLogService= SpringUtil.getBean(IHospitalLogClient.class);
+        
+        // 启动定时批量提交任务
+        startBatchCommitTask();
+    }
+    
+    /**
+     * 启动定时批量提交日志任务
+     */
+    private void startBatchCommitTask() {
+        scheduledExecutorService.scheduleWithFixedDelay(this::batchCommitLogs, 
+                BATCH_COMMIT_INTERVAL, BATCH_COMMIT_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+    
+    /**
+     * 批量提交日志到数据库
+     */
+    private void batchCommitLogs() {
+        List<BusHospitalLogEntity> logsToCommit = new ArrayList<>();
+        int drainedCount = logQueue.drainTo(logsToCommit, BATCH_MAX_SIZE);
+        
+        if (drainedCount > 0) {
+            log.info("开始批量提交 {} 条日志数据", drainedCount);
+            long startTime = System.currentTimeMillis();
+            hospitalLogService.batchSave(logsToCommit);
+            // 由于 IHospitalLogClient 只提供了单条保存接口,所以仍然需要逐条保存
+            // 在实际生产环境中,建议提供批量保存接口以进一步优化性能
+            long endTime = System.currentTimeMillis();
+            log.info("完成批量提交 {} 条日志数据,耗时 {} ms", drainedCount, (endTime - startTime));
+        }
     }
 
 
@@ -80,8 +126,8 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
      */
     private final static ExecutorService executorService = new ThreadPoolExecutor(
             Runtime.getRuntime().availableProcessors(),
-            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue(50000),
+            Runtime.getRuntime().availableProcessors(), 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue(300),
             new AliYunThreadFactory());
 
 
@@ -116,18 +162,34 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
     private MessageListener messageListener = (message) -> {
         try {
             //1.收到消息之后一定要ACK。
-            // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
-            // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
-            // message.acknowledge();
-            //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
-            // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
-            executorService.submit(()-> processMessage(message));
+            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
+            int queueSize = threadPoolExecutor.getQueue().size();
+            
+            // 实现过载保护机制:当队列中待处理的任务数量超过阈值时,将消息重新返回队列
+            if (queueSize > 50) {
+                log.warn("消息队列已满,当前待处理任务数量: {},将消息重新返回队列", queueSize);
+                // 不进行acknowledge操作,让消息重新入队
+                return;
+            }
+            
+            log.info("当前消息队列中待处理的任务数量: {}", queueSize);
+            executorService.submit(()-> {
+                try {
+                    processMessage(message);
+                }finally {
+                    try {
+                        message.acknowledge();
+                    } catch (JMSException e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+            });
         } catch (Exception e) {
             log.error("{},submit task occurs exception ", this.getConsumer().getName(),e);
         }
     };
 
-    private void processMessage(Message message) {
+    public void processMessage(Message message) {
         BusHospitalLogEntity hospitalLog = new BusHospitalLogEntity();
         long startTime = System.currentTimeMillis();
         String deviceName=null;
@@ -175,7 +237,6 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
                     // 创建设备
                     BusDeviceEntity device = new BusDeviceEntity();
                     device.setDeviceId(deviceName);
-
                     // 配置信息
                     AliIotConfig config = new AliIotConfig();
                     config.setDeviceName(deviceName);
@@ -195,6 +256,8 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
                 log.warn("阿里云数据【{}】,未知的topic:【{}】",content.toJSONString(),topic);
             }
             hospitalLog.setSuccess(true);
+        } catch (CustomException c){
+            c.printStackTrace();
         } catch (Exception e) {
             hospitalLog.setSuccess(false);
             hospitalLog.setMessage(ExceptionUtil.getExceptionMsg(e));
@@ -210,9 +273,17 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
             if(CharSequenceUtil.isEmpty(hospitalLog.getTenantId())){
                 log.warn("日志【{}】医院为空,进行自动填充",JSONUtil.toJsonStr(hospitalLog));
             }
-            hospitalLogService.save(hospitalLog);
+            // 改为异步批量提交日志
+            try {
+                if (!logQueue.offer(hospitalLog, 100, TimeUnit.MILLISECONDS)) {
+                    log.warn("日志队列已满,丢弃日志: {}", hospitalLog);
+                }
+            } catch (InterruptedException e) {
+                log.error("添加日志到队列被中断: {}", hospitalLog, e);
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
 
-}
+}