Kaynağa Gözat

fix(订阅/发布):
新增订阅/发布排他模式

18339543638 2 yıl önce
ebeveyn
işleme
1b945ce6c2
18 değiştirilmiş dosya ile 202 ekleme ve 27 silme
  1. 7 0
      tr-dependencies/pom.xml
  2. 1 1
      tr-modules/pom.xml
  3. 0 2
      tr-modules/tr-module-system/src/main/resources/mapper/storage/SysStorageRecordMapper.xml
  4. 1 0
      tr-plugins/pom.xml
  5. 20 9
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/TrEventBusAutoConfiguration.java
  6. 1 1
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/EventBus.java
  7. 34 0
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/EventBusEnvironmentPostProcessor.java
  8. 11 6
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/StdEventBus.java
  9. 9 1
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/SubscribeListenerAnnotationBeanPostProcessor.java
  10. 36 0
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/constant/ExEventBusConstant.java
  11. 1 1
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/consumer/EventBusConsumer.java
  12. 29 0
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/consumer/ExEventBusConsumer.java
  13. 32 0
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/producer/ExEventBusProducer.java
  14. 1 0
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/resources/META-INF/spring.factories
  15. 11 0
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/resources/application-unit-test.yml
  16. 1 1
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/test/java/cn/tr/plugin/eventbus/SpringBootSubPubTest.java
  17. 6 4
      tr-test/pom.xml
  18. 1 1
      tr-test/src/main/resources/application-stream.yml

+ 7 - 0
tr-dependencies/pom.xml

@@ -407,6 +407,13 @@
                 <version>${revision}</version>
             </dependency>
 
+            <!--导入导出-->
+            <dependency>
+                <groupId>cn.tr</groupId>
+                <artifactId>tr-spring-boot-starter-plugin-import-export</artifactId>
+                <version>${revision}</version>
+            </dependency>
+
             <!--websocket插件-->
             <dependency>
                 <groupId>cn.tr</groupId>

+ 1 - 1
tr-modules/pom.xml

@@ -15,7 +15,7 @@
     <modules>
         <module>tr-module-system</module>
         <module>tr-module-gen</module>
-        <module>tr-module-export</module>
+        <module>tr-module-import-export</module>
         <module>tr-module-quartz</module>
     </modules>
 

+ 0 - 2
tr-modules/tr-module-system/src/main/resources/mapper/storage/SysStorageRecordMapper.xml

@@ -25,7 +25,6 @@
         sr.biz_name as biz_name,
         sr.absolute_path as absolute_path,
         sr.config_id as config_id,
-        sr.config_name as config_name,
         sr.size as size,
         sr.suffix as suffix,
         sr.cate_id as cate_id,
@@ -50,7 +49,6 @@
         sr.biz_name as biz_name,
         sr.absolute_path as absolute_path,
         sr.config_id as config_id,
-        sr.config_name as config_name,
         sr.size as size,
         sr.suffix as suffix,
         sr.cate_id as cate_id,

+ 1 - 0
tr-plugins/pom.xml

@@ -37,6 +37,7 @@
         <module>tr-spring-boot-starter-plugin-biz-bean-mapper</module>
         <module>tr-spring-boot-starter-plugin-biz-constant</module>
         <module>tr-spring-boot-starter-plugin-numbering-strategy</module>
+        <module>tr-spring-boot-starter-plugin-import-export</module>
     </modules>
 
 </project>

+ 20 - 9
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/TrEventBusAutoConfiguration.java

@@ -4,11 +4,12 @@ package cn.tr.plugin.eventbus;
 import cn.tr.plugin.eventbus.config.EventBus;
 import cn.tr.plugin.eventbus.config.StdEventBus;
 import cn.tr.plugin.eventbus.config.SubscribeListenerAnnotationBeanPostProcessor;
-import cn.tr.plugin.eventbus.mq.consumer.EvenBusConsumer;
+import cn.tr.plugin.eventbus.mq.consumer.EventBusConsumer;
+import cn.tr.plugin.eventbus.mq.consumer.ExEventBusConsumer;
 import cn.tr.plugin.eventbus.mq.message.EvenBusMessageEvent;
 import cn.tr.plugin.eventbus.mq.producer.EvenBusProducer;
+import cn.tr.plugin.eventbus.mq.producer.ExEventBusProducer;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.cloud.bus.ServiceMatcher;
 import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
 import org.springframework.context.ApplicationEventPublisher;
@@ -21,26 +22,36 @@ import org.springframework.context.annotation.Bean;
  * @Date: 2023年03月15日
  */
 @RemoteApplicationEventScan(basePackageClasses = EvenBusMessageEvent.class)
-@AutoConfigureAfter({ApplicationEventPublisher.class,ServiceMatcher.class})
 public class TrEventBusAutoConfiguration {
 
+    @Bean
+    public ExEventBusConsumer exEventBusConsumer(){
+        return new ExEventBusConsumer();
+    }
+
+
+    @Bean
+    public ExEventBusProducer exEventBusProducer(){
+        return new ExEventBusProducer();
+    }
+
     @Value("${spring.application.name}")
     protected String applicationName;
 
     @Bean
-    public EvenBusConsumer evenBusConsumer(){
-        return new EvenBusConsumer();
+    public EventBusConsumer eventBusConsumer(){
+        return new EventBusConsumer();
     }
 
     @Bean
-    public EvenBusProducer evenBusProducer( ApplicationEventPublisher applicationEventPublisher,ServiceMatcher serviceMatcher){
+    public EvenBusProducer eventBusProducer( ApplicationEventPublisher applicationEventPublisher,ServiceMatcher serviceMatcher){
         return new EvenBusProducer(applicationEventPublisher,serviceMatcher,applicationName);
     }
 
-
     @Bean
-    public EventBus eventBus(EvenBusConsumer evenBusConsumer, EvenBusProducer evenBusProducer){
-        return new StdEventBus(evenBusConsumer,evenBusProducer);
+    public EventBus eventBus(EventBusConsumer eventBusConsumer, EvenBusProducer evenBusProducer,
+                             ExEventBusConsumer excludeConsumer, ExEventBusProducer excludeProducer){
+        return new StdEventBus(eventBusConsumer,evenBusProducer,excludeProducer,excludeConsumer);
     }
 
     @Bean

+ 1 - 1
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/EventBus.java

@@ -69,7 +69,7 @@ public interface EventBus {
     };
 
     /**
-     * 发布排他主题消息(仅本机使用)
+     * 发布排他主题消息
      * @param topic
      * @param message
      * @param <T>

+ 34 - 0
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/EventBusEnvironmentPostProcessor.java

@@ -0,0 +1,34 @@
+package cn.tr.plugin.eventbus.config;
+
+import cn.tr.plugin.eventbus.constant.ExEventBusConstant;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.env.EnvironmentPostProcessor;
+import org.springframework.cloud.bus.BusEnvironmentPostProcessor;
+import org.springframework.core.env.ConfigurableEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @ClassName : EventBusEnvironmentPostProcessor
+ * @Description :
+ * @Author : LF
+ * @Date: 2023年05月16日
+ */
+
+public class EventBusEnvironmentPostProcessor implements EnvironmentPostProcessor {
+    @Override
+    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
+        Map<String, Object> defaults = new HashMap<>();
+
+        defaults.put("spring.cloud.stream.bindings." + ExEventBusConstant.INPUT + ".group", "default");
+        defaults.put("spring.cloud.stream.bindings." + ExEventBusConstant.INPUT + ".destination", ExEventBusConstant.DESTINATION);
+
+
+        defaults.put("spring.cloud.stream.bindings." + ExEventBusConstant.OUTPUT + ".destination", ExEventBusConstant.DESTINATION);
+
+        defaults.put("spring.cloud.function.definition", ExEventBusConstant.BUS_CONSUMER+";"+ExEventBusConstant.BUS_PRODUCER);
+
+        BusEnvironmentPostProcessor.addOrReplace(environment.getPropertySources(), defaults, ExEventBusConstant.DEFAULTS_PROPERTY_SOURCE_NAME, false);
+    }
+}

+ 11 - 6
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/StdEventBus.java

@@ -3,8 +3,10 @@ package cn.tr.plugin.eventbus.config;
 
 import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.util.StrUtil;
-import cn.tr.plugin.eventbus.mq.consumer.EvenBusConsumer;
+import cn.tr.plugin.eventbus.mq.consumer.EventBusConsumer;
+import cn.tr.plugin.eventbus.mq.consumer.ExEventBusConsumer;
 import cn.tr.plugin.eventbus.mq.producer.EvenBusProducer;
+import cn.tr.plugin.eventbus.mq.producer.ExEventBusProducer;
 import cn.tr.plugin.eventbus.proxy.ProxyMessageListener;
 import cn.tr.plugin.eventbus.mq.message.TopicPayload;
 import cn.tr.plugin.eventbus.utils.TopicUtils;
@@ -30,11 +32,14 @@ public class StdEventBus implements EventBus {
     private static final Map<String,List<ProxyMessageListener>> regexMatchMap = new HashMap<>();
 
     private final EvenBusProducer evenBusProducer;
-    public StdEventBus(EvenBusConsumer evenBusConsumer,EvenBusProducer evenBusProducer) {
-        this.evenBusProducer=evenBusProducer;
 
-        evenBusConsumer.getMsgSink()
+    private final ExEventBusProducer excludeProducer;
+    public StdEventBus(EventBusConsumer eventBusConsumer, EvenBusProducer evenBusProducer, ExEventBusProducer excludeProducer, ExEventBusConsumer excludeConsumer) {
+        this.evenBusProducer=evenBusProducer;
+        this.excludeProducer=excludeProducer;
+        eventBusConsumer.getMsgSink()
                 .asFlux()
+                .mergeWith(excludeConsumer.getMsgSink().asFlux())
                 .doOnNext(this::doPublish)
                 .subscribe();
     }
@@ -62,7 +67,7 @@ public class StdEventBus implements EventBus {
         if(share){
             evenBusProducer.sendEvenBusMsg(payload);
         }else {
-            doPublish(payload);
+            excludeProducer.sendEvenBusMsg(payload);
         }
         return 1L;
     }
@@ -88,7 +93,7 @@ public class StdEventBus implements EventBus {
      */
     private Long doPublish(Map<String, List<ProxyMessageListener>> topicMap,TopicPayload payload,boolean regex){
         String payloadTopic = payload.getTopic();
-        ArrayList<ProxyMessageListener> publishListeners = new ArrayList<>();
+        Set<ProxyMessageListener> publishListeners = new HashSet<>();
         topicMap.forEach((listenerTopic,listeners)->{
             boolean match=false;
             if(regex){

+ 9 - 1
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/SubscribeListenerAnnotationBeanPostProcessor.java

@@ -30,7 +30,7 @@ import java.util.stream.Stream;
 @Slf4j
 public class SubscribeListenerAnnotationBeanPostProcessor implements BeanPostProcessor {
     private final ConcurrentMap<Class<?>, TypeMetadata> typeCache = new ConcurrentHashMap<>();
-
+    private final Set<String> registerSubscribeIdSet=new HashSet<>();
     private final EventBus eventBus;
 
     public SubscribeListenerAnnotationBeanPostProcessor(EventBus eventBus) {
@@ -66,11 +66,19 @@ public class SubscribeListenerAnnotationBeanPostProcessor implements BeanPostPro
         if(isArray.isPresent()){
             throw new UnsupportedOperationException("The parameter of class :"+ objClass.getName()+"method : "+method.getName()+"   cannot be an array");
         }
+        String subscribeId=objClass.getName()+"-"+method.getName();
+        if(method.getParameters()!=null&&method.getParameters().length>0){
+            subscribeId =subscribeId+"-"+ method.getParameters()[0].getType().getCanonicalName();
+        }
+        if(registerSubscribeIdSet.contains(subscribeId)){
+            return;
+        }
         ProxyMessageListener proxyMessageListener = new ProxyMessageListener(bean, method);
         Set<String> topics = Stream.of(subscribe.value()).collect(Collectors.toSet());
         for (String topic : topics) {
             eventBus.subscribe(topic,proxyMessageListener);
         }
+        registerSubscribeIdSet.add(subscribeId);
     }
 
 

+ 36 - 0
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/constant/ExEventBusConstant.java

@@ -0,0 +1,36 @@
+package cn.tr.plugin.eventbus.constant;
+
+/**
+ * @ClassName : ExEventBusConstant
+ * @Description :
+ * @Author : LF
+ * @Date: 2023年05月16日
+ */
+
+public interface ExEventBusConstant {
+    String DEFAULTS_PROPERTY_SOURCE_NAME = "eventBusDefaultProperties";
+    /**
+     * 入口通道
+     */
+    String INPUT = "exEventBusConsumer-in-0";
+
+    /**
+     * 出口通道
+     */
+    String OUTPUT = "exEventBusProducer-out-0";
+
+    /**
+     * 目的地名称
+     */
+    String DESTINATION = "eventBus";
+
+    /**
+     * 排他消息总线消费者
+     */
+    String BUS_CONSUMER = "exEventBusConsumer";
+
+    /**
+     * 排他消息总线生产者
+     */
+    String BUS_PRODUCER = "exEventBusProducer";
+}

+ 1 - 1
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/consumer/EvenBusConsumer.java → tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/consumer/EventBusConsumer.java

@@ -12,7 +12,7 @@ import reactor.core.publisher.Sinks;
  * @Author : LF
  * @Date: 2023年05月11日
  */
-public class EvenBusConsumer {
+public class EventBusConsumer {
     @Getter
     private Sinks.Many<TopicPayload> msgSink = Sinks.many().unicast().onBackpressureBuffer();
     @EventListener

+ 29 - 0
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/consumer/ExEventBusConsumer.java

@@ -0,0 +1,29 @@
+package cn.tr.plugin.eventbus.mq.consumer;
+
+import cn.tr.plugin.eventbus.mq.message.TopicPayload;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import java.util.function.Function;
+
+/**
+ * @ClassName : ExEventBusConsumer
+ * @Description : 排他消费者,集群仅消费一次
+ * @Author : LF
+ * @Date: 2023年05月11日
+ */
+@Slf4j
+public class ExEventBusConsumer implements Function<Flux<TopicPayload>, Mono<Void>> {
+    @Getter
+    private Sinks.Many<TopicPayload> msgSink = Sinks.many().unicast().onBackpressureBuffer();
+
+    @Override
+    public Mono<Void> apply(Flux<TopicPayload> flux) {
+        return flux
+                .log("exEventBus")
+                .doOnNext(msgSink::tryEmitNext)
+                .then();
+    }
+}

+ 32 - 0
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/producer/ExEventBusProducer.java

@@ -0,0 +1,32 @@
+package cn.tr.plugin.eventbus.mq.producer;
+
+import cn.tr.plugin.eventbus.mq.message.EvenBusMessageEvent;
+import cn.tr.plugin.eventbus.mq.message.TopicPayload;
+import lombok.Getter;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Sinks;
+
+import java.util.function.Supplier;
+
+/**
+ * @ClassName : ExEventBusProducer
+ * @Description : 排他消息生产者
+ * @Author : LF
+ * @Date: 2023年04月25日
+ */
+public class ExEventBusProducer implements Supplier<Flux<TopicPayload>> {
+    @Getter
+    private Sinks.Many<TopicPayload> msgSink = Sinks.many().multicast().onBackpressureBuffer();
+
+    /**
+     * 发送 {@link EvenBusMessageEvent} 消息
+     */
+    public void sendEvenBusMsg(TopicPayload topicPayload) {
+        msgSink.tryEmitNext(topicPayload);
+    }
+
+    @Override
+    public Flux<TopicPayload> get() {
+        return msgSink.asFlux();
+    }
+}

+ 1 - 0
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/resources/META-INF/spring.factories

@@ -0,0 +1 @@
+org.springframework.boot.env.EnvironmentPostProcessor=cn.tr.plugin.eventbus.config.EventBusEnvironmentPostProcessor

+ 11 - 0
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/resources/application-unit-test.yml

@@ -8,5 +8,16 @@ spring:
   cloud:
     bus:
       enabled: true # 是否开启,默认为 true
+#    stream:
+#      bindings:
+#        #短信发送通道名称
+#        exEventBusProducer-out-0:
+#          #交换机名称
+#          destination: eventBus
+#        exEventBusConsumer-in-0:
+#          destination: eventBus
+#          group: tr
+#    function:
+#      definition: exEventBusProducer;exEventBusConsumer
   application:
     name: eventBus

+ 1 - 1
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/test/java/cn/tr/plugin/eventbus/SpringBootSubPubTest.java

@@ -48,7 +48,7 @@ public class SpringBootSubPubTest {
     @Test
     public void pub(){
         for (int i = 0; i < 10; i++) {
-            eventBus.publish("test"+i,User.of("123","测试"),true);
+            eventBus.publishEx("test"+i,User.of("123","测试"));
         }
         while (true){
 

+ 6 - 4
tr-test/pom.xml

@@ -32,10 +32,12 @@
             <artifactId>tr-spring-boot-starter-plugin-websocket</artifactId>
         </dependency>
 
-        <!--<dependency>-->
-            <!--<groupId>cn.tr</groupId>-->
-            <!--<artifactId>tr-spring-boot-starter-plugin-eventbus</artifactId>-->
-        <!--</dependency>-->
+        <dependency>
+            <groupId>cn.tr</groupId>
+            <artifactId>tr-module-import-export</artifactId>
+            <version>0.0.9</version>
+        </dependency>
+
         <dependency>
             <groupId>org.redisson</groupId>
             <artifactId>redisson-spring-boot-starter</artifactId>

+ 1 - 1
tr-test/src/main/resources/application-stream.yml

@@ -6,4 +6,4 @@ spring:
           connection-name-prefix: ${spring.application.name}
     bus:
       enabled: true # 是否开启,默认为 true
-#      id: ${spring.application.name}:${server.port} # 编号,Spring Cloud Alibaba 建议使用“应用:端口”的格式
+#      id: ${spring.application.name}:${server.port} # 编号,Spring Cloud Alibab