AliyunConsumerGroupService.java 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package com.coffee.aliyun;
  2. import cn.hutool.extra.spring.SpringUtil;
  3. import cn.hutool.json.JSONUtil;
  4. import com.alibaba.fastjson.JSON;
  5. import com.alibaba.fastjson.JSONObject;
  6. import com.coffee.aliyun.utils.Constants;
  7. import com.coffee.aliyun.utils.EnumUtils;
  8. import com.coffee.aliyun.utils.Items;
  9. import com.coffee.bus.bean.AliIotConfig;
  10. import com.coffee.bus.entity.BusDeviceEntity;
  11. import com.coffee.bus.entity.BusDeviceRunningEntity;
  12. import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
  13. import com.coffee.bus.service.LocalBusDeviceHistoryService;
  14. import com.coffee.bus.service.LocalBusDeviceService;
  15. import com.coffee.bus.service.LocalBusInfusionHistoryService;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.beans.factory.annotation.Value;
  19. import org.springframework.context.annotation.Lazy;
  20. import org.springframework.stereotype.Service;
  21. import javax.annotation.PostConstruct;
  22. import javax.jms.Message;
  23. import javax.jms.MessageListener;
  24. import java.util.Date;
  25. import java.util.HashMap;
  26. import java.util.Map;
  27. import java.util.concurrent.ExecutorService;
  28. import java.util.concurrent.LinkedBlockingQueue;
  29. import java.util.concurrent.ThreadPoolExecutor;
  30. import java.util.concurrent.TimeUnit;
  31. /**
  32. * @Author 龙三郎
  33. * @Date 2022-4-06 16:22:13
  34. * @Version 1.0
  35. * @Description 阿里云物联网平台服务端订阅
  36. */
  37. @Service
  38. @Slf4j
  39. public class AliyunConsumerGroupService {
  40. @Autowired
  41. @Lazy
  42. private AliyunIotSubscribeClient client;
  43. @Autowired
  44. @Lazy
  45. private LocalBusDeviceService deviceService;
  46. @Autowired
  47. @Lazy
  48. private LocalBusInfusionHistoryService infusionHistoryService;
  49. @Autowired
  50. @Lazy
  51. private LocalBusDeviceHistoryService deviceHistoryService;
  52. @Value("${aliyun.server-subscription.enable:false}")
  53. private boolean isEnable;
  54. // 开启服务端订阅
  55. @PostConstruct
  56. public void subscribe(){
  57. if (!isEnable){
  58. log.info("订阅禁止");
  59. return;
  60. }
  61. log.info("允许开启订阅");
  62. try {
  63. // 开启订阅
  64. client.start(messageListener);
  65. } catch (Exception e) {
  66. e.printStackTrace();
  67. }
  68. log.info("阿里云物联网订阅成功。。。。。。。。。。。");
  69. }
  70. //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
  71. private final static ExecutorService executorService = new ThreadPoolExecutor(
  72. Runtime.getRuntime().availableProcessors(),
  73. Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
  74. new LinkedBlockingQueue(50000));
  75. private MessageListener messageListener = (message) -> {
  76. try {
  77. //1.收到消息之后一定要ACK。
  78. // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
  79. // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
  80. // message.acknowledge();
  81. //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
  82. // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
  83. executorService.submit(()-> processMessage(message));
  84. } catch (Exception e) {
  85. log.error("submit task occurs exception ", e);
  86. }
  87. };
  88. /**
  89. * 处理逻辑,加上锁
  90. * @param message
  91. */
  92. private static final String IOTID = "iotId";
  93. private static final String PRODUCTKEY = "productKey";
  94. private static final String DEVICENAME = "deviceName";
  95. private static final String TOPIC = "topic";
  96. private static final String MESSAGEID = "messageId";
  97. private static final String ITEMS = "items";
  98. private static final String VALUE = "value";
  99. private static final String STATUS = "status";
  100. private static final String CONTENT = "content";
  101. private void processMessage(Message message) {
  102. PlatformLog platformLog = new PlatformLog();
  103. Map<String, Object> platformData = new HashMap<>();
  104. try {
  105. // 获取主题,消息id和内容
  106. String topic = message.getStringProperty(TOPIC);
  107. String messageId = message.getStringProperty(MESSAGEID);
  108. JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
  109. // 平台数据
  110. platformData.put(TOPIC,topic);
  111. platformData.put(MESSAGEID,messageId);
  112. platformData.put(CONTENT,content);
  113. log.info("阿里云物联网发送的数据:"+JSON.toJSONString(platformData));
  114. // 设备名称
  115. String deviceName = content.getString(DEVICENAME);
  116. // 创建设备对象
  117. Date now = new Date();
  118. // 根据topic判断数据类型
  119. if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){//设备上下线状态
  120. BusDeviceEntity device = new BusDeviceEntity();
  121. device.setDeviceId(deviceName);
  122. String status = content.getString(STATUS);
  123. device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
  124. if (status.equals("online")){
  125. log.info(deviceName+"设备上线");
  126. }else if (status.equals("offline")){
  127. log.info(deviceName+"设备下线");
  128. }else {
  129. log.info(deviceName+"设备未激活");
  130. }
  131. // 更新设备状态
  132. deviceService.updateDevice(device);
  133. }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
  134. // 设备属性集合
  135. Items items = new Items(content.getJSONObject("items"));
  136. BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
  137. deviceRunning.updateFieldsByItems(deviceName,items);
  138. // 发布事件
  139. SpringUtil.publishEvent(new DeviceInfoEvent(this,deviceRunning,deviceName));
  140. // BusInfusionHistoryEntity infusionHistory = infusionHistoryService.saveInfusion(deviceName,items);
  141. // // 保存到device_history表中
  142. // BusDeviceHistoryEntity deviceHistory = new BusDeviceHistoryEntity();
  143. // // 将infusionHistory对象的值,复制到deviceHistory对象
  144. // BeanUtils.copyProperties(infusionHistory,deviceHistory);
  145. // deviceHistory.setId(null);
  146. // deviceHistory.setInfusionId(infusionHistory.getId());
  147. // // 保存设备的历史数据
  148. // deviceHistoryService.save(deviceHistory);
  149. }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
  150. // 获取生命周期类型
  151. String action = content.getString("action");
  152. if ("create".equals(action)){
  153. // 创建设备
  154. BusDeviceEntity device = new BusDeviceEntity();
  155. device.setDeviceId(deviceName);
  156. device.setCreateTime(now);
  157. device.setCreateBy(Constants.DefaultCreateBy);
  158. device.setUpdateTime(now);
  159. device.setUpdateBy(Constants.DefaultUpdateBy);
  160. device.setTenantId(Constants.DefaultHospital);
  161. // 配置信息
  162. AliIotConfig config = new AliIotConfig();
  163. config.setDeviceName(deviceName);
  164. config.setDeviceSecret(content.getString("deviceSecret"));
  165. config.setIotId(content.getString("iotId"));
  166. config.setProductKey(content.getString("productKey"));
  167. // 设置配置信息
  168. device.setConfig(config);
  169. deviceService.saveDevice(device);
  170. }else if ("delete".equals(action)){
  171. // 删除设备
  172. deviceService.removeByDeviceId(deviceName);
  173. }
  174. }else {
  175. log.error("未知的topic:"+topic);
  176. }
  177. } catch (Exception e) {
  178. // 错误标志
  179. platformLog.setIsError("1");
  180. platformLog.setErrorLog(e.getMessage());
  181. log.error("数据处理失败 ", e);
  182. }
  183. // 存储日志
  184. }
  185. }