|
|
@@ -1,333 +1,245 @@
|
|
|
package com.nb.core.utils;
|
|
|
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.eclipse.paho.client.mqttv3.*;
|
|
|
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
-
|
|
|
-import java.nio.charset.StandardCharsets;
|
|
|
-import java.util.UUID;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.nb.core.entity.MqttMessage;
|
|
|
+import com.nb.core.service.MqttMessageForwardService;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.integration.channel.DirectChannel;
|
|
|
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
|
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
|
|
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
|
+import org.springframework.messaging.MessageChannel;
|
|
|
+import org.springframework.messaging.MessageHandler;
|
|
|
+import org.springframework.messaging.MessagingException;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.context.ApplicationContextAware;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.function.Consumer;
|
|
|
|
|
|
/**
|
|
|
* MQTT客户端工具类
|
|
|
- * 提供MQTT连接、消息发布、订阅等通用功能
|
|
|
+ * 提供MQTT连接、消息发布、订阅等功能
|
|
|
*
|
|
|
- * @author lingma
|
|
|
+ * @author YourName
|
|
|
* @version 1.0.0
|
|
|
- * @since 1.0.0
|
|
|
*/
|
|
|
-@Slf4j
|
|
|
-public class MqttClientUtil {
|
|
|
-
|
|
|
- private MqttClient client;
|
|
|
- private MqttConnectOptions options;
|
|
|
- private String serverURI;
|
|
|
- private String clientId;
|
|
|
- private String username;
|
|
|
- private String password;
|
|
|
-
|
|
|
- /**
|
|
|
- * 构造函数
|
|
|
- *
|
|
|
- * @param serverURI MQTT服务器地址,例如:tcp://localhost:1883
|
|
|
- * @param clientId 客户端ID,需要唯一
|
|
|
- */
|
|
|
- public MqttClientUtil(String serverURI, String clientId) {
|
|
|
- this.serverURI = serverURI;
|
|
|
- this.clientId = clientId;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 构造函数
|
|
|
- *
|
|
|
- * @param serverURI MQTT服务器地址,例如:tcp://localhost:1883
|
|
|
- * @param clientId 客户端ID,需要唯一
|
|
|
- * @param username 用户名
|
|
|
- * @param password 密码
|
|
|
- */
|
|
|
- public MqttClientUtil(String serverURI, String clientId, String username, String password) {
|
|
|
- this(serverURI, clientId);
|
|
|
- this.username = username;
|
|
|
- this.password = password;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 初始化MQTT客户端
|
|
|
- *
|
|
|
- * @throws MqttException MQTT异常
|
|
|
- */
|
|
|
- public void init() throws MqttException {
|
|
|
- if (client == null) {
|
|
|
- client = new MqttClient(serverURI, clientId, new MemoryPersistence());
|
|
|
- }
|
|
|
-
|
|
|
- options = new MqttConnectOptions();
|
|
|
- options.setCleanSession(true);
|
|
|
- options.setConnectionTimeout(30);
|
|
|
- options.setKeepAliveInterval(60);
|
|
|
-
|
|
|
- if (username != null && password != null) {
|
|
|
- options.setUserName(username);
|
|
|
- options.setPassword(password.toCharArray());
|
|
|
- }
|
|
|
-
|
|
|
- client.setCallback(new MqttCallbackExtended() {
|
|
|
- @Override
|
|
|
- public void connectComplete(boolean reconnect, String serverURI) {
|
|
|
- log.info("MQTT客户端连接完成 - serverURI: {}, clientId: {}, reconnect: {}", serverURI, clientId, reconnect);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void connectionLost(Throwable cause) {
|
|
|
- log.error("MQTT客户端连接丢失 - clientId: {}", clientId, cause);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
- log.info("MQTT客户端收到消息 - topic: {}, message: {}", topic, new String(message.getPayload(), StandardCharsets.UTF_8));
|
|
|
- }
|
|
|
+@Component
|
|
|
+public class MqttClientUtil implements ApplicationContextAware {
|
|
|
+
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ private MqttMessageForwardService mqttMessageForwardService;
|
|
|
+
|
|
|
+ private final Map<String, MqttPahoMessageDrivenChannelAdapter> subscribers = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private MqttPahoClientFactory mqttClientFactory;
|
|
|
+
|
|
|
+ private String defaultClientId = "nb-netpump";
|
|
|
+
|
|
|
+ private MessageChannel mqttInputChannel;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ // 创建默认的消息通道
|
|
|
+ mqttInputChannel = new DirectChannel();
|
|
|
+
|
|
|
+ // 尝试从Spring容器中获取MqttPahoClientFactory
|
|
|
+ try {
|
|
|
+ mqttClientFactory = applicationContext.getBean(MqttPahoClientFactory.class);
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
|
- @Override
|
|
|
- public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
- log.debug("MQTT消息发送完成 - token: {}", token);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 连接到MQTT服务器
|
|
|
- *
|
|
|
- * @throws MqttException MQTT异常
|
|
|
- */
|
|
|
- public void connect() throws MqttException {
|
|
|
- if (client == null) {
|
|
|
- init();
|
|
|
- }
|
|
|
- if (!client.isConnected()) {
|
|
|
- client.connect(options);
|
|
|
- log.info("MQTT客户端连接成功 - serverURI: {}, clientId: {}", serverURI, clientId);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 异步连接到MQTT服务器
|
|
|
- *
|
|
|
- * @return CompletableFuture<Void>
|
|
|
- */
|
|
|
- public CompletableFuture<Void> connectAsync() {
|
|
|
- return CompletableFuture.runAsync(() -> {
|
|
|
+ @PreDestroy
|
|
|
+ public void destroy() {
|
|
|
+ // 清理所有订阅者
|
|
|
+ subscribers.values().forEach(adapter -> {
|
|
|
try {
|
|
|
- connect();
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("MQTT客户端连接失败 - serverURI: {}, clientId: {}", serverURI, clientId, e);
|
|
|
- throw new RuntimeException(e);
|
|
|
+ adapter.stop();
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 忽略停止异常
|
|
|
}
|
|
|
});
|
|
|
+ subscribers.clear();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 断开MQTT连接
|
|
|
- *
|
|
|
- * @throws MqttException MQTT异常
|
|
|
+ * 设置应用上下文
|
|
|
+ * @param applicationContext 应用上下文
|
|
|
*/
|
|
|
- public void disconnect() throws MqttException {
|
|
|
- if (client != null && client.isConnected()) {
|
|
|
- client.disconnect();
|
|
|
- log.info("MQTT客户端断开连接 - clientId: {}", clientId);
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void setApplicationContext(ApplicationContext applicationContext) {
|
|
|
+ this.applicationContext = applicationContext;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 异步断开MQTT连接
|
|
|
- *
|
|
|
- * @return CompletableFuture<Void>
|
|
|
- */
|
|
|
- public CompletableFuture<Void> disconnectAsync() {
|
|
|
- return CompletableFuture.runAsync(() -> {
|
|
|
- try {
|
|
|
- disconnect();
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("MQTT客户端断开连接失败 - clientId: {}", clientId, e);
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 关闭MQTT客户端
|
|
|
- *
|
|
|
- * @throws MqttException MQTT异常
|
|
|
- */
|
|
|
- public void close() throws MqttException {
|
|
|
- if (client != null) {
|
|
|
- if (client.isConnected()) {
|
|
|
- client.disconnect();
|
|
|
- }
|
|
|
- client.close();
|
|
|
- log.info("MQTT客户端已关闭 - clientId: {}", clientId);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 异步关闭MQTT客户端
|
|
|
- *
|
|
|
- * @return CompletableFuture<Void>
|
|
|
- */
|
|
|
- public CompletableFuture<Void> closeAsync() {
|
|
|
- return CompletableFuture.runAsync(() -> {
|
|
|
- try {
|
|
|
- close();
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("MQTT客户端关闭失败 - clientId: {}", clientId, e);
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 发布消息
|
|
|
- *
|
|
|
- * @param topic 主题
|
|
|
+ * @param topic 主题
|
|
|
* @param payload 消息内容
|
|
|
- * @param qos 服务质量等级(0,1,2)
|
|
|
- * @param retained 是否保留消息
|
|
|
- * @throws MqttException MQTT异常
|
|
|
*/
|
|
|
- public void publish(String topic, String payload, int qos, boolean retained) throws MqttException {
|
|
|
- MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
|
|
|
- message.setQos(qos);
|
|
|
- message.setRetained(retained);
|
|
|
- publish(topic, message);
|
|
|
+ public void publish(String topic, String payload) {
|
|
|
+ publish(topic, payload, 0, false);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 发布消息
|
|
|
- *
|
|
|
- * @param topic 主题
|
|
|
- * @param message 消息
|
|
|
- * @throws MqttException MQTT异常
|
|
|
+ * @param topic 主题
|
|
|
+ * @param payload 消息内容
|
|
|
+ * @param qos 服务质量等级 (0, 1, 2)
|
|
|
+ * @param retained 是否保留消息
|
|
|
*/
|
|
|
- public void publish(String topic, MqttMessage message) throws MqttException {
|
|
|
- checkConnected();
|
|
|
- client.publish(topic, message);
|
|
|
- log.debug("MQTT消息发布成功 - topic: {}", topic);
|
|
|
+ public void publish(String topic, String payload, int qos, boolean retained) {
|
|
|
+ try {
|
|
|
+ // 创建消息实体
|
|
|
+ MqttMessage message = new MqttMessage(topic, payload);
|
|
|
+ message.setClientId(defaultClientId + "-pub-" + System.currentTimeMillis());
|
|
|
+ // 确保客户端工厂已初始化
|
|
|
+ if (mqttClientFactory == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(defaultClientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
|
|
|
+ messageHandler.setDefaultTopic(topic);
|
|
|
+ messageHandler.setDefaultQos(qos);
|
|
|
+ messageHandler.setDefaultRetained(retained);
|
|
|
+ // 确保处理器已初始化
|
|
|
+ messageHandler.afterPropertiesSet();
|
|
|
+
|
|
|
+ org.springframework.messaging.Message<String> springMessage =
|
|
|
+ org.springframework.messaging.support.MessageBuilder.withPayload(JSONUtil.toJsonStr(message)).build();
|
|
|
+ messageHandler.handleMessage(springMessage);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 异步发布消息
|
|
|
- *
|
|
|
- * @param topic 主题
|
|
|
+ * 发布消息到指定客户端
|
|
|
+ * @param clientId 客户端ID
|
|
|
+ * @param topic 主题
|
|
|
* @param payload 消息内容
|
|
|
- * @param qos 服务质量等级(0,1,2)
|
|
|
+ * @param qos 服务质量等级
|
|
|
* @param retained 是否保留消息
|
|
|
- * @return CompletableFuture<Void>
|
|
|
*/
|
|
|
- public CompletableFuture<Void> publishAsync(String topic, String payload, int qos, boolean retained) {
|
|
|
- return CompletableFuture.runAsync(() -> {
|
|
|
- try {
|
|
|
- publish(topic, payload, qos, retained);
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("MQTT消息发布失败 - topic: {}", topic, e);
|
|
|
- throw new RuntimeException(e);
|
|
|
+ public void publish(String clientId, String topic, String payload, int qos, boolean retained) {
|
|
|
+ try {
|
|
|
+ // 创建消息实体
|
|
|
+ MqttMessage message = new MqttMessage(topic, payload);
|
|
|
+ message.setClientId(clientId + "-pub-" + System.currentTimeMillis());
|
|
|
+
|
|
|
+ // 如果配置了转发服务,则转发消息
|
|
|
+ if (mqttMessageForwardService != null) {
|
|
|
+ // 这里可以配置转发到其他服务的URL
|
|
|
+ // mqttMessageForwardService.forwardMessage(message, "http://other-service/api/mqtt/receive");
|
|
|
}
|
|
|
- });
|
|
|
+ if (mqttClientFactory == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub-" + System.currentTimeMillis(), mqttClientFactory);
|
|
|
+ messageHandler.setDefaultTopic(topic);
|
|
|
+ messageHandler.setDefaultQos(qos);
|
|
|
+ messageHandler.setDefaultRetained(retained);
|
|
|
+ // 确保处理器已初始化
|
|
|
+ messageHandler.afterPropertiesSet();
|
|
|
+
|
|
|
+ org.springframework.messaging.Message<String> springMessage =
|
|
|
+ org.springframework.messaging.support.MessageBuilder.withPayload(payload).build();
|
|
|
+ messageHandler.handleMessage(springMessage);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException("发布MQTT消息失败: " + e.getMessage(), e);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 订阅主题
|
|
|
- *
|
|
|
* @param topic 主题
|
|
|
- * @param qos 服务质量等级
|
|
|
- * @throws MqttException MQTT异常
|
|
|
+ * @param messageHandler 消息处理器
|
|
|
*/
|
|
|
- public void subscribe(String topic, int qos) throws MqttException {
|
|
|
- checkConnected();
|
|
|
- client.subscribe(topic, qos);
|
|
|
- log.info("MQTT订阅主题成功 - topic: {}, qos: {}", topic, qos);
|
|
|
+ public void subscribe(String topic, MessageHandler messageHandler) {
|
|
|
+ subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageHandler);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 异步订阅主题
|
|
|
- *
|
|
|
+ * 订阅主题
|
|
|
+ * @param clientId 客户端ID
|
|
|
* @param topic 主题
|
|
|
- * @param qos 服务质量等级
|
|
|
- * @return CompletableFuture<Void>
|
|
|
+ * @param messageHandler 消息处理器
|
|
|
*/
|
|
|
- public CompletableFuture<Void> subscribeAsync(String topic, int qos) {
|
|
|
- return CompletableFuture.runAsync(() -> {
|
|
|
- try {
|
|
|
- subscribe(topic, qos);
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("MQTT订阅主题失败 - topic: {}", topic, e);
|
|
|
- throw new RuntimeException(e);
|
|
|
+ public void subscribe(String clientId, String topic, MessageHandler messageHandler) {
|
|
|
+ try {
|
|
|
+ if (mqttClientFactory == null) {
|
|
|
+ return;
|
|
|
}
|
|
|
- });
|
|
|
+ // 如果已经订阅了该主题,则先取消订阅
|
|
|
+ String key = clientId + ":" + topic;
|
|
|
+ if (subscribers.containsKey(key)) {
|
|
|
+ unsubscribe(clientId, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ MqttPahoMessageDrivenChannelAdapter adapter =
|
|
|
+ new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory, topic);
|
|
|
+ adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
+ adapter.setOutputChannel(mqttInputChannel);
|
|
|
+
|
|
|
+ // 创建一个带有处理逻辑的通道
|
|
|
+ DirectChannel channel = new DirectChannel();
|
|
|
+ channel.subscribe(messageHandler);
|
|
|
+ adapter.setOutputChannel(channel);
|
|
|
+
|
|
|
+ adapter.start();
|
|
|
+
|
|
|
+ subscribers.put(key, adapter);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 取消订阅主题
|
|
|
- *
|
|
|
+ * 订阅主题(使用函数式接口处理消息)
|
|
|
* @param topic 主题
|
|
|
- * @throws MqttException MQTT异常
|
|
|
+ * @param messageConsumer 消息消费者
|
|
|
*/
|
|
|
- public void unsubscribe(String topic) throws MqttException {
|
|
|
- checkConnected();
|
|
|
- client.unsubscribe(topic);
|
|
|
- log.info("MQTT取消订阅主题成功 - topic: {}", topic);
|
|
|
+ public void subscribe(String topic, Consumer<String> messageConsumer) {
|
|
|
+ subscribe(defaultClientId + "-sub-" + System.currentTimeMillis(), topic, messageConsumer);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 异步取消订阅主题
|
|
|
- *
|
|
|
+ * 订阅主题(使用函数式接口处理消息)
|
|
|
+ * @param clientId 客户端ID
|
|
|
* @param topic 主题
|
|
|
- * @return CompletableFuture<Void>
|
|
|
+ * @param messageConsumer 消息消费者
|
|
|
*/
|
|
|
- public CompletableFuture<Void> unsubscribeAsync(String topic) {
|
|
|
- return CompletableFuture.runAsync(() -> {
|
|
|
- try {
|
|
|
- unsubscribe(topic);
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("MQTT取消订阅主题失败 - topic: {}", topic, e);
|
|
|
- throw new RuntimeException(e);
|
|
|
+ public void subscribe(String clientId, String topic, Consumer<String> messageConsumer) {
|
|
|
+ MessageHandler handler = new MessageHandler() {
|
|
|
+ @Override
|
|
|
+ public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
|
|
|
+ String payload = new String((byte[]) message.getPayload());
|
|
|
+ messageConsumer.accept(payload);
|
|
|
}
|
|
|
- });
|
|
|
+ };
|
|
|
+ subscribe(clientId, topic, handler);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 检查客户端是否连接
|
|
|
- *
|
|
|
- * @throws MqttException MQTT异常
|
|
|
+ * 取消订阅
|
|
|
+ * @param clientId 客户端ID
|
|
|
+ * @param topic 主题
|
|
|
*/
|
|
|
- private void checkConnected() throws MqttException {
|
|
|
- if (client == null || !client.isConnected()) {
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
|
|
|
+ public void unsubscribe(String clientId, String topic) {
|
|
|
+ String key = clientId + ":" + topic;
|
|
|
+ MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(key);
|
|
|
+ if (adapter != null) {
|
|
|
+ adapter.stop();
|
|
|
+ subscribers.remove(key);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取客户端ID
|
|
|
- *
|
|
|
- * @return 客户端ID
|
|
|
- */
|
|
|
- public String getClientId() {
|
|
|
- return clientId;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取MQTT客户端
|
|
|
- *
|
|
|
- * @return MqttClient
|
|
|
- */
|
|
|
- public MqttClient getClient() {
|
|
|
- return client;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 生成随机客户端ID
|
|
|
- *
|
|
|
- * @return 随机客户端ID
|
|
|
- */
|
|
|
- public static String generateClientId() {
|
|
|
- return "mqtt_client_" + UUID.randomUUID().toString().replace("-", "");
|
|
|
- }
|
|
|
}
|