package com.coffee.aliyun; import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.coffee.aliyun.utils.Constants; import com.coffee.aliyun.utils.EnumUtils; import com.coffee.aliyun.utils.Items; import com.coffee.bus.bean.AliIotConfig; import com.coffee.bus.entity.BusDeviceEntity; import com.coffee.bus.entity.BusDeviceRunningEntity; import com.coffee.bus.entity.BusHospitalLogEntity; import com.coffee.bus.enums.HospitalLogEnum; import com.coffee.bus.listener.event.bean.DeviceInfoEvent; import com.coffee.bus.service.LocalBusDeviceService; import com.coffee.bus.service.LocalBusHospitalLogService; import com.coffee.bus.websocket.listener.DeviceInfoListener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.jms.Message; import javax.jms.MessageListener; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @Author 龙三郎 * @Date 2022-4-06 16:22:13 * @Version 1.0 * @Description 阿里云物联网平台服务端订阅 */ @Service @Slf4j public class AliyunConsumerGroupService { private static final String IOTID = "iotId"; private static final String PRODUCTKEY = "productKey"; private static final String DEVICENAME = "deviceName"; private static final String TOPIC = "topic"; private static final String MESSAGEID = "messageId"; private static final String ITEMS = "items"; private static final String VALUE = "value"; private static final String STATUS = "status"; private static final String CONTENT = "content"; @Autowired @Lazy private AliyunIotSubscribeClient client; @Autowired @Lazy private LocalBusDeviceService deviceService; @Autowired private DeviceInfoListener deviceInfoListener; @Autowired @Lazy private LocalBusHospitalLogService hospitalLogService; @Value("${aliyun.server-subscription.enable:false}") private boolean isEnable; // 开启服务端订阅 @PostConstruct public void subscribe(){ if (!isEnable){ log.info("订阅禁止"); return; } log.info("允许开启订阅"); try { // 开启订阅 client.start(messageListener); } catch (Exception e) { e.printStackTrace(); } log.info("阿里云物联网订阅成功。。。。。。。。。。。"); } /** * 描述:业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。 */ private final static ExecutorService executorService = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(50000), new AliYunThreadFactory()); static class AliYunThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; AliYunThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "ali-iot-" + poolNumber.getAndIncrement(); } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()){ t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY){ t.setPriority(Thread.NORM_PRIORITY); } return t; } } private MessageListener messageListener = (message) -> { try { //1.收到消息之后一定要ACK。 // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。 // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。 // message.acknowledge(); //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。 // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。 executorService.submit(()-> processMessage(message)); } catch (Exception e) { log.error("submit task occurs exception ", e); } }; private void processMessage(Message message) { BusHospitalLogEntity hospitalLog = new BusHospitalLogEntity(); hospitalLog.setType(HospitalLogEnum.ALI); long startTime = System.currentTimeMillis(); String deviceName=null; try { // 获取主题,消息id和内容 String topic = message.getStringProperty(TOPIC); String messageId = message.getStringProperty(MESSAGEID); hospitalLog.setMsgId(messageId); JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class))); log.info("阿里云物联网发送的数据:"+JSON.toJSONString(content)); // 设备名称 deviceName = content.getString(DEVICENAME); hospitalLog.setIdentityCode(deviceName); hospitalLog.setInput(content.toJSONString()); // 创建设备对象 Date now = new Date(); // 根据topic判断数据类型,设备上下线状态 if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){ BusDeviceEntity device = new BusDeviceEntity(); device.setDeviceId(deviceName); String status = content.getString(STATUS); device.setStatus(EnumUtils.getDeviceStatusEnum2(status)); if ("online".equalsIgnoreCase(status)){ log.info(deviceName+"设备【{}】上线",deviceName); }else if ("offline".equalsIgnoreCase(status)){ log.info(deviceName+"设备【{}】下线",deviceName); }else { log.info(deviceName+"设备【{}】未激活",deviceName); } // 更新设备状态 deviceService.updateDevice(device); }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报 // 设备属性集合 Items items = new Items(content.getJSONObject("items")); BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity(); deviceRunning.updateFieldsByItems(deviceName,items); deviceInfoListener.deviceInfoDetail(new DeviceInfoEvent(this,deviceRunning,deviceName)); }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期 // 获取生命周期类型 String action = content.getString("action"); if ("create".equals(action)){ // 创建设备 BusDeviceEntity device = new BusDeviceEntity(); device.setDeviceId(deviceName); device.setCreateTime(now); device.setCreateBy(Constants.DefaultCreateBy); device.setUpdateTime(now); device.setUpdateBy(Constants.DefaultUpdateBy); device.setTenantId(Constants.DefaultHospital); // 配置信息 AliIotConfig config = new AliIotConfig(); config.setDeviceName(deviceName); config.setDeviceSecret(content.getString("deviceSecret")); config.setIotId(content.getString("iotId")); config.setProductKey(content.getString("productKey")); // 设置配置信息 device.setConfig(config); deviceService.saveDevice(device); }else if ("delete".equals(action)){ // 删除设备 deviceService.removeByDeviceId(deviceName); } }else { log.warn("阿里云数据【{}】,未知的topic:【{}】",content.toJSONString(),topic); } hospitalLog.setSuccess(true); } catch (Exception e) { hospitalLog.setSuccess(false); hospitalLog.setMessage(e.toString()); log.error("阿里云数据【{}】数据处理失败 ", JSONUtil.toJsonStr(message), e); }finally { if(CharSequenceUtil.isNotBlank(deviceName)){ BusDeviceEntity device = deviceService.getByDeviceId(deviceName); long entTime = System.currentTimeMillis(); hospitalLog.setTenantId(device.getTenantId()); hospitalLog.setUseTime(entTime-startTime); hospitalLogService.save(hospitalLog); } } } }