/*
 * Decompiled with CFR 0.152.
 */
package progress.message.ft;

import com.sonicsw.mq.components.BrokerComponent;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;
import progress.message.broker.Acceptor;
import progress.message.broker.AgentConnection;
import progress.message.broker.AgentSender;
import progress.message.broker.Broker;
import progress.message.broker.Config;
import progress.message.broker.IClientContext;
import progress.message.broker.SSLAcceptor;
import progress.message.broker.TCPIPAcceptor;
import progress.message.ft.ConnectionManager;
import progress.message.ft.ReplicationListener;
import progress.message.ft.ReplicationManager;
import progress.message.ft.ReplicationSender;
import progress.message.ft.ReplicationSessionContext;
import progress.message.msg.IMgram;
import progress.message.msg.MgramFactory;
import progress.message.net.ESocketConfigException;
import progress.message.net.ISocket;
import progress.message.net.ISocketHandler;
import progress.message.net.ProgressSocketFactory;
import progress.message.util.ArrayUtil;
import progress.message.util.DebugState;
import progress.message.util.URLUtil;
import progress.message.zclient.ClientSecurityContext;
import progress.message.zclient.EUnexpectedMgram;
import progress.message.zclient.ProgressSecureRandom;

public final class ReplicationChannel
extends AgentConnection
implements Runnable {
    private static final int TCP = 0;
    private static final int SSL = 1;
    private static final int DISCONNECTED = 0;
    private static final int PASSIVE_CONNECT = 1;
    private static final int ACTIVE_CONNECT = 4;
    private static final int ACTIVE = 5;
    private static final String[] CHANNEL_STATE = new String[]{"DISCONNECTED", "PASSIVE_CONNECT", "ILLEGAL_STATE", "ILLEGAL_STATE", "ACTIVE_CONNECT", "ACTIVE"};
    private static final Hashtable s_channelTable = new Hashtable();
    private static final Comparator s_comparator = new Comparator(){

        public int compare(Object x, Object y) {
            int w2;
            int w1 = ((ReplicationChannel)x).getWeight();
            if (w1 > (w2 = ((ReplicationChannel)y).getWeight())) {
                return -1;
            }
            if (w1 == w2) {
                return 0;
            }
            return 1;
        }
    };
    private static ReplicationChannel[] s_channels = null;
    private static ConnectionManager s_connectionManager = null;
    private static ISocketHandler s_socketHandler = null;
    private static boolean s_shutdown = false;
    private String m_channelName;
    private int m_protocol;
    private int m_weight;
    private boolean m_isAdminShutdown = false;
    Hashtable m_properties = null;
    private Acceptor m_acceptor = null;
    private volatile int m_state = 0;
    private volatile boolean m_invalid = false;
    private String m_primaryAddr = null;
    private int m_primaryPort = -1;
    private String m_backupAddr = null;
    private int m_backupPort = -1;
    private Properties m_channelProperties;
    private String _lastReset = null;
    private boolean m_inShutdown = false;
    private Object m_shutdownLock = new Object();
    private Thread m_connectThread = null;
    private ReplicationSessionContext m_sc = null;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void startAcceptors(ConnectionManager cm) {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            s_connectionManager = cm;
            s_socketHandler = cm.getReplicationManager();
            if (s_channels == null) {
                return;
            }
            if (s_channels != null) {
                for (int i = 0; i < s_channels.length; ++i) {
                    s_channels[i].startAcceptor(s_socketHandler);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static ReplicationChannel addChannel(String name, String protocol, String primaryAddr, int primaryPort, String backupAddr, int backupPort, String dynHost, int weight, Properties properties) {
        try {
            ReplicationChannel channel = null;
            boolean startImmediately = false;
            Hashtable hashtable = s_channelTable;
            synchronized (hashtable) {
                if (s_shutdown) {
                    return null;
                }
                channel = new ReplicationChannel(name, protocol, primaryAddr, primaryPort, backupAddr, backupPort, dynHost, weight, properties);
                s_channelTable.put(channel.getName(), channel);
                s_channels = ReplicationChannel.sortChannels();
                if (s_connectionManager != null) {
                    startImmediately = true;
                }
            }
            if (startImmediately) {
                channel.startAcceptor(s_socketHandler);
                s_connectionManager.newChannelAvailable();
                if (s_connectionManager.acquireSessionContext() != null) {
                    channel.startConnectThread();
                }
            }
            return channel;
        }
        catch (IOException ex) {
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static boolean removeChannel(String channelName) {
        ReplicationChannel channel = null;
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            channel = (ReplicationChannel)s_channelTable.remove(channelName);
            if (channel == null) {
                return false;
            }
            channel.invalidate();
            channel.notifyPeerOfShutdown();
            s_channels = ReplicationChannel.sortChannels();
        }
        channel.shutdown();
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void shutdownChannels() {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            s_shutdown = true;
        }
        for (int i = 0; i < s_channels.length; ++i) {
            if (s_channels[i] == null) continue;
            s_channels[i].shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void setWeight(String channelName, int weight) {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            ReplicationChannel ch = (ReplicationChannel)s_channelTable.get(channelName);
            if (ch == null) {
                return;
            }
            int oldWeight = ch.getWeight();
            if (weight == oldWeight) {
                return;
            }
            ch.setWeight(weight);
            s_channels = ReplicationChannel.sortChannels();
            if (s_connectionManager != null && oldWeight == 0) {
                s_connectionManager.newChannelAvailable();
            }
            if (s_connectionManager != null && ch.isConnected()) {
                s_connectionManager.newChannelWeight(ch);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void setAcceptorAddress(String channelName, String address, boolean isPrimary) {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            ReplicationChannel ch = (ReplicationChannel)s_channelTable.get(channelName);
            if (ch == null) {
                return;
            }
            if (isPrimary) {
                String oldPrimaryAddress = ch.getPrimaryAddr();
                if (Config.PRIMARY) {
                    if (oldPrimaryAddress != null && !oldPrimaryAddress.equalsIgnoreCase(address)) {
                        BrokerComponent.getComponentContext().logMessage("Can not dynamically change the replcation acceptor address from \"" + oldPrimaryAddress + "\" to \"" + address + "\" for replication channel " + channelName, 3);
                    }
                    return;
                }
                BrokerComponent.getComponentContext().logMessage("BACKUP is notified of the primary's new address (" + address + ") for replication channel " + channelName, 3);
                if (oldPrimaryAddress != null && oldPrimaryAddress.equalsIgnoreCase(address)) {
                    return;
                }
                ch.setPrimaryAddr(address);
            } else {
                String oldBackupAddress = ch.getBackupAddr();
                if (!Config.PRIMARY) {
                    if (oldBackupAddress != null && !oldBackupAddress.equalsIgnoreCase(address)) {
                        BrokerComponent.getComponentContext().logMessage("Can not dynamically change the replcation acceptor address from \"" + oldBackupAddress + "\" to \"" + address + "\" for replication channel " + channelName, 3);
                    }
                    return;
                }
                BrokerComponent.getComponentContext().logMessage("PRIMARY is notified of the backup's new address (" + address + ") for replication channel " + channelName, 3);
                if (oldBackupAddress != null && oldBackupAddress.equalsIgnoreCase(address)) {
                    return;
                }
                ch.setBackupAddr(address);
            }
            if (s_connectionManager != null) {
                s_connectionManager.newChannelAvailable();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static ReplicationChannel getChannel(String name) {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            return (ReplicationChannel)s_channelTable.get(name);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static ReplicationChannel getNextActive() {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            for (int i = 0; i < s_channels.length; ++i) {
                if (!s_channels[i].isValid() || !s_channels[i].isConnected() || s_channels[i].getWeight() <= 0) continue;
                return s_channels[i];
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void setConnectionProperties(Hashtable props) {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            for (int i = 0; i < s_channels.length; ++i) {
                s_channels[i].setProperties(props);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void startChannels() {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            for (int i = 0; i < s_channels.length; ++i) {
                s_channels[i].startConnectThread();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static ReplicationChannel[] getChannels() {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            return s_channels;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static String[] getChannelNames() {
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            String[] names = new String[s_channelTable.size()];
            int index = 0;
            Enumeration keys = s_channelTable.keys();
            while (keys.hasMoreElements()) {
                names[index++] = (String)keys.nextElement();
            }
            return names;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static int getActiveChannelCount() {
        int count = 0;
        Hashtable hashtable = s_channelTable;
        synchronized (hashtable) {
            for (int i = 0; i < s_channels.length; ++i) {
                if (!s_channels[i].isConnected()) continue;
                ++count;
            }
        }
        return count;
    }

    private static ReplicationChannel[] sortChannels() {
        ReplicationChannel[] channels = new ReplicationChannel[s_channelTable.size()];
        int i = 0;
        Enumeration elms = s_channelTable.elements();
        while (elms.hasMoreElements()) {
            channels[i] = (ReplicationChannel)elms.nextElement();
            ++i;
        }
        Arrays.sort(channels, s_comparator);
        return channels;
    }

    public String getName() {
        return this.m_channelName;
    }

    public String getPrimaryAddr() {
        return this.m_primaryAddr;
    }

    public void setPrimaryAddr(String addr) {
        if (this.DEBUG) {
            this.debug("change the primary address from \"" + this.m_primaryAddr + "\" to \"" + addr + "\"");
        }
        this.m_primaryAddr = addr;
    }

    public int getPrimaryPort() {
        return this.m_primaryPort;
    }

    public String getBackupAddr() {
        return this.m_backupAddr;
    }

    public void setBackupAddr(String addr) {
        if (this.DEBUG) {
            this.debug("change the backup address from \"" + this.m_backupAddr + "\" to \"" + addr + "\"");
        }
        this.m_backupAddr = addr;
    }

    public int getBackupPort() {
        return this.m_backupPort;
    }

    public String getLocalURL() {
        return URLUtil.buildUrlString(this.m_protocol == 1 ? "ssl" : "tcp", this.getLocalAddress(), this.getLocalPort());
    }

    public String getRemoteURL() {
        return URLUtil.buildUrlString(this.m_protocol == 1 ? "ssl" : "tcp", this.getRemoteAddress(), this.getRemotePort());
    }

    public boolean isPeerAddressAvailable() {
        return this.getRemoteAddress() != null && this.getRemoteAddress().trim().length() > 0;
    }

    public void simulateNetworkFailure() {
        if (Config.PRIMARY) {
            this.m_backupAddr = "nonexistenthost";
        } else {
            this.m_primaryAddr = "nonexistenthost";
        }
        this.shutdown();
    }

    void setName(String newName) {
        this.m_channelName = newName;
    }

    public void setProperties(Hashtable props) {
        this.m_properties = props;
    }

    public boolean isValid() {
        return !this.m_invalid;
    }

    public int getWeight() {
        return this.m_weight;
    }

    public void invalidate() {
        if (this.DEBUG) {
            this.debug("deleting configuration for channel " + this);
        }
        this.m_invalid = true;
    }

    public void setWeight(int weight) {
        this.m_weight = weight;
    }

    void handlePing(IMgram m) {
        if (this.DEBUG) {
            this.debug("received peer heartbeat at " + new Date(System.currentTimeMillis()).toString());
        }
        if (m.getBodyLength() > 0) {
            this.m_isAdminShutdown = true;
            if (this.DEBUG) {
                this.debug(this.getName() + " is being shut down administratively.");
            }
        }
    }

    boolean isAdminShutdown() {
        return this.m_isAdminShutdown;
    }

    public boolean isConnected() {
        return this.m_state == 5;
    }

    ReplicationChannel(String name, String protocol, String primaryAddr, int primaryPort, String backupAddr, int backupPort, String dynHost, int weight, Properties properties) throws UnknownHostException, IOException {
        super(null, -1L, false);
        if (DebugState.GLOBAL_DEBUG_ON) {
            this.debugName("ReplicationChannel " + name);
        }
        this.m_channelName = name;
        this.m_primaryAddr = primaryAddr;
        this.m_backupAddr = backupAddr;
        this.m_channelProperties = properties;
        if (dynHost != null && dynHost.length() > 0) {
            if (Config.PRIMARY) {
                this.m_primaryAddr = dynHost;
            } else {
                this.m_backupAddr = dynHost;
            }
            if (this.DEBUG) {
                this.debug("creating a channel using dynamic host binding, " + (Config.PRIMARY ? "primary host = " : "backup host = ") + dynHost);
            }
        }
        this.m_primaryPort = primaryPort;
        this.m_backupPort = backupPort;
        int n = this.m_protocol = protocol.equalsIgnoreCase("SSL") ? 1 : 0;
        if (this.DEBUG) {
            String host = Config.PRIMARY ? this.m_primaryAddr : this.m_backupAddr;
            int port = Config.PRIMARY ? this.m_primaryPort : this.m_backupPort;
            this.debug("local bind address = " + host + ":" + port);
            host = Config.PRIMARY ? this.m_backupAddr : this.m_primaryAddr;
            port = Config.PRIMARY ? this.m_backupPort : this.m_primaryPort;
            this.debug("remote connect address = " + host + ":" + port);
        }
        this.m_weight = weight;
        if (this.DEBUG) {
            this.debug("channel weight = " + this.m_weight);
        }
    }

    private String getLocalAddress() {
        return Config.PRIMARY ? this.m_primaryAddr : this.m_backupAddr;
    }

    private int getLocalPort() {
        return Config.PRIMARY ? this.m_primaryPort : this.m_backupPort;
    }

    private String getRemoteAddress() {
        return Config.PRIMARY ? this.m_backupAddr : this.m_primaryAddr;
    }

    private int getRemotePort() {
        return Config.PRIMARY ? this.m_backupPort : this.m_primaryPort;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean startAcceptor(ISocketHandler sh) {
        try {
            Properties props = new Properties();
            Config.getSSLProperties(props);
            props.putAll((Map<?, ?>)this.m_channelProperties);
            this.m_acceptor = this.m_protocol == 1 ? new SSLAcceptor(this.m_channelName, this.getLocalURL(), null, false, null, null, props) : new TCPIPAcceptor(this.m_channelName, this.getLocalURL(), null, false, null, null, props);
            this.m_acceptor.setBindToSpecifiedIpOrHostOnly(true);
        }
        catch (ESocketConfigException ex) {
            return false;
        }
        this.m_acceptor.setSocketHandler(sh);
        this.m_acceptor.start();
        Acceptor acceptor = this.m_acceptor;
        synchronized (acceptor) {
            while (this.m_acceptor.getAcceptorState() == 0) {
                try {
                    this.m_acceptor.wait();
                }
                catch (Exception exception) {}
            }
        }
        return this.m_acceptor.getAcceptorState() != 1;
    }

    void notifyPeerOfShutdown() {
        this.m_isAdminShutdown = true;
        if (this.isConnected()) {
            if (this.DEBUG) {
                this.debug("notify the peer this channel is being shut down administratively.");
            }
            IMgram heartbeatStop = MgramFactory.getMgramFactory().buildPingRequest(new byte[1], 0);
            try {
                this.getAgentSender().sendThrough(heartbeatStop);
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    void ping(IMgram m) {
        block8: {
            if (this.isConnected()) {
                try {
                    AgentSender sender = this.getAgentSender();
                    if (sender != null) {
                        sender.sendThrough(m);
                    }
                    break block8;
                }
                catch (IOException ex) {
                    if (this.reset() && !s_connectionManager.getReplicationManager().isShuttingDown()) {
                        s_connectionManager.getReplicationManager().onDisconnect(this);
                        if (!this.isShuttingDown() && s_connectionManager.getConnection() != null) {
                            this.startConnectThread();
                        }
                    }
                    break block8;
                }
            }
            if (this.m_state == 0 && !s_connectionManager.getReplicationManager().isShuttingDown()) {
                s_connectionManager.getReplicationManager().onDisconnect(this);
                if (!this.isShuttingDown() && s_connectionManager.getConnection() != null) {
                    this.startConnectThread();
                }
            }
        }
    }

    boolean pingIfAlive(IMgram m) {
        if (this.isConnected()) {
            try {
                AgentSender sender = this.getAgentSender();
                if (sender != null) {
                    sender.sendThrough(m);
                }
            }
            catch (IOException ex) {
                return false;
            }
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean connect(ISocket socket, long socketId, ReplicationSessionContext sc, boolean createThread) {
        ReplicationChannel replicationChannel = this;
        synchronized (replicationChannel) {
            if (sc != null) {
                this.m_sc = sc;
            }
            if (this.m_sc == null) {
                if (this.DEBUG) {
                    this.debug("Unable to activate channel w/o session context.");
                }
                return false;
            }
            ClientSecurityContext csc = this.m_sc.getSecurityContext();
            byte sessionVersion = this.m_sc.getPeerVerson();
            try {
                if (createThread) {
                    if (socket == null) {
                        return false;
                    }
                    this.m_socket = socket;
                    this.setSocketId(socketId);
                    this.setAgentListener(new ReplicationListener(this, ReplicationManager.getReplicationManager()));
                    this.setAgentSender(new ReplicationSender(this));
                }
                this.connectSuccess(0, csc, csc.getClientId());
                ((ReplicationListener)this.getAgentListener()).setClientSessionVer(sessionVersion);
                this.getAgentListener().setCommonSecurityContext(csc);
                if (Config.ENABLE_QOPSECURITY) {
                    byte[] sessionKey = csc.getSessionKey();
                    if (sessionKey != null) {
                        this.getAgentSender().initMessageProtection(sessionKey);
                    } else {
                        if (this.DEBUG) {
                            this.debug("connect() aborting - paired FT broker does not have security enabled.");
                        }
                        this.reset();
                        return false;
                    }
                }
                this.m_isAdminShutdown = false;
                this.m_state = 5;
                this.notifyAll();
                if (this.DEBUG) {
                    this.debug(this + " is connected.");
                }
            }
            catch (IOException ex) {
                if (this.DEBUG) {
                    this.debug(ex.toString(), ex);
                }
                this.reset();
                return false;
            }
        }
        try {
            s_connectionManager.newChannel(this);
        }
        catch (IOException ioe) {
            this.reset();
            return false;
        }
        catch (InterruptedException ex) {
            this.reset();
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    synchronized AgentSender activate(IClientContext cc) {
        AgentSender sender = null;
        if (!this.isConnected()) {
            return null;
        }
        try {
            cc.lock();
            sender = this.getAgentSender();
            if (sender.isStarted() && !sender.isAlive() || sender.isShuttingDown() || sender.isInterrupted()) {
                try {
                    sender = new ReplicationSender(this);
                    this.setAgentSender(sender);
                }
                catch (Throwable t) {
                    if (this.DEBUG) {
                        this.debug("failed to create new replication sender for " + this + ", trace follows:", t);
                    }
                    AgentSender agentSender = null;
                    cc.unlock();
                    return agentSender;
                }
            }
            sender.connect(cc, 0);
            if (!sender.isStarted()) {
                sender.start();
            }
        }
        finally {
            cc.unlock();
        }
        BrokerComponent.getComponentContext().logMessage("Replication using " + this.getName(), 3);
        return sender;
    }

    public String debugReset() {
        return this._lastReset;
    }

    synchronized boolean reset() {
        if (this.DEBUG) {
            this.debug(this + " reset by " + Thread.currentThread());
        }
        if (this.m_state == 0) {
            return false;
        }
        StringWriter sw = new StringWriter();
        new Error("Last reset of " + CHANNEL_STATE[this.m_state] + " channel " + this).printStackTrace(new PrintWriter(sw));
        this._lastReset = sw.toString();
        super.close();
        if (this.getAgentSender() != null && this.getAgentSender() != Thread.currentThread()) {
            try {
                this.getAgentSender().shutdown();
            }
            catch (IllegalThreadStateException itse) {
                // empty catch block
            }
        }
        if (this.getAgentListener() != null && this.getAgentListener() != Thread.currentThread()) {
            try {
                this.getAgentListener().shutdown();
            }
            catch (IllegalThreadStateException illegalThreadStateException) {
                // empty catch block
            }
        }
        super.closeInternal(0);
        if (this.DEBUG) {
            this.debug("reset() changing state to DISCONNECTED by thread " + Thread.currentThread());
        }
        this.m_state = 0;
        this.m_sc = null;
        this.setAgentListener(null);
        this.setAgentSender(null);
        this.notifyAll();
        return true;
    }

    @Override
    public void close() {
        this.reset();
    }

    void sendThrough(IMgram m) throws IOException {
        if (!this.isConnected()) {
            throw new IOException("Unable to send - channel is closed.");
        }
        AgentSender as = this.getAgentSender();
        if (as == null) {
            throw new IOException("Unable to send - channel sender is closed.");
        }
        as.sendThrough(m);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isShuttingDown() {
        Object object = this.m_shutdownLock;
        synchronized (object) {
            return this.m_inShutdown;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        Object object = this.m_shutdownLock;
        synchronized (object) {
            if (this.m_inShutdown) {
                return;
            }
            this.m_inShutdown = true;
        }
        if (this.DEBUG) {
            this.debug("shutdown: stopping ConnectThread");
        }
        this.stopConnectThread();
        try {
            if (this.DEBUG) {
                this.debug("shutdown: stopping Acceptor");
            }
            this.m_acceptor.stopAcceptorNoWait();
            if (this.reset() && !s_connectionManager.getReplicationManager().isShuttingDown()) {
                s_connectionManager.getReplicationManager().onDisconnect(this);
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public String toString() {
        return "ReplicationChannel " + this.m_channelName + "(" + URLUtil.buildUrlString(null, this.getLocalAddress(), this.getLocalPort()) + ", " + URLUtil.buildUrlString(null, this.getRemoteAddress(), this.getRemotePort()) + ")";
    }

    void startConnectThread() {
        if (!this.isValid()) {
            return;
        }
        if (this.m_connectThread != null && this.m_connectThread.isAlive()) {
            return;
        }
        this.m_connectThread = new Thread((Runnable)this, "Connect thread of ReplicationChannel " + this.getName());
        this.m_connectThread.setDaemon(true);
        this.m_connectThread.start();
    }

    private void stopConnectThread() {
        Thread t = this.m_connectThread;
        this.m_connectThread = null;
        if (t != null) {
            t.interrupt();
        }
    }

    synchronized void passiveConnect(ISocket socket, long socketId, ReplicationSessionContext sc) throws IOException {
        if (this.m_state == 4) {
            if (Config.PRIMARY) {
                if (this.DEBUG) {
                    this.debug("passiveConnect(): collision - PRIMARY closing the accepted socket to continue active connect.");
                }
                socket.close();
                return;
            }
            if (this.DEBUG) {
                this.debug("passiveConnect(): collision - STANDBY continue passive connect AFTER the peer closed the accepted socket; state = " + CHANNEL_STATE[this.m_state]);
            }
            while (this.m_state != 0) {
                try {
                    this.wait();
                }
                catch (InterruptedException ex) {
                    if (this.DEBUG) {
                        this.debug("passiveConnect() interrupted; closing socket");
                    }
                    socket.close();
                    return;
                }
            }
        }
        if (this.m_state != 0) {
            if (this.DEBUG) {
                this.debug("passiveConnect() rejecting passive connect, state = " + CHANNEL_STATE[this.m_state]);
            }
            new Error("rejecting passive connect, state = " + CHANNEL_STATE[this.m_state]).printStackTrace();
            return;
        }
        if (!s_connectionManager.validateSessionContext(sc)) {
            if (this.DEBUG) {
                this.debug("passiveConnect(): invalid SessionContext; aborting connect");
            }
            socket.close();
            return;
        }
        if (this.DEBUG) {
            this.debug("passiveConnect() changing state to PASSIVE_CONNECT by thread " + Thread.currentThread());
        }
        this.m_state = 1;
        this.m_sc = sc;
        this.m_socket = socket;
        this.setSocketId(socketId);
        ReplicationListener listener = new ReplicationListener(this, ReplicationManager.getReplicationManager());
        listener.setActiveConnect(false);
        this.setAgentListener(listener);
        this.setAgentSender(new ReplicationSender(this));
        listener.start();
    }

    synchronized void handleConvert(IMgram m) throws EUnexpectedMgram {
        long token;
        int op = -1;
        try {
            byte[] body = m.getRawBody();
            token = ArrayUtil.readLong(body, 0);
            op = body[8];
        }
        catch (IndexOutOfBoundsException e) {
            if (this.DEBUG) {
                this.debug("handleConvert() - invalid convert request.", e);
            }
            throw new EUnexpectedMgram(m);
        }
        if (op != 3) {
            if (this.DEBUG) {
                this.debug("handleConvert() - invalid convert request type");
            }
            throw new EUnexpectedMgram(m);
        }
        if (this.m_sc != null && this.m_sc.getSessionId() == token) {
            if (this.m_state == 1) {
                if (this.DEBUG) {
                    this.debug("handleConvert() - token ok, sending success reply");
                }
                this.getAgentSender().send(m);
            }
            if (this.connect(null, -1L, null, false)) {
                if (Config.ENABLE_QOPSECURITY) {
                    this.getAgentListener().initMessageProtection(this.m_sc.getSecurityContext().getSessionKey());
                }
                if (this.DEBUG) {
                    this.debug("handleConvert() - activated.");
                }
                return;
            }
            if (this.DEBUG) {
                this.debug("handleConvert() - failed to activate.");
            }
            throw new EUnexpectedMgram(m);
        }
        if (this.DEBUG) {
            this.debug("invalid session token.");
        }
        throw new EUnexpectedMgram(m);
    }

    synchronized ReplicationSessionContext getSessionContext() {
        return this.m_sc;
    }

    synchronized void setSessionContext(ReplicationSessionContext sc) {
        this.m_sc = sc;
    }

    private synchronized boolean activeConnect(ReplicationSessionContext sc) {
        if (this.m_state == 5) {
            if (this.DEBUG) {
                this.debug("activeConnect() stopping active connect - channel is active.");
            }
            return true;
        }
        if (this.m_state == 1) {
            if (this.DEBUG) {
                this.debug("activeConnect() stopping active connect - peer is connecting.");
            }
            return false;
        }
        this.m_state = 4;
        this.m_sc = sc;
        if (this.DEBUG) {
            this.debug("activeConnect() changing state to ACTIVE_CONNECT");
        }
        try {
            if (this.DEBUG) {
                this.debug("activeConnect() openning socket to the peer @ " + this.getRemoteAddress() + ":" + this.getRemotePort());
            }
            this.m_socket = ProgressSocketFactory.getFactory(this.m_protocol).createProgressSocket(null, this.getRemoteAddress(), this.getRemotePort(), this.m_properties, null);
            this.setSocketId(ProgressSecureRandom.theSecureRandom().nextLong());
            if (!s_connectionManager.validateSessionContext(this.m_sc)) {
                if (this.DEBUG) {
                    this.debug("activeConnect(): invalid SessionContext; aborting connect");
                }
                this.reset();
                return true;
            }
            if (this.DEBUG) {
                this.debug("activeConnect() created socket to the peer @ " + this.getRemoteAddress() + ":" + this.getRemotePort());
            }
            ReplicationListener listener = new ReplicationListener(this, ReplicationManager.getReplicationManager());
            listener.setActiveConnect(true);
            this.setAgentListener(listener);
            this.setAgentSender(new ReplicationSender(this));
            listener.start();
            try {
                this.wait();
            }
            catch (InterruptedException ex) {
                if (this.m_connectThread != Thread.currentThread()) {
                    if (this.DEBUG) {
                        this.debug("activeConnect() connect thread is being stopped, state = " + CHANNEL_STATE[this.m_state]);
                    } else if (this.DEBUG) {
                        ex.printStackTrace();
                    }
                }
                this.reset();
                return true;
            }
            if (this.DEBUG) {
                this.debug("activeConnect() waken up with state = " + CHANNEL_STATE[this.m_state]);
            }
            switch (this.m_state) {
                case 5: {
                    return true;
                }
                case 1: {
                    return true;
                }
                case 0: {
                    this.reset();
                    return !this.isValid();
                }
            }
            return false;
        }
        catch (IOException ex) {
            this.reset();
            return false;
        }
    }

    @Override
    public void run() {
        while (this.isValid() && this.m_connectThread == Thread.currentThread() && !Broker.isInShutdown()) {
            ReplicationSessionContext rsc = null;
            try {
                rsc = s_connectionManager.acquireSessionContext(true);
            }
            catch (InterruptedException ex) {
                if (this.DEBUG) {
                    this.debug("connect thread: Interrupted getting SessionContext; exiting; state = " + CHANNEL_STATE[this.m_state]);
                }
                return;
            }
            if (this.activeConnect(rsc)) {
                if (this.DEBUG) {
                    this.debug("connect thread exiting; activeConnect returned true; state = " + CHANNEL_STATE[this.m_state]);
                }
                return;
            }
            if (!this.isValid() || this.m_connectThread != Thread.currentThread() || Broker.isInShutdown()) {
                return;
            }
            try {
                if (this.DEBUG) {
                    this.debug("connect thread sleeping for " + Config.FT_RETRY_INTERVAL + " ms, state = " + CHANNEL_STATE[this.m_state]);
                }
                Thread.sleep(Config.FT_RETRY_INTERVAL);
            }
            catch (InterruptedException ex) {
                if (this.m_connectThread != Thread.currentThread()) {
                    if (this.DEBUG) {
                        this.debug("connect thread is being stopped, state = " + CHANNEL_STATE[this.m_state]);
                    }
                } else if (this.DEBUG) {
                    ex.printStackTrace();
                }
                if (this.DEBUG) {
                    this.debug("connect thread exiting...");
                }
                return;
            }
        }
    }

    public static void main(String[] argv) {
        int max = 10;
        for (int i = 1; i <= max; ++i) {
            ReplicationChannel.addChannel("channel " + i, "tcp", "nbpyin2", 20000 + i, "nbpyin2", 30000 + i, null, i < max ? i - 1 : 5, null);
        }
        ReplicationChannel ch = ReplicationChannel.getChannel("channel 1");
        ch.setName("CHANNEL 1");
        ReplicationChannel.removeChannel("channel 2");
        for (int j = 0; j < s_channels.length; ++j) {
            System.out.println(s_channels[j].getName() + " with weight = " + s_channels[j].getWeight());
        }
    }

    public synchronized boolean reset(long m_id) {
        if (m_id == this.getSocketId()) {
            return this.reset();
        }
        return false;
    }
}

