|
|
@@ -0,0 +1,184 @@
|
|
|
+package com.coffee.bus.websocket;
|
|
|
+
|
|
|
+import cn.dev33.satoken.stp.StpUtil;
|
|
|
+import cn.hutool.core.collection.CollectionUtil;
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.coffee.bus.hospital.his.HisResponse;
|
|
|
+import com.coffee.bus.hospital.his.HisScriptSession;
|
|
|
+import com.coffee.bus.hospital.his.HisScriptSessionManager;
|
|
|
+import com.coffee.common.Constants;
|
|
|
+import com.coffee.common.bo.LoginUser;
|
|
|
+import com.coffee.common.config.websocket.HospitalCodeCheck;
|
|
|
+import com.coffee.common.config.websocket.MessagingRequest;
|
|
|
+import com.coffee.common.config.websocket.handler.WsHandler;
|
|
|
+import com.coffee.common.result.R;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.tio.core.ChannelContext;
|
|
|
+import org.tio.core.Tio;
|
|
|
+import org.tio.http.common.HttpRequest;
|
|
|
+import org.tio.http.common.HttpResponse;
|
|
|
+import org.tio.websocket.common.WsRequest;
|
|
|
+import org.tio.websocket.common.WsResponse;
|
|
|
+import org.tio.websocket.server.handler.IWsMsgHandler;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+@AllArgsConstructor
|
|
|
+@AutoConfigureAfter(RedisTemplate.class)
|
|
|
+public class DefaultWebSocketMsgHandler implements IWsMsgHandler {
|
|
|
+ private final List<WsHandler> messageHandlers;
|
|
|
+ private final HisScriptSessionManager scriptSessionManager;
|
|
|
+ private final HospitalCodeCheck codeCheck;
|
|
|
+ @Override
|
|
|
+ public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
|
|
|
+ return httpResponse;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
|
|
|
+ //用户授权码
|
|
|
+ String authorization = httpRequest.getParam("Authorization");
|
|
|
+ if(log.isDebugEnabled()){
|
|
|
+ log.debug("websocket 握手成功,开始进行权限校验,Authorization:{}",authorization);
|
|
|
+ }
|
|
|
+ if(StrUtil.isNullOrUndefined(authorization)){
|
|
|
+ Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(R.fail("授权失败")),"utf-8"));
|
|
|
+ //给返回信息一些时间 todo1
|
|
|
+ channelContext.setClosed(true);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (StrUtil.isNotEmpty(authorization)) {
|
|
|
+ Object result = StpUtil.getTokenSessionByToken(authorization).get(Constants.LOGIN_USER_KEY);
|
|
|
+ if(null==result){
|
|
|
+ //尝试医院请求登录
|
|
|
+ String hospitalId = codeCheck.getHospitalId(authorization);
|
|
|
+ if (StrUtil.isNotBlank(hospitalId)) {
|
|
|
+ channelContext.set(Constants.HOSPITAL_ID,hospitalId);
|
|
|
+ HisScriptSession hisScriptSession = scriptSessionManager.get(hospitalId);
|
|
|
+ //绑定
|
|
|
+ hisScriptSession.bindChannel(channelContext);
|
|
|
+ }else {
|
|
|
+ unbind(channelContext);
|
|
|
+ Thread.sleep(50);
|
|
|
+ channelContext.setClosed(true);
|
|
|
+ if(log.isDebugEnabled()){
|
|
|
+ log.debug("Authorization:{},鉴权失败",authorization);
|
|
|
+ }
|
|
|
+ Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(R.fail("授权失败")),"utf-8"));
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }else {
|
|
|
+ Tio.bindToken(channelContext, JSONUtil.toJsonStr(authorization));
|
|
|
+ LoginUser loginUser = (LoginUser)result;
|
|
|
+ channelContext.set(Constants.LOGIN_USER_KEY,loginUser);
|
|
|
+ }
|
|
|
+ Tio.send(channelContext,WsResponse.fromText(JSONUtil.toJsonStr(R.success("连接成功")),"utf-8"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext){
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
|
|
|
+ messageHandlers.forEach(wsHandler -> wsHandler.close(channelContext));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object onText(WsRequest wsRequest, String message, ChannelContext channelContext) {
|
|
|
+ if (StrUtil.isEmpty(message)) {
|
|
|
+ unbind(channelContext);
|
|
|
+ channelContext.setClosed(true);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ //心跳请求 todo
|
|
|
+ if("ping".equals(message.trim().toLowerCase())){
|
|
|
+ Tio.send(channelContext,WsResponse.fromText("pong","utf-8"));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ synchronized (channelContext){
|
|
|
+ if(log.isDebugEnabled()){
|
|
|
+ log.debug("websocket 接收到消息,message:{},token:{},userId:{}",message,channelContext.getToken(),JSONUtil.toJsonStr(channelContext.get(Constants.LOGIN_USER_KEY)));
|
|
|
+ }
|
|
|
+ Object user = channelContext.get(Constants.LOGIN_USER_KEY);
|
|
|
+ if(ObjectUtil.isNotNull(user)){
|
|
|
+ handleUserMessage(wsRequest,message,channelContext);
|
|
|
+ }else {
|
|
|
+ handleHospitalMessage(wsRequest,message,channelContext,String.valueOf(channelContext.get(Constants.HOSPITAL_ID)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch (Exception e){
|
|
|
+ log.warn("websocket 接收到异常请求,token:{},message:{},userId:{}",channelContext.getToken(),message,JSONUtil.toJsonStr(channelContext.get(Constants.LOGIN_USER_KEY)));
|
|
|
+ unbind(channelContext);
|
|
|
+ channelContext.setClosed(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void unbind(ChannelContext channelContext){
|
|
|
+ Tio.unbindToken(channelContext);
|
|
|
+ Tio.unbindUser(channelContext);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 描述: 处理用户请求
|
|
|
+ * @author lifang
|
|
|
+ * @date 2022/5/27 9:54
|
|
|
+ * @param wsRequest
|
|
|
+ * @param message
|
|
|
+ * @param channelContext
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
+ private void handleUserMessage(WsRequest wsRequest, String message, ChannelContext channelContext){
|
|
|
+ MessagingRequest messagingRequest = JSONUtil.toBean(message, MessagingRequest.class);
|
|
|
+ messagingRequest.validate();
|
|
|
+ List<WsHandler> collect = messageHandlers
|
|
|
+ .parallelStream()
|
|
|
+ .filter(handler -> messagingRequest.getId().equals(handler.getId()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ //传入格式错误,不支持该订阅id
|
|
|
+ if(CollectionUtil.isEmpty(collect)){
|
|
|
+ unbind(channelContext);
|
|
|
+ channelContext.setClosed(true);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ collect.forEach(handler->handler.onMessage(messagingRequest,channelContext));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 描述: 处理医院响应请求
|
|
|
+ * @author lifang
|
|
|
+ * @date 2022/5/27 9:54
|
|
|
+ * @param wsRequest
|
|
|
+ * @param message
|
|
|
+ * @param channelContext
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
+ private void handleHospitalMessage(WsRequest wsRequest, String message, ChannelContext channelContext,String hospitalId){
|
|
|
+ MessagingRequest messagingRequest = JSONUtil.toBean(message, MessagingRequest.class);
|
|
|
+ try {
|
|
|
+ //医院主题请求
|
|
|
+ messagingRequest.validate();
|
|
|
+ }catch (Exception e){
|
|
|
+ //医院响应
|
|
|
+ HisResponse hisResponse = JSONUtil.toBean(message, HisResponse.class);
|
|
|
+ CompletableFuture
|
|
|
+ .runAsync(()-> scriptSessionManager.get(hospitalId).response(hisResponse));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|