فهرست منبع

add 规则引擎集群

18339543638 4 سال پیش
والد
کامیت
06f5823fe7

+ 44 - 19
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -4,7 +4,6 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.cluster.ClusterTopic;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
 import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
@@ -20,7 +19,6 @@ import java.io.Serializable;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import java.util.*;
-import java.util.function.Function;
 
 /**
  * @author lifang
@@ -64,7 +62,9 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
     private transient final AbstractExecutionContext context;
 
-    //默认心跳时间10s
+    /**
+     * 默认心跳时间10s
+     */
     private int pingTime=10;
 
     /**
@@ -113,19 +113,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> setJob(ScheduleJob job) {
-        if(isReplica()){
-            return operation(OperationMessage.of(TaskOperation.JOB,Arrays.asList(job)));
-        }
-        return Mono.fromRunnable(() -> {
-            ScheduleJob old = context.getJob();
-            context.setJob(job);
-            try {
-                executor.validate();
-            } catch (Throwable e) {
-                context.setJob(old);
-                throw e;
-            }
-        });
+        return operation(OperationMessage.of(TaskOperation.JOB,Arrays.asList(job)));
     }
 
     public ClusterUniqueTask(String schedulerId,
@@ -171,16 +159,25 @@ public class ClusterUniqueTask implements Task ,Serializable{
         if(result){
             //创建任务,锁争夺成功后发送心跳
             isDead=false;
-            if(State.running.equals(taskState)||State.running.equals(executor.getState())){
-                executor.start();
-            }
+            workerId=this.getCurrentSeverId();
             return Flux.interval(Duration.ofSeconds(pingTime/2))
+                .filter(ignore->{
+                    if (!isReplica()) {
+                        //非主节点,解锁,停止发送心跳
+                        lock.unlock();
+                        creatUniqueFail();
+                    }
+                    return true;
+                })
                 .flatMap(ignore->{
                     lock.lock(pingTime,TimeUnit.SECONDS);
                     if (listenDisposable!=null) {
                         //终止监听心跳
                         listenDisposable.dispose();
                     }
+                    if(State.running.equals(taskState)||State.running.equals(executor.getState())){
+                        executor.start();
+                    }
                     return clusterManager.getTopic(pingTopic).publish(Mono.just(this)).then(Mono.empty());
                 }).then(Mono.just(true));
         }
@@ -301,7 +298,11 @@ public class ClusterUniqueTask implements Task ,Serializable{
     private void creatUniqueFail(){
         /**
          * 抢夺失败时
+         *
          */
+        if(listenDisposable==null) {
+            return;
+        }
         listenDisposable = clusterManager.getTopic(pingTopic)
             .subscribePattern()
             .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
@@ -332,13 +333,37 @@ public class ClusterUniqueTask implements Task ,Serializable{
             });
     }
     enum TaskOperation implements Serializable {
+        /**
+         * 开始任务
+         */
         START,
+        /**
+         * 暂停任务
+         */
         PAUSE,
+        /**
+         * 重载任务
+         */
         RELOAD,
+        /**
+         * 关闭任务
+         */
         SHUTDOWN,
+        /**
+         * 设置任务
+         */
         JOB,
+        /**
+         * 执行任务
+         */
         EXECUTE,
+        /**
+         * 开启debug
+         */
         ENABLE_DEBUG,
+        /**
+         * 关闭debug
+         */
         DISABLE_DEBUG
     }