فهرست منبع

add 规则引擎集群

18339543638 4 سال پیش
والد
کامیت
dfcb49d6d0

+ 73 - 47
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -17,14 +17,14 @@ import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
 import java.io.Serializable;
 import java.time.Duration;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author lifang
  * @version 1.0.0
- * @ClassName ClusterTask.java
+ * @ClassName 集群唯一任务.java
  * @Description 集群唯一任务
  *  1、监听自身的任务情况
  *  2、改变状态后向集群广播
@@ -91,26 +91,14 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     private AtomicBoolean isAlive=new AtomicBoolean(true);
 
-    private transient Disposable disposable=null;
-    @Override
-    public String getId(){
-        return this.getContext().getInstanceId();
-    }
-
-    @Override
-    public String getName() {
-        return executor.getName();
-    }
-
-    @Override
-    public ScheduleJob getJob() {
-        return context.getJob();
-    }
+    private transient Disposable handleDisposable =null;
 
-    @Override
-    public Mono<Void> setJob(ScheduleJob job) {
-        return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job)));
-    }
+    private transient Disposable generatePingMsgDisposable =null;
+    /**
+     * 持有锁的线程
+     */
+    private ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
+        Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
 
     ClusterUniqueTask(String schedulerId,
                       AbstractExecutionContext context,
@@ -133,6 +121,28 @@ class ClusterUniqueTask implements Task ,Serializable{
             .subscribe();
     }
 
+    @Override
+    public String getId(){
+        return this.getContext().getInstanceId();
+    }
+
+    @Override
+    public String getName() {
+        return executor.getName();
+    }
+
+    @Override
+    public ScheduleJob getJob() {
+        return context.getJob();
+    }
+
+    @Override
+    public Mono<Void> setJob(ScheduleJob job) {
+        return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job)));
+    }
+
+
+
     private Mono<Void> init(){
         return generatePingMsg()
             .concatWith(this.handlePingMsg())
@@ -143,35 +153,48 @@ class ClusterUniqueTask implements Task ,Serializable{
         return Flux.interval(Duration.ofSeconds(pingTime/2))
             .filter(ignore->{
                 if (isReplica()) {
-                    if (lock.isHeldByThread(Thread.currentThread().getId())) {
-                        //非工作节点且占据锁,则解锁
-                        lock.unlock();
-                    }else if(!isAlive.get()&&!lock.isLocked()){
-                        //争夺锁
-                        try {
-                            //获取锁
-                            if (lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
-                                //获取锁成功后改写工作节点id
-                                this.workerId=this.getCurrentSeverId();
-                                isAlive.set(true);
-                                return true;
+                    heldLockThread.execute(()->{
+                        if (lock.isHeldByThread(Thread.currentThread().getId())) {
+                            //非工作节点且占据锁,则解锁
+                            lock.unlock();
+                        }else if(!isAlive.get()&&!lock.isLocked()){
+                            //争夺锁
+                            try {
+                                //获取锁
+                                if (lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
+                                    //获取锁成功后改写工作节点id
+                                    this.workerId=this.getCurrentSeverId();
+                                    isAlive.set(true);
+                                }
+                            }catch (InterruptedException e){
                             }
-                            return false;
-                        }catch (InterruptedException e){
                         }
-                    }
+                    });
+                    return false;
+                }else if(lock.isLocked()&&lock.isHeldByThread(new Thread().getId())){
+                    return false;
                 }
                 return true;
             })
-            .flatMap(ignore->{
-                try {
-                    //锁续约
-                    lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS);
-                }catch (Exception e){}
-                if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
-                    executor.start();
+            .doOnNext(ignore->{
+                //锁续约
+                heldLockThread.execute(()->{
+                    try {
+                        if (lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
+                            this.workerId=this.getCurrentSeverId();
+                        }
+                    } catch (InterruptedException e) {
+                    }
+                });
+
+                if(isReplica()){
+                    if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
+                        executor.start();
+                    }
+                    if(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()){
+                        generatePingMsgDisposable = clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
+                    }
                 }
-                return clusterManager.getTopic(pingTopic).publish(Mono.just(this)).then(Mono.empty());
             })
             .then();
     }
@@ -262,7 +285,7 @@ class ClusterUniqueTask implements Task ,Serializable{
             case SHUTDOWN:
                 this.taskState=State.shutdown;
                 //解锁
-                this.lock.unlock();
+                heldLockThread.execute(()->lock.unlock());
                 break;
             case ENABLE_DEBUG:
                 return Mono.fromRunnable(() -> context.setDebug(true));
@@ -295,14 +318,17 @@ class ClusterUniqueTask implements Task ,Serializable{
      */
     private Mono<Void> handlePingMsg(){
         return Flux.interval(Duration.ofSeconds(pingTime/3))
-            .filter(ignore->disposable==null||disposable.isDisposed())
+            .filter(ignore-> handleDisposable ==null|| handleDisposable.isDisposed())
             .doOnNext(ignore->
-                disposable= clusterManager.getTopic(pingTopic)
+                handleDisposable = clusterManager.getTopic(pingTopic)
                     .subscribePattern()
                     .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
                     .timeout(Duration.ofSeconds(pingTime), Mono.fromRunnable(() -> isAlive.set(false)))
                     .publishOn(Schedulers.boundedElastic())
                     .flatMap(obj -> {
+                        if(!generatePingMsgDisposable.isDisposed()){
+                            generatePingMsgDisposable.dispose();
+                        }
                         Object message = obj.getMessage();
                         isAlive.set(true);
                         if (message instanceof ClusterUniqueTask) {