Bläddra i källkod

changed 将唯一任务单独提取复用

18339543638 4 år sedan
förälder
incheckning
e3b9713198

+ 5 - 1
jetlinks-components/rule-engine-component/pom.xml

@@ -20,7 +20,11 @@
         </dependency>
 
 
-        <!-- 另一种Spring集成starter,本章未使用 -->
+        <dependency>
+            <groupId>org.jetlinks</groupId>
+            <artifactId>jetlinks-core</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.redisson</groupId>
             <artifactId>redisson-spring-boot-starter</artifactId>

+ 0 - 483
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -1,483 +0,0 @@
-package org.jetlinks.community.rule.engine.cluster;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
-import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
-import org.jetlinks.rule.engine.api.task.Task;
-import org.jetlinks.rule.engine.api.task.TaskExecutor;
-import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
-import org.redisson.api.RLock;
-import org.redisson.api.RedissonClient;
-import reactor.core.Disposable;
-import reactor.core.publisher.*;
-import reactor.core.scheduler.Schedulers;
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.concurrent.*;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName 集群唯一任务.java
- * @Description 集群唯一任务
- *  1、监听自身的任务情况
- *  2、改变状态后向集群广播
- *
- *
- *  管理自身状态(除开启和停止以外的状态)
- *  任务自行进行管理,由work进行监听
- * @createTime 2021年11月12日 10:57:00
- */
-@Data
-@AllArgsConstructor
-@Slf4j
-class ClusterUniqueTask implements Task ,Serializable{
-
-    private String id;
-
-    private static final long serialVersionUID=1L;
-    /**
-     * 工作服务器节点id
-     */
-    private String workerId;
-
-    /**
-     * 当前服务器id
-     */
-    private final String currentSeverId;
-
-    private final String schedulerId;
-
-    /**
-     * 最后更新时间
-     */
-    private long lastStateTime;
-
-
-
-    /**
-     * 开始时间
-     */
-    private long startTime;
-
-
-
-    /**
-     * 默认心跳时间10s
-     */
-    private static final int pingTime=10;
-
-    /**
-     * 操作主题
-     */
-    private Task.State taskState;
-
-    private transient final AbstractExecutionContext context;
-
-    private transient final RedissonClient redissonClient;
-
-    /**
-     * 心跳,心跳包传递任务状态,集群实时更新任务状态
-     */
-    private transient String pingTopic ="task-ping-%s";
-
-    private transient String operationTopic ="task-operation-%s";
-
-    private transient RLock lock;
-
-    private transient final ClusterManager clusterManager;
-
-    private transient final TaskExecutor executor;
-
-    private transient AtomicBoolean isAlive=new AtomicBoolean(true);
-
-    private transient Disposable handleDisposable =null;
-
-    private transient Disposable generatePingMsgDisposable =null;
-
-    /**
-     * 持有锁的线程
-     */
-
-    private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
-        Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
-
-    ClusterUniqueTask(String schedulerId,
-                      AbstractExecutionContext context,
-                      TaskExecutor executor,
-                      String currentSeverId,
-                      ClusterManager clusterManager,
-                      RedissonClient redissonClient) {
-        this.schedulerId = schedulerId;
-        this.context = context;
-        this.executor = executor;
-        this.currentSeverId=currentSeverId;
-        this.workerId=currentSeverId;
-        this.id=context.getInstanceId()+"-"+context.getJob().getNodeId();
-        this.clusterManager=clusterManager;
-        this.pingTopic=String.format(this.pingTopic,this.id);
-        this.operationTopic=String.format(this.operationTopic,this.id);
-        this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
-        this.redissonClient=redissonClient;
-        //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
-        init()
-            .subscribe();
-    }
-
-    @Override
-    public String getName() {
-        return this.executor.getName();
-    }
-
-    @Override
-    public ScheduleJob getJob() {
-        return context.getJob();
-    }
-
-    /**
-     * 设置任务应该集群中所有机器全部进行设置
-     * 所以当本机设置任务成功后,广播给集群机器对任务进行更新
-     * @param job
-     * @return
-     */
-    @Override
-    public Mono<Void> setJob(ScheduleJob job) {
-        return Mono.fromRunnable(() -> {
-            ScheduleJob old = context.getJob();
-            context.setJob(job);
-            try {
-                executor.validate();
-            } catch (Throwable e) {
-                context.setJob(old);
-                throw e;
-            }
-        }).concatWith(this.clusterManager.getTopic(this.operationTopic)
-                    .publish(Mono.just(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),this.currentSeverId))))
-            .then();
-
-    }
-
-
-
-    private Mono<Void> init(){
-
-        return generatePingMsg()
-            .mergeWith(this.handlePingMsg())
-            .then();
-    }
-
-    /**
-     * 产生心跳信息
-     * @return
-     */
-    private Mono<Void> generatePingMsg() {
-        return Flux.interval(Duration.ofSeconds(pingTime/2))
-            .filter(ignore->{
-                if (isReplica()) {
-                    this.heldLockThread.execute(()->{
-                        if (this.lock.isHeldByThread(Thread.currentThread().getId())) {
-                            //非工作节点且占据锁,则解锁
-                            this.lock.unlock();
-                        }else if(!this.isAlive.get()&&!this.lock.isLocked()){
-                            //争夺锁
-                            try {
-                                //获取锁
-                                if (this.lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
-                                    //获取锁成功后改写工作节点id
-                                    this.workerId=this.getCurrentSeverId();
-                                    this.isAlive.set(true);
-                                }else {
-                                    //失败后停止发送心跳(若存在)
-                                    this.workerId=null;
-                                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
-                                        this.generatePingMsgDisposable.dispose();
-                                    }
-                                }
-                            }catch (InterruptedException e){
-                            }
-                        }
-                    });
-                    return false;
-                }
-                return true;
-            })
-            .doOnNext(ignore->{
-                //锁续约
-                AtomicBoolean finished= new AtomicBoolean(false);
-                this.heldLockThread.execute(()->{
-                    try {
-                        if (this.lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
-                            this.workerId=this.getCurrentSeverId();
-                        }else {
-                            this.workerId=null;
-                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
-                                this.generatePingMsgDisposable.dispose();
-                            }
-                        }
-
-                    } catch (InterruptedException e) {
-                    }finally {
-                        finished.set(true);
-                    }
-                });
-                while (!finished.get()){
-                }
-                if(!isReplica()){
-                    if(State.running.equals(this.taskState)&&!State.running.equals(this.executor.getState())){
-                        this.executor.start();
-                    }
-                    if(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()){
-                        this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
-                    }
-                }
-
-            })
-            .then();
-    }
-
-
-    /**
-     * 处理消息
-     *
-     */
-    private Mono<Void> handlePingMsg(){
-        return Flux.interval(Duration.ofSeconds(pingTime/3))
-            .filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed()))
-            .doOnNext(ignore->{
-                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
-                        if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
-                            this.handleDisposable.isDisposed();
-                        }
-                        return;
-                    }
-                    this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
-                        .subscribePattern()
-                        .mergeWith(this.clusterManager.getTopic(this.operationTopic).subscribePattern())
-                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
-                        .doOnNext(__->{
-                            if(isReplica()&&this.executor.getState().equals(State.running)){
-                                this.taskState=State.running;
-                                this.executor.pause();
-                            }
-                        })
-                        .publishOn(Schedulers.boundedElastic())
-                        .flatMap(obj -> {
-                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
-                                this.generatePingMsgDisposable.dispose();
-                            }
-                            Object message = obj.getMessage();
-                            this.isAlive.set(true);
-                            if (message instanceof ClusterUniqueTask) {
-                                //心跳信息
-                                ClusterUniqueTask task = (ClusterUniqueTask) message;
-                                if(task.getCurrentSeverId().equals(this.workerId)){
-                                    return Mono.empty();
-                                }
-                                this.workerId = task.currentSeverId;
-                                this.taskState = task.taskState;
-                                this.lastStateTime = task.lastStateTime;
-                                this.startTime = task.startTime;
-                            } else if (message instanceof OperationMessage) {
-                                //操作信息
-                                OperationMessage operationMessage = (OperationMessage) message;
-                                if(operationMessage.fromServerId.equals(this.workerId)){
-                                    return Mono.empty();
-                                }
-                                //任务存活
-                                return operation(operationMessage);
-                            }
-                            return Mono.empty();
-                        })
-                        .subscribe();
-                }
-            ).then();
-    }
-
-    @Override
-    public Mono<Void> reload() {
-        return operation(OperationMessage.of(TaskOperation.RELOAD,currentSeverId))
-            .concatWith(this.clusterManager.getTopic(this.operationTopic)
-                        .publish(Mono.just(OperationMessage.of(TaskOperation.RELOAD,null,this.currentSeverId))).then())
-            .then();
-    }
-
-    @Override
-    public Mono<Void> start() {
-        return operation(OperationMessage.of(TaskOperation.START,this.currentSeverId));
-    }
-
-    @Override
-    public Mono<Void> pause() {
-        return operation(OperationMessage.of(TaskOperation.PAUSE,this.currentSeverId));
-    }
-
-    @Override
-    public Mono<Void> shutdown() {
-        return operation(OperationMessage.of(TaskOperation.SHUTDOWN,this.currentSeverId));
-    }
-
-    @Override
-    public Mono<Void> execute(RuleData data) {
-        return operation(OperationMessage.of(TaskOperation.EXECUTE,Collections.singletonList(data),this.currentSeverId));
-    }
-
-    @Override
-    public Mono<State> getState() {
-        if(isReplica()){
-            //副本
-            return Mono.just(this.getTaskState());
-        }
-        return Mono.just(executor.getState());
-    }
-
-    @Override
-    public Mono<Void> debug(boolean debug) {
-        return operation(debug ?OperationMessage.of(TaskOperation.ENABLE_DEBUG,this.currentSeverId)  :OperationMessage.of(TaskOperation.DISABLE_DEBUG,this.currentSeverId));
-    }
-
-    @Override
-    public Mono<Long> getLastStateTime() {
-        return Mono.just(this.lastStateTime);
-    }
-
-    @Override
-    public Mono<Long> getStartTime() {
-        return Mono.just(this.startTime);
-    }
-
-    private Mono<Void> operation(OperationMessage message) {
-        TaskOperation operation=message.operation;
-        this.taskState=State.unknown.equals(operation.getState())?this.taskState:operation.getState();
-        if(isReplica()){
-            //当前为任务副本或重新加载不在此处进行广播操作
-            if(!TaskOperation.JOB.equals(message.operation)&&!TaskOperation.RELOAD.equals(message.getOperation())){
-                return  this.clusterManager.getTopic(this.operationTopic)
-                    .publish(Mono.just(message))
-                    .then();
-            }
-        }
-        long currentTimeMillis = System.currentTimeMillis();
-        this.lastStateTime=currentTimeMillis;
-        //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
-        switch (operation){
-            case JOB:
-                return Mono.fromRunnable(() -> {
-                    ScheduleJob job = (ScheduleJob) message.getParams().get(0);
-                    ScheduleJob old = context.getJob();
-                    context.setJob(job);
-                    try {
-                        executor.validate();
-                    } catch (Throwable e) {
-                        context.setJob(old);
-                        throw e;
-                    }
-                });
-            case START:
-                return Mono.<Void>fromRunnable(this.executor::start)
-                    .doOnSuccess((v) ->this. startTime =currentTimeMillis)
-                    .subscribeOn(Schedulers.boundedElastic());
-            case PAUSE:
-                return Mono.fromRunnable(this.executor::pause);
-            case RELOAD:
-                return Mono.<Void>fromRunnable(this.executor::reload)
-                    .subscribeOn(Schedulers.boundedElastic());
-            case SHUTDOWN:
-                this.taskState=State.shutdown;
-                //解锁
-                this.heldLockThread.execute(()->this.lock.unlock());
-                break;
-            case ENABLE_DEBUG:
-                return Mono.fromRunnable(() ->this. context.setDebug(true));
-            case DISABLE_DEBUG:
-                return Mono.fromRunnable(() -> this.context.setDebug(false));
-            case EXECUTE:
-                if(this.executor instanceof ExecutableTaskExecutor){
-                    RuleData data = (RuleData) message.getParams().get(0);
-                    return ((ExecutableTaskExecutor) this.executor).execute(data);
-                }
-                return Mono.empty();
-
-            default:break;
-        }
-        return Mono.empty();
-    }
-
-
-    /**
-     * 副本节点
-     * @return
-     */
-    private boolean isReplica(){
-        return !this.getCurrentSeverId().equals(this.getWorkerId());
-    }
-
-
-
-    @AllArgsConstructor
-    enum TaskOperation implements Serializable {
-        /**
-         * 开始任务
-         */
-        START(State.running),
-        /**
-         * 暂停任务
-         */
-        PAUSE(State.paused),
-        /**
-         * 重载任务
-         */
-        RELOAD(State.unknown),
-        /**
-         * 关闭任务
-         */
-        SHUTDOWN(State.shutdown),
-        /**
-         * 设置任务
-         */
-        JOB(State.unknown),
-        /**
-         * 执行任务
-         */
-        EXECUTE(State.unknown),
-        /**
-         * 开启debug
-         */
-        ENABLE_DEBUG(State.unknown),
-        /**
-         * 关闭debug
-         */
-        DISABLE_DEBUG(State.unknown);
-        private State state;
-
-        public State getState() {
-            return state;
-        }
-    }
-
-    @Data
-    public static class OperationMessage implements Serializable{
-        private TaskOperation operation;
-        private List<Object> params;
-        private String fromServerId;
-
-        private OperationMessage(TaskOperation operation, List<Object> params, String fromServerId) {
-            this.operation = operation;
-            this.params = params;
-            this.fromServerId=fromServerId;
-        }
-
-        public static OperationMessage of(TaskOperation operation, List<Object> params,String fromServerId) {
-            return new OperationMessage(operation,params,fromServerId);
-        }
-
-        public static OperationMessage of(TaskOperation operation,String fromServerId) {
-            return new OperationMessage(operation,null,fromServerId);
-        }
-    }
-}

+ 1 - 3
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueWork.java

@@ -8,8 +8,6 @@ import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
 import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
 import org.jetlinks.rule.engine.api.worker.Worker;
-import org.jetlinks.rule.engine.cluster.scope.ClusterGlobalScope;
-import org.jetlinks.rule.engine.cluster.worker.ClusterExecutionContext;
 import org.jetlinks.rule.engine.defaults.DefaultExecutionContext;
 import org.jetlinks.rule.engine.defaults.scope.InMemoryGlobalScope;
 import org.redisson.api.RedissonClient;
@@ -66,7 +64,7 @@ public class ClusterUniqueWork implements Worker, Serializable {
             .flatMap(provider -> {
                 DefaultExecutionContext context = createContext(job);
                 return provider.createTask(context)
-                    .map(executor -> new ClusterUniqueTask(schedulerId, context,executor,id,clusterManager,redissonClient));
+                    .map(executor -> new RuleClusterUniqueTask(schedulerId, context,executor,id,clusterManager,redissonClient));
             });
     }
 

+ 488 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/RuleClusterUniqueTask.java

@@ -0,0 +1,488 @@
+package org.jetlinks.community.rule.engine.cluster;
+
+import lombok.*;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.cluster.ClusterUniqueTask;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
+import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
+import org.jetlinks.rule.engine.api.task.Task;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import reactor.core.Disposable;
+import reactor.core.publisher.*;
+import reactor.core.scheduler.Schedulers;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.concurrent.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName 集群唯一任务.java
+ * @Description 集群唯一任务
+ *  1、监听自身的任务情况
+ *  2、改变状态后向集群广播
+ *
+ *
+ *  管理自身状态(除开启和停止以外的状态)
+ *  任务自行进行管理,由work进行监听
+ * @createTime 2021年11月12日 10:57:00
+ */
+@Slf4j
+@EqualsAndHashCode(callSuper = false)
+class RuleClusterUniqueTask extends ClusterUniqueTask implements Task ,Serializable{
+
+    private static final long serialVersionUID=1L;
+
+
+    @Getter
+    private final String schedulerId;
+
+
+    /**
+     * 最后更新时间
+     */
+    private long lastStateTime;
+
+
+
+    /**
+     * 开始时间
+     */
+    private long startTime;
+
+    /**
+     * 操作主题
+     */
+    private Task.State taskState;
+
+    private transient final AbstractExecutionContext context;
+
+    private transient final TaskExecutor executor;
+
+    /**
+     * 持有锁的线程
+     */
+
+    private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
+        Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
+
+    RuleClusterUniqueTask(String schedulerId,
+                          AbstractExecutionContext context,
+                          TaskExecutor executor,
+                          String currentSeverId,
+                          ClusterManager clusterManager,
+                          RedissonClient redissonClient) {
+        super(context.getInstanceId()+"-"+context.getJob().getNodeId(),currentSeverId,clusterManager,redissonClient);
+        this.schedulerId = schedulerId;
+        this.context = context;
+        this.executor = executor;
+//        this.workerId=currentSeverId;
+//        this.id=context.getInstanceId()+"-"+context.getJob().getNodeId();
+//        this.clusterManager=clusterManager;
+//        this.pingTopic=String.format(this.pingTopic,this.id);
+//        this.operationTopic=String.format(this.operationTopic,this.id);
+//        this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
+//        this.redissonClient=redissonClient;
+//        //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
+//        init()
+//            .subscribe();
+    }
+
+    @Override
+    public String getName() {
+        return this.executor.getName();
+    }
+
+    @Override
+    public ScheduleJob getJob() {
+        return context.getJob();
+    }
+
+    /**
+     * 设置任务应该集群中所有机器全部进行设置
+     * 所以当本机设置任务成功后,广播给集群机器对任务进行更新
+     * @param job
+     * @return
+     */
+    @Override
+    public Mono<Void> setJob(ScheduleJob job) {
+        return Mono.fromRunnable(() -> {
+            ScheduleJob old = context.getJob();
+            context.setJob(job);
+            try {
+                executor.validate();
+            } catch (Throwable e) {
+                context.setJob(old);
+                throw e;
+            }
+        }).concatWith(this.getClusterManager().getTopic(this.getOperationTopic())
+            .publish(Mono.just(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),this.getCurrentSeverId()))))
+            .then();
+
+    }
+
+
+
+//    private Mono<Void> init(){
+//
+//        return generatePingMsg()
+//            .mergeWith(this.handlePingMsg())
+//            .then();
+//    }
+
+//    /**
+//     * 产生心跳信息
+//     * @return
+//     */
+//    private Mono<Void> generatePingMsg() {
+//        return Flux.interval(Duration.ofSeconds(pingTime/2))
+//            .filter(ignore->{
+//                if (isReplica()) {
+//                    this.heldLockThread.execute(()->{
+//                        if (this.lock.isHeldByThread(Thread.currentThread().getId())) {
+//                            //非工作节点且占据锁,则解锁
+//                            this.lock.unlock();
+//                        }else if(!this.isAlive.get()&&!this.lock.isLocked()){
+//                            //争夺锁
+//                            try {
+//                                //获取锁
+//                                if (this.lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
+//                                    //获取锁成功后改写工作节点id
+//                                    this.workerId=this.getCurrentSeverId();
+//                                    this.isAlive.set(true);
+//                                }else {
+//                                    //失败后停止发送心跳(若存在)
+//                                    this.workerId=null;
+//                                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+//                                        this.generatePingMsgDisposable.dispose();
+//                                    }
+//                                }
+//                            }catch (InterruptedException e){
+//                            }
+//                        }
+//                    });
+//                    return false;
+//                }
+//                return true;
+//            })
+//            .doOnNext(ignore->{
+//                //锁续约
+//                AtomicBoolean finished= new AtomicBoolean(false);
+//                this.heldLockThread.execute(()->{
+//                    try {
+//                        if (this.lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
+//                            this.workerId=this.getCurrentSeverId();
+//                        }else {
+//                            this.workerId=null;
+//                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+//                                this.generatePingMsgDisposable.dispose();
+//                            }
+//                        }
+//
+//                    } catch (InterruptedException e) {
+//                    }finally {
+//                        finished.set(true);
+//                    }
+//                });
+//                while (!finished.get()){
+//                }
+//                if(!isReplica()){
+//                    if(State.running.equals(this.taskState)&&!State.running.equals(this.executor.getState())){
+//                        this.executor.start();
+//                    }
+//                    if(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()){
+//                        this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
+//                    }
+//                }
+//
+//            })
+//            .then();
+//    }
+
+    @Override
+    public void beMasterPostProcessor() {
+        if(isReplica()&&this.executor.getState().equals(State.running)){
+            this.taskState=State.running;
+            this.executor.pause();
+        }
+    }
+
+    @Override
+    public Mono<Void> handleMsg(Object message) {
+        if (message instanceof RuleClusterUniqueTask) {
+            //心跳信息
+            RuleClusterUniqueTask task = (RuleClusterUniqueTask) message;
+            if (task.getCurrentSeverId().equals(this.getWorkerId())) {
+                return Mono.empty();
+            }
+            this.setWorkerId(task.getCurrentSeverId());
+            this.taskState = task.taskState;
+            this.lastStateTime = task.lastStateTime;
+            this.startTime = task.startTime;
+        } else if (message instanceof OperationMessage) {
+            //操作信息
+            OperationMessage operationMessage = (OperationMessage) message;
+            if (operationMessage.getFromServerId().equals(this.getWorkerId())) {
+                return Mono.empty();
+            }
+            //任务存活
+            return operation(operationMessage);
+        }
+        return Mono.empty();
+    }
+
+    @Override
+    public void beforeHandleMsg() {
+        if(isReplica()&&this.executor.getState().equals(State.running)){
+            this.taskState=State.running;
+            this.executor.pause();
+        }
+    }
+
+
+//    /**
+//     * 处理消息
+//     *
+//     */
+//    private Mono<Void> handlePingMsg(){
+//        return Flux.interval(Duration.ofSeconds(pingTime/3))
+//            .filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed()))
+//            .doOnNext(ignore->{
+//                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+//                        if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
+//                            this.handleDisposable.isDisposed();
+//                        }
+//                        return;
+//                    }
+//                    this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
+//                        .subscribePattern()
+//                        .mergeWith(this.clusterManager.getTopic(this.operationTopic).subscribePattern())
+//                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
+//                        .doOnNext(__->{
+//                            if(isReplica()&&this.executor.getState().equals(State.running)){
+//                                this.taskState=State.running;
+//                                this.executor.pause();
+//                            }
+//                        })
+//                        .publishOn(Schedulers.boundedElastic())
+//                        .flatMap(obj -> {
+//                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+//                                this.generatePingMsgDisposable.dispose();
+//                            }
+//                            Object message = obj.getMessage();
+//                            this.isAlive.set(true);
+//                            if (message instanceof RuleClusterUniqueTask) {
+//                                //心跳信息
+//                                RuleClusterUniqueTask task = (RuleClusterUniqueTask) message;
+//                                if(task.getCurrentSeverId().equals(this.workerId)){
+//                                    return Mono.empty();
+//                                }
+//                                this.workerId = task.currentSeverId;
+//                                this.taskState = task.taskState;
+//                                this.lastStateTime = task.lastStateTime;
+//                                this.startTime = task.startTime;
+//                            } else if (message instanceof OperationMessage) {
+//                                //操作信息
+//                                OperationMessage operationMessage = (OperationMessage) message;
+//                                if(operationMessage.fromServerId.equals(this.workerId)){
+//                                    return Mono.empty();
+//                                }
+//                                //任务存活
+//                                return operation(operationMessage);
+//                            }
+//                            return Mono.empty();
+//                        })
+//                        .subscribe();
+//                }
+//            ).then();
+//    }
+
+    @Override
+    public Mono<Void> reload() {
+        return operation(OperationMessage.of(TaskOperation.RELOAD,getCurrentSeverId()))
+            .concatWith(this.getClusterManager().getTopic(this.getOperationTopic())
+                .publish(Mono.just(OperationMessage.of(TaskOperation.RELOAD,null,this.getCurrentSeverId()))).then())
+            .then();
+    }
+
+    @Override
+    public Mono<Void> start() {
+        return operation(OperationMessage.of(TaskOperation.START,this.getCurrentSeverId()));
+    }
+
+    @Override
+    public Mono<Void> pause() {
+        return operation(OperationMessage.of(TaskOperation.PAUSE,this.getCurrentSeverId()));
+    }
+
+    @Override
+    public Mono<Void> shutdown() {
+        return operation(OperationMessage.of(TaskOperation.SHUTDOWN,this.getCurrentSeverId()));
+    }
+
+    @Override
+    public Mono<Void> execute(RuleData data) {
+        return operation(OperationMessage.of(TaskOperation.EXECUTE,Collections.singletonList(data),this.getCurrentSeverId()));
+    }
+
+    @Override
+    public Mono<State> getState() {
+        if(isReplica()){
+            //副本
+            return Mono.just(this.taskState);
+        }
+        return Mono.just(executor.getState());
+    }
+
+    @Override
+    public Mono<Void> debug(boolean debug) {
+        return operation(debug ?OperationMessage.of(TaskOperation.ENABLE_DEBUG,this.getCurrentSeverId())  :OperationMessage.of(TaskOperation.DISABLE_DEBUG,this.getCurrentSeverId()));
+    }
+
+    @Override
+    public Mono<Long> getLastStateTime() {
+        return Mono.just(this.lastStateTime);
+    }
+
+    @Override
+    public Mono<Long> getStartTime() {
+        return Mono.just(this.startTime);
+    }
+
+    private Mono<Void> operation(OperationMessage message) {
+        TaskOperation operation=message.operation;
+        this.taskState=State.unknown.equals(operation.getState())?this.taskState:operation.getState();
+        if(isReplica()){
+            //当前为任务副本或重新加载不在此处进行广播操作
+            if(!TaskOperation.JOB.equals(message.operation)&&!TaskOperation.RELOAD.equals(message.getOperation())){
+                return  this.getClusterManager().getTopic(this.getOperationTopic())
+                    .publish(Mono.just(message))
+                    .then();
+            }
+        }
+        long currentTimeMillis = System.currentTimeMillis();
+        this.lastStateTime=currentTimeMillis;
+        //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
+        switch (operation){
+            case JOB:
+                return Mono.fromRunnable(() -> {
+                    ScheduleJob job = (ScheduleJob) message.getParams().get(0);
+                    ScheduleJob old = context.getJob();
+                    context.setJob(job);
+                    try {
+                        executor.validate();
+                    } catch (Throwable e) {
+                        context.setJob(old);
+                        throw e;
+                    }
+                });
+            case START:
+                return Mono.<Void>fromRunnable(this.executor::start)
+                    .doOnSuccess((v) ->this. startTime =currentTimeMillis)
+                    .subscribeOn(Schedulers.boundedElastic());
+            case PAUSE:
+                return Mono.fromRunnable(this.executor::pause);
+            case RELOAD:
+                return Mono.<Void>fromRunnable(this.executor::reload)
+                    .subscribeOn(Schedulers.boundedElastic());
+            case SHUTDOWN:
+                this.taskState=State.shutdown;
+                //解锁
+                this.heldLockThread.execute(()->this.getLock().unlock());
+                break;
+            case ENABLE_DEBUG:
+                return Mono.fromRunnable(() ->this. context.setDebug(true));
+            case DISABLE_DEBUG:
+                return Mono.fromRunnable(() -> this.context.setDebug(false));
+            case EXECUTE:
+                if(this.executor instanceof ExecutableTaskExecutor){
+                    RuleData data = (RuleData) message.getParams().get(0);
+                    return ((ExecutableTaskExecutor) this.executor).execute(data);
+                }
+                return Mono.empty();
+
+            default:break;
+        }
+        return Mono.empty();
+    }
+
+
+//    /**
+//     * 副本节点
+//     * @return
+//     */
+//    private boolean isReplica(){
+//        return !this.getCurrentSeverId().equals(this.getWorkerId());
+//    }
+
+
+
+    @AllArgsConstructor
+    enum TaskOperation implements Serializable {
+        /**
+         * 开始任务
+         */
+        START(State.running),
+        /**
+         * 暂停任务
+         */
+        PAUSE(State.paused),
+        /**
+         * 重载任务
+         */
+        RELOAD(State.unknown),
+        /**
+         * 关闭任务
+         */
+        SHUTDOWN(State.shutdown),
+        /**
+         * 设置任务
+         */
+        JOB(State.unknown),
+        /**
+         * 执行任务
+         */
+        EXECUTE(State.unknown),
+        /**
+         * 开启debug
+         */
+        ENABLE_DEBUG(State.unknown),
+        /**
+         * 关闭debug
+         */
+        DISABLE_DEBUG(State.unknown);
+        private State state;
+
+        public State getState() {
+            return state;
+        }
+    }
+
+    @Data
+    public static class OperationMessage implements Serializable{
+        private TaskOperation operation;
+        private List<Object> params;
+        private String fromServerId;
+
+        private OperationMessage(TaskOperation operation, List<Object> params, String fromServerId) {
+            this.operation = operation;
+            this.params = params;
+            this.fromServerId=fromServerId;
+        }
+
+        public static OperationMessage of(TaskOperation operation, List<Object> params,String fromServerId) {
+            return new OperationMessage(operation,params,fromServerId);
+        }
+
+        public static OperationMessage of(TaskOperation operation,String fromServerId) {
+            return new OperationMessage(operation,null,fromServerId);
+        }
+    }
+}

+ 15 - 0
jetlinks-core/pom.xml

@@ -81,6 +81,21 @@
             <optional>true</optional>
         </dependency>
 
+
+
+        <dependency>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson-spring-boot-starter</artifactId>
+            <version>3.13.6</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-web</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 
 </project>

+ 220 - 0
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ClusterUniqueTask.java

@@ -0,0 +1,220 @@
+package org.jetlinks.core.cluster;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterUniqueTask.java
+ * @Description TODO
+ * @createTime 2021年11月27日 10:54:00
+ */
+@AllArgsConstructor
+@Data
+public abstract class ClusterUniqueTask implements Serializable {
+    private static final long serialVersionUID=1L;
+    private String id;
+    /**
+     * 工作服务器节点id
+     */
+    private String workerId;
+    /**
+     * 当前服务器id
+     */
+    private final String currentSeverId;
+
+    /**
+     * 默认心跳时间10s
+     */
+    private static final int pingTime=10;
+
+    /**
+     * 心跳,心跳包传递任务状态,集群实时更新任务状态
+     */
+    private transient String pingTopic ="task-ping-%s";
+
+    private transient String operationTopic ="task-operation-%s";
+
+    private transient RLock lock;
+
+    private transient final ClusterManager clusterManager;
+
+    private transient AtomicBoolean isAlive=new AtomicBoolean(true);
+
+    private transient Disposable handleDisposable =null;
+
+    private transient Disposable generatePingMsgDisposable =null;
+
+    /**
+     * 持有锁的线程
+     */
+
+    private transient ExecutorService heldLockThread= new ThreadPoolExecutor(1, 1,
+        Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue(1024),r->new Thread(r,"cluster-task"),new ThreadPoolExecutor.DiscardOldestPolicy());
+
+    public ClusterUniqueTask(String id,String currentSeverId,
+                          ClusterManager clusterManager,
+                          RedissonClient redissonClient) {
+        this.currentSeverId=currentSeverId;
+        this.workerId=currentSeverId;
+        this.clusterManager=clusterManager;
+        this.pingTopic=String.format(this.pingTopic,this.id);
+        this.operationTopic=String.format(this.operationTopic,this.id);
+        this.lock = redissonClient.getLock("cluster-unique-"+this.id);
+        //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
+        init()
+            .subscribe();
+    }
+
+
+
+    private Mono<Void> init(){
+
+        return generatePingMsg()
+            .mergeWith(this.handlePingMsg())
+            .then();
+    }
+
+    /**
+     * 产生心跳信息
+     * @return
+     */
+    private Mono<Void> generatePingMsg() {
+        return Flux.interval(Duration.ofSeconds(pingTime/2))
+            .filter(ignore->{
+                if (isReplica()) {
+                    this.heldLockThread.execute(()->{
+                        if (this.lock.isHeldByThread(Thread.currentThread().getId())) {
+                            //非工作节点且占据锁,则解锁
+                            this.lock.unlock();
+                        }else if(!this.isAlive.get()&&!this.lock.isLocked()){
+                            //争夺锁
+                            try {
+                                //获取锁
+                                if (this.lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
+                                    //获取锁成功后改写工作节点id
+                                    this.workerId=this.currentSeverId;
+                                    this.isAlive.set(true);
+                                }else {
+                                    //失败后停止发送心跳(若存在)
+                                    this.workerId=null;
+                                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                                        this.generatePingMsgDisposable.dispose();
+                                    }
+                                }
+                            }catch (InterruptedException e){
+                            }
+                        }
+                    });
+                    return false;
+                }
+                return true;
+            })
+            .doOnNext(ignore->{
+                //锁续约
+                AtomicBoolean finished= new AtomicBoolean(false);
+                this.heldLockThread.execute(()->{
+                    try {
+                        if (this.lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
+                            this.workerId=this.currentSeverId;
+                        }else {
+                            this.workerId=null;
+                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                                this.generatePingMsgDisposable.dispose();
+                            }
+                        }
+
+                    } catch (InterruptedException e) {
+                    }finally {
+                        finished.set(true);
+                    }
+                });
+                while (!finished.get()){
+                }
+                if(!isReplica()){
+                    beMasterPostProcessor();
+                    if(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()){
+                        this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
+                    }
+                }
+
+            })
+            .then();
+    }
+
+    /**
+     * 当该节点变为集群主节点时,调用该方法进行处理操作
+     */
+    public abstract void beMasterPostProcessor();
+
+    /**
+     * 当有集群消息发送过来时,重新该方法处理消息
+     * @param msg
+     */
+    public abstract Mono<?> handleMsg(Object msg);
+
+
+    /**
+     * 当集群消息发过来时,在处理消息前调用该方法
+     */
+    public abstract void beforeHandleMsg();
+
+
+    /**
+     * 处理消息
+     *
+     */
+    private Mono<Void> handlePingMsg(){
+        return Flux.interval(Duration.ofSeconds(pingTime/3))
+            .filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed()))
+            .doOnNext(ignore->{
+                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                        if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
+                            this.handleDisposable.isDisposed();
+                        }
+                        return;
+                    }
+                    this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
+                        .subscribePattern()
+                        .mergeWith(this.clusterManager.getTopic(this.operationTopic).subscribePattern())
+                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
+                        .doOnNext(__->{
+                            beforeHandleMsg();
+                        })
+                        .publishOn(Schedulers.boundedElastic())
+                        .flatMap(obj -> {
+                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                                this.generatePingMsgDisposable.dispose();
+                            }
+                            Object message = obj.getMessage();
+                            this.isAlive.set(true);
+                            return handleMsg(message);
+                        })
+                        .subscribe();
+                }
+            ).then();
+    }
+    /**
+     * 副本节点
+     * @return
+     */
+    public boolean isReplica(){
+        return !this.currentSeverId.equals(this.workerId);
+    }
+
+}

+ 2 - 1
pom.xml

@@ -32,7 +32,8 @@
         <fastjson.version>1.2.75</fastjson.version>
         <hutool.version>5.7.16</hutool.version>
         <jetlinks.version>1.1.7-SNAPSHOT</jetlinks.version>
-        <aliyun.sdk.version>4.5.2</aliyun.sdk.version>
+        <aliyun.sdk.version>4.5.6</aliyun.sdk.version>
+        <aliyun.iot.sdk.version>7.29.0</aliyun.iot.sdk.version>
     </properties>
 
     <build>