Jelajahi Sumber

fixed 固件升级任务

18339543638 4 tahun lalu
induk
melakukan
a2a8927c7b

+ 12 - 5
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalFirmwareUpgradeHistoryService.java

@@ -1,6 +1,9 @@
 package org.jetlinks.community.device.service;
 
+import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.StrUtil;
+import cn.hutool.cron.CronUtil;
+import com.google.common.collect.Maps;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
@@ -70,7 +73,6 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                             long now = System.currentTimeMillis();
                             DeviceUpgradeHistoryEntity history= new DeviceUpgradeHistoryEntity(firmware.getProductId(),
                                 message.getDeviceId(), instance.getName(), firmware.getId(), "0", task.getId(), task.getName(), task.getTimeoutSeconds(), firmware.getVersion(), firmware.getVersionOrder(), now, FirmwareUpgradeState.waiting);
-
                             UpgradeFirmwareMessage firmwareMessage = new UpgradeFirmwareMessage();
                             firmwareMessage.setDeviceId(message.getDeviceId());
                             firmwareMessage.setUrl(firmware.getUrl());
@@ -79,8 +81,9 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                             firmwareMessage.setSignMethod(firmware.getSignMethod());
                             firmwareMessage.setTimestamp(now);
                             firmwareMessage.setFirmwareId(firmware.getId());
+                            firmwareMessage.setParameters(Collections.singletonMap("taskId",task.getId()));
                             return operator.messageSender().sendAndForget(firmwareMessage)
-                                .zipWith(Mono.defer(()->this.insert(Mono.just(history))));
+                                .then(Mono.defer(()->this.insert(Mono.just(history))));
                         }
                     }
                     return Mono.empty();
@@ -104,7 +107,6 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                     .where(DeviceUpgradeHistoryEntity::getFirmwareId, firmwareId)
                     .where(DeviceUpgradeHistoryEntity::getVersion, version)
                     .in(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.processing,FirmwareUpgradeState.waiting)
-//                    .set(DeviceUpgradeHistoryEntity::getProgress, message.getProgress())
                     .set(DeviceUpgradeHistoryEntity::getLastUpdateTime,System.currentTimeMillis());
                 boolean complete = message.isComplete();
                 //更新是否结束
@@ -129,6 +131,7 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                     //设备id+固件id+版本号作为key值
                     String key=message.getDeviceId()+message.getFirmwareId()+message.getVersion();
                     //开始更新,创建超时任务
+
                     taskDelayService.putIfAbsent(key, Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
                         () -> {
                             String firmwareId = message.getFirmwareId();
@@ -144,9 +147,11 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                                 .set(DeviceUpgradeHistoryEntity::getProgress, message.getProgress())
                                 .set(DeviceUpgradeHistoryEntity::getLastUpdateTime,System.currentTimeMillis()).execute()
                                 .subscribe();
+                            ScheduledFuture<?> future = taskDelayService.get(key);
                             taskDelayService.remove(key);
+                            future.cancel(true);
                         },
-                        System.currentTimeMillis(), 3L, TimeUnit.SECONDS));
+                        3L, 3L, TimeUnit.SECONDS));
                 }
             })
             .then();
@@ -194,12 +199,14 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
                     reply.setDeviceId(messages.getDeviceId());
                     reply.setSign(firmware.getSign());
                     reply.setSignMethod(firmware.getSignMethod());
+                    reply.setFirmwareId(firmware.getId());
                     //todo 修改文件大小
                     reply.setSize(10L);
                     reply.setUrl(firmware.getUrl());
                     reply.setMessageId(messages.getMessageId());
+                    reply.setParameters(Collections.singletonMap("taskId",task.getId()));
                     return sender.sendAndForget(reply)
-                        .zipWith(Mono.defer(()->this.insert(Mono.just(history))));
+                        .then(Mono.defer(()->this.insert(Mono.just(history))));
                 }
                 return Mono.empty();
             })

+ 7 - 3
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/JetLinksApplication.java

@@ -8,7 +8,10 @@ import org.hswebframework.web.logging.aop.EnableAccessLogger;
 import org.hswebframework.web.logging.events.AccessLoggerAfterEvent;
 import org.jetlinks.community.support.message.TimeSyncMessage;
 import org.jetlinks.community.standalone.utils.EnumsUtils;
+import org.jetlinks.community.support.message.TimeSyncReplyMessage;
 import org.jetlinks.core.message.MessageType;
+import org.jetlinks.core.message.firmware.UpgradeFirmwareMessageReply;
+import org.jetlinks.core.message.firmware.UpgradeFirmwareProgressMessage;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cache.annotation.EnableCaching;
@@ -74,12 +77,13 @@ public class JetLinksApplication {
             }});
 
             //添加时间回复主题
-            EnumsUtils.addEnum(MessageType.class,"timeSyncReply", new Class<?>[]{Supplier.class},new Object[]{new Supplier<TimeSyncMessage>() {
+            EnumsUtils.addEnum(MessageType.class,"timeSyncReply", new Class<?>[]{Supplier.class},new Object[]{new Supplier<TimeSyncReplyMessage>() {
                 @Override
-                public TimeSyncMessage get() {
-                    return new TimeSyncMessage();
+                public TimeSyncReplyMessage get() {
+                    return new TimeSyncReplyMessage();
                 }
             }});
+
             return Void.TYPE;
         }
     }

+ 7 - 0
pom.xml

@@ -31,6 +31,7 @@
         <reactor.ql.version>1.0.11</reactor.ql.version>
         <fastjson.version>1.2.70</fastjson.version>
         <hutool.version>5.5.8</hutool.version>
+        <jetlinks.version>1.1.7-SNAPSHOT</jetlinks.version>
     </properties>
 
     <build>
@@ -215,6 +216,12 @@
                 <version>${reactor.ql.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.jetlinks</groupId>
+                <artifactId>jetlinks-core</artifactId>
+                <version>${jetlinks.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.fasterxml.jackson</groupId>
                 <artifactId>jackson-bom</artifactId>