|
|
@@ -139,23 +139,22 @@ public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDev
|
|
|
.filter(ignore->started.get())
|
|
|
.publishOn(Schedulers.parallel())
|
|
|
.doOnNext(operator->gatewayMonitor.receivedMessage())
|
|
|
+ .doOnNext(operator->{
|
|
|
+ counter.increment();
|
|
|
+ CoapConnectionSession newSession = new CoapConnectionSession(exchangeMessage.getDeviceId(),operator,DefaultTransport.CoAP,gatewayMonitor);
|
|
|
+ DeviceSession session = sessionManager.getSession(operator.getDeviceId());
|
|
|
+ if (null == session) {
|
|
|
+ sessionManager.register(newSession);
|
|
|
+ } else if (session instanceof ReplaceableDeviceSession) {
|
|
|
+ ((ReplaceableDeviceSession) session).replaceWith(newSession);
|
|
|
+ }else{
|
|
|
+ sessionManager.register(newSession);
|
|
|
+ }
|
|
|
+ gatewayMonitor.connected();
|
|
|
+ gatewayMonitor.totalConnection(counter.sum());
|
|
|
+ })
|
|
|
.flatMap(operator->
|
|
|
this.decodeAndHandleMessage(operator,exchangeMessage)
|
|
|
- .doOnNext(ignore->{
|
|
|
- counter.increment();
|
|
|
- CoapConnectionSession newSession = new CoapConnectionSession(exchangeMessage.getDeviceId(),operator,DefaultTransport.CoAP,gatewayMonitor);
|
|
|
- DeviceSession session = sessionManager.getSession(operator.getDeviceId());
|
|
|
- if (null == session) {
|
|
|
- sessionManager.register(newSession);
|
|
|
- } else if (session instanceof ReplaceableDeviceSession) {
|
|
|
- ((ReplaceableDeviceSession) session).replaceWith(newSession);
|
|
|
- }else{
|
|
|
- sessionManager.register(newSession);
|
|
|
- }
|
|
|
- gatewayMonitor.connected();
|
|
|
- gatewayMonitor.totalConnection(counter.sum());
|
|
|
- }
|
|
|
- )
|
|
|
)
|
|
|
.doOnSuccess(s->{
|
|
|
exchangeMessage.response(CoAP.ResponseCode.CONTENT);
|
|
|
@@ -170,7 +169,7 @@ public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDev
|
|
|
return operator
|
|
|
.getProtocol()
|
|
|
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
|
|
|
- .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(null, exchangeMessage, registry)))
|
|
|
+ .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionManager.getSession(operator.getDeviceId()), exchangeMessage, registry)))
|
|
|
.cast(DeviceMessage.class)
|
|
|
.flatMap(msg -> {
|
|
|
if (messageProcessor.hasDownstreams()) {
|
|
|
@@ -191,7 +190,7 @@ public class CoapServerDeviceGateway implements DeviceGateway, MonitorSupportDev
|
|
|
return handleMessage(operator, msg);
|
|
|
})
|
|
|
.then()
|
|
|
- .doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), exchangeMessage, err)))
|
|
|
+ .doOnEach(ReactiveLogger.onError(err -> log.error("处理Coap连接[{}]消息失败:{}", operator.getDeviceId(), exchangeMessage, err)))
|
|
|
.onErrorResume((err) -> Mono.empty());
|
|
|
}
|
|
|
|