|
|
@@ -1,35 +1,28 @@
|
|
|
package com.nb.aliyun.service;
|
|
|
|
|
|
-import cn.hutool.core.text.CharSequenceUtil;
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
|
|
-import com.nb.web.api.bean.AliIotConfig;
|
|
|
+import com.nb.aliyun.service.bean.AliIotMessagePojo;
|
|
|
+import com.nb.aliyun.service.handler.AbstractAliIotMsgHandler;
|
|
|
+import com.nb.web.api.HospitalProperties;
|
|
|
import com.nb.web.api.entity.BusDeviceEntity;
|
|
|
-import com.nb.web.api.entity.BusHospitalLogEntity;
|
|
|
-import com.nb.web.api.entity.common.BusDeviceRunningEntity;
|
|
|
-import com.nb.web.api.enums.HospitalLogEnum;
|
|
|
-import com.nb.web.api.feign.IHospitalLogClient;
|
|
|
-import com.nb.web.api.feign.IIotMsgHandler;
|
|
|
-import com.nb.web.api.utils.EnumUtils;
|
|
|
-import com.nb.core.utils.ExceptionUtil;
|
|
|
import com.nb.web.api.feign.IDeviceClient;
|
|
|
-import com.nb.web.api.utils.Items;
|
|
|
-import io.netty.util.concurrent.SingleThreadEventExecutor;
|
|
|
+import com.nb.web.api.feign.IHospitalClient;
|
|
|
+import com.nb.web.api.feign.result.HospitalResult;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.boot.CommandLineRunner;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
import org.springframework.context.annotation.Lazy;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
import javax.jms.Message;
|
|
|
import javax.jms.MessageListener;
|
|
|
-import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
-
|
|
|
+import java.util.*;
|
|
|
/**
|
|
|
* @Author 龙三郎
|
|
|
* @Date 2022-4-06 16:22:13
|
|
|
@@ -38,65 +31,24 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
*/
|
|
|
@Service
|
|
|
@Slf4j
|
|
|
-public class AliyunConsumerGroupService {
|
|
|
- private static final String IOTID = "iotId";
|
|
|
- private static final String PRODUCTKEY = "productKey";
|
|
|
- private static final String DEVICENAME = "deviceName";
|
|
|
- private static final String TOPIC = "topic";
|
|
|
- private static final String MESSAGEID = "messageId";
|
|
|
- private static final String ITEMS = "items";
|
|
|
- private static final String VALUE = "value";
|
|
|
- private static final String STATUS = "status";
|
|
|
- private static final String CONTENT = "payload";
|
|
|
+@ConditionalOnProperty(name = "aliyun.server-subscription.enable",havingValue = "true",matchIfMissing = false)
|
|
|
+public class AliyunConsumerGroupService implements CommandLineRunner {
|
|
|
|
|
|
@Autowired
|
|
|
@Lazy
|
|
|
private AliyunIotSubscribeClient client;
|
|
|
- @Autowired
|
|
|
- @Lazy
|
|
|
- private IDeviceClient deviceService;
|
|
|
|
|
|
@Autowired
|
|
|
- private IIotMsgHandler iotMsgHandler;
|
|
|
-
|
|
|
- /**
|
|
|
- * 阿里云订阅延迟启动,给ws重连时间
|
|
|
- */
|
|
|
- @Value("${tio.websocket.server.heartbeat-timeout:0}")
|
|
|
- private Long delayTime;
|
|
|
+ private List<AbstractAliIotMsgHandler> handlers;
|
|
|
|
|
|
@Autowired
|
|
|
- @Lazy
|
|
|
- private IHospitalLogClient hospitalLogService;
|
|
|
-
|
|
|
- @Value("${aliyun.server-subscription.enable:false}")
|
|
|
- private boolean isEnable;
|
|
|
-
|
|
|
- // 开启服务端订阅
|
|
|
- @PostConstruct
|
|
|
- public void subscribe(){
|
|
|
- if (!isEnable){
|
|
|
- log.info("订阅禁止");
|
|
|
- return;
|
|
|
- }
|
|
|
- log.info("允许开启订阅");
|
|
|
- try {
|
|
|
- client.start(messageListener);
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- log.info("阿里云物联网订阅成功。。。。。。。。。。。");
|
|
|
-// // 开启订阅
|
|
|
-// Executors.newSingleThreadScheduledExecutor()
|
|
|
-// .schedule(()->{
|
|
|
-//
|
|
|
-// },
|
|
|
-// Optional.ofNullable(delayTime).orElse(0L)*2,TimeUnit.SECONDS ) ;
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
+ private RabbitTemplate rabbitTemplate;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private IDeviceClient deviceService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private IHospitalClient hospitalClient;
|
|
|
/**
|
|
|
* 描述:业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
|
|
|
*/
|
|
|
@@ -106,6 +58,16 @@ public class AliyunConsumerGroupService {
|
|
|
new LinkedBlockingQueue(50000),
|
|
|
new AliYunThreadFactory());
|
|
|
|
|
|
+ @Override
|
|
|
+ public void run(String... args) throws Exception {
|
|
|
+ try {
|
|
|
+ client.start(messageListener);
|
|
|
+ log.info("阿里云物联网订阅成功。");
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[阿里云物联网订阅失败],",e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
static class AliYunThreadFactory implements ThreadFactory {
|
|
|
static final AtomicInteger poolNumber = new AtomicInteger(1);
|
|
|
final ThreadGroup group;
|
|
|
@@ -148,99 +110,40 @@ public class AliyunConsumerGroupService {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+ public void doProcessMessage(AliIotMessagePojo msg,String tenantId){
|
|
|
+ for (AbstractAliIotMsgHandler handler : handlers) {
|
|
|
+ handler.handle(msg,tenantId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void processMessage(Message message) {
|
|
|
- BusHospitalLogEntity hospitalLog = new BusHospitalLogEntity();
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
- String deviceName=null;
|
|
|
try {
|
|
|
// 获取主题,消息id和内容
|
|
|
- String topic = message.getStringProperty(TOPIC);
|
|
|
- String messageId = message.getStringProperty(MESSAGEID);
|
|
|
- hospitalLog.setMsgId(messageId);
|
|
|
- JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
|
|
|
- log.info("阿里云物联网发送的数据:"+JSON.toJSONString(content));
|
|
|
- // 设备名称
|
|
|
- deviceName = content.getString(DEVICENAME);
|
|
|
- hospitalLog.setIdentityCode(deviceName);
|
|
|
- hospitalLog.setInput(content.toJSONString());
|
|
|
- // 创建设备对象
|
|
|
- Date now = new Date();
|
|
|
- // 根据topic判断数据类型,设备上下线状态
|
|
|
- if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){
|
|
|
- BusDeviceEntity device = new BusDeviceEntity();
|
|
|
- device.setDeviceId(deviceName);
|
|
|
- String status = content.getString(STATUS);
|
|
|
- device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
|
|
|
- if ("online".equalsIgnoreCase(status)){
|
|
|
- log.info(deviceName+"设备【{}】上线",deviceName);
|
|
|
- }else if ("offline".equalsIgnoreCase(status)){
|
|
|
- log.info(deviceName+"设备【{}】下线",deviceName);
|
|
|
- }else {
|
|
|
- log.info(deviceName+"设备【{}】未激活",deviceName);
|
|
|
- }
|
|
|
- hospitalLog.setType(HospitalLogEnum.ALI_STATUS);
|
|
|
- // 更新设备状态
|
|
|
- deviceService.updateDevice(device);
|
|
|
- }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
|
|
|
- // 设备属性集合
|
|
|
- Items items = new Items(content.getJSONObject("items"));
|
|
|
- log.info("上传设备属性:【{}】",JSONUtil.toJsonStr(items));
|
|
|
- BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
|
|
|
- deviceRunning.updateFieldsByItems(deviceName,items);
|
|
|
- deviceRunning.setMsgId(messageId);
|
|
|
- hospitalLog.setType(HospitalLogEnum.ALI_DATA_UPLOAD);
|
|
|
- BusDeviceRunningEntity result = iotMsgHandler.sync(deviceRunning, deviceName);
|
|
|
- hospitalLog.setTenantId(result.getTenantId());
|
|
|
- hospitalLog.setResult(JSONUtil.toJsonStr(result));
|
|
|
- }else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
|
|
|
- // 获取生命周期类型
|
|
|
- String action = content.getString("action");
|
|
|
- if ("create".equals(action)){
|
|
|
- // 创建设备
|
|
|
- BusDeviceEntity device = new BusDeviceEntity();
|
|
|
- device.setDeviceId(deviceName);
|
|
|
-
|
|
|
- // 配置信息
|
|
|
- AliIotConfig config = new AliIotConfig();
|
|
|
- config.setDeviceName(deviceName);
|
|
|
- config.setDeviceSecret(content.getString("deviceSecret"));
|
|
|
- config.setIotId(content.getString("iotId"));
|
|
|
- config.setProductKey(content.getString("productKey"));
|
|
|
- hospitalLog.setType(HospitalLogEnum.ALI_LIFECYCLE);
|
|
|
- // 设置配置信息
|
|
|
- device.setConfig(config);
|
|
|
- deviceService.saveDevice(device);
|
|
|
- }else if ("delete".equals(action)){
|
|
|
- // 删除设备
|
|
|
- hospitalLog.setType(HospitalLogEnum.ALI_DEL);
|
|
|
- deviceService.removeByDeviceId(deviceName);
|
|
|
- }
|
|
|
- }else {
|
|
|
- log.warn("阿里云数据【{}】,未知的topic:【{}】",content.toJSONString(),topic);
|
|
|
- }
|
|
|
- hospitalLog.setSuccess(true);
|
|
|
+ String topic = message.getStringProperty(AliIotConstant.TOPIC);
|
|
|
+ String messageId = message.getStringProperty(AliIotConstant.MESSAGE_ID);
|
|
|
+ JSONObject content = JSONUtil.parseObj(new String(message.getBody(byte[].class)));
|
|
|
+ log.info("阿里云物联网发送的数据:"+content.toString());
|
|
|
+ AliIotMessagePojo msg = AliIotMessagePojo.of(topic, messageId, content);
|
|
|
+ String deviceName = content.getStr(AliIotConstant.DEVICE_NAME);
|
|
|
+ BusDeviceEntity device = deviceService.getByDeviceId(deviceName);
|
|
|
+
|
|
|
+ //分发数据
|
|
|
+ distributeIotMsg(msg, ObjectUtil.isNull(device)?null:device.getTenantId());
|
|
|
+ doProcessMessage(msg,ObjectUtil.isNull(device)?null:device.getTenantId());
|
|
|
} catch (Exception e) {
|
|
|
- hospitalLog.setSuccess(false);
|
|
|
- hospitalLog.setMessage(ExceptionUtil.getExceptionMsg(e));
|
|
|
log.error("阿里云数据【{}】数据处理失败 ", JSONUtil.toJsonStr(message), e);
|
|
|
- }finally {
|
|
|
- if(CharSequenceUtil.isNotBlank(deviceName)&&CharSequenceUtil.isEmpty(hospitalLog.getTenantId())){
|
|
|
- BusDeviceEntity device = deviceService.getByDeviceId(deviceName);
|
|
|
- hospitalLog.setTenantId(device.getTenantId());
|
|
|
- }
|
|
|
- long entTime = System.currentTimeMillis();
|
|
|
- hospitalLog.setUseTime(entTime-startTime);
|
|
|
- hospitalLog.setId(IdWorker.getIdStr());
|
|
|
- if(CharSequenceUtil.isEmpty(hospitalLog.getTenantId())){
|
|
|
- log.warn("日志【{}】医院为空,进行自动填充",JSONUtil.toJsonStr(hospitalLog));
|
|
|
- }
|
|
|
- hospitalLogService.save(hospitalLog);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ /**
|
|
|
+ * 向本地服务器分发数据
|
|
|
+ * @param msg
|
|
|
+ */
|
|
|
+ private void distributeIotMsg(AliIotMessagePojo msg,String tenantId){
|
|
|
+ if(StrUtil.isNullOrUndefined(tenantId)){
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ HospitalResult hospital = hospitalClient.findById(tenantId);
|
|
|
+ rabbitTemplate.convertAndSend(AliIotConstant.Exchange,hospital.getCode(),msg);
|
|
|
+ }
|
|
|
}
|