|
|
@@ -0,0 +1,359 @@
|
|
|
+package org.jetlinks.community.rule.engine.cluster;
|
|
|
+
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.Data;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.jetlinks.core.cluster.ClusterManager;
|
|
|
+import org.jetlinks.rule.engine.api.RuleData;
|
|
|
+import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
|
|
|
+import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
|
|
|
+import org.jetlinks.rule.engine.api.task.Task;
|
|
|
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
|
|
|
+import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
|
|
|
+import org.redisson.api.RLock;
|
|
|
+import org.redisson.api.RedissonClient;
|
|
|
+import reactor.core.publisher.EmitterProcessor;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
+import reactor.core.scheduler.Schedulers;
|
|
|
+import java.io.Serializable;
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author lifang
|
|
|
+ * @version 1.0.0
|
|
|
+ * @ClassName ClusterTask.java
|
|
|
+ * @Description 集群唯一任务
|
|
|
+ * 1、监听自身的任务情况
|
|
|
+ * 2、改变状态后向集群广播
|
|
|
+ *
|
|
|
+ *
|
|
|
+ * 管理自身状态(除开启和停止以外的状态)
|
|
|
+ * 任务自行进行管理,由work进行监听
|
|
|
+ * @createTime 2021年11月12日 10:57:00
|
|
|
+ */
|
|
|
+@Data
|
|
|
+@AllArgsConstructor
|
|
|
+@Slf4j
|
|
|
+public class ClusterUniqueTask implements Task ,Serializable{
|
|
|
+
|
|
|
+ private String workerId;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 当前服务器id
|
|
|
+ */
|
|
|
+ private final String currentSeverId;
|
|
|
+
|
|
|
+ private final String schedulerId;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 最后更新时间
|
|
|
+ */
|
|
|
+ private long lastStateTime;
|
|
|
+
|
|
|
+ private transient final RedissonClient redissonClient;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 开始时间
|
|
|
+ */
|
|
|
+ private long startTime;
|
|
|
+
|
|
|
+ private transient final AbstractExecutionContext context;
|
|
|
+
|
|
|
+ //默认心跳时间10s
|
|
|
+ private int pingTime=10;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 心跳,心跳包传递任务状态,集群实时更新任务状态
|
|
|
+ */
|
|
|
+ private transient String pingTopic ="cluster-unique-task-ping-%s";
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 操作主题
|
|
|
+ */
|
|
|
+ private transient String operationTopic ="cluster-unique-task-operation-%s";
|
|
|
+
|
|
|
+ private Task.State taskState;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private transient RLock lock;
|
|
|
+ /**
|
|
|
+ * 该任务已死亡
|
|
|
+ */
|
|
|
+ private boolean isDead=true;
|
|
|
+
|
|
|
+ private transient final ClusterManager clusterManager;
|
|
|
+
|
|
|
+ private transient final TaskExecutor executor;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getId(){
|
|
|
+ return this.getContext().getInstanceId();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getName() {
|
|
|
+ return executor.getName();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ScheduleJob getJob() {
|
|
|
+ return context.getJob();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> setJob(ScheduleJob job) {
|
|
|
+ if(isReplica()){
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return Mono.fromRunnable(() -> {
|
|
|
+ ScheduleJob old = context.getJob();
|
|
|
+ context.setJob(job);
|
|
|
+ try {
|
|
|
+ executor.validate();
|
|
|
+ } catch (Throwable e) {
|
|
|
+ context.setJob(old);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public ClusterUniqueTask(String schedulerId,
|
|
|
+ AbstractExecutionContext context,
|
|
|
+ TaskExecutor executor,
|
|
|
+ String currentSeverId,
|
|
|
+ ClusterManager clusterManager,
|
|
|
+ RedissonClient redissonClient) {
|
|
|
+ this.schedulerId = schedulerId;
|
|
|
+ this.context = context;
|
|
|
+ this.executor = executor;
|
|
|
+ this.currentSeverId=currentSeverId;
|
|
|
+ this.workerId=currentSeverId;
|
|
|
+ this.clusterManager=clusterManager;
|
|
|
+ this.pingTopic=String.format(this.pingTopic,context.getInstanceId());
|
|
|
+ this.operationTopic=String.format(this.operationTopic,context.getInstanceId());
|
|
|
+ this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
|
|
|
+ this.redissonClient=redissonClient;
|
|
|
+ //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
|
|
|
+ initUniqueTask().subscribe();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<Void> initUniqueTask(){
|
|
|
+ return creatUniqueTask()
|
|
|
+ .filter(result->!result)
|
|
|
+ .flatMap(ignore->this.creatUniqueFail())
|
|
|
+ .then();
|
|
|
+
|
|
|
+ }
|
|
|
+ private Mono<Boolean> creatUniqueTask() {
|
|
|
+ if(!isDead){
|
|
|
+ return Mono.just(false);
|
|
|
+ }
|
|
|
+ boolean result =false;
|
|
|
+ try {
|
|
|
+ //获取锁
|
|
|
+ result = lock.tryLock(-1,pingTime, TimeUnit.SECONDS);
|
|
|
+ }catch (InterruptedException e){
|
|
|
+ return Mono.just(false);
|
|
|
+ }
|
|
|
+ //争夺成功
|
|
|
+ if(result){
|
|
|
+ //创建任务,锁争夺成功后发送心跳
|
|
|
+ isDead=false;
|
|
|
+ return Flux.interval(Duration.ofSeconds(pingTime/2))
|
|
|
+ .flatMap(ignore->{
|
|
|
+ lock.lock(pingTime,TimeUnit.SECONDS);
|
|
|
+ return clusterManager.getTopic(pingTopic).publish(Mono.just(this));
|
|
|
+ })
|
|
|
+ .then(Mono.just(true));
|
|
|
+ }
|
|
|
+ //争夺失败
|
|
|
+ return Mono.just(false);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> reload() {
|
|
|
+ return operation(OperationMessage.of(TaskOperation.RELOAD));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> start() {
|
|
|
+ return operation(OperationMessage.of(TaskOperation.START));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> pause() {
|
|
|
+ return operation(OperationMessage.of(TaskOperation.PAUSE));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> shutdown() {
|
|
|
+ return operation(OperationMessage.of(TaskOperation.SHUTDOWN));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> execute(RuleData data) {
|
|
|
+ return operation(OperationMessage.of(TaskOperation.EXECUTE,Arrays.asList(data)));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<State> getState() {
|
|
|
+ if(isReplica()){
|
|
|
+ //副本
|
|
|
+ return Mono.just(this.getTaskState());
|
|
|
+ }
|
|
|
+ return Mono.just(executor.getState());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> debug(boolean debug) {
|
|
|
+ return operation(debug ?OperationMessage.of(TaskOperation.ENABLE_DEBUG) :OperationMessage.of(TaskOperation.DISABLE_DEBUG));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Long> getLastStateTime() {
|
|
|
+ return Mono.just(lastStateTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Long> getStartTime() {
|
|
|
+ return Mono.just(startTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<Void> operation(OperationMessage message) {
|
|
|
+ TaskOperation operation=message.operation;
|
|
|
+ if(isReplica()){
|
|
|
+ //当前为任务副本,传递给任务执行者
|
|
|
+ return clusterManager.getTopic(operationTopic)
|
|
|
+ .publish(Mono.just(message))
|
|
|
+ .then();
|
|
|
+ }
|
|
|
+ long currentTimeMillis = System.currentTimeMillis();
|
|
|
+ lastStateTime=currentTimeMillis;
|
|
|
+ //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
|
|
|
+ switch (operation){
|
|
|
+ case START:
|
|
|
+ return Mono.<Void>fromRunnable(executor::start)
|
|
|
+ .doOnSuccess((v) -> startTime =currentTimeMillis)
|
|
|
+ .subscribeOn(Schedulers.boundedElastic());
|
|
|
+ case PAUSE:
|
|
|
+ return Mono.fromRunnable(executor::pause);
|
|
|
+ case RELOAD:
|
|
|
+ return Mono.<Void>fromRunnable(executor::reload)
|
|
|
+ .subscribeOn(Schedulers.boundedElastic());
|
|
|
+ case SHUTDOWN:
|
|
|
+ this.taskState=State.shutdown;
|
|
|
+ //解锁
|
|
|
+ this.lock.unlock();
|
|
|
+ break;
|
|
|
+ case ENABLE_DEBUG:
|
|
|
+ return Mono.fromRunnable(() -> context.setDebug(true));
|
|
|
+ case DISABLE_DEBUG:
|
|
|
+ return Mono.fromRunnable(() -> context.setDebug(false));
|
|
|
+ case EXECUTE:
|
|
|
+ if(executor instanceof ExecutableTaskExecutor){
|
|
|
+ RuleData data = (RuleData) message.getParams().get(0);
|
|
|
+ return ((ExecutableTaskExecutor) executor).execute(data);
|
|
|
+ }
|
|
|
+ return Mono.empty();
|
|
|
+
|
|
|
+ default:break;
|
|
|
+ }
|
|
|
+ return Mono.empty();
|
|
|
+ }
|
|
|
+
|
|
|
+// /**
|
|
|
+// * 状态发生改变
|
|
|
+// * @return
|
|
|
+// */
|
|
|
+// public Flux<?> handleStateChange(){
|
|
|
+// //非本机任务状态发生改变
|
|
|
+// return changeState.map(Function.identity()).filter(ignore->this.isReplica());
|
|
|
+// }
|
|
|
+
|
|
|
+
|
|
|
+ public boolean isReplica(){
|
|
|
+ return !this.getCurrentSeverId().equals(this.getWorkerId());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private Mono<?> creatUniqueFail(){
|
|
|
+ return EmitterProcessor.create(true)
|
|
|
+ .flatMap(result -> clusterManager.getTopic(pingTopic)
|
|
|
+ .subscribePattern()
|
|
|
+ .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
|
|
|
+ .flatMap(obj -> {
|
|
|
+ isDead=false;
|
|
|
+ Object message = obj.getMessage();
|
|
|
+ if (message instanceof ClusterUniqueTask) {
|
|
|
+ ClusterUniqueTask task = (ClusterUniqueTask) message;
|
|
|
+ //心跳信息
|
|
|
+ return Mono.just(task);
|
|
|
+ } else if (message instanceof TaskOperation) {
|
|
|
+ //操作信息
|
|
|
+ TaskOperation task = (TaskOperation) message;
|
|
|
+ return Mono.just(task);
|
|
|
+ }
|
|
|
+ return Mono.empty();
|
|
|
+ }))
|
|
|
+ //心跳超时
|
|
|
+ .timeout(Duration.ofSeconds(pingTime),
|
|
|
+ Mono.empty()
|
|
|
+ .doOnNext(ignore->isDead=true)
|
|
|
+ .concatWith(initUniqueTask())
|
|
|
+ .then(Mono.just(this)))
|
|
|
+ .flatMap(msg -> {
|
|
|
+ if (msg instanceof ClusterUniqueTask) {
|
|
|
+ //同步心跳信息
|
|
|
+ ClusterUniqueTask task = (ClusterUniqueTask) msg;
|
|
|
+ this.workerId = task.currentSeverId;
|
|
|
+ this.taskState = task.taskState;
|
|
|
+ this.lastStateTime = task.lastStateTime;
|
|
|
+ this.startTime = task.startTime;
|
|
|
+ return Mono.empty();
|
|
|
+ } else if (msg instanceof OperationMessage) {
|
|
|
+ //同步操作信息
|
|
|
+ OperationMessage message = (OperationMessage) msg;
|
|
|
+ if(!isDead){
|
|
|
+ //任务存活
|
|
|
+ return operation(message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Mono.empty();
|
|
|
+ })
|
|
|
+ .then();
|
|
|
+ }
|
|
|
+ enum TaskOperation implements Serializable {
|
|
|
+ START,
|
|
|
+ PAUSE,
|
|
|
+ RELOAD,
|
|
|
+ SHUTDOWN,
|
|
|
+ EXECUTE,
|
|
|
+ ENABLE_DEBUG,
|
|
|
+ DISABLE_DEBUG
|
|
|
+ }
|
|
|
+
|
|
|
+ @Data
|
|
|
+ public static class OperationMessage implements Serializable{
|
|
|
+ private TaskOperation operation;
|
|
|
+ private List<Object> params;
|
|
|
+
|
|
|
+ private OperationMessage(TaskOperation operation, List<Object> params) {
|
|
|
+ this.operation = operation;
|
|
|
+ this.params = params;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static OperationMessage of(TaskOperation operation, List<Object> params) {
|
|
|
+ return new OperationMessage(operation,params);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static OperationMessage of(TaskOperation operation) {
|
|
|
+ return new OperationMessage(operation,null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|