/*
 * Decompiled with CFR 0.152.
 */
package progress.message.broker;

import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.Hashtable;
import progress.message.broker.AgentQueueProcessor;
import progress.message.broker.AgentRegistrar;
import progress.message.broker.AgentRoutingQueue;
import progress.message.broker.Broker;
import progress.message.broker.CleanupListItem;
import progress.message.broker.CleanupListItemMid;
import progress.message.broker.Config;
import progress.message.broker.IAgentQueue;
import progress.message.broker.INeighbor;
import progress.message.broker.IRemoteBroker;
import progress.message.broker.prAccessor;
import progress.message.gr.RemoteBroker;
import progress.message.gr.RouterManager;
import progress.message.interbroker.Interbroker;
import progress.message.msg.IMgram;
import progress.message.resources.prMessageFormat;
import progress.message.util.LinkedList;
import progress.message.zclient.DebugThread;

/**
 * Worker thread that consumes cleanup requests queued by other broker
 * components and, optionally, runs a periodic expiry sweep.
 *
 * <p>Requests are appended to an internal list by the {@code synchronized}
 * producer methods ({@link #addMsgForCleanup(IMgram, int)},
 * {@link #addMsgForCleanup(String, long, int)} and
 * {@link #notifyRemoteSubscriptionDeletion(String)}), each of which calls
 * {@code notifyAll()} so that a waiting {@code getNextMsg} can pick the
 * item up. {@link #threadMain()} drains the list and:
 * <ul>
 *   <li>hands items that carry a message to
 *       {@code AgentQueueProcessor.processUndelivered};</li>
 *   <li>remembers node names from remote-subscription-deletion requests and
 *       then runs an explicit cleanup pass;</li>
 *   <li>when {@code Config.ENABLE_DYNAMIC_QUEUE_CLEANUP} is set, asks every
 *       agent queue except {@code SonicMQ.deadMessage} to clear expired
 *       messages once the cleanup interval has elapsed.</li>
 * </ul>
 */
public class QueueCleanupThread
extends DebugThread {
    /** Value passed to {@code checkDebugFlags} to gate the queue statistics output. */
    private static final int STATS_DEBUG_FLAG = 64;
    /** Statistics are written whenever the pending-item count is a multiple of this value. */
    private static final int STATS_LOG_INTERVAL = 5000;
    /**
     * Reason code for which an explanation text is attached before the message
     * is processed as undelivered. NOTE(review): judging by the resource key
     * used, this is the "maximum delivery count exceeded" reason - confirm.
     */
    private static final int REASON_MAX_DELIVERY_EXCEEDED = 28;

    /** Processor that owns the queues and receives the undelivered messages. */
    private final AgentQueueProcessor m_aqp;
    /** Pending cleanup requests; accessed only while holding this object's monitor. */
    private final LinkedList m_cleanupList;
    /** Node names collected from remote-subscription-deletion requests since the last explicit cleanup. */
    private final ArrayList<String> m_deletedRemoteSubscriptions;
    /** Set by {@code processMsgs} when a dequeued item reports {@code includesWakeUp()}. */
    private boolean m_explicitlyAwakened = false;
    /** Wall-clock time (ms) at which the next scheduled cleanup is due. */
    private long m_nextCleanUpTime;
    /** Cleanup interval in milliseconds ({@code Config.QUEUE_CLEANUP_INTERVAL * 1000}). */
    private final long m_cleanupInterval;
    /** Number of items currently in {@link #m_cleanupList}. */
    private int qsize = 0;
    /** Debug counter: items added with a message or node (only counted while the stats flag is on). */
    private long items;
    /** Debug counter: items added by queue name and message id (only counted while the stats flag is on). */
    private long itemsMid;
    /** Debug counter: dequeued items that had neither a message nor a node (only counted while the stats flag is on). */
    private long noops;

    /**
     * Creates the cleanup thread for the given processor. The thread is not
     * started here.
     *
     * @param aqp processor whose queues are cleaned and which receives undelivered messages
     */
    QueueCleanupThread(AgentQueueProcessor aqp) {
        super("QueueCleanupThread");
        this.m_aqp = aqp;
        this.m_cleanupList = new LinkedList();
        this.m_explicitlyAwakened = false;
        this.m_deletedRemoteSubscriptions = new ArrayList<String>();
        this.m_cleanupInterval = Config.QUEUE_CLEANUP_INTERVAL * 1000L;
        if (this.DEBUG) {
            this.debug("Cleanup Interval= " + this.m_cleanupInterval);
        }
    }

    /**
     * Queues a request to clean up after the remote subscriptions of
     * {@code node} were deleted and wakes the cleanup thread.
     *
     * @param node name of the node whose remote subscriptions were deleted
     */
    public synchronized void notifyRemoteSubscriptionDeletion(String node) {
        CleanupListItem cli = new CleanupListItem(node, true);
        this.m_cleanupList.append(cli);
        ++this.qsize;
        if (this.DEBUG) {
            this.debug("Request cleanup of remote subscriptions for node:" + node);
        }
        if (this.checkDebugFlags(STATS_DEBUG_FLAG)) {
            ++this.items;
            this.debugQueueStatsIfDue();
        }
        this.notifyAll();
    }

    /**
     * Queues {@code m} for cleanup with reason code {@code 1}.
     *
     * @param m message to clean up
     */
    void addExpiredMsg(IMgram m) {
        this.addMsgForCleanup(m, 1);
    }

    /**
     * Queues a message for cleanup with reason code {@code 1}, identified
     * either by the message itself or, when {@code m} is {@code null}, by
     * queue name and message id.
     *
     * @param m     message to clean up, or {@code null}
     * @param qname queue name used when {@code m} is {@code null}
     * @param mid   message id used when {@code m} is {@code null}
     */
    void addExpiredMsg(IMgram m, String qname, long mid) {
        this.addExpiredMsg(m, qname, mid, 1);
    }

    /**
     * Queues a message for cleanup with an explicit reason code; identical in
     * effect to {@link #addMsgForCleanup(IMgram, String, long, int)}.
     *
     * @param m      message to clean up, or {@code null}
     * @param qname  queue name used when {@code m} is {@code null}
     * @param mid    message id used when {@code m} is {@code null}
     * @param reason reason code stored with the request
     */
    void addExpiredMsg(IMgram m, String qname, long mid, int reason) {
        this.addMsgForCleanup(m, qname, mid, reason);
    }

    /**
     * Queues {@code m} for cleanup with reason code {@code -1}.
     *
     * @param m message to clean up
     */
    void addDeletedRemoteSubscriptionMsg(IMgram m) {
        this.addMsgForCleanup(m, -1);
    }

    /**
     * Queues a message for cleanup, using the message object when it is
     * available and the queue name / message id pair otherwise.
     *
     * @param m      message to clean up, or {@code null}
     * @param qname  queue name used when {@code m} is {@code null}
     * @param mid    message id used when {@code m} is {@code null}
     * @param reason reason code stored with the request
     */
    public void addMsgForCleanup(IMgram m, String qname, long mid, int reason) {
        if (m != null) {
            this.addMsgForCleanup(m, reason);
        } else {
            this.addMsgForCleanup(qname, mid, reason);
        }
    }

    /**
     * Appends a cleanup request for {@code m} and wakes the cleanup thread.
     *
     * @param m      message to clean up
     * @param reason reason code stored with the request
     */
    public synchronized void addMsgForCleanup(IMgram m, int reason) {
        CleanupListItem cli = new CleanupListItem(m, reason);
        this.m_cleanupList.append(cli);
        ++this.qsize;
        if (this.DEBUG) {
            this.debug("Added for cleanup " + cli + " qsize = " + this.qsize);
        }
        if (this.checkDebugFlags(STATS_DEBUG_FLAG)) {
            ++this.items;
            this.debugQueueStatsIfDue();
        }
        this.notifyAll();
    }

    /**
     * Appends a cleanup request identified by queue name and message id and
     * wakes the cleanup thread.
     *
     * @param qname  name of the queue holding the message
     * @param mid    id of the message
     * @param reason reason code stored with the request
     */
    public synchronized void addMsgForCleanup(String qname, long mid, int reason) {
        CleanupListItemMid cli = new CleanupListItemMid(qname, mid, reason);
        this.m_cleanupList.append(cli);
        ++this.qsize;
        if (this.DEBUG) {
            this.debug("Added for cleanup " + cli + " qsize = " + this.qsize);
        }
        if (this.checkDebugFlags(STATS_DEBUG_FLAG)) {
            ++this.itemsMid;
            this.debugQueueStatsIfDue();
        }
        this.notifyAll();
    }

    /**
     * Formats the queue size and debug counters. The leading space is part of
     * the emitted text.
     */
    private String describeQueueStats() {
        return " qsize = " + this.qsize + " items= " + this.items + " itemsMid= " + this.itemsMid + " noops= " + this.noops;
    }

    /**
     * Writes the statistics line when the queue size is a multiple of
     * {@link #STATS_LOG_INTERVAL}. Callers check the debug flag first.
     */
    private void debugQueueStatsIfDue() {
        if (this.qsize % STATS_LOG_INTERVAL == 0) {
            this.debug(this.describeQueueStats());
        }
    }

    /**
     * Removes and returns the next cleanup request.
     *
     * <p>When the list is empty and {@code wait} is {@code true} the method
     * blocks on this object's monitor. With dynamic cleanup enabled it waits
     * at most until the next scheduled cleanup time and then returns
     * {@code null}; otherwise it keeps waiting in cleanup-interval slices
     * until an item arrives.
     *
     * @param wait whether to block when no item is available
     * @return the next item, or {@code null} if none is available and the
     *         method should not (or no longer) wait
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private synchronized CleanupListItem getNextMsg(boolean wait) throws InterruptedException {
        // The loop also covers wake-ups that did not deliver an item.
        while (true) {
            CleanupListItem item = (CleanupListItem)this.m_cleanupList.dequeue();
            if (item != null) {
                --this.qsize;
                if (this.DEBUG) {
                    this.debug("Removed for cleanup " + item + " qsize= " + this.qsize);
                }
                if (this.checkDebugFlags(STATS_DEBUG_FLAG)) {
                    this.debugQueueStatsIfDue();
                }
                return item;
            }
            if (!wait) {
                return null;
            }
            long timeToWait;
            if (Config.ENABLE_DYNAMIC_QUEUE_CLEANUP) {
                timeToWait = this.m_nextCleanUpTime - System.currentTimeMillis();
                if (timeToWait <= 0L) {
                    // Scheduled cleanup is due; let the caller run it.
                    return null;
                }
            } else {
                timeToWait = this.m_cleanupInterval;
            }
            if (this.DEBUG) {
                this.debug("Waiting " + timeToWait + " msecs");
            }
            this.wait(timeToWait);
        }
    }

    /**
     * Builds the error-info table passed along with a message whose reason
     * code is {@link #REASON_MAX_DELIVERY_EXCEEDED}: a single explanation
     * text formatted with {@code Config.BROKER_MAX_DELIVERY_COUNT}.
     */
    private Hashtable<String, String> buildMaxDeliveryErrorInfo() {
        Hashtable<String, String> errorInfo = new Hashtable<String, String>();
        Object[] args = new Object[]{Integer.valueOf(Config.BROKER_MAX_DELIVERY_COUNT)};
        String msg = prMessageFormat.format(prAccessor.getString("MAX_DELIVERY_LIMIT_BROKER_DMQ_EXPLAIN"), args);
        errorInfo.put("JMS_SonicMQ_undeliveredExplanationText", msg);
        return errorInfo;
    }

    /**
     * Drains the cleanup list until {@code getNextMsg} returns {@code null}.
     *
     * <p>Items that provide a message are passed to
     * {@code processUndelivered}; items that provide a node name are recorded
     * in {@link #m_deletedRemoteSubscriptions}; anything else is only counted
     * (and logged) when the statistics debug flag is on. Once an item reports
     * {@code includesWakeUp()}, the explicit-wake-up flag is set and the rest
     * of the drain no longer blocks.
     *
     * @param wait whether to block for new items while the list is empty
     * @throws InterruptedException if interrupted while waiting or processing
     */
    private void processMsgs(boolean wait) throws InterruptedException {
        int processed = 0;
        CleanupListItem item;
        while ((item = this.getNextMsg(wait)) != null) {
            if (item.includesWakeUp()) {
                this.m_explicitlyAwakened = true;
                wait = false;
            }
            IMgram m = item.getMgram();
            if (m != null) {
                if (this.DEBUG) {
                    this.debug("Processing message " + m.getGuarenteedTrackingNum() + " reason=" + item.getReason());
                }
                Hashtable<String, String> errorInfo = null;
                if (item.getReason() == REASON_MAX_DELIVERY_EXCEEDED) {
                    errorInfo = this.buildMaxDeliveryErrorInfo();
                }
                this.m_aqp.processUndelivered(m, item.getReason(), true, errorInfo);
                ++processed;
                continue;
            }
            String node = item.getNode();
            if (node != null) {
                this.m_deletedRemoteSubscriptions.add(node);
                continue;
            }
            // Neither a message nor a node: nothing to do beyond debug accounting.
            if (!this.checkDebugFlags(STATS_DEBUG_FLAG)) {
                continue;
            }
            if (item instanceof CleanupListItemMid) {
                CleanupListItemMid mitem = (CleanupListItemMid)item;
                this.debug("no mgram; noops= " + this.noops + " q= " + mitem.m_qName + " mid= " + mitem.m_mid);
            }
            ++this.noops;
        }
        if (this.DEBUG) {
            this.debug("processMsgs: processed " + processed + " messages");
        }
    }

    /**
     * Handles an explicit wake-up: clears the wake-up flag and, if any remote
     * subscription deletions were recorded, clears expired messages from the
     * routing queue, drains the resulting cleanup items without blocking and
     * expires sent messages on remote brokers and neighbors.
     *
     * @throws InterruptedException if interrupted during the cleanup
     */
    private void performExplicitRequestedCleanupActions() throws InterruptedException {
        this.m_explicitlyAwakened = false;
        if (this.m_deletedRemoteSubscriptions.isEmpty()) {
            return;
        }
        AgentRoutingQueue arq = this.m_aqp.getRoutingQueue();
        arq.clearExpiredMsgs(true);
        this.m_deletedRemoteSubscriptions.clear();
        this.processMsgs(false);
        this.expireRemotes();
        this.expireNeighbors();
    }

    /**
     * Runs one scheduled sweep: every agent queue except
     * {@code SonicMQ.deadMessage} clears its expired messages, the cleanup
     * items produced are drained after each queue, sent messages on remote
     * brokers and neighbors are expired, and the next cleanup time is set.
     *
     * @throws InterruptedException if interrupted during the sweep
     */
    private void performScheduledCleanup() throws InterruptedException {
        Enumeration queues = this.m_aqp.getAgentQueues();
        while (queues.hasMoreElements()) {
            IAgentQueue iaq = (IAgentQueue)queues.nextElement();
            if (iaq.getQueueName().equals("SonicMQ.deadMessage")) {
                continue;
            }
            iaq.clearExpiredMsgs();
            this.processMsgs(false);
        }
        this.expireRemotes();
        this.expireNeighbors();
        // getNextMsg reads this field while holding the same monitor.
        synchronized (this) {
            this.m_nextCleanUpTime = System.currentTimeMillis() + this.m_cleanupInterval;
        }
    }

    /**
     * Main loop of the thread. Runs until {@code Broker.exiting} is set or
     * the thread is interrupted: drains pending requests (blocking unless an
     * explicit wake-up is pending), then runs either the explicitly requested
     * cleanup or, with dynamic cleanup enabled, the scheduled sweep.
     *
     * <p>NOTE(review): this source was produced by a decompiler, which warned
     * that the try/catch structure of this method may differ from the
     * original ("Removed try catching itself - possible behaviour change").
     *
     * @throws InterruptedException if interrupted while the broker is not exiting
     */
    @Override
    public void threadMain() throws InterruptedException {
        this.m_nextCleanUpTime = System.currentTimeMillis() + this.m_cleanupInterval;
        if (this.checkDebugFlags(STATS_DEBUG_FLAG)) {
            this.debug("Thread Starting; " + this.describeQueueStats());
        }
        try {
            while (!Broker.exiting && !this.isInterrupted()) {
                boolean wait = !this.m_explicitlyAwakened;
                this.processMsgs(wait);
                if (this.m_explicitlyAwakened) {
                    if (this.DEBUG) {
                        this.debug("Starting explicitly requested cleanup " + new Date());
                    }
                    this.performExplicitRequestedCleanupActions();
                    continue;
                }
                if (!Config.ENABLE_DYNAMIC_QUEUE_CLEANUP) {
                    continue;
                }
                if (this.DEBUG) {
                    this.debug("Starting scheduled cleanup " + new Date());
                }
                this.performScheduledCleanup();
            }
        }
        catch (InterruptedException ie) {
            // An interrupt during broker shutdown is the normal way out.
            if (!Broker.exiting) {
                throw ie;
            }
        }
        finally {
            if (this.checkDebugFlags(STATS_DEBUG_FLAG)) {
                this.debug("threadMain: thread exiting; Broker.exiting = " + Broker.exiting);
            }
        }
    }

    /**
     * Calls {@code expireSent()} on every remote broker known to the router
     * manager.
     *
     * @throws InterruptedException if interrupted while expiring
     */
    private void expireRemotes() throws InterruptedException {
        RouterManager rm = AgentRegistrar.getAgentRegistrar().getRouterManager();
        Enumeration brokers = rm.getAllRemoteBrokers();
        while (brokers.hasMoreElements()) {
            RemoteBroker rb = (RemoteBroker)brokers.nextElement();
            rb.expireSent();
        }
    }

    /**
     * Calls {@code expireSent()} on every active interbroker neighbor; does
     * nothing unless {@code Config.ENABLE_INTERBROKER} is set.
     *
     * @throws InterruptedException if interrupted while expiring
     */
    private void expireNeighbors() throws InterruptedException {
        if (!Config.ENABLE_INTERBROKER) {
            return;
        }
        INeighbor[] narray = Interbroker.getInterbroker().getActiveNeighbors();
        for (INeighbor neighbor : narray) {
            IRemoteBroker irb = (IRemoteBroker)((Object)neighbor);
            irb.expireSent();
        }
    }
}

