Procházet zdrojové kódy

add 规则引擎集群

18339543638 před 4 roky
rodič
revize
d479705d3a

+ 4 - 6
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -221,9 +221,9 @@ class ClusterUniqueTask implements Task ,Serializable{
                     if(State.running.equals(this.taskState)&&!State.running.equals(this.executor.getState())){
                         this.executor.start();
                     }
-                    if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
-                        this.handleDisposable.dispose();
-                    }
+//                    if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
+//                        this.handleDisposable.dispose();
+//                    }
                     if(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()){
                         this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
                     }
@@ -240,7 +240,7 @@ class ClusterUniqueTask implements Task ,Serializable{
      */
     private Mono<Void> handlePingMsg(){
         return Flux.interval(Duration.ofSeconds(pingTime/3))
-            .filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed())&&(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()))
+            .filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed()))
             .doOnNext(ignore->{
                     if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
                         if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
@@ -270,7 +270,6 @@ class ClusterUniqueTask implements Task ,Serializable{
                                 ClusterUniqueTask task = (ClusterUniqueTask) message;
                                 if(task.getCurrentSeverId().equals(this.workerId)){
                                     //本地节点监听心跳关闭,开始发送心跳
-                                    this.handleDisposable.dispose();
                                     return Mono.empty();
                                 }
                                 this.workerId = task.currentSeverId;
@@ -281,7 +280,6 @@ class ClusterUniqueTask implements Task ,Serializable{
                                 //操作信息
                                 OperationMessage operationMessage = (OperationMessage) message;
                                 if(operationMessage.fromServerId.equals(this.workerId)){
-                                    this.handleDisposable.dispose();
                                     return Mono.empty();
                                 }
                                 //任务存活