package com.coffee.aliyun;

import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coffee.aliyun.utils.Constants;
import com.coffee.aliyun.utils.EnumUtils;
import com.coffee.aliyun.utils.Items;
import com.coffee.bus.bean.AliIotConfig;
import com.coffee.bus.entity.BusDeviceEntity;
import com.coffee.bus.entity.BusDeviceRunningEntity;
import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
import com.coffee.bus.service.LocalBusDeviceHistoryService;
import com.coffee.bus.service.LocalBusDeviceService;
import com.coffee.bus.service.LocalBusInfusionHistoryService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
 * Server-side subscription to the Aliyun IoT platform.
 *
 * <p>When enabled by configuration, registers a JMS {@link MessageListener} with the
 * subscribe client at bean start-up. Each received message is handed to a shared thread
 * pool and dispatched by topic: device online/offline status, device property reports,
 * and device lifecycle (create/delete) notifications.
 *
 * @Author 龙三郎
 * @Date 2022-4-06 16:22:13
 * @Version 1.0
 */
@Service
@Slf4j
public class AliyunConsumerGroupService {

    // Message property names and JSON keys used by the platform payload.
    private static final String IOTID = "iotId";
    private static final String PRODUCTKEY = "productKey";
    private static final String DEVICENAME = "deviceName";
    private static final String TOPIC = "topic";
    private static final String MESSAGEID = "messageId";
    private static final String ITEMS = "items";
    private static final String VALUE = "value";
    private static final String STATUS = "status";
    private static final String CONTENT = "content";

    // Topic matchers, compiled once instead of on every message. The regex text is
    // unchanged, and each is applied with matches(), i.e. against the whole topic.
    private static final Pattern STATUS_TOPIC = Pattern.compile("^/as/mqtt/status/[\\w\\/]*");
    private static final Pattern PROPERTY_POST_TOPIC = Pattern.compile("[\\w\\/]*event/property/post$");
    private static final Pattern LIFECYCLE_TOPIC = Pattern.compile("[\\w\\/]+thing/lifecycle$");

    // Thread pool for asynchronous business processing. The parameters can be tuned to the
    // workload. With the default rejection policy a full queue makes submit() throw, which
    // the listener below catches and logs.
    private static final ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000));

    @Autowired
    @Lazy
    private AliyunIotSubscribeClient client;

    @Autowired
    @Lazy
    private LocalBusDeviceService deviceService;

    @Autowired
    @Lazy
    private LocalBusInfusionHistoryService infusionHistoryService;

    @Autowired
    @Lazy
    private LocalBusDeviceHistoryService deviceHistoryService;

    @Value("${aliyun.server-subscription.enable:false}")
    private boolean isEnable;

    /**
     * Listener registered with the subscribe client.
     *
     * <p>Acknowledgement: with a {@code Session.AUTO_ACKNOWLEDGE} session the message is
     * acknowledged automatically; with {@code Session.CLIENT_ACKNOWLEDGE} the handler would
     * have to call {@code message.acknowledge()} itself, which this listener does not do.
     *
     * <p>Processing is submitted to the thread pool so the callback itself stays short.
     */
    private final MessageListener messageListener = (message) -> {
        try {
            executorService.submit(() -> processMessage(message));
        } catch (Exception e) {
            log.error("submit task occurs exception ", e);
        }
    };

    /**
     * Starts the server-side subscription after bean construction.
     *
     * <p>Does nothing when {@code aliyun.server-subscription.enable} is false (the default).
     * A failure to start is logged and does not propagate; the success message is only
     * logged when the client started without throwing.
     */
    @PostConstruct
    public void subscribe() {
        if (!isEnable) {
            log.info("订阅禁止");
            return;
        }
        log.info("允许开启订阅");
        try {
            client.start(messageListener);
        } catch (Exception e) {
            log.error("阿里云物联网订阅失败", e);
            return;
        }
        log.info("阿里云物联网订阅成功。。。。。。。。。。。");
    }

    /**
     * Decodes one platform message and dispatches it by topic.
     *
     * <p>Any exception raised while decoding or handling is caught here, recorded on the
     * local {@code PlatformLog} and logged; nothing propagates to the worker thread.
     *
     * @param message the JMS message delivered by the subscription
     */
    private void processMessage(Message message) {
        PlatformLog platformLog = new PlatformLog();
        Map<String, Object> platformData = new HashMap<>();
        try {
            // Topic, message id and JSON body. The body is decoded explicitly as UTF-8
            // rather than with the platform default charset.
            String topic = message.getStringProperty(TOPIC);
            String messageId = message.getStringProperty(MESSAGEID);
            JSONObject content = JSON.parseObject(
                    new String(message.getBody(byte[].class), StandardCharsets.UTF_8));

            platformData.put(TOPIC, topic);
            platformData.put(MESSAGEID, messageId);
            platformData.put(CONTENT, content);
            log.info("阿里云物联网发送的数据:{}", JSON.toJSONString(platformData));

            String deviceName = content.getString(DEVICENAME);

            if (STATUS_TOPIC.matcher(topic).matches()) {
                // Device online/offline status.
                BusDeviceEntity device = new BusDeviceEntity();
                device.setDeviceId(deviceName);
                String status = content.getString(STATUS);
                device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
                // A missing status throws here and is reported by the catch below, so the
                // device is never updated from a status message without a status.
                if (status.equals("online")) {
                    log.info("{}设备上线", deviceName);
                } else if (status.equals("offline")) {
                    log.info("{}设备下线", deviceName);
                } else {
                    log.info("{}设备未激活", deviceName);
                }
                deviceService.updateDevice(device);
            } else if (PROPERTY_POST_TOPIC.matcher(topic).matches()) {
                // Device property report: map the reported items onto a running-state
                // entity and publish it as an application event. Inline persistence of
                // infusion/device history used to live here but is disabled; this branch
                // only publishes the event.
                Items items = new Items(content.getJSONObject(ITEMS));
                BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
                deviceRunning.updateFieldsByItems(deviceName, items);
                SpringUtil.publishEvent(new DeviceInfoEvent(this, deviceRunning, deviceName));
            } else if (LIFECYCLE_TOPIC.matcher(topic).matches()) {
                // Device lifecycle: only "create" and "delete" are acted on; any other
                // action is ignored.
                String action = content.getString("action");
                if ("create".equals(action)) {
                    deviceService.saveDevice(newDeviceFromLifecycle(deviceName, content));
                } else if ("delete".equals(action)) {
                    deviceService.removeByDeviceId(deviceName);
                }
            } else {
                log.error("未知的topic:{}", topic);
            }
        } catch (Exception e) {
            // Error flag and message for the platform log record.
            platformLog.setIsError("1");
            platformLog.setErrorLog(e.getMessage());
            log.error("数据处理失败 ", e);
        }
        // TODO: persist platformLog; it is populated above but not stored anywhere here.
    }

    /**
     * Builds the device entity for a lifecycle "create" notification.
     *
     * @param deviceName device name taken from the message content
     * @param content    message content carrying deviceSecret, iotId and productKey
     * @return a new device with default audit fields, default tenant and its IoT config
     */
    private BusDeviceEntity newDeviceFromLifecycle(String deviceName, JSONObject content) {
        // One timestamp so create and update times are identical for a new device.
        Date now = new Date();

        BusDeviceEntity device = new BusDeviceEntity();
        device.setDeviceId(deviceName);
        device.setCreateTime(now);
        device.setCreateBy(Constants.DefaultCreateBy);
        device.setUpdateTime(now);
        device.setUpdateBy(Constants.DefaultUpdateBy);
        device.setTenantId(Constants.DefaultHospital);

        AliIotConfig config = new AliIotConfig();
        config.setDeviceName(deviceName);
        config.setDeviceSecret(content.getString("deviceSecret"));
        config.setIotId(content.getString(IOTID));
        config.setProductKey(content.getString(PRODUCTKEY));
        device.setConfig(config);

        return device;
    }
}