package progress.message.broker.stomp;

import com.sonicsw.mq.components.BrokerComponent;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompDecoder;
import progress.message.broker.Config;
import progress.message.broker.protocol.proto.ProtocolException;
import progress.message.broker.stomp.proto.IStompReceiptHolder;
import progress.message.broker.stomp.proto.StompAckMessage;
import progress.message.broker.stomp.proto.StompConnectMessage;
import progress.message.broker.stomp.proto.StompConnectedMessage;
import progress.message.broker.stomp.proto.StompDisconnectMessage;
import progress.message.broker.stomp.proto.StompException;
import progress.message.broker.stomp.proto.StompMessage;
import progress.message.broker.stomp.proto.StompNackMessage;
import progress.message.broker.stomp.proto.StompReceiptMessage;
import progress.message.broker.stomp.proto.StompSendMessage;
import progress.message.broker.stomp.proto.StompSubscribeMessage;
import progress.message.broker.stomp.proto.StompUnsubscribeMessage;
import progress.message.client.EAlreadyConnected;
import progress.message.client.EAnonymousConnectionDisallowed;
import progress.message.client.EConnectionLimitExceeded;
import progress.message.client.ECredentialInUse;
import progress.message.client.EInauthenticBroker;
import progress.message.client.EInauthenticClient;
import progress.message.client.EPasswordExpired;
import progress.message.client.ESecurityGeneralException;
import progress.message.client.EUserAlreadyConnected;
import progress.message.net.ISocket;
import progress.message.strm.StreamFactory;
import progress.message.util.StringUtil;
import progress.message.zclient.Listener;
import progress.message.zclient.SessionConfig;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

/* loaded from: input_file:progress/message/broker/stomp/StompListener.class */
/**
 * Listener thread body for a single STOMP client connection.
 *
 * <p>The listener reads raw bytes from the connection's input stream, feeds
 * them to Spring's {@link StompDecoder} to extract complete STOMP frames,
 * converts each frame to a broker-side {@link StompMessage} and dispatches it
 * to the matching handler (connect, disconnect, subscribe, unsubscribe, send,
 * ack, nack). Bytes belonging to an incomplete frame are kept in an
 * accumulation buffer until more data arrives.
 */
public class StompListener extends Listener {
    /** Connection this listener reads from and reports errors to. */
    private final StompBrokerConnection m_connection;

    /** Input stream created in {@link #initialize()}; read by {@link #threadMain()}. */
    private InputStream m_is;

    /**
     * Creates a listener bound to the given connection.
     *
     * @param stompBrokerConnection the connection whose socket will be read
     * @throws IOException declared by the superclass constructor
     */
    public StompListener(StompBrokerConnection stompBrokerConnection) throws IOException {
        super("StompListener of", stompBrokerConnection);
        this.m_connection = stompBrokerConnection;
    }

    /**
     * Main loop: performs the handshakes, then reads and dispatches STOMP
     * frames until shutdown is requested or the stream ends.
     *
     * <p>Error handling:
     * <ul>
     *   <li>Any exception raised while decoding or handling frames is wrapped
     *       in a {@link StompException}; a {@link ProtocolException} reaching
     *       the read loop closes the client with a fatal error carrying the
     *       exception's message.</li>
     *   <li>End of stream while not shutting down triggers
     *       {@code fatalError()} on the connection.</li>
     *   <li>Socket exceptions/timeouts and any other exception trigger
     *       {@code fatalError()} unless the listener is already shutting
     *       down.</li>
     * </ul>
     */
    @Override // progress.message.zclient.DebugThread
    public void threadMain() {
        try {
            if (this.DEBUG) {
                logDebug(128, "Starting STOMP Listener");
            }
            initialize();
            StompDecoder stompDecoder = new StompDecoder();
            while (!isShuttingDown()) {
                // A fresh accumulation buffer and chunk array are allocated on
                // every pass of the outer loop.
                int maxRcvBufferSize = this.m_connection.getMaxRcvBufferSize();
                ByteBuffer pending = ByteBuffer.allocate(maxRcvBufferSize);
                byte[] chunk = new byte[maxRcvBufferSize];
                int read;
                while (!isShuttingDown() && (read = this.m_is.read(chunk)) != -1) {
                    try {
                        pending = appendChunk(pending, chunk, read);
                        // Switch the buffer to "read" mode: position 0, limit at end of data.
                        pending.flip();
                        int dataEnd = pending.limit();
                        try {
                            List<Message<byte[]>> messages = stompDecoder.decode(pending);
                            if (!messages.isEmpty()) {
                                handleExtractedByteMessages(messages);
                                // Move the undecoded tail to the front and return to
                                // "write" mode (position = tail length, limit = capacity).
                                pending.compact();
                            } else {
                                // Nothing decoded yet: continue appending after the
                                // bytes already buffered.
                                pending.position(dataEnd);
                            }
                        } catch (Exception e) {
                            throw new StompException("Client STOMP messages decode error: (" + e.getClass().getSimpleName() + ") " + e.getLocalizedMessage());
                        }
                    } catch (ProtocolException e2) {
                        this.m_connection.closeStompClientWithFatalError(e2.getLocalizedMessage());
                    }
                }
                if (!isShuttingDown()) {
                    // The inner loop ended because read() returned -1.
                    if (this.DEBUG) {
                        BrokerComponent.getComponentContext().logMessage("Detected EOF on STOMP connection with clientId = " + this.m_connection.getClientId(), 7);
                    }
                    this.m_connection.fatalError();
                }
            }
        } catch (SocketException | SocketTimeoutException e3) {
            if (isShuttingDown()) {
                return;
            }
            BrokerComponent.getComponentContext().logMessage("Detected timeout on STOMP connection with clientId = " + this.m_connection.getClientId(), 2);
            this.m_connection.fatalError();
        } catch (Exception e4) {
            BrokerComponent.getComponentContext().logMessage(e4, 2);
            if (this.DEBUG) {
                debug("Caught " + e4);
            }
            if (isShuttingDown()) {
                return;
            }
            this.m_connection.fatalError();
        }
    }

    /**
     * Appends {@code length} bytes of {@code chunk} to the accumulation buffer,
     * growing it when the free space is insufficient.
     *
     * @param pending buffer in "write" mode (position marks the end of buffered data)
     * @param chunk   source of the newly read bytes
     * @param length  number of valid bytes at the start of {@code chunk}
     * @return the buffer holding old and new data; a new, larger buffer if growth was needed
     */
    private static ByteBuffer appendChunk(ByteBuffer pending, byte[] chunk, int length) {
        // The limit may have been lowered by a previous decode pass; open the
        // whole capacity for writing again.
        pending.limit(pending.capacity());
        if (pending.remaining() < length) {
            ByteBuffer grown = ByteBuffer.allocate(pending.capacity() + length);
            grown.put(pending.array(), 0, pending.position());
            pending = grown;
        }
        pending.put(chunk, 0, length);
        return pending;
    }

    /**
     * Converts each decoded frame to a {@link StompMessage}, notifies the
     * connection and dispatches the message by its command.
     *
     * <p>{@code STOMP} and {@code CONNECT} are treated identically. Commands
     * without a dedicated handler go to {@link #handleInvalidMessage}.
     *
     * @param list frames produced by the Spring decoder, in arrival order
     * @throws IOException if a handler fails while talking to the client
     */
    private void handleExtractedByteMessages(List<Message<byte[]>> list) throws IOException {
        for (Message<byte[]> frame : list) {
            StompMessage message = progress.message.broker.stomp.codec.StompDecoder.decode(frame);
            if ((this.debugFlags & 65536) > 0) {
                synchronized (SessionConfig.getLog()) {
                    logDebug(65536, "STOMP msg RECEIVED: " + message.dump());
                }
            }
            this.m_connection.messageReceived(message);
            StompCommand command = message.getMessageType();
            if (command == StompCommand.STOMP || command == StompCommand.CONNECT) {
                handleConnect((StompConnectMessage) message);
            } else if (command == StompCommand.DISCONNECT) {
                handleDisconnect((StompDisconnectMessage) message);
            } else if (command == StompCommand.SUBSCRIBE) {
                handleSubscribe((StompSubscribeMessage) message);
            } else if (command == StompCommand.UNSUBSCRIBE) {
                handleUnsubscribe((StompUnsubscribeMessage) message);
            } else if (command == StompCommand.SEND) {
                handleSend((StompSendMessage) message);
            } else if (command == StompCommand.ACK) {
                handleConfirmDelivery((StompAckMessage) message);
            } else if (command == StompCommand.NACK) {
                handleRejectDelivery((StompNackMessage) message);
            } else {
                handleInvalidMessage(message);
            }
        }
    }

    /**
     * NACK is not implemented; always throws.
     *
     * <p>When invoked from {@link #threadMain()} the exception is wrapped into
     * a {@link StompException} by the decode/dispatch catch block.
     *
     * @param stompNackMessage the NACK frame received from the client (unused)
     * @throws UnsupportedOperationException always
     */
    private void handleRejectDelivery(StompNackMessage stompNackMessage) {
        // Standard JDK exception instead of the JDK-internal
        // sun.reflect.generics.reflectiveObjects.NotImplementedException.
        throw new UnsupportedOperationException("STOMP NACK is not supported by the broker");
    }

    /**
     * Forwards a SEND frame to the session, then acknowledges it with a
     * RECEIPT if the client asked for one.
     *
     * @param stompSendMessage the SEND frame
     * @throws IOException if sending fails
     */
    private void handleSend(StompSendMessage stompSendMessage) throws IOException {
        this.m_connection.getSession().send(stompSendMessage);
        replyClientWithReceiptIfClaimed(stompSendMessage);
    }

    /**
     * Passes an ACK frame to the session to confirm delivery.
     *
     * @param stompAckMessage the ACK frame
     */
    private void handleConfirmDelivery(StompAckMessage stompAckMessage) {
        this.m_connection.getSession().confirmDelivery(stompAckMessage);
    }

    /**
     * Registers a subscription on the session using the frame's destination,
     * subscription id and ack value, then sends a RECEIPT if requested.
     *
     * @param stompSubscribeMessage the SUBSCRIBE frame
     * @throws IOException if sending the receipt fails
     */
    private void handleSubscribe(StompSubscribeMessage stompSubscribeMessage) throws IOException {
        this.m_connection.getSession().subscribe(
                stompSubscribeMessage.getDestination(),
                stompSubscribeMessage.getSubscriptionId(),
                stompSubscribeMessage.getAck());
        replyClientWithReceiptIfClaimed(stompSubscribeMessage);
    }

    /**
     * Removes the subscription identified by the frame, then sends a RECEIPT
     * if requested.
     *
     * @param stompUnsubscribeMessage the UNSUBSCRIBE frame
     * @throws IOException if sending the receipt fails
     */
    private void handleUnsubscribe(StompUnsubscribeMessage stompUnsubscribeMessage) throws IOException {
        this.m_connection.getSession().unsubscribe(stompUnsubscribeMessage.getSubscriptionId());
        replyClientWithReceiptIfClaimed(stompUnsubscribeMessage);
    }

    /**
     * Logs that the message's command is unsupported and closes the client
     * with that text as the fatal error.
     *
     * @param stompMessage the message whose command has no handler
     */
    private void handleInvalidMessage(StompMessage stompMessage) {
        String text = "STOMP message (" + stompMessage.getMessageType() + ") not supported by the broker";
        BrokerComponent.getComponentContext().logMessage(text, 2);
        this.m_connection.closeStompClientWithFatalError(text);
    }

    /**
     * Sends the requested RECEIPT (if any) and then closes the connection.
     *
     * @param stompDisconnectMessage the DISCONNECT frame
     * @throws IOException if sending the receipt fails
     */
    private void handleDisconnect(StompDisconnectMessage stompDisconnectMessage) throws IOException {
        replyClientWithReceiptIfClaimed(stompDisconnectMessage);
        this.m_connection.close();
    }

    /**
     * Sends a RECEIPT frame echoing the message's receipt value when one is
     * present.
     *
     * <p>If the connection is not connected, {@code fatalError()} is raised on
     * it and nothing is sent. A null or empty receipt value means no receipt
     * was requested and nothing is sent.
     *
     * @param iStompReceiptHolder message that may carry a receipt request
     * @throws IOException if sending the receipt fails
     */
    private void replyClientWithReceiptIfClaimed(IStompReceiptHolder iStompReceiptHolder) throws IOException {
        if (!this.m_connection.isConnected()) {
            this.m_connection.fatalError();
            return;
        }
        String receipt = iStompReceiptHolder.getReceipt();
        if (!StringUtil.isNullOrEmpty(receipt)) {
            StompReceiptMessage reply = new StompReceiptMessage();
            reply.setReceipt(receipt);
            this.m_connection.send(reply);
        }
    }

    /**
     * Handles a CONNECT/STOMP frame: establishes the broker-side connection
     * and answers with a CONNECTED frame.
     *
     * <p>A connect request on an already connected connection is a fatal
     * error. The "session present" flag is computed before
     * {@code connectInternal} is called. Failures are logged and reported to
     * the client as a fatal error, grouped as authorization, authentication or
     * server-unavailable errors.
     *
     * @param stompConnectMessage the CONNECT or STOMP frame
     */
    private void handleConnect(StompConnectMessage stompConnectMessage) {
        if (this.m_connection.isConnected()) {
            this.m_connection.fatalError();
            return;
        }
        try {
            boolean sessionPresent = !stompConnectMessage.isCleanSession()
                    && StompRegistrar.isSessionPresent(stompConnectMessage.getClientID());
            StompConnectedMessage connected = new StompConnectedMessage();
            connected.setSessionPresent(sessionPresent);
            this.m_connection.connectInternal(stompConnectMessage);
            this.m_connection.send(connected);
        } catch (EAlreadyConnected | EConnectionLimitExceeded | ESecurityGeneralException | EUserAlreadyConnected e) {
            rejectConnect("Authorization error", e);
        } catch (EAnonymousConnectionDisallowed | ECredentialInUse | EInauthenticBroker | EInauthenticClient | EPasswordExpired e2) {
            rejectConnect("Authentication error", e2);
        } catch (IOException e3) {
            rejectConnect("Server unavailable", e3);
        }
    }

    /**
     * Logs a failed connect attempt and closes the client with the same text.
     *
     * @param category short description of the failure class, used as message prefix
     * @param cause    the exception that made the connect attempt fail
     */
    private void rejectConnect(String category, Exception cause) {
        String text = category + " (" + cause.getClass().getSimpleName() + "): " + cause.getLocalizedMessage();
        BrokerComponent.getComponentContext().logMessage(text, cause, 2);
        this.m_connection.closeStompClientWithFatalError(text);
    }

    /**
     * Prepares the socket for reading: applies the connect timeout, runs the
     * transport handshake and the second handshake step, and creates the
     * listener input stream sized from the connection's receive-buffer
     * settings.
     *
     * @throws IOException if a handshake or stream creation fails
     */
    private void initialize() throws IOException {
        // NOTE(review): m490getSocket looks like a decompiler-renamed accessor — confirm the real name.
        ISocket socket = this.m_connection.m490getSocket();
        socket.setSoTimeout(Config.BROKER_CONNECT_TIMEOUT);
        if (this.DEBUG) {
            logDebug(128, "Starting transport handshake");
        }
        socket.startTransportHandshake();
        if (this.DEBUG) {
            logDebug(128, "Starting STOMP handshake");
        }
        // The debug text says "STOMP handshake" but the call is startProgressHandshake().
        socket.startProgressHandshake();
        this.m_is = StreamFactory.getListenerInputStream(
                socket,
                this.m_connection.getMaxRcvBufferSize(),
                this.m_connection.getMinRcvBufferSize(),
                this.m_connection.getInitialRcvBufferSize(),
                false);
        if (this.DEBUG) {
            logDebug(64, "Created InputStream " + this.m_is);
        }
    }
}
