|
|
@@ -1,8 +1,9 @@
|
|
|
package com.nb.core.utils;
|
|
|
|
|
|
-import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import com.nb.core.entity.MqttMessage;
|
|
|
+import com.nb.core.service.MqttMessageForwardService;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.integration.channel.DirectChannel;
|
|
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
|
|
@@ -14,7 +15,6 @@ import org.springframework.messaging.MessagingException;
|
|
|
import org.springframework.context.ApplicationContext;
|
|
|
import org.springframework.context.ApplicationContextAware;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.PreDestroy;
|
|
|
@@ -31,29 +31,30 @@ import java.util.function.Consumer;
|
|
|
*/
|
|
|
@Component
|
|
|
public class MqttClientUtil implements ApplicationContextAware {
|
|
|
- public static final String CLIENT_ID = "nb-netpump"+"-"+System.currentTimeMillis();
|
|
|
- private ApplicationContext applicationContext;
|
|
|
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ private MqttMessageForwardService mqttMessageForwardService;
|
|
|
+
|
|
|
private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
|
|
|
|
|
|
private MqttPahoClientFactory mqttClientFactory;
|
|
|
-
|
|
|
+
|
|
|
+ private String defaultClientId = "nb-netpump";
|
|
|
+
|
|
|
private MessageChannel mqttInputChannel;
|
|
|
-
|
|
|
- public Boolean isOnline(){
|
|
|
- return ObjectUtil.isNotNull(mqttClientFactory);
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
// 创建默认的消息通道
|
|
|
mqttInputChannel = new DirectChannel();
|
|
|
+
|
|
|
// 尝试从Spring容器中获取MqttPahoClientFactory
|
|
|
try {
|
|
|
mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
|
|
|
} catch (Exception e) {
|
|
|
- mqttClientFactory=null;
|
|
|
- // 忽略异常,如果获取不到工厂则无法进行MQTT操作
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -84,42 +85,79 @@ public class MqttClientUtil implements ApplicationContextAware {
|
|
|
* @param topic 主题
|
|
|
* @param payload 消息内容
|
|
|
*/
|
|
|
- public void publish(String hospitalCode,String topic, Object payload) {
|
|
|
- publish(hospitalCode,topic, payload, 0, false);
|
|
|
+ public void publish(String topic, String payload) {
|
|
|
+ publish(topic, payload, 0, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发布消息到指定客户端
|
|
|
+ * 发布消息
|
|
|
* @param topic 主题
|
|
|
* @param payload 消息内容
|
|
|
- * @param qos 服务质量等级
|
|
|
+ * @param qos 服务质量等级 (0, 1, 2)
|
|
|
* @param retained 是否保留消息
|
|
|
*/
|
|
|
- public void publish(String hospitalCode, String topic, Object payload, int qos, boolean retained) {
|
|
|
+ public void publish(String topic, String payload, int qos, boolean retained) {
|
|
|
try {
|
|
|
+ // 创建消息实体
|
|
|
+ MqttMessage message = new MqttMessage(topic, payload);
|
|
|
+ message.setClientId(defaultClientId + "-pub-" + System.currentTimeMillis());
|
|
|
+ // 确保客户端工厂已初始化
|
|
|
if (mqttClientFactory == null) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(defaultClientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
|
|
|
+ messageHandler.setDefaultTopic(topic);
|
|
|
+ messageHandler.setDefaultQos(qos);
|
|
|
+ messageHandler.setDefaultRetained(retained);
|
|
|
+ // 确保处理器已初始化
|
|
|
+ messageHandler.afterPropertiesSet();
|
|
|
+
|
|
|
+ org.springframework.messaging.Message<String> springMessage =
|
|
|
+ org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
|
|
|
+ messageHandler.handleMessage(springMessage);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发布消息到指定客户端
|
|
|
+ * @param clientId 客户端ID
|
|
|
+ * @param topic 主题
|
|
|
+ * @param payload 消息内容
|
|
|
+ * @param qos 服务质量等级
|
|
|
+ * @param retained 是否保留消息
|
|
|
+ */
|
|
|
+ public void publish(String clientId, String topic, String payload, int qos, boolean retained) {
|
|
|
+ try {
|
|
|
// 创建消息实体
|
|
|
- MqttMessage message = new MqttMessage(topic, JSONUtil.toJsonStr(payload));
|
|
|
- message.setClientId(CLIENT_ID);
|
|
|
+ MqttMessage message = new MqttMessage(topic, payload);
|
|
|
+ message.setClientId(clientId + "-pub-" + System.currentTimeMillis());
|
|
|
+
|
|
|
+ // 如果配置了转发服务,则转发消息
|
|
|
+ if (mqttMessageForwardService != null) {
|
|
|
+ // 这里可以配置转发到其他服务的URL
|
|
|
+ // mqttMessageForwardService.forwardMessage(message, "http://other-service/api/mqtt/receive");
|
|
|
+ }
|
|
|
+ if (mqttClientFactory == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(CLIENT_ID, mqttClientFactory);
|
|
|
- messageHandler.setDefaultTopic("hospitalInfo/"+hospitalCode);
|
|
|
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
|
|
|
+ messageHandler.setDefaultTopic(topic);
|
|
|
messageHandler.setDefaultQos(qos);
|
|
|
messageHandler.setDefaultRetained(retained);
|
|
|
// 确保处理器已初始化
|
|
|
messageHandler.afterPropertiesSet();
|
|
|
|
|
|
- org.springframework.messaging.Message<String> springMessage =
|
|
|
- org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
|
|
|
+ org.springframework.messaging.Message<String> springMessage =
|
|
|
+ org.springframework.messaging.support.MessageBuilder.withPayload(payload).build();
|
|
|
messageHandler.handleMessage(springMessage);
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
|
|
|
/**
|
|
|
* 订阅主题
|
|
|
@@ -127,17 +165,28 @@ public class MqttClientUtil implements ApplicationContextAware {
|
|
|
* @param messageHandler 消息处理器
|
|
|
*/
|
|
|
public void subscribe(String topic, MessageHandler messageHandler) {
|
|
|
+ subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageHandler);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅主题
|
|
|
+ * @param clientId 客户端ID
|
|
|
+ * @param topic 主题
|
|
|
+ * @param messageHandler 消息处理器
|
|
|
+ */
|
|
|
+ public void subscribe(String clientId, String topic, MessageHandler messageHandler) {
|
|
|
try {
|
|
|
if (mqttClientFactory == null) {
|
|
|
return;
|
|
|
}
|
|
|
// 如果已经订阅了该主题,则先取消订阅
|
|
|
- if (subscribers.containsKey(topic)) {
|
|
|
- unsubscribe(topic);
|
|
|
+ String key = clientId + ":" + topic;
|
|
|
+ if (subscribers.containsKey(key)) {
|
|
|
+ unsubscribe(clientId, topic);
|
|
|
}
|
|
|
|
|
|
MqttPahoMessageDrivenChannelAdapter adapter =
|
|
|
- new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID, mqttClientFactory, topic);
|
|
|
+ new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory, topic);
|
|
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
adapter.setOutputChannel(mqttInputChannel);
|
|
|
|
|
|
@@ -148,7 +197,7 @@ public class MqttClientUtil implements ApplicationContextAware {
|
|
|
|
|
|
adapter.start();
|
|
|
|
|
|
- subscribers.put(topic, adapter);
|
|
|
+ subscribers.put(key, adapter);
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
|
|
|
}
|
|
|
@@ -160,6 +209,16 @@ public class MqttClientUtil implements ApplicationContextAware {
|
|
|
* @param messageConsumer 消息消费者
|
|
|
*/
|
|
|
public void subscribe(String topic, Consumer<String> messageConsumer) {
|
|
|
+ subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageConsumer);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅主题(使用函数式接口处理消息)
|
|
|
+ * @param clientId 客户端ID
|
|
|
+ * @param topic 主题
|
|
|
+ * @param messageConsumer 消息消费者
|
|
|
+ */
|
|
|
+ public void subscribe(String clientId, String topic, Consumer<String> messageConsumer) {
|
|
|
MessageHandler handler = new MessageHandler() {
|
|
|
@Override
|
|
|
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
|
|
|
@@ -167,44 +226,20 @@ public class MqttClientUtil implements ApplicationContextAware {
|
|
|
messageConsumer.accept(payload);
|
|
|
}
|
|
|
};
|
|
|
- subscribe(topic, handler);
|
|
|
+ subscribe(clientId, topic, handler);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 取消订阅
|
|
|
+ * @param clientId 客户端ID
|
|
|
* @param topic 主题
|
|
|
*/
|
|
|
- public void unsubscribe(String topic) {
|
|
|
- MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(topic);
|
|
|
+ public void unsubscribe(String clientId, String topic) {
|
|
|
+ String key = clientId + ":" + topic;
|
|
|
+ MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(key);
|
|
|
if (adapter != null) {
|
|
|
- try {
|
|
|
- adapter.stop();
|
|
|
- } catch (Exception e) {
|
|
|
- // 忽略停止异常
|
|
|
- }
|
|
|
- subscribers.remove(topic);
|
|
|
+ adapter.stop();
|
|
|
+ subscribers.remove(key);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 异步发布消息
|
|
|
- * @param topic 主题
|
|
|
- * @param payload 消息内容
|
|
|
- */
|
|
|
- @Async
|
|
|
- public void asyncPublish(String hospitalCode,String topic, Object payload) {
|
|
|
- publish(hospitalCode,topic, payload);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 异步发布消息到指定客户端
|
|
|
- * @param topic 主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @param qos 服务质量等级
|
|
|
- * @param retained 是否保留消息
|
|
|
- */
|
|
|
- @Async
|
|
|
- public void asyncPublish(String hospitalCode, String topic, String payload, int qos, boolean retained) {
|
|
|
- publish(hospitalCode, topic, payload, qos, retained);
|
|
|
- }
|
|
|
}
|