Kaynağa Gözat

add 规则引擎集群

18339543638 4 yıl önce
ebeveyn
işleme
9f8a191f69

+ 66 - 54
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -102,6 +102,7 @@ class ClusterUniqueTask implements Task ,Serializable{
     /**
      * 持有锁的线程
      */
+
     private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
         Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
 
@@ -188,6 +189,7 @@ class ClusterUniqueTask implements Task ,Serializable{
             })
             .doOnNext(ignore->{
                 //锁续约
+                AtomicBoolean finished= new AtomicBoolean(false);
                 heldLockThread.execute(()->{
                     try {
                         if (lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
@@ -196,10 +198,14 @@ class ClusterUniqueTask implements Task ,Serializable{
                             this.workerId=null;
                             generatePingMsgDisposable.dispose();
                         }
+
                     } catch (InterruptedException e) {
+                    }finally {
+                        finished.set(true);
                     }
                 });
-
+                while (!finished.get()){
+                }
                 if(!isReplica()){
                     if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
                         executor.start();
@@ -211,11 +217,69 @@ class ClusterUniqueTask implements Task ,Serializable{
                         generatePingMsgDisposable = clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
                     }
                 }
+
             })
             .then();
     }
 
 
+    /**
+     * 处理消息
+     *
+     */
+    private Mono<Void> handlePingMsg(){
+        return Flux.interval(Duration.ofSeconds(pingTime/3))
+            .filter(ignore-> (handleDisposable ==null|| handleDisposable.isDisposed())&&(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()))
+            .doOnNext(ignore->{
+                    if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
+                        if(handleDisposable!=null&&!handleDisposable.isDisposed()){
+                            handleDisposable.isDisposed();
+                        }
+                        return;
+                    }
+                    handleDisposable = clusterManager.getTopic(pingTopic)
+                        .subscribePattern()
+                        .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
+                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> {
+                            isAlive.set(false);
+                            workerId=null;
+                        }))
+                        .publishOn(Schedulers.boundedElastic())
+                        .flatMap(obj -> {
+                            if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
+                                generatePingMsgDisposable.dispose();
+                            }
+                            Object message = obj.getMessage();
+                            isAlive.set(true);
+                            if (message instanceof ClusterUniqueTask) {
+                                //心跳信息
+                                ClusterUniqueTask task = (ClusterUniqueTask) message;
+                                if(task.getCurrentSeverId().equals(this.workerId)){
+                                    //本地节点关闭
+                                    handleDisposable.dispose();
+                                    return Mono.empty();
+                                }
+                                this.workerId = task.currentSeverId;
+                                this.taskState = task.taskState;
+                                this.lastStateTime = task.lastStateTime;
+                                this.startTime = task.startTime;
+                            } else if (message instanceof OperationMessage) {
+                                //操作信息
+                                OperationMessage operationMessage = (OperationMessage) message;
+                                if(operationMessage.fromServerId.equals(this.workerId)){
+                                    handleDisposable.dispose();
+                                    return Mono.empty();
+                                }
+                                //任务存活
+                                return operation(operationMessage);
+                            }
+                            return Mono.empty();
+                        })
+                        .subscribe();
+                }
+            ).then();
+    }
+
     @Override
     public Mono<Void> reload() {
         return operation(OperationMessage.of(TaskOperation.RELOAD,currentSeverId));
@@ -329,59 +393,7 @@ class ClusterUniqueTask implements Task ,Serializable{
         return !this.getCurrentSeverId().equals(this.getWorkerId());
     }
 
-    /**
-     * 处理消息
-     *
-     */
-    private Mono<Void> handlePingMsg(){
-        return Flux.interval(Duration.ofSeconds(pingTime/3))
-            .filter(ignore-> (handleDisposable ==null|| handleDisposable.isDisposed())&&(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()))
-            .doOnNext(ignore->{
-                    if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
-                        if(handleDisposable!=null&&!handleDisposable.isDisposed()){
-                            handleDisposable.isDisposed();
-                        }
-                        return;
-                    }
-                    handleDisposable = clusterManager.getTopic(pingTopic)
-                        .subscribePattern()
-                        .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
-                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> isAlive.set(false)))
-                        .publishOn(Schedulers.boundedElastic())
-                        .flatMap(obj -> {
-                            if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
-                                generatePingMsgDisposable.dispose();
-                            }
-                            Object message = obj.getMessage();
-                            isAlive.set(true);
-                            if (message instanceof ClusterUniqueTask) {
-                                //心跳信息
-                                ClusterUniqueTask task = (ClusterUniqueTask) message;
-                                if(task.getCurrentSeverId().equals(this.workerId)){
-                                    //本地节点关闭
-                                    handleDisposable.dispose();
-                                    return Mono.empty();
-                                }
-                                this.workerId = task.currentSeverId;
-                                this.taskState = task.taskState;
-                                this.lastStateTime = task.lastStateTime;
-                                this.startTime = task.startTime;
-                            } else if (message instanceof OperationMessage) {
-                                //操作信息
-                                OperationMessage operationMessage = (OperationMessage) message;
-                                if(operationMessage.fromServerId.equals(this.workerId)){
-                                    handleDisposable.dispose();
-                                    return Mono.empty();
-                                }
-                                //任务存活
-                                return operation(operationMessage);
-                            }
-                            return Mono.empty();
-                        })
-                        .subscribe();
-                }
-            ).then();
-    }
+
     enum TaskOperation implements Serializable {
         /**
          * 开始任务