/*
 * Decompiled with CFR 0.152.
 */
package progress.message.ft;

import com.sonicsw.mq.components.BrokerComponent;
import java.io.IOException;
import java.io.ObjectInput;
import java.util.ArrayList;
import java.util.LinkedList;
import progress.message.broker.AgentRegistrar;
import progress.message.broker.BrokerStateManager;
import progress.message.broker.Config;
import progress.message.broker.EBrokerAborted;
import progress.message.broker.EOLEvent;
import progress.message.broker.IPubSubDeleteTracker;
import progress.message.broker.IReplicateableSaverOp;
import progress.message.broker.ISaverPostProcessor;
import progress.message.broker.IStateListener;
import progress.message.broker.LogEvent;
import progress.message.broker.LogManager;
import progress.message.broker.MsgSaver;
import progress.message.broker.MsgSaverOp;
import progress.message.broker.NullSaverPostProcessor;
import progress.message.broker.QMsgSaverOp;
import progress.message.broker.QueueMsgSaver;
import progress.message.broker.SyncpointLoc;
import progress.message.broker.prAccessor;
import progress.message.db.EDatabaseException;
import progress.message.dbsc.data.IDbDupDetectData;
import progress.message.dd.NoDupDetectDb;
import progress.message.dd.NoDupDetectDbConnection;
import progress.message.ft.DynamicSyncRecoveryController;
import progress.message.ft.DynamicSyncRoutesSaveOp;
import progress.message.ft.FTMgramFactory;
import progress.message.ft.FTReflectionOperation;
import progress.message.ft.IFTEventProcessor;
import progress.message.ft.NotifyOnRedoEvt;
import progress.message.ft.PubSubDeleteTracker;
import progress.message.ft.RBRegistrySyncOp;
import progress.message.ft.ReplicationManager;
import progress.message.ft.StandbyDBQMsgs;
import progress.message.ft.StateEventAckActive;
import progress.message.ft.StateEventNullExec;
import progress.message.ft.StateEventWrapper;
import progress.message.ft.SubscriptionSyncOp;
import progress.message.ft.TxnFileReplicationMgr;
import progress.message.ft.UserIDMappingSyncOp;
import progress.message.gr.RouteSaver;
import progress.message.gr.RouteSaverOp;
import progress.message.msg.IMgram;
import progress.message.util.DebugState;
import progress.message.util.PriorityQueue;
import progress.message.zclient.DebugObject;
import progress.message.zclient.DebugThread;
import progress.message.zclient.IStateEvent;
import progress.message.zclient.QueueLimiter;

public class ReplicationDemultiplexer
extends DebugObject
implements IStateListener {
    // Inbound work queue; created lazily by getInboundQueue() and reset to null by startThread().
    private PriorityQueue m_inboundQueue;
    // Lock held while m_inboundQueue is created (getInboundQueue) or reset (startThread).
    private Object m_inboundQueueSyncObject = new Object();
    // Debug-only statistics for the inbound queue, maintained by processInboundQueue().
    private long m_inboundQOps;
    private long m_maxIBOps = 0L;
    // Queue of log events; created in the constructor and set to null by stopThread() for state 1.
    private EventQueue m_eventQueue;
    // Worker thread; created in startThread(), joined and cleared in stopThread().
    private Thread m_demuxThread;
    // Polled by the worker loop; cleared by startThread(), set by stopThread().
    private volatile boolean m_stopped = true;
    // Collaborators captured in the constructor.
    private ReplicationManager m_replMgr = null;
    private TxnFileReplicationMgr m_txnFileReplicationMgr = null;
    private AgentRegistrar m_reg = null;
    // Resolved lazily from the registrar in startThread().
    private LogManager m_logMgr = null;
    private MsgSaver m_msgSaver = null;
    private QueueMsgSaver m_qmsgSaver = null;
    private RouteSaver m_routeSaver = null;
    // Syncpoint bookkeeping written by handleStateEvent() and consumed/reset by endSyncpoint();
    // all of it is reinitialised by setupStandbySyncProcessing().
    private long m_lastSyncBeginSeqNo = -1L;
    private long m_lastSyncEndSeqNo = -1L;
    private boolean m_gotSyncBegin = false;
    private NotifyOnRedoEvt m_syncBeginFlushedEvt = null;
    private boolean m_initialSyncpointCompleted = false;
    // Current event sink; swapped under m_stateSwitchSyncObj by the setupStandby*Processing() methods.
    private IFTEventProcessor m_eventProcessor = null;
    private Object m_stateSwitchSyncObj = new Object();
    // Both trackers are created on first call to setupStandbySyncProcessing().
    private StandbyDBQMsgs m_standbyDBQMsgs = null;
    private IPubSubDeleteTracker m_pubSubDeleteTracker = null;
    private DynamicSyncRecoveryController m_dsRecoveryController = null;
    // Syncpoint location captured when a type-6 state event is handled.
    private SyncpointLoc m_lastKnownGoodSP = null;
    // Bytes of events accumulated by RecoveryThreadEventProcessor.addEvent() since the last forced flush.
    private int m_size = 0;
    /**
     * Creates a demultiplexer bound to the given replication manager.
     *
     * <p>Registers this object as a broker state listener, then caches the
     * savers obtained from the {@link AgentRegistrar}, creates the event queue
     * sized by {@code Config.LOG_QUEUE_SIZE}, and creates the dynamic sync
     * recovery controller on top of that queue.
     *
     * @param mgr the replication manager this demultiplexer reports to
     */
    public ReplicationDemultiplexer(ReplicationManager mgr) {
        super(DebugState.GLOBAL_DEBUG_ON ? "ReplicationDemultiplexer" : null);
        try {
            BrokerStateManager.getBrokerStateManager().registerListener(this);
        }
        catch (Exception exception) {
            // empty catch block
            // NOTE(review): a failed registration is silently ignored here; without it
            // stateChanged() would presumably never be invoked - confirm this is intended.
        }
        this.m_replMgr = mgr;
        this.m_txnFileReplicationMgr = this.m_replMgr.getTxnFileReplicationMgr();
        this.m_eventQueue = new EventQueue(Config.LOG_QUEUE_SIZE);
        this.m_reg = AgentRegistrar.getAgentRegistrar();
        this.m_msgSaver = this.m_reg.getMsgSaver();
        this.m_qmsgSaver = this.m_reg.getQueueMsgSaver();
        this.m_routeSaver = this.m_reg.getRouterManager().getRouteForwarder().getRouteSaver();
        this.m_dsRecoveryController = new DynamicSyncRecoveryController(this.m_eventQueue, this.m_replMgr);
    }

    /**
     * Returns the inbound queue, creating it on first use.
     *
     * <p>The reference is read and returned while still holding
     * {@code m_inboundQueueSyncObject}. {@link #startThread()} resets the field
     * to {@code null} under the same lock, so reading the field again after
     * leaving the critical section could hand {@code null} to the caller.
     *
     * @return the current, never-null inbound queue
     */
    private PriorityQueue getInboundQueue() {
        synchronized (this.m_inboundQueueSyncObject) {
            if (this.m_inboundQueue == null) {
                this.m_inboundQueue = new PriorityQueue(1);
            }
            return this.m_inboundQueue;
        }
    }

    /**
     * Removes the next entry from the inbound queue via
     * {@code PriorityQueue.dequeueWait()}, then updates the debug statistics.
     * Both steps run while synchronized on the queue instance.
     *
     * @return whatever {@code dequeueWait()} returned (the worker loop treats
     *         {@code null} as a signal to exit)
     * @throws InterruptedException propagated from {@code dequeueWait()}
     */
    public Object dequeueWaitInboundQueue() throws InterruptedException {
        final PriorityQueue inbound = this.getInboundQueue();
        synchronized (inbound) {
            Object next = inbound.dequeueWait();
            this.processInboundQueue(inbound);
            return next;
        }
    }

    /**
     * Adds an entry to the inbound queue and updates the debug statistics,
     * both while synchronized on the queue instance.
     *
     * @param obj  the entry to enqueue
     * @param prio the priority passed through to {@code PriorityQueue.enqueue}
     */
    public void enqueueInboundQueue(Object obj, int prio) {
        final PriorityQueue inbound = this.getInboundQueue();
        synchronized (inbound) {
            inbound.enqueue(obj, prio);
            this.processInboundQueue(inbound);
        }
    }

    /**
     * Maintains inbound-queue statistics when debug flag 64 is set.
     *
     * <p>Counts operations, tracks the largest observed queue depth, and every
     * 10000 operations prints the current and maximum depth and resets both
     * counters. Does nothing when the flag is off.
     *
     * @param pq the queue whose depth is sampled
     */
    private void processInboundQueue(PriorityQueue pq) {
        if (!this.checkDebugFlags(64)) {
            return;
        }
        this.m_inboundQOps++;
        int depth = pq.getEnqueued();
        this.m_maxIBOps = Math.max(this.m_maxIBOps, (long)depth);
        if (this.m_inboundQOps % 10000L != 0L) {
            return;
        }
        this.debug("m_inboundQueue Currentcount= " + depth + " maxCt= " + this.m_maxIBOps);
        this.m_maxIBOps = 0L;
        this.m_inboundQOps = 0L;
    }

    /**
     * Dispatches one dequeued mgram according to its type.
     *
     * <ul>
     *   <li>24 - unwraps the state event and passes it to {@code handleStateEvent}</li>
     *   <li>29 - passes the mgram to {@code enqueueSaverOp}</li>
     *   <li>25 - passes the mgram to {@code handleOperation}</li>
     *   <li>anything else - only acknowledges the tracking number</li>
     * </ul>
     *
     * @param infoObject the dequeued object; cast to {@link IMgram}
     */
    private void handleInfoObject(Object infoObject) throws InterruptedException, IOException {
        IMgram mgram = (IMgram)infoObject;
        int mgramType = mgram.getType();
        if (mgramType == 24) {
            IStateEvent stateEvent = mgram.getStateEventHandle().getStateEvent();
            this.handleStateEvent(stateEvent, mgram.getGuarenteedTrackingNum());
        } else if (mgramType == 29) {
            this.enqueueSaverOp(mgram);
        } else if (mgramType == 25) {
            this.handleOperation(mgram);
        } else {
            this.m_replMgr.acknowledge(mgram.getGuarenteedTrackingNum());
        }
    }

    /**
     * Handles an operation mgram, dispatching on its operation type.
     *
     * <p>Operation type 23 carries a payload whose first two bytes are a
     * version and a sub-type; the sub-type selects the sync/replication action
     * below. Operation type 22 is turned into an {@link FTReflectionOperation}
     * and executed. Other operation types and unknown sub-types are ignored
     * without an acknowledgement.
     *
     * <p>The whole body sits inside one {@code try}: {@code IOException},
     * {@code ClassNotFoundException}, {@code NoSuchMethodException} and
     * {@code IllegalAccessException} are caught and only printed, so of the
     * declared exceptions only {@code InterruptedException} can leave this method.
     * NOTE(review): after such a failure the mgram may be left unacknowledged - confirm
     * that this is acceptable.
     *
     * @param m the operation mgram
     */
    private void handleOperation(IMgram m) throws InterruptedException, IOException {
        try {
            int type = m.getOperationHandle().getOperationType();
            block3 : switch (type) {
                case 23: {
                    ObjectInput di = m.getPayloadInputStreamHandle();
                    // The version byte must be consumed to advance the stream but is otherwise unused.
                    byte version = di.readByte();
                    byte subType = di.readByte();
                    switch (subType) {
                        case 1: {
                            // Finish the pending syncpoint, notify the manager, then acknowledge.
                            this.endSyncpoint();
                            this.m_replMgr.onSyncpointComplete();
                            this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                            break block3;
                        }
                        case 5: {
                            this.m_replMgr.handleTxnSyncCompleteOp();
                            this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                            break block3;
                        }
                        case 2: {
                            // No acknowledgement here; the mgram itself is handed to the manager.
                            int tid = di.readInt();
                            short flags = di.readShort();
                            this.m_replMgr.handleReplicateTxnMgram(tid, flags, m);
                            break block3;
                        }
                        case 3: {
                            if (this.DEBUG) {
                                this.debug("Received FTMgramFactory.USERIDMAPPING_SYNC_OP mgram");
                            }
                            // The op carries the tracking number; no acknowledgement is sent from here.
                            UserIDMappingSyncOp userIdMappingSyncOp = new UserIDMappingSyncOp(di);
                            userIdMappingSyncOp.setGuarTrackingNum(m.getGuarenteedTrackingNum());
                            this.m_replMgr.handleUserIDMappingSyncOp(userIdMappingSyncOp);
                            break block3;
                        }
                        case 8: {
                            if (this.DEBUG) {
                                this.debug("Received FTMgramFactory.RBREGISTRY_SYNC_OP mgram");
                            }
                            // The op carries the tracking number; no acknowledgement is sent from here.
                            RBRegistrySyncOp rbRegistrySyncOp = new RBRegistrySyncOp(di);
                            rbRegistrySyncOp.setGuarTrackingNum(m.getGuarenteedTrackingNum());
                            this.m_replMgr.handleRBRegistrySyncOp(rbRegistrySyncOp);
                            if (!this.CALLBACK) break block3;
                            this.callback("Processed RBRegsitry Op", 10, null);
                            break block3;
                        }
                        case 9: {
                            if (this.DEBUG) {
                                this.debug("Received FTMgramFactory.ROUTEINFOS_SYNC_OP mgram");
                            }
                            // Queue the route save, then acknowledge.
                            DynamicSyncRoutesSaveOp dynSyncRSOp = new DynamicSyncRoutesSaveOp(di);
                            this.m_routeSaver.enqueueNoLimiterWait(dynSyncRSOp);
                            this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                            if (!this.CALLBACK) break block3;
                            this.callback("Processed Route Info Op", 7, null);
                            break block3;
                        }
                        case 4: {
                            if (this.DEBUG) {
                                this.debug("Received FTMgramFactory.SUBSCRIPTION_SYNC_OP mgram");
                            }
                            // Execute against the current event processor, acknowledge, and
                            // update the dynamic sync status once the last chunk is seen.
                            SubscriptionSyncOp subSyncOp = new SubscriptionSyncOp(di, m);
                            subSyncOp.setGuarTrackingNum(m.getGuarenteedTrackingNum());
                            subSyncOp.execute(this.m_eventProcessor);
                            this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                            if (!subSyncOp.isLastChunk()) break block3;
                            if (this.DEBUG) {
                                this.debug("Processed last chunk for SUBSCRIPTION_SYNC_OP");
                            }
                            this.m_replMgr.setDynamicSyncStatus(16);
                            break block3;
                        }
                        case 6: {
                            if (this.DEBUG) {
                                this.debug("Received dynamically sync'd replicated mgrams");
                            }
                            // Acknowledged before the messages are saved.
                            this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                            LinkedList list = m.getOperationHandle().getMgramList();
                            boolean lastChunk = di.readBoolean();
                            if (list != null && !list.isEmpty()) {
                                this.m_msgSaver.saveReplicatedMsgs(list, m.getOperationHandle().getMgramListMemoryLength());
                            }
                            if (!lastChunk) break block3;
                            // Last chunk: flush the saver and update the dynamic sync status.
                            this.m_msgSaver.flush();
                            this.m_replMgr.setDynamicSyncStatus(2);
                            break block3;
                        }
                        case 7: {
                            // Duplicate-detection data chunk; acknowledged before it is written.
                            this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                            long time = 0L;
                            if (this.DEBUG) {
                                time = System.currentTimeMillis();
                            }
                            boolean lastChunk = di.readBoolean();
                            int size = di.readInt();
                            if (size > 0) {
                                ArrayList<IDbDupDetectData> dataList = new ArrayList<IDbDupDetectData>();
                                try {
                                    NoDupDetectDbConnection ndd = NoDupDetectDb.getConnection();
                                    for (int count = 0; count < size; ++count) {
                                        IDbDupDetectData data = FTMgramFactory.getDupDetectData(di);
                                        dataList.add(data);
                                    }
                                    ndd.writeDupDetectData(dataList);
                                }
                                catch (EDatabaseException e) {
                                    try {
                                        // NOTE(review): the result of getString("DUP_DETECT_ERROR") is itself
                                        // used as a lookup key; the other abort call in this file does a
                                        // single lookup - this looks unintended, confirm.
                                        BrokerComponent.getBrokerComponent().abort(prAccessor.getString(prAccessor.getString("DUP_DETECT_ERROR")), e, 0);
                                    }
                                    catch (EBrokerAborted eba) {
                                        return;
                                    }
                                    // If abort() returns normally, processing continues below.
                                }
                            }
                            if (lastChunk) {
                                this.m_replMgr.setDynamicSyncStatus(1);
                            }
                            if (this.CALLBACK) {
                                this.callback("Processed Dup Detect Op", 2, null);
                            }
                            if (!this.DEBUG) break block3;
                            this.debug("Total Time for dup detect op: " + (System.currentTimeMillis() - time));
                            break block3;
                        }
                        case 10: {
                            // Acknowledge, then report the last tracking number to the manager.
                            this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                            long lastTracking = di.readLong();
                            this.m_replMgr.completeQDynamicSync(lastTracking);
                            break block3;
                        }
                    }
                    // Unknown sub-type: nothing is done and nothing is acknowledged.
                    break;
                }
                case 22: {
                    FTReflectionOperation op = FTMgramFactory.createReflectionOperation(m);
                    op.execute();
                    this.m_replMgr.acknowledge(m.getGuarenteedTrackingNum());
                    break;
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        catch (NoSuchMethodException e) {
            // Unlike the other handlers, this one prints only when DEBUG is set.
            if (this.DEBUG) {
                e.printStackTrace();
            }
        }
        catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    /**
     * Completes the syncpoint currently in progress, if any, and resets the
     * per-syncpoint bookkeeping.
     *
     * <p>Work is done only when a sync-begin was seen and a sync-end sequence
     * number was recorded: the log is flushed up to that sequence number
     * (after waiting on the sync-begin redo notification) and the standby
     * subscriptions are written with the last known good syncpoint location.
     * The first completed syncpoint only sets
     * {@code m_initialSyncpointCompleted}; later ones also call
     * {@code LogManager.endSyncpoint()}.
     *
     * <p>If any step throws, the reset at the bottom is not reached.
     */
    private void endSyncpoint() throws InterruptedException, IOException {
        boolean syncpointPending = this.m_lastSyncEndSeqNo != -1L && this.m_gotSyncBegin;
        if (syncpointPending) {
            this.m_logMgr.beginFlush();
            this.m_syncBeginFlushedEvt.waitForRedo();
            this.m_logMgr.waitForFlush(this.m_lastSyncEndSeqNo);
            this.m_reg.writeSyncSubscriptionsStandby(this.m_lastKnownGoodSP);
            if (this.checkDebugFlags(32)) {
                this.debug("endSyncpoint: Wrote syncpoint to Db; " + this.m_lastKnownGoodSP);
            }
            if (!this.m_initialSyncpointCompleted) {
                // First syncpoint after setup: just remember that it completed.
                this.m_initialSyncpointCompleted = true;
                if (this.checkDebugFlags(32)) {
                    this.debug("endSyncpoint: setting m_initialSPcompleted=TRUE; lastGoodSP= " + this.m_lastKnownGoodSP);
                }
            } else {
                this.m_logMgr.endSyncpoint();
                if (this.checkDebugFlags(32)) {
                    this.debug("endSyncpoint: promoted current syncpointLoc in log; " + this.m_lastKnownGoodSP + " " + this.m_logMgr.getLastSyncsize());
                }
            }
        }
        // Always clear the per-syncpoint state, whether or not anything was pending.
        this.m_syncBeginFlushedEvt = null;
        this.m_lastSyncEndSeqNo = -1L;
        this.m_gotSyncBegin = false;
    }

    /**
     * Routes a replicated saver operation to the matching saver.
     *
     * <ul>
     *   <li>Types 3, 4, 5, 6, 8, 13 - acknowledged immediately and enqueued on
     *       the message saver; type 5 additionally records the delete in the
     *       pub/sub delete tracker, and ops with FT meta state update it under
     *       {@code m_stateSwitchSyncObj}.</li>
     *   <li>Types 1, 12 - acknowledged, removed from the standby DB queue
     *       message set, and enqueued on the queue message saver.</li>
     *   <li>Types 0, 11 - acknowledged, added to the standby DB queue message
     *       set, and enqueued on the queue message saver.</li>
     *   <li>Types 9, 10 - acknowledged and enqueued on the route saver.</li>
     *   <li>Types 7 and 2 - not acknowledged here; an {@link AckPostProcessor}
     *       is attached so the acknowledgement is sent from the saver's
     *       post-processing step.</li>
     * </ul>
     *
     * <p>Other types are ignored. If the thread is interrupted while updating
     * FT meta state, the interrupt flag is restored and the method returns.
     *
     * @param m the mgram carrying the saver operation
     */
    private void enqueueSaverOp(IMgram m) {
        IReplicateableSaverOp op = (IReplicateableSaverOp)m.getSaverOpHandle().getSaverOp();
        long tracking = m.getGuarenteedTrackingNum();
        switch (op.getType()) {
            case 3: 
            case 4: 
            case 5: 
            case 6: 
            case 8: 
            case 13: {
                this.m_replMgr.acknowledge(tracking);
                MsgSaverOp msgSaverOp = (MsgSaverOp)((Object)op);
                if (op.getType() == 5) {
                    long clientId = ((MsgSaver.MsgDeleteOp)msgSaverOp).getClientId();
                    boolean subjectAck = ((MsgSaver.MsgDeleteOp)msgSaverOp).isSubjectAck();
                    short subjectTracking = ((MsgSaver.MsgDeleteOp)msgSaverOp).getSubjectTracking();
                    this.m_pubSubDeleteTracker.msgDelete(clientId, msgSaverOp.getMessageId(), subjectAck, subjectTracking, false);
                }
                msgSaverOp.setPostProcessor(new NullSaverPostProcessor());
                this.m_msgSaver.enqueue(msgSaverOp, op.memsize());
                if (!msgSaverOp.hasFTMetaState()) break;
                synchronized (this.m_stateSwitchSyncObj) {
                    try {
                        msgSaverOp.updateFTMetaState(this.m_eventProcessor);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // Without this break control fell through into the QMsgSaverOp case below,
                // acknowledging the tracking number twice and casting this op to QMsgSaverOp.
                break;
            }
            case 1: 
            case 12: {
                this.m_replMgr.acknowledge(tracking);
                QMsgSaverOp qmsgSaverOp = (QMsgSaverOp)((Object)op);
                this.m_standbyDBQMsgs.deleteReplicated(qmsgSaverOp.getTracking());
                qmsgSaverOp.setPostProcessor(new NullSaverPostProcessor());
                this.m_qmsgSaver.enqueue((QMsgSaverOp)((Object)op));
                break;
            }
            case 0: 
            case 11: {
                this.m_replMgr.acknowledge(tracking);
                QMsgSaverOp qmsgSaverOp = (QMsgSaverOp)((Object)op);
                this.m_standbyDBQMsgs.add(qmsgSaverOp.getTracking());
                qmsgSaverOp.setPostProcessor(new NullSaverPostProcessor());
                this.m_qmsgSaver.enqueue((QMsgSaverOp)((Object)op));
                break;
            }
            case 9: 
            case 10: {
                this.m_replMgr.acknowledge(tracking);
                RouteSaverOp routeSaverOp = (RouteSaverOp)((Object)op);
                this.m_routeSaver.enqueue(routeSaverOp);
                break;
            }
            case 7: {
                // Acknowledgement is deferred to the saver's post-processing step.
                MsgSaverOp commitOp = (MsgSaverOp)((Object)op);
                commitOp.setPostProcessor(new AckPostProcessor(this.m_replMgr, tracking));
                this.m_msgSaver.enqueue(commitOp, op.memsize());
                break;
            }
            case 2: {
                // Acknowledgement is deferred to the saver's post-processing step.
                QMsgSaverOp qcommitOp = (QMsgSaverOp)((Object)op);
                qcommitOp.setPostProcessor(new AckPostProcessor(this.m_replMgr, tracking));
                this.m_qmsgSaver.enqueue(qcommitOp);
                break;
            }
        }
    }

    /**
     * Feeds a replicated state event to the current event processor and
     * updates the syncpoint bookkeeping for sync-begin / sync-end events.
     *
     * <p>For a type-6 event (logged below as SYNC_END) that follows a recorded
     * sync-begin, the log is flushed up to the sync-begin sequence number and
     * the last known good syncpoint location is captured: from
     * {@code LogManager.endSyncpoint()} before the initial syncpoint has
     * completed, from {@code getSyncpointLoc()} afterwards. For a type-5 event
     * a fresh {@link NotifyOnRedoEvt} is enqueued on the event queue and the
     * sync-begin sequence number is remembered.
     *
     * @param e           the state event
     * @param trackingNum tracking number handed to the event processor
     */
    private void handleStateEvent(IStateEvent e, long trackingNum) throws InterruptedException {
        if (this.DEBUG) {
            this.debug("Handling StateEvent: " + e + " Tracking number: " + trackingNum);
        }
        // The processor can be swapped by a state change; hold the switch lock while using it.
        synchronized (this.m_stateSwitchSyncObj) {
            this.m_eventProcessor.addEvent(e, trackingNum);
        }
        boolean syncEndAfterBegin = e.type() == 6 && this.m_gotSyncBegin;
        if (syncEndAfterBegin) {
            this.m_logMgr.beginFlush();
            this.m_logMgr.waitForFlush(this.m_lastSyncBeginSeqNo);
            if (this.m_initialSyncpointCompleted) {
                this.m_lastKnownGoodSP = this.m_logMgr.getSyncpointLoc();
                if (this.checkDebugFlags(32)) {
                    this.debug("handleStateEvent: processing SYNC_END event; current syncpointLoc= " + this.m_lastKnownGoodSP);
                }
            } else {
                this.m_lastKnownGoodSP = this.m_logMgr.endSyncpoint();
                if (this.checkDebugFlags(32)) {
                    this.debug("handleStateEvent: processing SYNC_END event; promoted current syncpointLoc in log; " + this.m_lastKnownGoodSP);
                }
            }
            this.m_lastSyncEndSeqNo = e.getSeqNo();
        }
        if (e.type() != 5) {
            return;
        }
        this.m_syncBeginFlushedEvt = new NotifyOnRedoEvt();
        this.m_gotSyncBegin = true;
        this.m_lastSyncBeginSeqNo = e.getSeqNo();
        this.getEventQueue().enqueue(this.m_syncBeginFlushedEvt);
    }

    /**
     * @return the standby DB queue message tracker; {@code null} until
     *         {@code setupStandbySyncProcessing()} has created it
     */
    public StandbyDBQMsgs getStandbyDBQMsgs() {
        return this.m_standbyDBQMsgs;
    }

    /**
     * @return the pub/sub delete tracker; {@code null} until
     *         {@code setupStandbySyncProcessing()} has created it
     */
    public IPubSubDeleteTracker getPubSubDeleteTracker() {
        return this.m_pubSubDeleteTracker;
    }

    /**
     * State-listener callback; intentionally does nothing. All work happens in
     * {@link #stateChanged(int)}.
     */
    @Override
    public void stateChanging(int newState) {
    }

    /**
     * State-listener callback that starts, restarts or stops the demux thread.
     *
     * <ul>
     *   <li>State 5 - resets the recovery controller, stops any running
     *       thread, sets up standby-sync processing and starts the thread.</li>
     *   <li>State 6 - switches to standby processing and makes sure the thread
     *       is running.</li>
     *   <li>States 1 and 4 - resets the recovery controller and stops the thread.</li>
     * </ul>
     *
     * Other states are ignored.
     *
     * @param newState the broker state that was entered
     */
    @Override
    public void stateChanged(int newState) throws Exception {
        if (newState == 5) {
            this.m_dsRecoveryController.reset();
            this.stopThread(newState, false);
            this.setupStandbySyncProcessing();
            this.startThread();
        } else if (newState == 6) {
            this.setupStandbyProcessing();
            this.startThread();
        } else if (newState == 1 || newState == 4) {
            this.m_dsRecoveryController.reset();
            this.stopThread(newState, false);
        }
    }

    /**
     * Starts the demux thread if it is currently stopped.
     *
     * <p>Creates the thread object and resolves the log manager on first use.
     * When the stopped flag is set, the inbound queue is discarded (a fresh
     * one is created on next access), the flag is cleared and the thread is
     * started. When the flag is already clear this is a no-op beyond the lazy
     * initialisation.
     */
    public synchronized void startThread() {
        if (this.m_demuxThread == null) {
            this.m_demuxThread = new DemuxThread();
        }
        if (this.m_logMgr == null) {
            this.m_logMgr = this.m_reg.getLogManager();
        }
        if (!this.m_stopped) {
            return;
        }
        synchronized (this.m_inboundQueueSyncObject) {
            this.m_inboundQueue = null;
        }
        this.m_stopped = false;
        this.m_demuxThread.start();
    }

    /**
     * Stops the demux thread and finishes or resets standby state according to
     * the state being entered.
     *
     * <p>Always sets the stopped flag. If a thread exists, the inbound queue is
     * cancelled, the thread is optionally interrupted and then joined. After
     * that an {@link EOLEvent} is enqueued on the event queue: with status 1
     * for state 1 (after flushing the message saver and completing recovery on
     * both trackers; the event queue reference is then dropped), or with
     * status 2 for states 4 and 5 (after resetting both trackers).
     *
     * @param newState the broker state being entered
     * @param fatal    when {@code true} the thread is interrupted before the join
     * @throws InterruptedException if interrupted while joining or enqueueing
     */
    public synchronized void stopThread(int newState, boolean fatal) throws InterruptedException {
        Thread worker = this.m_demuxThread;
        this.m_stopped = true;
        if (worker == null) {
            return;
        }
        this.getInboundQueue().cancel();
        if (fatal) {
            worker.interrupt();
        }
        worker.join();
        this.m_demuxThread = null;
        EOLEvent eol = new EOLEvent();
        if (newState == 1) {
            this.m_msgSaver.flush();
            this.m_standbyDBQMsgs.completeRecovery();
            this.m_pubSubDeleteTracker.completeRecovery();
            eol.setStatus((byte)1);
            this.m_eventQueue.enqueue(eol);
            this.m_eventQueue = null;
        } else if (newState == 4 || newState == 5) {
            this.m_standbyDBQMsgs.reset();
            this.m_pubSubDeleteTracker.reset();
            eol.setStatus((byte)2);
            this.m_eventQueue.enqueue(eol);
        }
    }

    /**
     * @return the event queue; {@code null} after {@code stopThread} has run
     *         for state 1, which clears the reference
     */
    public EventQueue getEventQueue() {
        return this.m_eventQueue;
    }

    /**
     * @return the dynamic sync recovery controller created in the constructor
     */
    public DynamicSyncRecoveryController getDSRecoveryController() {
        return this.m_dsRecoveryController;
    }

    /**
     * Switches event handling to a new {@link RecoveryThreadEventProcessor}.
     * The recovery controller is reset and the processor swapped while holding
     * {@code m_stateSwitchSyncObj}.
     */
    private void setupStandbyProcessing() {
        synchronized (this.m_stateSwitchSyncObj) {
            this.m_dsRecoveryController.reset();
            this.m_eventProcessor = new RecoveryThreadEventProcessor();
            boolean trace = this.checkDebugFlags(32);
            if (trace) {
                this.debug("setupStandbyProcessing() completed");
            }
        }
    }

    /**
     * Prepares for standby synchronisation: routes events to the dynamic sync
     * recovery controller, creates the standby trackers on first use and
     * clears all syncpoint bookkeeping. Runs under {@code m_stateSwitchSyncObj}.
     *
     * @throws EDatabaseException declared for the {@link StandbyDBQMsgs} construction
     */
    private void setupStandbySyncProcessing() throws EDatabaseException {
        synchronized (this.m_stateSwitchSyncObj) {
            this.m_eventProcessor = this.m_dsRecoveryController;

            // Trackers are created once and reused across later synchronisations.
            if (this.m_standbyDBQMsgs == null) {
                this.m_standbyDBQMsgs = new StandbyDBQMsgs(this.m_reg.getBrokerDatabase(), this);
            }
            if (this.m_pubSubDeleteTracker == null) {
                this.m_pubSubDeleteTracker = new PubSubDeleteTracker();
            }

            // Forget everything known about previous syncpoints.
            this.m_gotSyncBegin = false;
            this.m_initialSyncpointCompleted = false;
            this.m_lastSyncBeginSeqNo = -1L;
            this.m_lastSyncEndSeqNo = -1L;
            this.m_syncBeginFlushedEvt = null;
            this.m_lastKnownGoodSP = null;
            this.m_size = 0;

            if (this.checkDebugFlags(32)) {
                this.debug("setupStandbySyncProcessing() completed");
            }
        }
    }

    /**
     * Size-limited queue of {@link LogEvent}s.
     *
     * <p>A {@link QueueLimiter} accounts for the {@code memsize()} of queued
     * events; {@link #enqueue} waits on this object's monitor while the
     * limiter has no room, and {@link #dequeueWait} releases the space and
     * wakes waiting producers. The events themselves live in a
     * {@link PriorityQueue} and are always enqueued at priority 0.
     */
    public class EventQueue {
        private QueueLimiter m_limiter;
        private PriorityQueue m_queue;
        // Debug statistics, updated by checkDebug() while holding this object's monitor.
        private long m_numOps;
        private long m_maxSize;
        private long m_maxCt;

        EventQueue(int maxSize) {
            this.m_limiter = new QueueLimiter(maxSize);
            this.m_queue = new PriorityQueue(1);
        }

        /**
         * Enqueues an event, waiting until the limiter has room for it.
         *
         * @throws InterruptedException if interrupted while waiting for room
         */
        public void enqueue(LogEvent evt) throws InterruptedException {
            int bytes = evt.memsize();
            int depth = this.m_queue.getEnqueued();
            synchronized (this) {
                while (!this.m_limiter.hasRoom(bytes)) {
                    this.wait();
                }
                this.m_limiter.add(bytes);
                this.checkDebug(depth);
            }
            this.m_queue.enqueue(evt, 0);
        }

        /**
         * Enqueues an event without waiting for room; its size is still
         * charged to the limiter.
         */
        public void forceEnqueue(LogEvent evt) {
            int bytes = evt.memsize();
            int depth = this.m_queue.getEnqueued();
            synchronized (this) {
                this.m_limiter.add(bytes);
                this.checkDebug(depth);
            }
            this.m_queue.enqueue(evt, 0);
        }

        /**
         * Removes the next event, releases its size from the limiter and wakes
         * any producers waiting for room.
         *
         * @return the event, or {@code null} if the underlying queue returned {@code null}
         */
        public LogEvent dequeueWait() throws InterruptedException {
            LogEvent next = (LogEvent)this.m_queue.dequeueWait();
            if (next != null) {
                int depth = this.m_queue.getEnqueued();
                int bytes = next.memsize();
                synchronized (this) {
                    this.m_limiter.add(-bytes);
                    this.checkDebug(depth);
                    this.notifyAll();
                }
            }
            return next;
        }

        /** Cancels the underlying queue. */
        public void cancel() {
            this.m_queue.cancel();
        }

        /**
         * When debug flag 64 is set, tracks the peak limiter size and queue
         * count and prints and resets them every 10000 operations.
         */
        private void checkDebug(long qct) {
            if (!ReplicationDemultiplexer.this.checkDebugFlags(64)) {
                return;
            }
            this.m_numOps++;
            long currentSize = this.m_limiter.m_size;
            this.m_maxSize = Math.max(this.m_maxSize, currentSize);
            this.m_maxCt = Math.max(this.m_maxCt, qct);
            if (this.m_numOps % 10000L != 0L) {
                return;
            }
            ReplicationDemultiplexer.this.debug("m_EventQueue size= " + this.m_limiter.m_size + " ct= " + qct + " maxSize= " + this.m_maxSize + " maxct= " + this.m_maxCt);
            this.m_maxSize = 0L;
            this.m_maxCt = 0L;
            this.m_numOps = 0L;
        }

        private synchronized long getNumOps() {
            return this.m_numOps;
        }
    }

    /**
     * Saver post-processor that acknowledges a fixed tracking number on the
     * replication manager when post-processing runs.
     */
    private static class AckPostProcessor
    implements ISaverPostProcessor {
        private final ReplicationManager m_replMgr;
        private final long m_tracking;

        AckPostProcessor(ReplicationManager replMgr, long tracking) {
            this.m_tracking = tracking;
            this.m_replMgr = replMgr;
        }

        /** Sends the acknowledgement for the tracking number given at construction. */
        @Override
        public void doPostProcessing() throws InterruptedException {
            this.m_replMgr.acknowledge(this.m_tracking);
        }
    }

    /**
     * Worker thread that drains the inbound queue and hands each entry to
     * {@code handleInfoObject}.
     */
    class DemuxThread
    extends DebugThread {
        DemuxThread() {
            super("DemuxThread");
        }

        /**
         * Runs until the owner is stopped, the thread is interrupted, or the
         * inbound queue yields {@code null}. An {@code IOException} aborts the
         * broker; an {@code InterruptedException} restores the interrupt flag.
         */
        @Override
        public void threadMain() {
            final ReplicationDemultiplexer owner = ReplicationDemultiplexer.this;
            try {
                if (this.DEBUG) {
                    this.debug("Starting Replication Demultiplexer thread");
                }
                while (true) {
                    if (owner.m_stopped || this.isInterrupted()) {
                        break;
                    }
                    Object next = owner.dequeueWaitInboundQueue();
                    if (next == null) {
                        break;
                    }
                    owner.handleInfoObject(next);
                }
            }
            catch (InterruptedException e) {
                this.interrupt();
            }
            catch (IOException e) {
                try {
                    BrokerComponent.getBrokerComponent().abort(prAccessor.getString("STR259"), e, 1);
                }
                catch (EBrokerAborted eba) {
                    return;
                }
            }
            finally {
                if (this.DEBUG) {
                    this.debug("Exiting Replication Demultiplexer thread");
                }
            }
        }
    }

    /**
     * Event processor installed by {@code setupStandbyProcessing()}: every
     * event is placed on the event queue, and {@link #addEvent} additionally
     * writes it to the log manager as a replicated event.
     */
    class RecoveryThreadEventProcessor
    implements IFTEventProcessor {
        RecoveryThreadEventProcessor() {
        }

        /** Enqueues the event on the event queue without logging it. */
        @Override
        public void addEventNoLog(IStateEvent e) throws InterruptedException {
            ReplicationDemultiplexer.this.getEventQueue().enqueue((LogEvent)e);
        }

        /**
         * Enqueues the event and logs it as a replicated event.
         *
         * <p>With {@code Config.FT_REPLICATE_PERSISTENT} set, the event is
         * wrapped in a {@link StateEventAckActive} and every write requests a
         * flush. Otherwise it is wrapped in a {@link StateEventNullExec}, a
         * flush is requested only once the accumulated event size exceeds
         * {@code Config.LOG_BLOCK_SIZE}, and the tracking number is
         * acknowledged directly after the log write.
         */
        @Override
        public void addEvent(IStateEvent e, long trackingNum) throws InterruptedException {
            final ReplicationDemultiplexer owner = ReplicationDemultiplexer.this;
            LogEvent logEvent = (LogEvent)e;
            owner.getEventQueue().enqueue(logEvent);
            if (!Config.FT_REPLICATE_PERSISTENT) {
                owner.m_size += logEvent.memsize();
            }
            StateEventWrapper wrapped;
            if (Config.FT_REPLICATE_PERSISTENT) {
                wrapped = new StateEventAckActive(e, owner.m_replMgr, trackingNum);
            } else {
                wrapped = new StateEventNullExec(e);
            }
            boolean flushNow = Config.FT_REPLICATE_PERSISTENT;
            if (owner.m_size > Config.LOG_BLOCK_SIZE) {
                owner.m_size = 0;
                flushNow = true;
            }
            owner.m_logMgr.addReplicatedEvent(wrapped, flushNow);
            if (!Config.FT_REPLICATE_PERSISTENT) {
                owner.m_replMgr.acknowledge(trackingNum);
            }
        }
    }
}

