|
|
@@ -1,28 +1,17 @@
|
|
|
package com.nb.aliyun.service;
|
|
|
|
|
|
-import cn.hutool.core.util.ObjectUtil;
|
|
|
-import cn.hutool.core.util.StrUtil;
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import com.nb.aliyun.service.bean.AliIotMessagePojo;
|
|
|
-import com.nb.aliyun.service.handler.AbstractAliIotMsgHandler;
|
|
|
-import com.nb.web.api.HospitalProperties;
|
|
|
-import com.nb.web.api.entity.BusDeviceEntity;
|
|
|
-import com.nb.web.api.feign.IDeviceClient;
|
|
|
-import com.nb.web.api.feign.IHospitalClient;
|
|
|
-import com.nb.web.api.feign.result.HospitalResult;
|
|
|
+import com.nb.aliyun.service.process.DeviceMsgHandler;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.boot.CommandLineRunner;
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
-import org.springframework.context.annotation.Lazy;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import javax.jms.Message;
|
|
|
import javax.jms.MessageListener;
|
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
-import java.util.*;
|
|
|
/**
|
|
|
* @Author 龙三郎
|
|
|
* @Date 2022-4-06 16:22:13
|
|
|
@@ -34,21 +23,16 @@ import java.util.*;
|
|
|
@ConditionalOnProperty(name = "aliyun.server-subscription.enable",havingValue = "true",matchIfMissing = false)
|
|
|
public class AliyunConsumerGroupService implements CommandLineRunner {
|
|
|
|
|
|
- @Autowired
|
|
|
- @Lazy
|
|
|
- private AliyunIotSubscribeClient client;
|
|
|
+ private final AliyunIotSubscribeClient client;
|
|
|
|
|
|
- @Autowired
|
|
|
- private List<AbstractAliIotMsgHandler> handlers;
|
|
|
+ private final DeviceMsgHandler deviceMsgHandler;
|
|
|
|
|
|
- @Autowired
|
|
|
- private RabbitTemplate rabbitTemplate;
|
|
|
|
|
|
- @Autowired
|
|
|
- private IDeviceClient deviceService;
|
|
|
+ public AliyunConsumerGroupService(AliyunIotSubscribeClient client, DeviceMsgHandler deviceMsgHandler) {
|
|
|
+ this.client = client;
|
|
|
+ this.deviceMsgHandler = deviceMsgHandler;
|
|
|
+ }
|
|
|
|
|
|
- @Autowired
|
|
|
- private IHospitalClient hospitalClient;
|
|
|
/**
|
|
|
* 描述:业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
|
|
|
*/
|
|
|
@@ -110,12 +94,6 @@ public class AliyunConsumerGroupService implements CommandLineRunner {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- public void doProcessMessage(AliIotMessagePojo msg,String tenantId){
|
|
|
- for (AbstractAliIotMsgHandler handler : handlers) {
|
|
|
- handler.handle(msg,tenantId);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private void processMessage(Message message) {
|
|
|
try {
|
|
|
// 获取主题,消息id和内容
|
|
|
@@ -124,26 +102,9 @@ public class AliyunConsumerGroupService implements CommandLineRunner {
|
|
|
JSONObject content = JSONUtil.parseObj(new String(message.getBody(byte[].class)));
|
|
|
log.info("阿里云物联网发送的数据:"+content.toString());
|
|
|
AliIotMessagePojo msg = AliIotMessagePojo.of(topic, messageId, content);
|
|
|
- String deviceName = content.getStr(AliIotConstant.DEVICE_NAME);
|
|
|
- BusDeviceEntity device = deviceService.getByDeviceId(deviceName);
|
|
|
-
|
|
|
- //分发数据
|
|
|
- distributeIotMsg(msg, ObjectUtil.isNull(device)?null:device.getTenantId());
|
|
|
- doProcessMessage(msg,ObjectUtil.isNull(device)?null:device.getTenantId());
|
|
|
+ deviceMsgHandler.handleMessage(msg,true);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("阿里云数据【{}】数据处理失败 ", JSONUtil.toJsonStr(message), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 向本地服务器分发数据
|
|
|
- * @param msg
|
|
|
- */
|
|
|
- private void distributeIotMsg(AliIotMessagePojo msg,String tenantId){
|
|
|
- if(StrUtil.isNullOrUndefined(tenantId)){
|
|
|
- return;
|
|
|
+ log.error("阿里云数据【{}】处理失败 ", JSONUtil.toJsonStr(message), e);
|
|
|
}
|
|
|
- HospitalResult hospital = hospitalClient.findById(tenantId);
|
|
|
- rabbitTemplate.convertAndSend(AliIotConstant.Exchange,hospital.getCode(),msg);
|
|
|
}
|
|
|
}
|