Prechádzať zdrojové kódy

add 规则引擎集群

18339543638 4 rokov pred
rodič
commit
34b548cb05

+ 84 - 104
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -19,6 +19,7 @@ import java.io.Serializable;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author lifang
@@ -36,9 +37,12 @@ import java.util.*;
 @Data
 @AllArgsConstructor
 @Slf4j
-public class ClusterUniqueTask implements Task ,Serializable{
+class ClusterUniqueTask implements Task ,Serializable{
 
     private static final long serialVersionUID=1L;
+    /**
+     * 工作服务器节点id
+     */
     private String workerId;
 
     /**
@@ -72,7 +76,6 @@ public class ClusterUniqueTask implements Task ,Serializable{
      */
     private transient String pingTopic ="cluster-unique-task-ping-%s";
 
-
     /**
      * 操作主题
      */
@@ -80,22 +83,15 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
     private Task.State taskState;
 
-
-
     private transient RLock lock;
-    /**
-     * 该任务已死亡
-     */
-    private volatile boolean isDead=true;
 
     private transient final ClusterManager clusterManager;
 
     private transient final TaskExecutor executor;
 
-    /**
-     * 监听任务
-     */
-    private transient Disposable listenDisposable ;
+    private AtomicBoolean isAlive=new AtomicBoolean(true);
+
+    private transient Disposable disposable=null;
     @Override
     public String getId(){
         return this.getContext().getInstanceId();
@@ -113,15 +109,15 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> setJob(ScheduleJob job) {
-        return operation(OperationMessage.of(TaskOperation.JOB,Arrays.asList(job)));
+        return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job)));
     }
 
-    public ClusterUniqueTask(String schedulerId,
-                             AbstractExecutionContext context,
-                             TaskExecutor executor,
-                             String currentSeverId,
-                             ClusterManager clusterManager,
-                             RedissonClient redissonClient) {
+    ClusterUniqueTask(String schedulerId,
+                      AbstractExecutionContext context,
+                      TaskExecutor executor,
+                      String currentSeverId,
+                      ClusterManager clusterManager,
+                      RedissonClient redissonClient) {
         this.schedulerId = schedulerId;
         this.context = context;
         this.executor = executor;
@@ -133,58 +129,51 @@ public class ClusterUniqueTask implements Task ,Serializable{
         this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
         this.redissonClient=redissonClient;
         //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
-        initUniqueTask()
+        init()
             .subscribe();
     }
 
-    private Mono<Void> initUniqueTask(){
-        return creatUniqueTask()
-            .filter(result->!result)
-            .doOnNext(ignore->this.creatUniqueFail())
+    private Mono<Void> init(){
+        return generatePingMsg()
+            .concatWith(this.handlePingMsg())
             .then();
 
     }
-    private Mono<Boolean> creatUniqueTask() {
-        if(!isDead){
-            return Mono.just(false);
-        }
-        boolean result =false;
-        try {
-            //获取锁
-            result = lock.tryLock(-1,pingTime, TimeUnit.SECONDS);
-        }catch (InterruptedException e){
-            return Mono.just(false);
-        }
-        //争夺成功
-        if(result){
-            //创建任务,锁争夺成功后发送心跳
-            isDead=false;
-            workerId=this.getCurrentSeverId();
-            return Flux.interval(Duration.ofSeconds(pingTime/2))
-                .filter(ignore->{
-                    if (isReplica()) {
-                        //非主节点,解锁,停止发送心跳
+    private Mono<Void> generatePingMsg() {
+        return Flux.interval(Duration.ofSeconds(pingTime/2))
+            .filter(ignore->{
+                if (isReplica()) {
+                    if (lock.isHeldByThread(Thread.currentThread().getId())) {
+                        //非工作节点且占据锁,则解锁
                         lock.unlock();
-                        creatUniqueFail();
-                    }
-                    return true;
-                })
-                .flatMap(ignore->{
-                    try {
-                        lock.tryLock(-1,pingTime, TimeUnit.SECONDS);
-                    }catch (Exception e){}
-                    if (listenDisposable!=null) {
-                        //终止监听心跳
-                        listenDisposable.dispose();
+                    }else if(!isAlive.get()&&!lock.isLocked()){
+                        //争夺锁
+                        try {
+                            //获取锁
+                            if (lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
+                                //获取锁成功后改写工作节点id
+                                this.workerId=this.getCurrentSeverId();
+                                isAlive.set(true);
+                                return true;
+                            }
+                            return false;
+                        }catch (InterruptedException e){
+                        }
                     }
-                    if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
-                        executor.start();
-                    }
-                    return clusterManager.getTopic(pingTopic).publish(Mono.just(this)).then(Mono.empty());
-                }).then(Mono.just(true));
-        }
-        //争夺失败
-        return Mono.just(false);
+                }
+                return true;
+            })
+            .flatMap(ignore->{
+                try {
+                    //锁续约
+                    lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS);
+                }catch (Exception e){}
+                if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
+                    executor.start();
+                }
+                return clusterManager.getTopic(pingTopic).publish(Mono.just(this)).then(Mono.empty());
+            })
+            .then();
     }
 
 
@@ -210,7 +199,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> execute(RuleData data) {
-        return operation(OperationMessage.of(TaskOperation.EXECUTE,Arrays.asList(data)));
+        return operation(OperationMessage.of(TaskOperation.EXECUTE,Collections.singletonList(data)));
     }
 
     @Override
@@ -300,46 +289,37 @@ public class ClusterUniqueTask implements Task ,Serializable{
         return !this.getCurrentSeverId().equals(this.getWorkerId());
     }
 
-
-    private void creatUniqueFail(){
-        /**
-         * 抢夺失败时
-         *
-         */
-        if(listenDisposable!=null) {
-            listenDisposable.dispose();
-        }
-        if(executor!=null){
-            executor.pause();
-        }
-        listenDisposable = clusterManager.getTopic(pingTopic)
-            .subscribePattern()
-            .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
-            .timeout(Duration.ofSeconds(pingTime),
-                Mono.zip(
-                    Mono.just(isDead=true),
-                    initUniqueTask())
-                    .then(Mono.empty()))
-            .publishOn(Schedulers.boundedElastic())
-            .subscribe(obj->{
-                isDead=false;
-                Object message = obj.getMessage();
-                if (message instanceof ClusterUniqueTask) {
-                    //心跳信息
-                    ClusterUniqueTask task = (ClusterUniqueTask) message;
-                    this.workerId = task.currentSeverId;
-                    this.taskState = task.taskState;
-                    this.lastStateTime = task.lastStateTime;
-                    this.startTime = task.startTime;
-                } else if (message instanceof OperationMessage) {
-                    //操作信息
-                    OperationMessage operationMessage = (OperationMessage) message;
-                    if(!isDead){
-                        //任务存活
-                        operation(operationMessage).subscribe();
-                    }
-                }
-            });
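+    /**
+     * Every pingTime/3 seconds, subscribes to the ping and operation topics if no subscription is active.
+     * If no message arrives within pingTime seconds, isAlive is set to false.
+     */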
+    private Mono<Void> handlePingMsg(){
+        return Flux.interval(Duration.ofSeconds(pingTime/3))
+            .filter(ignore->disposable==null||disposable.isDisposed())
+            .doOnNext(ignore->
+                disposable= clusterManager.getTopic(pingTopic)
+                    .subscribePattern()
+                    .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
+                    .timeout(Duration.ofSeconds(pingTime), Mono.fromRunnable(() -> isAlive.set(false)))
+                    .publishOn(Schedulers.boundedElastic())
+                    .flatMap(obj -> {
+                        Object message = obj.getMessage();
+                        isAlive.set(true);
+                        if (message instanceof ClusterUniqueTask) {
+                            //心跳信息
+                            ClusterUniqueTask task = (ClusterUniqueTask) message;
+                            this.workerId = task.currentSeverId;
+                            this.taskState = task.taskState;
+                            this.lastStateTime = task.lastStateTime;
+                            this.startTime = task.startTime;
+                        } else if (message instanceof OperationMessage) {
+                            //操作信息
+                            OperationMessage operationMessage = (OperationMessage) message;
+                            //forward the operation message to operation()
+                            return operation(operationMessage);
+                        }
+                        return Mono.empty();
+                    }).subscribe()).then();
     }
     enum TaskOperation implements Serializable {
         /**