|
|
@@ -142,7 +142,17 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
|
|
|
@Override
|
|
|
public Mono<Void> setJob(ScheduleJob job) {
|
|
|
- return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),this.currentSeverId));
|
|
|
+ return Mono.fromRunnable(() -> {
|
|
|
+ ScheduleJob old = context.getJob();
|
|
|
+ context.setJob(job);
|
|
|
+ try {
|
|
|
+ executor.validate();
|
|
|
+ } catch (Throwable e) {
|
|
|
+ context.setJob(old);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }).concatWith(operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),this.currentSeverId)))
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -238,7 +248,7 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
|
|
|
+ this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
|
|
|
.subscribePattern()
|
|
|
.mergeWith(this.clusterManager.getTopic(this.operationTopic).subscribePattern())
|
|
|
.timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
|
|
|
@@ -347,18 +357,6 @@ class ClusterUniqueTask implements Task ,Serializable{
|
|
|
this.lastStateTime=currentTimeMillis;
|
|
|
//该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
|
|
|
switch (operation){
|
|
|
- case JOB:
|
|
|
- return Mono.fromRunnable(() -> {
|
|
|
- ScheduleJob job = (ScheduleJob) message.getParams().get(0);
|
|
|
- ScheduleJob old = this.context.getJob();
|
|
|
- this.context.setJob(job);
|
|
|
- try {
|
|
|
- this.executor.validate();
|
|
|
- } catch (Throwable e) {
|
|
|
- this.context.setJob(old);
|
|
|
- throw e;
|
|
|
- }
|
|
|
- });
|
|
|
case START:
|
|
|
return Mono.<Void>fromRunnable(this.executor::start)
|
|
|
.doOnSuccess((v) ->this. startTime =currentTimeMillis)
|