18339543638 4 роки тому
батько
коміт
62efaa9414

+ 12 - 2
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -101,6 +101,7 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     private transient Disposable generatePingMsgDisposable =null;
 
+    private transient long lastReceiveMsgTime;
     /**
      * 持有锁的线程
      */
@@ -247,6 +248,7 @@ class ClusterUniqueTask implements Task ,Serializable{
      *
      */
     private Mono<Void> handlePingMsg(){
+        lastReceiveMsgTime=System.currentTimeMillis();
         return Flux.interval(Duration.ofSeconds(pingTime/3))
             .filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed()))
             .doOnNext(ignore->{
@@ -259,12 +261,17 @@ class ClusterUniqueTask implements Task ,Serializable{
                     this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
                         .subscribePattern()
                         .mergeWith(this.clusterManager.getTopic(this.operationTopic).subscribePattern())
-                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
+//                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
                         .doOnNext(__->{
                             if(isReplica()&&this.executor.getState().equals(State.running)){
                                 this.taskState=State.running;
                                 this.executor.pause();
                             }
+                            //心跳超时
+                            long now = System.currentTimeMillis();
+                            if(isReplica()&&(now - lastReceiveMsgTime>TimeUnit.SECONDS.toMillis(Math.multiplyExact(pingTime,2)))){
+                                this.isAlive.set(false);
+                            }
                         })
                         .publishOn(Schedulers.boundedElastic())
                         .flatMap(obj -> {
@@ -277,8 +284,9 @@ class ClusterUniqueTask implements Task ,Serializable{
                                 //心跳信息
                                 ClusterUniqueTask task = (ClusterUniqueTask) message;
                                 if(task.getCurrentSeverId().equals(this.workerId)){
-                                    //本地节点监听心跳关闭,开始发送心跳
                                     return Mono.empty();
+                                }else if(isReplica()){
+                                    lastReceiveMsgTime=System.currentTimeMillis();
                                 }
                                 this.workerId = task.currentSeverId;
                                 this.taskState = task.taskState;
@@ -289,6 +297,8 @@ class ClusterUniqueTask implements Task ,Serializable{
                                 OperationMessage operationMessage = (OperationMessage) message;
                                 if(operationMessage.fromServerId.equals(this.workerId)){
                                     return Mono.empty();
+                                }else if(isReplica()){
+                                    lastReceiveMsgTime=System.currentTimeMillis();
                                 }
                                 //任务存活
                                 return operation(operationMessage);