AliyunConsumerGroupService.java 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package com.coffee.aliyun;
  2. import cn.hutool.extra.spring.SpringUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.coffee.aliyun.utils.Constants;
  6. import com.coffee.aliyun.utils.Items;
  7. import com.coffee.bus.bean.AliIotConfig;
  8. import com.coffee.bus.entity.BusDeviceEntity;
  9. import com.coffee.bus.entity.BusDeviceRunningEntity;
  10. import com.coffee.bus.entity.BusInfusionHistoryEntity;
  11. import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
  12. import com.coffee.bus.service.LocalBusDeviceService;
  13. import com.coffee.bus.service.LocalBusInfusionHistoryService;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.beans.factory.annotation.Value;
  16. import org.springframework.stereotype.Service;
  17. import javax.annotation.PostConstruct;
  18. import javax.jms.Message;
  19. import javax.jms.MessageListener;
  20. import java.util.Date;
  21. import java.util.HashMap;
  22. import java.util.Map;
  23. import java.util.concurrent.ExecutorService;
  24. import java.util.concurrent.LinkedBlockingQueue;
  25. import java.util.concurrent.ThreadPoolExecutor;
  26. import java.util.concurrent.TimeUnit;
  27. /**
  28. * @Author longsanlang
  29. * @Date 2022-4-06 16:22:13
  30. * @Version 1.0
  31. * @Description 阿里云物联网平台服务端订阅
  32. */
  33. @Service
  34. @Slf4j
  35. public class AliyunConsumerGroupService {
  36. // 初始化订阅客户端
  37. private final AliyunIotSubscribeClient client;
  38. // 设备Service
  39. private final LocalBusDeviceService deviceService;
  40. // 输注Service
  41. private final LocalBusInfusionHistoryService infusionHistoryService;
  42. public AliyunConsumerGroupService(AliyunIotSubscribeClient client,
  43. LocalBusDeviceService deviceService,
  44. LocalBusInfusionHistoryService infusionHistoryService) {
  45. this.client = client;
  46. this.deviceService = deviceService;
  47. this.infusionHistoryService = infusionHistoryService;
  48. }
  49. @Value("${aliyun.server-subscription.enable:false}")
  50. private boolean isEnable;
  51. // 开启服务端订阅
  52. @PostConstruct
  53. public void subscribe(){
  54. if (!isEnable){
  55. log.info("订阅禁止");
  56. return;
  57. }
  58. log.info("允许开启订阅");
  59. try {
  60. // 开启订阅
  61. client.start(messageListener);
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. }
  65. log.info("阿里云物联网订阅成功。。。。。。。。。。。");
  66. }
  67. //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
  68. private final static ExecutorService executorService = new ThreadPoolExecutor(
  69. Runtime.getRuntime().availableProcessors(),
  70. Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
  71. new LinkedBlockingQueue(50000));
  72. private MessageListener messageListener = (message) -> {
  73. try {
  74. //1.收到消息之后一定要ACK。
  75. // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
  76. // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
  77. // message.acknowledge();
  78. //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
  79. // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
  80. executorService.submit(()-> processMessage(message));
  81. } catch (Exception e) {
  82. log.error("submit task occurs exception ", e);
  83. }
  84. };
  85. /**
  86. * 处理逻辑,加上锁
  87. * @param message
  88. */
  89. private static final String IOTID = "iotId";
  90. private static final String PRODUCTKEY = "productKey";
  91. private static final String DEVICENAME = "deviceName";
  92. private static final String TOPIC = "topic";
  93. private static final String MESSAGEID = "messageId";
  94. private static final String ITEMS = "items";
  95. private static final String VALUE = "value";
  96. private static final String STATUS = "status";
  97. private static final String CONTENT = "content";
  98. private void processMessage(Message message) {
  99. PlatformLog platformLog = new PlatformLog();
  100. Map<String, Object> platformData = new HashMap<>();
  101. try {
  102. // 获取主题,消息id和内容
  103. String topic = message.getStringProperty(TOPIC);
  104. String messageId = message.getStringProperty(MESSAGEID);
  105. JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
  106. // 平台数据
  107. platformData.put(TOPIC,topic);
  108. platformData.put(MESSAGEID,messageId);
  109. platformData.put(CONTENT,content);
  110. log.info("阿里云物联网发送的数据:"+JSON.toJSONString(platformData));
  111. // 设备名称
  112. String deviceName = content.getString(DEVICENAME);
  113. // 创建设备对象
  114. Date now = new Date();
  115. // 根据topic判断数据类型
  116. if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){//设备上下线状态
  117. String status = content.getString(STATUS);
  118. if (status.equals("online")){
  119. log.info(deviceName+"设备上线");
  120. }else if (status.equals("offline")){
  121. log.info(deviceName+"设备下线");
  122. }else {
  123. log.info(deviceName+"设备未激活");
  124. }
  125. }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
  126. /**
  127. * platformData:
  128. * {
  129. * "topic":"/a1M7k1TAECc/ceshi001/thing/event/property/post",
  130. * "messageId":"1496410088574460416",
  131. * "content":"{\"deviceType\":\"CustomCategory\",
  132. * \"iotId\":\"0syOzMqwkGebZBGKa08d000000\",
  133. * \"requestId\":\"254\",\"checkFailedData\":{},
  134. * \"productKey\":\"a1M7k1TAECc\",\"gmtCreate\":1645606941721,
  135. * \"deviceName\":\"ceshi001\",
  136. * \"items\":{
  137. * \"total\":{\"value\":100,\"time\":1645606941717},
  138. * \"pumpCode\":{\"value\":\"5719512336330299\",\"time\":1645606941717},
  139. * \"dataNumber\":{\"value\":254,\"time\":1645606941717},
  140. * \"PCAInvalid\":{\"value\":0,\"time\":1645606941717},
  141. * \"alarm\":{\"value\":0,\"time\":1645606941717},
  142. * \"flowRate\":{\"value\":2,\"time\":1645606941717},
  143. * \"PCAValid\":{\"value\":0,\"time\":1645606941717},
  144. * \"forecast\":{\"value\":0,\"time\":1645606941717},
  145. * \"finished\":{\"value\":17,\"time\":1645606941717},
  146. * \"battery\":{\"value\":65,\"time\":1645606941717},
  147. * \"runStatus\":{\"value\":2,\"time\":1645606941717},
  148. * \"patientCode\":{\"value\":111110000,\"time\":1645606941717}}}"
  149. * }
  150. */
  151. // 设备属性集合
  152. Items items = new Items(content.getJSONObject("items"));
  153. BusInfusionHistoryEntity infusionHistory = infusionHistoryService.saveInfusion(deviceName,items);
  154. // 发布设备信息事件。处理完毕,传向下一步处理
  155. DeviceInfoEvent deviceInfoEvent = new DeviceInfoEvent(this,new BusDeviceRunningEntity(),"123456");
  156. SpringUtil.publishEvent(deviceInfoEvent);
  157. }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
  158. // 获取生命周期类型
  159. String action = content.getString("action");
  160. if ("create".equals(action)){
  161. // 创建设备
  162. BusDeviceEntity device = new BusDeviceEntity();
  163. device.setDeviceId(deviceName);
  164. device.setCreateBy(Constants.DefaultCreateBy);
  165. device.setUpdateBy(Constants.DefaultUpdateBy);
  166. device.setUpdateTime(now);
  167. device.setTenantId(Constants.DefaultHospital);
  168. // 配置信息
  169. AliIotConfig config = new AliIotConfig();
  170. config.setDeviceName(deviceName);
  171. config.setDeviceSecret(content.getString("deviceSecret"));
  172. config.setIotId(content.getString("iotId"));
  173. config.setProductKey(content.getString("productKey"));
  174. // 设置配置信息
  175. device.setConfig(config);
  176. deviceService.saveByDeviceId(device);
  177. }else if ("delete".equals(action)){
  178. // 删除设备
  179. deviceService.removeByDeviceId(deviceName);
  180. }
  181. }else {
  182. log.error("未知的topic:"+topic);
  183. }
  184. } catch (Exception e) {
  185. // 错误标志
  186. platformLog.setIsError("1");
  187. platformLog.setErrorLog(e.getMessage());
  188. log.error("数据处理失败 ", e);
  189. }
  190. // 存储日志
  191. }
  192. }