Pārlūkot izejas kodu

add 规则引擎集群

18339543638 4 gadi atpakaļ
vecāks
revīzija
3ddedef99c

+ 11 - 11
jetlinks-components/rule-engine-component/pom.xml

@@ -21,17 +21,17 @@
 
 
         <!-- 另一种Spring集成starter,本章未使用 -->
-        <!--<dependency>-->
-            <!--<groupId>org.redisson</groupId>-->
-            <!--<artifactId>redisson-spring-boot-starter</artifactId>-->
-            <!--<version>3.13.6</version>-->
-            <!--<exclusions>-->
-                <!--<exclusion>-->
-                    <!--<groupId>org.springframework.boot</groupId>-->
-                    <!--<artifactId>spring-boot-starter-web</artifactId>-->
-                <!--</exclusion>-->
-            <!--</exclusions>-->
-        <!--</dependency>-->
+        <dependency>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson-spring-boot-starter</artifactId>
+            <version>3.13.6</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-web</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
 
 

+ 16 - 17
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -10,8 +10,8 @@ import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
 import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.api.task.TaskExecutor;
 import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
-//import org.redisson.api.RLock;
-//import org.redisson.api.RedissonClient;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -53,7 +53,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
      */
     private long lastStateTime;
 
-//    private transient final RedissonClient redissonClient;
+    private transient final RedissonClient redissonClient;
 
     /**
      * 开始时间
@@ -80,7 +80,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
 
 
-//    private transient RLock lock;
+    private transient RLock lock;
     /**
      * 该任务已死亡
      */
@@ -126,9 +126,8 @@ public class ClusterUniqueTask implements Task ,Serializable{
                              AbstractExecutionContext context,
                              TaskExecutor executor,
                              String currentSeverId,
-                             ClusterManager clusterManager
-//                             RedissonClient redissonClient) {
-        ) {
+                             ClusterManager clusterManager,
+                             RedissonClient redissonClient) {
         this.schedulerId = schedulerId;
         this.context = context;
         this.executor = executor;
@@ -137,8 +136,8 @@ public class ClusterUniqueTask implements Task ,Serializable{
         this.clusterManager=clusterManager;
         this.pingTopic=String.format(this.pingTopic,context.getInstanceId());
         this.operationTopic=String.format(this.operationTopic,context.getInstanceId());
-//        this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
-//        this.redissonClient=redissonClient;
+        this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
+        this.redissonClient=redissonClient;
         //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
         initUniqueTask().subscribe();
     }
@@ -155,19 +154,19 @@ public class ClusterUniqueTask implements Task ,Serializable{
             return Mono.just(false);
         }
         boolean result =false;
-//        try {
-//            //获取锁
-//            result = lock.tryLock(-1,pingTime, TimeUnit.SECONDS);
-//        }catch (InterruptedException e){
-//            return Mono.just(false);
-//        }
+        try {
+            //获取锁
+            result = lock.tryLock(-1,pingTime, TimeUnit.SECONDS);
+        }catch (InterruptedException e){
+            return Mono.just(false);
+        }
         //争夺成功
         if(result){
             //创建任务,锁争夺成功后发送心跳
             isDead=false;
             return Flux.interval(Duration.ofSeconds(pingTime/2))
                 .flatMap(ignore->{
-//                    lock.lock(pingTime,TimeUnit.SECONDS);
+                    lock.lock(pingTime,TimeUnit.SECONDS);
                     return clusterManager.getTopic(pingTopic).publish(Mono.just(this));
                 })
                 .then(Mono.just(true));
@@ -250,7 +249,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
             case SHUTDOWN:
                 this.taskState=State.shutdown;
                 //解锁
-//                this.lock.unlock();
+                this.lock.unlock();
                 break;
             case ENABLE_DEBUG:
                 return Mono.fromRunnable(() -> context.setDebug(true));

+ 5 - 7
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueWork.java

@@ -12,7 +12,7 @@ import org.jetlinks.rule.engine.cluster.scope.ClusterGlobalScope;
 import org.jetlinks.rule.engine.cluster.worker.ClusterExecutionContext;
 import org.jetlinks.rule.engine.defaults.DefaultExecutionContext;
 import org.jetlinks.rule.engine.defaults.scope.InMemoryGlobalScope;
-//import org.redisson.api.RedissonClient;
+import org.redisson.api.RedissonClient;
 import reactor.core.publisher.Mono;
 
 import java.io.Serializable;
@@ -48,17 +48,15 @@ public class ClusterUniqueWork implements Worker, Serializable {
     private static final InMemoryGlobalScope scope = new InMemoryGlobalScope();
 
 
-//    private final RedissonClient redissonClient;
+    private final RedissonClient redissonClient;
 
-    public ClusterUniqueWork(String id, String name, EventBus eventBus, ConditionEvaluator conditionEvaluator, ClusterManager clusterManager
-//                             RedissonClient redissonClient) {
-        ) {
+    public ClusterUniqueWork(String id, String name, EventBus eventBus, ConditionEvaluator conditionEvaluator, ClusterManager clusterManager,RedissonClient redissonClient) {
         this.id = id;
         this.name = name;
         this.eventBus = eventBus;
         this.conditionEvaluator = conditionEvaluator;
         this.clusterManager = clusterManager;
-//        this.redissonClient=redissonClient;
+        this.redissonClient=redissonClient;
     }
 
     @Override
@@ -68,7 +66,7 @@ public class ClusterUniqueWork implements Worker, Serializable {
             .flatMap(provider -> {
                 DefaultExecutionContext context = createContext(job);
                 return provider.createTask(context)
-                    .map(executor -> new ClusterUniqueTask(schedulerId, context,executor,id,clusterManager));
+                    .map(executor -> new ClusterUniqueTask(schedulerId, context,executor,id,clusterManager,redissonClient));
             });
     }
 

+ 15 - 0
jetlinks-standalone/pom.xml

@@ -18,6 +18,7 @@
         <skipTests>true</skipTests>
     </properties>
 
+
     <build>
         <resources>
             <resource>
@@ -51,6 +52,20 @@
                 </executions>
             </plugin>
 
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                    <encoding>UTF-8</encoding>
+                    <compilerArguments>
+                        <verbose />
+                        <!-- 将jdk的依赖jar打入项目中,这样项目中使用的jdk的依赖就尅正常使用 -->
+                        <bootclasspath>${java.home}/lib/rt.jar;${java.home}/lib/jce.jar;${java.home}/lib/jsse.jar</bootclasspath>
+                    </compilerArguments>
+                </configuration>
+            </plugin>
+
         </plugins>
     </build>
 

+ 4 - 5
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/JetLinksApplication.java

@@ -6,19 +6,18 @@ import org.hswebframework.web.authorization.events.AuthorizingHandleBeforeEvent;
 import org.hswebframework.web.crud.annotation.EnableEasyormRepository;
 import org.hswebframework.web.logging.aop.EnableAccessLogger;
 import org.hswebframework.web.logging.events.AccessLoggerAfterEvent;
-import org.jetlinks.community.visualization.entity.VisualizationConfigEntity;
+import org.redisson.spring.starter.RedissonAutoConfiguration;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.context.annotation.Profile;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
-
 import javax.annotation.PostConstruct;
-import java.util.function.Supplier;
 
 
-@SpringBootApplication(scanBasePackages = "org.jetlinks.community", exclude = {
+@SpringBootApplication( scanBasePackages = "org.jetlinks.community", exclude = {
+    RedissonAutoConfiguration.class
 //    DataSourceAutoConfiguration.class,
 //    ElasticsearchRestClientAutoConfiguration.class
 })
@@ -29,7 +28,7 @@ import java.util.function.Supplier;
 @Slf4j
 public class JetLinksApplication {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         SpringApplication.run(JetLinksApplication.class, args);
     }
 

+ 11 - 7
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/ClusterConfiguration.java

@@ -1,5 +1,7 @@
 package org.jetlinks.community.standalone.configuration;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import org.jetlinks.community.rule.engine.cluster.ClusterUniqueWork;
 import org.jetlinks.community.standalone.configuration.cluster.ClusterDeviceMessageBrokeMessageBroker;
 import org.jetlinks.community.standalone.configuration.cluster.ClusterDeviceMessageConnector;
@@ -12,15 +14,14 @@ import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
 import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
 import org.jetlinks.rule.engine.condition.ConditionEvaluatorStrategy;
 import org.jetlinks.rule.engine.condition.DefaultConditionEvaluator;
-import org.jetlinks.rule.engine.defaults.LocalWorker;
 import org.jetlinks.rule.engine.model.DefaultRuleModelParser;
 import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
 import org.jetlinks.supports.config.ClusterConfigStorageManager;
-//import org.redisson.api.RedissonClient;
+import org.redisson.api.RedissonClient;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -32,9 +33,12 @@ import org.springframework.context.annotation.Configuration;
  * @createTime 2021年11月08日 15:35:00
  */
 @Configuration
+@AllArgsConstructor
 @ConditionalOnProperty(name = "jetlinks.cluster",havingValue = "true")
 public class ClusterConfiguration {
 
+    @Getter
+    private RedisProperties properties;
     @Bean
     public ClusterDeviceMessageBrokeMessageBroker clusterDeviceMessageBrokeMessageBroker(JetLinksProperties properties, ClusterManager clusterManager) {
         return new ClusterDeviceMessageBrokeMessageBroker(properties.getServerId(),clusterManager);
@@ -83,9 +87,9 @@ public class ClusterConfiguration {
     }
 
     @Bean
-    public ClusterUniqueWork clusterUniqueWork(JetLinksProperties properties, EventBus eventBus, ConditionEvaluator evaluator, ClusterManager clusterManager
-//                                               RedissonClient redissonClient) {
-        ) {
-        return new ClusterUniqueWork(properties.getServerId(), properties.getClusterName()+properties.getServerId(), eventBus, evaluator,clusterManager);
+    public ClusterUniqueWork clusterUniqueWork(JetLinksProperties properties, EventBus eventBus, ConditionEvaluator evaluator, ClusterManager clusterManager,
+                                               RedissonClient redissonClient) {
+//        ) {
+        return new ClusterUniqueWork(properties.getServerId(), properties.getClusterName()+properties.getServerId(), eventBus, evaluator,clusterManager,redissonClient);
     }
 }

+ 0 - 41
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksRedisConfiguration.java

@@ -1,41 +0,0 @@
-package org.jetlinks.community.standalone.configuration;
-
-import org.jetlinks.community.standalone.configuration.fst.FstSerializationRedisSerializer;
-import org.nustaq.serialization.FSTConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.core.io.ResourceLoader;
-import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
-import org.springframework.data.redis.core.ReactiveRedisTemplate;
-import org.springframework.data.redis.serializer.RedisSerializationContext;
-import org.springframework.data.redis.serializer.RedisSerializer;
-import org.springframework.data.redis.serializer.StringRedisSerializer;
-
-@Configuration
-@ConditionalOnProperty(prefix = "spring.redis",name = "serializer",havingValue = "fst")
-public class JetLinksRedisConfiguration {
-
-    @Bean
-    public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(
-        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory,
-        ResourceLoader resourceLoader) {
-
-        FstSerializationRedisSerializer serializer = new FstSerializationRedisSerializer(() -> {
-            FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration()
-                .setForceSerializable(true);
-            configuration.setClassLoader(resourceLoader.getClassLoader());
-            return configuration;
-        });
-        @SuppressWarnings("all")
-        RedisSerializationContext<Object, Object> serializationContext = RedisSerializationContext
-            .newSerializationContext()
-            .key((RedisSerializer)new StringRedisSerializer())
-            .value(serializer)
-            .hashKey(StringRedisSerializer.UTF_8)
-            .hashValue(serializer)
-            .build();
-
-        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
-    }
-}

+ 191 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/RedissonClientAutoConfiguration.java

@@ -0,0 +1,191 @@
+//
+// Source code recreated from a .class file by IntelliJ IDEA
+// (powered by Fernflower decompiler)
+//
+
+package org.jetlinks.community.standalone.configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.ClusterServersConfig;
+import org.redisson.config.Config;
+import org.redisson.config.SentinelServersConfig;
+import org.redisson.config.SingleServerConfig;
+import org.redisson.spring.data.connection.RedissonConnectionFactory;
+import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer;
+import org.redisson.spring.starter.RedissonProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.AutoConfigureBefore;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
+import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
+import org.springframework.boot.autoconfigure.data.redis.RedisProperties.Sentinel;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.Resource;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.util.ReflectionUtils;
+
+@Configuration
+@ConditionalOnClass({Redisson.class, RedisOperations.class})
+@AutoConfigureBefore({RedisAutoConfiguration.class})
+@EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
+public class RedissonClientAutoConfiguration {
+    private static final String REDIS_PROTOCOL_PREFIX = "redis://";
+    private static final String REDISS_PROTOCOL_PREFIX = "rediss://";
+    @Autowired(
+        required = false
+    )
+    private List<RedissonAutoConfigurationCustomizer> redissonAutoConfigurationCustomizers;
+    @Autowired
+    private RedissonProperties redissonProperties;
+    @Autowired
+    private RedisProperties redisProperties;
+    @Autowired
+    private ApplicationContext ctx;
+
+    public RedissonClientAutoConfiguration() {
+    }
+
+    @Bean
+    @ConditionalOnMissingBean(
+        name = {"redisTemplate"}
+    )
+    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
+        RedisTemplate<Object, Object> template = new RedisTemplate();
+        template.setConnectionFactory(redisConnectionFactory);
+        return template;
+    }
+
+    @Bean
+    @ConditionalOnMissingBean({StringRedisTemplate.class})
+    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
+        StringRedisTemplate template = new StringRedisTemplate();
+        template.setConnectionFactory(redisConnectionFactory);
+        return template;
+    }
+
+    @Bean(
+        destroyMethod = "shutdown"
+    )
+    @ConditionalOnMissingBean({RedissonClient.class})
+    public RedissonClient redisson() throws IOException {
+        Config config = null;
+        Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
+        Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
+        Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, this.redisProperties);
+        int timeout;
+        Method nodesMethod;
+        if (null == timeoutValue) {
+            timeout = 10000;
+        } else if (!(timeoutValue instanceof Integer)) {
+            nodesMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis");
+            timeout = ((Long)ReflectionUtils.invokeMethod(nodesMethod, timeoutValue)).intValue();
+        } else {
+            timeout = (Integer)timeoutValue;
+        }
+
+        if (this.redissonProperties.getConfig() != null) {
+            try {
+                config = Config.fromYAML(this.redissonProperties.getConfig());
+            } catch (IOException var13) {
+                try {
+                    config = Config.fromJSON(this.redissonProperties.getConfig());
+                } catch (IOException var12) {
+                    throw new IllegalArgumentException("Can't parse config", var12);
+                }
+            }
+        } else if (this.redissonProperties.getFile() != null) {
+            try {
+                InputStream is = this.getConfigStream();
+                config = Config.fromYAML(is);
+            } catch (IOException var11) {
+                try {
+                    InputStream is = this.getConfigStream();
+                    config = Config.fromJSON(is);
+                } catch (IOException var10) {
+                    throw new IllegalArgumentException("Can't parse config", var10);
+                }
+            }
+        } else if (this.redisProperties.getSentinel() != null) {
+            nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
+            Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, this.redisProperties.getSentinel());
+            String[] nodes;
+            if (nodesValue instanceof String) {
+                nodes = this.convert(Arrays.asList(((String)nodesValue).split(",")));
+            } else {
+                nodes = this.convert((List)nodesValue);
+            }
+
+            config = new Config();
+            ((SentinelServersConfig)config.useSentinelServers().setMasterName(this.redisProperties.getSentinel().getMaster()).addSentinelAddress(nodes).setDatabase(this.redisProperties.getDatabase()).setConnectTimeout(timeout)).setPassword(this.redisProperties.getPassword());
+        } else {
+            Method method;
+            if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, this.redisProperties) != null) {
+                Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, this.redisProperties);
+                method = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes");
+                List<String> nodesObject = (List)ReflectionUtils.invokeMethod(method, clusterObject);
+                String[] nodes = this.convert(nodesObject);
+                config = new Config();
+                ((ClusterServersConfig)config.useClusterServers().addNodeAddress(nodes).setConnectTimeout(timeout)).setPassword(this.redisProperties.getPassword());
+            } else {
+                config = new Config();
+                String prefix = "redis://";
+                method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
+                if (method != null && (Boolean)ReflectionUtils.invokeMethod(method, this.redisProperties)) {
+                    prefix = "rediss://";
+                }
+
+                ((SingleServerConfig)config.useSingleServer().setAddress(prefix + this.redisProperties.getHost() + ":" + this.redisProperties.getPort()).setConnectTimeout(timeout)).setDatabase(this.redisProperties.getDatabase()).setPassword(this.redisProperties.getPassword());
+            }
+        }
+
+        if (this.redissonAutoConfigurationCustomizers != null) {
+            Iterator var19 = this.redissonAutoConfigurationCustomizers.iterator();
+
+            while(var19.hasNext()) {
+                RedissonAutoConfigurationCustomizer customizer = (RedissonAutoConfigurationCustomizer)var19.next();
+                customizer.customize(config);
+            }
+        }
+
+        return Redisson.create(config);
+    }
+
+    private String[] convert(List<String> nodesObject) {
+        List<String> nodes = new ArrayList(nodesObject.size());
+        Iterator var3 = nodesObject.iterator();
+
+        while(true) {
+            while(var3.hasNext()) {
+                String node = (String)var3.next();
+                if (!node.startsWith("redis://") && !node.startsWith("rediss://")) {
+                    nodes.add("redis://" + node);
+                } else {
+                    nodes.add(node);
+                }
+            }
+
+            return (String[])nodes.toArray(new String[nodes.size()]);
+        }
+    }
+
+    private InputStream getConfigStream() throws IOException {
+        Resource resource = this.ctx.getResource(this.redissonProperties.getFile());
+        InputStream is = resource.getInputStream();
+        return is;
+    }
+}

+ 0 - 0
jetlinks-standalone/src/main/resources/META-INF/spring.factories


+ 1 - 0
jetlinks-standalone/src/main/resources/application.yml

@@ -22,6 +22,7 @@ spring:
     lettuce:
       pool:
         max-active: 1024
+        max-idle: 8
     timeout: 20s
     serializer: jdk # 设置fst时,redis key使用string序列化,value使用 fst序列化.
   #    database: 3