|
|
@@ -0,0 +1,185 @@
|
|
|
+package org.jetlinks.community.device.service;
|
|
|
+
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.hutool.cron.CronUtil;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
|
|
|
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
|
|
|
+import org.jetlinks.community.device.entity.DeviceFirmwareEntity;
|
|
|
+import org.jetlinks.community.device.entity.DeviceFirmwareTaskEntity;
|
|
|
+import org.jetlinks.community.device.entity.DeviceInstanceEntity;
|
|
|
+import org.jetlinks.community.device.entity.DeviceUpgradeHistoryEntity;
|
|
|
+import org.jetlinks.community.device.enums.FirmwareUpgradeState;
|
|
|
+import org.jetlinks.community.device.enums.TaskMode;
|
|
|
+import org.jetlinks.community.gateway.annotation.Subscribe;
|
|
|
+import org.jetlinks.core.device.DeviceMessageSender;
|
|
|
+import org.jetlinks.core.device.DeviceOperator;
|
|
|
+import org.jetlinks.core.device.DeviceRegistry;
|
|
|
+import org.jetlinks.core.message.firmware.*;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
+
|
|
|
+import javax.management.Query;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.ScheduledFuture;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author lifang
|
|
|
+ * @version 1.0.0
|
|
|
+ * @ClassName LocalDeviceFirmwareService.java
|
|
|
+ * @Description TODO
|
|
|
+ * @createTime 2021年08月10日 09:26:00
|
|
|
+ */
|
|
|
+@Service
|
|
|
+@AllArgsConstructor
|
|
|
+@Slf4j
|
|
|
+public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudService<DeviceUpgradeHistoryEntity, String> {
|
|
|
+ private final LocalDeviceFirmwareTaskService firmwareTaskService;
|
|
|
+ private final LocalDeviceFirmwareService firmwareService;
|
|
|
+ private final LocalDeviceInstanceService deviceService;
|
|
|
+ private final DeviceRegistry registry;
|
|
|
+ public static final Map<String,ScheduledFuture<?>> taskDelayService=new ConcurrentHashMap<>();
|
|
|
+ /**
|
|
|
+ * 上报版本信息
|
|
|
+ * @param message
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Subscribe("/device/*/*/firmware/report")
|
|
|
+ public Mono<Void> createHistory(ReportFirmwareMessage message) {
|
|
|
+ return
|
|
|
+ Mono.zip(
|
|
|
+ registry.getDevice(message.getDeviceId()),
|
|
|
+ firmwareService.createQuery().where(DeviceFirmwareEntity::getProductId, message.getHeader("productId").get()).fetchOne(),
|
|
|
+ //平台推送
|
|
|
+ firmwareTaskService.createQuery()
|
|
|
+ .where(DeviceFirmwareTaskEntity::getMode,TaskMode.push)
|
|
|
+ .where(DeviceFirmwareTaskEntity::getProductId, message.getHeader("productId").get()).fetchOne())
|
|
|
+ .doOnNext(tp3 -> {
|
|
|
+ DeviceOperator operator = tp3.getT1();
|
|
|
+ DeviceFirmwareEntity firmware = tp3.getT2();
|
|
|
+ if (!firmware.getVersion().equals(message.getVersion())) {
|
|
|
+ //如果存在平台推送且版本不一致,则推送升级固件消息,等待升级
|
|
|
+ UpgradeFirmwareMessage firmwareMessage = new UpgradeFirmwareMessage();
|
|
|
+ firmwareMessage.setDeviceId(message.getDeviceId());
|
|
|
+ firmwareMessage.setUrl(firmware.getUrl());
|
|
|
+ firmwareMessage.setVersion(firmware.getVersion());
|
|
|
+ firmwareMessage.setSign(firmware.getSign());
|
|
|
+ firmwareMessage.setSignMethod(firmware.getSignMethod());
|
|
|
+ firmwareMessage.setTimestamp(System.currentTimeMillis());
|
|
|
+ operator.messageSender().sendAndForget(firmwareMessage).subscribe();
|
|
|
+ }
|
|
|
+ }).then();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 上报更新进度
|
|
|
+ * @param message
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Subscribe("/device/*/*/firmware/progress")
|
|
|
+ public Mono<Void> handleProcess(UpgradeFirmwareProgressMessage message) {
|
|
|
+ return Mono.just(message)
|
|
|
+ .doOnNext(progress->{
|
|
|
+ String firmwareId = message.getFirmwareId();
|
|
|
+ String version = message.getVersion();
|
|
|
+ ReactiveUpdate<DeviceUpgradeHistoryEntity> upgrade = this.createUpdate()
|
|
|
+ .where(DeviceUpgradeHistoryEntity::getDeviceId, message.getDeviceId())
|
|
|
+ .where(DeviceUpgradeHistoryEntity::getFirmwareId, firmwareId)
|
|
|
+ .where(DeviceUpgradeHistoryEntity::getVersion, version)
|
|
|
+ .in(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.processing,FirmwareUpgradeState.waiting)
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getProgress, message.getProgress())
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getLastUpdateTime,System.currentTimeMillis());
|
|
|
+ boolean complete = message.isComplete();
|
|
|
+ //更新是否结束
|
|
|
+ if(complete){
|
|
|
+ //判断更新成功或者失败
|
|
|
+ boolean success = message.isSuccess();
|
|
|
+ if(success){
|
|
|
+ upgrade.set(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.finished)
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getProgress, 100)
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getErrorReason,null);
|
|
|
+ }else {
|
|
|
+ upgrade.set(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.failed)
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getErrorReason,message.getErrorReason());
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ upgrade.set(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.processing);
|
|
|
+ }
|
|
|
+ upgrade.execute().subscribe();
|
|
|
+ }).doOnNext(progress->{
|
|
|
+ if(message.getProgress()==0){
|
|
|
+ //设备id+固件id+版本号作为key值
|
|
|
+ String key=message.getDeviceId()+message.getFirmwareId()+message.getVersion();
|
|
|
+ //开始更新,创建超时任务
|
|
|
+ taskDelayService.putIfAbsent(key, Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
|
|
|
+ () -> {
|
|
|
+ String firmwareId = message.getFirmwareId();
|
|
|
+ String version = message.getVersion();
|
|
|
+ //更新失败
|
|
|
+ this.createUpdate()
|
|
|
+ .where(DeviceUpgradeHistoryEntity::getDeviceId, message.getDeviceId())
|
|
|
+ .where(DeviceUpgradeHistoryEntity::getFirmwareId, firmwareId)
|
|
|
+ .where(DeviceUpgradeHistoryEntity::getVersion, version)
|
|
|
+ .where(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.processing)
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getState,FirmwareUpgradeState.failed)
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getErrorReason,"升级超时")
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getProgress, message.getProgress())
|
|
|
+ .set(DeviceUpgradeHistoryEntity::getLastUpdateTime,System.currentTimeMillis()).execute().subscribe();
|
|
|
+ taskDelayService.remove(key);
|
|
|
+ },
|
|
|
+ System.currentTimeMillis(), 3L, TimeUnit.SECONDS));
|
|
|
+
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .then();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 拉取固件更新,返回更新信息并创建历史更新记录
|
|
|
+ * @param messages
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Subscribe("/device/*/*/firmware/pull")
|
|
|
+ public Mono<Void> pushUpgrade(RequestFirmwareMessage messages) {
|
|
|
+ return registry.getDevice(messages.getDeviceId())
|
|
|
+ .flatMap(operator->{
|
|
|
+ return Mono.zip(operator.getProduct(),
|
|
|
+ Mono.just(operator.messageSender()),
|
|
|
+ firmwareService.createQuery()
|
|
|
+ .where(DeviceFirmwareEntity::getProductId,messages.getHeader("productId").get())
|
|
|
+ .when("version",messages.getRequestVersion(),value->StrUtil.isNotEmpty(messages.getRequestVersion()),query->query::where)
|
|
|
+ .fetchOne(),
|
|
|
+ deviceService.findById(messages.getDeviceId()));
|
|
|
+ })
|
|
|
+ .flatMap(tp4->{
|
|
|
+ //查询当前产品的升级任务
|
|
|
+ DeviceFirmwareEntity firmware = tp4.getT3();
|
|
|
+ return Mono.zip(Mono.just(tp4.getT1()),Mono.just(tp4.getT2()),Mono.just(tp4.getT3()),Mono.just(tp4.getT4()),
|
|
|
+ firmwareTaskService.createQuery()
|
|
|
+ .where(DeviceFirmwareTaskEntity::getFirmwareId,firmware.getId())
|
|
|
+ .where(DeviceFirmwareTaskEntity::getMode, TaskMode.pull.getValue())
|
|
|
+ .fetchOne());
|
|
|
+ })
|
|
|
+ .doOnNext(tp5->{
|
|
|
+ DeviceFirmwareEntity firmware = tp5.getT3();
|
|
|
+ DeviceMessageSender sender = tp5.getT2();
|
|
|
+ RequestFirmwareMessageReply reply = new RequestFirmwareMessageReply();
|
|
|
+ reply.setTimestamp(System.currentTimeMillis());
|
|
|
+ reply.setDeviceId(messages.getDeviceId());
|
|
|
+ reply.setSign(firmware.getSign());
|
|
|
+ reply.setSignMethod(firmware.getSignMethod());
|
|
|
+ reply.setSize(10L);
|
|
|
+ reply.setUrl(firmware.getUrl());
|
|
|
+ reply.setMessageId(messages.getMessageId());
|
|
|
+ sender.sendAndForget(reply).
|
|
|
+ subscribe();
|
|
|
+ }).then();
|
|
|
+ }
|
|
|
+}
|