|
@@ -3,10 +3,10 @@ package org.jetlinks.community.device.web;
|
|
|
import io.swagger.v3.oas.annotations.Operation;
|
|
import io.swagger.v3.oas.annotations.Operation;
|
|
|
import io.swagger.v3.oas.annotations.Parameter;
|
|
import io.swagger.v3.oas.annotations.Parameter;
|
|
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
|
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
import lombok.Getter;
|
|
import lombok.Getter;
|
|
|
import lombok.SneakyThrows;
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.hswebframework.ezorm.rdb.exception.DuplicateKeyException;
|
|
|
|
|
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
|
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
|
|
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
|
|
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
|
|
|
import org.hswebframework.reactor.excel.ReactorExcel;
|
|
import org.hswebframework.reactor.excel.ReactorExcel;
|
|
@@ -28,6 +28,7 @@ import org.jetlinks.community.device.response.DeviceDeployResult;
|
|
|
import org.jetlinks.community.device.response.DeviceDetail;
|
|
import org.jetlinks.community.device.response.DeviceDetail;
|
|
|
import org.jetlinks.community.device.response.ImportDeviceInstanceResult;
|
|
import org.jetlinks.community.device.response.ImportDeviceInstanceResult;
|
|
|
import org.jetlinks.community.device.service.DeviceConfigMetadataManager;
|
|
import org.jetlinks.community.device.service.DeviceConfigMetadataManager;
|
|
|
|
|
+import org.jetlinks.community.device.service.DeviceTagsService;
|
|
|
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
|
|
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
|
|
|
import org.jetlinks.community.device.service.LocalDeviceProductService;
|
|
import org.jetlinks.community.device.service.LocalDeviceProductService;
|
|
|
import org.jetlinks.community.device.service.data.DeviceDataService;
|
|
import org.jetlinks.community.device.service.data.DeviceDataService;
|
|
@@ -47,8 +48,10 @@ import org.jetlinks.core.message.Message;
|
|
|
import org.jetlinks.core.message.MessageType;
|
|
import org.jetlinks.core.message.MessageType;
|
|
|
import org.jetlinks.core.message.RepayableDeviceMessage;
|
|
import org.jetlinks.core.message.RepayableDeviceMessage;
|
|
|
import org.jetlinks.core.metadata.*;
|
|
import org.jetlinks.core.metadata.*;
|
|
|
|
|
+import org.jetlinks.core.metadata.types.GeoType;
|
|
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
|
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
|
|
|
|
+import org.springframework.dao.DuplicateKeyException;
|
|
|
import org.springframework.data.util.Lazy;
|
|
import org.springframework.data.util.Lazy;
|
|
|
import org.springframework.http.HttpHeaders;
|
|
import org.springframework.http.HttpHeaders;
|
|
|
import org.springframework.http.MediaType;
|
|
import org.springframework.http.MediaType;
|
|
@@ -61,14 +64,10 @@ import reactor.core.scheduler.Schedulers;
|
|
|
import reactor.util.function.Tuple2;
|
|
import reactor.util.function.Tuple2;
|
|
|
import reactor.util.function.Tuple4;
|
|
import reactor.util.function.Tuple4;
|
|
|
import reactor.util.function.Tuples;
|
|
import reactor.util.function.Tuples;
|
|
|
-
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
import java.net.URLEncoder;
|
|
import java.net.URLEncoder;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.nio.charset.StandardCharsets;
|
|
|
-import java.util.Collections;
|
|
|
|
|
-import java.util.HashMap;
|
|
|
|
|
-import java.util.List;
|
|
|
|
|
-import java.util.Map;
|
|
|
|
|
|
|
+import java.util.*;
|
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
@@ -96,6 +95,7 @@ public class DeviceInstanceController implements
|
|
|
|
|
|
|
|
private final DeviceConfigMetadataManager metadataManager;
|
|
private final DeviceConfigMetadataManager metadataManager;
|
|
|
|
|
|
|
|
|
|
+ private final DeviceTagsService tagsService;
|
|
|
@SuppressWarnings("all")
|
|
@SuppressWarnings("all")
|
|
|
public DeviceInstanceController(LocalDeviceInstanceService service,
|
|
public DeviceInstanceController(LocalDeviceInstanceService service,
|
|
|
DeviceRegistry registry,
|
|
DeviceRegistry registry,
|
|
@@ -103,7 +103,8 @@ public class DeviceInstanceController implements
|
|
|
ImportExportService importExportService,
|
|
ImportExportService importExportService,
|
|
|
ReactiveRepository<DeviceTagEntity, String> tagRepository,
|
|
ReactiveRepository<DeviceTagEntity, String> tagRepository,
|
|
|
DeviceDataService deviceDataService,
|
|
DeviceDataService deviceDataService,
|
|
|
- DeviceConfigMetadataManager metadataManager) {
|
|
|
|
|
|
|
+ DeviceConfigMetadataManager metadataManager,
|
|
|
|
|
+ DeviceTagsService tagsService) {
|
|
|
this.service = service;
|
|
this.service = service;
|
|
|
this.registry = registry;
|
|
this.registry = registry;
|
|
|
this.productService = productService;
|
|
this.productService = productService;
|
|
@@ -111,6 +112,7 @@ public class DeviceInstanceController implements
|
|
|
this.tagRepository = tagRepository;
|
|
this.tagRepository = tagRepository;
|
|
|
this.deviceDataService = deviceDataService;
|
|
this.deviceDataService = deviceDataService;
|
|
|
this.metadataManager = metadataManager;
|
|
this.metadataManager = metadataManager;
|
|
|
|
|
+ this.tagsService=tagsService;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@@ -207,12 +209,13 @@ public class DeviceInstanceController implements
|
|
|
//新建设备
|
|
//新建设备
|
|
|
@PostMapping
|
|
@PostMapping
|
|
|
@Operation(summary = "新建设备")
|
|
@Operation(summary = "新建设备")
|
|
|
|
|
+ @Override
|
|
|
public Mono<DeviceInstanceEntity> add(@RequestBody Mono<DeviceInstanceEntity> payload) {
|
|
public Mono<DeviceInstanceEntity> add(@RequestBody Mono<DeviceInstanceEntity> payload) {
|
|
|
return Mono
|
|
return Mono
|
|
|
.zip(payload, Authentication.currentReactive(), this::applyAuthentication)
|
|
.zip(payload, Authentication.currentReactive(), this::applyAuthentication)
|
|
|
.flatMap(entity -> service.insert(Mono.just(entity)).thenReturn(entity))
|
|
.flatMap(entity -> service.insert(Mono.just(entity)).thenReturn(entity))
|
|
|
|
|
+ .onErrorMap(DuplicateKeyException.class, err -> new BusinessException("设备ID已存在", err))
|
|
|
.onErrorMap(e->new BusinessException("服务器繁忙,请稍后重试",e));
|
|
.onErrorMap(e->new BusinessException("服务器繁忙,请稍后重试",e));
|
|
|
-// .onErrorMap(DuplicateKeyException.class, err -> new BusinessException("设备ID已存在", err));
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -259,8 +262,8 @@ public class DeviceInstanceController implements
|
|
|
public Mono<DeviceProperty> getDeviceLatestProperty(@PathVariable @Parameter(description = "设备ID") String deviceId,
|
|
public Mono<DeviceProperty> getDeviceLatestProperty(@PathVariable @Parameter(description = "设备ID") String deviceId,
|
|
|
@PathVariable @Parameter(description = "属性ID") String property) {
|
|
@PathVariable @Parameter(description = "属性ID") String property) {
|
|
|
return deviceDataService.queryEachOneProperties(deviceId, QueryParamEntity.of(), property)
|
|
return deviceDataService.queryEachOneProperties(deviceId, QueryParamEntity.of(), property)
|
|
|
- .take(1)
|
|
|
|
|
- .singleOrEmpty()
|
|
|
|
|
|
|
+ .take(1)
|
|
|
|
|
+ .singleOrEmpty()
|
|
|
;
|
|
;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -325,10 +328,10 @@ public class DeviceInstanceController implements
|
|
|
public Mono<Void> deleteDeviceTag(@PathVariable @Parameter(description = "设备ID") String deviceId,
|
|
public Mono<Void> deleteDeviceTag(@PathVariable @Parameter(description = "设备ID") String deviceId,
|
|
|
@PathVariable @Parameter(description = "标签ID") String tagId) {
|
|
@PathVariable @Parameter(description = "标签ID") String tagId) {
|
|
|
return tagRepository.createDelete()
|
|
return tagRepository.createDelete()
|
|
|
- .where(DeviceTagEntity::getDeviceId, deviceId)
|
|
|
|
|
- .and(DeviceTagEntity::getId, tagId)
|
|
|
|
|
- .execute()
|
|
|
|
|
- .then();
|
|
|
|
|
|
|
+ .where(DeviceTagEntity::getDeviceId, deviceId)
|
|
|
|
|
+ .and(DeviceTagEntity::getId, tagId)
|
|
|
|
|
+ .execute()
|
|
|
|
|
+ .then();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -343,7 +346,7 @@ public class DeviceInstanceController implements
|
|
|
@Operation(summary = "批量删除设备")
|
|
@Operation(summary = "批量删除设备")
|
|
|
public Mono<Integer> deleteBatch(@RequestBody Mono<List<String>> idList) {
|
|
public Mono<Integer> deleteBatch(@RequestBody Mono<List<String>> idList) {
|
|
|
return idList.flatMapMany(Flux::fromIterable)
|
|
return idList.flatMapMany(Flux::fromIterable)
|
|
|
- .as(service::deleteById);
|
|
|
|
|
|
|
+ .as(service::deleteById);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -372,9 +375,9 @@ public class DeviceInstanceController implements
|
|
|
@Operation(summary = "批量激活设备")
|
|
@Operation(summary = "批量激活设备")
|
|
|
public Mono<Integer> deployBatch(@RequestBody Mono<List<String>> idList) {
|
|
public Mono<Integer> deployBatch(@RequestBody Mono<List<String>> idList) {
|
|
|
return idList.flatMapMany(service::findById)
|
|
return idList.flatMapMany(service::findById)
|
|
|
- .as(service::deploy)
|
|
|
|
|
- .map(DeviceDeployResult::getTotal)
|
|
|
|
|
- .reduce(Math::addExact);
|
|
|
|
|
|
|
+ .as(service::deploy)
|
|
|
|
|
+ .map(DeviceDeployResult::getTotal)
|
|
|
|
|
+ .reduce(Math::addExact);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -400,8 +403,8 @@ public class DeviceInstanceController implements
|
|
|
@Operation(summary = "获取设备全部标签数据")
|
|
@Operation(summary = "获取设备全部标签数据")
|
|
|
public Flux<DeviceTagEntity> getDeviceTags(@PathVariable @Parameter(description = "设备ID") String deviceId) {
|
|
public Flux<DeviceTagEntity> getDeviceTags(@PathVariable @Parameter(description = "设备ID") String deviceId) {
|
|
|
return tagRepository.createQuery()
|
|
return tagRepository.createQuery()
|
|
|
- .where(DeviceTagEntity::getDeviceId, deviceId)
|
|
|
|
|
- .fetch();
|
|
|
|
|
|
|
+ .where(DeviceTagEntity::getDeviceId, deviceId)
|
|
|
|
|
+ .fetch();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//保存设备标签
|
|
//保存设备标签
|
|
@@ -416,7 +419,7 @@ public class DeviceInstanceController implements
|
|
|
tag.setDeviceId(deviceId);
|
|
tag.setDeviceId(deviceId);
|
|
|
tag.tryValidate();
|
|
tag.tryValidate();
|
|
|
})
|
|
})
|
|
|
- .as(tagRepository::save)
|
|
|
|
|
|
|
+ .as(tagsService::save)
|
|
|
.thenMany(getDeviceTags(deviceId));
|
|
.thenMany(getDeviceTags(deviceId));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -433,9 +436,9 @@ public class DeviceInstanceController implements
|
|
|
registry.getProduct(productId).flatMap(DeviceProductOperator::getMetadata),
|
|
registry.getProduct(productId).flatMap(DeviceProductOperator::getMetadata),
|
|
|
//配置
|
|
//配置
|
|
|
metadataManager.getDeviceConfigMetadataByProductId(productId)
|
|
metadataManager.getDeviceConfigMetadataByProductId(productId)
|
|
|
- .flatMapIterable(ConfigMetadata::getProperties)
|
|
|
|
|
- .collectList()
|
|
|
|
|
- .defaultIfEmpty(Collections.emptyList())
|
|
|
|
|
|
|
+ .flatMapIterable(ConfigMetadata::getProperties)
|
|
|
|
|
+ .collectList()
|
|
|
|
|
+ .defaultIfEmpty(Collections.emptyList())
|
|
|
);
|
|
);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -464,12 +467,12 @@ public class DeviceInstanceController implements
|
|
|
.buffer(100)//每100条数据保存一次
|
|
.buffer(100)//每100条数据保存一次
|
|
|
.publishOn(Schedulers.single())
|
|
.publishOn(Schedulers.single())
|
|
|
.concatMap(buffer ->
|
|
.concatMap(buffer ->
|
|
|
- Mono.zip(
|
|
|
|
|
- service.save(Flux.fromIterable(buffer).map(Tuple2::getT1)),
|
|
|
|
|
- tagRepository
|
|
|
|
|
- .save(Flux.fromIterable(buffer).flatMapIterable(Tuple2::getT2))
|
|
|
|
|
- .defaultIfEmpty(SaveResult.of(0, 0))
|
|
|
|
|
- ))
|
|
|
|
|
|
|
+ Mono.zip(
|
|
|
|
|
+ service.save(Flux.fromIterable(buffer).map(Tuple2::getT1)),
|
|
|
|
|
+ tagRepository
|
|
|
|
|
+ .save(Flux.fromIterable(buffer).flatMapIterable(Tuple2::getT2))
|
|
|
|
|
+ .defaultIfEmpty(SaveResult.of(0, 0))
|
|
|
|
|
+ ))
|
|
|
.map(res -> ImportDeviceInstanceResult.success(res.getT1()))
|
|
.map(res -> ImportDeviceInstanceResult.success(res.getT1()))
|
|
|
.onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err)));
|
|
.onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err)));
|
|
|
}
|
|
}
|
|
@@ -482,16 +485,16 @@ public class DeviceInstanceController implements
|
|
|
ServerHttpResponse response,
|
|
ServerHttpResponse response,
|
|
|
@PathVariable @Parameter(description = "文件格式,支持csv,xlsx") String format) throws IOException {
|
|
@PathVariable @Parameter(description = "文件格式,支持csv,xlsx") String format) throws IOException {
|
|
|
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
|
|
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
|
|
|
- "attachment; filename=".concat(URLEncoder.encode("设备导入模版." + format, StandardCharsets.UTF_8
|
|
|
|
|
- .displayName())));
|
|
|
|
|
|
|
+ "attachment; filename=".concat(URLEncoder.encode("设备导入模版." + format, StandardCharsets.UTF_8
|
|
|
|
|
+ .displayName())));
|
|
|
return getDeviceProductDetail(productId)
|
|
return getDeviceProductDetail(productId)
|
|
|
.map(tp4 -> DeviceExcelInfo.getTemplateHeaderMapping(tp4.getT3().getTags(), tp4.getT4()))
|
|
.map(tp4 -> DeviceExcelInfo.getTemplateHeaderMapping(tp4.getT3().getTags(), tp4.getT4()))
|
|
|
.defaultIfEmpty(DeviceExcelInfo.getTemplateHeaderMapping(Collections.emptyList(), Collections.emptyList()))
|
|
.defaultIfEmpty(DeviceExcelInfo.getTemplateHeaderMapping(Collections.emptyList(), Collections.emptyList()))
|
|
|
.flatMapMany(headers ->
|
|
.flatMapMany(headers ->
|
|
|
- ReactorExcel.<DeviceExcelInfo>writer(format)
|
|
|
|
|
- .headers(headers)
|
|
|
|
|
- .converter(DeviceExcelInfo::toMap)
|
|
|
|
|
- .writeBuffer(Flux.empty()))
|
|
|
|
|
|
|
+ ReactorExcel.<DeviceExcelInfo>writer(format)
|
|
|
|
|
+ .headers(headers)
|
|
|
|
|
+ .converter(DeviceExcelInfo::toMap)
|
|
|
|
|
+ .writeBuffer(Flux.empty()))
|
|
|
.doOnError(err -> log.error(err.getMessage(), err))
|
|
.doOnError(err -> log.error(err.getMessage(), err))
|
|
|
.map(bufferFactory::wrap)
|
|
.map(bufferFactory::wrap)
|
|
|
.as(response::writeWith);
|
|
.as(response::writeWith);
|
|
@@ -506,8 +509,8 @@ public class DeviceInstanceController implements
|
|
|
@Parameter(hidden = true) QueryParamEntity parameter,
|
|
@Parameter(hidden = true) QueryParamEntity parameter,
|
|
|
@PathVariable @Parameter(description = "文件格式,支持csv,xlsx") String format) throws IOException {
|
|
@PathVariable @Parameter(description = "文件格式,支持csv,xlsx") String format) throws IOException {
|
|
|
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
|
|
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
|
|
|
- "attachment; filename=".concat(URLEncoder.encode("设备实例." + format, StandardCharsets.UTF_8
|
|
|
|
|
- .displayName())));
|
|
|
|
|
|
|
+ "attachment; filename=".concat(URLEncoder.encode("设备实例." + format, StandardCharsets.UTF_8
|
|
|
|
|
+ .displayName())));
|
|
|
parameter.setPaging(false);
|
|
parameter.setPaging(false);
|
|
|
parameter.toNestQuery(q -> q.is(DeviceInstanceEntity::getProductId, productId));
|
|
parameter.toNestQuery(q -> q.is(DeviceInstanceEntity::getProductId, productId));
|
|
|
return getDeviceProductDetail(productId)
|
|
return getDeviceProductDetail(productId)
|
|
@@ -523,44 +526,44 @@ public class DeviceInstanceController implements
|
|
|
.collect(Collectors.toList())
|
|
.collect(Collectors.toList())
|
|
|
))
|
|
))
|
|
|
.defaultIfEmpty(Tuples.of(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList(),
|
|
.defaultIfEmpty(Tuples.of(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList(),
|
|
|
- Collections.emptyList()),
|
|
|
|
|
- Collections.emptyList()))
|
|
|
|
|
|
|
+ Collections.emptyList()),
|
|
|
|
|
+ Collections.emptyList()))
|
|
|
.flatMapMany(headerAndConfigKey ->
|
|
.flatMapMany(headerAndConfigKey ->
|
|
|
- ReactorExcel.<DeviceExcelInfo>writer(format)
|
|
|
|
|
- .headers(headerAndConfigKey.getT1())
|
|
|
|
|
- .converter(DeviceExcelInfo::toMap)
|
|
|
|
|
- .writeBuffer(
|
|
|
|
|
- service.query(parameter)
|
|
|
|
|
- .flatMap(entity -> {
|
|
|
|
|
- DeviceExcelInfo exportEntity = FastBeanCopier.copy(entity, new DeviceExcelInfo(), "state");
|
|
|
|
|
- exportEntity.setState(entity.getState().getText());
|
|
|
|
|
- return registry
|
|
|
|
|
- .getDevice(entity.getId())
|
|
|
|
|
- .flatMap(deviceOperator -> deviceOperator
|
|
|
|
|
- .getSelfConfigs(headerAndConfigKey.getT2())
|
|
|
|
|
- .map(Values::getAllValues))
|
|
|
|
|
- .doOnNext(configs -> exportEntity
|
|
|
|
|
- .getConfiguration()
|
|
|
|
|
- .putAll(configs))
|
|
|
|
|
- .thenReturn(exportEntity);
|
|
|
|
|
- })
|
|
|
|
|
- .buffer(200)
|
|
|
|
|
- .flatMap(list -> {
|
|
|
|
|
- Map<String, DeviceExcelInfo> importInfo = list
|
|
|
|
|
- .stream()
|
|
|
|
|
- .collect(Collectors.toMap(DeviceExcelInfo::getId, Function.identity()));
|
|
|
|
|
- return tagRepository.createQuery()
|
|
|
|
|
- .where()
|
|
|
|
|
- .in(DeviceTagEntity::getDeviceId, importInfo.keySet())
|
|
|
|
|
- .fetch()
|
|
|
|
|
- .collect(Collectors.groupingBy(DeviceTagEntity::getDeviceId))
|
|
|
|
|
- .flatMapIterable(Map::entrySet)
|
|
|
|
|
- .doOnNext(entry -> importInfo
|
|
|
|
|
- .get(entry.getKey())
|
|
|
|
|
- .setTags(entry.getValue()))
|
|
|
|
|
- .thenMany(Flux.fromIterable(list));
|
|
|
|
|
- })
|
|
|
|
|
- , 512 * 1024))//缓冲512k
|
|
|
|
|
|
|
+ ReactorExcel.<DeviceExcelInfo>writer(format)
|
|
|
|
|
+ .headers(headerAndConfigKey.getT1())
|
|
|
|
|
+ .converter(DeviceExcelInfo::toMap)
|
|
|
|
|
+ .writeBuffer(
|
|
|
|
|
+ service.query(parameter)
|
|
|
|
|
+ .flatMap(entity -> {
|
|
|
|
|
+ DeviceExcelInfo exportEntity = FastBeanCopier.copy(entity, new DeviceExcelInfo(), "state");
|
|
|
|
|
+ exportEntity.setState(entity.getState().getText());
|
|
|
|
|
+ return registry
|
|
|
|
|
+ .getDevice(entity.getId())
|
|
|
|
|
+ .flatMap(deviceOperator -> deviceOperator
|
|
|
|
|
+ .getSelfConfigs(headerAndConfigKey.getT2())
|
|
|
|
|
+ .map(Values::getAllValues))
|
|
|
|
|
+ .doOnNext(configs -> exportEntity
|
|
|
|
|
+ .getConfiguration()
|
|
|
|
|
+ .putAll(configs))
|
|
|
|
|
+ .thenReturn(exportEntity);
|
|
|
|
|
+ })
|
|
|
|
|
+ .buffer(200)
|
|
|
|
|
+ .flatMap(list -> {
|
|
|
|
|
+ Map<String, DeviceExcelInfo> importInfo = list
|
|
|
|
|
+ .stream()
|
|
|
|
|
+ .collect(Collectors.toMap(DeviceExcelInfo::getId, Function.identity()));
|
|
|
|
|
+ return tagRepository.createQuery()
|
|
|
|
|
+ .where()
|
|
|
|
|
+ .in(DeviceTagEntity::getDeviceId, importInfo.keySet())
|
|
|
|
|
+ .fetch()
|
|
|
|
|
+ .collect(Collectors.groupingBy(DeviceTagEntity::getDeviceId))
|
|
|
|
|
+ .flatMapIterable(Map::entrySet)
|
|
|
|
|
+ .doOnNext(entry -> importInfo
|
|
|
|
|
+ .get(entry.getKey())
|
|
|
|
|
+ .setTags(entry.getValue()))
|
|
|
|
|
+ .thenMany(Flux.fromIterable(list));
|
|
|
|
|
+ })
|
|
|
|
|
+ , 512 * 1024))//缓冲512k
|
|
|
.doOnError(err -> log.error(err.getMessage(), err))
|
|
.doOnError(err -> log.error(err.getMessage(), err))
|
|
|
.map(bufferFactory::wrap)
|
|
.map(bufferFactory::wrap)
|
|
|
.as(response::writeWith);
|
|
.as(response::writeWith);
|
|
@@ -575,8 +578,8 @@ public class DeviceInstanceController implements
|
|
|
@Parameter(hidden = true) QueryParamEntity parameter,
|
|
@Parameter(hidden = true) QueryParamEntity parameter,
|
|
|
@PathVariable @Parameter(description = "文件格式,支持csv,xlsx") String format) throws IOException {
|
|
@PathVariable @Parameter(description = "文件格式,支持csv,xlsx") String format) throws IOException {
|
|
|
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
|
|
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
|
|
|
- "attachment; filename=".concat(URLEncoder.encode("设备实例." + format, StandardCharsets.UTF_8
|
|
|
|
|
- .displayName())));
|
|
|
|
|
|
|
+ "attachment; filename=".concat(URLEncoder.encode("设备实例." + format, StandardCharsets.UTF_8
|
|
|
|
|
+ .displayName())));
|
|
|
return ReactorExcel.<DeviceExcelInfo>writer(format)
|
|
return ReactorExcel.<DeviceExcelInfo>writer(format)
|
|
|
.headers(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList(), Collections.emptyList()))
|
|
.headers(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList(), Collections.emptyList()))
|
|
|
.converter(DeviceExcelInfo::toMap)
|
|
.converter(DeviceExcelInfo::toMap)
|
|
@@ -603,8 +606,8 @@ public class DeviceInstanceController implements
|
|
|
return Mono
|
|
return Mono
|
|
|
.zip(registry.getDevice(deviceId), shadow)
|
|
.zip(registry.getDevice(deviceId), shadow)
|
|
|
.flatMap(tp2 -> tp2.getT1()
|
|
.flatMap(tp2 -> tp2.getT1()
|
|
|
- .setConfig(DeviceConfigKey.shadow, tp2.getT2())
|
|
|
|
|
- .thenReturn(tp2.getT2()));
|
|
|
|
|
|
|
+ .setConfig(DeviceConfigKey.shadow, tp2.getT2())
|
|
|
|
|
+ .thenReturn(tp2.getT2()));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//获取设备影子
|
|
//获取设备影子
|
|
@@ -649,10 +652,10 @@ public class DeviceInstanceController implements
|
|
|
return param
|
|
return param
|
|
|
.flatMapMany(request -> deviceDataService
|
|
.flatMapMany(request -> deviceDataService
|
|
|
.aggregationPropertiesByDevice(deviceId,
|
|
.aggregationPropertiesByDevice(deviceId,
|
|
|
- request.getQuery(),
|
|
|
|
|
- request
|
|
|
|
|
- .getColumns()
|
|
|
|
|
- .toArray(new DeviceDataService.DevicePropertyAggregation[0]))
|
|
|
|
|
|
|
+ request.getQuery(),
|
|
|
|
|
+ request
|
|
|
|
|
+ .getColumns()
|
|
|
|
|
+ .toArray(new DeviceDataService.DevicePropertyAggregation[0]))
|
|
|
)
|
|
)
|
|
|
.map(AggregationData::values);
|
|
.map(AggregationData::values);
|
|
|
}
|
|
}
|
|
@@ -718,8 +721,8 @@ public class DeviceInstanceController implements
|
|
|
entity.setWhere(where);
|
|
entity.setWhere(where);
|
|
|
entity.includes("id");
|
|
entity.includes("id");
|
|
|
return service.query(entity)
|
|
return service.query(entity)
|
|
|
- .flatMap(device -> registry.getDevice(device.getId()))
|
|
|
|
|
- .cache();
|
|
|
|
|
|
|
+ .flatMap(device -> registry.getDevice(device.getId()))
|
|
|
|
|
+ .cache();
|
|
|
});
|
|
});
|
|
|
return messages
|
|
return messages
|
|
|
.flatMap(message -> {
|
|
.flatMap(message -> {
|
|
@@ -766,13 +769,24 @@ public class DeviceInstanceController implements
|
|
|
public Mono<Void> updateMetadata(@PathVariable String id,
|
|
public Mono<Void> updateMetadata(@PathVariable String id,
|
|
|
@RequestBody Mono<String> metadata) {
|
|
@RequestBody Mono<String> metadata) {
|
|
|
return metadata
|
|
return metadata
|
|
|
- .flatMap(metadata_ -> service
|
|
|
|
|
|
|
+ .doOnNext(metadata_ -> service
|
|
|
.createUpdate()
|
|
.createUpdate()
|
|
|
.set(DeviceInstanceEntity::getDeriveMetadata, metadata_)
|
|
.set(DeviceInstanceEntity::getDeriveMetadata, metadata_)
|
|
|
.where(DeviceInstanceEntity::getId, id)
|
|
.where(DeviceInstanceEntity::getId, id)
|
|
|
.execute()
|
|
.execute()
|
|
|
.then(registry.getDevice(id))
|
|
.then(registry.getDevice(id))
|
|
|
- .flatMap(device -> device.updateMetadata(metadata_)))
|
|
|
|
|
|
|
+ .doOnNext(device -> device.updateMetadata(metadata_))
|
|
|
|
|
+ .flatMap(DeviceOperator::getMetadata)
|
|
|
|
|
+ .doOnNext(metadataMono -> {
|
|
|
|
|
+ //查看是否存在地理位置
|
|
|
|
|
+ List<PropertyMetadata> properties = metadataMono.getProperties();
|
|
|
|
|
+ List<PropertyMetadata> tags = metadataMono.getTags();
|
|
|
|
|
+ //属性中存在地理位置
|
|
|
|
|
+ Set<PropertyMetadata> proCollection =
|
|
|
|
|
+ properties.stream().filter(propertyMetadata -> propertyMetadata.getValueType().getType().equals(GeoType.ID)).collect(Collectors.toSet());
|
|
|
|
|
+ //标签中存在地理位置
|
|
|
|
|
+ Set<PropertyMetadata> tagCollection = tags.stream().filter(propertyMetadata -> propertyMetadata.getValueType().getType().equals(GeoType.ID)).collect(Collectors.toSet());
|
|
|
|
|
+ }))
|
|
|
.then();
|
|
.then();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -786,11 +800,11 @@ public class DeviceInstanceController implements
|
|
|
.getDevice(id)
|
|
.getDevice(id)
|
|
|
.flatMap(DeviceOperator::resetMetadata)
|
|
.flatMap(DeviceOperator::resetMetadata)
|
|
|
.then(service
|
|
.then(service
|
|
|
- .createUpdate()
|
|
|
|
|
- .setNull(DeviceInstanceEntity::getDeriveMetadata)
|
|
|
|
|
- .where(DeviceInstanceEntity::getId, id)
|
|
|
|
|
- .execute()
|
|
|
|
|
- .then());
|
|
|
|
|
|
|
+ .createUpdate()
|
|
|
|
|
+ .setNull(DeviceInstanceEntity::getDeriveMetadata)
|
|
|
|
|
+ .where(DeviceInstanceEntity::getId, id)
|
|
|
|
|
+ .execute()
|
|
|
|
|
+ .then());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|