package com.nb.bus.service; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.aliyuncs.iot.model.v20180120.QueryDeviceDetailResponse; import com.aliyuncs.iot.model.v20180120.QueryDeviceResponse; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.nb.aliyun.sdk.AliyunIotSdk; import com.nb.bus.bean.AliIotConfig; import com.nb.bus.registry.constant.DeviceKeyConstant; import com.nb.bus.registry.device.DeviceRegistry; import com.nb.bus.entity.BusDeviceEntity; import com.nb.bus.mapper.BusDeviceMapper; import com.nb.bus.registry.device.DeviceOperator; import com.nb.bus.service.dto.DeviceQuery; import com.nb.bus.service.dto.DeviceResult; import com.nb.common.crud.BaseService; import com.nb.common.exception.CustomException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** * @author lifang * @version 1.0.0 * @ClassName LocalBusHospitalService.java * @Description 设备注册 * @createTime 2022年03月19日 09:27:00 */ @Service @Slf4j public class LocalBusDeviceService extends BaseService { @Autowired @Lazy private DeviceRegistry deviceRegistry; @Autowired private BusDeviceMapper deviceMapper; @Autowired private AliyunIotSdk aliyunIotSdk; @Override public void validateBeforeSave(BusDeviceEntity entity) { if(entity.getTenantId()==null){ entity.setTenantId("1"); } } @Override public void validateBeforeUpdate(BusDeviceEntity entity) { } @Override public void validateBeforeDelete(String id) { } @Override public void postSave(BusDeviceEntity entity) { } @Override public void postUpdate(BusDeviceEntity entity) { DeviceOperator deviceOperator = deviceRegistry .getOperator(entity.getDeviceId()); if(deviceOperator==null){ return; } if(entity.getEnable()!=null){ deviceOperator.setEnable(entity.getEnable()); } deviceOperator.getCache().remove(DeviceKeyConstant.TENANT_ID); } @Override public void postDelete(String id) { BusDeviceEntity registeredEntity = this.getById(id); deviceRegistry.remove(registeredEntity.getDeviceId()); } /** * 描述: 根据id删除阿里云设备和本地设备记录 * @author lifang * @date 2022/7/8 8:48 * @param id * @return void */ @Transactional(rollbackFor = Exception.class) public boolean removeIotById(String id){ //首先判断id是否存在 BusDeviceEntity device = this.getById(id); if(ObjectUtil.isNotNull(device)){ if (aliyunIotSdk.deleteDevice(device.getDeviceId())) { return this.removeById(id); }else { return false; } } return true; } /** * @author 龙三郎 * 根据deviceId删除设备 * @param deviceId */ public void removeByDeviceId(String deviceId){ this.remove(new QueryWrapper().lambda() .eq(BusDeviceEntity::getDeviceId,deviceId)); } /** * @author 龙三郎 * 根据deviceId更新设备在线状态 * @param device */ public boolean updateDevice(BusDeviceEntity device){ return this.update(device,new QueryWrapper().lambda() .eq(BusDeviceEntity::getDeviceId,device.getDeviceId())); } /** * @author 龙三郎 * 保存一个设备,系统中存在时就更新,不存在时则插入。 * @param entity */ public boolean saveDevice(BusDeviceEntity entity) { // 查询设备是否存在,无视逻辑删除 BusDeviceEntity device = deviceMapper.selectOneByDeviceId(entity.getDeviceId()); // 判断设备是否存在 boolean isExists = Objects.nonNull(device); // 设备存在,且处于删除状态,首先去掉逻辑删除标志 ?? 逻辑删除后设备数据是否不再接收 if (isExists && device.getIsDelete() == 1){ deviceMapper.notDelete(entity.getDeviceId()); } // 设备存在 if (isExists){ // 更新设备 return this.updateDevice(entity); }else { // 添加设备 try { return this.save(entity); }catch (Exception e){ e.printStackTrace(); } return false; } } /** * @author 龙三郎 * 更新一个系统中现有的设备 * @param deviceId */ public boolean updateDeviceByDeviceId(String deviceId) { // 查询设备是否存在 BusDeviceEntity device = getByDeviceId(deviceId); // 设备不存在直接退出 if (Objects.isNull(device)){ return false; } // 从阿里云物联网查询设备 QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId); // 设备存在 if (Boolean.TRUE.equals(response.getSuccess())){ // 更新设备参数 device.updateFields(response.getData()); // 更新设备 return this.updateDevice(device); }else { return false; } } /** * @author 龙三郎 * 根据deviceId获取设备 * @param deviceId * @return */ public BusDeviceEntity getByDeviceId(String deviceId) { BusDeviceEntity device = getOne(new QueryWrapper().lambda() .eq(BusDeviceEntity::getDeviceId,deviceId)); return device; } /** * @author 龙三郎 * 该方法用于从阿里云同步设备,系统暂时不允许创建非阿里云物联网平台设备。意思是系统中的设备必须存在于阿里云物联网平台。 * 通过deviceId从阿里云查询设备,如果设备在阿里云平台不存在,则创建失败,如果存在,则获取设备数据,插入或更新设备数据,表示创建成功。 * @param deviceId * @return */ public BusDeviceEntity addDevice(String deviceId) { BusDeviceEntity device = new BusDeviceEntity(); device.setConfig(new AliIotConfig()); // 从阿里云物联网查询设备 QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId); // 设备存在 if (Boolean.TRUE.equals(response.getSuccess())){ device.updateFields(response.getData()); // 存储设备 this.saveDevice(device); return device; } // 不存在返回null return null; } public IPage pageQuery(Page page, DeviceQuery query){ return this.baseMapper.pageQuery(page,query); } public DeviceResult view(String id){ return this.baseMapper.view(id); } /** * 描述: 从阿里云平台获取全部的设备,同步到系统数据库 * @author lifang * @date 2022/6/21 9:46 * @param * @return int */ public void syncDeviceFromIot() throws ExecutionException, InterruptedException { log.info("从阿里云同步所有设备信息"); HashMap deviceById = new HashMap<>(); ArrayList devices = new ArrayList<>(); List deviceInfos = aliyunIotSdk.queryDevice(); log.info("同步数据数量:【{}】",CollUtil.size(deviceInfos)); if (CollUtil.isNotEmpty(deviceInfos)) { for (QueryDeviceResponse.DeviceInfo deviceInfo : deviceInfos) { BusDeviceEntity device = new BusDeviceEntity(); device.setConfig(new AliIotConfig()); // 更新设备属性 device.updateFields(deviceInfo); devices.add(device); deviceById.put(device.getDeviceId(),device); } CompletableFuture.runAsync(() -> { Set pollDeviceIds = devices.stream().filter(info -> StrUtil.isNotBlank(info.getDeviceId())) .map(BusDeviceEntity::getDeviceId) .collect(Collectors.toSet()); List existDevices = this.list(new QueryWrapper().lambda().in(BusDeviceEntity::getDeviceId, pollDeviceIds)); if (CollUtil.size(pollDeviceIds) != CollUtil.size(existDevices)) { //有新的设备产生 Set existDeviceIds = existDevices.stream() .peek(device->{ BusDeviceEntity pollDevice = deviceById.get(device.getDeviceId()); device.setStatus(pollDevice.getStatus()); }) .map(BusDeviceEntity::getDeviceId).collect(Collectors.toSet()); List insertDevices = devices.stream().filter(info -> !existDeviceIds.contains(info.getDeviceId())).collect(Collectors.toList()); this.saveBatch(insertDevices); log.info("新增设备数量:【{}】",CollUtil.size(insertDevices)); } log.info("更新设备数量:【{}】",CollUtil.size(existDevices)); this.updateBatchById(existDevices); }) .whenComplete((i,e)->{ if(e!=null){ log.error("同步设备失败",e); } }); } } /** * @author 龙三郎 * 从阿里云平台获取全部的设备,同步到系统数据库 * @return */ @Deprecated public int syncAllDevice(){ Integer m = 0; // 创建异步执行任务,有返回值 CompletableFuture cf = CompletableFuture.supplyAsync(()->{ AtomicReference n = new AtomicReference<>(0); // 同步所有的设备 aliyunIotSdk.queryDevice().forEach(item ->{ //避免循环中操作数据库 BusDeviceEntity device = new BusDeviceEntity(); device.setConfig(new AliIotConfig()); // 更新设备属性 device.updateFields(item); n.updateAndGet(v -> v + (this.saveDevice(device)?1:0)); }); return n.get(); }); //等待子任务执行完成 try { m = cf.get(); }catch (Exception e){ throw new CustomException(e.getMessage()); } return m; } /** * @author 龙三郎 * 根据ids获取指定的设备,更新本地设备 * @return */ public int syncDevice(List ids) { AtomicReference n = new AtomicReference<>(0); // ids不能为空 if (CollUtil.isEmpty(ids)){ throw new CustomException("设备id不能为空"); } // 同步指定的设备 ids.forEach(id->{ n.updateAndGet(v -> v + (this.updateDeviceByDeviceId(id)?1:0)); }); return n.get(); } /** * 描述: 设备医院换绑操作 * @author lifang * @date 2022/5/8 21:34 * @param deviceIds 换绑的设备id * @param afterTenantId 换绑之后的医院id * @return void */ @Transactional(rollbackFor = Exception.class) public void shift(List deviceIds, String afterTenantId) { List operators = deviceIds.stream() .map(deviceRegistry::getOperator) .collect(Collectors.toList()); if (CollUtil.isEmpty(operators)) { return; } this.update(new UpdateWrapper().lambda() .in(BusDeviceEntity::getDeviceId,deviceIds) .set(BusDeviceEntity::getTenantId,afterTenantId)); operators .stream() .filter(Objects::nonNull) .forEach(deviceOperator -> deviceOperator.getCache().remove(DeviceKeyConstant.TENANT_ID)); } }