Ver código fonte

add
mqtt信息发送

lifang 3 semanas atrás
pai
commit
22e73e8a19

+ 1 - 1
nb-admin/src/main/resources/application-prod.yml

@@ -223,7 +223,7 @@ mqtt:
   connect-options:
     clean-session: true
     connection-timeout: 30
-    keep-alive-interval: 60
+    keep-alive-interval: 10
     automatic-reconnect: true
 knife4j:
   enable: true

+ 2 - 13
nb-core/src/main/java/com/nb/core/config/MqttConfig.java

@@ -1,5 +1,6 @@
 package com.nb.core.config;
 
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -16,18 +17,6 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  */
 @Configuration
 public class MqttConfig {
-    
-    /**
-     * 创建MQTT配置属性Bean
-     *
-     * @return MqttProperties
-     */
-    @Bean
-    @ConditionalOnMissingBean
-    public MqttProperties mqttProperties() {
-        return new MqttProperties();
-    }
-
     /**
      * 创建默认的MQTT客户端工厂
      * 当容器中没有MqttPahoClientFactory时创建
@@ -36,7 +25,7 @@ public class MqttConfig {
      * @return MqttPahoClientFactory
      */
     @Bean
-    @ConditionalOnMissingBean(MqttPahoClientFactory.class)
+    @ConditionalOnBean(MqttProperties.class)
     public MqttPahoClientFactory mqttClientFactory(MqttProperties mqttProperties) {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         MqttConnectOptions options = new MqttConnectOptions();

+ 7 - 7
nb-core/src/main/java/com/nb/core/config/MqttProperties.java

@@ -17,23 +17,23 @@ public class MqttProperties {
     /**
      * 客户端配置
      */
-    private Client client = new Client();
+    private Client client = null;
     
     /**
      * 代理配置
      */
-    private Broker broker = new Broker();
+    private Broker broker = null;
     
     /**
      * 连接选项
      */
-    private ConnectOptions connectOptions = new ConnectOptions();
+    private ConnectOptions connectOptions = null;
     
     public static class Client {
         /**
          * 客户端ID
          */
-        private String id = "nb-netpump-client";
+        private String id ;
         
         public String getId() {
             return id;
@@ -48,17 +48,17 @@ public class MqttProperties {
         /**
          * 代理URL
          */
-        private String url = "tcp://iot.tuoren.com:1883";
+        private String url ;
         
         /**
          * 用户名
          */
-        private String username = "hospital";
+        private String username;
         
         /**
          * 密码
          */
-        private String password = "1qaz!QAZ";
+        private String password;
         
         public String getUrl() {
             return url;

+ 1 - 0
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -52,6 +52,7 @@ public class MqttClientUtil implements ApplicationContextAware {
         try {
             mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
         } catch (Exception e) {
+            mqttClientFactory=null;
             // 忽略异常,如果获取不到工厂则无法进行MQTT操作
         }
     }

+ 1 - 1
nb-service-api/web-service-api/src/main/java/com/nb/web/api/feign/result/AppHospitalVO.java

@@ -13,5 +13,5 @@ public class AppHospitalVO implements Serializable {
     @ApiModelProperty("医院名称")
     private String name;
 
-    private String uerId;
+    private String userId;
 }

+ 9 - 1
nb-service/iot-service/src/main/java/com/nb/aliyun/service/consumer/NBAndFourGConsumerGroupService.java

@@ -127,7 +127,7 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
     private final static ExecutorService executorService = new ThreadPoolExecutor(
             Runtime.getRuntime().availableProcessors(),
             Runtime.getRuntime().availableProcessors(), 60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue(50000),
+            new LinkedBlockingQueue(300),
             new AliYunThreadFactory());
 
 
@@ -164,6 +164,14 @@ public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
             //1.收到消息之后一定要ACK。
             ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
             int queueSize = threadPoolExecutor.getQueue().size();
+            
+            // 实现过载保护机制:当队列中待处理的任务数量超过阈值时,将消息重新返回队列
+            if (queueSize > 50) {
+                log.warn("消息队列已满,当前待处理任务数量: {},将消息重新返回队列", queueSize);
+                // 不进行acknowledge操作,让消息重新入队
+                return;
+            }
+            
             log.info("当前消息队列中待处理的任务数量: {}", queueSize);
             executorService.submit(()-> {
                 try {