Selaa lähdekoodia

add 规则引擎集群

18339543638 4 vuotta sitten
vanhempi
commit
d7e02af12d

+ 71 - 56
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.ClusterTopic;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
 import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
@@ -12,14 +13,14 @@ import org.jetlinks.rule.engine.api.task.TaskExecutor;
 import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import reactor.core.Disposable;
+import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import java.util.*;
+import java.util.function.Function;
 
 /**
  * @author lifang
@@ -90,6 +91,12 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
     private transient final TaskExecutor executor;
 
+    private transient final UnicastProcessor<ClusterTopic.TopicMessage> listenPing= UnicastProcessor.create();
+
+    /**
+     * 监听任务
+     */
+    private transient Disposable listenDisposable ;
     @Override
     public String getId(){
         return this.getContext().getInstanceId();
@@ -139,13 +146,15 @@ public class ClusterUniqueTask implements Task ,Serializable{
         this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
         this.redissonClient=redissonClient;
         //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
-        initUniqueTask().subscribe();
+        initUniqueTask()
+            .flatMap(ignore->handlePingMessage()).
+            subscribe();
     }
 
     private Mono<Void> initUniqueTask(){
         return creatUniqueTask()
             .filter(result->!result)
-            .flatMap(ignore->this.creatUniqueFail())
+            .doOnNext(ignore->this.creatUniqueFail())
             .then();
 
     }
@@ -167,6 +176,10 @@ public class ClusterUniqueTask implements Task ,Serializable{
             return Flux.interval(Duration.ofSeconds(pingTime/2))
                 .flatMap(ignore->{
                     lock.lock(pingTime,TimeUnit.SECONDS);
+                    if (listenDisposable!=null) {
+                        //终止监听心跳
+                        listenDisposable.dispose();
+                    }
                     return clusterManager.getTopic(pingTopic).publish(Mono.just(this));
                 })
                 .then(Mono.just(true));
@@ -267,66 +280,27 @@ public class ClusterUniqueTask implements Task ,Serializable{
         return Mono.empty();
     }
 
-//    /**
-//     * 状态发生改变
-//     * @return
-//     */
-//    public Flux<?> handleStateChange(){
-//        //非本机任务状态发生改变
-//        return changeState.map(Function.identity()).filter(ignore->this.isReplica());
-//    }
-
 
     public boolean isReplica(){
         return !this.getCurrentSeverId().equals(this.getWorkerId());
     }
 
 
-    private Mono<?> creatUniqueFail(){
-        return EmitterProcessor.create(true)
-            .flatMap(result -> clusterManager.getTopic(pingTopic)
-                .subscribePattern()
-                .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
-                .flatMap(obj -> {
-                    isDead=false;
-                    Object message = obj.getMessage();
-                    if (message instanceof ClusterUniqueTask) {
-                        ClusterUniqueTask task = (ClusterUniqueTask) message;
-                        //心跳信息
-                        return Mono.just(task);
-                    } else if (message instanceof TaskOperation) {
-                        //操作信息
-                        TaskOperation task = (TaskOperation) message;
-                        return Mono.just(task);
-                    }
-                    return Mono.empty();
-                }))
-            //心跳超时
+    private void creatUniqueFail(){
+        /**
+         * 抢夺失败时
+         */
+        listenDisposable = clusterManager.getTopic(pingTopic)
+            .subscribePattern()
+            .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
+            .filter(ignore -> listenPing.hasDownstreams())
+            .doOnNext(listenPing::onNext)
             .timeout(Duration.ofSeconds(pingTime),
                 Mono.empty()
-                    .doOnNext(ignore->isDead=true)
+                    .doOnNext(ignore -> isDead = true)
                     .concatWith(initUniqueTask())
-                    .then(Mono.just(this)))
-            .flatMap(msg -> {
-                if (msg instanceof ClusterUniqueTask) {
-                    //同步心跳信息
-                    ClusterUniqueTask task = (ClusterUniqueTask) msg;
-                    this.workerId = task.currentSeverId;
-                    this.taskState = task.taskState;
-                    this.lastStateTime = task.lastStateTime;
-                    this.startTime = task.startTime;
-                    return Mono.empty();
-                } else if (msg instanceof OperationMessage) {
-                    //同步操作信息
-                    OperationMessage message = (OperationMessage) msg;
-                    if(!isDead){
-                        //任务存活
-                        return operation(message);
-                    }
-                }
-                return Mono.empty();
-            })
-            .then();
+                    .then(Mono.empty()))
+            .subscribe();
     }
     enum TaskOperation implements Serializable {
         START,
@@ -356,4 +330,45 @@ public class ClusterUniqueTask implements Task ,Serializable{
             return new OperationMessage(operation,null);
         }
     }
+
+    private Flux<ClusterTopic.TopicMessage> getPingMessage(){
+        return listenPing.map(Function.identity());
+    }
+
+    private Mono<Void> handlePingMessage(){
+        return getPingMessage()
+            .flatMap(obj->{
+                isDead=false;
+                Object message = obj.getMessage();
+                if (message instanceof ClusterUniqueTask) {
+                    ClusterUniqueTask task = (ClusterUniqueTask) message;
+                    //心跳信息
+                    return Mono.just(task);
+                } else if (message instanceof TaskOperation) {
+                    //操作信息
+                    TaskOperation task = (TaskOperation) message;
+                    return Mono.just(task);
+                }
+                return Mono.empty();
+            })
+            .flatMap(msg -> {
+                if (msg instanceof ClusterUniqueTask) {
+                    //同步心跳信息
+                    ClusterUniqueTask task = (ClusterUniqueTask) msg;
+                    this.workerId = task.currentSeverId;
+                    this.taskState = task.taskState;
+                    this.lastStateTime = task.lastStateTime;
+                    this.startTime = task.startTime;
+                    return Mono.empty();
+                } else if (msg instanceof OperationMessage) {
+                    //同步操作信息
+                    OperationMessage message = (OperationMessage) msg;
+                    if(!isDead){
+                        //任务存活
+                        return operation(message);
+                    }
+                }
+                return Mono.empty();
+            }).then();
+    }
 }