|
@@ -1,18 +1,25 @@
|
|
|
package org.jetlinks.community.standalone.configuration.cluster;
|
|
package org.jetlinks.community.standalone.configuration.cluster;
|
|
|
|
|
|
|
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
|
|
+import cn.hutool.extra.spring.SpringUtil;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.jetlinks.core.cluster.ClusterTopic;
|
|
|
import org.jetlinks.core.cluster.message.ClusterMessage;
|
|
import org.jetlinks.core.cluster.message.ClusterMessage;
|
|
|
import org.jetlinks.core.cluster.ClusterManager;
|
|
import org.jetlinks.core.cluster.ClusterManager;
|
|
|
import org.jetlinks.core.cluster.ServerNode;
|
|
import org.jetlinks.core.cluster.ServerNode;
|
|
|
|
|
+import org.jetlinks.core.device.DeviceOperator;
|
|
|
|
|
+import org.jetlinks.core.device.DeviceRegistry;
|
|
|
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
|
|
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
|
|
|
import org.jetlinks.core.enums.ErrorCode;
|
|
import org.jetlinks.core.enums.ErrorCode;
|
|
|
import org.jetlinks.core.exception.DeviceOperationException;
|
|
import org.jetlinks.core.exception.DeviceOperationException;
|
|
|
import org.jetlinks.core.message.*;
|
|
import org.jetlinks.core.message.*;
|
|
|
import org.reactivestreams.Publisher;
|
|
import org.reactivestreams.Publisher;
|
|
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
import reactor.core.publisher.*;
|
|
import reactor.core.publisher.*;
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -29,6 +36,7 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
|
|
|
|
|
|
|
|
private ClusterManager clusterManager;
|
|
private ClusterManager clusterManager;
|
|
|
|
|
|
|
|
|
|
+ private DeviceRegistry deviceRegistry;
|
|
|
private List<ServerNode> getAllNode(){
|
|
private List<ServerNode> getAllNode(){
|
|
|
return clusterManager.getHaManager().getAllNode();
|
|
return clusterManager.getHaManager().getAllNode();
|
|
|
}
|
|
}
|
|
@@ -36,6 +44,12 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
|
|
|
public ClusterDeviceMessageBrokeMessageBroker(String serverId, ClusterManager clusterManager) {
|
|
public ClusterDeviceMessageBrokeMessageBroker(String serverId, ClusterManager clusterManager) {
|
|
|
this.serverId = serverId;
|
|
this.serverId = serverId;
|
|
|
this.clusterManager = clusterManager;
|
|
this.clusterManager = clusterManager;
|
|
|
|
|
+ clusterManager.getTopic("_reply_"+serverId)
|
|
|
|
|
+ .subscribePattern()
|
|
|
|
|
+ .map(ClusterTopic.TopicMessage::getMessage)
|
|
|
|
|
+ .cast(DeviceMessageReply.class)
|
|
|
|
|
+ .flatMap(super::reply)
|
|
|
|
|
+ .subscribe();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -48,6 +62,24 @@ public class ClusterDeviceMessageBrokeMessageBroker extends StandaloneDeviceMess
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public Mono<Boolean> reply(DeviceMessageReply message) {
|
|
|
|
|
+ if(deviceRegistry==null){
|
|
|
|
|
+ deviceRegistry=SpringUtil.getBean(DeviceRegistry.class);
|
|
|
|
|
+ }
|
|
|
|
|
+ String deviceId = message.getDeviceId();
|
|
|
|
|
+ if(StrUtil.isEmpty(deviceId)){
|
|
|
|
|
+ return super.reply(message);
|
|
|
|
|
+ }
|
|
|
|
|
+ return deviceRegistry.getDevice(deviceId)
|
|
|
|
|
+ .flatMap(DeviceOperator::getConnectionServerId)
|
|
|
|
|
+ .flatMap(connectionServerId->{
|
|
|
|
|
+ if(this.serverId.equals(connectionServerId)){
|
|
|
|
|
+ return super.reply(message);
|
|
|
|
|
+ }
|
|
|
|
|
+ return clusterManager.getTopic("_reply_"+serverId).publish(Mono.just(message)).thenReturn(true);
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
@Override
|
|
@Override
|
|
|
public Mono<Integer> send(String serverId, Publisher<? extends Message> message) {
|
|
public Mono<Integer> send(String serverId, Publisher<? extends Message> message) {
|
|
|
if(this.serverId.equals(serverId)){
|
|
if(this.serverId.equals(serverId)){
|