/*
 * Decompiled with CFR 0.152.
 */
package progress.message.broker.mqtt.agent;

import com.sonicsw.mq.components.BrokerComponent;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import progress.message.broker.Config;
import progress.message.broker.RemotePipeSocket;
import progress.message.broker.mqtt.MqttBrokerConnection;
import progress.message.broker.mqtt.MqttJmsUtils;
import progress.message.broker.mqtt.MqttSession;
import progress.message.broker.mqtt.agent.MqttAgentConnection;
import progress.message.broker.mqtt.agent.MqttAgentSubscription;
import progress.message.broker.mqtt.agent.MqttClientConnection;
import progress.message.broker.mqtt.agent.MqttConnectionDroppedHandler;
import progress.message.broker.mqtt.agent.MqttPublicationListener;
import progress.message.broker.mqtt.agent.MqttSubscriptionHandler;
import progress.message.broker.mqtt.proto.MessageIdMessage;
import progress.message.broker.mqtt.proto.MqttConnectException;
import progress.message.broker.mqtt.proto.MqttQoS;
import progress.message.broker.mqtt.proto.MqttSubscription;
import progress.message.broker.mqtt.proto.PublishMessage;
import progress.message.client.Credentials;
import progress.message.client.EGeneralException;
import progress.message.jimpl.Session;
import progress.message.zclient.ClientData;
import progress.message.zclient.Connection;
import progress.message.zclient.Envelope;
import progress.message.zclient.IMessageHandler;
import progress.message.zclient.Label;
import progress.message.zclient.MessageHandler;
import progress.message.zclient.SessionConfig;
import progress.message.zclient.Subject;
import progress.message.zclient.Subscription;

public class MqttAgentSession
extends MqttSession {
    private final Object m_connectionLock = new Object();
    private final Object m_subscriptionsLock = new Object();
    private final Object m_deliveryTrackingLock = new Object();
    private final Map<String, MqttAgentSubscription> m_subscriptions = new ConcurrentHashMap<String, MqttAgentSubscription>();
    private final Map<Integer, Envelope> m_deliveryTracking = new ConcurrentHashMap<Integer, Envelope>();
    private MqttAgentConnection m_agentConnection;
    private MqttClientConnection m_clientConnection;
    private Connection m_session;
    private String m_username;
    private String m_password;
    private boolean m_cleanSession;

    public MqttAgentSession(String clientId) {
        super(clientId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void restore(MqttBrokerConnection connection, String username, String password) throws IOException {
        Object object = this.m_connectionLock;
        synchronized (object) {
            if (!this.validateCredentials(username, password)) {
                throw new MqttConnectException("Invalid username and password", 4);
            }
            if (!this.m_clientConnection.isConnected() || this.m_clientConnection.getConnectionDropped()) {
                throw new MqttConnectException("Session not connected", 3);
            }
            this.setConnection(connection);
            RemotePipeSocket remoteBrokerSocket = (RemotePipeSocket)this.m_agentConnection.getSocket();
            remoteBrokerSocket.setRemoteSocket(connection.getSocket());
            RemotePipeSocket remoteClientSocket = (RemotePipeSocket)this.m_clientConnection.getSocket();
            remoteClientSocket.setRemoteSocket(connection.getSocket());
            this.m_agentConnection.setAcceptor(connection.getAcceptor());
            this.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void connect(MqttBrokerConnection connection, String username, String password) throws IOException {
        Object object = this.m_connectionLock;
        synchronized (object) {
            this.setConnection(connection);
            this.m_username = username;
            this.m_password = password;
            this.m_cleanSession = connection.isCleanSession();
            RemotePipeSocket internalServerSocket = new RemotePipeSocket(this.getConnection().getSocket());
            this.m_agentConnection = new MqttAgentConnection(this.getConnection(), internalServerSocket);
            this.m_agentConnection.setAcceptor(this.getConnection().getAcceptor());
            this.m_agentConnection.startListener();
            RemotePipeSocket internalClientSocket = new RemotePipeSocket(this.getConnection().getSocket(), internalServerSocket);
            String appId = this.getClientId() + "$CONNECTION$";
            Credentials credentials = new Credentials(username, password);
            MessageHandler messageHandler = new MessageHandler(null, true, true);
            messageHandler.setGuaranteed(true);
            this.m_clientConnection = new MqttClientConnection(internalClientSocket, appId, credentials, messageHandler);
            String connectionDroppedSubject = SessionConfig.getClientPrefix() + "brokerConnectionDropped";
            MqttConnectionDroppedHandler connectionDroppedHandler = new MqttConnectionDroppedHandler(this);
            messageHandler.bind(connectionDroppedSubject, (IMessageHandler)connectionDroppedHandler);
            String clientHost = internalServerSocket.getRemoteAddress();
            ClientData clientData = new ClientData(clientHost, 0, this.getClientId(), null);
            this.m_clientConnection.setClientData(clientData);
            this.m_clientConnection.setAuthenticationSPIEnabled(false);
            this.m_clientConnection.setEnableLoadBalancing(false);
            this.m_clientConnection.setEnableFaultTolerance(false);
            this.m_clientConnection.setFlowControlDisabled(false);
            this.m_clientConnection.setMaxRcvBufferSize(this.getConnection().getMaxRcvBufferSize());
            this.m_clientConnection.setMaxSendBufferSize(this.getConnection().getMaxSendBufferSize());
            this.m_clientConnection.setMinRcvBufferSize(this.getConnection().getMinRcvBufferSize());
            this.m_clientConnection.setMinSendBufferSize(this.getConnection().getMinSendBufferSize());
            this.m_clientConnection.setInitialRcvBufferSize(this.getConnection().getInitialRcvBufferSize());
            this.m_clientConnection.setInitialSendBufferSize(this.getConnection().getInitialSendBufferSize());
            this.m_clientConnection.connect("");
            this.startStopConnection(this.m_clientConnection, false);
            this.createSession();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MqttQoS subscribe(MqttSubscription mqttSubscription) throws IOException {
        Objects.requireNonNull(mqttSubscription);
        String jmsTopic = MqttJmsUtils.convertToJmsTopic(mqttSubscription.getTopicFilter());
        if (!this.m_session.getDefaultSession().isSubscribeAllowed(jmsTopic)) {
            return MqttQoS.FAILURE;
        }
        MqttQoS qos = mqttSubscription.getQos();
        Object object = this.m_subscriptionsLock;
        synchronized (object) {
            if (this.m_subscriptions.containsKey(jmsTopic)) {
                if (this.m_cleanSession) {
                    this.unsubscribe(mqttSubscription.getTopicFilter());
                    MqttAgentSubscription subscription = this.createSubscription(jmsTopic, qos);
                    this.m_subscriptions.put(jmsTopic, subscription);
                } else {
                    MqttAgentSubscription subscription = this.m_subscriptions.get(jmsTopic);
                    subscription.getSubscriptionHandler().setStartDelivery(true);
                }
            } else {
                MqttAgentSubscription subscription = this.createSubscription(jmsTopic, qos);
                this.m_subscriptions.put(jmsTopic, subscription);
            }
        }
        return qos;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unsubscribe(String topicFilter) throws IOException {
        Objects.requireNonNull(topicFilter);
        String jmsTopic = MqttJmsUtils.convertToJmsTopic(topicFilter);
        Object object = this.m_subscriptionsLock;
        synchronized (object) {
            if (this.m_subscriptions.containsKey(jmsTopic)) {
                MqttAgentSubscription subscription = this.m_subscriptions.get(jmsTopic);
                subscription.unsubscribe();
                this.m_subscriptions.remove(jmsTopic);
            }
        }
    }

    @Override
    protected void publishToSubscribers(PublishMessage message) {
        try {
            Envelope envelope = MqttJmsUtils.createJmsMessage(message);
            envelope.setAsyncDelivery(true);
            MessageIdMessage ackMessage = this.createAckPublishMessage(message);
            MqttPublicationListener publicationListener = new MqttPublicationListener(this, ackMessage);
            this.m_session.getDefaultSession().publishMs(envelope, 0, true, publicationListener);
        }
        catch (EGeneralException ex) {
            String msg = "Failed to publish MQTT message = " + message + ", error = " + ex.toString();
            BrokerComponent.getComponentContext().logMessage(msg, (Throwable)ex, 2);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void trackDelivery(PublishMessage message, Envelope envelope) {
        super.trackDelivery(message, envelope);
        Object object = this.m_deliveryTrackingLock;
        synchronized (object) {
            this.m_deliveryTracking.put(message.getMessageId(), envelope);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void confirmDelivery(MessageIdMessage message) {
        super.confirmDelivery(message);
        Object object = this.m_deliveryTrackingLock;
        synchronized (object) {
            Envelope envelope = this.m_deliveryTracking.get(message.getMessageId());
            if (envelope != null) {
                try {
                    this.m_session.getDefaultSession().acknowledge(envelope);
                    envelope.handlerDone(envelope.isGuaranteed());
                    this.m_deliveryTracking.remove(message.getMessageId());
                }
                catch (EGeneralException ex) {
                    String msg = "Failed to acknowledge MQTT message = " + message + " in session = " + this.m_session;
                    BrokerComponent.getComponentContext().logMessage(msg, (Throwable)ex, 2);
                }
            }
        }
    }

    @Override
    public void stop() {
        this.startStopConnections(true);
    }

    @Override
    public void start() {
        this.startStopConnections(false);
    }

    @Override
    protected void closeInternal() {
        super.closeInternal();
        this.clearDeliveryTracking();
        this.clearSubscriptions();
        this.clearConnections();
    }

    private boolean validateCredentials(String username, String password) {
        return this.m_username.equals(username) && this.m_password.equals(password);
    }

    private void createSession() throws EGeneralException {
        MessageHandler messageHandler = new MessageHandler(null, true, true);
        messageHandler.setGuaranteed(true);
        String appId = "$SESSION$";
        long parentId = this.m_clientConnection.getClientId();
        this.m_session = new Connection(appId, null, messageHandler);
        this.m_session.setDeliveryCloseTimeout(Config.MQTT_DELIVERY_TIMEOUT);
        this.m_session.setReturnDbLimitExceptions(2);
        this.m_session.connect(this.m_clientConnection, parentId);
        this.startStopConnection(this.m_session, false);
    }

    private MqttAgentSubscription createSubscription(String jmsTopic, MqttQoS qos) throws EGeneralException {
        MessageHandler messageHandler = new MessageHandler(null, true, true);
        messageHandler.setGuaranteed(true);
        boolean durable = !this.m_cleanSession && (qos == MqttQoS.AT_LEAST_ONCE || qos == MqttQoS.EXACTLY_ONCE);
        String appId = durable ? SessionConfig.createDurableAppid(this.getClientId(), jmsTopic, false) : "$NONDURABLE$";
        long parentId = this.m_session.getClientId();
        Connection consumerConnection = new Connection(appId, null, messageHandler);
        short ackMode = Session.j2zAckMode(MqttJmsUtils.convertJmsAckMode(qos), false);
        consumerConnection.setAckMode(ackMode);
        consumerConnection.connect(this.m_clientConnection, parentId);
        Subject subject = new Subject(jmsTopic);
        Label label = new Label();
        label.setPersistent(durable);
        Subscription subscription = messageHandler.getSession().submitJMSSubscription(subject, label, null, false, false, 0, durable);
        MqttSubscriptionHandler subscriptionHandler = new MqttSubscriptionHandler(this, qos);
        subscriptionHandler.setStartDelivery(true);
        messageHandler.bind(subject, (IMessageHandler)subscriptionHandler);
        this.startStopConnection(consumerConnection, false);
        return new MqttAgentSubscription(consumerConnection, messageHandler, subscription, subscriptionHandler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startStopConnections(boolean stop) {
        this.startStopConnection(this.m_clientConnection, stop);
        this.startStopConnection(this.m_session, stop);
        Object object = this.m_subscriptionsLock;
        synchronized (object) {
            for (MqttAgentSubscription subscription : this.m_subscriptions.values()) {
                this.startStopConnection(subscription.getConsumerConnection(), stop);
                if (!stop) continue;
                subscription.getSubscriptionHandler().setStartDelivery(!stop);
            }
        }
    }

    private void startStopConnection(Connection connection, boolean stop) {
        try {
            if (stop && connection.getDeliveryStarted()) {
                connection.stopDelivery();
            } else if (!stop && !connection.getDeliveryStarted()) {
                connection.startDelivery();
            }
        }
        catch (EGeneralException ex) {
            BrokerComponent.getComponentContext().logMessage("Failed to start/stop connection id = " + connection, (Throwable)ex, 2);
        }
    }

    private void clearConnection(Connection connection) {
        try {
            if (connection.isConnected()) {
                connection.disconnect(false);
            }
        }
        catch (EGeneralException ex) {
            BrokerComponent.getComponentContext().logMessage("Failed to clear connection id = " + connection, (Throwable)ex, 2);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearSubscriptions() {
        Object object = this.m_subscriptionsLock;
        synchronized (object) {
            try {
                for (MqttAgentSubscription subscription : this.m_subscriptions.values()) {
                    this.clearConnection(subscription.getConsumerConnection());
                }
            }
            finally {
                this.m_subscriptions.clear();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearConnections() {
        Object object = this.m_connectionLock;
        synchronized (object) {
            this.clearConnection(this.m_session);
            if (this.m_clientConnection != null) {
                this.m_clientConnection.close();
            }
            if (this.m_agentConnection != null) {
                this.m_agentConnection.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearDeliveryTracking() {
        Object object = this.m_deliveryTrackingLock;
        synchronized (object) {
            this.m_deliveryTracking.clear();
        }
    }
}

