|
@@ -98,7 +98,7 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
/**
|
|
/**
|
|
|
* 持有锁的线程
|
|
* 持有锁的线程
|
|
|
*/
|
|
*/
|
|
|
- private ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
|
|
|
|
|
|
|
+ private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
|
|
|
Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
|
|
Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
|
|
|
|
|
|
|
|
ClusterUniqueTask(String schedulerId,
|
|
ClusterUniqueTask(String schedulerId,
|
|
@@ -335,7 +335,7 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
handleDisposable = clusterManager.getTopic(pingTopic)
|
|
handleDisposable = clusterManager.getTopic(pingTopic)
|
|
|
.subscribePattern()
|
|
.subscribePattern()
|
|
|
.mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
|
|
.mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
|
|
|
- .timeout(Duration.ofSeconds(pingTime), Mono.fromRunnable(() -> isAlive.set(false)))
|
|
|
|
|
|
|
+ .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> isAlive.set(false)))
|
|
|
.publishOn(Schedulers.boundedElastic())
|
|
.publishOn(Schedulers.boundedElastic())
|
|
|
.flatMap(obj -> {
|
|
.flatMap(obj -> {
|
|
|
if(!generatePingMsgDisposable.isDisposed()){
|
|
if(!generatePingMsgDisposable.isDisposed()){
|