/*
 * Decompiled with CFR 0.152.
 */
package progress.message.ft;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
import progress.message.broker.AgentRegistrar;
import progress.message.broker.Broker;
import progress.message.broker.QueueMsgSaver;
import progress.message.dbq.IPtpDBQ;
import progress.message.ft.DynamicSyncDBQMsgsTracker;
import progress.message.ft.DynamicSyncManager;
import progress.message.ft.FTMgramFactory;
import progress.message.ft.IRequestHandler;
import progress.message.ft.ReplicationManager;
import progress.message.ft.StandbyDBQMsgs;
import progress.message.msg.IMgram;
import progress.message.util.PriorityQueue;
import progress.message.zclient.DebugThread;

/**
 * Worker thread that reconciles the persistent queue-message store between an active and a
 * standby side during dynamic synchronization.
 *
 * <p>The role handed to the constructor selects what {@link #threadMain()} does:
 * <ul>
 *   <li>role {@code 1}: {@code activeLoop()} snapshots the local max/min message ids, exchanges
 *       them with the peer through a {@code "QUEUE_DB_SYNC_EXCHANGE"} request, lets the
 *       {@link DynamicSyncDBQMsgsTracker} reconcile and finally signals completion through the
 *       {@link QueueMsgSaver};</li>
 *   <li>role {@code 2}: {@code standbyLoop()} services the requests queued by the request
 *       handler registered in the constructor until the end-of-sync marker has been processed.</li>
 * </ul>
 *
 * <p>All debug/log strings are kept exactly as they were, including historical typos, because
 * they are runtime output.
 */
public class QueueMsgDynSyncer
extends DebugThread {
    /** Role value for which {@link #threadMain()} runs the active-side loop. */
    private static final int ROLE_ACTIVE = 1;
    /** Role value for which {@link #threadMain()} runs the standby-side loop. */
    private static final int ROLE_STANDBY = 2;
    /**
     * Mask passed to {@code checkDebugFlags} for the start/finish trace messages. Its symbolic
     * name lives in the debug infrastructure, which is not visible from this class.
     */
    private static final int SYNC_TRACE_FLAG = 64;
    /**
     * Value passed to {@code DynamicSyncManager.setStatus} once queue synchronization has finished
     * on this side. The manager's symbolic name for it is not visible from this class.
     */
    private static final int STATUS_QUEUE_SYNC_DONE = 4;
    /**
     * Offset into a request's raw body at which the payload starts.
     * NOTE(review): presumably skips the 8-byte reply tracking number - confirm against
     * FTMgramFactory.
     */
    private static final int REQUEST_BODY_OFFSET = 8;
    /** Sentinel used throughout this class for "no tracking number / no message id". */
    private static final long NO_TRACKING = -1L;

    private final DynamicSyncManager m_dsm;
    private final ReplicationManager m_rm;
    private final QueueMsgSaver m_qmsgSaver;
    private final IPtpDBQ m_ptpDBQ;
    private final int m_role;
    private final DynamicSyncDBQMsgsTracker m_dbqMsgsTracker;
    /** Last failure recorded while servicing a request; only written (never read) in this class. */
    private Throwable m_err = null;
    /** Set by {@link #completeStandbySync(long)}; ends the standby loop. */
    private boolean m_syncComplete = false;
    /** Smallest in-doubt tracking number received from the active so far, or -1 if none. */
    private long m_minActiveIndoubt = NO_TRACKING;
    /** Largest in-doubt tracking number received from the active so far, or -1 if none. */
    private long m_maxActiveIndoubt = NO_TRACKING;
    /** Upper bound of the previous in-doubt batch, or -1 before the first batch. */
    private long m_lastMax = NO_TRACKING;
    /** Requests waiting for the standby loop; {@code null} unless the role is 2. */
    private final PriorityQueue m_requestQueue;
    /** Request type: the active sends its max/min tracking numbers. */
    static final int TRACKING_NUMBERS_XCHANGE = 0;
    /** Request type: the active sends a batch of in-doubt tracking numbers. */
    static final int IN_DOUBT_RESOLUTION = 1;

    /**
     * Creates the syncer and, for role 2, registers the {@code "QUEUE_DB_SYNC_EXCHANGE"} request
     * handler that feeds the standby loop.
     *
     * @param dsm  owning dynamic sync manager; also the source of the replication manager
     * @param role 1 for the active-side behaviour, 2 for the standby-side behaviour
     */
    public QueueMsgDynSyncer(DynamicSyncManager dsm, int role) {
        super("QueueMsgDynSyncer");
        this.m_dsm = dsm;
        this.m_rm = dsm.getReplicationManager();
        this.m_role = role;
        AgentRegistrar registrar = AgentRegistrar.getAgentRegistrar();
        this.m_qmsgSaver = registrar.getQueueMsgSaver();
        this.m_ptpDBQ = registrar.getBrokerDatabase().getIPtpDBQ();
        this.m_dbqMsgsTracker = new DynamicSyncDBQMsgsTracker(dsm);
        // Create the queue BEFORE the handler is registered: once registered, the handler may be
        // invoked at any time and enqueueRequest() dereferences the queue.
        this.m_requestQueue = role == ROLE_STANDBY ? new PriorityQueue(1) : null;
        if (role == ROLE_STANDBY) {
            this.m_rm.registerRequestHandler("QUEUE_DB_SYNC_EXCHANGE", new IRequestHandler(){

                @Override
                public void handleRequest(IMgram m) {
                    long replyTracking = FTMgramFactory.getRequestReplyTracking(m);
                    QueueMsgDynSyncer.this.enqueueRequest(replyTracking, m.getRawBody(), REQUEST_BODY_OFFSET);
                }
            });
            // The result was never used by the original code either; the call itself is retained
            // in case it has initialization side effects - TODO confirm and remove if it has none.
            this.m_rm.getReplicationDemultiplexer().getStandbyDBQMsgs();
        }
    }

    /**
     * @return the tracker that holds the active/standby tracking ranges for this sync
     */
    DynamicSyncDBQMsgsTracker getDynamicSyncDBQTracker() {
        return this.m_dbqMsgsTracker;
    }

    /**
     * Queues a request received from the active for processing by the standby loop.
     *
     * @param replyTracking tracking number to use when replying
     * @param requestData   raw request body
     * @param offset        position in {@code requestData} at which the payload starts
     */
    private void enqueueRequest(long replyTracking, byte[] requestData, int offset) {
        if (this.DEBUG) {
            this.debug("receivng active's request...");
        }
        this.m_requestQueue.enqueue(new DBQMessageSyncRequest(replyTracking, requestData, offset), 0);
    }

    /**
     * Queues the end-of-sync marker; the standby loop finishes once it has processed it.
     *
     * @param lastTracking last tracking number reported for the sync (-1 if there was none)
     */
    public void endStandbyDynamicSync(long lastTracking) {
        this.m_requestQueue.enqueue(new DBQMessageSyncRequest(lastTracking), 0);
    }

    /**
     * Blocks until the next queued request is available.
     *
     * @return the dequeued request
     * @throws InterruptedException if the wait is interrupted
     */
    private DBQMessageSyncRequest getNextRequest() throws InterruptedException {
        DBQMessageSyncRequest next = (DBQMessageSyncRequest)this.m_requestQueue.dequeueWait();
        if (this.DEBUG) {
            this.debug("processing active's request...");
        }
        return next;
    }

    /**
     * Dispatches one queued request. The end marker completes the standby sync; otherwise the
     * payload's leading int selects the handler. Any failure is recorded in {@code m_err} and
     * answered with an error reply.
     *
     * @param request the request to process
     * @return {@code true} if the request was handled (or ignored as unknown), {@code false} if
     *         handling failed and an error reply was sent
     */
    private boolean processActiveRequest(DBQMessageSyncRequest request) {
        if (request.m_isEnd) {
            this.completeStandbySync(request.m_lastTracking);
            return true;
        }
        long replyTracking = request.m_replyTracking;
        boolean handled = true;
        try {
            byte[] body = request.m_requestData;
            int offset = request.m_offset;
            // The third constructor argument is a LENGTH, not an end index.
            int payloadLength = Math.max(0, body.length - offset);
            DataInputStream data = new DataInputStream(new ByteArrayInputStream(body, offset, payloadLength));
            int type = data.readInt();
            if (this.DEBUG) {
                this.debug("request type = " + type);
            }
            switch (type) {
                case TRACKING_NUMBERS_XCHANGE: {
                    if (this.DEBUG) {
                        this.debug("Active tracking numbers received - send standby tracking numbers.");
                    }
                    handled = this.sendStandbyTrackingNumbers(data, replyTracking);
                    break;
                }
                case IN_DOUBT_RESOLUTION: {
                    handled = this.sendInDoubtReply(data, replyTracking);
                    break;
                }
                default: {
                    // Behaviour is unchanged (unknown types are ignored and no reply is sent),
                    // but the event is no longer invisible when debugging.
                    if (this.DEBUG) {
                        this.debug("ignoring unknown request type = " + type);
                    }
                }
            }
        }
        catch (Exception ex) {
            if (this.DEBUG) {
                ex.printStackTrace();
            }
            this.m_err = ex;
            handled = false;
        }
        if (!handled) {
            this.sendErrorReply(replyTracking);
        }
        return handled;
    }

    /**
     * Sends the active's max/min tracking numbers to the peer, waits for the reply and stores
     * the peer's max/min in the tracker.
     *
     * @param max largest local tracking number
     * @param min smallest local tracking number
     * @return {@code true} if the peer answered with its own numbers; {@code false} if the request
     *         could not be built or sent, no reply arrived, or the reply signalled failure
     * @throws InterruptedException if waiting for the reply is interrupted
     */
    private boolean sendActiveTrackingNumbers(long max, long min) throws InterruptedException {
        if (this.DEBUG) {
            this.debug("sending tracking numbers to standby, max = " + max + ", min = " + min);
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream request = new DataOutputStream(bos);
        try {
            request.writeInt(TRACKING_NUMBERS_XCHANGE);
            request.writeLong(max);
            request.writeLong(min);
            request.close();
        }
        catch (IOException ioe) {
            if (!Broker.exiting) {
                ioe.printStackTrace();
            }
            return false;
        }
        ReplicationManager.ReplicationRequest job = this.m_rm.sendRequest("QUEUE_DB_SYNC_EXCHANGE", bos.toByteArray());
        if (job == null) {
            if (this.DEBUG) {
                this.debug("Failed to send active's tracking numbers, max = " + max + ", min = " + min);
            }
            return false;
        }
        IMgram reply = job.join() ? job.getReplyMgram() : null;
        if (reply == null) {
            if (this.DEBUG) {
                this.debug("Failed to exchange tracking numbers with standby - no reply.");
            }
            return false;
        }
        DataInputStream data = new DataInputStream(new ByteArrayInputStream(reply.getRawBody()));
        try {
            // Leading boolean: false means the peer sent an error reply.
            if (!data.readBoolean()) {
                return false;
            }
            long standbyMax = data.readLong();
            long standbyMin = data.readLong();
            if (this.DEBUG) {
                this.debug("received standby tracking numbers, max = " + standbyMax + ", min = " + standbyMin);
            }
            this.m_dbqMsgsTracker.setStandbyTrackings(standbyMax, standbyMin);
            this.m_dbqMsgsTracker.sort();
            return true;
        }
        catch (IOException ioe) {
            if (!Broker.exiting) {
                ioe.printStackTrace();
            }
            return false;
        }
    }

    /**
     * Signals completion of the queue message sync through the queue message saver.
     *
     * @param lastTracking last tracking number of the sync, or -1 if the table was empty
     * @throws InterruptedException declared by the original signature
     */
    private void sendSyncComplete(long lastTracking) throws InterruptedException {
        if (this.DEBUG) {
            this.debug("notify the standby of the completion of queue message sync.");
        }
        this.m_qmsgSaver.dynamicSyncDone(lastTracking);
    }

    /**
     * Sends a reply whose body is the single boolean {@code false}, which the requester treats
     * as failure.
     *
     * @param replyTracking tracking number identifying the request being answered
     */
    void sendErrorReply(long replyTracking) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream reply = new DataOutputStream(bos);
        try {
            reply.writeBoolean(false);
            reply.close();
        }
        catch (IOException ioe) {
            if (this.DEBUG) {
                ioe.printStackTrace();
            }
            return;
        }
        this.m_rm.sendReply(replyTracking, bos.toByteArray());
    }

    /**
     * Handles a tracking-number exchange request: records the active's max/min, snapshots the
     * local max/min message ids under the DBQ lock, deletes the tracker's in-doubt range locally
     * and replies with {@code true, max, min}.
     *
     * <p>NOTE: the decompiler (CFR) reported that it restructured a try/catch in this method, so
     * the exact original exception structure is unknown.
     *
     * @param request       payload positioned after the request type; must hold max then min
     * @param replyTracking tracking number identifying the request being answered
     * @return {@code true} if the reply was sent, {@code false} if the snapshot or reply
     *         construction failed (the cause is stored in {@code m_err})
     * @throws IOException if the request payload cannot be read
     */
    boolean sendStandbyTrackingNumbers(DataInputStream request, long replyTracking) throws IOException {
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("sendStandbyTrackingNumbers starting");
        }
        // Same wire order as written by sendActiveTrackingNumbers: max first, then min.
        long activeMax = request.readLong();
        long activeMin = request.readLong();
        this.m_dbqMsgsTracker.setActiveTrackings(activeMax, activeMin);
        request.close();
        long max = NO_TRACKING;
        long min = NO_TRACKING;
        boolean lockHeld = false;
        try {
            this.m_ptpDBQ.acquireLock();
            lockHeld = true;
            Long[] ids = this.m_ptpDBQ.getMaxMinMessageIds();
            if (ids != null) {
                if (ids[0] != null) {
                    max = ids[0];
                }
                if (ids[1] != null) {
                    min = ids[1];
                }
                this.m_dbqMsgsTracker.setStandbyTrackings(max, min);
                if (this.DEBUG) {
                    this.debug("standby db snapshot taken, max id = " + max + ", min id = " + min);
                }
                this.m_ptpDBQ.commit();
            }
        }
        catch (Exception ex) {
            if (!Broker.exiting && this.DEBUG) {
                ex.printStackTrace();
            }
            this.m_err = ex;
            return false;
        }
        finally {
            if (lockHeld) {
                this.m_ptpDBQ.releaseLock();
            }
        }
        this.m_dbqMsgsTracker.sort();
        this.m_dbqMsgsTracker.deleteMTD();
        if (this.m_dbqMsgsTracker.getInDoubt() != null) {
            long lower = this.m_dbqMsgsTracker.getInDoubt().m_lower;
            long upper = this.m_dbqMsgsTracker.getInDoubt().m_upper;
            if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
                this.debug("sendStandbyTrackingNumbers: Deleting indoubt messages; lower= " + lower + " upper= " + upper);
            }
            this.m_qmsgSaver.deleteMsgs(lower, upper);
            try {
                this.m_qmsgSaver.flush();
            }
            catch (InterruptedException e) {
                // Keep going, but leave the interrupt visible to the caller's loop.
                Thread.currentThread().interrupt();
            }
            int ct = this.m_dsm.getStandbyDBQMsgs().removeBatch(lower, upper);
            if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
                this.debug("Completed deleting indoubt messages; lower= " + lower + " upper= " + upper + " count= " + ct);
            }
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream reply = new DataOutputStream(bos);
        try {
            reply.writeBoolean(true);
            reply.writeLong(max);
            reply.writeLong(min);
            reply.close();
        }
        catch (IOException ioe) {
            if (!Broker.exiting) {
                ioe.printStackTrace();
            }
            this.m_err = ioe;
            return false;
        }
        this.m_rm.sendReply(replyTracking, bos.toByteArray());
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("sendStandbyTrackingNumbers completed");
        }
        return true;
    }

    /**
     * Handles an in-doubt resolution request. Reads the active's batch of in-doubt tracking
     * numbers, looks up the local message ids in the corresponding range, deletes local messages
     * the active did not list, and replies with the tracking numbers that are missing locally.
     *
     * @param request       payload positioned after the request type: an int count followed by
     *                      that many long tracking numbers
     * @param replyTracking tracking number identifying the request being answered
     * @return {@code true} if the reply was sent, {@code false} if building it failed
     * @throws IOException if the payload cannot be read or carries no tracking numbers
     */
    boolean sendInDoubtReply(DataInputStream request, long replyTracking) throws IOException {
        int count = request.readInt();
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("sendInDoubtReply: received " + count + " in-doubt trackings from active.");
        }
        // Used as a sorted set; the shared non-null value lets remove() report membership.
        TreeMap<Long, Object> activeInDoubts = new TreeMap<Long, Object>();
        Object present = new Object();
        for (int i = 0; i < count; ++i) {
            long tracking = request.readLong();
            activeInDoubts.put(Long.valueOf(tracking), present);
            if (!this.DEBUG) continue;
            this.debug("received in-doubt tracking " + tracking + " from active.");
        }
        if (activeInDoubts.isEmpty()) {
            // firstKey()/lastKey() would throw NoSuchElementException here. Fail with a
            // descriptive, declared exception instead; the caller still sends an error reply.
            throw new IOException("in-doubt request from active carried no tracking numbers, count = " + count);
        }
        long max = activeInDoubts.lastKey();
        long min = activeInDoubts.firstKey();
        this.m_minActiveIndoubt = this.m_minActiveIndoubt == NO_TRACKING ? min : Math.min(min, this.m_minActiveIndoubt);
        this.m_maxActiveIndoubt = this.m_maxActiveIndoubt == NO_TRACKING ? max : Math.max(max, this.m_maxActiveIndoubt);
        request.close();
        if (this.DEBUG) {
            this.debug("selecting ids within " + min + " and " + max + " from stadnby db.");
        }
        // Continue right after the previous batch so ids between batches are covered as well.
        long startPos = this.m_lastMax == NO_TRACKING ? min : this.m_lastMax + 1L;
        this.m_lastMax = max;
        // NOTE(review): the third argument is presumably a row limit - confirm against IPtpDBQ.
        // It is clamped because a plain (int) cast wraps for spans beyond the int range.
        TreeMap standbyIds = this.m_ptpDBQ.getMessageIds(startPos, max, QueueMsgDynSyncer.clampToInt(max - startPos + 1L));
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("sendInDoubtReply: " + (standbyIds == null ? 0 : standbyIds.size()) + " ids found within " + min + " and " + max + " in standby db.");
        }
        if (standbyIds != null) {
            for (Object key : standbyIds.keySet()) {
                Long id = (Long)key;
                // Present on both sides: nothing to request and nothing to delete.
                if (activeInDoubts.remove(id) != null) continue;
                if (this.DEBUG) {
                    this.debug("COULD NOT FIND: " + id + " In active's list, deleting");
                }
                this.m_qmsgSaver.deleteMsg(id, false);
                this.m_dsm.getStandbyDBQMsgs().remove(id);
            }
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream reply = new DataOutputStream(bos);
        try {
            reply.writeBoolean(true);
            int total = activeInDoubts.size();
            reply.writeInt(total);
            if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
                this.debug("sendInDoubtReply: requesting " + total + " message from active.");
            }
            // Whatever is left was listed by the active but not found locally.
            for (Long missing : activeInDoubts.keySet()) {
                long tracking = missing;
                reply.writeLong(tracking);
                if (!this.DEBUG) continue;
                this.debug("requesting message from active, tracking = " + tracking);
            }
            reply.close();
        }
        catch (IOException ioe) {
            if (!Broker.exiting) {
                ioe.printStackTrace();
            }
            this.m_err = ioe;
            return false;
        }
        this.m_rm.sendReply(replyTracking, bos.toByteArray());
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("sendInDoubtReply completed");
        }
        return true;
    }

    /**
     * Narrows a long to an int, saturating at the int bounds instead of wrapping.
     *
     * @param value value to narrow
     * @return {@code value} limited to {@code [Integer.MIN_VALUE, Integer.MAX_VALUE]}
     */
    private static int clampToInt(long value) {
        if (value > (long)Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (value < (long)Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int)value;
    }

    /**
     * Finishes the standby side of the sync: deletes those parts of the tracker's in-doubt range
     * that were not covered by in-doubt batches from the active, flushes the saver, sets the
     * manager status and marks the sync complete.
     *
     * @param lastTracking last tracking number reported by the active, or -1 if it had none
     */
    void completeStandbySync(long lastTracking) {
        if (this.DEBUG) {
            this.debug("active message sync completed, last tracking = " + lastTracking);
        }
        StandbyDBQMsgs dbqMsgs = this.m_dsm.getStandbyDBQMsgs();
        boolean noActiveData = lastTracking == NO_TRACKING
            && this.m_dbqMsgsTracker.getActiveMax() == NO_TRACKING
            && this.m_dbqMsgsTracker.getActiveMin() == NO_TRACKING;
        if (noActiveData) {
            // No tracking exchange took place, so seed the tracker from the in-memory view.
            this.m_dbqMsgsTracker.setStandbyTrackings(dbqMsgs.getMaxTrackingNumber(), dbqMsgs.getMinTrackingNumber());
            this.m_dbqMsgsTracker.sort();
            this.m_dbqMsgsTracker.deleteMTD();
        }
        if (this.m_dbqMsgsTracker.getInDoubt() != null) {
            long min = this.m_dbqMsgsTracker.getInDoubt().m_lower;
            long max = this.m_dbqMsgsTracker.getInDoubt().m_upper;
            if (this.m_minActiveIndoubt == NO_TRACKING) {
                // No in-doubt batch was ever received: drop the whole range.
                this.m_qmsgSaver.deleteMsgs(min, max);
                int ct = dbqMsgs.removeBatch(min, max);
                if (this.DEBUG) {
                    this.debug("completeStandbySync: db saver op (1) enqueued to delete messages from " + min + " to " + max + " count= " + ct);
                }
            } else {
                // Only the parts below and above what the batches covered are dropped.
                if (min < this.m_minActiveIndoubt) {
                    long upTo = this.m_minActiveIndoubt - 1L;
                    this.m_qmsgSaver.deleteMsgs(min, upTo);
                    int ct = dbqMsgs.removeBatch(min, upTo);
                    if (this.DEBUG) {
                        this.debug("completeStandbySync: db saver op (2) enqueued to delete messages from " + min + " to " + upTo + " count= " + ct);
                    }
                }
                if (max > this.m_maxActiveIndoubt) {
                    long from = this.m_maxActiveIndoubt + 1L;
                    this.m_qmsgSaver.deleteMsgs(from, max);
                    int ct = dbqMsgs.removeBatch(from, max);
                    if (this.DEBUG) {
                        this.debug("completeStandbySync: db saver op (3) enqueued to delete messages from " + from + " to " + max + " count= " + ct);
                    }
                }
            }
        }
        if (this.DEBUG) {
            this.debug("last replicated message received, tracking = " + lastTracking);
        }
        try {
            this.m_qmsgSaver.flush();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.m_dsm.setStatus(STATUS_QUEUE_SYNC_DONE);
        this.m_syncComplete = true;
    }

    /**
     * Active-side procedure: snapshot the local max/min message ids under the DBQ lock, exchange
     * them with the peer, reconcile through the tracker and signal completion.
     *
     * <p>NOTE: the decompiler (CFR) reported that it restructured a try/catch in this method, so
     * the exact original exception structure is unknown.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the peer or
     *         while signalling completion; the DBQ lock is released before it propagates
     */
    private void activeLoop() throws InterruptedException {
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("activeLoop starting");
        }
        boolean failed = false;
        boolean lockHeld = false;
        try {
            this.m_ptpDBQ.acquireLock();
            lockHeld = true;
            long max = NO_TRACKING;
            long min = NO_TRACKING;
            Long[] ids = this.m_ptpDBQ.getMaxMinMessageIds();
            if (ids != null) {
                if (ids[0] != null) {
                    max = ids[0];
                }
                if (ids[1] != null) {
                    min = ids[1];
                }
                this.m_dbqMsgsTracker.setActiveTrackings(max, min);
                if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
                    this.debug("activeLoop: active db snapshot taken, max id = " + max + ", min id = " + min);
                }
                this.m_ptpDBQ.commit();
            }
            this.m_dsm.queueSnapshotComplete();
            if (max == NO_TRACKING && min == NO_TRACKING) {
                if (this.DEBUG) {
                    this.debug("max and min message ids unavailable - empty table.");
                }
                this.sendSyncComplete(NO_TRACKING);
                this.m_dsm.setStatus(STATUS_QUEUE_SYNC_DONE);
                return;
            }
            if (this.sendActiveTrackingNumbers(this.m_dbqMsgsTracker.getActiveMax(), this.m_dbqMsgsTracker.getActiveMin())) {
                if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
                    this.debug("activeLoop: received MAX and MIN message ids from the standby, max = " + this.m_dbqMsgsTracker.getStandbyMax() + ", min = " + this.m_dbqMsgsTracker.getStandbyMin());
                }
            } else {
                if (this.DEBUG) {
                    this.debug("failed to excahnge MAX and MIN message ids with the standby.");
                }
                failed = true;
            }
        }
        catch (InterruptedException ie) {
            // Must not be absorbed by the generic handler below: threadMain() restores the
            // interrupt flag when it sees this exception.
            throw ie;
        }
        catch (Exception ex) {
            if (!Broker.exiting && this.DEBUG) {
                ex.printStackTrace();
            }
            failed = true;
        }
        finally {
            if (lockHeld) {
                this.m_ptpDBQ.releaseLock();
            }
        }
        if (failed) {
            return;
        }
        if (this.CALLBACK) {
            this.callback("Queue dynamic sync snapshot taken", 0, null);
        }
        if (!this.m_dbqMsgsTracker.reconcile()) {
            return;
        }
        this.sendSyncComplete(this.m_dbqMsgsTracker.getLastTracking());
        this.m_dsm.setStatus(STATUS_QUEUE_SYNC_DONE);
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("ActiveLoop: completed");
        }
    }

    /**
     * Standby-side procedure: process queued requests until the sync is complete, the broker is
     * exiting, this thread is shutting down, or a request fails.
     *
     * @throws InterruptedException if waiting for the next request is interrupted
     */
    private void standbyLoop() throws InterruptedException {
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("standbyLoop starting");
        }
        long processed = 0L;
        while (!Broker.exiting && !this.m_syncComplete && !this.isShuttingDown()) {
            DBQMessageSyncRequest req = this.getNextRequest();
            // Count every request so the summary below is accurate regardless of debug flags.
            ++processed;
            if (this.checkDebugFlags(SYNC_TRACE_FLAG) && processed == 1L) {
                this.debug("standbyLoop: gotFirstRequest");
            }
            if (!this.processActiveRequest(req)) {
                // A failed request ends the loop; the error reply has already been sent.
                return;
            }
        }
        if (this.checkDebugFlags(SYNC_TRACE_FLAG)) {
            this.debug("StandbyLoop: completed; numRequestsProcessed= " + processed);
        }
    }

    /**
     * Thread body: runs the loop selected by the role and restores the interrupt flag if the
     * loop was interrupted. Roles other than 1 and 2 do nothing.
     */
    @Override
    public void threadMain() {
        try {
            if (this.DEBUG) {
                this.debug("QueueMsgDynSyncer running... m_role: " + this.m_role);
            }
            if (this.m_role == ROLE_ACTIVE) {
                this.activeLoop();
            } else if (this.m_role == ROLE_STANDBY) {
                this.standbyLoop();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (this.DEBUG) {
            this.debug("QueueMsgDynSyncer exiting...");
        }
    }

    /**
     * One entry of the standby request queue: either a request received from the active or the
     * end-of-sync marker.
     */
    private static class DBQMessageSyncRequest {
        /** Tracking number to reply to; 0 for the end marker. */
        final long m_replyTracking;
        /** Raw request body; {@code null} for the end marker. */
        final byte[] m_requestData;
        /** Payload start within {@link #m_requestData}; 0 for the end marker. */
        final int m_offset;
        /** {@code true} only for the end-of-sync marker. */
        final boolean m_isEnd;
        /** Last tracking number of the sync; meaningful only for the end marker. */
        final long m_lastTracking;

        /**
         * Creates an entry for a request received from the active.
         *
         * @param tracking tracking number to reply to
         * @param data     raw request body
         * @param offset   payload start within {@code data}
         */
        DBQMessageSyncRequest(long tracking, byte[] data, int offset) {
            this.m_replyTracking = tracking;
            this.m_requestData = data;
            this.m_offset = offset;
            this.m_isEnd = false;
            this.m_lastTracking = 0L;
        }

        /**
         * Creates the end-of-sync marker.
         *
         * @param lastTracking last tracking number of the sync
         */
        DBQMessageSyncRequest(long lastTracking) {
            this.m_replyTracking = 0L;
            this.m_requestData = null;
            this.m_offset = 0;
            this.m_isEnd = true;
            this.m_lastTracking = lastTracking;
        }
    }
}

