18339543638 4 лет назад
Родитель
Сommit
d47963db6b

+ 22 - 8
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -86,7 +86,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
     /**
      * 该任务已死亡
      */
-    private boolean isDead=true;
+    private volatile boolean isDead=true;
 
     private transient final ClusterManager clusterManager;
 
@@ -171,6 +171,9 @@ public class ClusterUniqueTask implements Task ,Serializable{
         if(result){
             //创建任务,锁争夺成功后发送心跳
             isDead=false;
+            if(State.running.equals(taskState)||State.running.equals(executor.getState())){
+                executor.start();
+            }
             return Flux.interval(Duration.ofSeconds(pingTime/2))
                 .flatMap(ignore->{
                     lock.lock(pingTime,TimeUnit.SECONDS);
@@ -178,9 +181,8 @@ public class ClusterUniqueTask implements Task ,Serializable{
                         //终止监听心跳
                         listenDisposable.dispose();
                     }
-                    return clusterManager.getTopic(pingTopic).publish(Mono.just(this));
-                })
-                .then(Mono.just(true));
+                    return clusterManager.getTopic(pingTopic).publish(Mono.just(this)).then(Mono.empty());
+                }).then(Mono.just(true));
         }
         //争夺失败
         return Mono.just(false);
@@ -248,6 +250,18 @@ public class ClusterUniqueTask implements Task ,Serializable{
         lastStateTime=currentTimeMillis;
         //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
         switch (operation){
+            case JOB:
+                return Mono.fromRunnable(() -> {
+                    ScheduleJob job = (ScheduleJob) message.getParams().get(0);
+                    ScheduleJob old = context.getJob();
+                    context.setJob(job);
+                    try {
+                        executor.validate();
+                    } catch (Throwable e) {
+                        context.setJob(old);
+                        throw e;
+                    }
+                });
             case START:
                 return Mono.<Void>fromRunnable(executor::start)
                     .doOnSuccess((v) -> startTime =currentTimeMillis)
@@ -292,9 +306,9 @@ public class ClusterUniqueTask implements Task ,Serializable{
             .subscribePattern()
             .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
             .timeout(Duration.ofSeconds(pingTime),
-                Mono.empty()
-                    .doOnNext(ignore -> isDead = true)
-                    .concatWith(initUniqueTask())
+                Mono.zip(
+                    Mono.just(isDead=true),
+                    initUniqueTask())
                     .then(Mono.empty()))
             .publishOn(Schedulers.boundedElastic())
             .subscribe(obj->{
@@ -312,7 +326,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
                     OperationMessage operationMessage = (OperationMessage) message;
                     if(!isDead){
                         //任务存活
-                         operation(operationMessage).subscribe();
+                        operation(operationMessage).subscribe();
                     }
                 }
             });