/*
 * Decompiled with CFR 0.152.
 */
package progress.message.broker.mqtt;

import com.sonicsw.mq.components.BrokerComponent;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import progress.message.broker.Config;
import progress.message.broker.mqtt.MqttBrokerConnection;
import progress.message.broker.mqtt.MqttRegistrar;
import progress.message.broker.mqtt.codec.MqttCodecUtils;
import progress.message.broker.mqtt.codec.MqttDecoder;
import progress.message.broker.mqtt.proto.ConnAckMessage;
import progress.message.broker.mqtt.proto.ConnectMessage;
import progress.message.broker.mqtt.proto.MessageIdMessage;
import progress.message.broker.mqtt.proto.MqttConnectException;
import progress.message.broker.mqtt.proto.MqttMessage;
import progress.message.broker.mqtt.proto.MqttQoS;
import progress.message.broker.mqtt.proto.MqttSubscription;
import progress.message.broker.mqtt.proto.PingRespMessage;
import progress.message.broker.mqtt.proto.PubRelMessage;
import progress.message.broker.mqtt.proto.PublishMessage;
import progress.message.broker.mqtt.proto.SubAckMessage;
import progress.message.broker.mqtt.proto.SubscribeMessage;
import progress.message.broker.mqtt.proto.UnsubAckMessage;
import progress.message.broker.mqtt.proto.UnsubscribeMessage;
import progress.message.client.EAlreadyConnected;
import progress.message.client.EAnonymousConnectionDisallowed;
import progress.message.client.EConnectionLimitExceeded;
import progress.message.client.ECredentialInUse;
import progress.message.client.EInauthenticBroker;
import progress.message.client.EInauthenticClient;
import progress.message.client.EPasswordExpired;
import progress.message.client.ESecurityGeneralException;
import progress.message.client.EUserAlreadyConnected;
import progress.message.net.ISocket;
import progress.message.strm.StreamFactory;
import progress.message.util.server.ByteBuffer;
import progress.message.zclient.Listener;

public class MqttListener
extends Listener {
    private final MqttBrokerConnection m_connection;
    private InputStream m_is;

    public MqttListener(MqttBrokerConnection connection) throws IOException {
        super(connection);
        this.m_connection = connection;
    }

    @Override
    public void threadMain() {
        block24: {
            try {
                if (this.DEBUG) {
                    this.logDebug(128, "Starting MQTT Listener");
                }
                this.initialize();
                while (!this.isShuttingDown()) {
                    int read;
                    byte[] buffer = new byte[this.m_connection.getMaxRcvBufferSize()];
                    ByteBuffer readBuffer = new ByteBuffer(buffer.length);
                    while (!this.isShuttingDown() && (read = this.m_is.read(buffer)) != -1) {
                        try {
                            MqttMessage message;
                            readBuffer.write(buffer, read);
                            while (!MqttCodecUtils.checkHeaderAvailability(readBuffer) && (read = this.m_is.read(buffer)) != -1) {
                                readBuffer.write(buffer, read);
                            }
                            block19: while ((message = MqttDecoder.decode(readBuffer)) != null) {
                                this.m_connection.messageReceived(message);
                                switch (message.getMessageType()) {
                                    case 1: {
                                        this.handleConnect((ConnectMessage)message);
                                        continue block19;
                                    }
                                    case 12: {
                                        this.handlePing();
                                        continue block19;
                                    }
                                    case 14: {
                                        this.handleDisconnect();
                                        continue block19;
                                    }
                                    case 8: {
                                        this.handleSubscribe((SubscribeMessage)message);
                                        continue block19;
                                    }
                                    case 10: {
                                        this.handleUnsubscribe((UnsubscribeMessage)message);
                                        continue block19;
                                    }
                                    case 3: {
                                        this.handlePublish((PublishMessage)message);
                                        continue block19;
                                    }
                                    case 6: {
                                        this.handleConfirmPublish((PubRelMessage)message);
                                        continue block19;
                                    }
                                    case 5: {
                                        this.handleAckDelivery((MessageIdMessage)message);
                                        continue block19;
                                    }
                                    case 4: 
                                    case 7: {
                                        this.handleConfirmDelivery((MessageIdMessage)message);
                                        continue block19;
                                    }
                                }
                                this.handleInvalidMessage(message);
                            }
                            if (readBuffer.countUnread() != 0) continue;
                            readBuffer.clear();
                        }
                        catch (MqttConnectException ex) {
                            ConnAckMessage connAckMessage = new ConnAckMessage();
                            connAckMessage.setReturnCode(ex.getReturnCode());
                            this.m_connection.send(connAckMessage);
                            this.m_connection.fatalError();
                            break;
                        }
                    }
                    if (this.isShuttingDown()) continue;
                    if (this.DEBUG) {
                        String msg = "Detected EOF on MQTT connection with clientId = " + this.m_connection.getClientId();
                        BrokerComponent.getComponentContext().logMessage(msg, 7);
                    }
                    this.m_connection.fatalError();
                }
            }
            catch (SocketException | SocketTimeoutException ex) {
                if (!this.isShuttingDown()) {
                    String msg = "Detected timeout on MQTT connection with clientId = " + this.m_connection.getClientId();
                    BrokerComponent.getComponentContext().logMessage(msg, 2);
                    this.m_connection.fatalError();
                }
            }
            catch (Exception ex) {
                BrokerComponent.getComponentContext().logMessage((Throwable)ex, 2);
                if (this.DEBUG) {
                    this.debug("Caught " + ex);
                }
                if (this.isShuttingDown()) break block24;
                this.m_connection.fatalError();
            }
        }
    }

    private void handlePublish(PublishMessage message) throws IOException {
        this.m_connection.getSession().publish(message);
    }

    private void handleConfirmPublish(PubRelMessage message) {
        this.m_connection.getSession().confirmPublish(message);
    }

    private void handleConfirmDelivery(MessageIdMessage message) {
        this.m_connection.getSession().confirmDelivery(message);
    }

    private void handleAckDelivery(MessageIdMessage message) throws IOException {
        this.m_connection.getSession().ackDelivery(message);
    }

    private void handleSubscribe(SubscribeMessage message) throws IOException {
        SubAckMessage subAckMessage = new SubAckMessage();
        subAckMessage.setMessageId(message.getMessageId());
        for (MqttSubscription subscription : message.subscriptions()) {
            MqttQoS grantedQos = this.m_connection.getSession().subscribe(subscription);
            subAckMessage.addGrantedQos(grantedQos);
        }
        this.m_connection.send(subAckMessage);
    }

    private void handleUnsubscribe(UnsubscribeMessage message) throws IOException {
        UnsubAckMessage unsubAckMessage = new UnsubAckMessage();
        unsubAckMessage.setMessageId(message.getMessageId());
        for (String topicFilter : message.topicFilters()) {
            this.m_connection.getSession().unsubscribe(topicFilter);
        }
        this.m_connection.send(unsubAckMessage);
    }

    private void handleInvalidMessage(MqttMessage message) {
        String errorMessage = "MQTT message (" + message.getMessageType() + ") not supported by the broker";
        BrokerComponent.getComponentContext().logMessage(errorMessage, 2);
        this.m_connection.fatalError();
    }

    private void handleDisconnect() {
        this.m_connection.close();
    }

    private void handlePing() throws IOException {
        PingRespMessage pingRespMessage = new PingRespMessage();
        this.m_connection.send(pingRespMessage);
    }

    private void handleConnect(ConnectMessage message) throws IOException {
        if (this.m_connection.isConnected()) {
            this.m_connection.fatalError();
            return;
        }
        ConnAckMessage connAckMessage = new ConnAckMessage();
        try {
            boolean sessionPresent = !message.isCleanSession() && MqttRegistrar.isSessionPresent(message.getClientID());
            connAckMessage.setSessionPresent(sessionPresent);
            this.m_connection.connectInternal(message);
            connAckMessage.setReturnCode((byte)0);
            this.m_connection.send(connAckMessage);
        }
        catch (EAnonymousConnectionDisallowed | ECredentialInUse | EInauthenticBroker | EInauthenticClient | EPasswordExpired ex) {
            connAckMessage.setReturnCode((byte)4);
            this.m_connection.send(connAckMessage);
            this.m_connection.fatalError();
        }
        catch (EAlreadyConnected | EConnectionLimitExceeded | ESecurityGeneralException | EUserAlreadyConnected ex) {
            connAckMessage.setReturnCode((byte)5);
            this.m_connection.send(connAckMessage);
            this.m_connection.fatalError();
        }
        catch (IOException ex) {
            connAckMessage.setReturnCode((byte)3);
            this.m_connection.send(connAckMessage);
            this.m_connection.fatalError();
        }
    }

    private void initialize() throws IOException {
        ISocket socket = this.m_connection.getSocket();
        socket.setSoTimeout(Config.BROKER_CONNECT_TIMEOUT);
        if (this.DEBUG) {
            this.logDebug(128, "Starting transport handshake");
        }
        socket.startTransportHandshake();
        if (this.DEBUG) {
            this.logDebug(128, "Starting MQTT handshake");
        }
        socket.startProgressHandshake();
        this.m_is = StreamFactory.getListenerInputStream(socket, this.m_connection.getMaxRcvBufferSize(), this.m_connection.getMinRcvBufferSize(), this.m_connection.getInitialRcvBufferSize(), false);
        if (this.DEBUG) {
            this.logDebug(64, "Created InputStream " + this.m_is);
        }
    }
}

