|
|
@@ -101,7 +101,6 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
|
|
|
private transient Disposable generatePingMsgDisposable =null;
|
|
|
|
|
|
- private transient long lastReceiveMsgTime;
|
|
|
/**
|
|
|
* 持有锁的线程
|
|
|
*/
|
|
|
@@ -196,6 +195,9 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
}else {
|
|
|
//等待心跳传递过来
|
|
|
this.workerId=null;
|
|
|
+ if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
|
|
|
+ this.generatePingMsgDisposable.dispose();
|
|
|
+ }
|
|
|
}
|
|
|
}catch (InterruptedException e){
|
|
|
}
|
|
|
@@ -230,9 +232,6 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
if(State.running.equals(this.taskState)&&!State.running.equals(this.executor.getState())){
|
|
|
this.executor.start();
|
|
|
}
|
|
|
-// if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
|
|
|
-// this.handleDisposable.dispose();
|
|
|
-// }
|
|
|
if(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()){
|
|
|
this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
|
|
|
}
|
|
|
@@ -248,7 +247,6 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
*
|
|
|
*/
|
|
|
private Mono<Void> handlePingMsg(){
|
|
|
- lastReceiveMsgTime=System.currentTimeMillis();
|
|
|
return Flux.interval(Duration.ofSeconds(pingTime/3))
|
|
|
.filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed()))
|
|
|
.doOnNext(ignore->{
|
|
|
@@ -261,17 +259,12 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
|
|
|
.subscribePattern()
|
|
|
.mergeWith(this.clusterManager.getTopic(this.operationTopic).subscribePattern())
|
|
|
-// .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
|
|
|
+ .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
|
|
|
.doOnNext(__->{
|
|
|
if(isReplica()&&this.executor.getState().equals(State.running)){
|
|
|
this.taskState=State.running;
|
|
|
this.executor.pause();
|
|
|
}
|
|
|
- //心跳超时
|
|
|
- long now = System.currentTimeMillis();
|
|
|
- if(isReplica()&&(now - lastReceiveMsgTime>TimeUnit.SECONDS.toMillis(Math.multiplyExact(pingTime,2)))){
|
|
|
- this.isAlive.set(false);
|
|
|
- }
|
|
|
})
|
|
|
.publishOn(Schedulers.boundedElastic())
|
|
|
.flatMap(obj -> {
|
|
|
@@ -285,8 +278,6 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
ClusterUniqueTask task = (ClusterUniqueTask) message;
|
|
|
if(task.getCurrentSeverId().equals(this.workerId)){
|
|
|
return Mono.empty();
|
|
|
- }else if(isReplica()){
|
|
|
- lastReceiveMsgTime=System.currentTimeMillis();
|
|
|
}
|
|
|
this.workerId = task.currentSeverId;
|
|
|
this.taskState = task.taskState;
|
|
|
@@ -297,8 +288,6 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
OperationMessage operationMessage = (OperationMessage) message;
|
|
|
if(operationMessage.fromServerId.equals(this.workerId)){
|
|
|
return Mono.empty();
|
|
|
- }else if(isReplica()){
|
|
|
- lastReceiveMsgTime=System.currentTimeMillis();
|
|
|
}
|
|
|
//任务存活
|
|
|
return operation(operationMessage);
|