|
|
@@ -0,0 +1,333 @@
|
|
|
+package com.nb.core.utils;
|
|
|
+
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
+
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.UUID;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+
|
|
|
+/**
|
|
|
+ * MQTT客户端工具类
|
|
|
+ * 提供MQTT连接、消息发布、订阅等通用功能
|
|
|
+ *
|
|
|
+ * @author lingma
|
|
|
+ * @version 1.0.0
|
|
|
+ * @since 1.0.0
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class MqttClientUtil {
|
|
|
+
|
|
|
+ private MqttClient client;
|
|
|
+ private MqttConnectOptions options;
|
|
|
+ private String serverURI;
|
|
|
+ private String clientId;
|
|
|
+ private String username;
|
|
|
+ private String password;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构造函数
|
|
|
+ *
|
|
|
+ * @param serverURI MQTT服务器地址,例如:tcp://localhost:1883
|
|
|
+ * @param clientId 客户端ID,需要唯一
|
|
|
+ */
|
|
|
+ public MqttClientUtil(String serverURI, String clientId) {
|
|
|
+ this.serverURI = serverURI;
|
|
|
+ this.clientId = clientId;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构造函数
|
|
|
+ *
|
|
|
+ * @param serverURI MQTT服务器地址,例如:tcp://localhost:1883
|
|
|
+ * @param clientId 客户端ID,需要唯一
|
|
|
+ * @param username 用户名
|
|
|
+ * @param password 密码
|
|
|
+ */
|
|
|
+ public MqttClientUtil(String serverURI, String clientId, String username, String password) {
|
|
|
+ this(serverURI, clientId);
|
|
|
+ this.username = username;
|
|
|
+ this.password = password;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化MQTT客户端
|
|
|
+ *
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void init() throws MqttException {
|
|
|
+ if (client == null) {
|
|
|
+ client = new MqttClient(serverURI, clientId, new MemoryPersistence());
|
|
|
+ }
|
|
|
+
|
|
|
+ options = new MqttConnectOptions();
|
|
|
+ options.setCleanSession(true);
|
|
|
+ options.setConnectionTimeout(30);
|
|
|
+ options.setKeepAliveInterval(60);
|
|
|
+
|
|
|
+ if (username != null && password != null) {
|
|
|
+ options.setUserName(username);
|
|
|
+ options.setPassword(password.toCharArray());
|
|
|
+ }
|
|
|
+
|
|
|
+ client.setCallback(new MqttCallbackExtended() {
|
|
|
+ @Override
|
|
|
+ public void connectComplete(boolean reconnect, String serverURI) {
|
|
|
+ log.info("MQTT客户端连接完成 - serverURI: {}, clientId: {}, reconnect: {}", serverURI, clientId, reconnect);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
+ log.error("MQTT客户端连接丢失 - clientId: {}", clientId, cause);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
+ log.info("MQTT客户端收到消息 - topic: {}, message: {}", topic, new String(message.getPayload(), StandardCharsets.UTF_8));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
+ log.debug("MQTT消息发送完成 - token: {}", token);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接到MQTT服务器
|
|
|
+ *
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void connect() throws MqttException {
|
|
|
+ if (client == null) {
|
|
|
+ init();
|
|
|
+ }
|
|
|
+ if (!client.isConnected()) {
|
|
|
+ client.connect(options);
|
|
|
+ log.info("MQTT客户端连接成功 - serverURI: {}, clientId: {}", serverURI, clientId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步连接到MQTT服务器
|
|
|
+ *
|
|
|
+ * @return CompletableFuture<Void>
|
|
|
+ */
|
|
|
+ public CompletableFuture<Void> connectAsync() {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ connect();
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("MQTT客户端连接失败 - serverURI: {}, clientId: {}", serverURI, clientId, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 断开MQTT连接
|
|
|
+ *
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void disconnect() throws MqttException {
|
|
|
+ if (client != null && client.isConnected()) {
|
|
|
+ client.disconnect();
|
|
|
+ log.info("MQTT客户端断开连接 - clientId: {}", clientId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步断开MQTT连接
|
|
|
+ *
|
|
|
+ * @return CompletableFuture<Void>
|
|
|
+ */
|
|
|
+ public CompletableFuture<Void> disconnectAsync() {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ disconnect();
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("MQTT客户端断开连接失败 - clientId: {}", clientId, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭MQTT客户端
|
|
|
+ *
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void close() throws MqttException {
|
|
|
+ if (client != null) {
|
|
|
+ if (client.isConnected()) {
|
|
|
+ client.disconnect();
|
|
|
+ }
|
|
|
+ client.close();
|
|
|
+ log.info("MQTT客户端已关闭 - clientId: {}", clientId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步关闭MQTT客户端
|
|
|
+ *
|
|
|
+ * @return CompletableFuture<Void>
|
|
|
+ */
|
|
|
+ public CompletableFuture<Void> closeAsync() {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ close();
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("MQTT客户端关闭失败 - clientId: {}", clientId, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发布消息
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param payload 消息内容
|
|
|
+ * @param qos 服务质量等级(0,1,2)
|
|
|
+ * @param retained 是否保留消息
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void publish(String topic, String payload, int qos, boolean retained) throws MqttException {
|
|
|
+ MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
|
|
|
+ message.setQos(qos);
|
|
|
+ message.setRetained(retained);
|
|
|
+ publish(topic, message);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发布消息
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param message 消息
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void publish(String topic, MqttMessage message) throws MqttException {
|
|
|
+ checkConnected();
|
|
|
+ client.publish(topic, message);
|
|
|
+ log.debug("MQTT消息发布成功 - topic: {}", topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步发布消息
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param payload 消息内容
|
|
|
+ * @param qos 服务质量等级(0,1,2)
|
|
|
+ * @param retained 是否保留消息
|
|
|
+ * @return CompletableFuture<Void>
|
|
|
+ */
|
|
|
+ public CompletableFuture<Void> publishAsync(String topic, String payload, int qos, boolean retained) {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ publish(topic, payload, qos, retained);
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("MQTT消息发布失败 - topic: {}", topic, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅主题
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param qos 服务质量等级
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void subscribe(String topic, int qos) throws MqttException {
|
|
|
+ checkConnected();
|
|
|
+ client.subscribe(topic, qos);
|
|
|
+ log.info("MQTT订阅主题成功 - topic: {}, qos: {}", topic, qos);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步订阅主题
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param qos 服务质量等级
|
|
|
+ * @return CompletableFuture<Void>
|
|
|
+ */
|
|
|
+ public CompletableFuture<Void> subscribeAsync(String topic, int qos) {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ subscribe(topic, qos);
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("MQTT订阅主题失败 - topic: {}", topic, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 取消订阅主题
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ public void unsubscribe(String topic) throws MqttException {
|
|
|
+ checkConnected();
|
|
|
+ client.unsubscribe(topic);
|
|
|
+ log.info("MQTT取消订阅主题成功 - topic: {}", topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步取消订阅主题
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @return CompletableFuture<Void>
|
|
|
+ */
|
|
|
+ public CompletableFuture<Void> unsubscribeAsync(String topic) {
|
|
|
+ return CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ unsubscribe(topic);
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("MQTT取消订阅主题失败 - topic: {}", topic, e);
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查客户端是否连接
|
|
|
+ *
|
|
|
+ * @throws MqttException MQTT异常
|
|
|
+ */
|
|
|
+ private void checkConnected() throws MqttException {
|
|
|
+ if (client == null || !client.isConnected()) {
|
|
|
+ throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取客户端ID
|
|
|
+ *
|
|
|
+ * @return 客户端ID
|
|
|
+ */
|
|
|
+ public String getClientId() {
|
|
|
+ return clientId;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取MQTT客户端
|
|
|
+ *
|
|
|
+ * @return MqttClient
|
|
|
+ */
|
|
|
+ public MqttClient getClient() {
|
|
|
+ return client;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成随机客户端ID
|
|
|
+ *
|
|
|
+ * @return 随机客户端ID
|
|
|
+ */
|
|
|
+ public static String generateClientId() {
|
|
|
+ return "mqtt_client_" + UUID.randomUUID().toString().replace("-", "");
|
|
|
+ }
|
|
|
+}
|