Jelajahi Sumber

add
mqtt信息发送

lifang 4 minggu lalu
induk
melakukan
a28e5531b2
17 mengubah file dengan 522 tambahan dan 99 penghapusan
  1. 1 0
      nb-auth/src/main/java/com/nb/auth/sa/SaTokenConfig.java
  2. 38 0
      nb-core/src/main/java/com/nb/core/config/MqttHandlerConfig.java
  3. 51 0
      nb-core/src/main/java/com/nb/core/controller/MqttMessageController.java
  4. 7 1
      nb-core/src/main/java/com/nb/core/entity/MqttMessage.java
  5. 16 0
      nb-core/src/main/java/com/nb/core/enums/MqttTopicEnum.java
  6. 44 0
      nb-core/src/main/java/com/nb/core/handler/AbstractMqttMessageHandler.java
  7. 26 0
      nb-core/src/main/java/com/nb/core/handler/MqttMessageHandler.java
  8. 130 0
      nb-core/src/main/java/com/nb/core/handler/MqttMessageHandlerManager.java
  9. 64 0
      nb-core/src/main/java/com/nb/core/service/MqttMessageProcessService.java
  10. 49 79
      nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java
  11. 5 3
      nb-core/src/test/java/com/nb/core/utils/MqttClientUtilTest.java
  12. 2 0
      nb-service-api/web-service-api/src/main/java/com/nb/web/api/feign/IHospitalClient.java
  13. 49 14
      nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusEvaluationService.java
  14. 6 1
      nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusHospitalService.java
  15. 14 0
      nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusInfusionHistoryService.java
  16. 4 1
      nb-service/web-service/src/main/java/com/nb/web/service/bus/service/dto/ManualUndoConfig.java
  17. 16 0
      nb-service/web-service/src/main/java/com/nb/web/service/bus/service/dto/MqttUndoDTO.java

+ 1 - 0
nb-auth/src/main/java/com/nb/auth/sa/SaTokenConfig.java

@@ -70,6 +70,7 @@ public class SaTokenConfig {
         IGNORE_URL.add("/bus/device/info/hospitalCode");
         IGNORE_URL.add("/bus/device/info/hospitalCodeAndAlias");
         IGNORE_URL.add("/iot/**");
+        IGNORE_URL.add("/mqtt/**");
     }
 
 }

+ 38 - 0
nb-core/src/main/java/com/nb/core/config/MqttHandlerConfig.java

@@ -0,0 +1,38 @@
+package com.nb.core.config;
+
+import com.nb.core.handler.MqttMessageHandlerManager;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * MQTT处理器配置类
+ * 确保处理器管理器正确初始化
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Configuration
+public class MqttHandlerConfig {
+    
+    private final MqttMessageHandlerManager handlerManager;
+    
+    public MqttHandlerConfig(MqttMessageHandlerManager handlerManager) {
+        this.handlerManager = handlerManager;
+    }
+    
+    @PostConstruct
+    public void init() {
+        // 确保处理器管理器已正确初始化
+        if (MqttMessageHandlerManager.getInstance() == null) {
+            try {
+                // 通过反射设置实例(备用方案)
+                java.lang.reflect.Field instanceField = MqttMessageHandlerManager.class.getDeclaredField("instance");
+                instanceField.setAccessible(true);
+                instanceField.set(null, handlerManager);
+            } catch (Exception e) {
+                // 忽略异常
+            }
+        }
+    }
+}

+ 51 - 0
nb-core/src/main/java/com/nb/core/controller/MqttMessageController.java

@@ -0,0 +1,51 @@
+package com.nb.core.controller;
+
+import com.nb.core.entity.MqttMessage;
+import com.nb.core.result.R;
+import com.nb.core.service.MqttMessageProcessService;
+import com.nb.core.utils.MqttClientUtil;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * MQTT消息控制器
+ * 提供REST API用于测试和管理MQTT消息处理
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@RestController
+@RequestMapping("/mqtt")
+@AllArgsConstructor
+public class MqttMessageController {
+    
+    private final MqttMessageProcessService messageProcessService;
+    private final MqttClientUtil mqttClientUtil;
+
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息
+     * @return 处理结果
+     */
+    @PostMapping("/process")
+    public R<String> processMessage(@RequestBody MqttMessage message) {
+        try {
+            //订阅和发布只能同时存在一个
+            if(mqttClientUtil.isOnline()){
+                //发布存在则就不接受订阅消息
+                return R.fail("本节点是客户端节点,不予处理订阅消息");
+            }
+            String clientId = message.getClientId();
+            if(MqttClientUtil.CLIENT_ID.equals(clientId)){
+                return R.fail("本节点发出的数据不予处理");
+            }
+            messageProcessService.processMessage(message);
+            return R.success("消息处理成功");
+        } catch (Exception e) {
+            return R.fail("处理MQTT消息失败"+ e.getMessage());
+        }
+    }
+}

+ 7 - 1
nb-core/src/main/java/com/nb/core/entity/MqttMessage.java

@@ -1,7 +1,12 @@
 package com.nb.core.entity;
 
+import com.nb.core.handler.MqttMessageHandlerManager;
 import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 
+import java.io.Serializable;
 import java.util.Date;
 import java.util.Map;
 
@@ -13,7 +18,8 @@ import java.util.Map;
  * @version 1.0.0
  */
 @Data
-public class MqttMessage {
+public class MqttMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
     /**
      * 主题
      */

+ 16 - 0
nb-core/src/main/java/com/nb/core/enums/MqttTopicEnum.java

@@ -0,0 +1,16 @@
+package com.nb.core.enums;
+
+/**
+ * 数据下发时的mqtt主题类
+ */
+public interface MqttTopicEnum {
+    /**
+     * 评价信息下发
+     */
+    String BUS_EVALUATION = "bus/evaluation";
+
+    /**
+     * 撤泵信息下发
+     */
+    String BUS_UNDO = "bus/undo";
+}

+ 44 - 0
nb-core/src/main/java/com/nb/core/handler/AbstractMqttMessageHandler.java

@@ -0,0 +1,44 @@
+package com.nb.core.handler;
+
+import com.nb.core.entity.MqttMessage;
+import org.springframework.util.AntPathMatcher;
+
+/**
+ * 抽象MQTT消息处理器
+ * 提供基于主题模式匹配的基础实现
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+public abstract class AbstractMqttMessageHandler implements MqttMessageHandler {
+    
+    private static final AntPathMatcher pathMatcher = new AntPathMatcher();
+    
+    /**
+     * 定义支持的主题模式
+     * 子类需要实现此方法返回支持的主题模式
+     * @return 支持的主题模式
+     */
+    protected abstract String getSupportedTopicPattern();
+    
+    /**
+     * 判断是否支持处理指定主题的消息
+     * @param topic MQTT主题
+     * @return 是否支持处理
+     */
+    @Override
+    public boolean supports(String topic) {
+        String pattern = getSupportedTopicPattern();
+        if (pattern == null || topic == null) {
+            return false;
+        }
+        return pathMatcher.match(pattern, topic);
+    }
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息实体
+     */
+    @Override
+    public abstract void handle(MqttMessage message);
+}

+ 26 - 0
nb-core/src/main/java/com/nb/core/handler/MqttMessageHandler.java

@@ -0,0 +1,26 @@
+package com.nb.core.handler;
+
+import com.nb.core.entity.MqttMessage;
+
+/**
+ * MQTT消息处理器接口
+ * 定义处理MQTT消息的标准方法
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+public interface MqttMessageHandler {
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息实体
+     */
+    void handle(MqttMessage message);
+    
+    /**
+     * 判断是否支持处理指定主题的消息
+     * @param topic MQTT主题
+     * @return 是否支持处理
+     */
+    boolean supports(String topic);
+}

+ 130 - 0
nb-core/src/main/java/com/nb/core/handler/MqttMessageHandlerManager.java

@@ -0,0 +1,130 @@
+package com.nb.core.handler;
+
+import com.nb.core.entity.MqttMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * MQTT消息处理器管理器
+ * 负责管理和调度不同的消息处理器
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@Component
+public class MqttMessageHandlerManager {
+    
+    // 添加单例实例引用
+    private static MqttMessageHandlerManager instance;
+    
+    @Autowired(required = false)
+    private List<MqttMessageHandler> handlers = new CopyOnWriteArrayList<>();
+    
+    private final Map<String, MqttMessageHandler> topicHandlerMap = new ConcurrentHashMap<>();
+    
+    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
+    
+    @PostConstruct
+    public void init() {
+        instance = this;
+        log.info("MQTT消息处理器管理器初始化完成,已注册{}个处理器", handlers.size());
+    }
+    
+    /**
+     * 获取单例实例
+     * @return MqttMessageHandlerManager实例
+     */
+    public static MqttMessageHandlerManager getInstance() {
+        return instance;
+    }
+    
+    /**
+     * 处理MQTT消息
+     * 根据主题找到对应的处理器并处理消息
+     * @param message MQTT消息实体
+     */
+    public void handleMessage(MqttMessage message) {
+        if (message == null || message.getTopic() == null) {
+            log.warn("接收到无效的MQTT消息: {}", message);
+            return;
+        }
+        
+        String topic = message.getTopic();
+        
+        // 先从缓存中查找处理器
+        MqttMessageHandler handler = topicHandlerMap.get(topic);
+        if (handler != null) {
+            executeHandler(handler, message);
+            return;
+        }
+        
+        // 遍历所有处理器查找支持该主题的处理器
+        for (MqttMessageHandler messageHandler : handlers) {
+            if (messageHandler.supports(topic)) {
+                topicHandlerMap.put(topic, messageHandler);
+                executeHandler(messageHandler, message);
+                return;
+            }
+        }
+        
+        log.warn("未找到处理主题[{}]的处理器", topic);
+    }
+    
+    /**
+     * 异步执行处理器
+     * @param handler 消息处理器
+     * @param message MQTT消息
+     */
+    private void executeHandler(MqttMessageHandler handler, MqttMessage message) {
+        executorService.submit(() -> {
+            try {
+                handler.handle(message);
+            } catch (Exception e) {
+                log.error("处理MQTT消息时发生错误,主题: {}, 内容: {}", 
+                         message.getTopic(), message.getPayload(), e);
+            }
+        });
+    }
+    
+    /**
+     * 注册消息处理器
+     * @param handler 消息处理器
+     */
+    public void registerHandler(MqttMessageHandler handler) {
+        if (handler != null) {
+            handlers.add(handler);
+            log.info("注册MQTT消息处理器: {}", handler.getClass().getSimpleName());
+        }
+    }
+    
+    /**
+     * 移除消息处理器
+     * @param handler 消息处理器
+     */
+    public void unregisterHandler(MqttMessageHandler handler) {
+        if (handler != null) {
+            handlers.remove(handler);
+            // 清理缓存中的映射
+            topicHandlerMap.entrySet().removeIf(entry -> entry.getValue() == handler);
+            log.info("移除MQTT消息处理器: {}", handler.getClass().getSimpleName());
+        }
+    }
+    
+    /**
+     * 获取当前注册的处理器数量
+     * @return 处理器数量
+     */
+    public int getHandlerCount() {
+        return handlers.size();
+    }
+}

+ 64 - 0
nb-core/src/main/java/com/nb/core/service/MqttMessageProcessService.java

@@ -0,0 +1,64 @@
+package com.nb.core.service;
+
+import com.nb.core.entity.MqttMessage;
+import com.nb.core.handler.MqttMessageHandlerManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * MQTT消息处理服务
+ * 提供统一的消息处理接口
+ *
+ * @author YourName
+ * @version 1.0.0
+ */
+@Slf4j
+@Service
+public class MqttMessageProcessService {
+    
+    private final MqttMessageHandlerManager handlerManager;
+    
+    public MqttMessageProcessService(MqttMessageHandlerManager handlerManager) {
+        this.handlerManager = handlerManager;
+    }
+    
+    /**
+     * 处理MQTT消息
+     * @param message MQTT消息
+     */
+    public void processMessage(MqttMessage message) {
+        if (message == null) {
+            log.warn("尝试处理空的MQTT消息");
+            return;
+        }
+        
+        try {
+            handlerManager.handleMessage(message);
+        } catch (Exception e) {
+            log.error("处理MQTT消息时发生异常,主题: {}, 内容: {}", 
+                     message.getTopic(), message.getPayload(), e);
+        }
+    }
+    
+    /**
+     * 根据主题和内容创建并处理MQTT消息
+     * @param topic 主题
+     * @param payload 消息内容
+     */
+    public void processMessage(String topic, String payload) {
+        MqttMessage message = new MqttMessage(topic, payload);
+        processMessage(message);
+    }
+    
+    /**
+     * 根据主题和内容创建并处理MQTT消息
+     * @param topic 主题
+     * @param payload 消息内容
+     * @param clientId 客户端ID
+     */
+    public void processMessage(String topic, String payload, String clientId) {
+        MqttMessage message = new MqttMessage(topic, payload);
+        message.setClientId(clientId);
+        processMessage(message);
+    }
+}

+ 49 - 79
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -1,8 +1,8 @@
 package com.nb.core.utils;
 
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONUtil;
 import com.nb.core.entity.MqttMessage;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
 import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
@@ -14,6 +14,7 @@ import org.springframework.messaging.MessagingException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Component;
+import org.springframework.scheduling.annotation.Async;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -30,22 +31,23 @@ import java.util.function.Consumer;
  */
 @Component
 public class MqttClientUtil implements ApplicationContextAware {
-
+    public static final String CLIENT_ID = "nb-netpump"+"-"+System.currentTimeMillis();
     private ApplicationContext applicationContext;
 
     private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
     
     private MqttPahoClientFactory mqttClientFactory;
-    
-    private String defaultClientId = "nb-netpump";
-    
+
     private MessageChannel mqttInputChannel;
-    
+
+    public Boolean isOnline(){
+        return ObjectUtil.isNotNull(mqttClientFactory);
+    }
+
     @PostConstruct
     public void init() {
         // 创建默认的消息通道
         mqttInputChannel = new DirectChannel();
-        
         // 尝试从Spring容器中获取MqttPahoClientFactory
         try {
             mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
@@ -81,103 +83,60 @@ public class MqttClientUtil implements ApplicationContextAware {
      * @param topic 主题
      * @param payload 消息内容
      */
-    public void publish(String topic, String payload) {
-        publish(topic, payload, 0, false);
-    }
-    
-    /**
-     * 发布消息
-     * @param topic 主题
-     * @param payload 消息内容
-     * @param qos 服务质量等级 (0, 1, 2)
-     * @param retained 是否保留消息
-     */
-    public void publish(String topic, String payload, int qos, boolean retained) {
-        try {
-            // 创建消息实体
-            MqttMessage message = new MqttMessage(topic, payload);
-            message.setClientId(defaultClientId + "-pub-" + System.currentTimeMillis());
-            // 确保客户端工厂已初始化
-            if (mqttClientFactory == null) {
-                return;
-            }
-            
-            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(defaultClientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
-            messageHandler.setDefaultTopic(topic);
-            messageHandler.setDefaultQos(qos);
-            messageHandler.setDefaultRetained(retained);
-            // 确保处理器已初始化
-            messageHandler.afterPropertiesSet();
-            
-            org.springframework.messaging.Message<String> springMessage = 
-                org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
-            messageHandler.handleMessage(springMessage);
-        } catch (Exception e) {
-            throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
-        }
+    public void publish(String hospitalCode,String topic, Object payload) {
+        publish(hospitalCode,topic, payload, 0, false);
     }
     
     /**
      * 发布消息到指定客户端
-     * @param clientId 客户端ID
      * @param topic 主题
      * @param payload 消息内容
      * @param qos 服务质量等级
      * @param retained 是否保留消息
      */
-    public void publish(String clientId, String topic, String payload, int qos, boolean retained) {
+    public void publish(String hospitalCode, String topic, Object payload, int qos, boolean retained) {
         try {
             // 创建消息实体
-            MqttMessage message = new MqttMessage(topic, payload);
-            message.setClientId(clientId + "-pub-" + System.currentTimeMillis());
+            MqttMessage message = new MqttMessage(topic, JSONUtil.toJsonStr( payload));
+            message.setClientId(CLIENT_ID);
             
             if (mqttClientFactory == null) {
                 return;
             }
             
-            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
-            messageHandler.setDefaultTopic(topic);
+            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(CLIENT_ID, mqttClientFactory);
+            messageHandler.setDefaultTopic("hospitalInfo/"+hospitalCode);
             messageHandler.setDefaultQos(qos);
             messageHandler.setDefaultRetained(retained);
             // 确保处理器已初始化
             messageHandler.afterPropertiesSet();
             
-            org.springframework.messaging.Message<String> springMessage = 
-                org.springframework.messaging.support.MessageBuilder.withPayload(payload).build();
+            org.springframework.messaging.Message<String> springMessage =
+                org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
             messageHandler.handleMessage(springMessage);
         } catch (Exception e) {
             throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
         }
     }
+
     
     /**
      * 订阅主题
      * @param topic 主题
      * @param messageHandler 消息处理器
      */
-    public void subscribe(String topic, MessageHandler messageHandler) {
-        subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageHandler);
-    }
-    
-    /**
-     * 订阅主题
-     * @param clientId 客户端ID
-     * @param topic 主题
-     * @param messageHandler 消息处理器
-     */
-    public void subscribe(String clientId, String topic, MessageHandler messageHandler) {
+    public void subscribe( String topic, MessageHandler messageHandler) {
         try {
             if (mqttClientFactory == null) {
                 return;
             }
             // 如果已经订阅了该主题,则先取消订阅
-            String key = clientId + ":" + topic;
-            if (subscribers.containsKey(key)) {
-                unsubscribe(clientId, topic);
+            if (subscribers.containsKey(topic)) {
+                unsubscribe( topic);
             }
             
             MqttPahoMessageDrivenChannelAdapter adapter = 
-                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory, topic);
+                new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID, mqttClientFactory, topic);
             adapter.setConverter(new DefaultPahoMessageConverter());
             adapter.setOutputChannel(mqttInputChannel);
             
@@ -188,7 +147,7 @@ public class MqttClientUtil implements ApplicationContextAware {
             
             adapter.start();
             
-            subscribers.put(key, adapter);
+            subscribers.put(topic, adapter);
         } catch (Exception e) {
             throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
         }
@@ -200,16 +159,6 @@ public class MqttClientUtil implements ApplicationContextAware {
      * @param messageConsumer 消息消费者
      */
     public void subscribe(String topic, Consumer<String> messageConsumer) {
-        subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageConsumer);
-    }
-    
-    /**
-     * 订阅主题(使用函数式接口处理消息)
-     * @param clientId 客户端ID
-     * @param topic 主题
-     * @param messageConsumer 消息消费者
-     */
-    public void subscribe(String clientId, String topic, Consumer<String> messageConsumer) {
         MessageHandler handler = new MessageHandler() {
             @Override
             public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
@@ -217,20 +166,41 @@ public class MqttClientUtil implements ApplicationContextAware {
                 messageConsumer.accept(payload);
             }
         };
-        subscribe(clientId, topic, handler);
+        subscribe(topic, handler);
     }
     
     /**
      * 取消订阅
-     * @param clientId 客户端ID
      * @param topic 主题
      */
-    public void unsubscribe(String clientId, String topic) {
-        String key = clientId + ":" + topic;
+    public void unsubscribe( String topic) {
+        String key = topic;
         MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(key);
         if (adapter != null) {
             adapter.stop();
             subscribers.remove(key);
         }
     }
+
+    /**
+     * 异步发布消息
+     * @param topic 主题
+     * @param payload 消息内容
+     */
+    @Async
+    public void asyncPublish(String hospitalCode,String topic, Object payload) {
+        publish(hospitalCode,topic, payload);
+    }
+
+    /**
+     * 异步发布消息到指定客户端
+     * @param topic 主题
+     * @param payload 消息内容
+     * @param qos 服务质量等级
+     * @param retained 是否保留消息
+     */
+    @Async
+    public void asyncPublish(String hospitalCode, String topic, String payload, int qos, boolean retained) {
+        publish(hospitalCode, topic, payload, qos, retained);
+    }
 }

+ 5 - 3
nb-core/src/test/java/com/nb/core/utils/MqttClientUtilTest.java

@@ -1,5 +1,6 @@
 package com.nb.core.utils;
 
+import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.util.ReflectUtil;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.junit.jupiter.api.Test;
@@ -28,7 +29,9 @@ public class MqttClientUtilTest {
 
         // 测试发布消息
         try {
-            mqttClientUtil.publish("testtopic/123", "Hello MQTT");
+            mqttClientUtil.publish("1000","testtopic/123", MapUtil.builder()
+                            .put("test","123")
+                    .build());
         } catch (Exception e) {
             e.printStackTrace();
             // 在测试环境中,由于没有实际的MQTT服务器,会抛出异常,这是预期的
@@ -42,13 +45,12 @@ public class MqttClientUtilTest {
         
         // 测试发布消息(带参数)
         try {
-            mqttClientUtil.publish("test/topic", "Hello MQTT", 1, false);
+            mqttClientUtil.publish("1000","test/topic", "Hello MQTT", 1, false);
         } catch (Exception e) {
             // 在测试环境中,由于没有实际的MQTT服务器,会抛出异常,这是预期的
         }
     }
 
-    private String defaultClientId = "nb-netpump";
 
     private String defaultBrokerUrl = "tcp://iot.tuoren.com:1883";
 

+ 2 - 0
nb-service-api/web-service-api/src/main/java/com/nb/web/api/feign/IHospitalClient.java

@@ -21,4 +21,6 @@ public interface IHospitalClient {
     List<HospitalResult> selectAll();
 
     List<AppHospitalVO> selectUserHospitalList(String username);
+
+    String selectHospitalCode(String id);
 }

+ 49 - 14
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusEvaluationService.java

@@ -2,18 +2,23 @@ package com.nb.web.service.bus.service;
 
 import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.text.CharSequenceUtil;
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.nb.auth.utils.SecurityUtil;
+import com.nb.core.enums.MqttTopicEnum;
+import com.nb.core.utils.MqttClientUtil;
 import com.nb.web.api.dto.EvalBatchDTO;
 import com.nb.web.api.entity.BusClinicEntity;
 import com.nb.web.api.entity.BusEvaluationEntity;
 import com.nb.web.api.feign.IClinicEvalClient;
+import com.nb.web.api.feign.IHospitalClient;
+import com.nb.web.service.bus.entity.BusPatientEntity;
 import com.nb.web.service.bus.mapper.BusEvaluationMapper;
 import com.nb.web.api.feign.query.EvalQuery;
+import com.nb.web.service.bus.mapper.BusPatientMapper;
 import com.nb.web.service.bus.utils.AdverseReactionUtil;
 import com.nb.web.service.bus.utils.WsPublishUtils;
 import com.nb.common.crud.BaseService;
@@ -46,6 +51,14 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
     @Lazy
     private WsPublishUtils wsPublishUtils;
 
+    @Autowired
+    private BusPatientMapper patientMapper;
+
+    @Autowired
+    private MqttClientUtil mqttClientUtil;
+
+    @Autowired
+    private IHospitalClient hospitalClient;
 
     public IPage<BusEvaluationEntity> pageQuery(EvalQuery query) {
         return this.baseMapper.pageQuery(query.getPage(),query);
@@ -120,14 +133,36 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
 
     @Override
     public void validateBeforeSave(BusEvaluationEntity entity) {
-        if(StrUtil.isEmpty(entity.getPatientId())){
-            String clinicId = entity.getClinicId();
-            BusClinicEntity clinic = clinicService.getById(clinicId);
+        BusClinicEntity clinic=null;
+        String clinicId = entity.getClinicId();
+        if (StrUtil.isEmpty(entity.getPatientCode())) {
+            clinic = clinicService.getById(clinicId);
             if(clinic==null){
                 throw new CustomException("临床id不能为空");
             }
+            entity.setPatientCode(clinic.getPatientCode());
+        }
+        if(StrUtil.isEmpty(entity.getPatientId())){
+            if (ObjectUtil.isNull(clinic)) {
+                clinic = clinicService.getById(clinicId);
+                if(clinic==null){
+                    throw new CustomException("临床id不能为空");
+                }
+            }
             entity.setPatientId(clinic.getPatientId());
         }
+        if(StrUtil.isEmpty(entity.getInfusionId())&&StrUtil.isNotEmpty(entity.getPatientId())){
+            BusPatientEntity patient = patientMapper.selectById(entity.getPatientId());
+            if(patient==null){
+                return;
+            }
+            entity.setInfusionId(patient.getInfusionId());
+        }
+        String hospitalCode = hospitalClient.selectHospitalCode(SecurityUtil.getTenantId());
+        if (StrUtil.isNotEmpty(hospitalCode)) {
+            //发布消息
+            mqttClientUtil.asyncPublish(hospitalCode, MqttTopicEnum.BUS_EVALUATION,entity);
+        }
     }
 
     @Override
@@ -138,15 +173,15 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
     @Override
     public void postSave(BusEvaluationEntity entity) {
         if(StrUtil.isNotEmpty(entity.getClinicId())){
-        String adverseReactions = AdverseReactionUtil.getAdverseReactions(entity);
-        String clinicId = entity.getClinicId();
-        BusClinicEntity clinic = clinicService.getById(clinicId);
-        clinic.setLastBadEval(adverseReactions);
-        clinic.setEvalTime(Optional.ofNullable(entity.getEvaluateTime()).orElse(new Date()));
-        clinicService.updateById(clinic);
-        if(CharSequenceUtil.isNotBlank(entity.getPatientId())){
-            wsPublishUtils.publishPatientMonitor(entity.getPatientId(),entity.getTenantId());
-        }
+            String adverseReactions = AdverseReactionUtil.getAdverseReactions(entity);
+            String clinicId = entity.getClinicId();
+            BusClinicEntity clinic = clinicService.getById(clinicId);
+            clinic.setLastBadEval(adverseReactions);
+            clinic.setEvalTime(Optional.ofNullable(entity.getEvaluateTime()).orElse(new Date()));
+            clinicService.updateById(clinic);
+            if(CharSequenceUtil.isNotBlank(entity.getPatientId())){
+                wsPublishUtils.publishPatientMonitor(entity.getPatientId(),entity.getTenantId());
+            }
         }
     }
 

+ 6 - 1
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusHospitalService.java

@@ -250,7 +250,7 @@ public class LocalBusHospitalService extends BaseService<BusHospitalMapper, BusH
     public void run(String... args) {
         saveDefaultHospital();
         List<BusHospitalEntity> list = list();
-        list.forEach(entity -> {
+        list.parallelStream().forEach(entity -> {
             entity.setScriptOnline(false);
             nameCache.setConfig(entity.getId(),entity.getName());
             this.baseMapper.updateById(entity);
@@ -340,4 +340,9 @@ public class LocalBusHospitalService extends BaseService<BusHospitalMapper, BusH
     public List<AppHospitalVO> selectUserHospitalList(String username) {
         return hospitalMapper.selectUserHospitalListByUsername(username);
     }
+
+    @Override
+    public String selectHospitalCode(String id) {
+        return getName(id);
+    }
 }

+ 14 - 0
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusInfusionHistoryService.java

@@ -9,9 +9,12 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.nb.core.enums.MqttTopicEnum;
+import com.nb.core.utils.MqttClientUtil;
 import com.nb.web.api.bean.UndoDeviceConfig;
 import com.nb.web.api.entity.BusClinicEntity;
 import com.nb.web.api.entity.BusInfusionHistoryEntity;
+import com.nb.web.api.feign.IHospitalClient;
 import com.nb.web.service.bus.entity.*;
 import com.nb.web.api.enums.PatientAlarmEnum;
 import com.nb.web.service.bus.mapper.BusInfusionHistoryMapper;
@@ -66,6 +69,12 @@ public class LocalBusInfusionHistoryService extends BaseService<BusInfusionHisto
     @Autowired
     @Lazy
     private WsPublishUtils wsPublishUtils;
+
+    @Autowired
+    private MqttClientUtil mqttClientUtil;
+
+    @Autowired
+    private IHospitalClient hospitalClient;
     @Override
     public void validateBeforeSave(BusInfusionHistoryEntity entity) {
 
@@ -194,6 +203,7 @@ public class LocalBusInfusionHistoryService extends BaseService<BusInfusionHisto
         }
         log.info("结束临床信息:{}", JSONUtil.toJsonStr(manualUndoConfig));
         BusPatientEntity patient = patientService.getById(manualUndoConfig.getPatientId());
+        manualUndoConfig.setPatientCode(patient.getCode());
         if(!finishClinic){
             if(Boolean.TRUE.equals(manualUndoConfig.getMonitorType())){
                 if(patient==null){
@@ -245,6 +255,10 @@ public class LocalBusInfusionHistoryService extends BaseService<BusInfusionHisto
                     .set(BusPatientEntity::getAlarm,PatientAlarmEnum.NONE));
             clinicService.finish(manualUndoConfig.getClinicId(),manualUndoConfig.getTenantId());
         }
+        mqttClientUtil.asyncPublish(
+                hospitalClient.selectHospitalCode(manualUndoConfig.getTenantId()),
+                MqttTopicEnum.BUS_UNDO,
+                MqttUndoDTO.of(manualUndoConfig,finishClinic));
     }
 
     /**

+ 4 - 1
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/dto/ManualUndoConfig.java

@@ -51,7 +51,10 @@ public class ManualUndoConfig implements Serializable {
     @NotNull(message = "撤泵配置不能为空")
     private UndoDeviceConfig undo;
 
+
+    @ApiModelProperty("住院号")
+    private String patientCode;
     public static ManualUndoConfig of(List<String> infusionIds, String patientId,String clinicId,String tenantId,Boolean monitorType,UndoDeviceConfig undo){
-        return new ManualUndoConfig(null,infusionIds,patientId,clinicId,tenantId,monitorType,undo);
+        return new ManualUndoConfig(null,infusionIds,patientId,clinicId,tenantId,monitorType,undo,null);
     }
 }

+ 16 - 0
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/dto/MqttUndoDTO.java

@@ -0,0 +1,16 @@
+package com.nb.web.service.bus.service.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+public class MqttUndoDTO implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private ManualUndoConfig manualUndoConfig;
+    private Boolean finishClinic;
+}