|
@@ -1,10 +1,12 @@
|
|
|
-package com.nb.aliyun.service;
|
|
|
|
|
|
|
+package com.nb.aliyun.service.consumer;
|
|
|
|
|
|
|
|
import cn.hutool.core.text.CharSequenceUtil;
|
|
import cn.hutool.core.text.CharSequenceUtil;
|
|
|
|
|
+import cn.hutool.extra.spring.SpringUtil;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
|
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
|
|
|
|
+import com.nb.aliyun.service.pojo.AliIotConsumerPojo;
|
|
|
import com.nb.web.api.bean.AliIotConfig;
|
|
import com.nb.web.api.bean.AliIotConfig;
|
|
|
import com.nb.web.api.entity.BusDeviceEntity;
|
|
import com.nb.web.api.entity.BusDeviceEntity;
|
|
|
import com.nb.web.api.entity.BusHospitalLogEntity;
|
|
import com.nb.web.api.entity.BusHospitalLogEntity;
|
|
@@ -16,83 +18,59 @@ import com.nb.web.api.utils.EnumUtils;
|
|
|
import com.nb.core.utils.ExceptionUtil;
|
|
import com.nb.core.utils.ExceptionUtil;
|
|
|
import com.nb.web.api.feign.IDeviceClient;
|
|
import com.nb.web.api.feign.IDeviceClient;
|
|
|
import com.nb.web.api.utils.Items;
|
|
import com.nb.web.api.utils.Items;
|
|
|
-import io.netty.util.concurrent.SingleThreadEventExecutor;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
-import org.springframework.context.annotation.Lazy;
|
|
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
|
|
-
|
|
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
|
|
import javax.jms.Message;
|
|
import javax.jms.Message;
|
|
|
import javax.jms.MessageListener;
|
|
import javax.jms.MessageListener;
|
|
|
-import java.util.*;
|
|
|
|
|
import java.util.concurrent.*;
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
|
|
+ * 4G和Nb型网络泵消费者处理组
|
|
|
|
|
+ *
|
|
|
|
|
+ *
|
|
|
* @Author 龙三郎
|
|
* @Author 龙三郎
|
|
|
* @Date 2022-4-06 16:22:13
|
|
* @Date 2022-4-06 16:22:13
|
|
|
* @Version 1.0
|
|
* @Version 1.0
|
|
|
* @Description 阿里云物联网平台服务端订阅
|
|
* @Description 阿里云物联网平台服务端订阅
|
|
|
|
|
+ *
|
|
|
|
|
+ *
|
|
|
|
|
+ * @Author 李放
|
|
|
|
|
+ * @Date 2024-02-18 16:22:13
|
|
|
|
|
+ * @Version 2.0
|
|
|
|
|
+ * @Description 改为多连接配置订阅
|
|
|
*/
|
|
*/
|
|
|
-@Service
|
|
|
|
|
@Slf4j
|
|
@Slf4j
|
|
|
-public class AliyunConsumerGroupService {
|
|
|
|
|
- private static final String IOTID = "iotId";
|
|
|
|
|
- private static final String PRODUCTKEY = "productKey";
|
|
|
|
|
|
|
+public class NBAndFourGConsumerGroupService extends AbstractAliConsumer {
|
|
|
private static final String DEVICENAME = "deviceName";
|
|
private static final String DEVICENAME = "deviceName";
|
|
|
private static final String TOPIC = "topic";
|
|
private static final String TOPIC = "topic";
|
|
|
private static final String MESSAGEID = "messageId";
|
|
private static final String MESSAGEID = "messageId";
|
|
|
- private static final String ITEMS = "items";
|
|
|
|
|
- private static final String VALUE = "value";
|
|
|
|
|
private static final String STATUS = "status";
|
|
private static final String STATUS = "status";
|
|
|
- private static final String CONTENT = "payload";
|
|
|
|
|
|
|
|
|
|
- @Autowired
|
|
|
|
|
- @Lazy
|
|
|
|
|
- private AliyunIotSubscribeClient client;
|
|
|
|
|
- @Autowired
|
|
|
|
|
- @Lazy
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
private IDeviceClient deviceService;
|
|
private IDeviceClient deviceService;
|
|
|
|
|
|
|
|
- @Autowired
|
|
|
|
|
private IIotMsgHandler iotMsgHandler;
|
|
private IIotMsgHandler iotMsgHandler;
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
- * 阿里云订阅延迟启动,给ws重连时间
|
|
|
|
|
- */
|
|
|
|
|
- @Value("${tio.websocket.server.heartbeat-timeout:0}")
|
|
|
|
|
- private Long delayTime;
|
|
|
|
|
|
|
|
|
|
- @Autowired
|
|
|
|
|
- @Lazy
|
|
|
|
|
private IHospitalLogClient hospitalLogService;
|
|
private IHospitalLogClient hospitalLogService;
|
|
|
|
|
|
|
|
- @Value("${aliyun.server-subscription.enable:false}")
|
|
|
|
|
- private boolean isEnable;
|
|
|
|
|
|
|
+ public NBAndFourGConsumerGroupService(AliIotSubscribeClient client,AliIotConsumerPojo consumer) {
|
|
|
|
|
+ super(client,consumer);
|
|
|
|
|
+ this.deviceService= SpringUtil.getBean(IDeviceClient.class);
|
|
|
|
|
+ this.iotMsgHandler= SpringUtil.getBean(IIotMsgHandler.class);
|
|
|
|
|
+ this.hospitalLogService= SpringUtil.getBean(IHospitalLogClient.class);
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // 开启服务端订阅
|
|
|
|
|
- @PostConstruct
|
|
|
|
|
- public void subscribe(){
|
|
|
|
|
- if (!isEnable){
|
|
|
|
|
- log.info("订阅禁止");
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- log.info("允许开启订阅");
|
|
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void doSubscribe(){
|
|
|
try {
|
|
try {
|
|
|
- client.start(messageListener);
|
|
|
|
|
|
|
+ this.getClient().start(messageListener);
|
|
|
|
|
+ log.info("{},阿里云物联网订阅成功。",this.getConsumer().getName());
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
|
|
|
|
+ log.error("{},阿里云物联网订阅失败,e:{}",this.getConsumer().getName(),e);
|
|
|
}
|
|
}
|
|
|
- log.info("阿里云物联网订阅成功。。。。。。。。。。。");
|
|
|
|
|
-// // 开启订阅
|
|
|
|
|
-// Executors.newSingleThreadScheduledExecutor()
|
|
|
|
|
-// .schedule(()->{
|
|
|
|
|
-//
|
|
|
|
|
-// },
|
|
|
|
|
-// Optional.ofNullable(delayTime).orElse(0L)*2,TimeUnit.SECONDS ) ;
|
|
|
|
|
-
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@@ -106,6 +84,7 @@ public class AliyunConsumerGroupService {
|
|
|
new LinkedBlockingQueue(50000),
|
|
new LinkedBlockingQueue(50000),
|
|
|
new AliYunThreadFactory());
|
|
new AliYunThreadFactory());
|
|
|
|
|
|
|
|
|
|
+
|
|
|
static class AliYunThreadFactory implements ThreadFactory {
|
|
static class AliYunThreadFactory implements ThreadFactory {
|
|
|
static final AtomicInteger poolNumber = new AtomicInteger(1);
|
|
static final AtomicInteger poolNumber = new AtomicInteger(1);
|
|
|
final ThreadGroup group;
|
|
final ThreadGroup group;
|
|
@@ -144,7 +123,7 @@ public class AliyunConsumerGroupService {
|
|
|
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
|
|
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
|
|
|
executorService.submit(()-> processMessage(message));
|
|
executorService.submit(()-> processMessage(message));
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("submit task occurs exception ", e);
|
|
|
|
|
|
|
+ log.error("{},submit task occurs exception ", this.getConsumer().getName(),e);
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
|
|
|
|
@@ -158,13 +137,11 @@ public class AliyunConsumerGroupService {
|
|
|
String messageId = message.getStringProperty(MESSAGEID);
|
|
String messageId = message.getStringProperty(MESSAGEID);
|
|
|
hospitalLog.setMsgId(messageId);
|
|
hospitalLog.setMsgId(messageId);
|
|
|
JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
|
|
JSONObject content = JSON.parseObject(new String(message.getBody(byte[].class)));
|
|
|
- log.info("阿里云物联网发送的数据:"+JSON.toJSONString(content));
|
|
|
|
|
|
|
+ log.info("{},阿里云物联网发送的数据:{}",this.getConsumer().getName(),JSON.toJSONString(content));
|
|
|
// 设备名称
|
|
// 设备名称
|
|
|
deviceName = content.getString(DEVICENAME);
|
|
deviceName = content.getString(DEVICENAME);
|
|
|
hospitalLog.setIdentityCode(deviceName);
|
|
hospitalLog.setIdentityCode(deviceName);
|
|
|
hospitalLog.setInput(content.toJSONString());
|
|
hospitalLog.setInput(content.toJSONString());
|
|
|
- // 创建设备对象
|
|
|
|
|
- Date now = new Date();
|
|
|
|
|
// 根据topic判断数据类型,设备上下线状态
|
|
// 根据topic判断数据类型,设备上下线状态
|
|
|
if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){
|
|
if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){
|
|
|
BusDeviceEntity device = new BusDeviceEntity();
|
|
BusDeviceEntity device = new BusDeviceEntity();
|
|
@@ -236,11 +213,7 @@ public class AliyunConsumerGroupService {
|
|
|
}
|
|
}
|
|
|
hospitalLogService.save(hospitalLog);
|
|
hospitalLogService.save(hospitalLog);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
}
|
|
}
|