/*
 * Decompiled with CFR 0.152.
 */
package progress.message.ft;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import progress.message.broker.AgentRegistrar;
import progress.message.broker.Broker;
import progress.message.broker.BrokerDatabase;
import progress.message.broker.BrokerJob;
import progress.message.broker.MsgSaver;
import progress.message.db.EDatabaseException;
import progress.message.ft.DurableSyncReplicationMgr;
import progress.message.ft.DynamicSyncManager;
import progress.message.ft.FTMgramFactory;
import progress.message.ft.IRequestHandler;
import progress.message.ft.ReplicationManager;
import progress.message.msg.IMgram;
import progress.message.util.EAssertFailure;
import progress.message.zclient.DebugThread;

public class DurableMsgDynSyncer
extends DebugThread {
    private DynamicSyncManager m_dsm;
    private int m_role = -1;

    public DurableMsgDynSyncer(DynamicSyncManager dsm, int role) {
        super("DurableMsgDynSyncer");
        this.m_dsm = dsm;
        this.m_role = role;
    }

    public static void initRequestHandler() {
        ReplicationManager rm = AgentRegistrar.getAgentRegistrar().getReplicationManager();
        IRequestHandler pubSubHandler = new IRequestHandler(){

            @Override
            public void handleRequest(IMgram m) {
                long replyTracking = FTMgramFactory.getRequestReplyTracking(m);
                List undelMsgs = FTMgramFactory.createReplicateUndelRequest_V2_List(m.getRawBody());
                AgentRegistrar.getAgentRegistrar().getMsgSaver().saveUndelMsgs(undelMsgs, replyTracking);
            }
        };
        rm.registerRequestHandler("PUBSUB_DB_SYNC_EXCHANGE1", pubSubHandler);
        pubSubHandler = new IRequestHandler(){

            @Override
            public void handleRequest(IMgram m) {
                long replyTracking = FTMgramFactory.getRequestReplyTracking(m);
                List undelMsgs = FTMgramFactory.createReplicateUndelRequestList(m.getRawBody());
                AgentRegistrar.getAgentRegistrar().getMsgSaver().saveUndelMsgs(undelMsgs, replyTracking);
            }
        };
        rm.registerRequestHandler("PUBSUB_DB_SYNC_EXCHANGE", pubSubHandler);
    }

    @Override
    public void threadMain() {
        try {
            if (this.DEBUG) {
                this.debug("DurableMsgDynSyncer running... m_role: " + this.m_role);
            }
            if (this.m_role == 1) {
                if (this.DEBUG) {
                    this.debug("get and send database info to be replicated on the standby");
                }
                this.performActiveDynamicSync();
                this.m_dsm.setStatus(2);
            } else if (this.m_role == 2) {
                throw new EAssertFailure("DurableMsgDynSyncer shouldn't be used by the STANDBY during dynamic sync.");
            }
            if (this.DEBUG) {
                this.debug("DurableMsgDynSyncer exiting...");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void performActiveDynamicSync() throws InterruptedException {
        long maxTrackingNumber = -1L;
        BrokerDatabase db = AgentRegistrar.getAgentRegistrar().getBrokerDatabase();
        DurableSyncReplicationMgr durSyncMgr = null;
        try {
            db.beginPubSubDBTran();
            try {
                durSyncMgr = AgentRegistrar.getAgentRegistrar().getDurableSyncReplicationMgr();
                maxTrackingNumber = db.getMaxAddedMessageId();
                durSyncMgr.setInDurableDynamicSync(maxTrackingNumber);
                this.m_dsm.pubSubSnapshotComplete();
            }
            finally {
                db.releasePubSubDBTran();
            }
        }
        catch (EDatabaseException e) {
            if (durSyncMgr != null) {
                durSyncMgr.dynamicSyncComplete();
            }
            return;
        }
        if (this.CALLBACK) {
            this.callback("Durable dynamic sync snapshot complete", 0, null);
        }
        MsgSaver saver = AgentRegistrar.getAgentRegistrar().getMsgSaver();
        BrokerJob replicateJob = null;
        ReplicatedUndelMsgInfo reply = null;
        long minTrackingNumber = -1L;
        boolean canceled = false;
        boolean syncSuccessful = false;
        try {
            if (this.CALLBACK) {
                this.callback("PRE UNDEL CHUNK SYNC", 1, null);
            }
            do {
                replicateJob = new BrokerJob(1);
                saver.replicateUndelMsgs(minTrackingNumber, maxTrackingNumber, replicateJob);
                if (this.CALLBACK) {
                    this.callback("UNDEL CHUNK SYNC", 2, null);
                }
                Collection info = replicateJob.join();
                if (!replicateJob.wasCancelled()) {
                    Iterator iter = info.iterator();
                    reply = (ReplicatedUndelMsgInfo)iter.next();
                    minTrackingNumber = reply.m_maxTrackingNumberReplicated + 1L;
                    if (reply.m_request != null) {
                        IMgram replyMgram;
                        if (this.DEBUG) {
                            this.debug("Waiting for response from standby");
                        }
                        reply.m_request.join();
                        if (this.DEBUG) {
                            this.debug("Response from standby received");
                        }
                        if ((replyMgram = reply.m_request.getReplyMgram()) != null) {
                            List mgramsToReplicate = FTMgramFactory.createReplicateUndelReplyList(replyMgram.getRawBody());
                            if (mgramsToReplicate.size() <= 0 && !reply.m_isComplete) continue;
                            saver.replicateMsgs(mgramsToReplicate, reply.m_isComplete);
                            syncSuccessful = reply.m_isComplete;
                            continue;
                        }
                        if (this.DEBUG) {
                            this.debug("Response from standby failed");
                        }
                        canceled = true;
                        continue;
                    }
                    if (!reply.m_isComplete) continue;
                    saver.replicateMsgs(null, true);
                    syncSuccessful = true;
                    continue;
                }
                canceled = true;
            } while (!canceled && !reply.m_isComplete && !Broker.exiting && !Thread.currentThread().isInterrupted());
        }
        finally {
            if (!syncSuccessful) {
                durSyncMgr.dynamicSyncComplete();
            }
        }
    }

    public static final class ReplicatedUndelMsgInfo {
        public long m_maxTrackingNumberReplicated;
        public ReplicationManager.ReplicationRequest m_request;
        public boolean m_isComplete;
    }
}

