|
|
@@ -121,71 +121,71 @@ public class MqttClientUtil implements ApplicationContextAware {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- /**
|
|
|
- * 订阅主题
|
|
|
- * @param topic 主题
|
|
|
- * @param messageHandler 消息处理器
|
|
|
- */
|
|
|
- public void subscribe(String topic, MessageHandler messageHandler) {
|
|
|
- try {
|
|
|
- if (mqttClientFactory == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
- // 如果已经订阅了该主题,则先取消订阅
|
|
|
- if (subscribers.containsKey(topic)) {
|
|
|
- unsubscribe(topic);
|
|
|
- }
|
|
|
-
|
|
|
- MqttPahoMessageDrivenChannelAdapter adapter =
|
|
|
- new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID, mqttClientFactory, topic);
|
|
|
- adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
- adapter.setOutputChannel(mqttInputChannel);
|
|
|
-
|
|
|
- // 创建一个带有处理逻辑的通道
|
|
|
- DirectChannel channel = new DirectChannel();
|
|
|
- channel.subscribe(messageHandler);
|
|
|
- adapter.setOutputChannel(channel);
|
|
|
-
|
|
|
- adapter.start();
|
|
|
-
|
|
|
- subscribers.put(topic, adapter);
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 订阅主题(使用函数式接口处理消息)
|
|
|
- * @param topic 主题
|
|
|
- * @param messageConsumer 消息消费者
|
|
|
- */
|
|
|
- public void subscribe(String topic, Consumer<String> messageConsumer) {
|
|
|
- MessageHandler handler = new MessageHandler() {
|
|
|
- @Override
|
|
|
- public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
|
|
|
- String payload = new String((byte[]) message.getPayload());
|
|
|
- messageConsumer.accept(payload);
|
|
|
- }
|
|
|
- };
|
|
|
- subscribe(topic, handler);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 取消订阅
|
|
|
- * @param topic 主题
|
|
|
- */
|
|
|
- public void unsubscribe(String topic) {
|
|
|
- MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(topic);
|
|
|
- if (adapter != null) {
|
|
|
- try {
|
|
|
- adapter.stop();
|
|
|
- } catch (Exception e) {
|
|
|
- // 忽略停止异常
|
|
|
- }
|
|
|
- subscribers.remove(topic);
|
|
|
- }
|
|
|
- }
|
|
|
+//
|
|
|
+// /**
|
|
|
+// * 订阅主题
|
|
|
+// * @param topic 主题
|
|
|
+// * @param messageHandler 消息处理器
|
|
|
+// */
|
|
|
+// public void subscribe(String topic, MessageHandler messageHandler) {
|
|
|
+// try {
|
|
|
+// if (mqttClientFactory == null) {
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// // 如果已经订阅了该主题,则先取消订阅
|
|
|
+// if (subscribers.containsKey(topic)) {
|
|
|
+// unsubscribe(topic);
|
|
|
+// }
|
|
|
+//
|
|
|
+// MqttPahoMessageDrivenChannelAdapter adapter =
|
|
|
+// new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID, mqttClientFactory, topic);
|
|
|
+// adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
+// adapter.setOutputChannel(mqttInputChannel);
|
|
|
+//
|
|
|
+// // 创建一个带有处理逻辑的通道
|
|
|
+// DirectChannel channel = new DirectChannel();
|
|
|
+// channel.subscribe(messageHandler);
|
|
|
+// adapter.setOutputChannel(channel);
|
|
|
+//
|
|
|
+// adapter.start();
|
|
|
+//
|
|
|
+// subscribers.put(topic, adapter);
|
|
|
+// } catch (Exception e) {
|
|
|
+// throw new RuntimeException("订阅MQTT主题失败: " + e.getMessage(), e);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// /**
|
|
|
+// * 订阅主题(使用函数式接口处理消息)
|
|
|
+// * @param topic 主题
|
|
|
+// * @param messageConsumer 消息消费者
|
|
|
+// */
|
|
|
+// public void subscribe(String topic, Consumer<String> messageConsumer) {
|
|
|
+// MessageHandler handler = new MessageHandler() {
|
|
|
+// @Override
|
|
|
+// public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
|
|
|
+// String payload = new String((byte[]) message.getPayload());
|
|
|
+// messageConsumer.accept(payload);
|
|
|
+// }
|
|
|
+// };
|
|
|
+// subscribe(topic, handler);
|
|
|
+// }
|
|
|
+//
|
|
|
+// /**
|
|
|
+// * 取消订阅
|
|
|
+// * @param topic 主题
|
|
|
+// */
|
|
|
+// public void unsubscribe(String topic) {
|
|
|
+// MqttPahoMessageDrivenChannelAdapter adapter = subscribers.get(topic);
|
|
|
+// if (adapter != null) {
|
|
|
+// try {
|
|
|
+// adapter.stop();
|
|
|
+// } catch (Exception e) {
|
|
|
+// // 忽略停止异常
|
|
|
+// }
|
|
|
+// subscribers.remove(topic);
|
|
|
+// }
|
|
|
+// }
|
|
|
|
|
|
/**
|
|
|
* 异步发布消息
|