| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- package com.coffee.aliyun;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coffee.aliyun.utils.Constants;
import com.coffee.aliyun.utils.EnumUtils;
import com.coffee.aliyun.utils.Items;
import com.coffee.bus.bean.AliIotConfig;
import com.coffee.bus.entity.BusDeviceEntity;
import com.coffee.bus.entity.BusDeviceRunningEntity;
import com.coffee.bus.entity.BusHospitalLogEntity;
import com.coffee.bus.enums.HospitalLogEnum;
import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
import com.coffee.bus.service.LocalBusDeviceService;
import com.coffee.bus.service.LocalBusHospitalLogService;
import com.coffee.bus.websocket.listener.DeviceInfoListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
- /**
- * @Author 龙三郎
- * @Date 2022-4-06 16:22:13
- * @Version 1.0
- * @Description 阿里云物联网平台服务端订阅
- */
- @Service
- @Slf4j
- public class AliyunConsumerGroupService {
- private static final String IOTID = "iotId";
- private static final String PRODUCTKEY = "productKey";
- private static final String DEVICENAME = "deviceName";
- private static final String TOPIC = "topic";
- private static final String MESSAGEID = "messageId";
- private static final String ITEMS = "items";
- private static final String VALUE = "value";
- private static final String STATUS = "status";
- private static final String CONTENT = "content";
- @Autowired
- @Lazy
- private AliyunIotSubscribeClient client;
- @Autowired
- @Lazy
- private LocalBusDeviceService deviceService;
- @Autowired
- private DeviceInfoListener deviceInfoListener;
- @Autowired
- @Lazy
- private LocalBusHospitalLogService hospitalLogService;
- @Value("${aliyun.server-subscription.enable:false}")
- private boolean isEnable;
- // 开启服务端订阅
- @PostConstruct
- public void subscribe(){
- if (!isEnable){
- log.info("订阅禁止");
- return;
- }
- log.info("允许开启订阅");
- try {
- // 开启订阅
- client.start(messageListener);
- } catch (Exception e) {
- e.printStackTrace();
- }
- log.info("阿里云物联网订阅成功。。。。。。。。。。。");
- }
- /**
- * 描述:业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
- */
- private final static ExecutorService executorService = new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors(),
- Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue(50000),
- new AliYunThreadFactory());
- static class AliYunThreadFactory implements ThreadFactory {
- static final AtomicInteger poolNumber = new AtomicInteger(1);
- final ThreadGroup group;
- final AtomicInteger threadNumber = new AtomicInteger(1);
- final String namePrefix;
- AliYunThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null)? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
- namePrefix = "ali-iot-" +
- poolNumber.getAndIncrement();
- }
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon()){
- t.setDaemon(false);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY){
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
- }
- private MessageListener messageListener = (message) -> {
- try {
- //1.收到消息之后一定要ACK。
- // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
- // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
- // message.acknowledge();
- //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
- // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
- executorService.submit(()-> processMessage(message));
- } catch (Exception e) {
- log.error("submit task occurs exception ", e);
- }
- };
- private void processMessage(Message message) {
- BusHospitalLogEntity hospitalLog = new BusHospitalLogEntity();
- hospitalLog.setType(HospitalLogEnum.ALI);
- long startTime = System.currentTimeMillis();
- String deviceName=null;
- try {
- // 获取主题,消息id和内容
- String topic = message.getStringProperty(TOPIC);
- String messageId = message.getStringProperty(MESSAGEID);
- hospitalLog.setMsgId(messageId);
- JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
- log.info("阿里云物联网发送的数据:"+JSON.toJSONString(content));
- // 设备名称
- deviceName = content.getString(DEVICENAME);
- hospitalLog.setIdentityCode(deviceName);
- hospitalLog.setInput(content.toJSONString());
- // 创建设备对象
- Date now = new Date();
- // 根据topic判断数据类型,设备上下线状态
- if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){
- BusDeviceEntity device = new BusDeviceEntity();
- device.setDeviceId(deviceName);
- String status = content.getString(STATUS);
- device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
- if ("online".equalsIgnoreCase(status)){
- log.info(deviceName+"设备【{}】上线",deviceName);
- }else if ("offline".equalsIgnoreCase(status)){
- log.info(deviceName+"设备【{}】下线",deviceName);
- }else {
- log.info(deviceName+"设备【{}】未激活",deviceName);
- }
- // 更新设备状态
- deviceService.updateDevice(device);
- }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
- // 设备属性集合
- Items items = new Items(content.getJSONObject("items"));
- BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
- deviceRunning.updateFieldsByItems(deviceName,items);
- deviceInfoListener.deviceInfoDetail(new DeviceInfoEvent(this,deviceRunning,deviceName));
- }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
- // 获取生命周期类型
- String action = content.getString("action");
- if ("create".equals(action)){
- // 创建设备
- BusDeviceEntity device = new BusDeviceEntity();
- device.setDeviceId(deviceName);
- device.setCreateTime(now);
- device.setCreateBy(Constants.DefaultCreateBy);
- device.setUpdateTime(now);
- device.setUpdateBy(Constants.DefaultUpdateBy);
- device.setTenantId(Constants.DefaultHospital);
- // 配置信息
- AliIotConfig config = new AliIotConfig();
- config.setDeviceName(deviceName);
- config.setDeviceSecret(content.getString("deviceSecret"));
- config.setIotId(content.getString("iotId"));
- config.setProductKey(content.getString("productKey"));
- // 设置配置信息
- device.setConfig(config);
- deviceService.saveDevice(device);
- }else if ("delete".equals(action)){
- // 删除设备
- deviceService.removeByDeviceId(deviceName);
- }
- }else {
- log.warn("阿里云数据【{}】,未知的topic:【{}】",content.toJSONString(),topic);
- }
- hospitalLog.setSuccess(true);
- } catch (Exception e) {
- hospitalLog.setSuccess(false);
- hospitalLog.setMessage(e.toString());
- log.error("阿里云数据【{}】数据处理失败 ", JSONUtil.toJsonStr(message), e);
- }finally {
- if(CharSequenceUtil.isNotBlank(deviceName)){
- BusDeviceEntity device = deviceService.getByDeviceId(deviceName);
- long entTime = System.currentTimeMillis();
- hospitalLog.setTenantId(device.getTenantId());
- hospitalLog.setUseTime(entTime-startTime);
- hospitalLogService.save(hospitalLog);
- }
- }
- }
- }
|