/*
 * Decompiled with CFR 0.152.
 */
package progress.message.gr;

import com.sonicsw.mq.components.BrokerComponent;
import java.io.EOFException;
import java.io.IOException;
import java.util.Enumeration;
import progress.message.broker.AgentRegistrar;
import progress.message.broker.Config;
import progress.message.client.EGeneralException;
import progress.message.client.EInterrupted;
import progress.message.client.ENetworkFailure;
import progress.message.gr.RemoteBroker;
import progress.message.gr.RouterSession;
import progress.message.util.ArrayUtil;
import progress.message.util.DebugState;
import progress.message.zclient.Connection;
import progress.message.zclient.DebugObject;
import progress.message.zclient.DebugThread;
import progress.message.zclient.Envelope;
import progress.message.zclient.IMessageHandler;
import progress.message.zclient.Label;
import progress.message.zclient.Message;
import progress.message.zclient.MessageHandler;
import progress.message.zclient.Session;
import progress.message.zclient.Solicitation;

final class GuarAckExchanger
extends DebugObject {
    private static final int MAX_ACKS_PER_MSG = 2000;
    public static final String GUAR_REQ_SUBJ = "$ISYS.GR.ACK_EXCHANGE";
    private static final int TYPE_PUBSUB = 1;
    private static final int TYPE_QUEUE = 2;
    private long GAX_TIMEOUT = Config.ACKEXCHANGE_TIMEOUT;
    private long m_localId;
    private long m_remoteId;
    private long m_token;
    private RemoteBroker m_remoteBroker;
    private Connection m_con;
    private Session m_ssn;
    private MessageHandler m_reqMH;
    private MessageHandler m_repMH;
    private boolean m_isClient;
    private boolean m_localReqDone;
    private boolean m_remoteReqDone;
    private Solicitation m_localReq;
    private Envelope m_remoteReq;
    private EGeneralException m_err;
    private boolean m_repHandlerRunning;
    private boolean m_reqHandlerRunning;
    private boolean DEBUG1;
    private long m_lastAccessTime;
    private GAXMonitorThread m_activityMonitorThread;
    private boolean m_connDropped;

    GuarAckExchanger(Session ssn, long localId, long remoteId, long token, boolean localIsClient, RemoteBroker parent) throws EGeneralException {
        super(null);
        if (DebugState.GLOBAL_DEBUG_ON) {
            this.debugName("GuarAckExchanger " + this.hashCode());
        }
        boolean bl = this.DEBUG1 = (this.debugFlags & 0x40) > 0;
        if (this.DEBUG1) {
            this.debug("Constructor: localId= " + localId);
            this.debug("Constructor: remoteId= " + remoteId);
            this.debug("Constructor: localIsClient= " + localIsClient);
            this.debug("Constructor: token= " + token);
        }
        try {
            this.m_localId = localId;
            this.m_remoteId = remoteId;
            this.m_token = token;
            this.m_isClient = localIsClient;
            this.m_ssn = ssn;
            this.m_con = ssn.getConnection();
            this.m_remoteBroker = parent;
            if (DebugState.GLOBAL_DEBUG_ON) {
                this.debugName("GuarAckExchanger " + this.m_remoteBroker.getNodeName() + " " + this.hashCode());
            }
            if (this.DEBUG1) {
                this.debug("m_remoteBroker= " + this.m_remoteBroker.hashCode());
            }
            this.m_lastAccessTime = System.currentTimeMillis();
            this.m_activityMonitorThread = new GAXMonitorThread("GuarAckExchanger Thread " + this.hashCode() + " RB" + this.m_remoteBroker.hashCode());
            this.m_activityMonitorThread.start();
            this.m_reqMH = new MessageHandler(new IMessageHandler(){

                @Override
                public void handleMessage(Session s, Envelope env) {
                    try {
                        GuarAckExchanger.this.handleRequest(env);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            this.m_reqMH.setName("DRA GuarAck request handler");
            this.m_reqMH.bind(GUAR_REQ_SUBJ);
            this.m_con.addMessageHandler(this.m_reqMH);
            if (this.DEBUG) {
                this.debug("Session= " + this.m_ssn + " Binding to " + GUAR_REQ_SUBJ);
            }
            this.m_repMH = new MessageHandler(new IMessageHandler(){

                @Override
                public void handleMessage(Session s, Envelope env) {
                    try {
                        GuarAckExchanger.this.handleReply(env);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            this.m_repMH.setName("DRA GuarAck reply handler");
            this.m_con.addMessageHandler(this.m_repMH);
        }
        catch (EGeneralException e) {
            this.m_remoteReqDone = true;
            this.m_localReqDone = true;
            this.m_err = e;
            this.cleanUp();
            throw e;
        }
        if (this.DEBUG1) {
            this.debug("Created GuarAckExchanger; token= " + this.m_token);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    void doExchange() throws EGeneralException {
        GuarAckExchanger guarAckExchanger = this;
        synchronized (guarAckExchanger) {
            try {
                if (this.DEBUG) {
                    this.debug("doExchange: starting exchange");
                }
                if (this.m_isClient) {
                    this.sendClientSolicit();
                }
                if (this.DEBUG) {
                    this.debug("doExchange: waiting for exchange");
                }
                while (!(this.m_localReqDone && this.m_remoteReqDone || this.m_connDropped)) {
                    try {
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                        this.wait();
                        if (!this.DEBUG) continue;
                        this.debug("doExchange: m_localReqDone= " + this.m_localReqDone + " m_remoteReqDone= " + this.m_remoteReqDone);
                    }
                    catch (InterruptedException e) {
                        throw new EInterrupted();
                    }
                }
            }
            catch (EGeneralException e) {
                this.m_err = e;
            }
        }
        this.cleanUp();
        if (this.DEBUG) {
            this.debug("doExchange: Exchange Complete");
        }
        if (this.m_err != null) {
            throw this.m_err;
        }
    }

    synchronized void notifyConnDropped() {
        this.m_connDropped = true;
        this.notifyAll();
    }

    void cleanUp() {
        if (this.DEBUG) {
            this.debug("CleanUp: cleaning up");
        }
        if (this.m_localReq != null) {
            this.m_localReq.cancel();
        }
        try {
            if (this.m_reqMH != null) {
                this.m_con.removeMessageHandler(this.m_reqMH);
            }
        }
        catch (EGeneralException e) {
            // empty catch block
        }
        try {
            if (this.m_repMH != null) {
                this.m_con.removeMessageHandler(this.m_repMH);
            }
        }
        catch (EGeneralException eGeneralException) {
            // empty catch block
        }
        if (Thread.currentThread() != this.m_activityMonitorThread && this.m_activityMonitorThread.isAlive()) {
            this.m_activityMonitorThread.interrupt();
        }
        if (this.DEBUG1) {
            this.debug("cleanup: token= " + this.m_token + " GAX handlers have been shut down");
        }
        if (this.DEBUG1) {
            this.debug("cleanup: m_localReqDone= " + this.m_localReqDone + " m_remoteReqDone= " + this.m_remoteReqDone);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void abort() {
        if (this.DEBUG) {
            this.debug("aborting");
        }
        GuarAckExchanger guarAckExchanger = this;
        synchronized (guarAckExchanger) {
            boolean interrupted = false;
            while (!Thread.interrupted() && (this.m_reqHandlerRunning || this.m_repHandlerRunning)) {
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            this.m_remoteReqDone = true;
            this.m_localReqDone = true;
            this.notifyAll();
        }
        this.cleanUp();
        if (this.DEBUG1) {
            this.debug("abort complete: m_localReqDone= " + this.m_localReqDone + " m_remoteReqDone= " + this.m_remoteReqDone);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleRequest(Envelope env) throws InterruptedException {
        GuarAckExchanger guarAckExchanger = this;
        synchronized (guarAckExchanger) {
            this.m_reqHandlerRunning = true;
        }
        try {
            Message m = env.getMessage();
            byte[] body = m.getBody();
            long remoteId = ArrayUtil.readLong(body, 0);
            long token = ArrayUtil.readLong(body, 8);
            if (this.DEBUG) {
                this.debug("handleRequest: got remote request token= " + token);
            }
            if (remoteId != this.m_remoteId || token != this.m_token) {
                if (this.DEBUG1) {
                    this.debug("handleRequest: discarding remote request for Id " + remoteId + " token = " + token + "; Expecting " + this.m_remoteId + " " + this.m_token);
                }
                return;
            }
            GuarAckExchanger guarAckExchanger2 = this;
            synchronized (guarAckExchanger2) {
                this.m_lastAccessTime = System.currentTimeMillis();
                this.m_remoteReq = env;
                this.notifyAll();
            }
            if (!this.m_isClient) {
                this.sendBrokerSolicit();
            }
            AgentRegistrar.getAgentRegistrar().getLogManager().flush();
            int numAcks = this.m_remoteBroker.getNumXOnceQMsgs();
            if (this.DEBUG) {
                this.debug("handleRequest: sending replies: numAcks to send= " + numAcks);
            }
            Enumeration enu = this.m_remoteBroker.getXOnceQMsgTrackNums();
            do {
                int numInMsg;
                boolean more = (numInMsg = Math.min(numAcks, 2000)) < numAcks;
                Message rep = new Message();
                rep.writeBoolean(false);
                rep.writeBoolean(more);
                rep.writeByte(2);
                for (int i = 0; i < numInMsg; ++i) {
                    rep.writeLong((Long)enu.nextElement());
                }
                this.m_ssn.reply(rep, env);
                numAcks -= numInMsg;
                if (!this.DEBUG) continue;
                this.debug("handleRequest: sending replies: sent 1 reply; more= " + more);
            } while (numAcks > 0);
            if (this.DEBUG) {
                this.debug("handleRequest: all acks sent");
            }
        }
        catch (IOException e) {
            if (this.DEBUG) {
                this.debug("handleRequest: Exception in request handler, aborting: " + e);
                BrokerComponent.getComponentContext().logMessage((Throwable)e, 2);
            }
            GuarAckExchanger guarAckExchanger3 = this;
            synchronized (guarAckExchanger3) {
                if (e instanceof EGeneralException) {
                    this.m_err = (EGeneralException)e;
                } else {
                    if (this.DEBUG) {
                        BrokerComponent.getComponentContext().logMessage((Throwable)e, 2);
                    }
                    this.m_err = new ENetworkFailure(151, e.toString());
                }
                this.m_reqHandlerRunning = false;
            }
            this.abort();
        }
        finally {
            GuarAckExchanger guarAckExchanger4 = this;
            synchronized (guarAckExchanger4) {
                this.m_reqHandlerRunning = false;
                this.notifyAll();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleReply(Envelope env) throws InterruptedException {
        block57: {
            boolean interrupted = false;
            GuarAckExchanger guarAckExchanger = this;
            synchronized (guarAckExchanger) {
                this.m_repHandlerRunning = true;
                this.m_lastAccessTime = System.currentTimeMillis();
            }
            try {
                Message m = env.getMessage();
                if (m.readBoolean()) {
                    if (this.DEBUG) {
                        this.debug("handleReply: got final reply; remote has completed processing our acks");
                    }
                    this.m_remoteBroker.allXOnceQAcksDone();
                    GuarAckExchanger guarAckExchanger2 = this;
                    synchronized (guarAckExchanger2) {
                        if (this.m_localReqDone) {
                            this.m_localReq.cancel();
                        }
                        this.m_remoteReqDone = true;
                        if (this.DEBUG) {
                            this.debug("handleReply: Remote Request Done");
                        }
                        this.notifyAll();
                    }
                    if (this.DEBUG) {
                        this.debug("handleReply: done processing final reply");
                    }
                    break block57;
                }
                if (this.DEBUG) {
                    this.debug("handleReply: got reply to our request");
                }
                boolean more = m.readBoolean();
                byte type = m.readByte();
                int ct = 0;
                switch (type) {
                    case 1: {
                        break;
                    }
                    case 2: {
                        try {
                            while (true) {
                                long tracking = m.readLong();
                                this.m_remoteBroker.inDoubtQMsgAcknowledged(tracking);
                                ++ct;
                            }
                        }
                        catch (EOFException e) {
                            // empty catch block
                        }
                    }
                }
                if (this.DEBUG) {
                    this.debug("handleReply: finished processing reply; " + ct + " messages acknowledged");
                }
                if (more) break block57;
                if (this.DEBUG) {
                    this.debug("handleReply: no more (nonfinal) replies");
                }
                this.m_repMH.setOkToInterrupt(false);
                this.m_remoteBroker.allInDoubtQAcksReceived();
                this.m_repMH.setOkToInterrupt(true);
                if (this.m_repMH.wasInterrupted()) {
                    interrupted = true;
                    this.m_repMH.setInterrupted(false);
                    throw new EInterrupted();
                }
                GuarAckExchanger e = this;
                synchronized (e) {
                    while (!Thread.interrupted() && this.m_remoteReq == null) {
                        try {
                            this.wait();
                        }
                        catch (InterruptedException e2) {
                            interrupted = true;
                            throw new EInterrupted();
                        }
                    }
                }
                if (this.DEBUG) {
                    this.debug("handleReply: sending our final reply; completed processing of acks");
                }
                Message rep = new Message();
                rep.writeBoolean(true);
                this.m_ssn.reply(rep, this.m_remoteReq);
                GuarAckExchanger guarAckExchanger3 = this;
                synchronized (guarAckExchanger3) {
                    if (this.m_remoteReqDone) {
                        this.m_localReq.cancel();
                    }
                    this.m_localReqDone = true;
                    if (this.DEBUG) {
                        this.debug("handleReply: Local Request Done");
                    }
                    this.notifyAll();
                }
                if (this.DEBUG) {
                    this.debug("handleReply: final reply sent");
                }
            }
            catch (IOException e) {
                if (this.DEBUG) {
                    this.debug("Exception in reply handler, aborting: " + e);
                    BrokerComponent.getComponentContext().logMessage((Throwable)e, 2);
                }
                GuarAckExchanger guarAckExchanger4 = this;
                synchronized (guarAckExchanger4) {
                    if (e instanceof EGeneralException) {
                        this.m_err = (EGeneralException)e;
                    } else {
                        if (this.DEBUG) {
                            BrokerComponent.getComponentContext().logMessage((Throwable)e, 2);
                        }
                        this.m_err = new ENetworkFailure(151, e.toString());
                    }
                    this.m_repHandlerRunning = false;
                }
                this.abort();
            }
            finally {
                GuarAckExchanger guarAckExchanger5 = this;
                synchronized (guarAckExchanger5) {
                    this.m_repHandlerRunning = false;
                    this.notifyAll();
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void sendClientSolicit() throws EGeneralException {
        if (this.DEBUG) {
            this.debug("sendClientSolicit: sending client side solicit");
        }
        Label local = new Label();
        local.setRouteLimit(1);
        Message req = new Message(GUAR_REQ_SUBJ);
        req.writeLong(this.m_localId);
        req.writeLong(this.m_token);
        this.m_localReq = this.m_ssn.solicit(new Envelope(req, local), this.m_repMH);
    }

    private void sendBrokerSolicit() throws EGeneralException {
        if (this.DEBUG) {
            this.debug("sendBrokerSolicit: sending broker side solicit");
        }
        Message req = new Message(GUAR_REQ_SUBJ);
        req.writeLong(this.m_localId);
        req.writeLong(this.m_token);
        if (this.DEBUG) {
            this.debug("Soliciting to " + this.m_remoteBroker.getClientID());
        }
        this.m_localReq = ((RouterSession)this.m_ssn).solicit(this.m_remoteBroker.getClientID(), req, this.m_repMH, 0);
    }

    class GAXMonitorThread
    extends DebugThread {
        GAXMonitorThread(String name) {
            super(name);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void threadMain() {
            if (GuarAckExchanger.this.DEBUG1) {
                this.debug("Thread starting; timeout= " + GuarAckExchanger.this.GAX_TIMEOUT);
            }
            boolean abort = false;
            while (!GAXMonitorThread.interrupted()) {
                try {
                    GAXMonitorThread.sleep(GuarAckExchanger.this.GAX_TIMEOUT);
                }
                catch (InterruptedException e) {
                    if (!GuarAckExchanger.this.DEBUG1) break;
                    this.debug("GAX Monitor thread has been interrupted ");
                    break;
                }
                long currentTime = System.currentTimeMillis();
                GuarAckExchanger guarAckExchanger = GuarAckExchanger.this;
                synchronized (guarAckExchanger) {
                    if (currentTime - GuarAckExchanger.this.m_lastAccessTime > GuarAckExchanger.this.GAX_TIMEOUT) {
                        abort = true;
                        break;
                    }
                }
            }
            if (abort) {
                if (GuarAckExchanger.this.DEBUG1) {
                    this.debug("GAX Monitor thread calling GAX abort ");
                }
                GuarAckExchanger.this.abort();
            }
            if (GuarAckExchanger.this.DEBUG1) {
                this.debug("GAX Monitor thread exiting ");
            }
        }
    }
}

