LocalBusDeviceService.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package com.nb.bus.service;
  2. import cn.hutool.core.collection.CollUtil;
  3. import cn.hutool.core.text.CharSequenceUtil;
  4. import cn.hutool.core.util.ObjectUtil;
  5. import cn.hutool.core.util.StrUtil;
  6. import com.aliyuncs.iot.model.v20180120.QueryDeviceDetailResponse;
  7. import com.aliyuncs.iot.model.v20180120.QueryDeviceResponse;
  8. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  9. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  10. import com.baomidou.mybatisplus.core.metadata.IPage;
  11. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  12. import com.nb.aliyun.sdk.AliyunIotSdk;
  13. import com.nb.bus.bean.AliIotConfig;
  14. import com.nb.bus.registry.constant.DeviceKeyConstant;
  15. import com.nb.bus.registry.device.DeviceRegistry;
  16. import com.nb.bus.entity.BusDeviceEntity;
  17. import com.nb.bus.mapper.BusDeviceMapper;
  18. import com.nb.bus.registry.device.DeviceOperator;
  19. import com.nb.bus.service.dto.DeviceQuery;
  20. import com.nb.bus.service.dto.DeviceResult;
  21. import com.nb.common.crud.BaseService;
  22. import com.nb.common.exception.CustomException;
  23. import lombok.extern.slf4j.Slf4j;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import org.springframework.context.annotation.Lazy;
  26. import org.springframework.stereotype.Service;
  27. import org.springframework.transaction.annotation.Transactional;
  28. import java.util.*;
  29. import java.util.concurrent.CompletableFuture;
  30. import java.util.concurrent.ExecutionException;
  31. import java.util.concurrent.atomic.AtomicReference;
  32. import java.util.stream.Collectors;
  33. /**
  34. * @author lifang
  35. * @version 1.0.0
  36. * @ClassName LocalBusDeviceService.java
  37. * @Description 设备注册
  38. * @createTime 2022年03月19日 09:27:00
  39. */
  40. @Service
  41. @Slf4j
  42. public class LocalBusDeviceService extends BaseService<BusDeviceMapper, BusDeviceEntity,String> {
  43. @Autowired
  44. @Lazy
  45. private DeviceRegistry deviceRegistry;
  46. @Autowired
  47. private BusDeviceMapper deviceMapper;
  48. @Autowired
  49. private AliyunIotSdk aliyunIotSdk;
  50. @Override
  51. public void validateBeforeSave(BusDeviceEntity entity) {
  52. if(entity.getTenantId()==null){
  53. entity.setTenantId("1");
  54. }
  55. }
  56. @Override
  57. public void validateBeforeUpdate(BusDeviceEntity entity) {
  58. }
  59. @Override
  60. public void validateBeforeDelete(String id) {
  61. }
  62. @Override
  63. public void postSave(BusDeviceEntity entity) {
  64. }
  65. @Override
  66. public void postUpdate(BusDeviceEntity entity) {
  67. DeviceOperator deviceOperator = deviceRegistry
  68. .getOperator(entity.getDeviceId());
  69. if(deviceOperator==null){
  70. return;
  71. }
  72. if(entity.getEnable()!=null){
  73. deviceOperator.setEnable(entity.getEnable());
  74. }
  75. deviceOperator.getCache().remove(DeviceKeyConstant.TENANT_ID);
  76. }
  77. @Override
  78. public void postDelete(String id) {
  79. BusDeviceEntity registeredEntity = this.getById(id);
  80. deviceRegistry.remove(registeredEntity.getDeviceId());
  81. }
  82. /**
  83. * 描述: 根据id删除阿里云设备和本地设备记录
  84. * @author lifang
  85. * @date 2022/7/8 8:48
  86. * @param id
  87. * @return void
  88. */
  89. @Transactional(rollbackFor = Exception.class)
  90. public boolean removeIotById(String id){
  91. //首先判断id是否存在
  92. BusDeviceEntity device = this.getById(id);
  93. if(ObjectUtil.isNotNull(device)){
  94. if (aliyunIotSdk.deleteDevice(device.getDeviceId())) {
  95. return this.removeById(id);
  96. }else {
  97. return false;
  98. }
  99. }
  100. return true;
  101. }
  102. /**
  103. * @author 龙三郎
  104. * 根据deviceId删除设备
  105. * @param deviceId
  106. */
  107. public void removeByDeviceId(String deviceId){
  108. this.remove(new QueryWrapper<BusDeviceEntity>().lambda()
  109. .eq(BusDeviceEntity::getDeviceId,deviceId));
  110. }
  111. /**
  112. * @author 龙三郎
  113. * 根据deviceId更新设备在线状态
  114. * @param device
  115. */
  116. public boolean updateDevice(BusDeviceEntity device){
  117. return this.update(device,new QueryWrapper<BusDeviceEntity>().lambda()
  118. .eq(BusDeviceEntity::getDeviceId,device.getDeviceId()));
  119. }
  120. /**
  121. * @author 龙三郎
  122. * 保存一个设备,系统中存在时就更新,不存在时则插入。
  123. * @param entity
  124. */
  125. public boolean saveDevice(BusDeviceEntity entity) {
  126. // 查询设备是否存在,无视逻辑删除
  127. BusDeviceEntity device = deviceMapper.selectOneByDeviceId(entity.getDeviceId());
  128. // 判断设备是否存在
  129. boolean isExists = Objects.nonNull(device);
  130. // 设备存在,且处于删除状态,首先去掉逻辑删除标志 ?? 逻辑删除后设备数据是否不再接收
  131. if (isExists && device.getIsDelete() == 1){
  132. deviceMapper.notDelete(entity.getDeviceId());
  133. }
  134. // 设备存在
  135. if (isExists){
  136. // 更新设备
  137. return this.updateDevice(entity);
  138. }else {
  139. // 添加设备
  140. try {
  141. return this.save(entity);
  142. }catch (Exception e){
  143. e.printStackTrace();
  144. }
  145. return false;
  146. }
  147. }
  148. /**
  149. * @author 龙三郎
  150. * 更新一个系统中现有的设备
  151. * @param deviceId
  152. */
  153. public boolean updateDeviceByDeviceId(String deviceId) {
  154. // 查询设备是否存在
  155. BusDeviceEntity device = getByDeviceId(deviceId);
  156. // 设备不存在直接退出
  157. if (Objects.isNull(device)){
  158. return false;
  159. }
  160. // 从阿里云物联网查询设备
  161. QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
  162. // 设备存在
  163. if (Boolean.TRUE.equals(response.getSuccess())){
  164. // 更新设备参数
  165. device.updateFields(response.getData());
  166. // 更新设备
  167. return this.updateDevice(device);
  168. }else {
  169. return false;
  170. }
  171. }
  172. /**
  173. * @author 龙三郎
  174. * 根据deviceId获取设备
  175. * @param deviceId
  176. * @return
  177. */
  178. public BusDeviceEntity getByDeviceId(String deviceId) {
  179. BusDeviceEntity device = getOne(new QueryWrapper<BusDeviceEntity>().lambda()
  180. .eq(BusDeviceEntity::getDeviceId,deviceId));
  181. return device;
  182. }
  183. /**
  184. * @author 龙三郎
  185. * 该方法用于从阿里云同步设备,系统暂时不允许创建非阿里云物联网平台设备。意思是系统中的设备必须存在于阿里云物联网平台。
  186. * 通过deviceId从阿里云查询设备,如果设备在阿里云平台不存在,则创建失败,如果存在,则获取设备数据,插入或更新设备数据,表示创建成功。
  187. * @param deviceId
  188. * @return
  189. */
  190. public BusDeviceEntity addDevice(String deviceId) {
  191. BusDeviceEntity device = new BusDeviceEntity();
  192. device.setConfig(new AliIotConfig());
  193. // 从阿里云物联网查询设备
  194. QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
  195. // 设备存在
  196. if (Boolean.TRUE.equals(response.getSuccess())){
  197. device.updateFields(response.getData());
  198. // 存储设备
  199. this.saveDevice(device);
  200. return device;
  201. }
  202. // 不存在返回null
  203. return null;
  204. }
  205. public IPage<DeviceResult> pageQuery(Page<DeviceResult> page, DeviceQuery query){
  206. return this.baseMapper.pageQuery(page,query);
  207. }
  208. public DeviceResult view(String id){
  209. return this.baseMapper.view(id);
  210. }
  211. /**
  212. * 描述: 从阿里云平台获取全部的设备,同步到系统数据库
  213. * @author lifang
  214. * @date 2022/6/21 9:46
  215. * @param
  216. * @return int
  217. */
  218. public void syncDeviceFromIot() throws ExecutionException, InterruptedException {
  219. log.info("从阿里云同步所有设备信息");
  220. HashMap<String, BusDeviceEntity> deviceById = new HashMap<>();
  221. ArrayList<BusDeviceEntity> devices = new ArrayList<>();
  222. List<QueryDeviceResponse.DeviceInfo> deviceInfos = aliyunIotSdk.queryDevice();
  223. log.info("同步数据数量:【{}】",CollUtil.size(deviceInfos));
  224. if (CollUtil.isNotEmpty(deviceInfos)) {
  225. for (QueryDeviceResponse.DeviceInfo deviceInfo : deviceInfos) {
  226. BusDeviceEntity device = new BusDeviceEntity();
  227. device.setConfig(new AliIotConfig());
  228. // 更新设备属性
  229. device.updateFields(deviceInfo);
  230. devices.add(device);
  231. deviceById.put(device.getDeviceId(),device);
  232. }
  233. CompletableFuture.runAsync(() -> {
  234. Set<String> pollDeviceIds = devices.stream().filter(info -> StrUtil.isNotBlank(info.getDeviceId()))
  235. .map(BusDeviceEntity::getDeviceId)
  236. .collect(Collectors.toSet());
  237. List<BusDeviceEntity> existDevices = this.list(new QueryWrapper<BusDeviceEntity>().lambda().in(BusDeviceEntity::getDeviceId, pollDeviceIds));
  238. if (CollUtil.size(pollDeviceIds) != CollUtil.size(existDevices)) {
  239. //有新的设备产生
  240. Set<String> existDeviceIds = existDevices.stream()
  241. .peek(device->{
  242. BusDeviceEntity pollDevice = deviceById.get(device.getDeviceId());
  243. device.setStatus(pollDevice.getStatus());
  244. })
  245. .map(BusDeviceEntity::getDeviceId).collect(Collectors.toSet());
  246. List<BusDeviceEntity> insertDevices = devices.stream().filter(info -> !existDeviceIds.contains(info.getDeviceId())).collect(Collectors.toList());
  247. this.saveBatch(insertDevices);
  248. log.info("新增设备数量:【{}】",CollUtil.size(insertDevices));
  249. }
  250. log.info("更新设备数量:【{}】",CollUtil.size(existDevices));
  251. this.updateBatchById(existDevices);
  252. })
  253. .whenComplete((i,e)->{
  254. if(e!=null){
  255. log.error("同步设备失败",e);
  256. }
  257. });
  258. }
  259. }
  260. /**
  261. * @author 龙三郎
  262. * 从阿里云平台获取全部的设备,同步到系统数据库
  263. * @return
  264. */
  265. @Deprecated
  266. public int syncAllDevice(){
  267. Integer m = 0;
  268. // 创建异步执行任务,有返回值
  269. CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(()->{
  270. AtomicReference<Integer> n = new AtomicReference<>(0);
  271. // 同步所有的设备
  272. aliyunIotSdk.queryDevice().forEach(item ->{
  273. //避免循环中操作数据库
  274. BusDeviceEntity device = new BusDeviceEntity();
  275. device.setConfig(new AliIotConfig());
  276. // 更新设备属性
  277. device.updateFields(item);
  278. n.updateAndGet(v -> v + (this.saveDevice(device)?1:0));
  279. });
  280. return n.get();
  281. });
  282. //等待子任务执行完成
  283. try {
  284. m = cf.get();
  285. }catch (Exception e){
  286. throw new CustomException(e.getMessage());
  287. }
  288. return m;
  289. }
  290. /**
  291. * @author 龙三郎
  292. * 根据ids获取指定的设备,更新本地设备
  293. * @return
  294. */
  295. public int syncDevice(List<String> ids) {
  296. AtomicReference<Integer> n = new AtomicReference<>(0);
  297. // ids不能为空
  298. if (CollUtil.isEmpty(ids)){
  299. throw new CustomException("设备id不能为空");
  300. }
  301. // 同步指定的设备
  302. ids.forEach(id->{
  303. n.updateAndGet(v -> v + (this.updateDeviceByDeviceId(id)?1:0));
  304. });
  305. return n.get();
  306. }
  307. /**
  308. * 描述: 设备医院换绑操作
  309. * @author lifang
  310. * @date 2022/5/8 21:34
  311. * @param deviceIds 换绑的设备id
  312. * @param afterTenantId 换绑之后的医院id
  313. * @return void
  314. */
  315. @Transactional(rollbackFor = Exception.class)
  316. public void shift(List<String> deviceIds, String afterTenantId) {
  317. List<DeviceOperator> operators = deviceIds.stream()
  318. .map(deviceRegistry::getOperator)
  319. .collect(Collectors.toList());
  320. if (CollUtil.isEmpty(operators)) {
  321. return;
  322. }
  323. this.update(new UpdateWrapper<BusDeviceEntity>().lambda()
  324. .in(BusDeviceEntity::getDeviceId,deviceIds)
  325. .set(BusDeviceEntity::getTenantId,afterTenantId));
  326. operators
  327. .stream()
  328. .filter(Objects::nonNull)
  329. .forEach(deviceOperator -> deviceOperator.getCache().remove(DeviceKeyConstant.TENANT_ID));
  330. }
  331. }