|
|
@@ -7,6 +7,7 @@ import com.coffee.common.config.websocket.DefaultMessageListener;
|
|
|
import com.coffee.common.config.websocket.MessagingRequest;
|
|
|
import com.coffee.common.config.websocket.WebSocketConstant;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.dao.DataAccessException;
|
|
|
import org.springframework.data.redis.connection.RedisConnection;
|
|
|
@@ -17,7 +18,9 @@ import org.tio.core.ChannelContext;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
@@ -27,6 +30,7 @@ import java.util.stream.Collectors;
|
|
|
* @Description TODO
|
|
|
* @createTime 2022年03月25日 14:18:00
|
|
|
*/
|
|
|
+@Slf4j
|
|
|
public abstract class Subscribe implements WsHandler {
|
|
|
@Autowired
|
|
|
private RedisTemplate<String,Object> redisTemplate;
|
|
|
@@ -38,13 +42,13 @@ public abstract class Subscribe implements WsHandler {
|
|
|
/**
|
|
|
* 存储主题与ws通道关联
|
|
|
*/
|
|
|
- private Map<String,Set<ChannelContext>> subscribeTopics=new ConcurrentHashMap<>();
|
|
|
+ private static Map<String,Set<ChannelContext>> subscribeTopics=new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 存储主题与redis通道关联
|
|
|
*/
|
|
|
- private Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
|
|
|
+ private static Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
|
|
|
|
|
|
|
|
|
public TopicWrapper getTopic(String productName,String param,String tenantId){
|
|
|
@@ -52,6 +56,10 @@ public abstract class Subscribe implements WsHandler {
|
|
|
|
|
|
};
|
|
|
|
|
|
+ public static Set<ChannelContext> getSubscribeChannel(String topic){
|
|
|
+ return subscribeTopics.get(topic);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
@Override
|
|
|
public void onMessage(MessagingRequest message, ChannelContext channelContext) {
|
|
|
@@ -78,11 +86,16 @@ public abstract class Subscribe implements WsHandler {
|
|
|
MessagingRequest.Type type = message.getType();
|
|
|
if(MessagingRequest.Type.sub==type){
|
|
|
//订阅主题
|
|
|
- subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
|
|
|
+ for (TopicWrapper topicWrapper : subScribeTopic) {
|
|
|
+ this.subscribe(channelContext,topicWrapper);
|
|
|
+ }
|
|
|
+// subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
|
|
|
}else {
|
|
|
//取消订阅主题
|
|
|
subScribeTopic.forEach(topicWrapper->this.unsubscribe(channelContext,topicWrapper.getTopic()));
|
|
|
}
|
|
|
+ log.error("订阅成功{}",subScribeTopic.stream().map(TopicWrapper::getTopic).collect(Collectors.toList()));
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -91,6 +104,7 @@ public abstract class Subscribe implements WsHandler {
|
|
|
* @param topicWrapper
|
|
|
*/
|
|
|
public void subscribe(ChannelContext channelContext, TopicWrapper topicWrapper){
|
|
|
+
|
|
|
//将主题与ws通道绑定
|
|
|
Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
|
|
|
Set<String> subscribeTopicSet= (Set<String>) result;
|
|
|
@@ -100,19 +114,20 @@ public abstract class Subscribe implements WsHandler {
|
|
|
subscribeTopicSet.add(topicWrapper.getTopic());
|
|
|
channelContext.set(SUBSCRIBE_TOPIC,subscribeTopicSet);
|
|
|
//同一主题只订阅一次
|
|
|
- Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topicWrapper.getTopic())).orElse(new HashSet<>());
|
|
|
+ Set<ChannelContext> channelContexts = subscribeTopics.computeIfAbsent(topicWrapper.getTopic(), k -> new HashSet<>());
|
|
|
channelContexts.add(channelContext);
|
|
|
- if(!subscribeTopics.containsKey(topicWrapper.getTopic())){
|
|
|
+ if(!redisConnectionMap.containsKey(topicWrapper.getTopic())){
|
|
|
redisTemplate.execute(new RedisCallback<Object>() {
|
|
|
@Override
|
|
|
public Object doInRedis(RedisConnection connection) throws DataAccessException {
|
|
|
- connection.pSubscribe(new DefaultMessageListener(getId(),channelContexts,topicWrapper,objectMapper),topicWrapper.getTopic().getBytes());
|
|
|
+ CompletableFuture.runAsync(()->
|
|
|
+ connection.subscribe(new DefaultMessageListener(getId(),topicWrapper,objectMapper),topicWrapper.getTopic().getBytes()));
|
|
|
redisConnectionMap.put(topicWrapper.getTopic(),connection);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
|
+
|
|
|
}
|
|
|
- subscribeTopics.put(topicWrapper.getTopic(),channelContexts);
|
|
|
};
|
|
|
|
|
|
/**
|
|
|
@@ -121,14 +136,12 @@ public abstract class Subscribe implements WsHandler {
|
|
|
* @param topic
|
|
|
*/
|
|
|
public void unsubscribe(ChannelContext channelContext, String topic){
|
|
|
- if(subscribeTopics.containsKey(topic)){
|
|
|
- Set<ChannelContext> channelContexts = subscribeTopics.get(topic);
|
|
|
- if(CollectionUtil.isNotEmpty(channelContexts)){
|
|
|
- channelContexts.remove(channelContext);
|
|
|
- }
|
|
|
- //重新获取集合,避免多线程发生冲突,再次判断此时是否为空
|
|
|
- if(CollectionUtil.isEmpty(subscribeTopics.get(topic))){
|
|
|
- subscribeTopics.remove(topic);
|
|
|
+ Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
|
|
|
+ Set<String> subscribeTopicSet= (Set<String>) result;
|
|
|
+ subscribeTopicSet.remove(topic);
|
|
|
+ subscribeTopics.computeIfPresent(topic,(k,v)->{
|
|
|
+ v.remove(channelContext);
|
|
|
+ if(CollectionUtil.isEmpty(v)){
|
|
|
redisTemplate.execute(new RedisCallback<Object>() {
|
|
|
@Override
|
|
|
public Object doInRedis(RedisConnection connection) throws DataAccessException {
|
|
|
@@ -141,7 +154,8 @@ public abstract class Subscribe implements WsHandler {
|
|
|
});
|
|
|
redisConnectionMap.remove(topic);
|
|
|
}
|
|
|
- }
|
|
|
+ return v;
|
|
|
+ });
|
|
|
};
|
|
|
|
|
|
|