|
@@ -2,21 +2,28 @@ package com.nb.common.websocket.handler;
|
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
|
|
+import cn.hutool.extra.spring.SpringUtil;
|
|
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
import com.nb.auth.bean.LoginUser;
|
|
import com.nb.auth.bean.LoginUser;
|
|
|
|
|
+import com.nb.common.websocket.*;
|
|
|
|
|
+import com.nb.common.websocket.event.PubMsgEvent;
|
|
|
|
|
+import com.nb.common.websocket.filter.PubMsgFilter;
|
|
|
import com.nb.core.Constants;
|
|
import com.nb.core.Constants;
|
|
|
-import com.nb.common.websocket.DefaultMessageListener;
|
|
|
|
|
-import com.nb.common.websocket.MessagingRequest;
|
|
|
|
|
-import com.nb.common.websocket.TopicMessage;
|
|
|
|
|
-import com.nb.common.websocket.WebSocketConstant;
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
|
+import com.nb.core.result.R;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.apache.tomcat.util.descriptor.web.WebXml;
|
|
|
import org.redisson.api.RPatternTopic;
|
|
import org.redisson.api.RPatternTopic;
|
|
|
|
|
+import org.redisson.api.RTopic;
|
|
|
import org.redisson.api.RedissonClient;
|
|
import org.redisson.api.RedissonClient;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.util.ConcurrentReferenceHashMap;
|
|
|
|
|
import org.tio.core.ChannelContext;
|
|
import org.tio.core.ChannelContext;
|
|
|
|
|
+import org.tio.core.Tio;
|
|
|
|
|
+import org.tio.websocket.common.WsPacket;
|
|
|
|
|
+import org.tio.websocket.common.WsResponse;
|
|
|
|
|
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -33,11 +40,18 @@ public abstract class Subscribe implements WsHandler {
|
|
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private RedissonClient redissonClient;
|
|
private RedissonClient redissonClient;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired(required = false)
|
|
|
|
|
+ private List<PubMsgFilter> msgFilters;
|
|
|
/**
|
|
/**
|
|
|
- * 存储主题与redis通道关联
|
|
|
|
|
|
|
+ * 订阅主题缓存
|
|
|
*/
|
|
*/
|
|
|
- private static Map<String, RPatternTopic> topicMap=new ConcurrentReferenceHashMap<>();
|
|
|
|
|
|
|
+ private static Map<String, RPatternTopic> subTopicMap =new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 发布主题缓存
|
|
|
|
|
+ */
|
|
|
|
|
+ private static Map<String, RTopic> pubTopicMap =new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
public TopicWrapper getTopic(String productName,String param,String tenantId){
|
|
public TopicWrapper getTopic(String productName,String param,String tenantId){
|
|
|
return WebSocketConstant.getTopic(this.getId(),productName, param, tenantId);
|
|
return WebSocketConstant.getTopic(this.getId(),productName, param, tenantId);
|
|
@@ -69,15 +83,43 @@ public abstract class Subscribe implements WsHandler {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
MessagingRequest.Type type = message.getType();
|
|
MessagingRequest.Type type = message.getType();
|
|
|
- if(MessagingRequest.Type.sub==type){
|
|
|
|
|
|
|
+ if(MessagingRequest.Type.sub.equals(type)){
|
|
|
//订阅主题
|
|
//订阅主题
|
|
|
subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
|
|
subScribeTopic.forEach(topicWrapper->this.subscribe(channelContext,topicWrapper));
|
|
|
- }else {
|
|
|
|
|
|
|
+ }else if(MessagingRequest.Type.unsub.equals(type)){
|
|
|
//取消订阅主题
|
|
//取消订阅主题
|
|
|
subScribeTopic.forEach(topicWrapper->this.unsubscribe(channelContext,topicWrapper.getTopic()));
|
|
subScribeTopic.forEach(topicWrapper->this.unsubscribe(channelContext,topicWrapper.getTopic()));
|
|
|
|
|
+ }else {
|
|
|
|
|
+ subScribeTopic.forEach(topicWrapper->this.publish(channelContext,message,topicWrapper.getTopic(),topicWrapper.getParam()));
|
|
|
}
|
|
}
|
|
|
if(log.isDebugEnabled()){
|
|
if(log.isDebugEnabled()){
|
|
|
- log.debug("订阅成功{}",subScribeTopic.stream().map(TopicWrapper::getTopic).collect(Collectors.toList()));
|
|
|
|
|
|
|
+ log.debug("ws数据处理成功{}", JSONUtil.toJsonStr(message));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 描述: 推送主题
|
|
|
|
|
+ * @author lifang
|
|
|
|
|
+ * @date 2022/8/16 14:04
|
|
|
|
|
+ * @param channelContext 发布推送消息的通道
|
|
|
|
|
+ * @param message
|
|
|
|
|
+ * @param topic
|
|
|
|
|
+ * @param param
|
|
|
|
|
+ * @return void
|
|
|
|
|
+ */
|
|
|
|
|
+ private void publish(ChannelContext channelContext, MessagingRequest message, String topic, String param) {
|
|
|
|
|
+ if(CollectionUtil.isNotEmpty(msgFilters)){
|
|
|
|
|
+ boolean result = msgFilters.stream()
|
|
|
|
|
+ .allMatch(filter -> filter.doFilter(channelContext, message));
|
|
|
|
|
+ if(result){
|
|
|
|
|
+ SpringUtil.publishEvent(new PubMsgEvent(this,message));
|
|
|
|
|
+ pubTopicMap.computeIfAbsent(topic, k->redissonClient.getTopic(k))
|
|
|
|
|
+ .publishAsync( TopicMessage.of(message.getPayload(),param,message.getMsgId()))
|
|
|
|
|
+ .whenComplete((r,e)->{
|
|
|
|
|
+ Tio.send(channelContext, WsResponse.fromText(JSONUtil.toJsonStr(PubResponse.success(message.getMsgId())), WsPacket.CHARSET_NAME));
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -89,7 +131,7 @@ public abstract class Subscribe implements WsHandler {
|
|
|
public void subscribe(ChannelContext channelContext, TopicWrapper topicWrapper){
|
|
public void subscribe(ChannelContext channelContext, TopicWrapper topicWrapper){
|
|
|
getChannelTopic(channelContext).add(topicWrapper.getTopic());
|
|
getChannelTopic(channelContext).add(topicWrapper.getTopic());
|
|
|
//同一主题只订阅一次
|
|
//同一主题只订阅一次
|
|
|
- RPatternTopic rTopic = topicMap.computeIfAbsent(topicWrapper.getTopic(),topic->redissonClient.getPatternTopic(topicWrapper.getTopic()));
|
|
|
|
|
|
|
+ RPatternTopic rTopic = subTopicMap.computeIfAbsent(topicWrapper.getTopic(), topic->redissonClient.getPatternTopic(topicWrapper.getTopic()));
|
|
|
addTopicListener(rTopic,channelContext, topicWrapper.getTopic());
|
|
addTopicListener(rTopic,channelContext, topicWrapper.getTopic());
|
|
|
};
|
|
};
|
|
|
|
|
|
|
@@ -110,7 +152,7 @@ public abstract class Subscribe implements WsHandler {
|
|
|
|
|
|
|
|
//取消订阅
|
|
//取消订阅
|
|
|
for (String subTopic : allTopics) {
|
|
for (String subTopic : allTopics) {
|
|
|
- topicMap.computeIfPresent(subTopic,(k,rTopic)->{
|
|
|
|
|
|
|
+ subTopicMap.computeIfPresent(subTopic,(k, rTopic)->{
|
|
|
rTopic.removeListener( getTopicListener(channelContext,k));
|
|
rTopic.removeListener( getTopicListener(channelContext,k));
|
|
|
return rTopic;
|
|
return rTopic;
|
|
|
});
|
|
});
|