|
|
@@ -44,7 +44,7 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
|
|
|
public ClusterDeviceMessageBrokeMessageBroker(String serverId, ClusterManager clusterManager) {
|
|
|
this.serverId = serverId;
|
|
|
this.clusterManager = clusterManager;
|
|
|
- clusterManager.getTopic("_reply_"+serverId)
|
|
|
+ clusterManager.getTopic("_reply_")
|
|
|
.subscribePattern()
|
|
|
.map(ClusterTopic.TopicMessage::getMessage)
|
|
|
.cast(DeviceMessageReply.class)
|
|
|
@@ -64,21 +64,28 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
|
|
|
|
|
|
@Override
|
|
|
public Mono<Boolean> reply(DeviceMessageReply message) {
|
|
|
- if(deviceRegistry==null){
|
|
|
- deviceRegistry=SpringUtil.getBean(DeviceRegistry.class);
|
|
|
- }
|
|
|
- String deviceId = message.getDeviceId();
|
|
|
- if(StrUtil.isEmpty(deviceId)){
|
|
|
+ if (Boolean.TRUE.equals(message.getHeader("Broadcast").orElse(false))) {
|
|
|
return super.reply(message);
|
|
|
+ }else {
|
|
|
+ message.addHeader("Broadcast",true);
|
|
|
+ return clusterManager.getTopic("_reply_").publish(Mono.just(message)).thenReturn(true);
|
|
|
}
|
|
|
- return deviceRegistry.getDevice(deviceId)
|
|
|
- .flatMap(DeviceOperator::getConnectionServerId)
|
|
|
- .flatMap(connectionServerId->{
|
|
|
- if(this.serverId.equals(connectionServerId)){
|
|
|
- return super.reply(message);
|
|
|
- }
|
|
|
- return clusterManager.getTopic("_reply_"+serverId).publish(Mono.just(message)).thenReturn(true);
|
|
|
- });
|
|
|
+// if(deviceRegistry==null){
|
|
|
+// deviceRegistry=SpringUtil.getBean(DeviceRegistry.class);
|
|
|
+// }
|
|
|
+// String deviceId = message.getDeviceId();
|
|
|
+// if(StrUtil.isEmpty(deviceId)){
|
|
|
+// return super.reply(message);
|
|
|
+// }
|
|
|
+//
|
|
|
+// return deviceRegistry.getDevice(deviceId)
|
|
|
+// .flatMap(DeviceOperator::getConnectionServerId)
|
|
|
+// .flatMap(connectionServerId->{
|
|
|
+// if(this.serverId.equals(connectionServerId)){
|
|
|
+// return super.reply(message);
|
|
|
+// }
|
|
|
+// return clusterManager.getTopic("_reply_"+serverId).publish(Mono.just(message)).thenReturn(true);
|
|
|
+// });
|
|
|
}
|
|
|
@Override
|
|
|
public Mono<Integer> send(String serverId, Publisher<? extends Message> message) {
|