Bladeren bron

add 规则引擎集群

18339543638 4 jaren geleden
bovenliggende
commit
7176eeda27

+ 28 - 17
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ruleEngine/ClusterRuleEngine.java

@@ -8,10 +8,13 @@ import org.jetlinks.rule.engine.api.model.RuleModel;
 import org.jetlinks.rule.engine.api.scheduler.Scheduler;
 import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.defaults.DefaultRuleEngine;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
+import java.time.Duration;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -25,6 +28,7 @@ public class ClusterRuleEngine extends DefaultRuleEngine {
     private ClusterManager clusterManager;
     private volatile  boolean started=false;
     private String serverId;
+    private Disposable result =null;
     public ClusterRuleEngine(Scheduler scheduler) {
         super(scheduler);
     }
@@ -40,25 +44,32 @@ public class ClusterRuleEngine extends DefaultRuleEngine {
             return;
         }
         started=true;
-        //广播消息
-        clusterManager
-            .getTopic("")
-            .subscribePattern()
-            .publishOn(Schedulers.boundedElastic())
-            .flatMap(obj -> {
-                if(obj instanceof ClusterMessage){
-                    ClusterMessage clusterMessage = (ClusterMessage) obj;
-                    String fromServer = clusterMessage.getFromServer();
-                    if(this.serverId.equals(fromServer)){
-                        return Flux.empty();
-                    }
-                    ClusterMessage.ServiceMessage serviceMessage = clusterMessage.getService();
-                    String type = serviceMessage.getType();
-                    RuleInstanceState ruleInstanceState = RuleInstanceState.valueOf(type);
-                    return execute(ruleInstanceState,serviceMessage.getId(),serviceMessage.getPayload());
+        Flux.interval(Duration.ofSeconds(10))
+            .doOnNext(ignore->{
+                if(result==null||result.isDisposed()){
+                    result = clusterManager
+                        .getTopic("cluster-rule-engine")
+                        .subscribePattern()
+                        .publishOn(Schedulers.boundedElastic())
+                        .flatMap(obj -> {
+                            if (obj instanceof ClusterMessage) {
+                                ClusterMessage clusterMessage = (ClusterMessage) obj;
+                                String fromServer = clusterMessage.getFromServer();
+                                if (this.serverId.equals(fromServer)) {
+                                    return Flux.empty();
+                                }
+                                ClusterMessage.ServiceMessage serviceMessage = clusterMessage.getService();
+                                String type = serviceMessage.getType();
+                                RuleInstanceState ruleInstanceState = RuleInstanceState.valueOf(type);
+                                return execute(ruleInstanceState, serviceMessage.getId(), serviceMessage.getPayload());
+                            }
+                            return Flux.empty();
+                        }).subscribe();
                 }
-                return Flux.empty();
             }).subscribe();
+
+        //广播消息
+
     }
 
     private Mono<Void> execute(RuleInstanceState state, String id, Object payload){