Browse Source

fix 规则引擎集群状态变化

18339543638 3 years ago
parent
commit
191e54391d

+ 17 - 11
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleInstanceService.java

@@ -19,6 +19,9 @@ import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.RuleEngine;
 import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
 import org.jetlinks.rule.engine.api.model.RuleModel;
+import org.jetlinks.rule.engine.api.scheduler.Scheduler;
+import org.jetlinks.rule.engine.api.task.Task;
+import org.jetlinks.rule.engine.defaults.ScheduleJobCompiler;
 import org.reactivestreams.Publisher;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
@@ -28,6 +31,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import javax.annotation.PostConstruct;
+import java.util.function.Function;
 
 @Service
 @Slf4j
@@ -47,6 +51,8 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
 
     private ClusterService<RuleInstanceEntity> clusterServer;
 
+    @Autowired
+    private Scheduler scheduler;
     @PostConstruct
     public void init(){
         clusterServer=new ClusterService<RuleInstanceEntity>(clusterManager) {
@@ -126,17 +132,17 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
         createQuery()
             .where()
             .fetch()
-            .flatMap(e ->
-                this
-                    .doStart(e)
-                    .then(Mono.just(e)
-                        .filter(rule->rule.getState()==RuleInstanceState.stopped))
-                    .flatMap(rule->stop(rule.getId()))
-                    .onErrorResume(err -> {
-                        log.warn("启动规则[{}]失败", e.getName(), e);
-                        return Mono.empty();
-                    })
-            )
+            .parallel()
+            .runOn(Schedulers.parallel())
+            .flatMap(e ->{
+                RuleModel model = e.toRule(modelParser);
+                return Flux.fromIterable(new ScheduleJobCompiler(e.getId(), model).compile())
+                    .flatMap(scheduler::schedule)
+                    .collectList()
+                    .flatMapIterable(Function.identity())
+                    .filter(ignore->e.getState()==RuleInstanceState.started)
+                    .flatMap(Task::start);
+            })
             .subscribe();
     }