Jelajahi Sumber

add 规则引擎集群

18339543638 4 tahun lalu
induk
melakukan
06593eeb42

+ 11 - 11
jetlinks-components/rule-engine-component/pom.xml

@@ -21,17 +21,17 @@
 
 
         <!-- 另一种Spring集成starter,本章未使用 -->
-        <dependency>
-            <groupId>org.redisson</groupId>
-            <artifactId>redisson-spring-boot-starter</artifactId>
-            <version>3.13.6</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.springframework.boot</groupId>
-                    <artifactId>spring-boot-starter-web</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.redisson</groupId>-->
+            <!--<artifactId>redisson-spring-boot-starter</artifactId>-->
+            <!--<version>3.13.6</version>-->
+            <!--<exclusions>-->
+                <!--<exclusion>-->
+                    <!--<groupId>org.springframework.boot</groupId>-->
+                    <!--<artifactId>spring-boot-starter-web</artifactId>-->
+                <!--</exclusion>-->
+            <!--</exclusions>-->
+        <!--</dependency>-->
 
 
 

+ 17 - 16
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -10,8 +10,8 @@ import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
 import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.api.task.TaskExecutor;
 import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
-import org.redisson.api.RLock;
-import org.redisson.api.RedissonClient;
+//import org.redisson.api.RLock;
+//import org.redisson.api.RedissonClient;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -53,7 +53,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
      */
     private long lastStateTime;
 
-    private transient final RedissonClient redissonClient;
+//    private transient final RedissonClient redissonClient;
 
     /**
      * 开始时间
@@ -80,7 +80,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
 
 
-    private transient RLock lock;
+//    private transient RLock lock;
     /**
      * 该任务已死亡
      */
@@ -126,8 +126,9 @@ public class ClusterUniqueTask implements Task ,Serializable{
                              AbstractExecutionContext context,
                              TaskExecutor executor,
                              String currentSeverId,
-                             ClusterManager clusterManager,
-                             RedissonClient redissonClient) {
+                             ClusterManager clusterManager
+//                             RedissonClient redissonClient) {
+        ) {
         this.schedulerId = schedulerId;
         this.context = context;
         this.executor = executor;
@@ -136,8 +137,8 @@ public class ClusterUniqueTask implements Task ,Serializable{
         this.clusterManager=clusterManager;
         this.pingTopic=String.format(this.pingTopic,context.getInstanceId());
         this.operationTopic=String.format(this.operationTopic,context.getInstanceId());
-        this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
-        this.redissonClient=redissonClient;
+//        this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
+//        this.redissonClient=redissonClient;
         //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
         initUniqueTask().subscribe();
     }
@@ -154,19 +155,19 @@ public class ClusterUniqueTask implements Task ,Serializable{
             return Mono.just(false);
         }
         boolean result =false;
-        try {
-            //获取锁
-            result = lock.tryLock(-1,pingTime, TimeUnit.SECONDS);
-        }catch (InterruptedException e){
-            return Mono.just(false);
-        }
+//        try {
+//            //获取锁
+//            result = lock.tryLock(-1,pingTime, TimeUnit.SECONDS);
+//        }catch (InterruptedException e){
+//            return Mono.just(false);
+//        }
         //争夺成功
         if(result){
             //创建任务,锁争夺成功后发送心跳
             isDead=false;
             return Flux.interval(Duration.ofSeconds(pingTime/2))
                 .flatMap(ignore->{
-                    lock.lock(pingTime,TimeUnit.SECONDS);
+//                    lock.lock(pingTime,TimeUnit.SECONDS);
                     return clusterManager.getTopic(pingTopic).publish(Mono.just(this));
                 })
                 .then(Mono.just(true));
@@ -249,7 +250,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
             case SHUTDOWN:
                 this.taskState=State.shutdown;
                 //解锁
-                this.lock.unlock();
+//                this.lock.unlock();
                 break;
             case ENABLE_DEBUG:
                 return Mono.fromRunnable(() -> context.setDebug(true));

+ 7 - 5
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueWork.java

@@ -12,7 +12,7 @@ import org.jetlinks.rule.engine.cluster.scope.ClusterGlobalScope;
 import org.jetlinks.rule.engine.cluster.worker.ClusterExecutionContext;
 import org.jetlinks.rule.engine.defaults.DefaultExecutionContext;
 import org.jetlinks.rule.engine.defaults.scope.InMemoryGlobalScope;
-import org.redisson.api.RedissonClient;
+//import org.redisson.api.RedissonClient;
 import reactor.core.publisher.Mono;
 
 import java.io.Serializable;
@@ -48,15 +48,17 @@ public class ClusterUniqueWork implements Worker, Serializable {
     private static final InMemoryGlobalScope scope = new InMemoryGlobalScope();
 
 
-    private final RedissonClient redissonClient;
+//    private final RedissonClient redissonClient;
 
-    public ClusterUniqueWork(String id, String name, EventBus eventBus, ConditionEvaluator conditionEvaluator, ClusterManager clusterManager,RedissonClient redissonClient) {
+    public ClusterUniqueWork(String id, String name, EventBus eventBus, ConditionEvaluator conditionEvaluator, ClusterManager clusterManager
+//                             RedissonClient redissonClient) {
+        ) {
         this.id = id;
         this.name = name;
         this.eventBus = eventBus;
         this.conditionEvaluator = conditionEvaluator;
         this.clusterManager = clusterManager;
-        this.redissonClient=redissonClient;
+//        this.redissonClient=redissonClient;
     }
 
     @Override
@@ -66,7 +68,7 @@ public class ClusterUniqueWork implements Worker, Serializable {
             .flatMap(provider -> {
                 DefaultExecutionContext context = createContext(job);
                 return provider.createTask(context)
-                    .map(executor -> new ClusterUniqueTask(schedulerId, context,executor,id,clusterManager,redissonClient));
+                    .map(executor -> new ClusterUniqueTask(schedulerId, context,executor,id,clusterManager));
             });
     }
 

+ 5 - 4
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/ClusterConfiguration.java

@@ -16,7 +16,7 @@ import org.jetlinks.rule.engine.defaults.LocalWorker;
 import org.jetlinks.rule.engine.model.DefaultRuleModelParser;
 import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
 import org.jetlinks.supports.config.ClusterConfigStorageManager;
-import org.redisson.api.RedissonClient;
+//import org.redisson.api.RedissonClient;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -83,8 +83,9 @@ public class ClusterConfiguration {
     }
 
     @Bean
-    public ClusterUniqueWork clusterUniqueWork(JetLinksProperties properties, EventBus eventBus, ConditionEvaluator evaluator, ClusterManager clusterManager,
-                                               RedissonClient redissonClient) {
-        return new ClusterUniqueWork(properties.getServerId(), properties.getClusterName()+properties.getServerId(), eventBus, evaluator,clusterManager,redissonClient);
+    public ClusterUniqueWork clusterUniqueWork(JetLinksProperties properties, EventBus eventBus, ConditionEvaluator evaluator, ClusterManager clusterManager
+//                                               RedissonClient redissonClient) {
+        ) {
+        return new ClusterUniqueWork(properties.getServerId(), properties.getClusterName()+properties.getServerId(), eventBus, evaluator,clusterManager);
     }
 }

+ 2 - 2
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksRedisConfiguration.java

@@ -18,7 +18,8 @@ public class JetLinksRedisConfiguration {
 
     @Bean
     public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(
-        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, ResourceLoader resourceLoader) {
+        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory,
+        ResourceLoader resourceLoader) {
 
         FstSerializationRedisSerializer serializer = new FstSerializationRedisSerializer(() -> {
             FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration()
@@ -37,5 +38,4 @@ public class JetLinksRedisConfiguration {
 
         return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
     }
-
 }

+ 9 - 16
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterManager.java

@@ -3,17 +3,11 @@ package org.jetlinks.community.standalone.configuration.cluster;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.core.cache.Caches;
 import org.jetlinks.core.cluster.*;
-import org.jetlinks.supports.cluster.redis.RedisClusterCache;
-import org.jetlinks.supports.cluster.redis.RedisClusterCounter;
-import org.jetlinks.supports.cluster.redis.RedisClusterNotifier;
-import org.jetlinks.supports.cluster.redis.RedisClusterSet;
+import org.jetlinks.supports.cluster.redis.*;
 import org.springframework.data.redis.core.ReactiveRedisOperations;
 import org.springframework.data.redis.core.ReactiveRedisTemplate;
 import org.springframework.data.redis.serializer.RedisSerializationContext;
 import org.springframework.data.redis.serializer.RedisSerializer;
-import reactor.core.publisher.Flux;
-
-import java.time.Duration;
 import java.util.Map;
 
 @SuppressWarnings("all")
@@ -24,6 +18,7 @@ public class RedisClusterManager implements ClusterManager {
 
     private String serverId;
 
+    private Map<String, RedisClusterQueue> queues = Caches.newCache();
     private Map<String, ClusterTopic> topics = Caches.newCache();
     private Map<String, ClusterCache> caches = Caches.newCache();
     private Map<String, ClusterSet> sets = Caches.newCache();
@@ -47,12 +42,12 @@ public class RedisClusterManager implements ClusterManager {
         this.stringOperations = new ReactiveRedisTemplate<>(operations.getConnectionFactory(), RedisSerializationContext.string());
 
         this.queueRedisTemplate = new ReactiveRedisTemplate<>(operations.getConnectionFactory(),
-                RedisSerializationContext.<String, Object>newSerializationContext()
-                        .key(RedisSerializer.string())
-                        .value((RedisSerializationContext.SerializationPair<Object>) operations.getSerializationContext().getValueSerializationPair())
-                        .hashKey(RedisSerializer.string())
-                        .hashValue(operations.getSerializationContext().getHashValueSerializationPair())
-                        .build());
+            RedisSerializationContext.<String, Object>newSerializationContext()
+                .key(RedisSerializer.string())
+                .value((RedisSerializationContext.SerializationPair<Object>) operations.getSerializationContext().getValueSerializationPair())
+                .hashKey(RedisSerializer.string())
+                .hashValue(operations.getSerializationContext().getHashValueSerializationPair())
+                .build());
     }
 
     public RedisClusterManager(String name, String serverId, ReactiveRedisTemplate<?, ?> operations) {
@@ -93,10 +88,8 @@ public class RedisClusterManager implements ClusterManager {
     }
 
     @Override
-    @Deprecated
     public <T> ClusterQueue<T> getQueue(String queueId) {
-//        return queues.computeIfAbsent(queueId, id -> new RedisClusterQueue<>(id, this.queueRedisTemplate));
-        return null;
+        return queues.computeIfAbsent(queueId, id -> new RedisClusterQueue<>(id, this.queueRedisTemplate));
     }
 
     @Override

+ 24 - 22
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisClusterTopic.java

@@ -1,11 +1,13 @@
 package org.jetlinks.community.standalone.configuration.cluster;
 
+import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.core.cluster.ClusterTopic;
 import org.reactivestreams.Publisher;
 import org.springframework.data.redis.core.ReactiveRedisOperations;
 import reactor.core.Disposable;
 import reactor.core.publisher.*;
 
+import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class RedisClusterTopic<T> implements ClusterTopic<T> {
@@ -32,39 +34,39 @@ public class RedisClusterTopic<T> implements ClusterTopic<T> {
     private void doSubscribe() {
         if (subscribed.compareAndSet(false, true)) {
             disposable = operations
-                    .listenToPattern(topicName)
-                    .subscribe(data -> {
-                        if (!processor.hasDownstreams()) {
-                            disposable.dispose();
-                            subscribed.compareAndSet(true, false);
-                        } else {
-                            sink.next(new TopicMessage<T>() {
-                                @Override
-                                public String getTopic() {
-                                    return data.getChannel();
-                                }
+                .listenToPattern(topicName)
+                .subscribe(data -> {
+                    if (!processor.hasDownstreams()) {
+                        disposable.dispose();
+                        subscribed.compareAndSet(true, false);
+                    } else {
+                        sink.next(new TopicMessage<T>() {
+                            @Override
+                            public String getTopic() {
+                                return data.getChannel();
+                            }
 
-                                @Override
-                                public T getMessage() {
-                                    return data.getMessage();
-                                }
-                            });
-                        }
-                    });
+                            @Override
+                            public T getMessage() {
+                                return data.getMessage();
+                            }
+                        });
+                    }
+                });
         }
     }
 
     @Override
     public Flux<TopicMessage<T>> subscribePattern() {
         return processor
-                .doOnSubscribe((r) -> doSubscribe());
+            .doOnSubscribe((r) -> doSubscribe());
     }
 
     @Override
     public Mono<Integer> publish(Publisher<? extends T> publisher) {
         return Flux.from(publisher)
-                .flatMap(data -> operations.convertAndSend(topicName, data))
-                .last(1L)
-                .map(Number::intValue);
+            .flatMap(data -> operations.convertAndSend(topicName, data))
+            .last(1L)
+            .map(Number::intValue);
     }
 }

+ 82 - 80
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/RedisHaManager.java

@@ -69,44 +69,44 @@ public class RedisHaManager implements HaManager {
         current.setLastKeepAlive(System.currentTimeMillis());
 
         inRedisNode.put(allNodeHashKey, current.getId(), current)
-                   .subscribe();
+            .subscribe();
 
         keepalive.publish(Mono.just(current)).subscribe();
 
         Map<String, ServerNode> maybeOffline = getAllNode()
-                .stream()
-                .filter(node -> System.currentTimeMillis() - node.getLastKeepAlive() > TimeUnit.SECONDS.toMillis(30))
-                .filter(node -> !node.isSame(current))
-                .collect(Collectors.toMap(ServerNode::getId, Function.identity()));
+            .stream()
+            .filter(node -> System.currentTimeMillis() - node.getLastKeepAlive() > TimeUnit.SECONDS.toMillis(30))
+            .filter(node -> !node.isSame(current))
+            .collect(Collectors.toMap(ServerNode::getId, Function.identity()));
 
         //检查节点是否离线
         inRedisNode.keys(allNodeHashKey)
-                   .filter(maybeOffline::containsKey)
-                   .map(maybeOffline::get)
-                   .collectList()
-                   .filter(list -> !list.isEmpty())
-                   .flatMapMany(list -> inRedisNode
-                           .remove(allNodeHashKey, list.stream().map(ServerNode::getId).toArray())
-                           .thenMany(Flux.fromIterable(list))
-                   )
-                   .as(offlineTopic::publish)
-                   .subscribe();
+            .filter(maybeOffline::containsKey)
+            .map(maybeOffline::get)
+            .collectList()
+            .filter(list -> !list.isEmpty())
+            .flatMapMany(list -> inRedisNode
+                .remove(allNodeHashKey, list.stream().map(ServerNode::getId).toArray())
+                .thenMany(Flux.fromIterable(list))
+            )
+            .as(offlineTopic::publish)
+            .subscribe();
     }
 
     private void electionLeader() {
         allNode.values()
-               .stream()
-               .peek(serverNode -> serverNode.setLeader(false))
-               .min(Comparator.comparing(ServerNode::getUptime))
-               .ifPresent(serverNode -> serverNode.setLeader(true));
+            .stream()
+            .peek(serverNode -> serverNode.setLeader(false))
+            .min(Comparator.comparing(ServerNode::getUptime))
+            .ifPresent(serverNode -> serverNode.setLeader(true));
     }
 
     public void shutdown() {
         inRedisNode
-                .remove(allNodeHashKey, current.getId())
-                .then(offlineTopic
-                              .publish(Mono.just(current)))
-                .block();
+            .remove(allNodeHashKey, current.getId())
+            .then(offlineTopic
+                .publish(Mono.just(current)))
+            .block();
     }
 
     public synchronized void startup() {
@@ -118,64 +118,66 @@ public class RedisHaManager implements HaManager {
 
         //注册自己
         inRedisNode.put(allNodeHashKey, current.getId(), current)
-                   .flatMapMany(r -> inRedisNode.values(allNodeHashKey))
-                   .collectList()
-                   .doOnNext(node -> {
-                       for (ServerNode serverNode : node) {
-                           serverNode.setLastKeepAlive(System.currentTimeMillis());
-                           allNode.put(serverNode.getId(), serverNode);
-                       }
-                       electionLeader();
-                       Flux.interval(Duration.ZERO, Duration.ofSeconds(5))
-                           .doOnNext(i -> this.checkAlive())
-                           .subscribe();
-                   })
-                   .block();
+            .flatMapMany(r -> inRedisNode.values(allNodeHashKey))
+            .collectList()
+            .doOnNext(node -> {
+                for (ServerNode serverNode : node) {
+                    serverNode.setLastKeepAlive(System.currentTimeMillis());
+                    allNode.put(serverNode.getId(), serverNode);
+                }
+                electionLeader();
+                Flux.interval(Duration.ZERO, Duration.ofSeconds(5))
+                    .doOnNext(i -> this.checkAlive())
+                    .subscribe();
+            })
+            .block();
 
         offlineTopic.subscribe()
-                    .subscribe(serverNode -> {
-                        //自己
-                        if (currentServer().isSame(serverNode)) {
-                            return;
-                        }
-                        if (allNode.remove(serverNode.getId()) != null) {
-                            log.debug("[{}]:server node [{}] offline", haName, serverNode.getId());
-                            //node offline
-                            inRedisNode
-                                    .remove(allNodeHashKey, serverNode.getId())
-                                    .subscribe();
-                            electionLeader();
-                            if (offlineProcessor.hasDownstreams()) {
-                                offlineProcessor.onNext(serverNode);
-                            }
-                        }
-                    });
+            .subscribe(serverNode -> {
+                //自己
+                if (currentServer().isSame(serverNode)) {
+                    return;
+                }
+                if (allNode.remove(serverNode.getId()) != null) {
+                    log.debug("[{}]:server node [{}] offline", haName, serverNode.getId());
+                    //node offline
+                    inRedisNode
+                        .remove(allNodeHashKey, serverNode.getId())
+                        .subscribe();
+                    electionLeader();
+                    if (offlineProcessor.hasDownstreams()) {
+                        offlineProcessor.onNext(serverNode);
+                    }
+                }
+            });
         //其他节点定时发布
-        keepalive.subscribe()
-                 .subscribe(serverNode -> {
-                     //自己
-                     if (currentServer().isSame(serverNode)) {
-                         return;
-                     }
-                     serverNode.setLastKeepAlive(System.currentTimeMillis());
-                     allNode.compute(serverNode.getId(), (id, node) -> {
-                         if (node != null) {
-                             node.setLastKeepAlive(System.currentTimeMillis());
-                             return node;
-                         }
-                         return null;
-                     });
-                     if (!allNode.containsKey(serverNode.getId())) {
-                         allNode.put(serverNode.getId(), serverNode);
-                         electionLeader();
-                         log.debug("[{}]:server node [{}] online", haName, serverNode.getId());
-                         //node join
-                         if (onlineProcessor.hasDownstreams()) {
-                             onlineProcessor.onNext(serverNode);
-                         }
-                     }
-
-                 });
+        keepalive.subscribePattern()
+//            .map(ClusterTopic.TopicMessage::getMessage)
+            .subscribe(msg -> {
+                ServerNode serverNode = msg.getMessage();
+                //自己
+                if (currentServer().isSame(serverNode)) {
+                    return;
+                }
+                serverNode.setLastKeepAlive(System.currentTimeMillis());
+                allNode.compute(serverNode.getId(), (id, node) -> {
+                    if (node != null) {
+                        node.setLastKeepAlive(System.currentTimeMillis());
+                        return node;
+                    }
+                    return null;
+                });
+                if (!allNode.containsKey(serverNode.getId())) {
+                    allNode.put(serverNode.getId(), serverNode);
+                    electionLeader();
+                    log.debug("[{}]:server node [{}] online", haName, serverNode.getId());
+                    //node join
+                    if (onlineProcessor.hasDownstreams()) {
+                        onlineProcessor.onNext(serverNode);
+                    }
+                }
+
+            });
 
 
     }
@@ -188,13 +190,13 @@ public class RedisHaManager implements HaManager {
     @Override
     public Flux<ServerNode> subscribeServerOnline() {
         return onlineProcessor
-                .filter(node -> !node.getId().equals(current.getId()));
+            .filter(node -> !node.getId().equals(current.getId()));
     }
 
     @Override
     public Flux<ServerNode> subscribeServerOffline() {
         return offlineProcessor
-                .filter(node -> !node.getId().equals(current.getId()));
+            .filter(node -> !node.getId().equals(current.getId()));
     }
 
     @Override