18339543638 пре 4 година
родитељ
комит
26f0fe427c

+ 30 - 5
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -140,6 +140,12 @@ class ClusterUniqueTask implements Task ,Serializable{
         return context.getJob();
     }
 
+    /**
+     * 设置任务应该集群中所有机器全部进行设置
+     * 所以当本机设置任务成功后,广播给集群机器对任务进行更新
+     * @param job
+     * @return
+     */
     @Override
     public Mono<Void> setJob(ScheduleJob job) {
         return Mono.fromRunnable(() -> {
@@ -151,8 +157,13 @@ class ClusterUniqueTask implements Task ,Serializable{
                 context.setJob(old);
                 throw e;
             }
-        }).concatWith(operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),this.currentSeverId)))
+        }).concatWith(Mono.fromRunnable(
+            ()->
+                this.clusterManager.getTopic(this.operationTopic)
+                    .publish(Mono.just(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),this.currentSeverId)))
+                    .then()))
             .then();
+
     }
 
 
@@ -346,15 +357,29 @@ class ClusterUniqueTask implements Task ,Serializable{
         this.taskState=State.unknown.equals(operation.getState())?this.taskState:operation.getState();
         if(isReplica()&&
             (this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed())){
-            //当前为任务副本,传递给任务执行者
-            return  this.clusterManager.getTopic(this.operationTopic)
-                .publish(Mono.just(message))
-                .then();
+            //当前为任务副本不在此处进行广播操作
+            if(!TaskOperation.JOB.equals(message.operation)){
+                return  this.clusterManager.getTopic(this.operationTopic)
+                    .publish(Mono.just(message))
+                    .then();
+            }
         }
         long currentTimeMillis = System.currentTimeMillis();
         this.lastStateTime=currentTimeMillis;
         //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
         switch (operation){
+            case JOB:
+                return Mono.fromRunnable(() -> {
+                    ScheduleJob job = (ScheduleJob) message.getParams().get(0);
+                    ScheduleJob old = context.getJob();
+                    context.setJob(job);
+                    try {
+                        executor.validate();
+                    } catch (Throwable e) {
+                        context.setJob(old);
+                        throw e;
+                    }
+                });
             case START:
                 return Mono.<Void>fromRunnable(this.executor::start)
                     .doOnSuccess((v) ->this. startTime =currentTimeMillis)