// LocalBusDeviceService.java
  1. package com.coffee.bus.service;
  2. import cn.hutool.core.collection.CollUtil;
  3. import cn.hutool.core.text.CharSequenceUtil;
  4. import cn.hutool.core.util.StrUtil;
  5. import com.aliyuncs.iot.model.v20180120.QueryDeviceDetailResponse;
  6. import com.aliyuncs.iot.model.v20180120.QueryDeviceResponse;
  7. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  8. import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
  9. import com.baomidou.mybatisplus.core.metadata.IPage;
  10. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  11. import com.coffee.aliyun.sdk.AliyunIotSdk;
  12. import com.coffee.bus.bean.AliIotConfig;
  13. import com.coffee.bus.registry.device.DeviceRegistry;
  14. import com.coffee.bus.entity.BusDeviceEntity;
  15. import com.coffee.bus.mapper.BusDeviceMapper;
  16. import com.coffee.bus.registry.device.DeviceOperator;
  17. import com.coffee.bus.service.dto.DeviceQuery;
  18. import com.coffee.bus.service.dto.DeviceResult;
  19. import com.coffee.common.crud.BaseService;
  20. import com.coffee.common.exception.CustomException;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.context.annotation.Lazy;
  24. import org.springframework.stereotype.Service;
  25. import org.springframework.transaction.annotation.Transactional;
  26. import java.util.*;
  27. import java.util.concurrent.CompletableFuture;
  28. import java.util.concurrent.ExecutionException;
  29. import java.util.concurrent.atomic.AtomicReference;
  30. import java.util.stream.Collectors;
/**
 * Device registration service.
 *
 * @author lifang
 * @version 1.0.0
 * @ClassName LocalBusDeviceService.java
 * @Description Device registration
 * @createTime 2022-03-19 09:27:00
 */
  38. @Service
  39. @Slf4j
  40. public class LocalBusDeviceService extends BaseService<BusDeviceMapper, BusDeviceEntity,String> {
  41. @Autowired
  42. @Lazy
  43. private DeviceRegistry deviceRegistry;
  44. @Autowired
  45. private BusDeviceMapper deviceMapper;
  46. @Autowired
  47. private AliyunIotSdk aliyunIotSdk;
  48. @Override
  49. public void validateBeforeSave(BusDeviceEntity entity) {
  50. if(entity.getTenantId()==null){
  51. entity.setTenantId("1");
  52. }
  53. }
  54. @Override
  55. public void validateBeforeUpdate(BusDeviceEntity entity) {
  56. }
  57. @Override
  58. public void validateBeforeDelete(String id) {
  59. }
  60. @Override
  61. public void postSave(BusDeviceEntity entity) {
  62. }
  63. @Override
  64. public void postUpdate(BusDeviceEntity entity) {
  65. DeviceOperator deviceOperator = deviceRegistry
  66. .getOperator(entity.getDeviceId());
  67. if(deviceOperator==null){
  68. return;
  69. }
  70. if(CharSequenceUtil.isNotEmpty(entity.getAlias())){
  71. deviceOperator.setAlias(entity.getAlias());
  72. }
  73. if(entity.getEnable()!=null){
  74. deviceOperator.setEnable(entity.getEnable());
  75. }
  76. if(CharSequenceUtil.isNotEmpty(entity.getTenantId())){
  77. deviceOperator.setTenantId(entity.getTenantId());
  78. }
  79. }
  80. @Override
  81. public void postDelete(String id) {
  82. BusDeviceEntity registeredEntity = this.getById(id);
  83. deviceRegistry.remove(registeredEntity.getDeviceId());
  84. }
  85. /**
  86. * @author 龙三郎
  87. * 根据deviceId删除设备
  88. * @param deviceId
  89. */
  90. public void removeByDeviceId(String deviceId){
  91. this.remove(new QueryWrapper<BusDeviceEntity>().lambda()
  92. .eq(BusDeviceEntity::getDeviceId,deviceId));
  93. }
  94. /**
  95. * @author 龙三郎
  96. * 根据deviceId更新设备在线状态
  97. * @param device
  98. */
  99. public boolean updateDevice(BusDeviceEntity device){
  100. return this.update(device,new QueryWrapper<BusDeviceEntity>().lambda()
  101. .eq(BusDeviceEntity::getDeviceId,device.getDeviceId()));
  102. }
  103. /**
  104. * @author 龙三郎
  105. * 保存一个设备,系统中存在时就更新,不存在时则插入。
  106. * @param entity
  107. */
  108. public boolean saveDevice(BusDeviceEntity entity) {
  109. // 查询设备是否存在,无视逻辑删除
  110. BusDeviceEntity device = deviceMapper.selectOneByDeviceId(entity.getDeviceId());
  111. // 判断设备是否存在
  112. boolean isExists = Objects.nonNull(device);
  113. // 设备存在,且处于删除状态,首先去掉逻辑删除标志 ?? 逻辑删除后设备数据是否不再接收
  114. if (isExists && device.getIsDelete() == 1){
  115. deviceMapper.notDelete(entity.getDeviceId());
  116. }
  117. // 设备存在
  118. if (isExists){
  119. // 更新设备
  120. return this.updateDevice(entity);
  121. }else {
  122. // 添加设备
  123. try {
  124. return this.save(entity);
  125. }catch (Exception e){
  126. e.printStackTrace();
  127. }
  128. return false;
  129. }
  130. }
  131. /**
  132. * @author 龙三郎
  133. * 更新一个系统中现有的设备
  134. * @param deviceId
  135. */
  136. public boolean updateDeviceByDeviceId(String deviceId) {
  137. // 查询设备是否存在
  138. BusDeviceEntity device = getByDeviceId(deviceId);
  139. // 设备不存在直接退出
  140. if (Objects.isNull(device)){
  141. return false;
  142. }
  143. // 从阿里云物联网查询设备
  144. QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
  145. // 设备存在
  146. if (Boolean.TRUE.equals(response.getSuccess())){
  147. // 更新设备参数
  148. device.updateFields(response.getData());
  149. // 更新设备
  150. return this.updateDevice(device);
  151. }else {
  152. return false;
  153. }
  154. }
  155. /**
  156. * @author 龙三郎
  157. * 根据deviceId获取设备
  158. * @param deviceId
  159. * @return
  160. */
  161. public BusDeviceEntity getByDeviceId(String deviceId) {
  162. BusDeviceEntity device = getOne(new QueryWrapper<BusDeviceEntity>().lambda()
  163. .eq(BusDeviceEntity::getDeviceId,deviceId));
  164. return device;
  165. }
  166. /**
  167. * @author 龙三郎
  168. * 该方法用于从阿里云同步设备,系统暂时不允许创建非阿里云物联网平台设备。意思是系统中的设备必须存在于阿里云物联网平台。
  169. * 通过deviceId从阿里云查询设备,如果设备在阿里云平台不存在,则创建失败,如果存在,则获取设备数据,插入或更新设备数据,表示创建成功。
  170. * @param deviceId
  171. * @return
  172. */
  173. public BusDeviceEntity addDevice(String deviceId) {
  174. BusDeviceEntity device = new BusDeviceEntity();
  175. device.setConfig(new AliIotConfig());
  176. // 从阿里云物联网查询设备
  177. QueryDeviceDetailResponse response = aliyunIotSdk.queryDeviceDetail(deviceId);
  178. // 设备存在
  179. if (Boolean.TRUE.equals(response.getSuccess())){
  180. device.updateFields(response.getData());
  181. // 存储设备
  182. this.saveDevice(device);
  183. return device;
  184. }
  185. // 不存在返回null
  186. return null;
  187. }
  188. public IPage<DeviceResult> pageQuery(Page<DeviceResult> page, DeviceQuery query){
  189. return this.baseMapper.pageQuery(page,query);
  190. }
  191. /**
  192. * 描述: 从阿里云平台获取全部的设备,同步到系统数据库
  193. * @author lifang
  194. * @date 2022/6/21 9:46
  195. * @param
  196. * @return int
  197. */
  198. public void syncDeviceFromIot() throws ExecutionException, InterruptedException {
  199. log.info("从阿里云同步所有设备信息");
  200. HashMap<String, BusDeviceEntity> deviceById = new HashMap<>();
  201. ArrayList<BusDeviceEntity> devices = new ArrayList<>();
  202. List<QueryDeviceResponse.DeviceInfo> deviceInfos = aliyunIotSdk.queryDevice();
  203. log.info("同步数据数量:【{}】",CollUtil.size(deviceInfos));
  204. if (CollUtil.isNotEmpty(deviceInfos)) {
  205. for (QueryDeviceResponse.DeviceInfo deviceInfo : deviceInfos) {
  206. BusDeviceEntity device = new BusDeviceEntity();
  207. device.setConfig(new AliIotConfig());
  208. // 更新设备属性
  209. device.updateFields(deviceInfo);
  210. devices.add(device);
  211. deviceById.put(device.getDeviceId(),device);
  212. }
  213. CompletableFuture.runAsync(() -> {
  214. Set<String> pollDeviceIds = devices.stream().filter(info -> StrUtil.isNotBlank(info.getDeviceId()))
  215. .map(BusDeviceEntity::getDeviceId)
  216. .collect(Collectors.toSet());
  217. List<BusDeviceEntity> existDevices = this.list(new QueryWrapper<BusDeviceEntity>().lambda().in(BusDeviceEntity::getDeviceId, pollDeviceIds));
  218. if (CollUtil.size(pollDeviceIds) != CollUtil.size(existDevices)) {
  219. //有新的设备产生
  220. Set<String> existDeviceIds = existDevices.stream()
  221. .peek(device->{
  222. BusDeviceEntity pollDevice = deviceById.get(device.getDeviceId());
  223. device.setStatus(pollDevice.getStatus());
  224. })
  225. .map(BusDeviceEntity::getDeviceId).collect(Collectors.toSet());
  226. List<BusDeviceEntity> insertDevices = devices.stream().filter(info -> !existDeviceIds.contains(info.getDeviceId())).collect(Collectors.toList());
  227. this.saveBatch(insertDevices);
  228. log.info("新增设备数量:【{}】",CollUtil.size(insertDevices));
  229. }
  230. log.info("更新设备数量:【{}】",CollUtil.size(existDevices));
  231. this.updateBatchById(existDevices);
  232. })
  233. .whenComplete((i,e)->{
  234. if(e!=null){
  235. log.error("同步设备失败",e);
  236. }
  237. });
  238. }
  239. }
  240. /**
  241. * @author 龙三郎
  242. * 从阿里云平台获取全部的设备,同步到系统数据库
  243. * @return
  244. */
  245. @Deprecated
  246. public int syncAllDevice(){
  247. Integer m = 0;
  248. // 创建异步执行任务,有返回值
  249. CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(()->{
  250. AtomicReference<Integer> n = new AtomicReference<>(0);
  251. // 同步所有的设备
  252. aliyunIotSdk.queryDevice().forEach(item ->{
  253. //避免循环中操作数据库
  254. BusDeviceEntity device = new BusDeviceEntity();
  255. device.setConfig(new AliIotConfig());
  256. // 更新设备属性
  257. device.updateFields(item);
  258. n.updateAndGet(v -> v + (this.saveDevice(device)?1:0));
  259. });
  260. return n.get();
  261. });
  262. //等待子任务执行完成
  263. try {
  264. m = cf.get();
  265. }catch (Exception e){
  266. throw new CustomException(e.getMessage());
  267. }
  268. return m;
  269. }
  270. /**
  271. * @author 龙三郎
  272. * 根据ids获取指定的设备,更新本地设备
  273. * @return
  274. */
  275. public int syncDevice(List<String> ids) {
  276. AtomicReference<Integer> n = new AtomicReference<>(0);
  277. // ids不能为空
  278. if (CollUtil.isEmpty(ids)){
  279. throw new CustomException("设备id不能为空");
  280. }
  281. // 同步指定的设备
  282. ids.forEach(id->{
  283. n.updateAndGet(v -> v + (this.updateDeviceByDeviceId(id)?1:0));
  284. });
  285. return n.get();
  286. }
  287. /**
  288. * 描述: 设备医院换绑操作
  289. * @author lifang
  290. * @date 2022/5/8 21:34
  291. * @param deviceIds 换绑的设备id
  292. * @param afterTenantId 换绑之后的医院id
  293. * @return void
  294. */
  295. @Transactional(rollbackFor = Exception.class)
  296. public void shift(List<String> deviceIds, String afterTenantId) {
  297. List<DeviceOperator> operators = deviceIds.stream()
  298. .map(deviceRegistry::getOperator)
  299. .collect(Collectors.toList());
  300. if (CollUtil.isEmpty(operators)) {
  301. return;
  302. }
  303. this.update(new UpdateWrapper<BusDeviceEntity>().lambda()
  304. .in(BusDeviceEntity::getDeviceId,deviceIds)
  305. .set(BusDeviceEntity::getTenantId,afterTenantId));
  306. operators
  307. .stream()
  308. .filter(Objects::nonNull)
  309. .forEach(deviceOperator -> deviceOperator.setTenantId(afterTenantId));
  310. }
  311. }