|
|
@@ -1,30 +1,21 @@
|
|
|
package com.coffee.common.config.websocket.handler;
|
|
|
|
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
|
-import cn.hutool.core.util.StrUtil;
|
|
|
import com.coffee.common.Constants;
|
|
|
import com.coffee.common.bo.LoginUser;
|
|
|
import com.coffee.common.config.websocket.DefaultMessageListener;
|
|
|
import com.coffee.common.config.websocket.MessagingRequest;
|
|
|
+import com.coffee.common.config.websocket.TopicMessage;
|
|
|
import com.coffee.common.config.websocket.WebSocketConstant;
|
|
|
+import com.coffee.common.util.RedissonUtil;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.redisson.api.RPatternTopic;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.dao.DataAccessException;
|
|
|
-import org.springframework.data.redis.connection.RedisConnection;
|
|
|
-import org.springframework.data.redis.connection.RedisPubSubCommands;
|
|
|
-import org.springframework.data.redis.connection.Subscription;
|
|
|
-import org.springframework.data.redis.core.RedisCallback;
|
|
|
-import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.util.ConcurrentReferenceHashMap;
|
|
|
import org.tio.core.ChannelContext;
|
|
|
|
|
|
-import javax.annotation.Resource;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
@@ -36,23 +27,15 @@ import java.util.stream.Collectors;
|
|
|
*/
|
|
|
@Slf4j
|
|
|
public abstract class Subscribe implements WsHandler {
|
|
|
- @Autowired
|
|
|
- private RedisTemplate<String,Object> redisTemplate;
|
|
|
-
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
|
|
|
- public static final String SUBSCRIBE_TOPIC="subscribe-topic";
|
|
|
- /**
|
|
|
- * 存储主题与ws通道关联
|
|
|
- */
|
|
|
- private static Map<String,Set<ChannelContext>> subscribeTopics=new ConcurrentHashMap<>();
|
|
|
-
|
|
|
-
|
|
|
+ @Autowired
|
|
|
+ private RedissonUtil redissonUtil;
|
|
|
/**
|
|
|
* 存储主题与redis通道关联
|
|
|
*/
|
|
|
- private static Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
|
|
|
+ private static Map<String, RPatternTopic> topicMap=new ConcurrentReferenceHashMap<>();
|
|
|
|
|
|
|
|
|
public TopicWrapper getTopic(String productName,String param,String tenantId){
|
|
|
@@ -60,10 +43,6 @@ public abstract class Subscribe implements WsHandler {
|
|
|
|
|
|
};
|
|
|
|
|
|
- public static Set<ChannelContext> getSubscribeChannel(String topic){
|
|
|
- return subscribeTopics.get(topic);
|
|
|
- }
|
|
|
-
|
|
|
|
|
|
@Override
|
|
|
public void onMessage(MessagingRequest message, ChannelContext channelContext) {
|
|
|
@@ -91,13 +70,10 @@ public abstract class Subscribe implements WsHandler {
|
|
|
MessagingRequest.Type type = message.getType();
|
|
|
if(MessagingRequest.Type.sub==type){
|
|
|
//订阅主题
|
|
|
- for (TopicWrapper topicWrapper : subScribeTopic) {
|
|
|
- this.subscribe(channelContext,topicWrapper);
|
|
|
- }
|
|
|
-// subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
|
|
|
+ subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
|
|
|
}else {
|
|
|
//取消订阅主题
|
|
|
-// subScribeTopic.forEach(topicWrapper->this.unsubscribe(channelContext,topicWrapper.getTopic()));
|
|
|
+ subScribeTopic.forEach(topicWrapper->this.unsubscribe(channelContext,topicWrapper.getTopic()));
|
|
|
}
|
|
|
log.error("订阅成功{}",subScribeTopic.stream().map(TopicWrapper::getTopic).collect(Collectors.toList()));
|
|
|
|
|
|
@@ -110,40 +86,8 @@ public abstract class Subscribe implements WsHandler {
|
|
|
*/
|
|
|
public void subscribe(ChannelContext channelContext, TopicWrapper topicWrapper){
|
|
|
//同一主题只订阅一次
|
|
|
- Set<ChannelContext> channelContexts = subscribeTopics.computeIfAbsent(topicWrapper.getTopic(), k -> new HashSet<>());
|
|
|
- channelContexts.add(channelContext);
|
|
|
- boolean subscribe=false;
|
|
|
- RedisConnection redisConnection = redisConnectionMap.get(topicWrapper.getTopic());
|
|
|
- if(redisConnection==null){
|
|
|
- subscribe=true;
|
|
|
- }else {
|
|
|
- Subscription subscription = redisConnection.getSubscription();
|
|
|
- if(subscription==null||CollectionUtil.isEmpty(subscription.getPatterns())){
|
|
|
- subscribe=true;
|
|
|
- }else {
|
|
|
- Collection<byte[]> patterns = subscription.getPatterns();
|
|
|
- for (byte[] pattern : patterns) {
|
|
|
- String patternName = new String(pattern);
|
|
|
- if(topicWrapper.getTopic().equals(patternName)){
|
|
|
- subscribe=true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if(subscribe){
|
|
|
- redisTemplate.execute(new RedisCallback<Object>() {
|
|
|
- @Override
|
|
|
- public Object doInRedis(RedisConnection connection) throws DataAccessException {
|
|
|
- CompletableFuture.runAsync(()->{
|
|
|
- connection.pSubscribe(new DefaultMessageListener(getId(),topicWrapper,objectMapper,channelContexts),topicWrapper.getTopic().getBytes());
|
|
|
- });
|
|
|
- redisConnectionMap.put(topicWrapper.getTopic(),connection);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- getTopicByChannel(channelContext).add(topicWrapper.getTopic());
|
|
|
+ RPatternTopic rTopic = topicMap.computeIfAbsent(topicWrapper.getTopic(),topic->redissonUtil.getPatternTopic(topicWrapper.getTopic()));
|
|
|
+ addTopicListener(rTopic,channelContext, topicWrapper.getTopic());
|
|
|
};
|
|
|
|
|
|
/**
|
|
|
@@ -152,40 +96,34 @@ public abstract class Subscribe implements WsHandler {
|
|
|
* @param topic
|
|
|
*/
|
|
|
public void unsubscribe(ChannelContext channelContext, String topic){
|
|
|
- if(StrUtil.isEmpty(topic)){
|
|
|
- return;
|
|
|
- }
|
|
|
- AtomicBoolean remove = new AtomicBoolean(false);
|
|
|
- subscribeTopics.computeIfPresent(topic,(k,v)->{
|
|
|
- v.remove(channelContext);
|
|
|
-// if(CollectionUtil.isEmpty(v)){
|
|
|
- remove.set(true);
|
|
|
-// redisTemplate.execute(new RedisCallback<Object>() {
|
|
|
-// @Override
|
|
|
-// public Object doInRedis(RedisConnection connection) throws DataAccessException {
|
|
|
-// RedisConnection redisConnection = redisConnectionMap.get(topic);
|
|
|
-// if (redisConnection!=null) {
|
|
|
-// Optional.ofNullable(redisConnection.getSubscription())
|
|
|
-// .map(subscription -> {
|
|
|
-// subscription.pUnsubscribe(topic.getBytes());
|
|
|
-// return subscription;
|
|
|
-// });
|
|
|
-// }
|
|
|
-// return null;
|
|
|
-// }
|
|
|
-// });
|
|
|
-// redisConnectionMap.remove(topic);
|
|
|
-// }
|
|
|
- return v;
|
|
|
- });
|
|
|
+// if(StrUtil.isEmpty(topic)){
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// topicMap.computeIfPresent(topic,(k,rTopic)->{
|
|
|
+// rTopic.removeListener( getTopicListener(channelContext,k));
|
|
|
+// return rTopic;
|
|
|
+// });
|
|
|
};
|
|
|
|
|
|
|
|
|
- public Set<String> getTopicByChannel(ChannelContext channelContext){
|
|
|
- Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
|
|
|
- channelContext.set(SUBSCRIBE_TOPIC,result);
|
|
|
- return (Set<String>) result;
|
|
|
+ public Map<String,DefaultMessageListener> getTopicListeners(ChannelContext channelContext){
|
|
|
+ Object result = Optional.ofNullable(channelContext.get("topic")).orElse(new HashMap<String,DefaultMessageListener>());
|
|
|
+ channelContext.set("topic",result);
|
|
|
+ return (Map<String,DefaultMessageListener>) result;
|
|
|
}
|
|
|
|
|
|
+ public DefaultMessageListener getTopicListener(ChannelContext channelContext,String topic){
|
|
|
+ Map<String,DefaultMessageListener> result = (Map<String, DefaultMessageListener>) Optional.ofNullable(channelContext.get("topic")).orElse(new HashMap<>());
|
|
|
+ return result.get(topic);
|
|
|
+ }
|
|
|
|
|
|
+ public DefaultMessageListener addTopicListener(RPatternTopic rTopic,ChannelContext channelContext,String topic){
|
|
|
+ Map<String, DefaultMessageListener> topicByChannel = getTopicListeners(channelContext);
|
|
|
+ DefaultMessageListener messageListener = topicByChannel.computeIfAbsent(topic, k -> {
|
|
|
+ DefaultMessageListener defaultMessageListener = new DefaultMessageListener(getId(), objectMapper, channelContext,rTopic);
|
|
|
+ rTopic.addListener(TopicMessage.class, defaultMessageListener);
|
|
|
+ return defaultMessageListener;
|
|
|
+ });
|
|
|
+ return messageListener;
|
|
|
+ }
|
|
|
}
|