AliyunConsumerGroupService.java 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package com.coffee.aliyun;
  2. import cn.hutool.core.text.CharSequenceUtil;
  3. import cn.hutool.extra.spring.SpringUtil;
  4. import cn.hutool.json.JSONUtil;
  5. import com.alibaba.fastjson.JSON;
  6. import com.alibaba.fastjson.JSONObject;
  7. import com.coffee.aliyun.utils.Constants;
  8. import com.coffee.aliyun.utils.EnumUtils;
  9. import com.coffee.aliyun.utils.Items;
  10. import com.coffee.bus.bean.AliIotConfig;
  11. import com.coffee.bus.entity.BusDeviceEntity;
  12. import com.coffee.bus.entity.BusDeviceRunningEntity;
  13. import com.coffee.bus.entity.BusHospitalLogEntity;
  14. import com.coffee.bus.enums.HospitalLogEnum;
  15. import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
  16. import com.coffee.bus.service.LocalBusDeviceService;
  17. import com.coffee.bus.service.LocalBusHospitalLogService;
  18. import com.coffee.bus.websocket.listener.DeviceInfoListener;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import org.springframework.beans.factory.annotation.Value;
  22. import org.springframework.context.annotation.Lazy;
  23. import org.springframework.stereotype.Service;
  24. import javax.annotation.PostConstruct;
  25. import javax.jms.Message;
  26. import javax.jms.MessageListener;
  27. import java.util.Date;
  28. import java.util.HashMap;
  29. import java.util.Map;
  30. import java.util.concurrent.*;
  31. import java.util.concurrent.atomic.AtomicInteger;
  32. /**
  33. * @Author 龙三郎
  34. * @Date 2022-4-06 16:22:13
  35. * @Version 1.0
  36. * @Description 阿里云物联网平台服务端订阅
  37. */
  38. @Service
  39. @Slf4j
  40. public class AliyunConsumerGroupService {
  41. private static final String IOTID = "iotId";
  42. private static final String PRODUCTKEY = "productKey";
  43. private static final String DEVICENAME = "deviceName";
  44. private static final String TOPIC = "topic";
  45. private static final String MESSAGEID = "messageId";
  46. private static final String ITEMS = "items";
  47. private static final String VALUE = "value";
  48. private static final String STATUS = "status";
  49. private static final String CONTENT = "content";
  50. @Autowired
  51. @Lazy
  52. private AliyunIotSubscribeClient client;
  53. @Autowired
  54. @Lazy
  55. private LocalBusDeviceService deviceService;
  56. @Autowired
  57. private DeviceInfoListener deviceInfoListener;
  58. @Autowired
  59. @Lazy
  60. private LocalBusHospitalLogService hospitalLogService;
  61. @Value("${aliyun.server-subscription.enable:false}")
  62. private boolean isEnable;
  63. // 开启服务端订阅
  64. @PostConstruct
  65. public void subscribe(){
  66. if (!isEnable){
  67. log.info("订阅禁止");
  68. return;
  69. }
  70. log.info("允许开启订阅");
  71. try {
  72. // 开启订阅
  73. client.start(messageListener);
  74. } catch (Exception e) {
  75. e.printStackTrace();
  76. }
  77. log.info("阿里云物联网订阅成功。。。。。。。。。。。");
  78. }
  79. /**
  80. * 描述:业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
  81. */
  82. private final static ExecutorService executorService = new ThreadPoolExecutor(
  83. Runtime.getRuntime().availableProcessors(),
  84. Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
  85. new LinkedBlockingQueue(50000),
  86. new AliYunThreadFactory());
  87. static class AliYunThreadFactory implements ThreadFactory {
  88. static final AtomicInteger poolNumber = new AtomicInteger(1);
  89. final ThreadGroup group;
  90. final AtomicInteger threadNumber = new AtomicInteger(1);
  91. final String namePrefix;
  92. AliYunThreadFactory() {
  93. SecurityManager s = System.getSecurityManager();
  94. group = (s != null)? s.getThreadGroup() :
  95. Thread.currentThread().getThreadGroup();
  96. namePrefix = "ali-iot-" +
  97. poolNumber.getAndIncrement();
  98. }
  99. @Override
  100. public Thread newThread(Runnable r) {
  101. Thread t = new Thread(group, r,
  102. namePrefix + threadNumber.getAndIncrement(),
  103. 0);
  104. if (t.isDaemon()){
  105. t.setDaemon(false);
  106. }
  107. if (t.getPriority() != Thread.NORM_PRIORITY){
  108. t.setPriority(Thread.NORM_PRIORITY);
  109. }
  110. return t;
  111. }
  112. }
  113. private MessageListener messageListener = (message) -> {
  114. try {
  115. //1.收到消息之后一定要ACK。
  116. // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
  117. // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
  118. // message.acknowledge();
  119. //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
  120. // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
  121. executorService.submit(()-> processMessage(message));
  122. } catch (Exception e) {
  123. log.error("submit task occurs exception ", e);
  124. }
  125. };
  126. private void processMessage(Message message) {
  127. BusHospitalLogEntity hospitalLog = new BusHospitalLogEntity();
  128. hospitalLog.setType(HospitalLogEnum.ALI);
  129. long startTime = System.currentTimeMillis();
  130. String deviceName=null;
  131. try {
  132. // 获取主题,消息id和内容
  133. String topic = message.getStringProperty(TOPIC);
  134. String messageId = message.getStringProperty(MESSAGEID);
  135. hospitalLog.setMsgId(messageId);
  136. JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
  137. log.info("阿里云物联网发送的数据:"+JSON.toJSONString(content));
  138. // 设备名称
  139. deviceName = content.getString(DEVICENAME);
  140. hospitalLog.setIdentityCode(deviceName);
  141. hospitalLog.setInput(content.toJSONString());
  142. // 创建设备对象
  143. Date now = new Date();
  144. // 根据topic判断数据类型,设备上下线状态
  145. if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){
  146. BusDeviceEntity device = new BusDeviceEntity();
  147. device.setDeviceId(deviceName);
  148. String status = content.getString(STATUS);
  149. device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
  150. if ("online".equalsIgnoreCase(status)){
  151. log.info(deviceName+"设备【{}】上线",deviceName);
  152. }else if ("offline".equalsIgnoreCase(status)){
  153. log.info(deviceName+"设备【{}】下线",deviceName);
  154. }else {
  155. log.info(deviceName+"设备【{}】未激活",deviceName);
  156. }
  157. // 更新设备状态
  158. deviceService.updateDevice(device);
  159. }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
  160. // 设备属性集合
  161. Items items = new Items(content.getJSONObject("items"));
  162. BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
  163. deviceRunning.updateFieldsByItems(deviceName,items);
  164. deviceInfoListener.deviceInfoDetail(new DeviceInfoEvent(this,deviceRunning,deviceName));
  165. }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
  166. // 获取生命周期类型
  167. String action = content.getString("action");
  168. if ("create".equals(action)){
  169. // 创建设备
  170. BusDeviceEntity device = new BusDeviceEntity();
  171. device.setDeviceId(deviceName);
  172. device.setCreateTime(now);
  173. device.setCreateBy(Constants.DefaultCreateBy);
  174. device.setUpdateTime(now);
  175. device.setUpdateBy(Constants.DefaultUpdateBy);
  176. device.setTenantId(Constants.DefaultHospital);
  177. // 配置信息
  178. AliIotConfig config = new AliIotConfig();
  179. config.setDeviceName(deviceName);
  180. config.setDeviceSecret(content.getString("deviceSecret"));
  181. config.setIotId(content.getString("iotId"));
  182. config.setProductKey(content.getString("productKey"));
  183. // 设置配置信息
  184. device.setConfig(config);
  185. deviceService.saveDevice(device);
  186. }else if ("delete".equals(action)){
  187. // 删除设备
  188. deviceService.removeByDeviceId(deviceName);
  189. }
  190. }else {
  191. log.warn("阿里云数据【{}】,未知的topic:【{}】",content.toJSONString(),topic);
  192. }
  193. hospitalLog.setSuccess(true);
  194. } catch (Exception e) {
  195. hospitalLog.setSuccess(false);
  196. hospitalLog.setMessage(e.toString());
  197. log.error("阿里云数据【{}】数据处理失败 ", JSONUtil.toJsonStr(message), e);
  198. }finally {
  199. if(CharSequenceUtil.isNotBlank(deviceName)){
  200. BusDeviceEntity device = deviceService.getByDeviceId(deviceName);
  201. long entTime = System.currentTimeMillis();
  202. hospitalLog.setTenantId(device.getTenantId());
  203. hospitalLog.setUseTime(entTime-startTime);
  204. hospitalLogService.save(hospitalLog);
  205. }
  206. }
  207. }
  208. }