Selaa lähdekoodia

add 延迟时间为0,本机处理

18339543638 3 vuotta sitten
vanhempi
commit
9b17cfb46f

+ 31 - 23
nb-common/delay-queue-common/src/main/java/com/nb/common/queue/delay/RedissonDelayMessageManager.java

@@ -36,38 +36,19 @@ public class RedissonDelayMessageManager implements DelayMessageManager {
 
     private final RDelayedQueue<DelayMessage> delayedQueue;
 
+    private final List<DelayMessageHandler> handlers;
     private static final String NAME="redisson-delay-message-queue";
     @Autowired
     public RedissonDelayMessageManager(RedissonClient redissonClient,
                                        List<DelayMessageHandler> handlers) {
         this.blockingQueue = redissonClient.getBlockingQueue(NAME);
         this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
+        this.handlers=handlers;
         if(CollectionUtil.isEmpty(handlers)){
             log.warn("the size of DelayMessageHandler is zero");
         }
         //将头结点取出后触发该方法
-        blockingQueue.subscribeOnElements(i->{
-            //开启新的线程消费,唯一线程消费,不可阻塞该异步线程
-            CompletableFuture.runAsync(()->{
-                log.info("redisson延迟队列开始消费数据,消息id【{}】,消息内容【{}】",i.getMsgId(),JSONUtil.toJsonStr(i));
-                Optional<DelayMessageHandler> delayMessageHandler = handlers.stream().filter(handler -> ObjectUtil.equals(i.getHandlerId(), handler.getId()))
-                        .findFirst()
-                        .map(handler -> {
-                            handler.handle(i);
-                            return handler;
-                        });
-                if(!delayMessageHandler.isPresent()){
-                    log.warn("延迟队列消息处理失败,消息【{}】无相应的处理器处理",JSONUtil.toJsonStr(i));
-                }
-            })
-                    .whenComplete((__,t)->{
-                        if(t==null){
-                            log.info("redisson延迟队列中,数据【{}】消费完成",i.getMsgId());
-                        }else {
-                            log.error("redisson延迟队列中,消费数据【{}】失败,",JSONUtil.toJsonStr(i), t);
-                        }
-                    });
-        });
+        blockingQueue.subscribeOnElements(this::handler);
     }
 
     @PostConstruct
@@ -82,7 +63,12 @@ public class RedissonDelayMessageManager implements DelayMessageManager {
             return;
         }
         log.info("redisson-delay-queue ,add message = 【{}】",JSONUtil.toJsonStr(message));
-        delayedQueue.offerAsync(message, message.getProperties().getExpire(), message.getProperties().getTimeUnit());
+        long expire = message.getProperties().getExpire();
+        if(expire<=0){
+            handler(message);
+        }else {
+            delayedQueue.offerAsync(message, message.getProperties().getExpire(), message.getProperties().getTimeUnit());
+        }
     }
 
     @Override
@@ -94,4 +80,26 @@ public class RedissonDelayMessageManager implements DelayMessageManager {
     public void destroy() {
         delayedQueue.destroy();
     }
+
+    private void handler(DelayMessage message){
+        CompletableFuture.runAsync(()->{
+            log.info("redisson延迟队列开始消费数据,消息id【{}】,消息内容【{}】",message.getMsgId(),JSONUtil.toJsonStr(message));
+            Optional<DelayMessageHandler> delayMessageHandler = handlers.stream().filter(handler -> ObjectUtil.equals(message.getHandlerId(), handler.getId()))
+                    .findFirst()
+                    .map(handler -> {
+                        handler.handle(message);
+                        return handler;
+                    });
+            if(!delayMessageHandler.isPresent()){
+                log.warn("延迟队列消息处理失败,消息【{}】无相应的处理器处理",JSONUtil.toJsonStr(message));
+            }
+        })
+                .whenComplete((__,t)->{
+                    if(t==null){
+                        log.info("redisson延迟队列中,数据【{}】消费完成",message.getMsgId());
+                    }else {
+                        log.error("redisson延迟队列中,消费数据【{}】失败,",JSONUtil.toJsonStr(message), t);
+                    }
+                });
+    }
 }