소스 검색

feat: 阿里云物联网平台对接

龙三郎 3 년 전
부모
커밋
1c237c22b7

+ 41 - 0
coffee-admin/src/test/java/com/coffee/admin/AliyunTest.java

@@ -0,0 +1,41 @@
+package com.coffee.admin;
+
+import com.coffee.admin.AdminApplication;
+import com.coffee.bus.entity.BusDeviceEntity;
+import com.coffee.bus.entity.BusPumpEntity;
+import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.ApplicationContext;
+import org.springframework.test.context.junit4.SpringRunner;
+
+/**
+ * @Author longsanlang
+ * @Date 2022-04-06 17:55:44
+ * @Version 1.0
+ * @Description XXX
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = AdminApplication.class)
+public class AliyunTest {
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    @Test
+    public void test001(){
+        BusPumpEntity pump = new BusPumpEntity();
+        pump.setAlias("123456777");
+        DeviceInfoEvent event = new DeviceInfoEvent(this,pump,"123");
+        applicationContext.publishEvent(event);
+
+        try {
+            Thread.sleep(2000000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 0 - 0
coffee-admin/src/main/test/java/com/coffee/admin/BusHospitalLogTest.java → coffee-admin/src/test/java/com/coffee/admin/BusHospitalLogTest.java


+ 0 - 0
coffee-admin/src/main/test/java/com/coffee/admin/BusHospitalTest.java → coffee-admin/src/test/java/com/coffee/admin/BusHospitalTest.java


+ 0 - 0
coffee-admin/src/main/test/java/com/coffee/admin/BusNetpumpTest.java → coffee-admin/src/test/java/com/coffee/admin/BusNetpumpTest.java


+ 0 - 0
coffee-admin/src/main/test/java/com/coffee/admin/BusTemplateAnalgesicTest.java → coffee-admin/src/test/java/com/coffee/admin/BusTemplateAnalgesicTest.java


+ 0 - 0
coffee-admin/src/main/test/java/com/coffee/admin/SpringBootApplicationTests.java → coffee-admin/src/test/java/com/coffee/admin/SpringBootApplicationTests.java


+ 24 - 0
coffee-system/pom.xml

@@ -20,6 +20,30 @@
             <groupId>org.python</groupId>
             <artifactId>jython-standalone</artifactId>
         </dependency>
+
+
+        <!--阿里云物联网sdk-->
+        <!-- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-iot -->
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-iot</artifactId>
+            <version>7.31.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-core</artifactId>
+            <version>4.5.6</version>
+        </dependency>
+        <!-- amqp 1.0 qpid client -->
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-jms-client</artifactId>
+            <version>0.56.0</version>
+        </dependency>
+
+
+
+
     </dependencies>
 
 </project>

+ 202 - 0
coffee-system/src/main/java/com/coffee/aliyun/AliyunConsumerGroupService.java

@@ -0,0 +1,202 @@
+package com.coffee.aliyun;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.coffee.bus.entity.BusPumpEntity;
+import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+* @Author longsanlang
+* @Date 2022-4-06 16:22:13
+* @Version 1.0
+* @Description 阿里云物联网平台服务端订阅
+*/
+@Service
+@Slf4j
+public class AliyunConsumerGroupService {
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    @Value("${aliyun.data-access.enable:false}")
+    private boolean isEnable;
+
+    static {
+
+    }
+
+    // 开启服务端订阅
+    @PostConstruct
+    public void subscribe(){
+        if (isEnable){
+            log.info("允许开启订阅");
+        }else {
+            log.info("订阅禁止");
+            return;
+        }
+
+        // 获取ip地址
+        InetAddress addr = null;
+        try {
+            addr = InetAddress.getLocalHost();
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+        String ip = addr.getHostAddress();
+        // 初始化订阅客户端
+        // 阿里云账号信息
+        AliyunIotSubscribeClient client =
+                new AliyunIotSubscribeClient(
+                        "LTAI4FhB19MgQuviGxwA3aod",
+                        "cQQVkATR0yv2G9CEtfjAhEGBepPDRs",
+                        "cn-shanghai",
+                        "DEFAULT_GROUP",
+                        "1177450762772738",
+                        "","DEFAULT_GROUP" + ip
+                );
+        try {
+            // 开启订阅
+            client.start(messageListener);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        System.out.println("订阅成功。。。。。。。。。。。");
+    }
+
+
+
+    //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
+    private final static ExecutorService executorService = new ThreadPoolExecutor(
+            Runtime.getRuntime().availableProcessors(),
+            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue(50000));
+
+    private MessageListener messageListener = (message) -> {
+            try {
+                //1.收到消息之后一定要ACK。
+                // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
+                // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
+                // message.acknowledge();
+                //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
+                // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
+                executorService.submit(()-> processMessage(message));
+            } catch (Exception e) {
+                log.error("submit task occurs exception ", e);
+            }
+    };
+
+    /**
+     * 处理逻辑,加上锁
+     * @param message
+     */
+    private static final String IOTID = "iotId";
+    private static final String PRODUCTKEY = "productKey";
+    private static final String TOPIC = "topic";
+    private static final String MESSAGEID = "messageId";
+    private static final String ITEMS = "items";
+    private static final String VALUE = "value";
+    private static final String STATUS = "status";
+    private synchronized void processMessage(Message message) {
+
+        PlatformDataLog dataLog = new PlatformDataLog();
+        Map<String, Object> platformData = new HashMap<>();
+        try {
+            byte[] body = message.getBody(byte[].class);
+            String contentString = new String(body);
+            String topic = message.getStringProperty(TOPIC);
+            String messageId = message.getStringProperty(MESSAGEID);
+            platformData.put("topic",topic);
+            platformData.put("messageId",messageId);
+            platformData.put("content",contentString);
+            /**
+             * platformData:
+             * {
+             * "topic":"/a1M7k1TAECc/ceshi001/thing/event/property/post",
+             * "messageId":"1496410088574460416",
+             * "content":"{\"deviceType\":\"CustomCategory\",
+             * \"iotId\":\"0syOzMqwkGebZBGKa08d000000\",
+             * \"requestId\":\"254\",\"checkFailedData\":{},
+             * \"productKey\":\"a1M7k1TAECc\",\"gmtCreate\":1645606941721,
+             * \"deviceName\":\"ceshi001\",
+             * \"items\":{
+             * \"total\":{\"value\":100,\"time\":1645606941717},
+             * \"pumpCode\":{\"value\":\"5719512336330299\",\"time\":1645606941717},
+             * \"dataNumber\":{\"value\":254,\"time\":1645606941717},
+             * \"PCAInvalid\":{\"value\":0,\"time\":1645606941717},
+             * \"alarm\":{\"value\":0,\"time\":1645606941717},
+             * \"flowRate\":{\"value\":2,\"time\":1645606941717},
+             * \"PCAValid\":{\"value\":0,\"time\":1645606941717},
+             * \"forecast\":{\"value\":0,\"time\":1645606941717},
+             * \"finished\":{\"value\":17,\"time\":1645606941717},
+             * \"battery\":{\"value\":65,\"time\":1645606941717},
+             * \"runStatus\":{\"value\":2,\"time\":1645606941717},
+             * \"patientCode\":{\"value\":111110000,\"time\":1645606941717}}}"
+             * }
+             */
+            // 日志编号
+            dataLog.setCode(messageId);
+            // 平台数据
+            dataLog.setPlatformData(JSON.toJSONString(platformData));
+            // 将内容转换成对象
+            JSONObject content = JSON.parseObject(contentString);
+            // 设备编号
+            String deviceCode = content.getString(IOTID);
+            String productKey = content.getString(PRODUCTKEY);
+            // 设置平台和设备
+            dataLog.setPlatformCode(PlatformType.ALIYUN);
+            dataLog.setDeviceCode(deviceCode);
+            dataLog.setPlatformProductCode(productKey);
+            // 根据topic判断数据类型
+            if (topic.matches("^/as/mqtt/status/[\\w\\/]*")){//设备上下线状态
+                String status = content.getString(STATUS);
+                if (status.equals("online")){
+                    log.info(deviceCode+"设备上线");
+                }else if (status.equals("offline")){
+                    log.info(deviceCode+"设备下线");
+                }else {
+                    log.info(deviceCode+"设备未激活");
+                }
+            }else if (topic.matches("[\\w\\/]*event/property/post$")){//设备属性上报
+                // 设备属性上报
+                DeviceInfoEvent deviceInfoEvent = new DeviceInfoEvent(this,new BusPumpEntity(),"123456");
+                applicationContext.publishEvent(deviceInfoEvent);
+
+            }else{// 其他的topic
+                System.out.println("未知topic:"+topic);
+            }
+
+        } catch (Exception e) {
+//            e.printStackTrace();
+            // 错误标志
+            dataLog.setIsError("1");
+            dataLog.setErrorLog(e.getMessage());
+            log.error("数据处理失败 ", e);
+        }
+        // 存储日志
+
+    }
+
+
+
+
+
+
+
+}

+ 323 - 0
coffee-system/src/main/java/com/coffee/aliyun/AliyunIotSdk.java

@@ -0,0 +1,323 @@
+package com.coffee.aliyun;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyuncs.DefaultAcsClient;
+import com.aliyuncs.exceptions.ClientException;
+import com.aliyuncs.iot.model.v20180120.*;
+import com.aliyuncs.profile.DefaultProfile;
+import com.aliyuncs.profile.IClientProfile;
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+//import com.tuoren.common.utils.StringUtils;
+
+/**
+ * @Author XX
+ * @Date 2021-06-11 08:10:58
+ * @Version 1.0
+ * @Description XXX
+ */
+@Data
+public class AliyunIotSdk {
+    private static final String ACCESSKEY = "accessKey";
+    private static final String ACCESSSECRET = "accessSecret";
+    private static final String REFIONID = "regionId";
+
+    private String accessKey;
+    private String accessSecret;
+    private String regionId;
+    // client
+    private DefaultAcsClient client;
+    public AliyunIotSdk(String accessKey, String accessSecret, String regionId) {
+        this.accessKey = accessKey;
+        this.accessSecret = accessSecret;
+        this.regionId = regionId;
+    }
+    public AliyunIotSdk(AliyunParams aliyunParams){
+        this.accessKey = aliyunParams.getAccessKey();
+        this.accessSecret = aliyunParams.getAccessKeySecret();
+        this.regionId = aliyunParams.getRegionId();
+    }
+
+    public AliyunIotSdk(PlatformAccount platformAccount) {
+        this.accessKey = platformAccount.getConfiguration().get(ACCESSKEY).toString();
+        this.accessSecret = platformAccount.getConfiguration().get(ACCESSSECRET).toString();
+        this.regionId = platformAccount.getConfiguration().get(REFIONID).toString();
+    }
+
+    /**
+     * 获取阿里云客户端
+     * @return
+     */
+    protected final DefaultAcsClient getAliyuniotClient(){
+        if (client != null){
+            return client;
+        }
+        IClientProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret);
+        DefaultAcsClient client = new DefaultAcsClient(profile); //初始化SDK客户端。
+        this.client = client;
+        return client;
+    }
+
+    /**
+     * 	查询产品列表。
+     * @return
+     */
+    public List<QueryProductListResponse.Data.ProductInfo> queryProductList(){
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+        List<QueryProductListResponse.Data.ProductInfo> products = new ArrayList<>();
+        Integer pageSize = 100,currentPage = 1;
+        while (true){
+            QueryProductListRequest queryProductListRequest = new QueryProductListRequest();
+            queryProductListRequest.setPageSize(pageSize);
+            queryProductListRequest.setCurrentPage(currentPage);
+            try {
+                QueryProductListResponse queryProductListResponse = client.getAcsResponse(queryProductListRequest);
+                System.out.println(queryProductListResponse.getSuccess());
+                // 获取失败直接跳出循环
+                if (!queryProductListResponse.getSuccess()){
+                    break;
+                }
+                QueryProductListResponse.Data data = queryProductListResponse.getData();
+                // 添加到列表
+                products.addAll(data.getList());
+                // 判断是否获取了全部
+                if (data.getCurrentPage() < data.getPageCount()){
+                    // 当前页小于总页数,说明没有获取完
+                    currentPage++;
+                }else {
+                    break;
+                }
+            } catch (ClientException e) {
+                e.printStackTrace();
+            }
+        }
+        return products;
+    }
+
+
+    /**
+     * 注册设备
+     * @param
+     * @return
+     */
+    public RegisterDeviceResponse.Data registerDevice(String productKey, String name){
+
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+        List<QueryDeviceResponse.DeviceInfo> list = new ArrayList<>();
+        Integer pageSize = 100,currentPage = 1;
+
+        RegisterDeviceRequest request = new RegisterDeviceRequest();
+        // 产品key,必需
+        request.setProductKey(productKey);
+        // 产品名称,非必需
+        request.setDeviceName(name);
+        try {
+            RegisterDeviceResponse response = client.getAcsResponse(request);
+            System.out.println(response.getSuccess());
+            // 获取失败直接跳出循环
+            if (!response.getSuccess()){
+                return null;
+            }
+            RegisterDeviceResponse.Data data = response.getData();
+            return data;
+        } catch (ClientException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    /**
+     * 删除设备
+     * @param
+     * @return
+     */
+    public int deleteDevice(String iotId){
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+        DeleteDeviceRequest request = new DeleteDeviceRequest();
+        // 产品iotId,必需
+        request.setIotId(iotId);
+        try {
+            DeleteDeviceResponse response = client.getAcsResponse(request);
+            System.out.println(response.getSuccess());
+            // 获取失败直接跳出循环
+            if (!response.getSuccess()){
+                return 1;
+            }
+        } catch (ClientException e) {
+            e.printStackTrace();
+        }
+        return 0;
+    }
+
+    /**
+     * 获取设备
+     * @param productKey
+     * @return
+     */
+    public List<QueryDeviceResponse.DeviceInfo> queryDevice(String productKey){
+
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+        List<QueryDeviceResponse.DeviceInfo> list = new ArrayList<>();
+        Integer pageSize = 100,currentPage = 1;
+        while (true){
+            QueryDeviceRequest request = new QueryDeviceRequest();
+            request.setPageSize(pageSize);
+            request.setCurrentPage(currentPage);
+            request.setProductKey(productKey);
+            try {
+                QueryDeviceResponse response = client.getAcsResponse(request);
+                System.out.println(response.getSuccess());
+                // 获取失败直接跳出循环
+                if (!response.getSuccess()){
+                    break;
+                }
+                List<QueryDeviceResponse.DeviceInfo> data = response.getData();
+                // 添加到列表
+                list.addAll(data);
+                // 判断是否获取了全部
+                if (response.getPage() < response.getPageCount()){
+                    // 当前页小于总页数,说明没有获取完
+                    currentPage++;
+                }else {
+                    break;
+                }
+            } catch (ClientException e) {
+                e.printStackTrace();
+            }
+        }
+        return list;
+    }
+
+
+    /**
+     * 获取阿里云消费组
+     * @return
+     */
+    public List<QueryConsumerGroupListResponse.ConsumerGroupDTO> queryConsumerGroupList(){
+
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+
+        QueryConsumerGroupListRequest _request = new QueryConsumerGroupListRequest();
+        _request.setPageSize(100);
+        _request.setCurrentPage(1);
+
+        List<QueryConsumerGroupListResponse.ConsumerGroupDTO> list = null;
+
+        try {
+            QueryConsumerGroupListResponse _response = client.getAcsResponse(_request);
+            list = _response.getData();
+        } catch (ClientException e) {
+            e.printStackTrace();
+        }
+
+        return list;
+
+    }
+
+    /**
+     * 创建一个消费组
+     * @param groupName
+     * @return
+     */
+    public CreateConsumerGroupResponse createConsumerGroup(String groupName){
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+
+        CreateConsumerGroupRequest _request = new CreateConsumerGroupRequest();
+        _request.setGroupName(groupName);
+
+        CreateConsumerGroupResponse acsResponse = null;
+        try {
+            acsResponse = client.getAcsResponse(_request);
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+        return acsResponse;
+    }
+
+    /**
+     * 创建一个云产品流转规则
+     * @param ruleName
+     * @param productKey
+     * @param device_list
+     * @return
+     */
+    public CreateRuleResponse createRule(String ruleName, String productKey, String[] device_list){
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+
+        CreateRuleRequest _request = new CreateRuleRequest();
+        _request.setName(ruleName);
+        _request.setSelect("*");
+        _request.setShortTopic("+/thing/event/property/post");
+        _request.setProductKey(productKey);
+//        _request.setWhere("deviceName() in ("+ StringUtils.arrayToString(device_list,"'",",") +")");
+
+
+        CreateRuleResponse acsResponse = null;
+        try {
+            acsResponse = client.getAcsResponse(_request);
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+        return acsResponse;
+    }
+
+    public CreateRuleActionResponse createRuleAction(Long ruleId, String groupId){
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+
+        CreateRuleActionRequest _request = new CreateRuleActionRequest();
+        _request.setRuleId(ruleId);
+        _request.setType("AMQP");
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("groupId",groupId);
+
+        _request.setConfiguration(JSON.toJSONString(map));
+
+
+        CreateRuleActionResponse acsResponse = null;
+        try {
+            acsResponse = client.getAcsResponse(_request);
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+        return acsResponse;
+    }
+
+
+
+    /**
+     * 获取设备的详细信息
+     * @param iotId
+     */
+    public QueryDeviceDetailResponse.Data queryDeviceDetail(String iotId){
+        // 获取阿里云SDK客户端
+        DefaultAcsClient client = this.getAliyuniotClient();
+
+        QueryDeviceDetailRequest _request = new QueryDeviceDetailRequest();
+        _request.setIotId(iotId);
+
+        QueryDeviceDetailResponse.Data data = null;
+        try {
+            QueryDeviceDetailResponse acsResponse = client.getAcsResponse(_request);
+            data = acsResponse.getData();
+        } catch (ClientException e) {
+            e.printStackTrace();
+        }
+        return data;
+    }
+
+
+}

+ 192 - 0
coffee-system/src/main/java/com/coffee/aliyun/AliyunIotSubscribeClient.java

@@ -0,0 +1,192 @@
+package com.coffee.aliyun;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ * @Author XX
+ * @Date 2021-06-17 10:53:30
+ * @Version 1.0
+ * @Description XXX
+ */
+@Slf4j
+public class AliyunIotSubscribeClient {
+
+
+    public AliyunIotSubscribeClient(String accessKey,
+                                    String accessSecret,
+                                    String regionId,
+                                    String consumerGroupId,
+                                    String aliyunUid,
+                                    String iotInstanceId,
+                                    String clientId) {
+        this.accessKey = accessKey;
+        this.accessSecret = accessSecret;
+        this.regionId = regionId;
+        this.consumerGroupId = consumerGroupId;
+        this.aliyunUid = aliyunUid;
+        this.iotInstanceId = iotInstanceId;
+        this.clientId = clientId;
+        this.host = aliyunUid+".iot-amqp."+ regionId +".aliyuncs.com";
+    }
+
+
+    private String accessKey = "LTAI4FhB19MgQuviGxwA3aod";
+    private String accessSecret = "cQQVkATR0yv2G9CEtfjAhEGBepPDRs";
+    private String consumerGroupId = "DEFAULT_GROUP";
+    private String aliyunUid = "1177450762772738";
+    private String regionId = "cn-shanghai";
+
+    //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
+    private String iotInstanceId = "";
+
+    //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
+    //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
+    private String clientId = "123456789";
+
+    //${YourHost}为接入域名,请参见AMQP客户端接入说明文档。
+    private String host = "1177450762772738.iot-amqp.cn-shanghai.aliyuncs.com";
+
+    // 指定单个进程启动的连接数
+    // 单个连接消费速率有限,请参考使用限制,最大64个连接
+    // 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接
+    private static int connectionCount = 4;
+
+    @Getter
+    private List<Connection> connections = null;
+    public void start(MessageListener messageListener) throws Exception {
+        // 先关闭一下
+        close();
+        connections = new ArrayList<>();
+        //参数说明,请参见AMQP客户端接入说明文档。
+        for (int i = 0; i < connectionCount; i++) {
+            long timeStamp = System.currentTimeMillis();
+            //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
+            String signMethod = "hmacsha1";
+
+            //userName组装方法,请参见AMQP客户端接入说明文档。
+            String userName = clientId + "-" + i + "|authMode=aksign"
+                    + ",signMethod=" + signMethod
+                    + ",timestamp=" + timeStamp
+                    + ",authId=" + accessKey
+                    + ",iotInstanceId=" + iotInstanceId
+                    + ",consumerGroupId=" + consumerGroupId
+                    + "|";
+            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
+            String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
+            String password = doSign(signContent, accessSecret, signMethod);
+            String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)"
+                    + "?failover.reconnectDelay=30";
+
+            Hashtable<String, String> hashtable = new Hashtable<>();
+            hashtable.put("connectionfactory.SBCF", connectionUrl);
+            hashtable.put("queue.QUEUE", "default");
+            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+            Context context = new InitialContext(hashtable);
+            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
+            Destination queue = (Destination)context.lookup("QUEUE");
+            // 创建连接。
+            Connection connection = cf.createConnection(userName, password);
+            connections.add(connection);
+
+            ((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
+            // 创建会话。
+            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
+            // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            connection.start();
+            // 创建Receiver连接。
+            MessageConsumer consumer = session.createConsumer(queue);
+            consumer.setMessageListener(messageListener);
+        }
+    }
+
+    public void close() throws Exception {
+        if (connections == null){
+            return;
+        }
+        connections.forEach(c-> {
+            try {
+                c.close();
+            } catch (JMSException e) {
+                log.error("连接关闭失败", e);
+            }
+        });
+    }
+
+
+    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
+        /**
+         * 连接成功建立。
+         */
+        @Override
+        public void onConnectionEstablished(URI remoteURI) {
+            log.info("连接成功, remoteUri:{}", remoteURI);
+        }
+
+        /**
+         * 尝试过最大重试次数之后,最终连接失败。
+         */
+        @Override
+        public void onConnectionFailure(Throwable error) {
+            log.error("连接失败, {}", error.getMessage());
+        }
+
+        /**
+         * 连接中断。
+         */
+        @Override
+        public void onConnectionInterrupted(URI remoteURI) {
+            log.info("连接中断, remoteUri:{}", remoteURI);
+        }
+
+        /**
+         * 连接中断后又自动重连上。
+         */
+        @Override
+        public void onConnectionRestored(URI remoteURI) {
+            log.info("自动重连, remoteUri:{}", remoteURI);
+        }
+
+        @Override
+        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
+
+        @Override
+        public void onSessionClosed(Session session, Throwable cause) {}
+
+        @Override
+        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
+
+        @Override
+        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
+    };
+
+    /**
+     * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
+     */
+    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
+        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
+        Mac mac = Mac.getInstance(signMethod);
+        mac.init(signingKey);
+        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
+        return Base64.encodeBase64String(rawHmac);
+    }
+
+
+
+}

+ 16 - 0
coffee-system/src/main/java/com/coffee/aliyun/AliyunParams.java

@@ -0,0 +1,16 @@
+package com.coffee.aliyun;
+
+import lombok.Data;
+
+/**
+ * @Author XX
+ * @Date 2022-02-11 16:58:55
+ * @Version 1.0
+ * @Description XXX
+ */
+@Data
+public class AliyunParams {
+    private String regionId;
+    private String accessKey;
+    private String accessKeySecret;
+}

+ 29 - 0
coffee-system/src/main/java/com/coffee/aliyun/Platform.java

@@ -0,0 +1,29 @@
+package com.coffee.aliyun;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * @Author XX
+ * @Date 2022-01-05 17:33:29
+ * @Version 1.0
+ * @Description XXX
+ */
+@Data
+public class Platform {
+    @TableId(value = "id",type = IdType.AUTO)
+    private Long id;
+    private String code;
+    private String name;
+    @TableField(typeHandler = FastjsonTypeHandler.class)
+    private Map<String, Object> configuration;
+    private Long addTime;
+    private Long updateTime;
+    private String description;
+    private String isDelete;
+}

+ 35 - 0
coffee-system/src/main/java/com/coffee/aliyun/PlatformAccount.java

@@ -0,0 +1,35 @@
+package com.coffee.aliyun;
+
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
+import lombok.Data;
+
+/**
+ * @Author XX
+ * @Date 2022-01-05 17:33:29
+ * @Version 1.0
+ * @Description XXX
+ */
+@Data
+@TableName(autoResultMap = true)
+public class PlatformAccount {
+    @TableId(value = "id",type = IdType.AUTO)
+    private Long id;
+    private String code;
+    private String platformCode;
+    private String name;
+    @TableField(typeHandler = FastjsonTypeHandler.class)
+    private JSONObject configuration;
+    private Long addTime;
+    private Long updateTime;
+    private String description;
+    private String isDelete;
+
+    @TableField(exist = false)
+    private Platform platform;
+
+}

+ 40 - 0
coffee-system/src/main/java/com/coffee/aliyun/PlatformDataLog.java

@@ -0,0 +1,40 @@
+package com.coffee.aliyun;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author XX
+ * @Date 2022-01-10 11:43:57
+ * @Version 1.0
+ * @Description XXX
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PlatformDataLog {
+    @TableId(value = "id",type = IdType.AUTO)
+    private Long id;
+    private String code;
+    private String platformCode;
+    private String platformProductCode;
+    private String deviceCode;
+    private String platformData;
+    private String resultData;
+    private String errorLog;
+    private String isError;
+    private Long addTime;
+    private Long updateTime;
+    private String description;
+    private String isDelete;
+
+    @TableField(exist = false)
+    private Platform platform;
+
+
+
+}

+ 12 - 0
coffee-system/src/main/java/com/coffee/aliyun/PlatformType.java

@@ -0,0 +1,12 @@
+package com.coffee.aliyun;
+
+/**
+ * @Author XX
+ * @Date 2022-02-12 08:08:15
+ * @Version 1.0
+ * @Description XXX
+ */
+public class PlatformType {
+    public static final String ALIYUN = "10";
+    public static final String TUOREN = "30";
+}

+ 26 - 0
coffee-system/src/main/java/com/coffee/aliyun/TestProgram.java

@@ -0,0 +1,26 @@
+package com.coffee.aliyun;
+
+import com.coffee.bus.entity.BusPumpEntity;
+import com.coffee.bus.listener.event.bean.DeviceInfoEvent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * @Author XX
+ * @Date 2022-04-06 17:18:38
+ * @Version 1.0
+ * @Description XXX
+ */
+
+public class TestProgram {
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+
+    public void test001() {
+        // 设备属性上报
+        DeviceInfoEvent deviceInfoEvent = new DeviceInfoEvent(this,new BusPumpEntity(),"123456");
+        applicationContext.publishEvent(deviceInfoEvent);
+    }
+}

+ 0 - 1
coffee-system/src/main/java/com/coffee/bus/script/PythonParse.java

@@ -8,7 +8,6 @@ import org.python.core.*;
 import org.python.util.PythonInterpreter;
 import org.springframework.stereotype.Component;
 
-import javax.xml.soap.SOAPMessage;
 import java.util.Properties;
 
 /**