Explorar el Código

fix(消息总线):
远程事件调用全部改为订阅发布
删除冗余代码

18339543638 hace 2 años
padre
commit
2ab8384077
Se han modificado 41 ficheros con 350 adiciones y 633 borrados
  1. 8 7
      tr-dependencies/pom.xml
  2. 1 2
      tr-modules/tr-module-quartz/pom.xml
  3. 0 18
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/QuartzMqConfig.java
  4. 0 45
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/consumer/QuartzConsumer.java
  5. 0 22
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/message/job/QuartzDelMessage.java
  6. 0 24
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/message/job/QuartzJobChangeStatusMessage.java
  7. 0 21
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/message/job/QuartzJobRefreshMessage.java
  8. 0 39
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/producer/QuartzProducer.java
  9. 0 13
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/job/service/ISysJobService.java
  10. 69 36
      tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/job/service/impl/SysJobServiceImpl.java
  11. 1 2
      tr-modules/tr-module-system/pom.xml
  12. 0 15
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/SysMqConfig.java
  13. 0 28
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/consumer/sms/SmsChannelRefreshConsumer.java
  14. 0 42
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/consumer/sms/SmsConsumer.java
  15. 0 29
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/consumer/sms/SmsTemplateRefreshConsumer.java
  16. 0 22
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/message/sms/SmsChannelRefreshMessage.java
  17. 0 23
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/message/sms/SmsTemplateRefreshMessage.java
  18. 0 30
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/producer/sms/SmsProducer.java
  19. 0 45
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/producer/sms/SmsSupplier.java
  20. 2 2
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/dto/SmsSendMessageDTO.java
  21. 2 2
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/ISmsSendService.java
  22. 30 5
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/impl/SmsSendServiceImpl.java
  23. 8 5
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/impl/SysSmsChannelServiceImpl.java
  24. 8 5
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/impl/SysSmsTempServiceImpl.java
  25. 10 0
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/controller/SysStorageConfigController.java
  26. 6 0
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/service/ISysStorageConfigService.java
  27. 2 0
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/service/impl/StorageFileServiceImpl.java
  28. 40 7
      tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/service/impl/SysStorageConfigServiceImpl.java
  29. 0 1
      tr-plugins/pom.xml
  30. 0 23
      tr-plugins/tr-spring-boot-starter-plugin-bus/pom.xml
  31. 0 41
      tr-plugins/tr-spring-boot-starter-plugin-bus/src/main/java/cn/tr/plugin/bus/AbstractBusProducer.java
  32. 0 17
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/EvenBusApplication.java
  33. 1 4
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/TrEventBusAutoConfiguration.java
  34. 34 11
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/EventBus.java
  35. 113 0
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/StdEventBus.java
  36. 5 1
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/message/TopicPayload.java
  37. 2 5
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/proxy/ProxyMessageListener.java
  38. 0 23
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/utils/EventBusStrategy.java
  39. 8 4
      tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/test/java/cn/tr/plugin/eventbus/SpringBootSubPubTest.java
  40. 0 4
      tr-test/src/main/java/cn/tr/test/WebApplication.java
  41. 0 10
      tr-test/src/main/resources/application-stream.yml

+ 8 - 7
tr-dependencies/pom.xml

@@ -70,6 +70,8 @@
         <stream-rabbit.version>3.2.6</stream-rabbit.version>
 
         <bus-rabbit.version>3.1.2</bus-rabbit.version>
+
+        <easy-captcha.version>1.6.2</easy-captcha.version>
     </properties>
 
 
@@ -261,6 +263,12 @@
                 <version>${javassist.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.github.whvcse</groupId>
+                <artifactId>easy-captcha</artifactId>
+                <version>${easy-captcha.version}</version>
+            </dependency>
+
             <!--flink 组件-->
             <dependency>
                 <groupId>org.apache.flink</groupId>
@@ -441,13 +449,6 @@
                 <version>${revision}</version>
             </dependency>
 
-            <!--消息总线模块-->
-            <dependency>
-                <groupId>cn.tr</groupId>
-                <artifactId>tr-spring-boot-starter-plugin-bus</artifactId>
-                <version>${revision}</version>
-            </dependency>
-
             <!--短信模块-->
             <dependency>
                 <groupId>cn.tr</groupId>

+ 1 - 2
tr-modules/tr-module-quartz/pom.xml

@@ -53,9 +53,8 @@
 
         <dependency>
             <groupId>cn.tr</groupId>
-            <artifactId>tr-spring-boot-starter-plugin-bus</artifactId>
+            <artifactId>tr-spring-boot-starter-plugin-eventbus</artifactId>
         </dependency>
-
         <!-- 定时任务 -->
         <!-- 引入 Quartz 依赖-->
         <dependency>

+ 0 - 18
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/QuartzMqConfig.java

@@ -1,18 +0,0 @@
-package cn.tr.module.quartz.core.mq;
-
-import cn.tr.module.quartz.core.mq.message.job.QuartzDelMessage;
-import cn.tr.module.quartz.core.mq.message.job.QuartzJobChangeStatusMessage;
-import cn.tr.module.quartz.core.mq.message.job.QuartzJobRefreshMessage;
-import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @ClassName : QuartzMqConfig
- * @Description :
- * @Author : LF
- * @Date: 2023年05月06日
- */
-@Configuration
-@RemoteApplicationEventScan(basePackageClasses = {QuartzDelMessage.class, QuartzJobRefreshMessage.class, QuartzJobChangeStatusMessage.class})
-public class QuartzMqConfig {
-}

+ 0 - 45
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/consumer/QuartzConsumer.java

@@ -1,45 +0,0 @@
-package cn.tr.module.quartz.core.mq.consumer;
-
-import cn.tr.module.quartz.core.mq.message.job.QuartzDelMessage;
-import cn.tr.module.quartz.core.mq.message.job.QuartzJobChangeStatusMessage;
-import cn.tr.module.quartz.core.mq.message.job.QuartzJobRefreshMessage;
-import cn.tr.module.quartz.exception.TaskException;
-import cn.tr.module.quartz.job.dto.SysJobDTO;
-import cn.tr.module.quartz.job.service.ISysJobService;
-import lombok.AllArgsConstructor;
-import org.quartz.SchedulerException;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.event.EventListener;
-
-import java.util.Collection;
-
-/**
- * @ClassName : QuartzConsumer
- * @Description :
- * @Author : LF
- * @Date: 2023年05月06日
- */
-@Configuration
-@AllArgsConstructor
-public class QuartzConsumer {
-    private final ISysJobService jobService;
-    @EventListener
-    public void quartzDelListener(QuartzDelMessage message) throws SchedulerException {
-        Collection<SysJobDTO> jobs = message.getJobs();
-        for (SysJobDTO job : jobs) {
-            jobService.delCacheJob(job);
-        }
-
-    }
-
-    @EventListener
-    public void quartRefreshListener(QuartzJobRefreshMessage message) throws TaskException, SchedulerException {
-        String jobId = message.getJobId();
-        jobService.refreshCacheJob(jobId);
-    }
-
-    @EventListener
-    public void quartChangeStatusListener(QuartzJobChangeStatusMessage message) throws SchedulerException {
-        jobService.changeStatus(message.getJobId(),message.getStatus());
-    }
-}

+ 0 - 22
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/message/job/QuartzDelMessage.java

@@ -1,22 +0,0 @@
-package cn.tr.module.quartz.core.mq.message.job;
-
-import cn.tr.module.quartz.job.dto.SysJobDTO;
-import lombok.Getter;
-import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-import java.util.*;
-/**
- * @ClassName : QuartzJobRefreshMessage
- * @Description : 任务调度刷新
- * @Author : LF
- * @Date: 2023年05月06日
- */
-
-public class QuartzDelMessage extends RemoteApplicationEvent {
-    @Getter
-    private Collection<SysJobDTO> jobs;
-
-    public QuartzDelMessage(Object source,Collection<SysJobDTO> jobs, String originService, String destinationService) {
-        super(source, originService, DEFAULT_DESTINATION_FACTORY.getDestination(destinationService));
-        this.jobs=jobs;
-    }
-}

+ 0 - 24
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/message/job/QuartzJobChangeStatusMessage.java

@@ -1,24 +0,0 @@
-package cn.tr.module.quartz.core.mq.message.job;
-
-import lombok.Getter;
-import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-
-/**
- * @ClassName : QuartzJobRefreshMessage
- * @Description : 任务调度刷新
- * @Author : LF
- * @Date: 2023年05月06日
- */
-
-public class QuartzJobChangeStatusMessage extends RemoteApplicationEvent {
-    @Getter
-    private String jobId;
-    @Getter
-    private String status;
-
-    public QuartzJobChangeStatusMessage(Object source, String jobId, String status,String originService, String destinationService) {
-        super(source, originService, DEFAULT_DESTINATION_FACTORY.getDestination(destinationService));
-        this.jobId=jobId;
-        this.status=status;
-    }
-}

+ 0 - 21
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/message/job/QuartzJobRefreshMessage.java

@@ -1,21 +0,0 @@
-package cn.tr.module.quartz.core.mq.message.job;
-
-import lombok.Getter;
-import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-
-/**
- * @ClassName : QuartzJobRefreshMessage
- * @Description : 任务调度刷新
- * @Author : LF
- * @Date: 2023年05月06日
- */
-
-public class QuartzJobRefreshMessage extends RemoteApplicationEvent {
-    @Getter
-    private String jobId;
-
-    public QuartzJobRefreshMessage(Object source,String jobId, String originService, String destinationService) {
-        super(source, originService, DEFAULT_DESTINATION_FACTORY.getDestination(destinationService));
-        this.jobId=jobId;
-    }
-}

+ 0 - 39
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/core/mq/producer/QuartzProducer.java

@@ -1,39 +0,0 @@
-package cn.tr.module.quartz.core.mq.producer;
-
-import cn.tr.module.quartz.core.mq.message.job.QuartzDelMessage;
-import cn.tr.module.quartz.core.mq.message.job.QuartzJobChangeStatusMessage;
-import cn.tr.module.quartz.core.mq.message.job.QuartzJobRefreshMessage;
-import cn.tr.module.quartz.job.dto.SysJobDTO;
-import cn.tr.plugin.bus.AbstractBusProducer;
-import org.springframework.stereotype.Component;
-import java.util.*;
-/**
- * @ClassName : SmsProducer
- * @Description :
- * @Author : LF
- * @Date: 2023年04月25日
- */
-@Component
-public class QuartzProducer extends AbstractBusProducer {
-
-    /**
-     * 发送 {@link QuartzJobRefreshMessage} 消息
-     */
-    public void sendQuartzJobChangeStatusMessage(String jobId,String status) {
-        publishEvent(new QuartzJobChangeStatusMessage(this,jobId,status, getBusId(), selfDestinationService()));
-    }
-
-    /**
-     * 发送 {@link QuartzJobRefreshMessage} 消息
-     */
-    public void sendQuartzJobRefreshMessage(String jobId) {
-        publishEvent(new QuartzJobRefreshMessage(this,jobId, getBusId(), selfDestinationService()));
-    }
-
-    /**
-     * 发送 {@link QuartzDelMessage} 消息
-     */
-    public void sendQuartzDelMessage(Collection<SysJobDTO> jobs) {
-        publishEvent(new QuartzDelMessage(this,jobs, getBusId(), selfDestinationService()));
-    }
-}

+ 0 - 13
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/job/service/ISysJobService.java

@@ -4,7 +4,6 @@ import cn.tr.module.quartz.exception.TaskException;
 import cn.tr.module.quartz.job.dto.SysJobDTO;
 import cn.tr.module.quartz.job.dto.SysJobQueryDTO;
 import org.quartz.SchedulerException;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.util.*;
 
@@ -81,18 +80,6 @@ public interface ISysJobService{
      */
     boolean changeStatus(String id, String status) throws SchedulerException;
 
-    /**
-     * 删除缓存中调度任务
-     * @param job 任务
-     */
-    void delCacheJob(SysJobDTO job) throws SchedulerException;
-
-    /**
-     * 更新缓存中调度任务
-     * @param jobId 任务id
-     */
-    void refreshCacheJob(String jobId) throws TaskException, SchedulerException;
-
     /**
      * 恢复任务
      * @param jobId 任务id

+ 69 - 36
tr-modules/tr-module-quartz/src/main/java/cn/tr/module/quartz/job/service/impl/SysJobServiceImpl.java

@@ -1,12 +1,14 @@
 package cn.tr.module.quartz.job.service.impl;
 
+import cn.hutool.core.util.StrUtil;
 import cn.tr.core.exception.ServiceException;
 import cn.tr.core.exception.TRExcCode;
 import cn.tr.module.quartz.constant.ScheduleConstants;
-import cn.tr.module.quartz.core.mq.producer.QuartzProducer;
 import cn.tr.module.quartz.exception.TaskException;
 import cn.tr.module.quartz.utils.CronUtils;
 import cn.tr.module.quartz.utils.ScheduleUtils;
+import cn.tr.plugin.eventbus.annotation.Subscribe;
+import cn.tr.plugin.eventbus.config.EventBus;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -38,8 +40,25 @@ import javax.annotation.PostConstruct;
 @Service
 @AllArgsConstructor
 public class SysJobServiceImpl extends ServiceImpl<SysJobRepository,SysJobPO> implements ISysJobService {
+    /**
+     * 任务更新主题
+     */
+    private final String JOB_UPDATE_TOPIC="/quartz/job/update";
+    /**
+     * 任务状态刷新主题
+     */
+    private final String JOB_STATUS_TOPIC="/quartz/job/status";
+    /**
+     * 任务新增主题
+     */
+    private final String JOB_INSERT_TOPIC="/quartz/job/insert";
+    /**
+     * 任务删除主题
+     */
+    private final String JOB_REMOVE_TOPIC="/quartz/job/remove";
+
     private final Scheduler scheduler;
-    private final QuartzProducer quartzProducer;
+    private final EventBus eventBus;
     @PostConstruct
     @Override
     public void initLocal() throws TaskException, SchedulerException {
@@ -86,11 +105,10 @@ public class SysJobServiceImpl extends ServiceImpl<SysJobRepository,SysJobPO> im
      */
     @Transactional(rollbackFor = Exception.class)
     @Override
-    public boolean updateSysJobById(SysJobDTO source) throws SchedulerException, TaskException {
-        SysJobDTO properties = selectSysJobById(source.getJobId());
+    public boolean updateSysJobById(SysJobDTO source){
         int rows = baseMapper.updateById(SysJobMapper.INSTANCE.convertPO(source));
         if (rows > 0) {
-            updateSchedulerJob(source, properties.getJobGroup());
+            eventBus.publishShare(JOB_UPDATE_TOPIC,source.getJobId());
         }
         return rows!=0;
     };
@@ -103,13 +121,12 @@ public class SysJobServiceImpl extends ServiceImpl<SysJobRepository,SysJobPO> im
      */
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public boolean insertSysJob(SysJobDTO source) throws SchedulerException, TaskException {
+    public boolean insertSysJob(SysJobDTO source) {
         source.setStatus(ScheduleConstants.Status.PAUSE.getValue());
         SysJobPO po = SysJobMapper.INSTANCE.convertPO(source);
         int rows = baseMapper.insert(po);
         if (rows > 0) {
-            source.setJobId(po.getJobId());
-            ScheduleUtils.createScheduleJob(scheduler, source);
+            eventBus.publishShare(JOB_INSERT_TOPIC,po.getJobId());
         }
         return rows!=0;
     };
@@ -122,14 +139,15 @@ public class SysJobServiceImpl extends ServiceImpl<SysJobRepository,SysJobPO> im
      */
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public int removeSysJobByIds(Collection<String> ids) throws SchedulerException {
+    public int removeSysJobByIds(Collection<String> ids) {
         List<SysJobPO> jobs = this.listByIds(ids);
         if(CollectionUtil.isEmpty(jobs)){
             return CollectionUtil.size(ids);
         }
-        int result = baseMapper.deleteBatchIds(ids);
-        removeJobs(SysJobMapper.INSTANCE.convertDtoList(jobs));
-        return result;
+        for (SysJobDTO sysJobDTO : SysJobMapper.INSTANCE.convertDtoList(jobs)) {
+            eventBus.publishShare(JOB_REMOVE_TOPIC,sysJobDTO);
+        }
+        return baseMapper.deleteBatchIds(ids);
     }
 
     @Override
@@ -155,40 +173,29 @@ public class SysJobServiceImpl extends ServiceImpl<SysJobRepository,SysJobPO> im
     @Override
     @Transactional(rollbackFor = Exception.class)
     public boolean changeStatus(String id, String status) throws SchedulerException {
-        quartzProducer.sendQuartzJobChangeStatusMessage(id,status);
+        SysJobPO job = this.baseMapper.selectById(id);
         if (ScheduleConstants.Status.NORMAL.getValue().equals(status)) {
-            return resumeJob(id);
+            job.setStatus(ScheduleConstants.Status.NORMAL.getValue());
         }
         else if (ScheduleConstants.Status.PAUSE.getValue().equals(status)) {
-            return pauseJob(id);
+            job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
         }
-        return false;
-    }
-
-    @Override
-    public void delCacheJob(SysJobDTO job) throws SchedulerException {
-        removeJobs(Collections.singleton(job));
-    }
-
-    @Override
-    public void refreshCacheJob(String jobId) throws TaskException, SchedulerException {
-        SysJobDTO job = selectSysJobById(jobId);
-        updateSchedulerJob(job,job.getJobGroup());
+        this.baseMapper.updateById(job);
+        this.eventBus.publishShare(JOB_STATUS_TOPIC,id);
+        return true;
     }
 
 
     /**
      * 删除调度器中的任务
-     * @param jobs
+     * @param job
      * @throws SchedulerException
      */
-    private void removeJobs(Collection<SysJobDTO> jobs) throws SchedulerException {
-        for (SysJobDTO job : jobs) {
-            String jobId = job.getJobId();
-            String jobGroup = job.getJobGroup();
-            scheduler.deleteJob(ScheduleUtils.getJobKey(jobId, jobGroup));
-        }
-        quartzProducer.sendQuartzDelMessage(jobs);
+    private void removeJobs(SysJobDTO job) throws SchedulerException {
+        String jobId = job.getJobId();
+        String jobGroup = job.getJobGroup();
+        scheduler.deleteJob(ScheduleUtils.getJobKey(jobId, jobGroup));
+
     }
 
     /**
@@ -206,7 +213,6 @@ public class SysJobServiceImpl extends ServiceImpl<SysJobRepository,SysJobPO> im
             scheduler.deleteJob(jobKey);
         }
         ScheduleUtils.createScheduleJob(scheduler, job);
-        quartzProducer.sendQuartzJobRefreshMessage(jobId);
     }
 
     /**
@@ -250,4 +256,31 @@ public class SysJobServiceImpl extends ServiceImpl<SysJobRepository,SysJobPO> im
         }
         return rows!=0;
     }
+
+    @Subscribe(JOB_STATUS_TOPIC)
+    public void jobStatusChange(String jobId) throws SchedulerException {
+        SysJobDTO job = selectSysJobById(jobId);
+        if(StrUtil.equals(ScheduleConstants.Status.NORMAL.getValue(),job.getStatus())){
+            scheduler.resumeJob(ScheduleUtils.getJobKey(jobId, job.getJobGroup()));
+        }else {
+            scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, job.getJobGroup()));
+        }
+    }
+
+    @Subscribe(JOB_REMOVE_TOPIC)
+    public void jobRemove(SysJobDTO job) throws SchedulerException {
+        removeJobs(job);
+    }
+
+    @Subscribe(JOB_INSERT_TOPIC)
+    public void jobCreate(String jobId) throws TaskException, SchedulerException {
+        SysJobDTO job = selectSysJobById(jobId);
+        ScheduleUtils.createScheduleJob(scheduler, job);
+    }
+
+    @Subscribe(JOB_UPDATE_TOPIC)
+    public void jobUpdate(String jobId) throws TaskException, SchedulerException {
+        SysJobDTO job = selectSysJobById(jobId);
+        updateSchedulerJob(job, job.getJobGroup());
+    }
 }

+ 1 - 2
tr-modules/tr-module-system/pom.xml

@@ -101,13 +101,12 @@
 
         <dependency>
             <groupId>cn.tr</groupId>
-            <artifactId>tr-spring-boot-starter-plugin-bus</artifactId>
+            <artifactId>tr-spring-boot-starter-plugin-eventbus</artifactId>
         </dependency>
 
         <dependency>
             <groupId>com.github.whvcse</groupId>
             <artifactId>easy-captcha</artifactId>
-            <version>1.6.2</version>
         </dependency>
 
     </dependencies>

+ 0 - 15
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/SysMqConfig.java

@@ -1,15 +0,0 @@
-package cn.tr.module.sys.core.mq;
-
-import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @ClassName : QuartzMqConfig
- * @Description :
- * @Author : LF
- * @Date: 2023年05月06日
- */
-@Configuration
-@RemoteApplicationEventScan(basePackages = {"cn.tr.module.sys.core.mq.message.*"})
-public class SysMqConfig {
-}

+ 0 - 28
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/consumer/sms/SmsChannelRefreshConsumer.java

@@ -1,28 +0,0 @@
-package cn.tr.module.sys.core.mq.consumer.sms;
-
-import cn.tr.module.sys.core.mq.message.sms.SmsChannelRefreshMessage;
-import cn.tr.module.sys.sms.service.ISysSmsChannelService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.event.EventListener;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-
-/**
- * 针对 {@link SmsChannelRefreshMessage} 的消费者
- *
- * @author 芋道源码
- */
-@Component
-@Slf4j
-public class SmsChannelRefreshConsumer {
-
-    @Resource
-    private ISysSmsChannelService smsChannelService;
-
-    @EventListener
-    public void execute(SmsChannelRefreshMessage message) {
-        log.info("[execute][收到 SmsChannel 刷新消息]");
-        smsChannelService.initLocalCache();
-    }
-}

+ 0 - 42
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/consumer/sms/SmsConsumer.java

@@ -1,42 +0,0 @@
-package cn.tr.module.sys.core.mq.consumer.sms;
-
-import cn.tr.module.sys.core.mq.message.sms.SmsSendMessage;
-import cn.tr.module.sys.core.mq.producer.sms.SmsSupplier;
-import cn.tr.module.sys.sms.service.ISmsSendService;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.function.Function;
-
-/**
- * 针对 {@link SmsSendMessage} 的消费者
- * @see  {https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_suppliers_sources}
- *
- * provider {@link SmsSupplier}
- * 默认的主题名称是通过
- * 输入:    <方法名> + -in- + <index>
- * 输出:    <方法名> + -out- + <index>
- * @author lf
- */
-@Component
-@Slf4j
-@AllArgsConstructor
-public class SmsConsumer implements Function<Flux<SmsSendMessage>, Mono<Void>> {
-    private final ISmsSendService smsSendService;
-
-    @Override
-    public Mono<Void> apply(Flux<SmsSendMessage> flux) {
-        return flux
-                .log("smsConsumer")
-                .doOnNext(smsSendService::doSendSms)
-                .onErrorContinue((t,m)->{
-                    if(t!=null){
-                        log.error("[SmsConsumer] 信息发送失败,",t);
-                    }
-                })
-                .then();
-    }
-}

+ 0 - 29
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/consumer/sms/SmsTemplateRefreshConsumer.java

@@ -1,29 +0,0 @@
-package cn.tr.module.sys.core.mq.consumer.sms;
-
-import cn.tr.module.sys.core.mq.message.sms.SmsTemplateRefreshMessage;
-import cn.tr.module.sys.sms.service.ISysSmsTempService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.event.EventListener;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-
-/**
- * 针对 {@link SmsTemplateRefreshMessage} 的消费者
- *
- * @author 芋道源码
- */
-@Component
-@Slf4j
-public class SmsTemplateRefreshConsumer {
-
-    @Resource
-    private ISysSmsTempService smsTemplateService;
-
-    @EventListener
-    public void execute(SmsTemplateRefreshMessage message) {
-        log.info("[execute][收到 SmsTemplate 刷新消息]");
-        smsTemplateService.initLocalCache();
-    }
-
-}

+ 0 - 22
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/message/sms/SmsChannelRefreshMessage.java

@@ -1,22 +0,0 @@
-package cn.tr.module.sys.core.mq.message.sms;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-
-/**
- * 短信渠道的数据刷新 Message
- *
- * @author 芋道源码
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class SmsChannelRefreshMessage extends RemoteApplicationEvent {
-
-    public SmsChannelRefreshMessage() {
-    }
-    public SmsChannelRefreshMessage(Object source, String originService, String destinationService) {
-        super(source, originService, DEFAULT_DESTINATION_FACTORY.getDestination(destinationService));
-    }
-
-}

+ 0 - 23
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/message/sms/SmsTemplateRefreshMessage.java

@@ -1,23 +0,0 @@
-package cn.tr.module.sys.core.mq.message.sms;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-
-/**
- * 短信模板的数据刷新 Message
- *
- * @author 芋道源码
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class SmsTemplateRefreshMessage extends RemoteApplicationEvent {
-
-    public SmsTemplateRefreshMessage() {
-    }
-
-    public SmsTemplateRefreshMessage(Object source, String originService, String destinationService) {
-        super(source, originService, DEFAULT_DESTINATION_FACTORY.getDestination(destinationService));
-    }
-
-}

+ 0 - 30
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/producer/sms/SmsProducer.java

@@ -1,30 +0,0 @@
-package cn.tr.module.sys.core.mq.producer.sms;
-
-import cn.tr.module.sys.core.mq.message.sms.SmsChannelRefreshMessage;
-import cn.tr.module.sys.core.mq.message.sms.SmsTemplateRefreshMessage;
-import cn.tr.plugin.bus.AbstractBusProducer;
-import org.springframework.stereotype.Component;
-
-/**
- * @ClassName : SmsProducer
- * @Description :
- * @Author : LF
- * @Date: 2023年04月25日
- */
-@Component
-public class SmsProducer extends AbstractBusProducer {
-
-    /**
-     * 发送 {@link SmsChannelRefreshMessage} 消息
-     */
-    public void sendSmsChannelRefreshMessage() {
-        publishEvent(new SmsChannelRefreshMessage(this, getBusId(), selfDestinationService()));
-    }
-
-    /**
-     * 发送 {@link SmsTemplateRefreshMessage} 消息
-     */
-    public void sendSmsTemplateRefreshMessage() {
-        publishEvent(new SmsTemplateRefreshMessage(this, getBusId(), selfDestinationService()));
-    }
-}

+ 0 - 45
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/producer/sms/SmsSupplier.java

@@ -1,45 +0,0 @@
-package cn.tr.module.sys.core.mq.producer.sms;
-
-import cn.tr.core.pojo.KeyValue;
-import cn.tr.module.sys.core.mq.message.sms.SmsSendMessage;
-import cn.tr.plugin.bus.AbstractBusProducer;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
-import java.util.List;
-import java.util.function.Supplier;
-
-/**
- * Sms 短信相关消息的 Producer
- *
- * @author zzf
- * @date 2021/3/9 16:35
- */
-@Slf4j
-@Component
-public class SmsSupplier extends AbstractBusProducer implements Supplier<Flux<SmsSendMessage>> {
-    private Sinks.Many<SmsSendMessage> sink = Sinks.many().unicast().onBackpressureBuffer();
-    /**
-     * 发送 {@link SmsSendMessage} 消息
-     *
-     * @param logId 短信日志编号
-     * @param mobile 手机号
-     * @param channelId 渠道编号
-     * @param apiTemplateId 短信模板编号
-     * @param templateParams 短信模板参数
-     */
-    public void sendSmsSendMessage(String logId, String mobile,
-                                   String channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
-        SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
-        message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
-        sink.tryEmitNext(message);
-    }
-
-    @Override
-    public Flux<SmsSendMessage> get() {
-        return sink.asFlux().log();
-    }
-
-
-}

+ 2 - 2
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/core/mq/message/sms/SmsSendMessage.java → tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/dto/SmsSendMessageDTO.java

@@ -1,4 +1,4 @@
-package cn.tr.module.sys.core.mq.message.sms;
+package cn.tr.module.sys.sms.dto;
 
 import cn.tr.core.pojo.KeyValue;
 import lombok.Data;
@@ -14,7 +14,7 @@ import java.util.List;
  */
 @Data
 @Accessors(chain = true)
-public class SmsSendMessage {
+public class SmsSendMessageDTO {
 
     /**
      * 短信日志编号

+ 2 - 2
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/ISmsSendService.java

@@ -1,6 +1,6 @@
 package cn.tr.module.sys.sms.service;
 
-import cn.tr.module.sys.core.mq.message.sms.SmsSendMessage;
+import cn.tr.module.sys.sms.dto.SmsSendMessageDTO;
 
 import java.util.Map;
 
@@ -67,7 +67,7 @@ public interface ISmsSendService {
      *
      * @param message 短信
      */
-    void doSendSms(SmsSendMessage message);
+    void doSendSms(SmsSendMessageDTO message);
 
     /**
      * 接收短信的接收结果

+ 30 - 5
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/impl/SmsSendServiceImpl.java

@@ -7,8 +7,8 @@ import cn.tr.core.enums.UserTypeEnum;
 import cn.tr.core.exception.ServiceException;
 import cn.tr.core.exception.TRExcCode;
 import cn.tr.core.pojo.KeyValue;
-import cn.tr.module.sys.core.mq.message.sms.SmsSendMessage;
-import cn.tr.module.sys.core.mq.producer.sms.SmsSupplier;
+import cn.tr.core.utils.JsonUtils;
+import cn.tr.module.sys.sms.dto.SmsSendMessageDTO;
 import cn.tr.module.sys.sms.dto.SysSmsChannelDTO;
 import cn.tr.module.sys.sms.dto.SysSmsTempDTO;
 import cn.tr.module.sys.sms.service.ISmsSendService;
@@ -17,6 +17,8 @@ import cn.tr.module.sys.sms.service.ISysSmsLogService;
 import cn.tr.module.sys.sms.service.ISysSmsTempService;
 import cn.tr.module.sys.user.dto.SysUserDTO;
 import cn.tr.module.sys.user.service.ISysUserService;
+import cn.tr.plugin.eventbus.annotation.Subscribe;
+import cn.tr.plugin.eventbus.config.EventBus;
 import cn.tr.plugin.sms.bo.SmsCommonResult;
 import cn.tr.plugin.sms.bo.SmsReceiveRespBO;
 import cn.tr.plugin.sms.bo.SmsSendRespBO;
@@ -50,7 +52,7 @@ public class SmsSendServiceImpl implements ISmsSendService {
 
     private final SmsClientFactory smsClientFactory;
 
-    private final SmsSupplier smsSupplier;
+    private final EventBus eventBus;
     @Override
     public String sendSingleSmsToAdmin(String mobile, String userId, String templateCode, Map<String, Object> templateParams) {
         if (StrUtil.isEmpty(mobile)) {
@@ -89,14 +91,14 @@ public class SmsSendServiceImpl implements ISmsSendService {
         String sendLogId = smsLogService.createSmsLog(mobile, userId, userType, isSend, template, content, templateParams);
         // 发送 MQ 消息,异步执行发送短信
         if (isSend) {
-            smsSupplier.sendSmsSendMessage(sendLogId, mobile, template.getChannelId(),
+            sendSmsSendMessage(sendLogId, mobile, template.getChannelId(),
                     template.getApiTempCode(), newTemplateParams);
         }
         return sendLogId;
     }
 
     @Override
-    public void doSendSms(SmsSendMessage message) {
+    public void doSendSms(SmsSendMessageDTO message) {
         // 获得渠道对应的 SmsClient 客户端
         SmsClient smsClient = smsClientFactory.getSmsClientByChannelId(message.getChannelId());
         if (Objects.isNull(smsClient)) {
@@ -172,5 +174,28 @@ public class SmsSendServiceImpl implements ISmsSendService {
         }).collect(Collectors.toList());
     }
 
+    @Subscribe("/sms/send")
+    public void subscribeSendMsg(SmsSendMessageDTO sendMessage){
+        if(log.isDebugEnabled()){
+            log.debug("发送短信消息:{}", JsonUtils.toJsonString(sendMessage));
+        }
+        doSendSms(sendMessage);
+    }
+
+    /**
+     * 发送 {@link SmsSendMessageDTO} 消息
+     *
+     * @param logId 短信日志编号
+     * @param mobile 手机号
+     * @param channelId 渠道编号
+     * @param apiTemplateId 短信模板编号
+     * @param templateParams 短信模板参数
+     */
+    public void sendSmsSendMessage(String logId, String mobile,
+                                   String channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
+        SmsSendMessageDTO message = new SmsSendMessageDTO().setLogId(logId).setMobile(mobile);
+        message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
+        eventBus.publishEx("/sms/send",message);
+    }
 
 }

+ 8 - 5
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/impl/SysSmsChannelServiceImpl.java

@@ -1,7 +1,8 @@
 package cn.tr.module.sys.sms.service.impl;
 
 import cn.tr.core.utils.JsonUtils;
-import cn.tr.module.sys.core.mq.producer.sms.SmsProducer;
+import cn.tr.plugin.eventbus.annotation.Subscribe;
+import cn.tr.plugin.eventbus.config.EventBus;
 import cn.tr.plugin.sms.config.SmsClientFactory;
 import cn.tr.plugin.sms.properties.SmsChannelProperties;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +29,7 @@ import javax.annotation.PostConstruct;
 @Service
 @Slf4j
 public class SysSmsChannelServiceImpl implements ISysSmsChannelService {
+    private final String CHANNEL_REFRESH_TOPIC="/sms/channel/refresh";
     @Autowired
     private SysSmsChannelRepository baseRepository;
 
@@ -35,10 +37,11 @@ public class SysSmsChannelServiceImpl implements ISysSmsChannelService {
     private SmsClientFactory smsClientFactory;
 
     @Autowired
-    private SmsProducer smsProducer;
+    private EventBus eventBus;
 
     @Override
     @PostConstruct
+    @Subscribe(CHANNEL_REFRESH_TOPIC)
     public void initLocalCache() {
         // 第一步:查询数据
         List<SysSmsChannelPO> channels = baseRepository.selectList(new LambdaQueryWrapper<>());
@@ -95,7 +98,7 @@ public class SysSmsChannelServiceImpl implements ISysSmsChannelService {
     public boolean updateSysSmsChannelById(SysSmsChannelDTO source){
         boolean result = baseRepository.updateById(SysSmsChannelMapper.INSTANCE.convertPO(source)) != 0;
         if(result){
-            smsProducer.sendSmsChannelRefreshMessage();
+            eventBus.publishShare(CHANNEL_REFRESH_TOPIC,new Object());
         }
         return result;
     };
@@ -111,7 +114,7 @@ public class SysSmsChannelServiceImpl implements ISysSmsChannelService {
     public boolean insertSysSmsChannel(SysSmsChannelDTO source){
         boolean result = baseRepository.insert(SysSmsChannelMapper.INSTANCE.convertPO(source)) != 0;
         if(result){
-            smsProducer.sendSmsChannelRefreshMessage();
+            eventBus.publishShare(CHANNEL_REFRESH_TOPIC,new Object());
         }
         return result;
     };
@@ -126,7 +129,7 @@ public class SysSmsChannelServiceImpl implements ISysSmsChannelService {
     @Transactional(rollbackFor = Exception.class)
     public int removeSysSmsChannelByIds(Collection<String> ids){
         int result = baseRepository.deleteBatchIds(ids);
-        smsProducer.sendSmsChannelRefreshMessage();
+        eventBus.publishShare(CHANNEL_REFRESH_TOPIC,new Object());
         return result;
     };
 }

+ 8 - 5
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/sms/service/impl/SysSmsTempServiceImpl.java

@@ -4,7 +4,6 @@ import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.tr.core.exception.ServiceException;
 import cn.tr.core.exception.TRExcCode;
-import cn.tr.module.sys.core.mq.producer.sms.SmsProducer;
 import cn.tr.module.sys.sms.mapper.SysSmsTempMapper;
 import cn.tr.module.sys.sms.dto.SysSmsTempDTO;
 import cn.tr.module.sys.sms.dto.SysSmsTempQueryDTO;
@@ -12,6 +11,8 @@ import cn.tr.module.sys.sms.po.SysSmsTempPO;
 import cn.tr.module.sys.sms.repository.SysSmsTempRepository;
 import cn.tr.module.sys.sms.service.ISysSmsTempService;
 import cn.tr.module.sys.user.enums.CreateEnum;
+import cn.tr.plugin.eventbus.annotation.Subscribe;
+import cn.tr.plugin.eventbus.config.EventBus;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -33,11 +34,12 @@ import java.util.stream.Collectors;
 @Service
 @Slf4j
 public class SysSmsTempServiceImpl implements ISysSmsTempService {
+    private final String TEMP_REFRESH_TOPIC="/sms/temp/refresh";
     @Autowired
     private SysSmsTempRepository baseRepository;
 
     @Autowired
-    private SmsProducer smsProducer;
+    private EventBus eventBus;
 
     private final PropertyPlaceholderHelper propertyPlaceholderHelper = new PropertyPlaceholderHelper("${", "}");
     /**
@@ -47,6 +49,7 @@ public class SysSmsTempServiceImpl implements ISysSmsTempService {
 
     @Override
     @PostConstruct
+    @Subscribe(TEMP_REFRESH_TOPIC)
     public void initLocalCache() {
         List<SysSmsTempPO> temps = baseRepository.selectList(new LambdaQueryWrapper<>());
         log.info("[initLocalCache][缓存短信模版,数量为:{}]", temps.size());
@@ -88,7 +91,7 @@ public class SysSmsTempServiceImpl implements ISysSmsTempService {
         validateSmsTempSource(source);
         boolean result = baseRepository.updateById(SysSmsTempMapper.INSTANCE.convertPO(source)) != 0;
         if(result){
-            smsProducer.sendSmsTemplateRefreshMessage();
+            eventBus.publishShare(TEMP_REFRESH_TOPIC,new Object());
         }
         return result;
     };
@@ -105,7 +108,7 @@ public class SysSmsTempServiceImpl implements ISysSmsTempService {
         validateSmsTempSource(source);
         boolean result = baseRepository.insert(SysSmsTempMapper.INSTANCE.convertPO(source)) != 0;
         if(result){
-            smsProducer.sendSmsTemplateRefreshMessage();
+            eventBus.publishShare(TEMP_REFRESH_TOPIC,new Object());
         }
         return result;
     };
@@ -126,7 +129,7 @@ public class SysSmsTempServiceImpl implements ISysSmsTempService {
             }
         }
         int result = baseRepository.deleteBatchIds(ids);
-        smsProducer.sendSmsTemplateRefreshMessage();
+        eventBus.publishShare(TEMP_REFRESH_TOPIC,new Object());
         return result;
     }
 

+ 10 - 0
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/controller/SysStorageConfigController.java

@@ -77,4 +77,14 @@ public class SysStorageConfigController extends BaseController{
     public CommonResult<Integer> delete(@RequestBody Collection<String> ids) {
         return CommonResult.success(sysStorageConfigService.removeSysStorageConfigByIds(ids));
     }
+
+    @ApiOperationSupport(author = "lf",order = 6)
+    @ApiOperation(value="设置主存储配置",notes = "权限: storage:config:edit")
+    @PostMapping("/master/{configId}")
+    @OperateLog
+    @SaCheckPermission("storage:config:edit")
+    public CommonResult<Boolean> master(@PathVariable("configId") String configId) {
+        sysStorageConfigService.setMasterConfig(configId);
+        return CommonResult.success(true);
+    }
 }

+ 6 - 0
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/service/ISysStorageConfigService.java

@@ -61,6 +61,12 @@ public interface ISysStorageConfigService{
      */
     int removeSysStorageConfigByIds(Collection<String> ids);
 
+    /**
+     * 设置主配置
+     * @param configId
+     */
+    void setMasterConfig(String configId);
+
     /**
      * 查询默认配置
      * @return

+ 2 - 0
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/service/impl/StorageFileServiceImpl.java

@@ -10,6 +10,7 @@ import cn.tr.module.sys.storage.dto.SysStorageRecordDTO;
 import cn.tr.module.sys.storage.service.IStorageFileService;
 import cn.tr.module.sys.storage.service.ISysStorageConfigService;
 import cn.tr.module.sys.storage.service.ISysStorageRecordService;
+import cn.tr.plugin.eventbus.annotation.Subscribe;
 import cn.tr.plugin.file.config.FileClient;
 import cn.tr.plugin.file.config.FileClientConfig;
 import cn.tr.plugin.file.config.FileClientFactory;
@@ -33,6 +34,7 @@ import java.util.stream.Collectors;
  */
 @Service
 public class StorageFileServiceImpl implements IStorageFileService {
+
     //默认配置
     private SysStorageConfigDTO defaultConfig;
 

+ 40 - 7
tr-modules/tr-module-system/src/main/java/cn/tr/module/sys/storage/service/impl/SysStorageConfigServiceImpl.java

@@ -4,10 +4,14 @@ import cn.tr.core.exception.ServiceException;
 import cn.tr.core.exception.TRExcCode;
 import cn.tr.core.utils.JsonUtils;
 import cn.tr.core.utils.ValidationUtils;
+import cn.tr.module.sys.storage.service.IStorageFileService;
+import cn.tr.plugin.eventbus.annotation.Subscribe;
+import cn.tr.plugin.eventbus.config.EventBus;
 import cn.tr.plugin.file.config.FileClientConfig;
 import cn.tr.plugin.file.enums.FileStorageEnum;
-import lombok.AllArgsConstructor;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -18,8 +22,6 @@ import cn.tr.module.sys.storage.dto.SysStorageConfigQueryDTO;
 import java.util.*;
 import cn.tr.module.sys.storage.service.ISysStorageConfigService;
 import cn.tr.module.sys.storage.mapper.SysStorageConfigMapper;
-
-import javax.validation.ValidationException;
 import javax.validation.Validator;
 
 /**
@@ -29,12 +31,25 @@ import javax.validation.Validator;
  * @date  2023/05/09 14:29
  **/
 @Service
-@AllArgsConstructor
 public class SysStorageConfigServiceImpl implements ISysStorageConfigService {
+    private final String STORAGE_DEFAULT_TOPIC="/storage/default/refresh";
+    @Autowired
+    private SysStorageConfigRepository baseRepository;
+
+    @Autowired
+    private  Validator validator;
+
+    @Autowired
+    private  EventBus eventBus;
 
-    private final SysStorageConfigRepository baseRepository;
+    @Autowired
+    @Lazy
+    private IStorageFileService storageFileService;
 
-    private final Validator validator;
+    @Subscribe(STORAGE_DEFAULT_TOPIC)
+    public void storageUpdate(){
+        storageFileService.initDefaultConfig();
+    }
 
     /**
      * 根据条件查询文件存储配置
@@ -94,11 +109,13 @@ public class SysStorageConfigServiceImpl implements ISysStorageConfigService {
     @Override
     @Transactional(rollbackFor = Exception.class)
     public boolean insertSysStorageConfig(SysStorageConfigDTO source){
+        Long count = baseRepository.selectCount(new LambdaQueryWrapper<>());
+        boolean master=count==0;
         SysStorageConfigPO config = SysStorageConfigMapper.INSTANCE.convertPO(source);
         config
                 .setConfig(parseClientConfig(config.getType(), source.getConfig()))
                 // 默认非 master
-                .setMaster(false);
+                .setMaster(master);
         return baseRepository.insert(config)!=0;
     };
 
@@ -120,6 +137,22 @@ public class SysStorageConfigServiceImpl implements ISysStorageConfigService {
         return baseRepository.deleteBatchIds(ids);
     }
 
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void setMasterConfig(String configId) {
+        //将主配置设置为副配置
+        baseRepository.update(null,new LambdaUpdateWrapper<SysStorageConfigPO>()
+                .eq(SysStorageConfigPO::getMaster,true)
+                .set(SysStorageConfigPO::getMaster,false));
+
+        //设置主配置
+        baseRepository.update(null,new LambdaUpdateWrapper<SysStorageConfigPO>()
+                .eq(SysStorageConfigPO::getId,configId)
+                .set(SysStorageConfigPO::getMaster,true));
+
+        eventBus.publishShare(STORAGE_DEFAULT_TOPIC,new Object());
+    }
+
     @Override
     public SysStorageConfigDTO selectDefault() {
         return SysStorageConfigMapper.INSTANCE.convertDto(baseRepository.selectOne(new LambdaQueryWrapper<SysStorageConfigPO>()

+ 0 - 1
tr-plugins/pom.xml

@@ -37,7 +37,6 @@
         <module>tr-spring-boot-starter-plugin-biz-bean-mapper</module>
         <module>tr-spring-boot-starter-plugin-biz-constant</module>
         <module>tr-spring-boot-starter-plugin-numbering-strategy</module>
-        <module>tr-spring-boot-starter-plugin-bus</module>
     </modules>
 
 </project>

+ 0 - 23
tr-plugins/tr-spring-boot-starter-plugin-bus/pom.xml

@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>tr-plugins</artifactId>
-        <groupId>cn.tr</groupId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <version>${revision}</version>
-    <artifactId>tr-spring-boot-starter-plugin-bus</artifactId>
-
-    <description>使用消息总线作为系统内部配置动态刷新</description>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
-        </dependency>
-    </dependencies>
-</project>

+ 0 - 41
tr-plugins/tr-spring-boot-starter-plugin-bus/src/main/java/cn/tr/plugin/bus/AbstractBusProducer.java

@@ -1,41 +0,0 @@
-package cn.tr.plugin.bus;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.cloud.bus.ServiceMatcher;
-import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-import org.springframework.context.ApplicationEventPublisher;
-
-import javax.annotation.Resource;
-
-/**
- * 基于 Spring Cloud Bus 实现的 Producer 抽象类
- *
- * @author tr
- */
-public abstract class AbstractBusProducer {
-
-    @Resource
-    protected ApplicationEventPublisher applicationEventPublisher;
-
-    @Resource
-    protected ServiceMatcher serviceMatcher;
-
-    @Value("${spring.application.name}")
-    protected String applicationName;
-
-    protected void publishEvent(RemoteApplicationEvent event) {
-        applicationEventPublisher.publishEvent(event);
-    }
-
-    /**
-     * @return 只广播给自己服务的实例
-     */
-    protected String selfDestinationService() {
-        return applicationName + ":**";
-    }
-
-    protected String getBusId() {
-        return serviceMatcher.getBusId();
-    }
-
-}

+ 0 - 17
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/EvenBusApplication.java

@@ -1,17 +0,0 @@
-package cn.tr.plugin;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-/**
- * @ClassName : EvenBusApplication
- * @Description :
- * @Author : LF
- * @Date: 2023年05月11日
- */
-@SpringBootApplication
-public class EvenBusApplication {
-    public static void main(String[] args) {
-        SpringApplication.run(EvenBusApplication.class);
-    }
-}

+ 1 - 4
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/TrEventBusAutoConfiguration.java

@@ -7,8 +7,6 @@ import cn.tr.plugin.eventbus.config.SubscribeListenerAnnotationBeanPostProcessor
 import cn.tr.plugin.eventbus.mq.consumer.EvenBusConsumer;
 import cn.tr.plugin.eventbus.mq.message.EvenBusMessageEvent;
 import cn.tr.plugin.eventbus.mq.producer.EvenBusProducer;
-import cn.tr.plugin.eventbus.utils.EventBusStrategy;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.cloud.bus.ServiceMatcher;
@@ -46,8 +44,7 @@ public class TrEventBusAutoConfiguration {
     }
 
     @Bean
-    public SubscribeListenerAnnotationBeanPostProcessor subscribeListenerAnnotationBeanPostProcessor(EventBus eventBus, ObjectMapper objectMapper){
-        EventBusStrategy.tr.objectMapperSupplier=()->objectMapper;
+    public SubscribeListenerAnnotationBeanPostProcessor subscribeListenerAnnotationBeanPostProcessor(EventBus eventBus){
         return new SubscribeListenerAnnotationBeanPostProcessor(eventBus);
     }
 }

+ 34 - 11
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/EventBus.java

@@ -57,6 +57,28 @@ public interface EventBus {
      */
     void subscribe(Set<String> topics, ProxyMessageListener messageListener);
 
+    /**
+     * 发布共享主题消息
+     * @param topic
+     * @param message
+     * @param <T>
+     * @return
+     */
+    default  <T> Long publishShare(String topic, T message){
+        return publish(CollectionUtil.newHashSet(topic),()->message,true);
+    };
+
+    /**
+     * 发布排他主题消息(仅本机使用)
+     * @param topic
+     * @param message
+     * @param <T>
+     * @return
+     */
+    default  <T> Long publishEx(String topic, T message){
+        return publish(CollectionUtil.newHashSet(topic),()->message,false);
+    };
+
     /**
      * 向主题中发布消息
      * @param topics 发布主题
@@ -64,16 +86,16 @@ public interface EventBus {
      * @param <T>
      * @return
      */
-    default  <T> Long publish(Set<String> topics, T message){
-        return publish(topics,()->message);
+    default  <T> Long publish(Set<String> topics, T message,boolean share){
+        return publish(topics,()->message,share);
     };
 
-    default  <T> Long publish(String topic, T message){
-        return publish(CollectionUtil.newHashSet(topic),()->message);
+    default  <T> Long publish(String topic, T message,boolean share){
+        return publish(CollectionUtil.newHashSet(topic),()->message,share);
     };
 
-    default  <T> Long publish(String topic, Supplier<T> supplierMsg){
-        return publish(CollectionUtil.newHashSet(topic),supplierMsg);
+    default  <T> Long publish(String topic, Supplier<T> supplierMsg,boolean share){
+        return publish(CollectionUtil.newHashSet(topic),supplierMsg,share);
     };
 
     /**
@@ -83,20 +105,21 @@ public interface EventBus {
      * @param <T>
      * @return
      */
-    default  <T> Long publish(Set<String> topics, Supplier<T> supplierMsg){
+    default  <T> Long publish(Set<String> topics, Supplier<T> supplierMsg,boolean share){
         return topics.stream()
                 .map(topic->new TopicPayload(supplierMsg.get(),topic))
-                .map(this::publish)
+                .map(msg->publish(msg,share))
                 .mapToLong(Long::intValue)
                 .sum();
     };
 
     /**
      * 向主题中发布消息
-     * @param msgs 发布消息
-     * @param
+     * @param msg 发布消息
+     * @param share 是否为共享主题 ,当主题为共享主题,集群所有订阅该消息的消费者都会进行消费,
+     *                              当主题不为共享主题时,集群中相同的主题仅有一个会进行消费
      * @return
      */
-    Long publish(TopicPayload msgs);
+    Long publish(TopicPayload msg,boolean share);
 
 }

+ 113 - 0
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/config/StdEventBus.java

@@ -0,0 +1,113 @@
+package cn.tr.plugin.eventbus.config;
+
+
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.tr.plugin.eventbus.mq.consumer.EvenBusConsumer;
+import cn.tr.plugin.eventbus.mq.producer.EvenBusProducer;
+import cn.tr.plugin.eventbus.proxy.ProxyMessageListener;
+import cn.tr.plugin.eventbus.mq.message.TopicPayload;
+import cn.tr.plugin.eventbus.utils.TopicUtils;
+import lombok.extern.slf4j.Slf4j;
+import java.util.*;
+
+/**
+ * @ClassName : StdEventBus
+ * @Description : 标准消息总线
+ * @Author : LF
+ * @Date: 2023年03月17日
+ */
+@Slf4j
+public class StdEventBus implements EventBus {
+    /**
+     * 精准匹配
+     */
+    private static final Map<String,List<ProxyMessageListener>> allMatchMap = new HashMap<>();
+
+    /**
+     * 正则映射
+     */
+    private static final Map<String,List<ProxyMessageListener>> regexMatchMap = new HashMap<>();
+
+    private final EvenBusProducer evenBusProducer;
+    public StdEventBus(EvenBusConsumer evenBusConsumer,EvenBusProducer evenBusProducer) {
+        this.evenBusProducer=evenBusProducer;
+
+        evenBusConsumer.getMsgSink()
+                .asFlux()
+                .doOnNext(this::doPublish)
+                .subscribe();
+    }
+
+    @Override
+    public void subscribe(Set<String> topics, ProxyMessageListener messageListener) {
+        topics.forEach(topic -> {
+            Map<String,List<ProxyMessageListener>> topicMap=null;
+            if (TopicUtils.isRegex(topic)) {
+                topicMap=regexMatchMap;
+            }else {
+                topicMap=allMatchMap;
+            }
+            //保存订阅消息消费者
+            topicMap.compute(topic,(k,listeners)->{
+                listeners= Objects.isNull(listeners)?new ArrayList<>():listeners;
+                listeners.add(messageListener);
+                return listeners;
+            });
+        });
+    }
+
+    @Override
+    public Long publish(TopicPayload payload,boolean share) {
+        if(share){
+            evenBusProducer.sendEvenBusMsg(payload);
+        }else {
+            doPublish(payload);
+        }
+        return 1L;
+    }
+
+    private Long doPublish(TopicPayload payload){
+        return publishAllMatchTopic(payload)+publishRegexMatchTopic(payload);
+    }
+
+    private Long publishAllMatchTopic(TopicPayload payload){
+        return doPublish(allMatchMap,payload,false);
+    }
+
+    private Long publishRegexMatchTopic(TopicPayload payload){
+        return doPublish(regexMatchMap,payload,true);
+    }
+
+    /**
+     * 真正发布主题消息的地方
+     * @param topicMap
+     * @param payload
+     * @param regex 是否进行正则匹配
+     * @return
+     */
+    private Long doPublish(Map<String, List<ProxyMessageListener>> topicMap,TopicPayload payload,boolean regex){
+        String payloadTopic = payload.getTopic();
+        ArrayList<ProxyMessageListener> publishListeners = new ArrayList<>();
+        topicMap.forEach((listenerTopic,listeners)->{
+            boolean match=false;
+            if(regex){
+                match=TopicUtils.match(listenerTopic,payloadTopic);
+            }else {
+                match= StrUtil.equals(listenerTopic,payloadTopic);
+            }
+            if(match){
+                publishListeners.addAll(listeners);
+            }
+        });
+        publishListeners.parallelStream()
+                .forEach(ln->{
+                    try {
+                        ln.onMessage(payload);
+                    }catch (Exception e){
+                        log.error("主题{}消息发布失败,",payloadTopic,e);
+                    }
+                });
+        return Integer.valueOf(CollectionUtil.size(publishListeners)).longValue();
+    }
+}

+ 5 - 1
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/mq/message/TopicPayload.java

@@ -24,7 +24,11 @@ public class TopicPayload implements Serializable {
 
     public TopicPayload(Object payload, String topic) {
         this.id = IdUtil.getSnowflakeNextId();
-        this.payload = JsonUtils.toJsonString(payload);
+        if(payload instanceof String){
+            this.payload= (String) payload;
+        }else {
+            this.payload = JsonUtils.toJsonString(payload);
+        }
         this.createTime = System.currentTimeMillis();
         this.topic = topic;
     }

+ 2 - 5
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/proxy/ProxyMessageListener.java

@@ -1,10 +1,9 @@
 package cn.tr.plugin.eventbus.proxy;
 
 import cn.hutool.core.util.ModifierUtil;
+import cn.tr.core.utils.JsonUtils;
 import cn.tr.core.utils.Proxy;
 import cn.tr.plugin.eventbus.mq.message.TopicPayload;
-import cn.tr.plugin.eventbus.utils.EventBusStrategy;
-import com.fasterxml.jackson.databind.ObjectReader;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.aop.support.AopUtils;
 
@@ -24,7 +23,6 @@ public class ProxyMessageListener {
     private final Object target;
     private final Method method;
     private final BiFunction<Object, Object, Object> proxy;
-    private ObjectReader objectReader=null;
 
     public ProxyMessageListener(Object target,Method method){
         this.target = target;
@@ -35,7 +33,6 @@ public class ProxyMessageListener {
         }
         if (parameterTypes.length == 1) {
             paramType = parameterTypes[0];
-            objectReader =  EventBusStrategy.tr.getObjectMapper().readerFor(paramType);
         } else {
             paramType = Void.class;
         }
@@ -78,7 +75,7 @@ public class ProxyMessageListener {
                 result=proxy.apply(target,paramVoid?null:payload);
             }
             else {
-                result=proxy.apply(target,paramVoid?null:objectReader.readValue(message));
+                result=proxy.apply(target,paramVoid?null: JsonUtils.parseObject(message,paramType));
             }
             return paramVoid?null:result;
         } catch (Exception e){

+ 0 - 23
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/main/java/cn/tr/plugin/eventbus/utils/EventBusStrategy.java

@@ -1,23 +0,0 @@
-package cn.tr.plugin.eventbus.utils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.function.Supplier;
-
-/**
- * @ClassName : EvenBusUtils
- * @Description :
- * @Author : LF
- * @Date: 2023年03月17日
- */
-
-public class EventBusStrategy {
-    private EventBusStrategy() {
-    }
-    public Supplier<ObjectMapper> objectMapperSupplier=()->new ObjectMapper();
-    public static EventBusStrategy tr =new EventBusStrategy();
-
-    public ObjectMapper getObjectMapper(){
-        return objectMapperSupplier.get();
-    }
-}

+ 8 - 4
tr-plugins/tr-spring-boot-starter-plugin-eventbus/src/test/java/cn/tr/plugin/eventbus/SpringBootSubPubTest.java

@@ -1,6 +1,5 @@
 package cn.tr.plugin.eventbus;
 
-import cn.tr.plugin.EvenBusApplication;
 import cn.tr.plugin.eventbus.annotation.Subscribe;
 import cn.tr.plugin.eventbus.mq.message.TopicPayload;
 import cn.tr.plugin.eventbus.config.EventBus;
@@ -12,6 +11,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.annotation.Import;
 import org.springframework.test.context.ActiveProfiles;
@@ -25,10 +25,14 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @Author : LF
  * @Date: 2023年03月19日
  */
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = EvenBusApplication.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = SpringBootSubPubTest.Application.class)
 @ActiveProfiles("unit-test") // 设置使用 application-unit-test 配置文件
 @Import({ SpringBootSubPubTest.Sub.class})
-public class SpringBootSubPubTest  {
+public class SpringBootSubPubTest {
+
+    @SpringBootApplication(scanBasePackages = "cn.tr.plugin.eventbus")
+    public static class Application {
+    }
 
     @Autowired
     private EventBus eventBus;
@@ -44,7 +48,7 @@ public class SpringBootSubPubTest  {
     @Test
     public void pub(){
         for (int i = 0; i < 10; i++) {
-            eventBus.publish("test"+i,User.of("123","测试"));
+            eventBus.publish("test"+i,User.of("123","测试"),true);
         }
         while (true){
 

+ 0 - 4
tr-test/src/main/java/cn/tr/test/WebApplication.java

@@ -1,12 +1,9 @@
 package cn.tr.test;
 
 import cn.dev33.satoken.strategy.SaStrategy;
-import cn.tr.module.sys.core.mq.message.sms.SmsChannelRefreshMessage;
-import cn.tr.module.sys.core.mq.message.sms.SmsTemplateRefreshMessage;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
 import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.annotation.EnableAsync;
 
@@ -18,7 +15,6 @@ import org.springframework.scheduling.annotation.EnableAsync;
  */
 @SpringBootApplication(scanBasePackages = "cn.tr.module.*")
 @MapperScan({"cn.tr.module.*.*.repository","cn.tr.module.*.modular.*.mapper"})
-@RemoteApplicationEventScan(basePackageClasses = {SmsChannelRefreshMessage.class, SmsTemplateRefreshMessage.class})
 @EnableAsync
 public class WebApplication {
     public static void main(String[] args) {

+ 0 - 10
tr-test/src/main/resources/application-stream.yml

@@ -4,16 +4,6 @@ spring:
       rabbit:
         binder:
           connection-name-prefix: ${spring.application.name}
-      bindings:
-        #短信发送通道名称
-        smsSupplier-out-0:
-          #交换机名称
-          destination: smsSend
-        smsConsumer-in-0:
-          destination: smsSend
-          group: sms               #分组
-    function:
-      definition: smsSupplier;smsConsumer
     bus:
       enabled: true # 是否开启,默认为 true
 #      id: ${spring.application.name}:${server.port} # 编号,Spring Cloud Alibaba 建议使用“应用:端口”的格式