|
|
@@ -0,0 +1,230 @@
|
|
|
+package com.nb.aliyun;
|
|
|
+
|
|
|
+import cn.hutool.core.text.CharSequenceUtil;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
|
|
+import com.nb.web.api.bean.AliIotConfig;
|
|
|
+import com.nb.web.api.entity.BusDeviceEntity;
|
|
|
+import com.nb.web.api.entity.BusHospitalLogEntity;
|
|
|
+import com.nb.web.api.entity.common.BusDeviceRunningEntity;
|
|
|
+import com.nb.web.api.enums.HospitalLogEnum;
|
|
|
+import com.nb.web.api.feign.IHospitalLogClient;
|
|
|
+import com.nb.web.api.feign.IIotMsgHandler;
|
|
|
+import com.nb.web.api.utils.EnumUtils;
|
|
|
+import com.nb.core.utils.ExceptionUtil;
|
|
|
+import com.nb.web.api.feign.IDeviceClient;
|
|
|
+import com.nb.web.api.utils.Items;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.context.annotation.Lazy;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.jms.Message;
|
|
|
+import javax.jms.MessageListener;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @Author 龙三郎
|
|
|
+ * @Date 2022-4-06 16:22:13
|
|
|
+ * @Version 1.0
|
|
|
+ * @Description 阿里云物联网平台服务端订阅
|
|
|
+ */
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+public class AliyunConsumerGroupService {
|
|
|
+ private static final String IOTID = "iotId";
|
|
|
+ private static final String PRODUCTKEY = "productKey";
|
|
|
+ private static final String DEVICENAME = "deviceName";
|
|
|
+ private static final String TOPIC = "topic";
|
|
|
+ private static final String MESSAGEID = "messageId";
|
|
|
+ private static final String ITEMS = "items";
|
|
|
+ private static final String VALUE = "value";
|
|
|
+ private static final String STATUS = "status";
|
|
|
+ private static final String CONTENT = "content";
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @Lazy
|
|
|
+ private AliyunIotSubscribeClient client;
|
|
|
+ @Autowired
|
|
|
+ @Lazy
|
|
|
+ private IDeviceClient deviceService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IIotMsgHandler iotMsgHandler;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @Lazy
|
|
|
+ private IHospitalLogClient hospitalLogService;
|
|
|
+ @Value("${aliyun.server-subscription.enable:false}")
|
|
|
+ private boolean isEnable;
|
|
|
+
|
|
|
+ // 开启服务端订阅
|
|
|
+ @PostConstruct
|
|
|
+ public void subscribe(){
|
|
|
+ if (!isEnable){
|
|
|
+ log.info("订阅禁止");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("允许开启订阅");
|
|
|
+ try {
|
|
|
+ // 开启订阅
|
|
|
+ client.start(messageListener);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ log.info("阿里云物联网订阅成功。。。。。。。。。。。");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 描述:业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
|
|
|
+ */
|
|
|
+ private final static ExecutorService executorService = new ThreadPoolExecutor(
|
|
|
+ Runtime.getRuntime().availableProcessors(),
|
|
|
+ Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue(50000),
|
|
|
+ new AliYunThreadFactory());
|
|
|
+
|
|
|
+ static class AliYunThreadFactory implements ThreadFactory {
|
|
|
+ static final AtomicInteger poolNumber = new AtomicInteger(1);
|
|
|
+ final ThreadGroup group;
|
|
|
+ final AtomicInteger threadNumber = new AtomicInteger(1);
|
|
|
+ final String namePrefix;
|
|
|
+
|
|
|
+ AliYunThreadFactory() {
|
|
|
+ SecurityManager s = System.getSecurityManager();
|
|
|
+ group = (s != null)? s.getThreadGroup() :
|
|
|
+ Thread.currentThread().getThreadGroup();
|
|
|
+ namePrefix = "ali-iot-" +
|
|
|
+ poolNumber.getAndIncrement();
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
+ Thread t = new Thread(group, r,
|
|
|
+ namePrefix + threadNumber.getAndIncrement(),
|
|
|
+ 0);
|
|
|
+ if (t.isDaemon()){
|
|
|
+ t.setDaemon(false);
|
|
|
+ }
|
|
|
+ if (t.getPriority() != Thread.NORM_PRIORITY){
|
|
|
+ t.setPriority(Thread.NORM_PRIORITY);
|
|
|
+ }
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private MessageListener messageListener = (message) -> {
|
|
|
+ try {
|
|
|
+ //1.收到消息之后一定要ACK。
|
|
|
+ // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
|
|
|
+ // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
|
|
|
+ // message.acknowledge();
|
|
|
+ //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
|
|
|
+ // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
|
|
|
+ executorService.submit(()-> processMessage(message));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("submit task occurs exception ", e);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ private void processMessage(Message message) {
|
|
|
+ BusHospitalLogEntity hospitalLog = new BusHospitalLogEntity();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ String deviceName=null;
|
|
|
+ try {
|
|
|
+ // 获取主题,消息id和内容
|
|
|
+ String topic = message.getStringProperty(TOPIC);
|
|
|
+ String messageId = message.getStringProperty(MESSAGEID);
|
|
|
+ hospitalLog.setMsgId(messageId);
|
|
|
+ JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
|
|
|
+ log.info("阿里云物联网发送的数据:"+JSON.toJSONString(content));
|
|
|
+ // 设备名称
|
|
|
+ deviceName = content.getString(DEVICENAME);
|
|
|
+ hospitalLog.setIdentityCode(deviceName);
|
|
|
+ hospitalLog.setInput(content.toJSONString());
|
|
|
+ // 创建设备对象
|
|
|
+ Date now = new Date();
|
|
|
+ // 根据topic判断数据类型,设备上下线状态
|
|
|
+ if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){
|
|
|
+ BusDeviceEntity device = new BusDeviceEntity();
|
|
|
+ device.setDeviceId(deviceName);
|
|
|
+ String status = content.getString(STATUS);
|
|
|
+ device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
|
|
|
+ if ("online".equalsIgnoreCase(status)){
|
|
|
+ log.info(deviceName+"设备【{}】上线",deviceName);
|
|
|
+ }else if ("offline".equalsIgnoreCase(status)){
|
|
|
+ log.info(deviceName+"设备【{}】下线",deviceName);
|
|
|
+ }else {
|
|
|
+ log.info(deviceName+"设备【{}】未激活",deviceName);
|
|
|
+ }
|
|
|
+ hospitalLog.setType(HospitalLogEnum.ALI_STATUS);
|
|
|
+ // 更新设备状态
|
|
|
+ deviceService.updateDevice(device);
|
|
|
+ }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
|
|
|
+ // 设备属性集合
|
|
|
+ Items items = new Items(content.getJSONObject("items"));
|
|
|
+ log.info("上传设备属性:【{}】",JSONUtil.toJsonStr(items));
|
|
|
+ BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
|
|
|
+ deviceRunning.updateFieldsByItems(deviceName,items);
|
|
|
+ deviceRunning.setMsgId(messageId);
|
|
|
+ hospitalLog.setType(HospitalLogEnum.ALI_DATA_UPLOAD);
|
|
|
+ hospitalLog.setTenantId(iotMsgHandler.sync(deviceRunning,deviceName).getTenantId());
|
|
|
+ }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
|
|
|
+ // 获取生命周期类型
|
|
|
+ String action = content.getString("action");
|
|
|
+ if ("create".equals(action)){
|
|
|
+ // 创建设备
|
|
|
+ BusDeviceEntity device = new BusDeviceEntity();
|
|
|
+ device.setDeviceId(deviceName);
|
|
|
+
|
|
|
+ // 配置信息
|
|
|
+ AliIotConfig config = new AliIotConfig();
|
|
|
+ config.setDeviceName(deviceName);
|
|
|
+ config.setDeviceSecret(content.getString("deviceSecret"));
|
|
|
+ config.setIotId(content.getString("iotId"));
|
|
|
+ config.setProductKey(content.getString("productKey"));
|
|
|
+ hospitalLog.setType(HospitalLogEnum.ALI_LIFECYCLE);
|
|
|
+ // 设置配置信息
|
|
|
+ device.setConfig(config);
|
|
|
+ deviceService.saveDevice(device);
|
|
|
+ }else if ("delete".equals(action)){
|
|
|
+ // 删除设备
|
|
|
+ hospitalLog.setType(HospitalLogEnum.ALI_DEL);
|
|
|
+ deviceService.removeByDeviceId(deviceName);
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ log.warn("阿里云数据【{}】,未知的topic:【{}】",content.toJSONString(),topic);
|
|
|
+ }
|
|
|
+ hospitalLog.setSuccess(true);
|
|
|
+ } catch (Exception e) {
|
|
|
+ hospitalLog.setSuccess(false);
|
|
|
+ hospitalLog.setMessage(ExceptionUtil.getExceptionMsg(e));
|
|
|
+ log.error("阿里云数据【{}】数据处理失败 ", JSONUtil.toJsonStr(message), e);
|
|
|
+ }finally {
|
|
|
+ if(CharSequenceUtil.isNotBlank(deviceName)&&CharSequenceUtil.isEmpty(hospitalLog.getTenantId())){
|
|
|
+ BusDeviceEntity device = deviceService.getByDeviceId(deviceName);
|
|
|
+ hospitalLog.setTenantId(device.getTenantId());
|
|
|
+ }
|
|
|
+ long entTime = System.currentTimeMillis();
|
|
|
+ hospitalLog.setUseTime(entTime-startTime);
|
|
|
+ hospitalLog.setId(IdWorker.getIdStr());
|
|
|
+ if(CharSequenceUtil.isEmpty(hospitalLog.getTenantId())){
|
|
|
+ log.warn("日志【{}】医院为空,进行自动填充",JSONUtil.toJsonStr(hospitalLog));
|
|
|
+ }
|
|
|
+ hospitalLogService.save(hospitalLog);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+}
|