|
|
@@ -68,6 +68,7 @@ public class TimerTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
}
|
|
|
return this.disposable =
|
|
|
Mono.delay(nextTime, scheduler)
|
|
|
+ .filter(ignore -> state == Task.State.running)
|
|
|
.flatMap(t -> context.getOutput().write(Mono.just(context.newRuleData(t))))
|
|
|
.then(context.fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis())).thenReturn(1))
|
|
|
.onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
|