package com.coffee.bus.websocket; import cn.hutool.core.collection.CollectionUtil; import com.coffee.common.Constants; import com.coffee.common.bo.LoginUser; import com.coffee.common.config.websocket.MessagingRequest; import com.coffee.common.config.websocket.handler.Subscribe; import org.springframework.stereotype.Component; import org.tio.core.ChannelContext; import java.util.List; import java.util.stream.Collectors; /** * @author lifang * @version 1.0.0 * @ClassName DeviceInfoDetailHandler.java * @Description 处理订阅设备详情 * @createTime 2022年03月25日 14:20:00 */ @Component public class DeviceInfoDetailHandler extends Subscribe{ @Override public String getId() { return WebSocketConstant.DEVICE_INFO_DETAIL; } @Override public void onMessage(MessagingRequest message, ChannelContext channelContext) { LoginUser loginUser = (LoginUser) channelContext.get(Constants.LOGIN_USER_KEY); if(loginUser==null){ channelContext.setClosed(true); return; } //获取所有设备id List params = message.getParams(); if(CollectionUtil.isEmpty(params)){ return; } //需要处理的主题 List subScribeTopic = params.stream().map(deviceId -> WebSocketConstant.getTopic(this.getId(), message.getProductName(), deviceId, loginUser.getTenantId())) .collect(Collectors.toList()); MessagingRequest.Type type = message.getType(); if(MessagingRequest.Type.sub==type){ //订阅主题 subScribeTopic.forEach(topic->this.subscribe(channelContext,topic)); }else { //取消订阅主题 subScribeTopic.forEach(topic->this.unsubscribe(channelContext,topic)); } } @Override public void close(ChannelContext channelContext) { } }