/*
 * Decompiled with CFR 0.152.
 */
package progress.message.broker;

import com.sonicsw.mq.components.BrokerManagementNotificationsHelper;
import java.util.ArrayList;
import progress.message.broker.AgentRegistrar;
import progress.message.broker.Broker;
import progress.message.broker.ClientContextMgramQueue;
import progress.message.broker.Config;
import progress.message.broker.IClientContext;
import progress.message.zclient.DebugThread;
import progress.message.zclient.ITimeOnBufferTracker;

/**
 * Background thread that periodically inspects the outbound queue of every
 * client context known to an {@link AgentRegistrar} and raises a management
 * notification for each one whose maximum time-on-buffer has reached the
 * configured threshold.
 *
 * <p>The thread is started from the constructor, but only when
 * {@link #isEnabled()} reports that the monitor is enabled.
 */
public class SlowSubscriberMonitor
extends DebugThread {
    /** Delay between two consecutive checks, in seconds. */
    private final int m_pollingIntervalInSeconds;
    /** Time-on-buffer value (milliseconds) at or above which a notification is sent. */
    private final long m_timeInBufferThreshold;
    /** Source of the client contexts to inspect on every poll. */
    private final AgentRegistrar m_reg;

    /**
     * Creates the monitor and immediately attempts to start it.
     *
     * @param reg registrar queried for client contexts on every poll
     * @param pollingIntervalInSeconds delay between checks, in seconds; it is
     *        converted to milliseconds and handed to {@link Object#wait(long)},
     *        so {@code 0} waits until notified and a negative value is rejected
     *        by {@code wait}
     * @param timeInBufferThreshold threshold in milliseconds
     */
    SlowSubscriberMonitor(AgentRegistrar reg, int pollingIntervalInSeconds, long timeInBufferThreshold) {
        super("SlowSubscriberMonitor");
        this.m_reg = reg;
        this.m_pollingIntervalInSeconds = pollingIntervalInSeconds;
        this.m_timeInBufferThreshold = timeInBufferThreshold;
        // All fields are assigned before the thread is started, so the running
        // thread always observes a fully initialised monitor.
        this.start();
    }

    /**
     * Starts the thread only when the monitor is enabled; otherwise this is a
     * no-op and the thread never runs.
     */
    @Override
    public void start() {
        if (SlowSubscriberMonitor.isEnabled()) {
            super.start();
        }
    }

    /**
     * Main loop: waits for the polling interval, then runs one check, until the
     * broker is exiting or this thread is interrupted.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     *         and the broker is not exiting; an interrupt during broker
     *         shutdown is treated as a normal exit
     */
    @Override
    public void threadMain() throws InterruptedException {
        if (this.DEBUG) {
            this.debug("Thread started. m_pollingInterval = " + this.m_pollingIntervalInSeconds + " secs," + " m_timeInBufferThreshold = " + this.m_timeInBufferThreshold + " ms.");
        }
        // Widen to long before multiplying: an int product overflows for
        // intervals above roughly 2.1 million seconds and could turn negative.
        final long pollingIntervalMs = this.m_pollingIntervalInSeconds * 1000L;
        try {
            while (!Broker.exiting && !this.isInterrupted()) {
                synchronized (this) {
                    this.wait(pollingIntervalMs);
                }
                if (this.DEBUG) {
                    this.debug("m_pollingInterval = " + this.m_pollingIntervalInSeconds + " (seconds)," + " m_timeInBufferThreshold = " + this.m_timeInBufferThreshold + " (milliseconds).");
                }
                this.doCheck();
            }
        }
        catch (InterruptedException ie) {
            // An interrupt while the broker shuts down is expected; anything
            // else is propagated to the caller.
            if (!Broker.exiting) {
                throw ie;
            }
        }
        finally {
            if (this.DEBUG) {
                this.debug("Thread exiting, broker exiting = " + Broker.exiting);
            }
        }
    }

    /**
     * Reports whether the monitor is enabled.
     *
     * @return {@code true} when
     *         {@code Config.SLOW_SUBSCRIBER_MONITOR_POLL_INTERVAL} is greater
     *         than zero
     */
    public static boolean isEnabled() {
        return Config.SLOW_SUBSCRIBER_MONITOR_POLL_INTERVAL > 0;
    }

    /**
     * Runs one polling pass over all client contexts returned by the registrar.
     * Contexts without an out queue or without a time-on-buffer tracker are
     * skipped; for the rest, a notification is generated when the tracker's
     * maximum time is at or above the threshold.
     */
    private void doCheck() {
        // The registrar's element type is not visible here, so iterate untyped
        // and cast each entry explicitly.
        ArrayList<?> contexts = this.m_reg.getClientContexts();
        for (Object entry : contexts) {
            IClientContext cc = (IClientContext) entry;
            ClientContextMgramQueue outQueue = cc.getOutQueue();
            if (outQueue == null) {
                continue;
            }
            ITimeOnBufferTracker tracker = outQueue.getTimeOnBufferTracker();
            if (tracker == null) {
                continue;
            }
            // NOTE(review): the method name suggests the tracker's maximum is
            // also reset by this call - confirm against ITimeOnBufferTracker.
            long maxTimeOnBuffer = tracker.getAndResetMaxTimeMs();
            if (maxTimeOnBuffer >= this.m_timeInBufferThreshold) {
                this.generateNotification(cc, maxTimeOnBuffer);
            }
        }
    }

    /**
     * Sends the slow-subscriber management notification for one client context
     * and, when debugging is on, logs the offending application id and value.
     *
     * @param clientContext context whose buffer time reached the threshold
     * @param maxTimeOnBufferValueMs observed maximum time on buffer, in milliseconds
     */
    private void generateNotification(IClientContext clientContext, long maxTimeOnBufferValueMs) {
        BrokerManagementNotificationsHelper.sendSlowSubscriberNotification(clientContext, this.m_timeInBufferThreshold, maxTimeOnBufferValueMs);
        if (this.DEBUG) {
            this.debug("Buffer time exceeded by " + clientContext.getAppid() + ": " + maxTimeOnBufferValueMs + "ms");
        }
    }
}

