package progress.message.broker.mqtt;

import com.sonicsw.mq.components.BrokerComponent;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import progress.message.broker.Config;
import progress.message.broker.mqtt.codec.MqttCodecUtils;
import progress.message.broker.mqtt.codec.MqttDecoder;
import progress.message.broker.mqtt.proto.ConnAckMessage;
import progress.message.broker.mqtt.proto.ConnectMessage;
import progress.message.broker.mqtt.proto.MessageIdMessage;
import progress.message.broker.mqtt.proto.MqttConnectException;
import progress.message.broker.mqtt.proto.MqttMessage;
import progress.message.broker.mqtt.proto.MqttSubscription;
import progress.message.broker.mqtt.proto.PingRespMessage;
import progress.message.broker.mqtt.proto.PubRelMessage;
import progress.message.broker.mqtt.proto.PublishMessage;
import progress.message.broker.mqtt.proto.SubAckMessage;
import progress.message.broker.mqtt.proto.SubscribeMessage;
import progress.message.broker.mqtt.proto.UnsubAckMessage;
import progress.message.broker.mqtt.proto.UnsubscribeMessage;
import progress.message.client.EAlreadyConnected;
import progress.message.client.EAnonymousConnectionDisallowed;
import progress.message.client.EConnectionLimitExceeded;
import progress.message.client.ECredentialInUse;
import progress.message.client.EInauthenticBroker;
import progress.message.client.EInauthenticClient;
import progress.message.client.EPasswordExpired;
import progress.message.client.ESecurityGeneralException;
import progress.message.client.EUserAlreadyConnected;
import progress.message.net.ISocket;
import progress.message.strm.StreamFactory;
import progress.message.util.server.ByteBuffer;
import progress.message.zclient.Listener;

/* loaded from: input_file:progress/message/broker/mqtt/MqttListener.class */
/**
 * Listener for a single MQTT client connection.
 *
 * <p>{@link #threadMain()} performs the socket handshakes, then repeatedly reads raw bytes
 * from the connection's input stream, decodes them into {@link MqttMessage}s with
 * {@link MqttDecoder} and dispatches each message to a handler chosen by its message type.
 */
public class MqttListener extends Listener {
    // CONNACK return codes used by this class. The names follow the MQTT 3.1.1
    // specification (section 3.2.2.3); the numeric values are the ones the code sends.
    private static final byte CONNACK_ACCEPTED = 0;
    private static final byte CONNACK_SERVER_UNAVAILABLE = 3;
    private static final byte CONNACK_BAD_CREDENTIALS = 4;
    private static final byte CONNACK_NOT_AUTHORIZED = 5;

    /** Connection this listener reads from and replies on. */
    private final MqttBrokerConnection m_connection;

    /** Input stream over the connection's socket; assigned by {@link #initialize()}. */
    private InputStream m_is;

    /**
     * Creates a listener bound to the given connection.
     *
     * @param mqttBrokerConnection connection to read MQTT traffic from
     * @throws IOException declared by this constructor; presumably raised by the superclass
     *     constructor (not visible here)
     */
    public MqttListener(MqttBrokerConnection mqttBrokerConnection) throws IOException {
        super(mqttBrokerConnection);
        this.m_connection = mqttBrokerConnection;
    }

    /**
     * Main loop of the listener thread.
     *
     * <p>After {@link #initialize()}, bytes are read into a scratch array, accumulated in a
     * {@link ByteBuffer}, and every message that can be decoded from the accumulated bytes is
     * passed to the connection and dispatched. A read result of {@code -1} while not shutting
     * down is reported as a fatal error on the connection. Socket errors and timeouts are
     * logged and reported as fatal unless the listener is shutting down; any other exception
     * is logged and likewise reported as fatal unless shutting down.
     */
    @Override // progress.message.zclient.DebugThread
    public void threadMain() {
        try {
            if (this.DEBUG) {
                logDebug(128, "Starting MQTT Listener");
            }
            initialize();
            while (!isShuttingDown()) {
                byte[] chunk = new byte[this.m_connection.getMaxRcvBufferSize()];
                ByteBuffer pending = new ByteBuffer(chunk.length);
                int read;
                while (!isShuttingDown() && (read = this.m_is.read(chunk)) != -1) {
                    try {
                        pending.write(chunk, read);
                        // Keep reading until the codec reports that a header is available
                        // (or the stream ends).
                        int more;
                        while (!MqttCodecUtils.checkHeaderAvailability(pending)
                                && (more = this.m_is.read(chunk)) != -1) {
                            pending.write(chunk, more);
                        }
                        // Drain every message currently decodable. A null result ends the
                        // drain so control returns to the read loop; spinning on decode()
                        // here would never read more bytes nor notice shutdown/EOF.
                        // NOTE(review): null is presumed to mean "no complete message in
                        // the buffer" - confirm against MqttDecoder.
                        MqttMessage message;
                        while ((message = MqttDecoder.decode(pending)) != null) {
                            this.m_connection.messageReceived(message);
                            dispatch(message);
                        }
                        if (pending.countUnread() == 0) {
                            // Everything consumed: reset the buffer before the next read.
                            pending.clear();
                        }
                    } catch (MqttConnectException e) {
                        // Answer with a CONNACK carrying the exception's return code,
                        // then fail the connection.
                        ConnAckMessage connAckMessage = new ConnAckMessage();
                        connAckMessage.setReturnCode(e.getReturnCode());
                        this.m_connection.send(connAckMessage);
                        this.m_connection.fatalError();
                    }
                }
                if (!isShuttingDown()) {
                    if (this.DEBUG) {
                        BrokerComponent.getComponentContext().logMessage("Detected EOF on MQTT connection with clientId = " + this.m_connection.getClientId(), 7);
                    }
                    this.m_connection.fatalError();
                }
            }
        } catch (SocketException | SocketTimeoutException e2) {
            if (isShuttingDown()) {
                return;
            }
            BrokerComponent.getComponentContext().logMessage("Detected timeout on MQTT connection with clientId = " + this.m_connection.getClientId(), 2);
            this.m_connection.fatalError();
        } catch (Exception e3) {
            BrokerComponent.getComponentContext().logMessage(e3, 2);
            if (this.DEBUG) {
                debug("Caught " + e3);
            }
            if (isShuttingDown()) {
                return;
            }
            this.m_connection.fatalError();
        }
    }

    /**
     * Routes a decoded message to the handler for its message type. Types without a
     * dedicated handler (including 2, 9, 11 and 13) go to
     * {@link #handleInvalidMessage(MqttMessage)}.
     *
     * <p>The packet names in the comments follow the MQTT 3.1.1 control packet type table.
     *
     * @param message decoded message to dispatch
     * @throws IOException if a handler fails while talking to the connection or session
     */
    private void dispatch(MqttMessage message) throws IOException {
        switch (message.getMessageType()) {
            case 1: // CONNECT
                handleConnect((ConnectMessage) message);
                break;
            case 3: // PUBLISH
                handlePublish((PublishMessage) message);
                break;
            case 4: // PUBACK
            case 7: // PUBCOMP
                handleConfirmDelivery((MessageIdMessage) message);
                break;
            case 5: // PUBREC
                handleAckDelivery((MessageIdMessage) message);
                break;
            case 6: // PUBREL
                handleConfirmPublish((PubRelMessage) message);
                break;
            case 8: // SUBSCRIBE
                handleSubscribe((SubscribeMessage) message);
                break;
            case 10: // UNSUBSCRIBE
                handleUnsubscribe((UnsubscribeMessage) message);
                break;
            case 12: // PINGREQ
                handlePing();
                break;
            case 14: // DISCONNECT
                handleDisconnect();
                break;
            default:
                handleInvalidMessage(message);
                break;
        }
    }

    /** Forwards a publish message to the connection's session. */
    private void handlePublish(PublishMessage publishMessage) throws IOException {
        this.m_connection.getSession().publish(publishMessage);
    }

    /** Forwards a type-6 message to the session's {@code confirmPublish}. */
    private void handleConfirmPublish(PubRelMessage pubRelMessage) {
        this.m_connection.getSession().confirmPublish(pubRelMessage);
    }

    /** Forwards a type-4 or type-7 message to the session's {@code confirmDelivery}. */
    private void handleConfirmDelivery(MessageIdMessage messageIdMessage) {
        this.m_connection.getSession().confirmDelivery(messageIdMessage);
    }

    /** Forwards a type-5 message to the session's {@code ackDelivery}. */
    private void handleAckDelivery(MessageIdMessage messageIdMessage) throws IOException {
        this.m_connection.getSession().ackDelivery(messageIdMessage);
    }

    /**
     * Subscribes the session to every subscription in the request and replies with a
     * SubAck that carries the request's message id and one granted QoS per subscription,
     * in request order.
     *
     * @param subscribeMessage subscribe request received from the client
     * @throws IOException if subscribing or sending the reply fails
     */
    private void handleSubscribe(SubscribeMessage subscribeMessage) throws IOException {
        SubAckMessage subAckMessage = new SubAckMessage();
        subAckMessage.setMessageId(subscribeMessage.getMessageId());
        Iterator<MqttSubscription> it = subscribeMessage.subscriptions().iterator();
        while (it.hasNext()) {
            subAckMessage.addGrantedQos(this.m_connection.getSession().subscribe(it.next()));
        }
        this.m_connection.send(subAckMessage);
    }

    /**
     * Unsubscribes the session from every topic filter in the request and replies with an
     * UnsubAck carrying the request's message id.
     *
     * @param unsubscribeMessage unsubscribe request received from the client
     * @throws IOException if unsubscribing or sending the reply fails
     */
    private void handleUnsubscribe(UnsubscribeMessage unsubscribeMessage) throws IOException {
        UnsubAckMessage unsubAckMessage = new UnsubAckMessage();
        unsubAckMessage.setMessageId(unsubscribeMessage.getMessageId());
        Iterator<String> it = unsubscribeMessage.topicFilters().iterator();
        while (it.hasNext()) {
            this.m_connection.getSession().unsubscribe(it.next());
        }
        this.m_connection.send(unsubAckMessage);
    }

    /** Logs the unsupported message type and reports a fatal error on the connection. */
    private void handleInvalidMessage(MqttMessage mqttMessage) {
        BrokerComponent.getComponentContext().logMessage("MQTT message (" + ((int) mqttMessage.getMessageType()) + ") not supported by the broker", 2);
        this.m_connection.fatalError();
    }

    /** Closes the connection in response to a type-14 message. */
    private void handleDisconnect() {
        this.m_connection.close();
    }

    /** Replies to a type-12 message with a {@link PingRespMessage}. */
    private void handlePing() throws IOException {
        this.m_connection.send(new PingRespMessage());
    }

    /**
     * Handles a connect request.
     *
     * <p>A connect on an already connected connection is reported as a fatal error without
     * a reply. Otherwise the connection is established via {@code connectInternal} and a
     * ConnAck with return code 0 is sent; the session-present flag is set only when the
     * client did not request a clean session and the registrar reports an existing session
     * for its client id. Failures are answered with a ConnAck carrying return code 5, 4 or 3
     * depending on the exception type, followed by a fatal error on the connection.
     *
     * @param connectMessage connect request received from the client
     * @throws IOException if sending a rejecting ConnAck fails
     */
    private void handleConnect(ConnectMessage connectMessage) throws IOException {
        if (this.m_connection.isConnected()) {
            this.m_connection.fatalError();
            return;
        }
        ConnAckMessage connAckMessage = new ConnAckMessage();
        try {
            boolean sessionPresent = !connectMessage.isCleanSession()
                    && MqttRegistrar.isSessionPresent(connectMessage.getClientID());
            connAckMessage.setSessionPresent(sessionPresent);
            this.m_connection.connectInternal(connectMessage);
            connAckMessage.setReturnCode(CONNACK_ACCEPTED);
            this.m_connection.send(connAckMessage);
        } catch (EAlreadyConnected | EConnectionLimitExceeded | ESecurityGeneralException | EUserAlreadyConnected e) {
            rejectConnect(connAckMessage, CONNACK_NOT_AUTHORIZED);
        } catch (EAnonymousConnectionDisallowed | ECredentialInUse | EInauthenticBroker | EInauthenticClient | EPasswordExpired e2) {
            rejectConnect(connAckMessage, CONNACK_BAD_CREDENTIALS);
        } catch (IOException e3) {
            rejectConnect(connAckMessage, CONNACK_SERVER_UNAVAILABLE);
        }
    }

    /**
     * Sends the given ConnAck with the supplied return code, then reports a fatal error on
     * the connection.
     *
     * @param connAckMessage reply to populate and send
     * @param returnCode CONNACK return code to set
     * @throws IOException if sending the reply fails
     */
    private void rejectConnect(ConnAckMessage connAckMessage, byte returnCode) throws IOException {
        connAckMessage.setReturnCode(returnCode);
        this.m_connection.send(connAckMessage);
        this.m_connection.fatalError();
    }

    /**
     * Prepares the socket for reading: applies the broker connect timeout, runs the
     * transport handshake and then the second ("progress") handshake, and finally creates
     * the listener input stream sized from the connection's receive-buffer settings.
     *
     * @throws IOException if a socket operation or stream creation fails
     */
    private void initialize() throws IOException {
        ISocket socket = this.m_connection.m481getSocket();
        socket.setSoTimeout(Config.BROKER_CONNECT_TIMEOUT);
        if (this.DEBUG) {
            logDebug(128, "Starting transport handshake");
        }
        socket.startTransportHandshake();
        if (this.DEBUG) {
            logDebug(128, "Starting MQTT handshake");
        }
        socket.startProgressHandshake();
        this.m_is = StreamFactory.getListenerInputStream(socket, this.m_connection.getMaxRcvBufferSize(), this.m_connection.getMinRcvBufferSize(), this.m_connection.getInitialRcvBufferSize(), false);
        if (this.DEBUG) {
            logDebug(64, "Created InputStream " + this.m_is);
        }
    }
}
