| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- package com.coffee.aliyun;
- import cn.hutool.extra.spring.SpringUtil;
- import cn.hutool.json.JSONUtil;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.coffee.aliyun.utils.Constants;
- import com.coffee.aliyun.utils.EnumUtils;
- import com.coffee.aliyun.utils.Items;
- import com.coffee.bus.bean.AliIotConfig;
- import com.coffee.bus.entity.BusDeviceEntity;
- import com.coffee.bus.entity.BusDeviceRunningEntity;
- import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
- import com.coffee.bus.service.LocalBusDeviceHistoryService;
- import com.coffee.bus.service.LocalBusDeviceService;
- import com.coffee.bus.service.LocalBusInfusionHistoryService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Lazy;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- /**
- * @Author 龙三郎
- * @Date 2022-4-06 16:22:13
- * @Version 1.0
- * @Description 阿里云物联网平台服务端订阅
- */
- @Service
- @Slf4j
- public class AliyunConsumerGroupService {
- @Autowired
- @Lazy
- private AliyunIotSubscribeClient client;
- @Autowired
- @Lazy
- private LocalBusDeviceService deviceService;
- @Autowired
- @Lazy
- private LocalBusInfusionHistoryService infusionHistoryService;
- @Autowired
- @Lazy
- private LocalBusDeviceHistoryService deviceHistoryService;
- @Value("${aliyun.server-subscription.enable:false}")
- private boolean isEnable;
- // 开启服务端订阅
- @PostConstruct
- public void subscribe(){
- if (!isEnable){
- log.info("订阅禁止");
- return;
- }
- log.info("允许开启订阅");
- try {
- // 开启订阅
- client.start(messageListener);
- } catch (Exception e) {
- e.printStackTrace();
- }
- log.info("阿里云物联网订阅成功。。。。。。。。。。。");
- }
- //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
- private final static ExecutorService executorService = new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors(),
- Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue(50000));
- private MessageListener messageListener = (message) -> {
- try {
- //1.收到消息之后一定要ACK。
- // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
- // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
- // message.acknowledge();
- //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
- // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
- executorService.submit(()-> processMessage(message));
- } catch (Exception e) {
- log.error("submit task occurs exception ", e);
- }
- };
- /**
- * 处理逻辑,加上锁
- * @param message
- */
- private static final String IOTID = "iotId";
- private static final String PRODUCTKEY = "productKey";
- private static final String DEVICENAME = "deviceName";
- private static final String TOPIC = "topic";
- private static final String MESSAGEID = "messageId";
- private static final String ITEMS = "items";
- private static final String VALUE = "value";
- private static final String STATUS = "status";
- private static final String CONTENT = "content";
- private void processMessage(Message message) {
- PlatformLog platformLog = new PlatformLog();
- Map<String, Object> platformData = new HashMap<>();
- try {
- // 获取主题,消息id和内容
- String topic = message.getStringProperty(TOPIC);
- String messageId = message.getStringProperty(MESSAGEID);
- JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
- // 平台数据
- platformData.put(TOPIC,topic);
- platformData.put(MESSAGEID,messageId);
- platformData.put(CONTENT,content);
- log.info("阿里云物联网发送的数据:"+JSON.toJSONString(platformData));
- // 设备名称
- String deviceName = content.getString(DEVICENAME);
- // 创建设备对象
- Date now = new Date();
- // 根据topic判断数据类型
- if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){//设备上下线状态
- BusDeviceEntity device = new BusDeviceEntity();
- device.setDeviceId(deviceName);
- String status = content.getString(STATUS);
- device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
- if (status.equals("online")){
- log.info(deviceName+"设备上线");
- }else if (status.equals("offline")){
- log.info(deviceName+"设备下线");
- }else {
- log.info(deviceName+"设备未激活");
- }
- // 更新设备状态
- deviceService.updateDevice(device);
- }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
- // 设备属性集合
- Items items = new Items(content.getJSONObject("items"));
- BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
- deviceRunning.updateFieldsByItems(deviceName,items);
- // 发布事件
- SpringUtil.publishEvent(new DeviceInfoEvent(this,deviceRunning,deviceName));
- // BusInfusionHistoryEntity infusionHistory = infusionHistoryService.saveInfusion(deviceName,items);
- // // 保存到device_history表中
- // BusDeviceHistoryEntity deviceHistory = new BusDeviceHistoryEntity();
- // // 将infusionHistory对象的值,复制到deviceHistory对象
- // BeanUtils.copyProperties(infusionHistory,deviceHistory);
- // deviceHistory.setId(null);
- // deviceHistory.setInfusionId(infusionHistory.getId());
- // // 保存设备的历史数据
- // deviceHistoryService.save(deviceHistory);
- }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
- // 获取生命周期类型
- String action = content.getString("action");
- if ("create".equals(action)){
- // 创建设备
- BusDeviceEntity device = new BusDeviceEntity();
- device.setDeviceId(deviceName);
- device.setCreateTime(now);
- device.setCreateBy(Constants.DefaultCreateBy);
- device.setUpdateTime(now);
- device.setUpdateBy(Constants.DefaultUpdateBy);
- device.setTenantId(Constants.DefaultHospital);
- // 配置信息
- AliIotConfig config = new AliIotConfig();
- config.setDeviceName(deviceName);
- config.setDeviceSecret(content.getString("deviceSecret"));
- config.setIotId(content.getString("iotId"));
- config.setProductKey(content.getString("productKey"));
- // 设置配置信息
- device.setConfig(config);
- deviceService.saveDevice(device);
- }else if ("delete".equals(action)){
- // 删除设备
- deviceService.removeByDeviceId(deviceName);
- }
- }else {
- log.error("未知的topic:"+topic);
- }
- } catch (Exception e) {
- // 错误标志
- platformLog.setIsError("1");
- platformLog.setErrorLog(e.getMessage());
- log.error("数据处理失败 ", e);
- }
- // 存储日志
- }
- }
|