Ver Fonte

add 规则引擎集群

18339543638 há 4 anos atrás
pai
commit
6cb66118c4

+ 35 - 26
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -325,33 +325,42 @@ class ClusterUniqueTask implements Task ,Serializable{
     private Mono<Void> handlePingMsg(){
         return Flux.interval(Duration.ofSeconds(pingTime/3))
             .filter(ignore-> (handleDisposable ==null|| handleDisposable.isDisposed())&&(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()))
-            .doOnNext(ignore->
-                handleDisposable = clusterManager.getTopic(pingTopic)
-                    .subscribePattern()
-                    .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
-                    .timeout(Duration.ofSeconds(pingTime), Mono.fromRunnable(() -> isAlive.set(false)))
-                    .publishOn(Schedulers.boundedElastic())
-                    .flatMap(obj -> {
-                        if(!generatePingMsgDisposable.isDisposed()){
-                            generatePingMsgDisposable.dispose();
-                        }
-                        Object message = obj.getMessage();
-                        isAlive.set(true);
-                        if (message instanceof ClusterUniqueTask) {
-                            //心跳信息
-                            ClusterUniqueTask task = (ClusterUniqueTask) message;
-                            this.workerId = task.currentSeverId;
-                            this.taskState = task.taskState;
-                            this.lastStateTime = task.lastStateTime;
-                            this.startTime = task.startTime;
-                        } else if (message instanceof OperationMessage) {
-                            //操作信息
-                            OperationMessage operationMessage = (OperationMessage) message;
-                            //任务存活
-                            return operation(operationMessage);
+            .doOnNext(ignore->{
+                    if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
+                        if(handleDisposable!=null&&!handleDisposable.isDisposed()){
+                            handleDisposable.isDisposed();
                         }
-                        return Mono.empty();
-                    }).subscribe()).then();
+                        return;
+                    }
+                    handleDisposable = clusterManager.getTopic(pingTopic)
+                        .subscribePattern()
+                        .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
+                        .timeout(Duration.ofSeconds(pingTime), Mono.fromRunnable(() -> isAlive.set(false)))
+                        .publishOn(Schedulers.boundedElastic())
+                        .flatMap(obj -> {
+                            if(!generatePingMsgDisposable.isDisposed()){
+                                generatePingMsgDisposable.dispose();
+                            }
+                            Object message = obj.getMessage();
+                            isAlive.set(true);
+                            if (message instanceof ClusterUniqueTask) {
+                                //心跳信息
+                                ClusterUniqueTask task = (ClusterUniqueTask) message;
+                                this.workerId = task.currentSeverId;
+                                this.taskState = task.taskState;
+                                this.lastStateTime = task.lastStateTime;
+                                this.startTime = task.startTime;
+                            } else if (message instanceof OperationMessage) {
+                                //操作信息
+                                OperationMessage operationMessage = (OperationMessage) message;
+                                //任务存活
+                                return operation(operationMessage);
+                            }
+                            return Mono.empty();
+                        })
+                        .subscribe();
+                }
+            ).then();
     }
     enum TaskOperation implements Serializable {
         /**