/*
 * Decompiled with CFR 0.152.
 */
package progress.message.broker;

import java.util.ArrayList;
import progress.message.broker.AgentQueueProcessor;
import progress.message.broker.AgentRegistrar;
import progress.message.broker.AgentRoutingQueue;
import progress.message.broker.Broker;
import progress.message.broker.Config;
import progress.message.broker.IAgentQueue;
import progress.message.gr.RemoteBrokerHelper;
import progress.message.util.DebugState;
import progress.message.util.EAssertFailure;
import progress.message.zclient.DebugObject;
import progress.message.zclient.DebugThread;
import progress.message.zclient.FastVector;

/**
 * Owns the threads that drain agent queues.
 *
 * <p>Two kinds of worker are managed here:
 * <ul>
 *   <li>{@link DispatchThread}s, created by {@link #start()}, which take queues off the
 *       internal dispatch list and call {@code processDispatchRequests()} on each;</li>
 *   <li>a single {@link RoutingDispatchThread}, created by {@link #startRoutingThread()},
 *       which drains the routing queue obtained from the shared
 *       {@link AgentQueueProcessor}.</li>
 * </ul>
 *
 * <p>Locking: lifecycle methods are {@code synchronized} on this instance; the dispatch
 * list is guarded by {@code m_dispatchListSync}; routing-queue wake-ups use
 * {@code m_rqSyncObj}.
 */
class AgentQueueDispatcher
extends DebugObject {
    private FastVector m_dispatchers = new FastVector();
    private RoutingDispatchThread m_rdt = null;
    private boolean m_rdtStarted = false;
    private AgentRoutingQueue m_routingQ = null;
    /** Monitor the routing thread waits on; notified by {@link #routedMessageEnqueued()}. */
    private final Object m_rqSyncObj = new Object();
    private static volatile AgentQueueProcessor s_queproc = null;
    /** Guards the one-time initialisation of {@link #s_queproc}. */
    private static final Object QUEPROC_LOCK_OBJ = new Object();
    /** Queues awaiting a dispatch thread; always accessed under {@link #m_dispatchListSync}. */
    private final ArrayList<IAgentQueue> m_dispatchList = new ArrayList<IAgentQueue>();
    private final Object m_dispatchListSync = new Object();
    /** Number of dispatch threads created so far; only touched via {@link #nextDispatchThreadNumber()}. */
    private static int s_count = 0;

    /**
     * Creates the dispatcher, resolving the shared queue processor on first use
     * (double-checked locking on the volatile {@code s_queproc}) and caching its
     * routing queue. No threads are started here.
     */
    AgentQueueDispatcher() {
        super(DebugState.GLOBAL_DEBUG_ON ? "AgentQueueDispatcher" : null);
        if (s_queproc == null) {
            synchronized (QUEPROC_LOCK_OBJ) {
                if (s_queproc == null) {
                    s_queproc = AgentRegistrar.getAgentRegistrar().getQueueProc();
                }
            }
        }
        this.m_routingQ = s_queproc.getRoutingQueue();
    }

    /**
     * Returns the next sequence number used to name a {@link DispatchThread}.
     * Synchronized on the class because the counter is static while {@link #start()}
     * only locks the instance, so separate dispatchers could otherwise race on it.
     */
    private static synchronized int nextDispatchThreadNumber() {
        return ++s_count;
    }

    /**
     * Creates and starts {@code Config.QUEUE_DELIVERY_THREADS} dispatch threads,
     * remembering each one so {@link #shutdown()} can stop it later.
     */
    synchronized void start() {
        for (int i = 0; i < Config.QUEUE_DELIVERY_THREADS; ++i) {
            DispatchThread dt = new DispatchThread();
            this.m_dispatchers.addElement(dt);
            dt.start();
        }
    }

    /**
     * Starts the routing dispatch thread if it has not been started yet and a routing
     * queue is available. The routing queue is re-fetched from the queue processor
     * when it was still {@code null} at construction time.
     */
    synchronized void startRoutingThread() {
        if (this.m_routingQ == null) {
            this.m_routingQ = s_queproc.getRoutingQueue();
        }
        if (this.DEBUG) {
            this.debug("startRoutingThread: routing enabled, routing queue = " + this.m_routingQ);
        }
        if (!this.m_rdtStarted && this.m_routingQ != null) {
            this.m_rdt = new RoutingDispatchThread();
            this.m_rdt.start();
            this.m_rdtStarted = true;
        }
        if (this.DEBUG) {
            this.debug("startRoutingThread: routing enabled, routing dispatcher thread started = " + this.m_rdtStarted);
        }
    }

    /**
     * Requests shutdown of every thread this dispatcher started. When the routing
     * thread was started, the remote broker helper's threads (if a helper exists) are
     * stopped first, then the routing thread, then each dispatch thread.
     */
    synchronized void shutdown() {
        if (this.m_rdtStarted && this.m_routingQ != null) {
            // Read the helper once so the null check and the call see the same object.
            RemoteBrokerHelper rbHelper = s_queproc.getRemoteBrokerHelper();
            if (rbHelper != null) {
                rbHelper.stopThreads();
            }
            if (this.m_rdt != null) {
                this.m_rdt.shutdown();
            }
        }
        Object[] dispatchers = this.m_dispatchers.toArray();
        for (int i = 0; i < dispatchers.length; ++i) {
            DispatchThread dt = (DispatchThread)dispatchers[i];
            if (dt == null) {
                continue;
            }
            dt.shutdown();
        }
    }

    /**
     * Wakes the routing dispatch thread after a message has been placed on the routing
     * queue. Does nothing when there is no routing queue.
     */
    synchronized void routedMessageEnqueued() {
        if (this.m_routingQ != null) {
            synchronized (this.m_rqSyncObj) {
                this.m_rqSyncObj.notifyAll();
            }
        }
    }

    /**
     * Appends a queue to the dispatch list and wakes any dispatch threads waiting for work.
     *
     * @param iaq the queue that has dispatch requests to process
     */
    final void addQueueToDispatchList(IAgentQueue iaq) {
        synchronized (this.m_dispatchListSync) {
            this.m_dispatchList.add(iaq);
            this.m_dispatchListSync.notifyAll();
        }
    }

    /**
     * Removes the first occurrence of a queue from the dispatch list.
     *
     * @param iaq the queue to remove
     * @return {@code true} if the queue was present in the list
     */
    final boolean removeQueueFromDispatchList(IAgentQueue iaq) {
        synchronized (this.m_dispatchListSync) {
            return this.m_dispatchList.remove(iaq);
        }
    }

    /**
     * Thread that drains the routing queue. It sleeps on {@code m_rqSyncObj} while the
     * queue is empty and, once woken, dispatches until the queue reports no enqueued
     * entries, then pokes the remote broker helper's connect thread.
     */
    class RoutingDispatchThread
    extends DebugThread {
        RoutingDispatchThread() {
            super("RoutingDispatchThread");
        }

        /**
         * Main loop; runs until the broker is exiting, this thread is shutting down,
         * or the thread is found interrupted after a dispatch.
         *
         * @throws InterruptedException if interrupted while waiting and the broker is
         *         not exiting
         */
        @Override
        public void threadMain() throws InterruptedException {
            try {
                while (!Broker.exiting && !this.isShuttingDown()) {
                    synchronized (AgentQueueDispatcher.this.m_rqSyncObj) {
                        if (this.DEBUG) {
                            this.debug("AgentQueueDispatcher$RoutingDispatchThread.threadMain: grabbed lock on m_routingQ, at time = " + System.currentTimeMillis());
                        }
                        // Idle until something is enqueued or the broker begins exiting.
                        while (AgentQueueDispatcher.this.m_routingQ.getEnqueuedCount() == 0 && !Broker.exiting) {
                            if (this.DEBUG) {
                                this.debug("AgentQueueDispatcher$RoutingDispatchThread.threadMain: Going idle, waiting on m_routingQ, time = " + System.currentTimeMillis() + "...");
                            }
                            AgentQueueDispatcher.this.m_rqSyncObj.wait();
                            if (this.DEBUG) {
                                this.debug("AgentQueueDispatcher$RoutingDispatchThread.threadMain: Notified on m_routingQ, time = " + System.currentTimeMillis());
                            }
                        }
                    }
                    int dispatched = 0;
                    // Drain under the routing queue's own monitor.
                    synchronized (AgentQueueDispatcher.this.m_routingQ) {
                        while (AgentQueueDispatcher.this.m_routingQ.getEnqueuedCount() > 0) {
                            AgentQueueDispatcher.this.m_routingQ.dispatch(0L, 0, false);
                            if (Thread.currentThread().isInterrupted()) {
                                return;
                            }
                            ++dispatched;
                            if (this.DEBUG) {
                                this.debug("AgentQueueDispatcher$RoutingDispatchThread.threadMain: # of messages successfully dispatched = " + dispatched);
                            }
                        }
                    }
                    RemoteBrokerHelper rbHelper = s_queproc.getRemoteBrokerHelper();
                    if (rbHelper != null) {
                        rbHelper.wakeUpConnectThread();
                    }
                    if (this.DEBUG) {
                        this.debug("AgentQueueDispatcher$RoutingDispatchThread.threadMain: releasing the lock on m_routingQ, time = " + System.currentTimeMillis());
                    }
                }
            }
            catch (InterruptedException ie) {
                // An interrupt during broker exit is expected; otherwise propagate it.
                if (!Broker.exiting) {
                    throw ie;
                }
            }
            finally {
                if (this.DEBUG) {
                    // Label names this thread (it previously claimed to be DispatchThread).
                    this.debug("AgentQueueDispatcher$RoutingDispatchThread.threadMain: broker exiting = " + Broker.exiting);
                }
            }
        }
    }

    /**
     * Worker that repeatedly takes the head of the dispatch list and asks that queue
     * to process its dispatch requests.
     */
    class DispatchThread
    extends DebugThread {
        DispatchThread() {
            super("DispatchThread " + AgentQueueDispatcher.nextDispatchThreadNumber());
        }

        /**
         * Main loop; runs until the broker is exiting or this thread is shutting down.
         *
         * @throws InterruptedException if interrupted while waiting and the broker is
         *         not exiting
         */
        @Override
        public void threadMain() throws InterruptedException {
            try {
                while (!Broker.exiting && !this.isShuttingDown()) {
                    IAgentQueue iaq;
                    synchronized (AgentQueueDispatcher.this.m_dispatchListSync) {
                        // NOTE(review): this wait only ends on a notify with work available
                        // or on interruption; presumably DebugThread.shutdown() interrupts
                        // the thread - confirm.
                        while (AgentQueueDispatcher.this.m_dispatchList.isEmpty()) {
                            AgentQueueDispatcher.this.m_dispatchListSync.wait();
                        }
                        iaq = AgentQueueDispatcher.this.m_dispatchList.remove(0);
                    }
                    // Process outside the list lock so other threads can keep enqueuing.
                    iaq.processDispatchRequests();
                }
            }
            catch (InterruptedException ie) {
                if (!Broker.exiting) {
                    throw ie;
                }
            }
            catch (EAssertFailure eaf) {
                // Assertion failures are only tolerated while the broker is going down.
                if (!Broker.exiting) {
                    throw eaf;
                }
                if (this.DEBUG) {
                    this.debug("************Broker exiting; Suppressing AssertFailure in " + Thread.currentThread().getName());
                    eaf.printStackTrace();
                }
            }
            finally {
                if (this.DEBUG) {
                    this.debug("AgentQueueDispatch$DispatchThread.threadMain: broker exiting = " + Broker.exiting);
                }
            }
        }
    }
}

