LocalBusDeviceService.java 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package com.coffee.bus.service;
  2. import cn.hutool.core.collection.CollUtil;
  3. import cn.hutool.core.text.CharSequenceUtil;
  4. import cn.hutool.core.util.StrUtil;
  5. import com.aliyuncs.iot.model.v20180120.QueryDeviceDetailResponse;
  6. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  7. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  8. import com.baomidou.mybatisplus.core.metadata.IPage;
  9. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  10. import com.coffee.aliyun.sdk.AliyunIotSdk;
  11. import com.coffee.bus.bean.AliIotConfig;
  12. import com.coffee.bus.registry.device.DeviceRegistry;
  13. import com.coffee.bus.entity.BusDeviceEntity;
  14. import com.coffee.bus.mapper.BusDeviceMapper;
  15. import com.coffee.bus.registry.device.DeviceOperator;
  16. import com.coffee.bus.service.dto.DeviceQuery;
  17. import com.coffee.bus.service.dto.DeviceResult;
  18. import com.coffee.common.crud.BaseService;
  19. import com.coffee.common.exception.CustomException;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.context.annotation.Lazy;
  23. import org.springframework.stereotype.Service;
  24. import org.springframework.transaction.annotation.Transactional;
  25. import java.util.List;
  26. import java.util.Objects;
  27. import java.util.concurrent.CompletableFuture;
  28. import java.util.concurrent.atomic.AtomicReference;
  29. import java.util.stream.Collectors;
  30. /**
  31. * @author lifang
  32. * @version 1.0.0
  33. * @ClassName LocalBusHospitalService.java
  34. * @Description 设备注册
  35. * @createTime 2022年03月19日 09:27:00
  36. */
  37. @Service
  38. @Slf4j
  39. public class LocalBusDeviceService extends BaseService<BusDeviceMapper, BusDeviceEntity,String> {
  40. @Autowired
  41. @Lazy
  42. private DeviceRegistry deviceRegistry;
  43. @Autowired
  44. private BusDeviceMapper deviceMapper;
  45. @Autowired
  46. private AliyunIotSdk aliyunIotSdk;
  47. @Override
  48. public void validateBeforeSave(BusDeviceEntity entity) {
  49. if(entity.getTenantId()==null){
  50. entity.setTenantId("1");
  51. }
  52. }
  53. @Override
  54. public void validateBeforeUpdate(BusDeviceEntity entity) {
  55. }
  56. @Override
  57. public void validateBeforeDelete(String id) {
  58. }
  59. @Override
  60. public void postSave(BusDeviceEntity entity) {
  61. }
  62. @Override
  63. public void postUpdate(BusDeviceEntity entity) {
  64. DeviceOperator deviceOperator = deviceRegistry
  65. .getOperator(entity.getDeviceId());
  66. if(deviceOperator==null){
  67. return;
  68. }
  69. if(CharSequenceUtil.isNotEmpty(entity.getAlias())){
  70. deviceOperator.setAlias(entity.getAlias());
  71. }
  72. if(entity.getEnable()!=null){
  73. deviceOperator.setEnable(entity.getEnable());
  74. }
  75. if(CharSequenceUtil.isNotEmpty(entity.getTenantId())){
  76. deviceOperator.setTenantId(entity.getTenantId());
  77. }
  78. }
  79. @Override
  80. public void postDelete(String id) {
  81. BusDeviceEntity registeredEntity = this.getById(id);
  82. deviceRegistry.remove(registeredEntity.getDeviceId());
  83. }
  84. /**
  85. * @author 龙三郎
  86. * 根据deviceId删除设备
  87. * @param deviceId
  88. */
  89. public void removeByDeviceId(String deviceId){
  90. this.remove(new QueryWrapper<BusDeviceEntity>().lambda()
  91. .eq(BusDeviceEntity::getDeviceId,deviceId));
  92. }
  93. /**
  94. * @author 龙三郎
  95. * 根据deviceId更新设备在线状态
  96. * @param device
  97. */
  98. public boolean updateDevice(BusDeviceEntity device){
  99. return this.update(device,new QueryWrapper<BusDeviceEntity>().lambda()
  100. .eq(BusDeviceEntity::getDeviceId,device.getDeviceId()));
  101. }
  102. /**
  103. * @author 龙三郎
  104. * 保存一个设备,系统中存在时就更新,不存在时则插入。
  105. * @param entity
  106. */
  107. public boolean saveDevice(BusDeviceEntity entity) {
  108. // 查询设备是否存在,无视逻辑删除
  109. BusDeviceEntity device = deviceMapper.selectOneByDeviceId(entity.getDeviceId());
  110. // 判断设备是否存在
  111. boolean isExists = Objects.nonNull(device);
  112. // 设备存在,且处于删除状态,首先去掉逻辑删除标志 ?? 逻辑删除后设备数据是否不再接收
  113. if (isExists && device.getIsDelete() == 1){
  114. deviceMapper.notDelete(entity.getDeviceId());
  115. }
  116. // 设备存在
  117. if (isExists){
  118. // 更新设备
  119. return this.updateDevice(entity);
  120. }else {
  121. // 添加设备
  122. try {
  123. return this.save(entity);
  124. }catch (Exception e){
  125. e.printStackTrace();
  126. }
  127. return false;
  128. }
  129. }
  130. /**
  131. * @author 龙三郎
  132. * 更新一个系统中现有的设备
  133. * @param deviceId
  134. */
  135. public boolean updateDeviceByDeviceId(String deviceId) {
  136. // 查询设备是否存在
  137. BusDeviceEntity device = getByDeviceId(deviceId);
  138. // 设备不存在直接退出
  139. if (Objects.isNull(device)){
  140. return false;
  141. }
  142. // 从阿里云物联网查询设备
  143. QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
  144. // 设备存在
  145. if (Boolean.TRUE.equals(response.getSuccess())){
  146. // 更新设备参数
  147. device.updateFields(response.getData());
  148. // 更新设备
  149. return this.updateDevice(device);
  150. }else {
  151. return false;
  152. }
  153. }
  154. /**
  155. * @author 龙三郎
  156. * 根据deviceId获取设备
  157. * @param deviceId
  158. * @return
  159. */
  160. public BusDeviceEntity getByDeviceId(String deviceId) {
  161. BusDeviceEntity device = getOne(new QueryWrapper<BusDeviceEntity>().lambda()
  162. .eq(BusDeviceEntity::getDeviceId,deviceId));
  163. return device;
  164. }
  165. /**
  166. * @author 龙三郎
  167. * 该方法用于从阿里云同步设备,系统暂时不允许创建非阿里云物联网平台设备。意思是系统中的设备必须存在于阿里云物联网平台。
  168. * 通过deviceId从阿里云查询设备,如果设备在阿里云平台不存在,则创建失败,如果存在,则获取设备数据,插入或更新设备数据,表示创建成功。
  169. * @param deviceId
  170. * @return
  171. */
  172. public BusDeviceEntity addDevice(String deviceId) {
  173. BusDeviceEntity device = new BusDeviceEntity();
  174. device.setConfig(new AliIotConfig());
  175. // 从阿里云物联网查询设备
  176. QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
  177. // 设备存在
  178. if (Boolean.TRUE.equals(response.getSuccess())){
  179. device.updateFields(response.getData());
  180. // 存储设备
  181. this.saveDevice(device);
  182. return device;
  183. }
  184. // 不存在返回null
  185. return null;
  186. }
  187. public IPage<DeviceResult> pageQuery(Page<DeviceResult> page, DeviceQuery query){
  188. return this.baseMapper.pageQuery(page,query);
  189. }
  190. /**
  191. * @author 龙三郎
  192. * 从阿里云平台获取全部的设备,同步到系统数据库
  193. * @return
  194. */
  195. public int syncAllDevice(){
  196. Integer m = 0;
  197. // 创建异步执行任务,有返回值
  198. CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(()->{
  199. AtomicReference<Integer> n = new AtomicReference<>(0);
  200. // 同步所有的设备
  201. aliyunIotSdk.queryDevice().forEach(item ->{
  202. BusDeviceEntity device = new BusDeviceEntity();
  203. device.setConfig(new AliIotConfig());
  204. // 更新设备属性
  205. device.updateFields(item);
  206. n.updateAndGet(v -> v + (this.saveDevice(device)?1:0));
  207. });
  208. return n.get();
  209. });
  210. //等待子任务执行完成
  211. try {
  212. m = cf.get();
  213. }catch (Exception e){
  214. throw new CustomException(e.getMessage());
  215. }
  216. return m;
  217. }
  218. /**
  219. * @author 龙三郎
  220. * 根据ids获取指定的设备,更新本地设备
  221. * @return
  222. */
  223. public int syncDevice(List<String> ids) {
  224. AtomicReference<Integer> n = new AtomicReference<>(0);
  225. // ids不能为空
  226. if (CollUtil.isEmpty(ids)){
  227. throw new CustomException("设备id不能为空");
  228. }
  229. // 同步指定的设备
  230. ids.forEach(id->{
  231. n.updateAndGet(v -> v + (this.updateDeviceByDeviceId(id)?1:0));
  232. });
  233. return n.get();
  234. }
  235. /**
  236. * 描述: 设备医院换绑操作
  237. * @author lifang
  238. * @date 2022/5/8 21:34
  239. * @param deviceIds 换绑的设备id
  240. * @param afterTenantId 换绑之后的医院id
  241. * @return void
  242. */
  243. @Transactional(rollbackFor = Exception.class)
  244. public void shift(List<String> deviceIds, String afterTenantId) {
  245. List<DeviceOperator> operators = deviceIds.stream()
  246. .map(deviceRegistry::getOperator)
  247. .collect(Collectors.toList());
  248. this.update(new UpdateWrapper<BusDeviceEntity>().lambda()
  249. .in(BusDeviceEntity::getDeviceId,deviceIds)
  250. .set(BusDeviceEntity::getTenantId,afterTenantId));
  251. operators
  252. .forEach(deviceOperator -> deviceOperator.setTenantId(afterTenantId));
  253. }
  254. }