18339543638 4 лет назад
Родитель
Сommit
1b1c33681e

+ 11 - 2
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -204,7 +204,7 @@ class ClusterUniqueTask implements Task ,Serializable{
                     if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
                         executor.start();
                     }
-                    if(!handleDisposable.isDisposed()){
+                    if(handleDisposable!=null&&!handleDisposable.isDisposed()){
                         handleDisposable.dispose();
                     }
                     if(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()){
@@ -349,7 +349,7 @@ class ClusterUniqueTask implements Task ,Serializable{
                         .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> isAlive.set(false)))
                         .publishOn(Schedulers.boundedElastic())
                         .flatMap(obj -> {
-                            if(!generatePingMsgDisposable.isDisposed()){
+                            if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
                                 generatePingMsgDisposable.dispose();
                             }
                             Object message = obj.getMessage();
@@ -357,6 +357,11 @@ class ClusterUniqueTask implements Task ,Serializable{
                             if (message instanceof ClusterUniqueTask) {
                                 //心跳信息
                                 ClusterUniqueTask task = (ClusterUniqueTask) message;
+                                if(task.getCurrentSeverId().equals(this.workerId)){
+                                    //本地节点关闭
+                                    handleDisposable.dispose();
+                                    return Mono.empty();
+                                }
                                 this.workerId = task.currentSeverId;
                                 this.taskState = task.taskState;
                                 this.lastStateTime = task.lastStateTime;
@@ -364,6 +369,10 @@ class ClusterUniqueTask implements Task ,Serializable{
                             } else if (message instanceof OperationMessage) {
                                 //操作信息
                                 OperationMessage operationMessage = (OperationMessage) message;
+                                if(operationMessage.fromServerId.equals(this.workerId)){
+                                    handleDisposable.dispose();
+                                    return Mono.empty();
+                                }
                                 //任务存活
                                 return operation(operationMessage);
                             }