|
|
@@ -3,6 +3,7 @@ package org.jetlinks.community.device.interceptor;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
|
|
|
+import org.hswebframework.web.exception.BusinessException;
|
|
|
import org.jetlinks.community.device.entity.DeviceDirectivesEntity;
|
|
|
import org.jetlinks.community.device.enums.DirectiveState;
|
|
|
import org.jetlinks.community.device.service.DeviceDirectivesService;
|
|
|
@@ -15,10 +16,13 @@ import org.jetlinks.core.message.function.FunctionInvokeMessage;
|
|
|
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
|
|
|
import org.jetlinks.core.message.property.ReadPropertyMessage;
|
|
|
import org.jetlinks.core.message.property.WritePropertyMessage;
|
|
|
+import org.reactivestreams.Publisher;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
+import java.util.function.Function;
|
|
|
+
|
|
|
/**
|
|
|
* @author lifang
|
|
|
* @version 1.0.0
|
|
|
@@ -59,6 +63,11 @@ public class DeviceDirectiveSendInterceptor implements DeviceMessageSenderInterc
|
|
|
public <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply) {
|
|
|
DeviceDirectivesEntity.MessageType directive = isDirective(message);
|
|
|
if(directive!=null){
|
|
|
+ Function<RuntimeException, Publisher<? extends Integer>> errorFallback = e->deviceDirectivesService.createUpdate()
|
|
|
+ .where(DeviceDirectivesEntity::getMessageId,message.getMessageId())
|
|
|
+ .set(DeviceDirectivesEntity::getLastError,e.getMessage())
|
|
|
+ .set(DeviceDirectivesEntity::getState,DirectiveState.sendError)
|
|
|
+ .execute();
|
|
|
return reply
|
|
|
.cast(DeviceMessage.class)
|
|
|
.flatMap(msg->
|
|
|
@@ -68,11 +77,8 @@ public class DeviceDirectiveSendInterceptor implements DeviceMessageSenderInterc
|
|
|
.set(DeviceDirectivesEntity::getState,DirectiveState.success)
|
|
|
.set(DeviceDirectivesEntity::getLastError,null)
|
|
|
.execute())
|
|
|
- .onErrorResume(DeviceOperationException.class,e->deviceDirectivesService.createUpdate()
|
|
|
- .where(DeviceDirectivesEntity::getMessageId,message.getMessageId())
|
|
|
- .set(DeviceDirectivesEntity::getLastError,e.getMessage())
|
|
|
- .set(DeviceDirectivesEntity::getState,DirectiveState.sendError)
|
|
|
- .execute())
|
|
|
+ .onErrorResume(DeviceOperationException.class, errorFallback)
|
|
|
+ .onErrorResume(BusinessException.class, errorFallback)
|
|
|
.thenMany(reply);
|
|
|
}
|
|
|
return reply;
|