Pārlūkot izejas kodu

add
mqtt信息发送

lifang 1 mēnesi atpakaļ
vecāks
revīzija
32e1f95f5f

+ 7 - 1
nb-core/src/main/java/com/nb/core/entity/MqttMessage.java

@@ -1,7 +1,12 @@
 package com.nb.core.entity;
 
+import com.nb.core.handler.MqttMessageHandlerManager;
 import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 
+import java.io.Serializable;
 import java.util.Date;
 import java.util.Map;
 
@@ -13,7 +18,8 @@ import java.util.Map;
  * @version 1.0.0
  */
 @Data
-public class MqttMessage {
+public class MqttMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
     /**
      * 主题
      */

+ 49 - 79
nb-core/src/main/java/com/nb/core/utils/MqttClientUtil.java

@@ -1,8 +1,8 @@
 package com.nb.core.utils;
 
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONUtil;
 import com.nb.core.entity.MqttMessage;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
 import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
@@ -14,6 +14,7 @@ import org.springframework.messaging.MessagingException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Component;
+import org.springframework.scheduling.annotation.Async;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -30,22 +31,23 @@ import java.util.function.Consumer;
  */
 @Component
 public class MqttClientUtil implements ApplicationContextAware {
-
+    public static final String CLIENT_ID = "nb-netpump"+"-"+System.currentTimeMillis();
     private ApplicationContext applicationContext;
 
     private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
     
     private MqttPahoClientFactory mqttClientFactory;
-    
-    private String defaultClientId = "nb-netpump";
-    
+
     private MessageChannel mqttInputChannel;
-    
+
+    public Boolean isOnline(){
+        return ObjectUtil.isNotNull(mqttClientFactory);
+    }
+
     @PostConstruct
     public void init() {
         // 创建默认的消息通道
         mqttInputChannel = new DirectChannel();
-        
         // 尝试从Spring容器中获取MqttPahoClientFactory
         try {
             mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
@@ -81,103 +83,60 @@ public class MqttClientUtil implements ApplicationContextAware {
      * @param topic 主题
      * @param payload 消息内容
      */
-    public void publish(String topic, String payload) {
-        publish(topic, payload, 0, false);
-    }
-    
-    /**
-     * 发布消息
-     * @param topic 主题
-     * @param payload 消息内容
-     * @param qos 服务质量等级 (0, 1, 2)
-     * @param retained 是否保留消息
-     */
-    public void publish(String topic, String payload, int qos, boolean retained) {
-        try {
-            // 创建消息实体
-            MqttMessage message = new MqttMessage(topic, payload);
-            message.setClientId(defaultClientId + "-pub-" + System.currentTimeMillis());
-            // 确保客户端工厂已初始化
-            if (mqttClientFactory == null) {
-                return;
-            }
-            
-            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(defaultClientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
-            messageHandler.setDefaultTopic(topic);
-            messageHandler.setDefaultQos(qos);
-            messageHandler.setDefaultRetained(retained);
-            // 确保处理器已初始化
-            messageHandler.afterPropertiesSet();
-            
-            org.springframework.messaging.Message<String> springMessage = 
-                org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
-            messageHandler.handleMessage(springMessage);
-        } catch (Exception e) {
-            throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
-        }
+    public void publish(String hospitalCode,String topic, Object payload) {
+        publish(hospitalCode,topic, payload, 0, false);
     }
     
     /**
      * 发布消息到指定客户端
-     * @param clientId 客户端ID
      * @param topic 主题
      * @param payload 消息内容
      * @param qos 服务质量等级
      * @param retained 是否保留消息
      */
-    public void publish(String clientId, String topic, String payload, int qos, boolean retained) {
+    public void publish(String hospitalCode, String topic, Object payload, int qos, boolean retained) {
         try {
             // 创建消息实体
-            MqttMessage message = new MqttMessage(topic, payload);
-            message.setClientId(clientId + "-pub-" + System.currentTimeMillis());
+            MqttMessage message = new MqttMessage(topic, JSONUtil.toJsonStr( payload));
+            message.setClientId(CLIENT_ID);
             
             if (mqttClientFactory == null) {
                 return;
             }
             
-            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
-            messageHandler.setDefaultTopic(topic);
+            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(CLIENT_ID, mqttClientFactory);
+            messageHandler.setDefaultTopic("hospitalInfo/"+hospitalCode);
             messageHandler.setDefaultQos(qos);
             messageHandler.setDefaultRetained(retained);
             // 确保处理器已初始化
             messageHandler.afterPropertiesSet();
             
-            org.springframework.messaging.Message<String> springMessage = 
-                org.springframework.messaging.support.MessageBuilder.withPayload(payload).build();
+            org.springframework.messaging.Message<String> springMessage =
+                org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
             messageHandler.handleMessage(springMessage);
         } catch (Exception e) {
             throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
         }
     }
+
     
     /**
      * 订阅主题
      * @param topic 主题
      * @param messageHandler 消息处理器
      */
-    public void subscribe(String topic, MessageHandler messageHandler) {
-        subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageHandler);
-    }
-    
-    /**
-     * 订阅主题
-     * @param clientId 客户端ID
-     * @param topic 主题
-     * @param messageHandler 消息处理器
-     */
-    public void subscribe(String clientId, String topic, MessageHandler messageHandler) {
+    public void subscribe( String topic, MessageHandler messageHandler) {
         try {
             if (mqttClientFactory == null) {
                 return;
             }
             // 如果已经订阅了该主题,则先取消订阅
-            String key = clientId + ":" + topic;
-            if (subscribers.containsKey(key)) {
-                unsubscribe(clientId, topic);
+            if (subscribers.containsKey(topic)) {
+                unsubscribe( topic);
             }
             
             MqttPahoMessageDrivenChannelAdapter adapter = 
-                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory, topic);
+                new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID, mqttClientFactory, topic);
             adapter.setConverter(new DefaultPahoMessageConverter());
             adapter.setOutputChannel(mqttInputChannel);
             
@@ -188,7 +147,7 @@ public class MqttClientUtil implements ApplicationContextAware {
             
             adapter.start();
             
-            subscribers.put(key, adapter);
+            subscribers.put(topic, adapter);
         } catch (Exception e) {
             throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
         }
@@ -200,16 +159,6 @@ public class MqttClientUtil implements ApplicationContextAware {
      * @param messageConsumer 消息消费者
      */
     public void subscribe(String topic, Consumer<String> messageConsumer) {
-        subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageConsumer);
-    }
-    
-    /**
-     * 订阅主题(使用函数式接口处理消息)
-     * @param clientId 客户端ID
-     * @param topic 主题
-     * @param messageConsumer 消息消费者
-     */
-    public void subscribe(String clientId, String topic, Consumer<String> messageConsumer) {
         MessageHandler handler = new MessageHandler() {
             @Override
             public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
@@ -217,20 +166,41 @@ public class MqttClientUtil implements ApplicationContextAware {
                 messageConsumer.accept(payload);
             }
         };
-        subscribe(clientId, topic, handler);
+        subscribe(topic, handler);
     }
     
     /**
      * 取消订阅
-     * @param clientId 客户端ID
      * @param topic 主题
      */
-    public void unsubscribe(String clientId, String topic) {
-        String key = clientId + ":" + topic;
+    public void unsubscribe( String topic) {
+        String key = topic;
         MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(key);
         if (adapter != null) {
             adapter.stop();
             subscribers.remove(key);
         }
     }
+
+    /**
+     * 异步发布消息
+     * @param topic 主题
+     * @param payload 消息内容
+     */
+    @Async
+    public void asyncPublish(String hospitalCode,String topic, Object payload) {
+        publish(hospitalCode,topic, payload);
+    }
+
+    /**
+     * 异步发布消息到指定客户端
+     * @param topic 主题
+     * @param payload 消息内容
+     * @param qos 服务质量等级
+     * @param retained 是否保留消息
+     */
+    @Async
+    public void asyncPublish(String hospitalCode, String topic, String payload, int qos, boolean retained) {
+        publish(hospitalCode, topic, payload, qos, retained);
+    }
 }

+ 5 - 3
nb-core/src/test/java/com/nb/core/utils/MqttClientUtilTest.java

@@ -1,5 +1,6 @@
 package com.nb.core.utils;
 
+import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.util.ReflectUtil;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.junit.jupiter.api.Test;
@@ -28,7 +29,9 @@ public class MqttClientUtilTest {
 
         // 测试发布消息
         try {
-            mqttClientUtil.publish("testtopic/123", "Hello MQTT");
+            mqttClientUtil.publish("1000","testtopic/123", MapUtil.builder()
+                            .put("test","123")
+                    .build());
         } catch (Exception e) {
             e.printStackTrace();
             // 在测试环境中,由于没有实际的MQTT服务器,会抛出异常,这是预期的
@@ -42,13 +45,12 @@ public class MqttClientUtilTest {
         
         // 测试发布消息(带参数)
         try {
-            mqttClientUtil.publish("test/topic", "Hello MQTT", 1, false);
+            mqttClientUtil.publish("1000","test/topic", "Hello MQTT", 1, false);
         } catch (Exception e) {
             // 在测试环境中,由于没有实际的MQTT服务器,会抛出异常,这是预期的
         }
     }
 
-    private String defaultClientId = "nb-netpump";
 
     private String defaultBrokerUrl = "tcp://iot.tuoren.com:1883";
 

+ 2 - 0
nb-service-api/web-service-api/src/main/java/com/nb/web/api/feign/IHospitalClient.java

@@ -21,4 +21,6 @@ public interface IHospitalClient {
     List<HospitalResult> selectAll();
 
     List<AppHospitalVO> selectUserHospitalList(String username);
+
+    String selectHospitalCode(String id);
 }

+ 49 - 14
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusEvaluationService.java

@@ -2,18 +2,23 @@ package com.nb.web.service.bus.service;
 
 import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.text.CharSequenceUtil;
+import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.nb.auth.utils.SecurityUtil;
+import com.nb.core.enums.MqttTopicEnum;
+import com.nb.core.utils.MqttClientUtil;
 import com.nb.web.api.dto.EvalBatchDTO;
 import com.nb.web.api.entity.BusClinicEntity;
 import com.nb.web.api.entity.BusEvaluationEntity;
 import com.nb.web.api.feign.IClinicEvalClient;
+import com.nb.web.api.feign.IHospitalClient;
+import com.nb.web.service.bus.entity.BusPatientEntity;
 import com.nb.web.service.bus.mapper.BusEvaluationMapper;
 import com.nb.web.api.feign.query.EvalQuery;
+import com.nb.web.service.bus.mapper.BusPatientMapper;
 import com.nb.web.service.bus.utils.AdverseReactionUtil;
 import com.nb.web.service.bus.utils.WsPublishUtils;
 import com.nb.common.crud.BaseService;
@@ -46,6 +51,14 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
     @Lazy
     private WsPublishUtils wsPublishUtils;
 
+    @Autowired
+    private BusPatientMapper patientMapper;
+
+    @Autowired
+    private MqttClientUtil mqttClientUtil;
+
+    @Autowired
+    private IHospitalClient hospitalClient;
 
     public IPage<BusEvaluationEntity> pageQuery(EvalQuery query) {
         return this.baseMapper.pageQuery(query.getPage(),query);
@@ -120,14 +133,36 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
 
     @Override
     public void validateBeforeSave(BusEvaluationEntity entity) {
-        if(StrUtil.isEmpty(entity.getPatientId())){
-            String clinicId = entity.getClinicId();
-            BusClinicEntity clinic = clinicService.getById(clinicId);
+        BusClinicEntity clinic=null;
+        String clinicId = entity.getClinicId();
+        if (StrUtil.isEmpty(entity.getPatientCode())) {
+            clinic = clinicService.getById(clinicId);
             if(clinic==null){
                 throw new CustomException("临床id不能为空");
             }
+            entity.setPatientCode(clinic.getPatientCode());
+        }
+        if(StrUtil.isEmpty(entity.getPatientId())){
+            if (ObjectUtil.isNull(clinic)) {
+                clinic = clinicService.getById(clinicId);
+                if(clinic==null){
+                    throw new CustomException("临床id不能为空");
+                }
+            }
             entity.setPatientId(clinic.getPatientId());
         }
+        if(StrUtil.isEmpty(entity.getInfusionId())&&StrUtil.isNotEmpty(entity.getPatientId())){
+            BusPatientEntity patient = patientMapper.selectById(entity.getPatientId());
+            if(patient==null){
+                return;
+            }
+            entity.setInfusionId(patient.getInfusionId());
+        }
+        String hospitalCode = hospitalClient.selectHospitalCode(SecurityUtil.getTenantId());
+        if (StrUtil.isNotEmpty(hospitalCode)) {
+            //发布消息
+            mqttClientUtil.asyncPublish(hospitalCode, MqttTopicEnum.BUS_EVALUATION,entity);
+        }
     }
 
     @Override
@@ -138,15 +173,15 @@ public class LocalBusEvaluationService extends BaseService<BusEvaluationMapper,
     @Override
     public void postSave(BusEvaluationEntity entity) {
         if(StrUtil.isNotEmpty(entity.getClinicId())){
-        String adverseReactions = AdverseReactionUtil.getAdverseReactions(entity);
-        String clinicId = entity.getClinicId();
-        BusClinicEntity clinic = clinicService.getById(clinicId);
-        clinic.setLastBadEval(adverseReactions);
-        clinic.setEvalTime(Optional.ofNullable(entity.getEvaluateTime()).orElse(new Date()));
-        clinicService.updateById(clinic);
-        if(CharSequenceUtil.isNotBlank(entity.getPatientId())){
-            wsPublishUtils.publishPatientMonitor(entity.getPatientId(),entity.getTenantId());
-        }
+            String adverseReactions = AdverseReactionUtil.getAdverseReactions(entity);
+            String clinicId = entity.getClinicId();
+            BusClinicEntity clinic = clinicService.getById(clinicId);
+            clinic.setLastBadEval(adverseReactions);
+            clinic.setEvalTime(Optional.ofNullable(entity.getEvaluateTime()).orElse(new Date()));
+            clinicService.updateById(clinic);
+            if(CharSequenceUtil.isNotBlank(entity.getPatientId())){
+                wsPublishUtils.publishPatientMonitor(entity.getPatientId(),entity.getTenantId());
+            }
         }
     }
 

+ 6 - 1
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusHospitalService.java

@@ -250,7 +250,7 @@ public class LocalBusHospitalService extends BaseService<BusHospitalMapper, BusH
     public void run(String... args) {
         saveDefaultHospital();
         List<BusHospitalEntity> list = list();
-        list.forEach(entity -> {
+        list.parallelStream().forEach(entity -> {
             entity.setScriptOnline(false);
             nameCache.setConfig(entity.getId(),entity.getName());
             this.baseMapper.updateById(entity);
@@ -340,4 +340,9 @@ public class LocalBusHospitalService extends BaseService<BusHospitalMapper, BusH
     public List<AppHospitalVO> selectUserHospitalList(String username) {
         return hospitalMapper.selectUserHospitalListByUsername(username);
     }
+
+    @Override
+    public String selectHospitalCode(String id) {
+        return getName(id);
+    }
 }

+ 4 - 8
nb-service/web-service/src/main/java/com/nb/web/service/bus/service/LocalBusInfusionHistoryService.java

@@ -255,14 +255,10 @@ public class LocalBusInfusionHistoryService extends BaseService<BusInfusionHisto
                     .set(BusPatientEntity::getAlarm,PatientAlarmEnum.NONE));
             clinicService.finish(manualUndoConfig.getClinicId(),manualUndoConfig.getTenantId());
         }
-        try {
-            mqttClientUtil.asyncPublish(
-                    hospitalClient.selectHospitalCode(manualUndoConfig.getTenantId()),
-                    MqttTopicEnum.BUS_UNDO,
-                    MqttUndoDTO.of(manualUndoConfig,finishClinic));
-        }catch (Exception e){
-
-        }
+        mqttClientUtil.asyncPublish(
+                hospitalClient.selectHospitalCode(manualUndoConfig.getTenantId()),
+                MqttTopicEnum.BUS_UNDO,
+                MqttUndoDTO.of(manualUndoConfig,finishClinic));
     }
 
     /**