| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- package com.nb.bus.service;
- import cn.hutool.core.collection.CollUtil;
- import cn.hutool.core.text.CharSequenceUtil;
- import cn.hutool.core.util.ObjectUtil;
- import cn.hutool.core.util.StrUtil;
- import com.aliyuncs.iot.model.v20180120.QueryDeviceDetailResponse;
- import com.aliyuncs.iot.model.v20180120.QueryDeviceResponse;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
- import com.nb.aliyun.sdk.AliyunIotSdk;
- import com.nb.bus.bean.AliIotConfig;
- import com.nb.bus.registry.constant.DeviceKeyConstant;
- import com.nb.bus.registry.device.DeviceRegistry;
- import com.nb.bus.entity.BusDeviceEntity;
- import com.nb.bus.mapper.BusDeviceMapper;
- import com.nb.bus.registry.device.DeviceOperator;
- import com.nb.bus.service.dto.DeviceQuery;
- import com.nb.bus.service.dto.DeviceResult;
- import com.nb.common.crud.BaseService;
- import com.nb.common.exception.CustomException;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Lazy;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import java.util.*;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.atomic.AtomicReference;
- import java.util.stream.Collectors;
- /**
- * @author lifang
- * @version 1.0.0
- * @ClassName LocalBusHospitalService.java
- * @Description 设备注册
- * @createTime 2022年03月19日 09:27:00
- */
- @Service
- @Slf4j
- public class LocalBusDeviceService extends BaseService<BusDeviceMapper, BusDeviceEntity,String> {
- @Autowired
- @Lazy
- private DeviceRegistry deviceRegistry;
- @Autowired
- private BusDeviceMapper deviceMapper;
- @Autowired
- private AliyunIotSdk aliyunIotSdk;
- @Override
- public void validateBeforeSave(BusDeviceEntity entity) {
- if(entity.getTenantId()==null){
- entity.setTenantId("1");
- }
- }
- @Override
- public void validateBeforeUpdate(BusDeviceEntity entity) {
- }
- @Override
- public void validateBeforeDelete(String id) {
- }
- @Override
- public void postSave(BusDeviceEntity entity) {
- }
- @Override
- public void postUpdate(BusDeviceEntity entity) {
- DeviceOperator deviceOperator = deviceRegistry
- .getOperator(entity.getDeviceId());
- if(deviceOperator==null){
- return;
- }
- if(entity.getEnable()!=null){
- deviceOperator.setEnable(entity.getEnable());
- }
- deviceOperator.getCache().remove(DeviceKeyConstant.TENANT_ID);
- }
- @Override
- public void postDelete(String id) {
- BusDeviceEntity registeredEntity = this.getById(id);
- deviceRegistry.remove(registeredEntity.getDeviceId());
- }
- /**
- * 描述: 根据id删除阿里云设备和本地设备记录
- * @author lifang
- * @date 2022/7/8 8:48
- * @param id
- * @return void
- */
- @Transactional(rollbackFor = Exception.class)
- public boolean removeIotById(String id){
- //首先判断id是否存在
- BusDeviceEntity device = this.getById(id);
- if(ObjectUtil.isNotNull(device)){
- if (aliyunIotSdk.deleteDevice(device.getDeviceId())) {
- return this.removeById(id);
- }else {
- return false;
- }
- }
- return true;
- }
- /**
- * @author 龙三郎
- * 根据deviceId删除设备
- * @param deviceId
- */
- public void removeByDeviceId(String deviceId){
- this.remove(new QueryWrapper<BusDeviceEntity>().lambda()
- .eq(BusDeviceEntity::getDeviceId,deviceId));
- }
- /**
- * @author 龙三郎
- * 根据deviceId更新设备在线状态
- * @param device
- */
- public boolean updateDevice(BusDeviceEntity device){
- return this.update(device,new QueryWrapper<BusDeviceEntity>().lambda()
- .eq(BusDeviceEntity::getDeviceId,device.getDeviceId()));
- }
- /**
- * @author 龙三郎
- * 保存一个设备,系统中存在时就更新,不存在时则插入。
- * @param entity
- */
- public boolean saveDevice(BusDeviceEntity entity) {
- // 查询设备是否存在,无视逻辑删除
- BusDeviceEntity device = deviceMapper.selectOneByDeviceId(entity.getDeviceId());
- // 判断设备是否存在
- boolean isExists = Objects.nonNull(device);
- // 设备存在,且处于删除状态,首先去掉逻辑删除标志 ?? 逻辑删除后设备数据是否不再接收
- if (isExists && device.getIsDelete() == 1){
- deviceMapper.notDelete(entity.getDeviceId());
- }
- // 设备存在
- if (isExists){
- // 更新设备
- return this.updateDevice(entity);
- }else {
- // 添加设备
- try {
- return this.save(entity);
- }catch (Exception e){
- e.printStackTrace();
- }
- return false;
- }
- }
- /**
- * @author 龙三郎
- * 更新一个系统中现有的设备
- * @param deviceId
- */
- public boolean updateDeviceByDeviceId(String deviceId) {
- // 查询设备是否存在
- BusDeviceEntity device = getByDeviceId(deviceId);
- // 设备不存在直接退出
- if (Objects.isNull(device)){
- return false;
- }
- // 从阿里云物联网查询设备
- QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
- // 设备存在
- if (Boolean.TRUE.equals(response.getSuccess())){
- // 更新设备参数
- device.updateFields(response.getData());
- // 更新设备
- return this.updateDevice(device);
- }else {
- return false;
- }
- }
- /**
- * @author 龙三郎
- * 根据deviceId获取设备
- * @param deviceId
- * @return
- */
- public BusDeviceEntity getByDeviceId(String deviceId) {
- BusDeviceEntity device = getOne(new QueryWrapper<BusDeviceEntity>().lambda()
- .eq(BusDeviceEntity::getDeviceId,deviceId));
- return device;
- }
- /**
- * @author 龙三郎
- * 该方法用于从阿里云同步设备,系统暂时不允许创建非阿里云物联网平台设备。意思是系统中的设备必须存在于阿里云物联网平台。
- * 通过deviceId从阿里云查询设备,如果设备在阿里云平台不存在,则创建失败,如果存在,则获取设备数据,插入或更新设备数据,表示创建成功。
- * @param deviceId
- * @return
- */
- public BusDeviceEntity addDevice(String deviceId) {
- BusDeviceEntity device = new BusDeviceEntity();
- device.setConfig(new AliIotConfig());
- // 从阿里云物联网查询设备
- QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
- // 设备存在
- if (Boolean.TRUE.equals(response.getSuccess())){
- device.updateFields(response.getData());
- // 存储设备
- this.saveDevice(device);
- return device;
- }
- // 不存在返回null
- return null;
- }
- public IPage<DeviceResult> pageQuery(Page<DeviceResult> page, DeviceQuery query){
- return this.baseMapper.pageQuery(page,query);
- }
- public DeviceResult view(String id){
- return this.baseMapper.view(id);
- }
- /**
- * 描述: 从阿里云平台获取全部的设备,同步到系统数据库
- * @author lifang
- * @date 2022/6/21 9:46
- * @param
- * @return int
- */
- public void syncDeviceFromIot() throws ExecutionException, InterruptedException {
- log.info("从阿里云同步所有设备信息");
- HashMap<String, BusDeviceEntity> deviceById = new HashMap<>();
- ArrayList<BusDeviceEntity> devices = new ArrayList<>();
- List<QueryDeviceResponse.DeviceInfo> deviceInfos = aliyunIotSdk.queryDevice();
- log.info("同步数据数量:【{}】",CollUtil.size(deviceInfos));
- if (CollUtil.isNotEmpty(deviceInfos)) {
- for (QueryDeviceResponse.DeviceInfo deviceInfo : deviceInfos) {
- BusDeviceEntity device = new BusDeviceEntity();
- device.setConfig(new AliIotConfig());
- // 更新设备属性
- device.updateFields(deviceInfo);
- devices.add(device);
- deviceById.put(device.getDeviceId(),device);
- }
- CompletableFuture.runAsync(() -> {
- Set<String> pollDeviceIds = devices.stream().filter(info -> StrUtil.isNotBlank(info.getDeviceId()))
- .map(BusDeviceEntity::getDeviceId)
- .collect(Collectors.toSet());
- List<BusDeviceEntity> existDevices = this.list(new QueryWrapper<BusDeviceEntity>().lambda().in(BusDeviceEntity::getDeviceId, pollDeviceIds));
- if (CollUtil.size(pollDeviceIds) != CollUtil.size(existDevices)) {
- //有新的设备产生
- Set<String> existDeviceIds = existDevices.stream()
- .peek(device->{
- BusDeviceEntity pollDevice = deviceById.get(device.getDeviceId());
- device.setStatus(pollDevice.getStatus());
- })
- .map(BusDeviceEntity::getDeviceId).collect(Collectors.toSet());
- List<BusDeviceEntity> insertDevices = devices.stream().filter(info -> !existDeviceIds.contains(info.getDeviceId())).collect(Collectors.toList());
- this.saveBatch(insertDevices);
- log.info("新增设备数量:【{}】",CollUtil.size(insertDevices));
- }
- log.info("更新设备数量:【{}】",CollUtil.size(existDevices));
- this.updateBatchById(existDevices);
- })
- .whenComplete((i,e)->{
- if(e!=null){
- log.error("同步设备失败",e);
- }
- });
- }
- }
- /**
- * @author 龙三郎
- * 从阿里云平台获取全部的设备,同步到系统数据库
- * @return
- */
- @Deprecated
- public int syncAllDevice(){
- Integer m = 0;
- // 创建异步执行任务,有返回值
- CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(()->{
- AtomicReference<Integer> n = new AtomicReference<>(0);
- // 同步所有的设备
- aliyunIotSdk.queryDevice().forEach(item ->{
- //避免循环中操作数据库
- BusDeviceEntity device = new BusDeviceEntity();
- device.setConfig(new AliIotConfig());
- // 更新设备属性
- device.updateFields(item);
- n.updateAndGet(v -> v + (this.saveDevice(device)?1:0));
- });
- return n.get();
- });
- //等待子任务执行完成
- try {
- m = cf.get();
- }catch (Exception e){
- throw new CustomException(e.getMessage());
- }
- return m;
- }
- /**
- * @author 龙三郎
- * 根据ids获取指定的设备,更新本地设备
- * @return
- */
- public int syncDevice(List<String> ids) {
- AtomicReference<Integer> n = new AtomicReference<>(0);
- // ids不能为空
- if (CollUtil.isEmpty(ids)){
- throw new CustomException("设备id不能为空");
- }
- // 同步指定的设备
- ids.forEach(id->{
- n.updateAndGet(v -> v + (this.updateDeviceByDeviceId(id)?1:0));
- });
- return n.get();
- }
- /**
- * 描述: 设备医院换绑操作
- * @author lifang
- * @date 2022/5/8 21:34
- * @param deviceIds 换绑的设备id
- * @param afterTenantId 换绑之后的医院id
- * @return void
- */
- @Transactional(rollbackFor = Exception.class)
- public void shift(List<String> deviceIds, String afterTenantId) {
- List<DeviceOperator> operators = deviceIds.stream()
- .map(deviceRegistry::getOperator)
- .collect(Collectors.toList());
- if (CollUtil.isEmpty(operators)) {
- return;
- }
- this.update(new UpdateWrapper<BusDeviceEntity>().lambda()
- .in(BusDeviceEntity::getDeviceId,deviceIds)
- .set(BusDeviceEntity::getTenantId,afterTenantId));
- operators
- .stream()
- .filter(Objects::nonNull)
- .forEach(deviceOperator -> deviceOperator.getCache().remove(DeviceKeyConstant.TENANT_ID));
- }
- }
|