|
|
@@ -10,10 +10,12 @@ import com.coffee.aliyun.utils.Items;
|
|
|
import com.coffee.bus.bean.AliIotConfig;
|
|
|
import com.coffee.bus.entity.BusDeviceEntity;
|
|
|
import com.coffee.bus.entity.BusDeviceRunningEntity;
|
|
|
+import com.coffee.bus.entity.BusHospitalLogEntity;
|
|
|
+import com.coffee.bus.enums.HospitalLogEnum;
|
|
|
import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
|
|
|
-import com.coffee.bus.service.LocalBusDeviceHistoryService;
|
|
|
import com.coffee.bus.service.LocalBusDeviceService;
|
|
|
-import com.coffee.bus.service.LocalBusInfusionHistoryService;
|
|
|
+import com.coffee.bus.service.LocalBusHospitalLogService;
|
|
|
+import com.coffee.bus.websocket.listener.DeviceInfoListener;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
@@ -26,34 +28,40 @@ import javax.jms.MessageListener;
|
|
|
import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.LinkedBlockingQueue;
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
/**
|
|
|
-* @Author 龙三郎
|
|
|
-* @Date 2022-4-06 16:22:13
|
|
|
-* @Version 1.0
|
|
|
-* @Description 阿里云物联网平台服务端订阅
|
|
|
-*/
|
|
|
+ * @Author 龙三郎
|
|
|
+ * @Date 2022-4-06 16:22:13
|
|
|
+ * @Version 1.0
|
|
|
+ * @Description 阿里云物联网平台服务端订阅
|
|
|
+ */
|
|
|
@Service
|
|
|
@Slf4j
|
|
|
public class AliyunConsumerGroupService {
|
|
|
+ private static final String IOTID = "iotId";
|
|
|
+ private static final String PRODUCTKEY = "productKey";
|
|
|
+ private static final String DEVICENAME = "deviceName";
|
|
|
+ private static final String TOPIC = "topic";
|
|
|
+ private static final String MESSAGEID = "messageId";
|
|
|
+ private static final String ITEMS = "items";
|
|
|
+ private static final String VALUE = "value";
|
|
|
+ private static final String STATUS = "status";
|
|
|
+ private static final String CONTENT = "content";
|
|
|
+
|
|
|
@Autowired
|
|
|
@Lazy
|
|
|
private AliyunIotSubscribeClient client;
|
|
|
@Autowired
|
|
|
@Lazy
|
|
|
private LocalBusDeviceService deviceService;
|
|
|
- @Autowired
|
|
|
- @Lazy
|
|
|
- private LocalBusInfusionHistoryService infusionHistoryService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private DeviceInfoListener deviceInfoListener;
|
|
|
@Autowired
|
|
|
@Lazy
|
|
|
- private LocalBusDeviceHistoryService deviceHistoryService;
|
|
|
-
|
|
|
+ private LocalBusHospitalLogService hospitalLogService;
|
|
|
@Value("${aliyun.server-subscription.enable:false}")
|
|
|
private boolean isEnable;
|
|
|
|
|
|
@@ -76,44 +84,63 @@ public class AliyunConsumerGroupService {
|
|
|
|
|
|
|
|
|
|
|
|
- //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
|
|
|
+ /**
|
|
|
+ * 描述:业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
|
|
|
+ */
|
|
|
private final static ExecutorService executorService = new ThreadPoolExecutor(
|
|
|
Runtime.getRuntime().availableProcessors(),
|
|
|
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
|
|
|
- new LinkedBlockingQueue(50000));
|
|
|
+ new LinkedBlockingQueue(50000),
|
|
|
+ new AliYunThreadFactory());
|
|
|
+
|
|
|
+ static class AliYunThreadFactory implements ThreadFactory {
|
|
|
+ static final AtomicInteger poolNumber = new AtomicInteger(1);
|
|
|
+ final ThreadGroup group;
|
|
|
+ final AtomicInteger threadNumber = new AtomicInteger(1);
|
|
|
+ final String namePrefix;
|
|
|
+
|
|
|
+ AliYunThreadFactory() {
|
|
|
+ SecurityManager s = System.getSecurityManager();
|
|
|
+ group = (s != null)? s.getThreadGroup() :
|
|
|
+ Thread.currentThread().getThreadGroup();
|
|
|
+ namePrefix = "ali-iot-" +
|
|
|
+ poolNumber.getAndIncrement();
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
+ Thread t = new Thread(group, r,
|
|
|
+ namePrefix + threadNumber.getAndIncrement(),
|
|
|
+ 0);
|
|
|
+ if (t.isDaemon()){
|
|
|
+ t.setDaemon(false);
|
|
|
+ }
|
|
|
+ if (t.getPriority() != Thread.NORM_PRIORITY){
|
|
|
+ t.setPriority(Thread.NORM_PRIORITY);
|
|
|
+ }
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
private MessageListener messageListener = (message) -> {
|
|
|
- try {
|
|
|
- //1.收到消息之后一定要ACK。
|
|
|
- // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
|
|
|
- // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
|
|
|
- // message.acknowledge();
|
|
|
- //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
|
|
|
- // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
|
|
|
- executorService.submit(()-> processMessage(message));
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("submit task occurs exception ", e);
|
|
|
- }
|
|
|
+ try {
|
|
|
+ //1.收到消息之后一定要ACK。
|
|
|
+ // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
|
|
|
+ // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
|
|
|
+ // message.acknowledge();
|
|
|
+ //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
|
|
|
+ // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
|
|
|
+ executorService.submit(()-> processMessage(message));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("submit task occurs exception ", e);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
- /**
|
|
|
- * 处理逻辑,加上锁
|
|
|
- * @param message
|
|
|
- */
|
|
|
- private static final String IOTID = "iotId";
|
|
|
- private static final String PRODUCTKEY = "productKey";
|
|
|
- private static final String DEVICENAME = "deviceName";
|
|
|
- private static final String TOPIC = "topic";
|
|
|
- private static final String MESSAGEID = "messageId";
|
|
|
- private static final String ITEMS = "items";
|
|
|
- private static final String VALUE = "value";
|
|
|
- private static final String STATUS = "status";
|
|
|
- private static final String CONTENT = "content";
|
|
|
-
|
|
|
private void processMessage(Message message) {
|
|
|
|
|
|
- PlatformLog platformLog = new PlatformLog();
|
|
|
Map<String, Object> platformData = new HashMap<>();
|
|
|
+ BusHospitalLogEntity hospitalLog = new BusHospitalLogEntity();
|
|
|
+ hospitalLog.setType(HospitalLogEnum.ALI);
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
try {
|
|
|
// 获取主题,消息id和内容
|
|
|
String topic = message.getStringProperty(TOPIC);
|
|
|
@@ -127,20 +154,23 @@ public class AliyunConsumerGroupService {
|
|
|
|
|
|
// 设备名称
|
|
|
String deviceName = content.getString(DEVICENAME);
|
|
|
+
|
|
|
+ hospitalLog.setIdentityCode(deviceName);
|
|
|
+ hospitalLog.setInput(content.toJSONString());
|
|
|
// 创建设备对象
|
|
|
Date now = new Date();
|
|
|
- // 根据topic判断数据类型
|
|
|
- if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){//设备上下线状态
|
|
|
+ // 根据topic判断数据类型,设备上下线状态
|
|
|
+ if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){
|
|
|
BusDeviceEntity device = new BusDeviceEntity();
|
|
|
device.setDeviceId(deviceName);
|
|
|
String status = content.getString(STATUS);
|
|
|
device.setStatus(EnumUtils.getDeviceStatusEnum2(status));
|
|
|
- if (status.equals("online")){
|
|
|
- log.info(deviceName+"设备上线");
|
|
|
- }else if (status.equals("offline")){
|
|
|
- log.info(deviceName+"设备下线");
|
|
|
+ if ("online".equalsIgnoreCase(status)){
|
|
|
+ log.info(deviceName+"设备【{}】上线",deviceName);
|
|
|
+ }else if ("offline".equalsIgnoreCase(status)){
|
|
|
+ log.info(deviceName+"设备【{}】下线",deviceName);
|
|
|
}else {
|
|
|
- log.info(deviceName+"设备未激活");
|
|
|
+ log.info(deviceName+"设备【{}】未激活",deviceName);
|
|
|
}
|
|
|
// 更新设备状态
|
|
|
deviceService.updateDevice(device);
|
|
|
@@ -149,20 +179,7 @@ public class AliyunConsumerGroupService {
|
|
|
Items items = new Items(content.getJSONObject("items"));
|
|
|
BusDeviceRunningEntity deviceRunning = new BusDeviceRunningEntity();
|
|
|
deviceRunning.updateFieldsByItems(deviceName,items);
|
|
|
- // 发布事件
|
|
|
- SpringUtil.publishEvent(new DeviceInfoEvent(this,deviceRunning,deviceName));
|
|
|
-
|
|
|
-
|
|
|
-// BusInfusionHistoryEntity infusionHistory = infusionHistoryService.saveInfusion(deviceName,items);
|
|
|
-// // 保存到device_history表中
|
|
|
-// BusDeviceHistoryEntity deviceHistory = new BusDeviceHistoryEntity();
|
|
|
-// // 将infusionHistory对象的值,复制到deviceHistory对象
|
|
|
-// BeanUtils.copyProperties(infusionHistory,deviceHistory);
|
|
|
-// deviceHistory.setId(null);
|
|
|
-// deviceHistory.setInfusionId(infusionHistory.getId());
|
|
|
-// // 保存设备的历史数据
|
|
|
-// deviceHistoryService.save(deviceHistory);
|
|
|
-
|
|
|
+ deviceInfoListener.deviceInfoDetail(new DeviceInfoEvent(this,deviceRunning,deviceName));
|
|
|
}else if(topic.matches("[\\w\\/]+thing/lifecycle$")){// 设备生命周期
|
|
|
// 获取生命周期类型
|
|
|
String action = content.getString("action");
|
|
|
@@ -190,18 +207,19 @@ public class AliyunConsumerGroupService {
|
|
|
// 删除设备
|
|
|
deviceService.removeByDeviceId(deviceName);
|
|
|
}
|
|
|
-
|
|
|
}else {
|
|
|
- log.error("未知的topic:"+topic);
|
|
|
+ log.warn("阿里云数据【{}】,未知的topic:【{}】",content.toJSONString(),topic);
|
|
|
}
|
|
|
-
|
|
|
+ hospitalLog.setSuccess(true);
|
|
|
} catch (Exception e) {
|
|
|
- // 错误标志
|
|
|
- platformLog.setIsError("1");
|
|
|
- platformLog.setErrorLog(e.getMessage());
|
|
|
- log.error("数据处理失败 ", e);
|
|
|
+ hospitalLog.setSuccess(false);
|
|
|
+ hospitalLog.setMessage(e.toString());
|
|
|
+ log.error("阿里云数据【{}】数据处理失败 ", JSONUtil.toJsonStr(message), e);
|
|
|
+ }finally {
|
|
|
+ long entTime = System.currentTimeMillis();
|
|
|
+ hospitalLog.setUseTime(entTime-startTime);
|
|
|
+ hospitalLogService.save(hospitalLog);
|
|
|
}
|
|
|
- // 存储日志
|
|
|
|
|
|
}
|
|
|
|