/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.demo.protocol.mqtt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.MessageEncodeContext;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.demo.protocol.TopicMessage;
import org.jetlinks.demo.protocol.TopicMessageCodec;
import reactor.core.publisher.Mono;

/**
 * Device message codec for the MQTT transport.
 *
 * <p>Incoming MQTT messages are parsed as UTF-8 JSON and passed to
 * {@code doDecode}; outgoing device messages are passed to {@code doEncode}
 * and serialized to JSON bytes. Both helpers are not defined in this class
 * (presumably inherited from {@link TopicMessageCodec} - confirm there for
 * the supported topics and payload layout).
 */
public class MqttDeviceMessageCodec
extends TopicMessageCodec
implements DeviceMessageCodec {

    /**
     * @return the transport handled by this codec, always {@link DefaultTransport#MQTT}
     */
    public Transport getSupportTransport() {
        return DefaultTransport.MQTT;
    }

    /**
     * Decodes an incoming MQTT message.
     *
     * <p>The work is deferred until subscription. The payload is read as a
     * UTF-8 string and parsed as a JSON object; any exception thrown while
     * parsing or decoding is raised inside the supplier and therefore
     * surfaces through the returned {@link Mono} rather than from this call.
     *
     * @param context decode context; its message is cast to {@link MqttMessage}
     * @return a {@link Mono} built from the result of {@code doDecode}
     */
    public Mono<? extends Message> decode(MessageDecodeContext context) {
        return Mono.fromSupplier(() -> {
            MqttMessage mqttMessage = (MqttMessage) context.getMessage();
            String topic = mqttMessage.getTopic();
            String json = mqttMessage.getPayload().toString(StandardCharsets.UTF_8);
            JSONObject payload = JSON.parseObject(json);
            // The context may carry no device; in that case a null id is passed on.
            String deviceId = context.getDevice() != null ? context.getDevice().getDeviceId() : null;
            return this.doDecode(deviceId, topic, payload);
        });
    }

    /**
     * Encodes a platform message into an MQTT message for the device.
     *
     * <ul>
     *   <li>Messages that are not a {@link DeviceMessage} yield an empty {@link Mono}.</li>
     *   <li>A {@link DisconnectDeviceMessage} triggers {@code disconnect()} on the
     *       context and then completes empty.</li>
     *   <li>Any other device message is passed to {@code doEncode}; a {@code null}
     *       result yields an empty {@link Mono}, otherwise the topic and the
     *       JSON-serialized message body are wrapped in a {@link SimpleMqttMessage}.</li>
     * </ul>
     *
     * @param context encode context providing the message to send
     * @return a {@link Mono} emitting the encoded MQTT message, or empty when
     *         there is nothing to send
     */
    public Mono<EncodedMessage> encode(MessageEncodeContext context) {
        // Read eagerly, as before; everything else runs on subscription.
        Message message = context.getMessage();
        return Mono.defer(() -> {
            if (!(message instanceof DeviceMessage)) {
                return Mono.empty();
            }
            if (message instanceof DisconnectDeviceMessage) {
                // NOTE(review): assumes the context is a ToDeviceMessageContext whenever a
                // disconnect is requested; otherwise the cast fails inside the deferred Mono.
                return ((ToDeviceMessageContext) context).disconnect().then(Mono.empty());
            }
            TopicMessage msg = this.doEncode((DeviceMessage) message);
            if (msg == null) {
                return Mono.empty();
            }
            byte[] body = JSON.toJSONBytes(msg.getMessage());
            // Explicit type witness: the decompiled "(Object)" cast produced a
            // Mono<Object>, which is not assignable to Mono<EncodedMessage>.
            return Mono.<EncodedMessage>just(
                    SimpleMqttMessage.builder()
                            .topic(msg.getTopic())
                            .payload(Unpooled.wrappedBuffer(body))
                            .build());
        });
    }
}

