|
|
@@ -0,0 +1,134 @@
|
|
|
+package com.coffee.common.config.websocket.handler;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.CollectionUtil;
|
|
|
+import com.coffee.common.Constants;
|
|
|
+import com.coffee.common.bo.LoginUser;
|
|
|
+import com.coffee.common.config.websocket.DefaultMessageListener;
|
|
|
+import com.coffee.common.config.websocket.MessagingRequest;
|
|
|
+import com.coffee.common.config.websocket.WebSocketConstant;
|
|
|
+import org.springframework.dao.DataAccessException;
|
|
|
+import org.springframework.data.redis.connection.RedisConnection;
|
|
|
+import org.springframework.data.redis.core.RedisCallback;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.util.ConcurrentReferenceHashMap;
|
|
|
+import org.tio.core.ChannelContext;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author lifang
|
|
|
+ * @version 1.0.0
|
|
|
+ * @ClassName Subscribe.java
|
|
|
+ * @Description TODO
|
|
|
+ * @createTime 2022年03月25日 14:18:00
|
|
|
+ */
|
|
|
+public abstract class Subscribe implements WsHandler {
|
|
|
+ @Resource
|
|
|
+ private RedisTemplate<String,Object> redisTemplate;
|
|
|
+
|
|
|
+ public static final String SUBSCRIBE_TOPIC="subscribe-topic";
|
|
|
+ /**
|
|
|
+ * 存储主题与ws通道关联
|
|
|
+ */
|
|
|
+ private Map<String,Set<ChannelContext>> subscribeTopics=new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 存储主题与redis通道关联
|
|
|
+ */
|
|
|
+ private Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
|
|
|
+
|
|
|
+
|
|
|
+ public String getTopic(String productName,String param,String tenantId){
|
|
|
+ return WebSocketConstant.getTopic(this.getId(),productName, param, tenantId);
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(MessagingRequest message, ChannelContext channelContext) {
|
|
|
+ LoginUser loginUser = (LoginUser) channelContext.get(Constants.LOGIN_USER_KEY);
|
|
|
+ if(loginUser==null){
|
|
|
+ channelContext.setClosed(true);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //获取所有设备id
|
|
|
+ List<String> params = message.getParams();
|
|
|
+ if(CollectionUtil.isEmpty(params)){
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //需要处理的主题
|
|
|
+ List<String> subScribeTopic =
|
|
|
+ params.stream().map(deviceId -> getTopic(message.getProductName(), deviceId, loginUser.getTenantId()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ MessagingRequest.Type type = message.getType();
|
|
|
+ if(MessagingRequest.Type.sub==type){
|
|
|
+ //订阅主题
|
|
|
+ subScribeTopic.forEach(topic->this.subscribe(channelContext,topic));
|
|
|
+ }else {
|
|
|
+ //取消订阅主题
|
|
|
+ subScribeTopic.forEach(topic->this.unsubscribe(channelContext,topic));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * ws 订阅主题
|
|
|
+ * @param channelContext
|
|
|
+ * @param topic
|
|
|
+ */
|
|
|
+ public void subscribe(ChannelContext channelContext, String topic){
|
|
|
+ //同一主题只订阅一次
|
|
|
+ Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topic)).orElse(new HashSet<>());
|
|
|
+ if(!subscribeTopics.containsKey(topic)){
|
|
|
+ channelContexts.add(channelContext);
|
|
|
+ redisTemplate.execute(new RedisCallback<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object doInRedis(RedisConnection connection) throws DataAccessException {
|
|
|
+ connection.pSubscribe(new DefaultMessageListener(getId(),channelContexts),topic.getBytes());
|
|
|
+ redisConnectionMap.put(topic,connection);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ subscribeTopics.put(topic,channelContexts);
|
|
|
+ //将主题与ws通道绑定
|
|
|
+ Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
|
|
|
+ Set<String> subscribeTopicSet= (Set<String>) result;
|
|
|
+ subscribeTopicSet.add(topic);
|
|
|
+ channelContext.set(SUBSCRIBE_TOPIC,subscribeTopicSet);
|
|
|
+ };
|
|
|
+
|
|
|
+ /**
|
|
|
+ * ws取消订阅主题
|
|
|
+ * @param channelContext
|
|
|
+ * @param topic
|
|
|
+ */
|
|
|
+ public void unsubscribe(ChannelContext channelContext, String topic){
|
|
|
+ if(subscribeTopics.containsKey(topic)){
|
|
|
+ Set<ChannelContext> channelContexts = subscribeTopics.get(topic);
|
|
|
+ if(CollectionUtil.isNotEmpty(channelContexts)){
|
|
|
+ channelContexts.remove(channelContext);
|
|
|
+ }
|
|
|
+ //重新获取集合,避免多线程发生冲突,再次判断此时是否为空
|
|
|
+ if(CollectionUtil.isEmpty(subscribeTopics.get(topic))){
|
|
|
+ subscribeTopics.remove(topic);
|
|
|
+ redisTemplate.execute(new RedisCallback<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object doInRedis(RedisConnection connection) throws DataAccessException {
|
|
|
+ RedisConnection redisConnection = redisConnectionMap.get(topic);
|
|
|
+ if (redisConnection!=null) {
|
|
|
+ redisConnection.getSubscription().unsubscribe(topic.getBytes());
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ redisConnectionMap.remove(topic);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+
|
|
|
+}
|