소스 검색

add websocket 订阅发布模式

18339543638 3 년 전
부모
커밋
01bbb3786a

+ 39 - 0
coffee-common/src/main/java/com/coffee/common/config/websocket/DefaultMessageListener.java

@@ -0,0 +1,39 @@
+package com.coffee.common.config.websocket;
+
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.json.JSONUtil;
+import com.coffee.common.result.R;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.tio.core.ChannelContext;
+import org.tio.core.Tio;
+import org.tio.websocket.common.WsResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Set;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DefaultMessageListener.java
+ * @Description TODO
+ * @createTime 2022年03月25日 14:42:00
+ */
+@Data
+@Slf4j
+public class DefaultMessageListener implements MessageListener {
+    private final Set<ChannelContext> channelContexts;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        if (CollectionUtil.isNotEmpty(channelContexts)) {
+            String body= new String(message.getBody());
+            channelContexts.parallelStream()
+                    .filter(channelContext -> !channelContext.isClosed)
+                    .forEach(channel -> Tio.send(channel, WsResponse.fromText(JSONUtil.toJsonStr(R.success(body)),"utf-8")));
+        }
+    }
+}

+ 20 - 0
coffee-common/src/main/java/com/coffee/common/config/websocket/DefaultRedisCallBack.java

@@ -0,0 +1,20 @@
+package com.coffee.common.config.websocket;
+
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.core.RedisCallback;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DefaultRedisCallBack.java
+ * @Description TODO
+ * @createTime 2022年03月25日 16:14:00
+ */
+public class DefaultRedisCallBack implements RedisCallback<Object> {
+    @Override
+    public Object doInRedis(RedisConnection connection) throws DataAccessException {
+
+        return null;
+    }
+}

+ 54 - 26
coffee-common/src/main/java/com/coffee/common/config/websocket/DefaultWebSocketMsgHandler.java

@@ -1,14 +1,15 @@
 package com.coffee.common.config.websocket;
 
-import cn.dev33.satoken.session.SaSession;
 import cn.dev33.satoken.stp.StpUtil;
+import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.util.StrUtil;
-import cn.hutool.cron.Scheduler;
 import cn.hutool.json.JSONUtil;
-import com.coffee.common.redis.RedisUtils;
+import com.coffee.common.Constants;
+import com.coffee.common.bo.LoginUser;
+import com.coffee.common.config.websocket.handler.WsHandler;
 import com.coffee.common.result.R;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 import org.tio.core.ChannelContext;
 import org.tio.core.Tio;
@@ -17,24 +18,16 @@ import org.tio.http.common.HttpResponse;
 import org.tio.websocket.common.WsRequest;
 import org.tio.websocket.common.WsResponse;
 import org.tio.websocket.server.handler.IWsMsgHandler;
-import reactor.core.scheduler.Schedulers;
+import java.util.*;
+import java.util.stream.Collectors;
 
 @Component
 @Slf4j
+@AllArgsConstructor
 public class DefaultWebSocketMsgHandler implements IWsMsgHandler {
-    RedisUtils redisUtils;
-    RedisTemplate redisTemplate;
+    private final List<WsHandler> messageHandlers;
     @Override
-    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
-
-//        if(StrUtil.isEmpty(authorization)){
-//            return httpResponse.setBody(R.fail());
-//        }
-//        httpResponse.setBody("123".getBytes());
-//        throw new CustomException("授权失败");
-//        Tio.send(channelContext,WsResponse.fromText("授权失败","utf-8"));
-
-
+    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
         return httpResponse;
     }
 
@@ -51,40 +44,75 @@ public class DefaultWebSocketMsgHandler implements IWsMsgHandler {
             channelContext.setClosed(true);
             return;
         }
-        SaSession tokenSessionByToken = StpUtil.getTokenSessionByToken(authorization);
-        if(null==tokenSessionByToken){
+        Object result = StpUtil.getTokenSessionByToken(authorization).get(Constants.LOGIN_USER_KEY);
+        if(null==result){
             Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(R.fail("授权失败")),"utf-8"));
             if(log.isDebugEnabled()){
                 log.debug("Authorization:{},鉴权失败",authorization);
             }
-            Tio.unbindToken(channelContext);
+            unbind(channelContext);
             Thread.sleep(50);
             channelContext.setClosed(true);
             return;
         }
-        Tio.bindToken(channelContext, JSONUtil.toJsonStr(tokenSessionByToken));
+        Tio.bindToken(channelContext, JSONUtil.toJsonStr(authorization));
+        LoginUser loginUser = (LoginUser)result;
+        channelContext.set(Constants.LOGIN_USER_KEY,loginUser);
         Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(R.success("连接成功")),"utf-8"));
 
     }
 
     @Override
-    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
+    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext){
         return null;
     }
 
     @Override
-    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
+    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
+        messageHandlers.forEach(wsHandler -> wsHandler.close(channelContext));
         return null;
     }
 
     @Override
-    public Object onText(WsRequest wsRequest, String message, ChannelContext channelContext) throws Exception {
-        System.out.println("接收到文本消息:"+message);
+    public Object onText(WsRequest wsRequest, String message, ChannelContext channelContext) {
         if (StrUtil.isEmpty(message)) {
-            Tio.unbindBsId(channelContext);
+            unbind(channelContext);
             channelContext.setClosed(true);
             return null;
         }
+        if(log.isDebugEnabled()){
+            log.debug("websocket 接收到消息,message:{},token:{},userId:{}",message,channelContext.getToken(),JSONUtil.toJsonStr(channelContext.get(Constants.LOGIN_USER_KEY)));
+        }
+        //心跳请求 todo
+        if("ping".equals(message.trim().toLowerCase())){
+            return null;
+        }
+        try {
+            MessagingRequest messagingRequest = JSONUtil.toBean(message, MessagingRequest.class);
+            messagingRequest.validate();
+            List<WsHandler> collect = messageHandlers
+                    .parallelStream()
+                    .filter(handler -> messagingRequest.getId().equals(handler.getId()))
+                    .collect(Collectors.toList());
+            //传入格式错误,不支持该订阅id
+            if(CollectionUtil.isEmpty(collect)){
+                unbind(channelContext);
+                channelContext.setClosed(true);
+                return null;
+            }
+            collect.forEach(handler->handler.onMessage(messagingRequest,channelContext));
+        }catch (Exception e){
+            e.printStackTrace();
+            log.warn("websocket 接收到异常请求,token:{},message:{},userId:{}",channelContext.getToken(),message,JSONUtil.toJsonStr(channelContext.get(Constants.LOGIN_USER_KEY)));
+            unbind(channelContext);
+            channelContext.setClosed(true);
+        }
+
         return null;
     }
+
+    private void unbind(ChannelContext channelContext){
+        Tio.unbindToken(channelContext);
+        Tio.unbindUser(channelContext);
+    }
 }

+ 20 - 0
coffee-common/src/main/java/com/coffee/common/config/websocket/MessageResponse.java

@@ -0,0 +1,20 @@
+package com.coffee.common.config.websocket;
+
+import cn.hutool.core.util.ObjectUtil;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MessageResponse.java
+ * @Description TODO
+ * @createTime 2022年03月25日 16:25:00
+ */
+@Data
+@AllArgsConstructor(staticName = "of")
+public class MessageResponse {
+    private String id;
+    private String type;
+    private Object payload;
+}

+ 23 - 3
coffee-common/src/main/java/com/coffee/common/config/websocket/MessagingRequest.java

@@ -1,7 +1,10 @@
 package com.coffee.common.config.websocket;
 
-import java.util.Map;
+import cn.hutool.core.util.StrUtil;
+import lombok.Data;
+
 import java.util.*;
+@Data
 public class MessagingRequest {
 
     /**
@@ -15,12 +18,29 @@ public class MessagingRequest {
     private Type type;
 
     /**
-     * 订阅id,如设备id、病人id
+     * 产品名称
+     */
+    private String productName;
+    /**
+     * 订阅id,如设备id、病人id、报警类型、设备状态类型
      */
     private List<String> params;
 
 
-    public enum Type{
+    private String tenantId;
+
+    private Integer isSys;
+
+    public static enum Type{
         sub,unsub
     }
+
+    public void validate(){
+        if(StrUtil.isNullOrUndefined(id)){
+            throw new RuntimeException("MessageRequest id不能为空");
+        }
+        if(StrUtil.isNullOrUndefined(tenantId)&&isSys==null){
+            throw new RuntimeException("tenantId、isSys不能同时为空");
+        }
+    }
 }

+ 106 - 0
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/Subscribe.java

@@ -0,0 +1,106 @@
+package com.coffee.common.config.websocket.handler;
+
+import cn.hutool.core.collection.CollectionUtil;
+import com.coffee.common.config.websocket.DefaultMessageListener;
+import com.coffee.common.config.websocket.MessageResponse;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.channel.EventLoop;
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.Subscription;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.util.ConcurrentReferenceHashMap;
+import org.tio.core.ChannelContext;
+import reactor.util.function.Tuple3;
+
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName Subscribe.java
+ * @Description TODO
+ * @createTime 2022年03月25日 14:18:00
+ */
+public abstract class Subscribe implements WsHandler {
+    @Resource
+    private RedisTemplate<String,Object> redisTemplate;
+
+    public static final String SUBSCRIBE_TOPIC="subscribe-topic";
+    /**
+     * 存储主题与ws通道关联
+     */
+    private Map<String,Set<ChannelContext>> subscribeTopics=new ConcurrentHashMap<>();
+
+
+    /**
+     * 存储主题与redis通道关联
+     */
+    private Map<String,RedisConnection> redisConnectionMap=new ConcurrentReferenceHashMap<>();
+
+
+    private EventLoop singleThreadEventLoop=new DefaultEventLoop() ;
+    /**
+     * ws 订阅主题
+     * @param channelContext
+     * @param topic
+     */
+    public void subscribe(ChannelContext channelContext, String topic){
+        //同一主题只订阅一次
+        Set<ChannelContext> channelContexts = Optional.ofNullable(subscribeTopics.get(topic)).orElse(new HashSet<>());
+        if(!subscribeTopics.containsKey(topic)){
+            channelContexts.add(channelContext);
+            redisTemplate.execute(new RedisCallback<Object>() {
+                @Override
+                public Object doInRedis(RedisConnection connection) throws DataAccessException {
+                    connection.subscribe(new DefaultMessageListener(channelContexts),topic.getBytes());
+                    redisConnectionMap.put(topic,connection);
+                    return null;
+                }
+            });
+            new Thread(()->singleThreadEventLoop.scheduleAtFixedRate(()->redisTemplate.convertAndSend(topic, MessageResponse.of(getId(),"result","主题为["+topic+"]的回复消息")),1,3, TimeUnit.SECONDS))
+                    .start();
+        }
+        subscribeTopics.put(topic,channelContexts);
+        //将主题与ws通道绑定
+        Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
+        Set<String> subscribeTopicSet= (Set<String>) result;
+        subscribeTopicSet.add(topic);
+        channelContext.set(SUBSCRIBE_TOPIC,subscribeTopicSet);
+    };
+
+    /**
+     * ws取消订阅主题
+     * @param channelContext
+     * @param topic
+     */
+    public void unsubscribe(ChannelContext channelContext, String topic){
+        if(subscribeTopics.containsKey(topic)){
+            Set<ChannelContext> channelContexts = subscribeTopics.get(topic);
+            if(CollectionUtil.isNotEmpty(channelContexts)){
+                channelContexts.remove(channelContext);
+            }
+            //重新获取集合,避免多线程发生冲突,再次判断此时是否为空
+            if(CollectionUtil.isEmpty(subscribeTopics.get(topic))){
+                subscribeTopics.remove(topic);
+                redisTemplate.execute(new RedisCallback<Object>() {
+                    @Override
+                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
+                        RedisConnection redisConnection = redisConnectionMap.get(topic);
+                        if (redisConnection!=null) {
+                            redisConnection.getSubscription().unsubscribe(topic.getBytes());
+                        }
+                        return null;
+                    }
+                });
+                redisConnectionMap.remove(topic);
+            }
+        }
+    };
+
+
+}

+ 3 - 6
coffee-common/src/main/java/com/coffee/common/config/websocket/handler/WsHandler.java

@@ -1,7 +1,7 @@
 package com.coffee.common.config.websocket.handler;
 
+import com.coffee.common.config.websocket.MessagingRequest;
 import org.tio.core.ChannelContext;
-import org.tio.websocket.common.WsRequest;
 
 /**
  * @author lifang
@@ -13,10 +13,7 @@ import org.tio.websocket.common.WsRequest;
 public interface WsHandler {
     String getId();
 
+    void onMessage(MessagingRequest message, ChannelContext channelContext);
 
-    void subscribe();
-
-    void onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext);
-
-    void onText(WsRequest wsRequest, String message, ChannelContext channelContext);
+    void close(ChannelContext channelContext);
 }

+ 32 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/AlarmCountHandler.java

@@ -0,0 +1,32 @@
+package com.coffee.bus.websocket;
+
+import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.handler.Subscribe;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName AlarmCountHandler.java
+ * @Description 设备报警数量订阅
+ * @createTime 2022年03月25日 14:22:00
+ */
+@Component
+public class AlarmCountHandler extends Subscribe {
+
+    @Override
+    public String getId() {
+        return WebSocketConstant.ALARM_COUNT;
+    }
+
+    @Override
+    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
+
+    }
+
+    @Override
+    public void close(ChannelContext channelContext) {
+
+    }
+}

+ 59 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/DeviceInfoDetailHandler.java

@@ -0,0 +1,59 @@
+package com.coffee.bus.websocket;
+
+import cn.hutool.core.collection.CollectionUtil;
+import com.coffee.common.Constants;
+import com.coffee.common.bo.LoginUser;
+import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.handler.Subscribe;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DeviceInfoDetailHandler.java
+ * @Description 处理订阅设备详情
+ * @createTime 2022年03月25日 14:20:00
+ */
+@Component
+public class DeviceInfoDetailHandler extends Subscribe{
+
+    @Override
+    public String getId() {
+        return WebSocketConstant.DEVICE_INFO_DETAIL;
+    }
+
+    @Override
+    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
+        LoginUser loginUser = (LoginUser) channelContext.get(Constants.LOGIN_USER_KEY);
+        if(loginUser==null){
+            channelContext.setClosed(true);
+            return;
+        }
+        //获取所有设备id
+        List<String> params = message.getParams();
+        if(CollectionUtil.isEmpty(params)){
+            return;
+        }
+        //需要处理的主题
+        List<String> subScribeTopic =
+                params.stream().map(deviceId -> WebSocketConstant.getTopic(this.getId(), message.getProductName(), deviceId, loginUser.getTenantId()))
+                        .collect(Collectors.toList());
+        MessagingRequest.Type type = message.getType();
+        if(MessagingRequest.Type.sub==type){
+            //订阅主题
+            subScribeTopic.forEach(topic->this.subscribe(channelContext,topic));
+        }else {
+            //取消订阅主题
+            subScribeTopic.forEach(topic->this.unsubscribe(channelContext,topic));
+        }
+    }
+
+    @Override
+    public void close(ChannelContext channelContext) {
+
+    }
+
+}

+ 33 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/DeviceStateCountHandler.java

@@ -0,0 +1,33 @@
+package com.coffee.bus.websocket;
+
+import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.handler.Subscribe;
+import com.coffee.common.config.websocket.handler.WsHandler;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName DeviceStateCountHandler.java
+ * @Description 设备数量状态更新订阅
+ * @createTime 2022年03月25日 14:21:00
+ */
+@Component
+public class DeviceStateCountHandler  extends Subscribe {
+
+    @Override
+    public String getId() {
+        return WebSocketConstant.DEVICE_STATE_COUNT;
+    }
+
+    @Override
+    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
+
+    }
+
+    @Override
+    public void close(ChannelContext channelContext) {
+
+    }
+}

+ 39 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/WebSocketCloseHandler.java

@@ -0,0 +1,39 @@
+package com.coffee.bus.websocket;
+
+import com.coffee.common.config.websocket.MessagingRequest;
+import com.coffee.common.config.websocket.handler.Subscribe;
+import com.coffee.common.config.websocket.handler.WsHandler;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName WebSocketCloseHandler.java
+ * @Description TODO
+ * @createTime 2022年03月25日 17:07:00
+ */
+@Component
+public class WebSocketCloseHandler extends Subscribe {
+    @Override
+    public String getId() {
+        return "";
+    }
+
+    @Override
+    public void onMessage(MessagingRequest message, ChannelContext channelContext) {
+
+    }
+
+    @Override
+    public void close(ChannelContext channelContext) {
+        //关闭则取消订阅
+        Object result = Optional.ofNullable(channelContext.get(SUBSCRIBE_TOPIC)).orElse(new HashSet<>());
+        Set<String> subscribeTopicSet= (Set<String>) result;
+        subscribeTopicSet.forEach(topic->this.unsubscribe(channelContext,topic));
+    }
+}

+ 31 - 0
coffee-system/src/main/java/com/coffee/bus/websocket/WebSocketConstant.java

@@ -0,0 +1,31 @@
+package com.coffee.bus.websocket;
+
+import cn.hutool.core.util.StrUtil;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName WebSocketContants.java
+ * @Description websocket订阅所用常量
+ * @createTime 2022年03月25日 14:25:00
+ */
+public class WebSocketConstant {
+
+    public static final String ALARM_COUNT="alarm-count";
+    public static final String DEVICE_INFO_DETAIL="device-info-detail";
+    public static final String DEVICE_STATE_COUNT="device-state-count";
+
+    /**
+     * 主题格式为 device-info-detail:default:45789215623:医院id
+     *             alarm-count:default:电量不足:医院id
+     * @param id
+     * @param productName
+     * @param param
+     * @return
+     */
+    public static String getTopic(String id,String productName,String param,String tenantId){
+        productName=StrUtil.isEmptyIfStr(productName)?"default":productName;
+//        tenantId=StrUtil.isNullOrUndefined(tenantId)?"*":tenantId;
+        return id+":"+productName+":"+param+":"+tenantId;
+    }
+}