|
|
@@ -21,7 +21,10 @@ import reactor.core.publisher.Mono;
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
import javax.annotation.Nonnull;
|
|
|
import java.time.Duration;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.ZoneOffset;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
/**
|
|
|
* @author lifang
|
|
|
@@ -35,6 +38,9 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
|
|
|
|
|
|
private LinkedHashMap<String,ClusterMessage> messageServerMap=new LinkedHashMap<>(1024,0.75f,true);
|
|
|
|
|
|
+
|
|
|
+ private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(Integer.getInteger("jetlinks.device.message.default-timeout", 10));
|
|
|
+
|
|
|
public ClusterDeviceMessageConnector(EventBus eventBus, DeviceRegistry registry, MessageHandler messageHandler, DeviceSessionManager sessionManager, String serverId, ClusterManager clusterManager) {
|
|
|
super(eventBus, registry, messageHandler, sessionManager);
|
|
|
this.serverId = serverId;
|
|
|
@@ -91,23 +97,25 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
|
|
|
/**
|
|
|
* 清理过期消息
|
|
|
*/
|
|
|
-// Flux.interval(Duration.ofSeconds(5))
|
|
|
-// .doOnNext(ignore->{
|
|
|
-// LocalDateTime now = LocalDateTime.now();
|
|
|
-// Set<String> keySet = messageServerMap.keySet();
|
|
|
-// for (String key : keySet) {
|
|
|
-// Message message = messageServerMap.get(key);
|
|
|
-// if(message!=null){
|
|
|
-// long messageTime = message.getTimestamp();
|
|
|
-// if(Duration.between(LocalDateTime.ofEpochSecond(messageTime,0, ZoneOffset.ofHours(8)),now).toMillis()>3000){
|
|
|
-// messageServerMap.remove(key);
|
|
|
-// }else {
|
|
|
-// break;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// })
|
|
|
-// .subscribe();
|
|
|
+ Flux.interval(Duration.ofSeconds(5))
|
|
|
+ .doOnNext(ignore->{
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ Set<String> keySet = messageServerMap.keySet();
|
|
|
+ for (String key : keySet) {
|
|
|
+ ClusterMessage message = messageServerMap.get(key);
|
|
|
+ if(message!=null){
|
|
|
+ long messageTime = message.getPayload().getTimestamp();
|
|
|
+ //超过10s将过期消息删除
|
|
|
+ if(Duration.between(LocalDateTime.ofEpochSecond(messageTime/1000,0, ZoneOffset.ofHours(8)),now)
|
|
|
+ .toMillis()>DEFAULT_TIMEOUT){
|
|
|
+ messageServerMap.remove(key);
|
|
|
+ }else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .subscribe();
|
|
|
|
|
|
|
|
|
}
|
|
|
@@ -118,7 +126,9 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
|
|
|
return Flux.fromStream(getAllNode().stream())
|
|
|
.flatMap(node-> clusterManager
|
|
|
.getTopic(ClusterMessageType.topicOf(node.getId(), ClusterMessageType.up))
|
|
|
- .publish(Mono.just(message))
|
|
|
+ .publish(Mono.just(message)
|
|
|
+ .doOnNext(msg->msg.addHeader(Headers.serverId,serverId))
|
|
|
+ .map(msg->new ClusterMessage(msg,serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.up))))
|
|
|
)
|
|
|
.flatMap(result->result==0?Mono.just(false):Mono.just(true))
|
|
|
.reduce((a1,a2)->a1&&a2);
|
|
|
@@ -145,7 +155,9 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
|
|
|
}
|
|
|
return clusterManager
|
|
|
.getTopic(ClusterMessageType.topicOf(serverId,ClusterMessageType.up))
|
|
|
- .publish(Mono.just(message))
|
|
|
+ .publish(Mono.just(message)
|
|
|
+ .doOnNext(msg->msg.addHeader(Headers.serverId,serverId))
|
|
|
+ .map(msg->new ClusterMessage(msg,serverId,ClusterMessageType.topicOf(serverId,ClusterMessageType.up))))
|
|
|
.flatMap(result->result==0?Mono.just(false):Mono.just(true));
|
|
|
}
|
|
|
});
|