http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java
----------------------------------------------------------------------
diff --git 
a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java
 
b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java
deleted file mode 100644
index 5b07a8f..0000000
--- 
a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java
+++ /dev/null
@@ -1,907 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: STABLE.java,v 1.39 2005/08/11 12:43:46 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.pbcast;
-
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.TimeScheduler;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.*;
-
-
-
-
-/**
- * Computes the broadcast messages that are stable; i.e., have been received by all members. Sends
- * STABLE events down the stack when this is the case. This allows NAKACK to garbage collect messages that
- * have been seen by all members.<p>
- * Works as follows: periodically we mcast our highest seqnos (seen for each 
member) to the group.
- * A stability vector, which maintains the highest seqno for each member and 
initially contains no data,
- * is updated when such a message is received. The entry for a member P is set to
- * min(entry[P], digest[P]). When messages from all members have been 
received, a stability
- * message is mcast, which causes all members to send a STABLE event down the stack (triggering garbage
- * collection in the NAKACK layer).<p>
- * The stable task now terminates after max_num_gossips if no messages or view 
changes have been sent or received
- * in the meantime. It will resume when messages are received. This 
effectively suspends sending superfluous
- * STABLE messages in the face of no activity.<br/>
- * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it 
to 0),
- * a STABLE task will be started (unless it is already running).
- * @author Bela Ban
- */
-public class STABLE extends Protocol  {
-    Address             local_addr=null;
-    final Vector        mbrs=new Vector();
-    final Digest        digest=new Digest(10);        // keeps track of the 
highest seqnos from all members
-    final Digest        latest_local_digest=new Digest(10); // keeps track of 
the latest digests received from NAKACK
-    final Vector        heard_from=new Vector();      // keeps track of who we 
already heard from (STABLE_GOSSIP msgs)
-
-    /** Sends a STABLE gossip every 20 seconds on average. 0 disables 
gossipping of STABLE messages */
-    long                desired_avg_gossip=20000;
-
-    /** Delay before we send a STABILITY msg (give others a chance to send first). This should be set to a very
-     * small number (> 0 !) if <code>max_bytes</code> is used */
-    long                stability_delay=6000;
-    StabilitySendTask   stability_task=null;
-    final Object        stability_mutex=new Object(); // to synchronize on 
stability_task
-    volatile/*GemStoneAddition*/ StableTask          stable_task=null;         
    // bcasts periodic STABLE message (added to timer below)
-    final Object        stable_task_mutex=new Object(); // to sync on 
stable_task
-    TimeScheduler       timer=null;                   // to send periodic 
STABLE msgs (and STABILITY messages)
-    static final String name="STABLE";
-
-    /** Total amount of bytes from incoming messages (default = 0 = disabled). 
When exceeded, a STABLE
-     * message will be broadcast and <code>num_bytes_received</code> reset to 
0 . If this is > 0, then ideally
-     * <code>stability_delay</code> should be set to a low number as well */
-    long                max_bytes=0;
-
-    /** The total number of bytes received from unicast and multicast messages 
*/
-    long                num_bytes_received=0;
-
-    /** When true, don't take part in garbage collection protocol: neither 
send STABLE messages nor
-     * handle STABILITY messages */
-    boolean             suspended=false;
-
-    boolean             initialized=false;
-
-    ResumeTask          resume_task=null;
-    final Object        resume_task_mutex=new Object();
-    private long suspendStartTime = 0;
-
-    /** Number of gossip messages */
-    int                 num_gossips=0;
-
-
-
-    /**
-     * Returns the name of this protocol.
-     *
-     * @return the value of the static <code>name</code> field
-     */
-    @Override // GemStoneAddition
-    public String getName() {
-        return STABLE.name;
-    }
-
-    // start GemStoneAddition
-    /**
-     * Returns the <code>enumSTABLE</code> constant declared by {@link Protocol}.
-     *
-     * @return <code>Protocol.enumSTABLE</code>
-     */
-    @Override // GemStoneAddition
-    public int getProtocolEnum() {
-      return Protocol.enumSTABLE;
-    }
-    // end GemStone addition
-
-
-    /**
-     * Returns the configured average gossip interval.
-     *
-     * @return the current value of <code>desired_avg_gossip</code>
-     */
-    public long getDesiredAverageGossip() {
-        return this.desired_avg_gossip;
-    }
-
-    /**
-     * Sets the average gossip interval.
-     *
-     * @param gossip_interval the new value for <code>desired_avg_gossip</code>
-     */
-    public void setDesiredAverageGossip(long gossip_interval) {
-        this.desired_avg_gossip=gossip_interval;
-    }
-
-    /**
-     * Returns the byte threshold used for message counting.
-     *
-     * @return the current value of <code>max_bytes</code>
-     */
-    public long getMaxBytes() {
-        return this.max_bytes;
-    }
-
-    /**
-     * Sets the byte threshold used for message counting.
-     *
-     * @param max_bytes the new threshold; message counting in <code>up()</code> only happens when it is &gt; 0
-     */
-    public void setMaxBytes(long max_bytes) {
-        this.max_bytes=max_bytes;
-    }
-
-    public int getNumberOfGossipMessages() {return num_gossips;}
-
-    /** Resets the statistics of the superclass and sets the gossip counter back to zero. */
-    @Override // GemStoneAddition
-    public void resetStats() {
-        super.resetStats();
-        this.num_gossips=0;
-    }
-
-
-    /**
-     * Lists the event types this protocol requires from the layers below it.
-     *
-     * @return a new Vector holding the single entry <code>Event.GET_DIGEST_STABLE</code>, boxed as an Integer
-     */
-    @Override // GemStoneAddition
-    public Vector requiredDownServices() {
-        Vector services=new Vector();
-        services.add(Integer.valueOf(Event.GET_DIGEST_STABLE)); // NAKACK layer
-        return services;
-    }
-
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-        String str;
-
-        super.setProperties(props);
-        str=props.getProperty("digest_timeout");
-        if(str != null) {
-            props.remove("digest_timeout");
-            
log.error(ExternalStrings.STABLE_DIGEST_TIMEOUT_HAS_BEEN_DEPRECATED_IT_WILL_BE_IGNORED);
-        }
-
-        str=props.getProperty("desired_avg_gossip");
-        if(str != null) {
-            desired_avg_gossip=Long.parseLong(str);
-            props.remove("desired_avg_gossip");
-        }
-
-        str=props.getProperty("stability_delay");
-        if(str != null) {
-            stability_delay=Long.parseLong(str);
-            props.remove("stability_delay");
-        }
-
-        str=props.getProperty("max_gossip_runs");
-        if(str != null) {
-            props.remove("max_gossip_runs");
-            
log.error(ExternalStrings.STABLE_MAX_GOSSIP_RUNS_HAS_BEEN_DEPRECATED_AND_WILL_BE_IGNORED);
-        }
-
-        str=props.getProperty("max_bytes");
-        if(str != null) {
-            max_bytes=Long.parseLong(str);
-            props.remove("max_bytes");
-        }
-
-        str=props.getProperty("max_suspend_time");
-        if(str != null) {
-            
log.error(ExternalStrings.STABLE_MAX_SUSPEND_TIME_IS_NOT_SUPPORTED_ANY_LONGER_PLEASE_REMOVE_IT_IGNORING_IT);
-            props.remove("max_suspend_time");
-        }
-
-        if(props.size() > 0) {
-            
log.error(ExternalStrings.STABLE_THESE_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-            
-            return false;
-        }
-        return true;
-    }
-
-
-    /**
-     * Marks this protocol as suspended and makes sure a resume task is scheduled.
-     *
-     * @param timeout passed to <code>startResumeTask</code>, which adds some slack and uses it as the
-     *                interval of the resume task
-     */
-    private void suspend(long timeout) {
-        if(suspended) {
-            startResumeTask(timeout); // will not start task if already running
-            return;
-        }
-        suspended=true;
-        if(log.isDebugEnabled()) {
-            log.debug("suspending message garbage collection");
-        }
-        if(stack.enableClockStats) {
-            suspendStartTime=nanoTime(); // GemStoneAddition
-        }
-        startResumeTask(timeout); // will not start task if already running
-    }
-
-    /** Clears the <code>suspended</code> flag and stops the resume task, if one is scheduled. */
-    protected/*GemStoneAddition*/ void resume() {
-        this.suspended=false;
-        if(log.isDebugEnabled()) {
-            log.debug("resuming message garbage collection");
-        }
-        stopResumeTask();
-    }
-
-    @Override // GemStoneAddition
-    public void start() throws Exception {
-        if(stack != null && stack.timer != null)
-            timer=stack.timer;
-        else
-            throw new Exception("timer cannot be retrieved from protocol 
stack");
-        if(desired_avg_gossip > 0)
-            startStableTask();
-    }
-
-    /** Stops the periodic stable task and empties the stability digest. */
-    @Override // GemStoneAddition
-    public void stop() {
-        this.stopStableTask();
-        this.clearDigest();
-    }
-
-
-    /**
-     * Handles an event travelling up the stack.
-     * <ul>
-     * <li><code>MSG</code>: when <code>max_bytes</code> is &gt; 0, adds the message length (at least 24) to
-     *     <code>num_bytes_received</code>; once <code>max_bytes</code> is reached the counter is reset and
-     *     <code>GET_DIGEST_STABLE</code> is passed down. A message carrying a {@link StableHeader} is
-     *     dispatched to <code>handleStableMessage</code> or <code>handleStabilityMessage</code> and is
-     *     <em>not</em> passed up; any other message is passed up.</li>
-     * <li><code>GET_DIGEST_STABLE_OK</code>: stores the digest in <code>latest_local_digest</code> and sends
-     *     it as a stable message.</li>
-     * <li><code>VIEW_CHANGE</code>: delegates to <code>handleViewChange</code>.</li>
-     * <li><code>SET_LOCAL_ADDRESS</code>: remembers the local address.</li>
-     * </ul>
-     * Apart from STABLE/STABILITY messages, every event is passed up afterwards.
-     *
-     * @param evt the event received from the layer below
-     */
-    @Override // GemStoneAddition
-    public void up(Event evt) {
-        Message msg;
-        StableHeader hdr;
-        int type=evt.getType();
-//        long start; // GemStoneAddition
-
-        switch(type) {
-
-        case Event.MSG:
-            msg=(Message)evt.getArg();
-            if(max_bytes > 0) {  // message counting is enabled
-                // every message counts for at least 24 bytes
-                long size=Math.max(msg.getLength(), 24);
-                num_bytes_received+=size;
-                if(num_bytes_received >= max_bytes) {
-                    if(trace) {
-                        log.trace(new StringBuffer("max_bytes has been reached 
(").append(max_bytes).
-                                  append(", bytes 
received=").append(num_bytes_received).append("): triggers stable msg"));
-                    }
-                    num_bytes_received=0;
-                    // asks the NAKACK protocol for the current digest, reply 
event is GET_DIGEST_STABLE_OK (arg=digest)
-                    passDown(new Event(Event.GET_DIGEST_STABLE));
-                }
-            }
-
-            // a message without our header leaves the switch and is passed up below
-            hdr=(StableHeader)msg.removeHeader(name);
-            if(hdr == null)
-                break;
-            switch(hdr.type) {
-            case StableHeader.STABLE_GOSSIP:
-                handleStableMessage(msg.getSrc(), hdr.stableDigest);
-                break;
-            case StableHeader.STABILITY:
-                handleStabilityMessage(hdr.stableDigest, msg.getSrc());
-                break;
-            default:
-                if(log.isErrorEnabled()) 
log.error(ExternalStrings.STABLE_STABLEHEADER_TYPE__0__NOT_KNOWN, hdr.type);
-            }
-            return;  // don't pass STABLE or STABILITY messages up the stack
-
-        case Event.GET_DIGEST_STABLE_OK:
-            Digest d=(Digest)evt.getArg();
-            synchronized(latest_local_digest) {
-                latest_local_digest.replace(d);
-            }
-            if(trace)
-                log.trace("setting latest_local_digest from NAKACK: " + 
d.printHighSeqnos());
-            sendStableMessage(d);
-            break;
-
-        case Event.VIEW_CHANGE:
-            View view=(View)evt.getArg();
-            handleViewChange(view);
-            break;
-
-        case Event.SET_LOCAL_ADDRESS:
-            local_addr=(Address)evt.getArg();
-            break;
-        }
-        passUp(evt);
-    }
-
-
-
-
-    /**
-     * Handles an event travelling down the stack and then always passes it on to the layer below.
-     * <ul>
-     * <li><code>VIEW_CHANGE</code>: delegates to <code>handleViewChange</code></li>
-     * <li><code>SUSPEND_STABLE</code>: suspends, using the event argument as timeout when it is a
-     *     <code>Long</code> and 0 otherwise</li>
-     * <li><code>RESUME_STABLE</code>: resumes</li>
-     * </ul>
-     *
-     * @param evt the event received from the layer above
-     */
-    @Override // GemStoneAddition
-    public void down(Event evt) {
-        final int type=evt.getType();
-        if(type == Event.VIEW_CHANGE) {
-            handleViewChange((View)evt.getArg());
-        }
-        else if(type == Event.SUSPEND_STABLE) {
-            final Object arg=evt.getArg();
-            // instanceof is false for null, so no separate null check is needed
-            final long timeout=(arg instanceof Long)? ((Long)arg).longValue() : 0;
-            suspend(timeout);
-        }
-        else if(type == Event.RESUME_STABLE) {
-            resume();
-        }
-        passDown(evt);
-    }
-
-
-    /** Takes a snapshot of the current stability digest under its lock and sends it as a stable message. */
-    public void runMessageGarbageCollection() {
-        final Digest snapshot;
-        synchronized(digest) {
-            snapshot=digest.copy();
-        }
-        sendStableMessage(snapshot);
-    }
-
-
-
-    /* --------------------------------------- Private Methods 
---------------------------------------- */
-
-
-    private void handleViewChange(View v) {
-        Vector tmp=v.getMembers();
-        mbrs.clear();
-        mbrs.addAll(tmp);
-        adjustSenders(digest, tmp);
-        adjustSenders(latest_local_digest, tmp);
-        resetDigest(tmp);
-        if(!initialized)
-            initialized=true;
-    }
-
-
-    /** Digest and members are guaranteed to be non-null */
-    private void adjustSenders(Digest d, Vector members) {
-        synchronized(d) {
-            // 1. remove all members from digest who are not in the view
-            Iterator it=d.senders.keySet().iterator();
-            Address mbr;
-            while(it.hasNext()) {
-                mbr=(Address)it.next();
-                if(!members.contains(mbr))
-                    it.remove();
-            }
-            // 2. add members to digest which are in the new view but not in 
the digest
-            for(int i=0; i < members.size(); i++) {
-                mbr=(Address)members.get(i);
-                if(!d.contains(mbr))
-                    d.add(mbr, -1, -1);
-            }
-        }
-    }
-
-
-    /** Empties the stability digest while holding its lock. */
-    private void clearDigest() {
-        synchronized(this.digest) {
-            this.digest.clear();
-        }
-    }
-
-
-
-    /**
-     * Merges a digest received from another member into the local stability digest. For every sender in
-     * <code>d</code> the highest deliverable seqno becomes the minimum of the local and received values, and
-     * the highest seen seqno becomes the maximum of the two.
-     * <p>
-     * Nothing is merged when <code>d</code> is null or empty, when this protocol is not yet initialized, or
-     * when <code>d</code> has different senders than the local digest; in the last case the digest and the
-     * heard_from list are reset.
-     * <p>
-     * Needs to be called with a lock on <code>digest</code>.
-     *
-     * @param d      the received digest
-     * @param sender the member that sent <code>d</code> (used for tracing only)
-     * @return <code>true</code> if the local digest was updated, <code>false</code> otherwise
-     */
-    private boolean updateLocalDigest(Digest d, Address sender) {
-        if(d == null || d.size() == 0) {
-            return false;
-        }
-
-        if(!initialized) {
-            if(trace) {
-                log.trace("STABLE message will not be handled as I'm not yet initialized");
-            }
-            return false;
-        }
-
-        if(!digest.sameSenders(d)) {
-            if(trace) {
-                StringBuffer mismatch=new StringBuffer("received a digest ");
-                mismatch.append(d.printHighSeqnos()).append(" from ").append(sender);
-                mismatch.append(" which has different members than mine (").append(digest.printHighSeqnos());
-                mismatch.append("), discarding it and resetting heard_from list");
-                log.trace(mismatch);
-            }
-            // to avoid sending incorrect stability/stable msgs, we simply reset our heard_from list, see DESIGN
-            resetDigest(mbrs);
-            return false;
-        }
-
-        StringBuffer report=null;
-        if(trace) {
-            report=new StringBuffer("my [").append(local_addr).append("] digest before: ").append(digest);
-            report.append("\ndigest from ").append(sender).append(": ").append(d);
-        }
-
-        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
-            Map.Entry entry=(Map.Entry)it.next();
-            Address mbr=(Address)entry.getKey();
-            com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry received=
-                    (com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
-
-            // minimum of the highest seqnos deliverable (for garbage collection)
-            long delivered=Math.min(digest.highSeqnoAt(mbr), received.high_seqno);
-            // maximum of the highest seqnos seen (for retransmission of last missing message)
-            long seen=Math.max(digest.highSeqnoSeenAt(mbr), received.high_seqno_seen);
-
-            digest.setHighestDeliveredAndSeenSeqnos(mbr, delivered, seen);
-        }
-
-        if(trace) {
-            report.append("\nmy [").append(local_addr).append("] digest after: ").append(digest).append("\n");
-            log.trace(report);
-        }
-        return true;
-    }
-
-
-
-    private void resetDigest(Vector new_members) {
-        if(new_members == null || new_members.size() == 0)
-            return;
-        synchronized(heard_from) {
-            heard_from.clear();
-            heard_from.addAll(new_members);
-        }
-
-        Digest copy_of_latest;
-        synchronized(latest_local_digest) {
-            copy_of_latest=latest_local_digest.copy();
-        }
-        synchronized(digest) {
-            digest.replace(copy_of_latest);
-            if(trace)
-                log.trace("resetting digest from NAKACK: " + 
copy_of_latest.printHighSeqnos());
-        }
-    }
-
-    /**
-     * Removes mbr from heard_from and returns true if this was the last 
member, otherwise false.
-     * Resets the heard_from list (populates with membership)
-     * @param mbr
-     * @return boolean
-     */
-    private boolean removeFromHeardFromList(Address mbr) {
-        synchronized(heard_from) {
-            heard_from.remove(mbr);
-            if(heard_from.size() == 0) {
-                resetDigest(this.mbrs);
-                return true;
-            }
-        }
-        return false;
-    }
-
-
-    void startStableTask() {
-        // Here, double-checked locking works: we don't want to synchronize if 
the task already runs (which is the case
-        // 99% of the time). If stable_task gets nulled after the condition 
check, we return anyways, but just miss
-        // 1 cycle: on the next message or view, we will start the task
-        if(stable_task != null)
-            return;
-        synchronized(stable_task_mutex) {
-            if(stable_task != null && stable_task.running()) {
-                return;  // already running
-            }
-            stable_task=new StableTask();
-            timer.add(stable_task, true); // fixed-rate scheduling
-        }
-        if(trace)
-            log.trace("stable task started");
-    }
-
-
-    /**
-     * Stops the stable task, if there is one, and forgets it. Always takes <code>stable_task_mutex</code>;
-     * unlike <code>startStableTask()</code> this method is not called frequently.
-     */
-    void stopStableTask() {
-        synchronized(stable_task_mutex) {
-            if(stable_task == null) {
-                return;
-            }
-            stable_task.stop();
-            stable_task=null;
-        }
-    }
-
-
-    void startResumeTask(long max_suspend_time) {
-        max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
-
-        synchronized(resume_task_mutex) {
-            if(resume_task != null && resume_task.running()) {
-                return;  // already running
-            }
-            else {
-                resume_task=new ResumeTask(max_suspend_time);
-                timer.add(resume_task, true); // fixed-rate scheduling
-            }
-        }
-        if(log.isDebugEnabled())
-            log.debug("resume task started, max_suspend_time=" + 
max_suspend_time);
-    }
-
-
-    void stopResumeTask() {
-        synchronized(resume_task_mutex) {
-            if(resume_task != null) {
-                resume_task.stop();
-                resume_task=null;
-                if (stack.enableClockStats)
-                  stack.gfPeerFunctions.incJgSTABLEsuspendTime(nanoTime() - 
suspendStartTime);
-            }
-        }
-    }
-
-
-    void startStabilityTask(Digest d, long delay) {
-        synchronized(stability_mutex) {
-            if(stability_task != null && stability_task.running()) {
-            }
-            else {
-                stability_task=new StabilitySendTask(d, delay); // runs only 
once
-                timer.add(stability_task, true);
-            }
-        }
-    }
-
-
-    /** Stops and forgets the pending stability task, if there is one. */
-    void stopStabilityTask() {
-        synchronized(stability_mutex) {
-            if(stability_task == null) {
-                return;
-            }
-            stability_task.stop();
-            stability_task=null;
-        }
-    }
-
-
-    /**
-     Digest d contains (a) the highest seqnos <em>deliverable</em> for each 
sender and (b) the highest seqnos
-     <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest 
seqno seen is 5, whereas the highest
-     seqno deliverable is 2). The minimum of all highest seqnos deliverable 
will be taken to send a stability
-     message, which results in garbage collection of messages lower than the 
ones in the stability vector. The
-     maximum of all seqnos will be taken to trigger possible retransmission of 
last missing seqno (see DESIGN
-     for details).
-     */
-    private void handleStableMessage(Address sender, Digest d) {
-        if(d == null || sender == null) {
-            if(log.isErrorEnabled()) 
log.error(ExternalStrings.STABLE_DIGEST_OR_SENDER_IS_NULL);
-            return;
-        }
-
-        if(!initialized) {
-            if(trace)
-                log.trace("STABLE message will not be handled as I'm not yet 
initialized");
-            return;
-        }
-
-        if(suspended) {
-            if(trace)
-                log.trace("STABLE message will not be handled as I'm 
suspended");
-            return;
-        }
-
-        if(trace)
-            log.trace(new StringBuffer("received stable msg from 
").append(sender).append(": ").append(d.printHighSeqnos()));
-        if(!heard_from.contains(sender)) {  // already received gossip from 
sender; discard it
-            if(trace) log.trace("already received stable msg from " + sender);
-            return;
-        }
-        
-        stack.gfPeerFunctions.incJgSTABLEmessages(1);
-        
-        Digest copy;
-        synchronized(digest) {
-            boolean success=updateLocalDigest(d, sender);
-            if(!success) // we can only remove the sender from heard_from if 
*all* elements of my digest were updated
-                return;
-            copy=digest.copy();
-        }
-
-        boolean was_last=removeFromHeardFromList(sender);
-        if(was_last) {
-            sendStabilityMessage(copy);
-        }
-    }
-
-
-    /**
-     * Bcasts a STABLE message of the current digest to all members. The message contains the highest seqnos
-     * of all members seen by this member. Highest seqnos are retrieved from the NAKACK layer below.
-     * <p>
-     * Nothing is sent while this protocol is suspended, or when <code>d</code> is null or empty. The
-     * "messages sent" statistic and <code>num_gossips</code> are only incremented when a message is
-     * actually passed down.
-     *
-     * @param d A <em>copy</em> of this.digest
-     */
-    private void sendStableMessage(Digest d) {
-        if(suspended) {
-            if(trace)
-                log.trace("will not send STABLE message as I'm suspended");
-            return;
-        }
-
-        if(d == null || d.size() == 0) {
-            return; // nothing to gossip about, so nothing to count either
-        }
-
-        if(trace)
-            log.trace("sending stable msg " + d.printHighSeqnos());
-        Message msg=new Message(); // mcast message
-        StableHeader hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
-        msg.putHeader(name, hdr);
-        num_gossips++;
-        stack.gfPeerFunctions.incJgSTABLEmessagesSent(1);
-        passDown(new Event(Event.MSG, msg));
-    }
-
-
-
-    /**
-     Schedules a stability message to be mcast after a random number of 
milliseconds (range 1-5 secs).
-     The reason for waiting a random amount of time is that, in the worst 
case, all members receive a
-     STABLE_GOSSIP message from the last outstanding member at the same time 
and would therefore mcast the
-     STABILITY message at the same time too. To avoid this, each member waits 
random N msecs. If, before N
-     elapses, some other member sent the STABILITY message, we just cancel our 
own message. If, during
-     waiting for N msecs to send STABILITY message S1, another STABILITY 
message S2 is to be sent, we just
-     discard S2.
-     @param tmp A copy of te stability digest, so we don't need to copy it 
again
-     */
-    void sendStabilityMessage(Digest tmp) {
-        long delay;
-
-        // give other members a chance to mcast STABILITY message. if we 
receive STABILITY by the end of
-        // our random sleep, we will not send the STABILITY msg. this prevents 
that all mbrs mcast a
-        // STABILITY msg at the same time
-        delay=Util.random(stability_delay);
-        startStabilityTask(tmp, delay);
-    }
-
-
-    /**
-     * Handles a STABILITY message: cancels our own pending stability task, starts a new stability round
-     * and passes a STABLE event down the stack so NAKACK can garbage collect old messages.
-     * <p>
-     * The message is ignored when <code>d</code> is null, when this protocol is not yet initialized, or
-     * when it is suspended. When the senders in <code>d</code> do not match those of the local digest, the
-     * digest is not acted upon, but the local digest and the heard_from list are still re-initialized (as
-     * the log message states), the same way <code>updateLocalDigest</code> reacts to a mismatching gossip.
-     *
-     * @param d      the stability digest received
-     * @param sender the member that sent the STABILITY message (used for tracing only)
-     */
-    void handleStabilityMessage(Digest d, Address sender) {
-        if(d == null) {
-            if(log.isErrorEnabled())
-                log.error(ExternalStrings.STABLE_STABILITY_DIGEST_IS_NULL);
-            return;
-        }
-
-        if(!initialized) {
-            if(trace)
-                log.trace("STABLE message will not be handled as I'm not yet initialized");
-            return;
-        }
-
-        if(suspended) {
-            if(log.isDebugEnabled()) {
-                log.debug("stability message will not be handled as I'm suspended");
-            }
-            return;
-        }
-
-        if(trace)
-            log.trace(new StringBuffer("received stability msg from ").append(sender).append(": ").append(d.printHighSeqnos()));
-        // somebody else sent the STABILITY message first: our own pending one is no longer needed
-        stopStabilityTask();
-        stack.gfPeerFunctions.incJgSTABILITYmessages(1);
-
-        // we won't handle the gossip d, if d's members don't match the membership in my own digest,
-        // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
-        if(!this.digest.sameSenders(d)) {
-            if(log.isDebugEnabled()) {
-                log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
-                        this.digest + "): ignoring digest and re-initializing own digest");
-            }
-            // actually perform the re-initialization announced above
-            resetDigest(mbrs);
-            return;
-        }
-
-        resetDigest(mbrs);
-
-        // pass STABLE event down the stack, so NAKACK can garbage collect old messages
-        passDown(new Event(Event.STABLE, d));
-    }
-
-
-
-    /* ------------------------------------End of Private Methods 
------------------------------------- */
-
-
-
-
-
-
-
-    /**
-     * Header attached to the messages exchanged by STABLE. It carries the message type
-     * ({@link #STABLE_GOSSIP} or {@link #STABILITY}) and an optional digest.
-     */
-    public static class StableHeader extends Header implements Streamable {
-        public static final int STABLE_GOSSIP=1;
-        public static final int STABILITY=2;
-
-        int type=0;
-        // used for both STABLE_GOSSIP and STABILITY messages; may be null
-        Digest stableDigest=null; // changed by Bela April 4 2004
-
-        /** No-arg constructor, used for externalizable. */
-        public StableHeader() {
-        }
-
-
-        /**
-         * @param type   one of {@link #STABLE_GOSSIP} or {@link #STABILITY}
-         * @param digest the digest to carry, may be null
-         */
-        public StableHeader(int type, Digest digest) {
-            this.type=type;
-            this.stableDigest=digest;
-        }
-
-
-        /** Returns the symbolic name of a header type, or <code>"&lt;unknown&gt;"</code>. */
-        static String type2String(int t) {
-            switch(t) {
-                case STABLE_GOSSIP:
-                    return "STABLE_GOSSIP";
-                case STABILITY:
-                    return "STABILITY";
-                default:
-                    return "<unknown>";
-            }
-        }
-
-        @Override // GemStoneAddition
-        public String toString() {
-            return "[" + type2String(type) + "]: digest is " + stableDigest;
-        }
-
-
-        /** Writes the type, a presence flag for the digest and, if present, the digest itself. */
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(type);
-            if(stableDigest == null) {
-                out.writeBoolean(false);
-                return;
-            }
-            out.writeBoolean(true);
-            stableDigest.writeExternal(out);
-        }
-
-
-        /**
-         * Reads the state written by {@link #writeExternal(ObjectOutput)}. When the stream says that no
-         * digest is present, <code>stableDigest</code> is set to null, so that a previously held digest
-         * cannot survive the read (<code>readFrom</code> likewise always overwrites the field).
-         */
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            type=in.readInt();
-            boolean digest_not_null=in.readBoolean();
-            if(digest_not_null) {
-                stableDigest=new Digest();
-                stableDigest.readExternal(in);
-            }
-            else {
-                stableDigest=null;
-            }
-        }
-
-        /** Returns the size of the type and the presence byte plus, if present, the serialized digest. */
-        @Override // GemStoneAddition
-        public long size(short version) {
-            long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
-            if(stableDigest != null)
-                retval+=stableDigest.serializedSize(version);
-            return retval;
-        }
-
-        /** Writes the type followed by the digest (via <code>Util.writeStreamable</code>). */
-        public void writeTo(DataOutputStream out) throws IOException {
-            out.writeInt(type);
-            Util.writeStreamable(stableDigest, out);
-        }
-
-        /** Reads the type followed by the digest (via <code>Util.readStreamable</code>). */
-        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-            type=in.readInt();
-            stableDigest=(Digest)Util.readStreamable(Digest.class, in);
-        }
-
-
-    }
-
-
-
-
-    /**
-     * Timer task that periodically asks the layer below for the current digest; the reply
-     * (<code>GET_DIGEST_STABLE_OK</code>) is what triggers the mcast of a STABLE message. The interval
-     * between runs is random and varies with the number of members and <code>desired_avg_gossip</code>.
-     * The task does nothing while the protocol is suspended and reports itself as cancelled once
-     * {@link #stop()} has been called.
-     */
-    protected/*GemStoneAddition*/ class StableTask implements TimeScheduler.Task {
-        boolean stopped=false;
-
-        /** Marks this task as stopped. */
-        public void stop() {
-            this.stopped=true;
-        }
-
-        /** @return <code>true</code> as long as {@link #stop()} has not been called */
-        public boolean running() { // syntactic sugar
-            return !this.stopped;
-        }
-
-        /** @return <code>true</code> once {@link #stop()} has been called */
-        public boolean cancelled() {
-            return this.stopped;
-        }
-
-        /** @return the computed sleep time, or 10000 when that is not positive */
-        public long nextInterval() {
-            final long sleep=computeSleepTime();
-            return sleep <= 0? 10000 : sleep;
-        }
-
-
-        public void run() {
-            if(suspended) {
-                if(trace) {
-                    log.trace("stable task will not run as suspended=" + suspended);
-                }
-                return;
-            }
-
-            // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
-            passDown(new Event(Event.GET_DIGEST_STABLE));
-        }
-
-        /** @return a random value below <code>mbrs.size() * desired_avg_gossip * 2</code> */
-        long computeSleepTime() {
-            final long upperBound=mbrs.size() * desired_avg_gossip * 2;
-            return getRandom(upperBound);
-        }
-
-        /** @return <code>Math.random() * range</code> taken modulo <code>range</code> and truncated to a long */
-        long getRandom(long range) {
-            final double scaled=Math.random() * range;
-            return (long)(scaled % range);
-        }
-    }
-
-
-
-
-
-    /**
-     * Timer task that multicasts a single STABILITY message carrying the digest it was created with.
-     * After its first run, or once {@link #stop()} has been called, it reports itself as cancelled.
-     */
-    private class StabilitySendTask implements TimeScheduler.Task {
-        Digest   d=null;
-        boolean  stopped=false;
-        long     delay=2000;
-
-
-        /**
-         * @param d     the digest to send
-         * @param delay the interval returned by {@link #nextInterval()}
-         */
-        StabilitySendTask(Digest d, long delay) {
-            this.delay=delay;
-            this.d=d;
-        }
-
-        /** @return <code>true</code> while the task has neither run nor been stopped */
-        public boolean running() {
-            return !this.stopped;
-        }
-
-        /** Marks this task as stopped, so that a later run sends nothing. */
-        public void stop() {
-            this.stopped=true;
-        }
-
-        /** @return <code>true</code> once the task has run or been stopped */
-        public boolean cancelled() {
-            return this.stopped;
-        }
-
-
-        /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
-        public long nextInterval() {
-            return this.delay;
-        }
-
-
-        public void run() {
-            if(suspended) {
-                if(log.isDebugEnabled()) {
-                    log.debug("STABILITY message will not be sent as suspended=" + suspended);
-                }
-                stopped=true;
-                return;
-            }
-
-            final boolean shouldSend=d != null && !stopped;
-            if(shouldSend) {
-                final Message stabilityMsg=new Message();
-                final StableHeader header=new StableHeader(StableHeader.STABILITY, d);
-                stabilityMsg.putHeader(STABLE.name, header);
-                if(trace) {
-                    log.trace("sending stability msg " + d.printHighSeqnos());
-                }
-                passDown(new Event(Event.MSG, stabilityMsg));
-                d=null;
-            }
-            stopped=true; // run only once
-        }
-    }
-
-
-    /**
-     * Timer task that resumes message garbage collection by itself, in case the protocol is still
-     * suspended when the task fires. It logs a warning in that case, since resuming is expected to be
-     * triggered by a RESUME_STABLE event.
-     */
-    private class ResumeTask implements TimeScheduler.Task {
-        boolean running=true;
-        long max_suspend_time=0;
-
-        /** @param max_suspend_time the interval returned by {@link #nextInterval()} */
-        ResumeTask(long max_suspend_time) {
-            this.max_suspend_time=max_suspend_time;
-        }
-
-        /** Marks this task as no longer running. */
-        void stop() {
-            this.running=false;
-        }
-
-        /** @return <code>true</code> until {@link #stop()} has been called */
-        public boolean running() {
-            return this.running;
-        }
-
-        /** @return <code>true</code> once {@link #stop()} has been called */
-        public boolean cancelled() {
-            return !this.running;
-        }
-
-        public long nextInterval() {
-            return this.max_suspend_time;
-        }
-
-        public void run() {
-            if(suspended) {
-                log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
-                         "check why this event was not received (or increase max_suspend_time for large state transfers)");
-            }
-            resume();
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.latest
----------------------------------------------------------------------
diff --git 
a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.latest
 
b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.latest
deleted file mode 100644
index debbd40..0000000
--- 
a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.latest
+++ /dev/null
@@ -1,897 +0,0 @@
-// $Id: STABLE.java,v 1.31 2005/07/16 13:20:42 belaban Exp $
-
-package org.jgroups.protocols.pbcast;
-
-
-import org.jgroups.*;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.Streamable;
-import org.jgroups.util.TimeScheduler;
-import org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.*;
-
-
-
-
-/**
- * Computes the broadcast messages that are stable, i.e. have been received by 
all members. Sends
- * STABLE events up the stack when this is the case. This allows NAKACK to 
garbage collect messages that
- * have been seen by all members.<p>
- * Works as follows: periodically we mcast our highest seqnos (seen for each 
member) to the group.
- * A stability vector, which maintains the highest seqno for each member and 
initially contains no data,
- * is updated when such a message is received. The entry for a member P is 
computed set to
- * min(entry[P], digest[P]). When messages from all members have been 
received, a stability
- * message is mcast, which causes all members to send a STABLE event up the 
stack (triggering garbage collection
- * in the NAKACK layer).<p>
- * The stable task now terminates after max_num_gossips if no messages or view 
changes have been sent or received
- * in the meantime. It will resume when messages are received. This 
effectively suspends sending superfluous
- * STABLE messages in the face of no activity.<br/>
- * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it 
to 0),
- * a STABLE task will be started (unless it is already running).
- * @author Bela Ban
- */
-public class STABLE extends Protocol {
-    Address             local_addr=null;
-    final Vector        mbrs=new Vector();
-    final Digest        digest=new Digest(10);        // keeps track of the 
highest seqnos from all members
-    final Digest        latest_local_digest=new Digest(10); // keeps track of 
the latest digests received from NAKACK
-    final Vector        heard_from=new Vector();      // keeps track of who we 
already heard from (STABLE_GOSSIP msgs)
-
-    /** Sends a STABLE gossip every 20 seconds on average. 0 disables 
gossipping of STABLE messages */
-    long                desired_avg_gossip=20000;
-
-    /** delay before we send STABILITY msg (give others a change to send 
first). This should be set to a very
-     * small number (> 0 !) if <code>max_bytes</code> is used */
-    long                stability_delay=6000;
-    StabilitySendTask   stability_task=null;
-    final Object        stability_mutex=new Object(); // to synchronize on 
stability_task
-    StableTask          stable_task=null;             // bcasts periodic 
STABLE message (added to timer below)
-    final Object        stable_task_mutex=new Object(); // to sync on 
stable_task
-    TimeScheduler       timer=null;                   // to send periodic 
STABLE msgs (and STABILITY messages)
-    // int                 max_gossip_runs=3;            // max. number of 
times the StableTask runs before terminating
-    // int                 num_gossip_runs=max_gossip_runs; // this number is 
decremented (max_gossip_runs doesn't change)
-    static final String name="STABLE";
-
-    /** Total amount of bytes from incoming messages (default = 0 = disabled). 
When exceeded, a STABLE
-     * message will be broadcast and <code>num_bytes_received</code> reset to 
0 . If this is > 0, then ideally
-     * <code>stability_delay</code> should be set to a low number as well */
-    long                max_bytes=0;
-
-    /** The total number of bytes received from unicast and multicast messages 
*/
-    long                num_bytes_received=0;
-
-    /** When true, don't take part in garbage collection protocol: neither 
send STABLE messages nor
-     * handle STABILITY messages */
-    boolean             suspended=false;
-
-    boolean             initialized=false;
-
-    /** Max time we should hold off on message garbage collection. This is a 
second line of defense in case
-     * we get a SUSPEND_STABLE, but forget to send a corresponding 
RESUME_STABLE (which should never happen !)
-     * The consequence of a missing RESUME_STABLE would be that the group 
doesn't garbage collect stable
-     * messages anymore, eventually, with a lot of traffic, every member would 
accumulate messages and run
-     * out of memory !
-     */
-    // long                max_suspend_time=600000;
-
-    ResumeTask          resume_task=null;
-    final Object        resume_task_mutex=new Object();
-
-    /** Number of gossip messages */
-    int                 num_gossips=0;
-
-
-    public String getName() {
-        return name;
-    }
-
-    public long getDesiredAverageGossip() {
-        return desired_avg_gossip;
-    }
-
-    public void setDesiredAverageGossip(long gossip_interval) {
-        desired_avg_gossip=gossip_interval;
-    }
-
-    public long getMaxBytes() {
-        return max_bytes;
-    }
-
-    public void setMaxBytes(long max_bytes) {
-        this.max_bytes=max_bytes;
-    }
-
-    public int getNumberOfGossipMessages() {return num_gossips;}
-
-    public void resetStats() {
-        super.resetStats();
-        num_gossips=0;
-    }
-
-
-    public Vector requiredDownServices() {
-        Vector retval=new Vector();
-        retval.addElement(new Integer(Event.GET_DIGEST_STABLE));  // NAKACK 
layer
-        return retval;
-    }
-
-    public boolean setProperties(Properties props) {
-        String str;
-
-        super.setProperties(props);
-        str=props.getProperty("digest_timeout");
-        if(str != null) {
-            props.remove("digest_timeout");
-            log.error("digest_timeout has been deprecated; it will be 
ignored");
-        }
-
-        str=props.getProperty("desired_avg_gossip");
-        if(str != null) {
-            desired_avg_gossip=Long.parseLong(str);
-            props.remove("desired_avg_gossip");
-        }
-
-        str=props.getProperty("stability_delay");
-        if(str != null) {
-            stability_delay=Long.parseLong(str);
-            props.remove("stability_delay");
-        }
-
-        str=props.getProperty("max_gossip_runs");
-        if(str != null) {
-            props.remove("max_gossip_runs");
-            log.error("max_gossip_runs has been deprecated and will be 
ignored");
-        }
-
-        str=props.getProperty("max_bytes");
-        if(str != null) {
-            max_bytes=Long.parseLong(str);
-            props.remove("max_bytes");
-        }
-
-        str=props.getProperty("max_suspend_time");
-        if(str != null) {
-            log.error("max_suspend_time is not supported any longer; please 
remove it (ignoring it)");
-            props.remove("max_suspend_time");
-        }
-
-        if(props.size() > 0) {
-            log.error("these properties are not recognized: " + props);
-            
-            return false;
-        }
-        return true;
-    }
-
-
-    private void suspend(long timeout) {
-        if(!suspended) {
-            suspended=true;
-            if(log.isDebugEnabled())
-                log.debug("suspending message garbage collection");
-        }
-        startResumeTask(timeout); // will not start task if already running
-    }
-
-    private void resume() {
-        suspended=false;
-        if(log.isDebugEnabled())
-            log.debug("resuming message garbage collection");
-        stopResumeTask();
-    }
-
-    public void start() throws Exception {
-        if(stack != null && stack.timer != null)
-            timer=stack.timer;
-        else
-            throw new Exception("timer cannot be retrieved from protocol 
stack");
-        if(desired_avg_gossip > 0)
-            startStableTask();
-    }
-
-    public void stop() {
-        stopStableTask();
-        clearDigest();
-    }
-
-
-    public void up(Event evt) {
-        Message msg;
-        StableHeader hdr;
-        int type=evt.getType();
-
-        switch(type) {
-
-        case Event.MSG:
-            msg=(Message)evt.getArg();
-            if(max_bytes > 0) {  // message counting is enabled
-                long size=Math.max(msg.getLength(), 24);
-                num_bytes_received+=size;
-                if(num_bytes_received >= max_bytes) {
-                    if(log.isTraceEnabled()) {
-                        StringBuffer sb=new StringBuffer("max_bytes has been 
reached (max_bytes=");
-                        sb.append(max_bytes).append(", number of bytes 
received=");
-                        sb.append(num_bytes_received).append("): triggers 
stable msg");
-                        log.trace(sb.toString());
-                    }
-                    // asks the NAKACK protocol for the current digest, reply 
event is GET_DIGEST_STABLE_OK (arg=digest)
-                    passDown(new Event(Event.GET_DIGEST_STABLE));
-                    num_bytes_received=0;
-                }
-            }
-
-
-            hdr=(StableHeader)msg.removeHeader(name);
-            if(hdr == null)
-                break;
-            switch(hdr.type) {
-            case StableHeader.STABLE_GOSSIP:
-                handleStableMessage(msg.getSrc(), hdr.stableDigest);
-                break;
-            case StableHeader.STABILITY:
-                handleStabilityMessage(hdr.stableDigest, msg.getSrc());
-                break;
-            default:
-                if(log.isErrorEnabled()) log.error("StableHeader type " + 
hdr.type + " not known");
-            }
-            return;  // don't pass STABLE or STABILITY messages up the stack
-
-        case Event.VIEW_CHANGE:
-            View view=(View)evt.getArg();
-            handleViewChange(view);
-            break;
-
-        case Event.SET_LOCAL_ADDRESS:
-            local_addr=(Address)evt.getArg();
-            break;
-
-        case Event.GET_DIGEST_STABLE_OK:
-            Digest d=(Digest)evt.getArg(), copy=null;
-
-            synchronized(latest_local_digest) {
-                latest_local_digest.replace(d);
-                //if(heard_from.contains(local_addr))
-                copy=digest.copy();
-            }
-
-//            synchronized(digest) {
-//                boolean success=updateLocalDigest(d, local_addr, true);
-//                if(!success)
-//                    break;
-//                copy=digest.copy();
-//            }
-
-            if(copy != null)
-                sendStableMessage(copy);
-
-            break;
-        }
-
-        passUp(evt);
-    }
-
-
-
-
-    public void down(Event evt) {
-        switch(evt.getType()) {
-        case Event.VIEW_CHANGE:
-            View v=(View)evt.getArg();
-            handleViewChange(v);
-            break;
-
-        case Event.SUSPEND_STABLE:
-            long timeout=0;
-            Object t=evt.getArg();
-            if(t != null && t instanceof Long)
-                timeout=((Long)t).longValue();
-            suspend(timeout);
-            break;
-
-        case Event.RESUME_STABLE:
-            resume();
-            break;
-        }
-        passDown(evt);
-    }
-
-
-    public void runMessageGarbageCollection() {
-        Digest copy;
-        synchronized(digest) {
-            copy=digest.copy();
-        }
-        sendStableMessage(copy);
-    }
-
-
-
-    /* --------------------------------------- Private Methods 
---------------------------------------- */
-
-
-    private void handleViewChange(View v) {
-        Vector tmp=v.getMembers();
-        mbrs.clear();
-        mbrs.addAll(tmp);
-        adjustSenders(digest, tmp);
-        adjustSenders(latest_local_digest, tmp);
-        resetDigest(tmp);
-        if(!initialized)
-            initialized=true;
-    }
-
-
-    /** Digest and members are guaranteed to be non-null */
-    private void adjustSenders(Digest d, Vector members) {
-        synchronized(d) {
-            // 1. remove all members from digest who are not in the view
-            Iterator it=d.senders.keySet().iterator();
-            Address mbr;
-            while(it.hasNext()) {
-                mbr=(Address)it.next();
-                if(!members.contains(mbr))
-                    it.remove();
-            }
-            // 2. add members to digest which are in the new view but not in 
the digest
-            for(int i=0; i < members.size(); i++) {
-                mbr=(Address)members.get(i);
-                if(!d.contains(mbr))
-                    d.add(mbr, -1, -1);
-            }
-        }
-    }
-
-
-    private void clearDigest() {
-        synchronized(digest) {
-            digest.clear();
-        }
-    }
-
-
-
-    /** Update my own digest from a digest received by somebody else. Returns 
whether the update was successful.
-     *  Needs to be called with a lock on digest */
-    private boolean updateLocalDigest(Digest d, Address sender) {
-        if(d == null || d.size() == 0)
-            return false;
-
-        if(!initialized) {
-            if(log.isTraceEnabled())
-                log.trace("STABLE message will not be handled as I'm not yet 
initialized");
-            return false;
-        }
-
-//        if(self) {
-//            if(heard_from.contains(sender)) {
-//                resetHeardFromList(mbrs);
-//                digest.replace(d);
-//                if(log.isTraceEnabled())
-//                    log.trace("initialized digest from " + d);
-//                return true;
-//            }
-//            else
-//                return false;
-//        }
-
-        if(!digest.sameSenders(d)) {
-            if(log.isTraceEnabled())
-                log.trace(new StringBuffer("received a digest 
").append(d.printHighSeqnos()).append(" from ").
-                          append(sender).append(" which has different members 
than mine (").
-                          append(digest.printHighSeqnos()).append("), 
discarding it and resetting heard_from list"));
-            // to avoid sending incorrect stability/stable msgs, we simply 
reset our heard_from list, see DESIGN
-            resetDigest(mbrs);
-            return false;
-        }
-
-        StringBuffer sb=null;
-        if(log.isTraceEnabled())
-            sb=new StringBuffer("my [").append(local_addr).append("] digest 
before: ").append(digest).
-                    append("\ndigest from ").append(sender).append(": 
").append(d).append("\n");
-        Address mbr;
-        long highest_seqno, my_highest_seqno, new_highest_seqno;
-        long highest_seen_seqno, my_highest_seen_seqno, new_highest_seen_seqno;
-        Map.Entry entry;
-        org.jgroups.protocols.pbcast.Digest.Entry val;
-        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
-            entry=(Map.Entry)it.next();
-            mbr=(Address)entry.getKey();
-            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
-            highest_seqno=val.high_seqno;
-            highest_seen_seqno=val.high_seqno_seen;
-
-            // compute the minimum of the highest seqnos deliverable (for 
garbage collection)
-            my_highest_seqno=digest.highSeqnoAt(mbr);
-            // compute the maximum of the highest seqnos seen (for 
retransmission of last missing message)
-            my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
-
-            new_highest_seqno=Math.min(my_highest_seqno, highest_seqno);
-            new_highest_seen_seqno=Math.max(my_highest_seen_seqno, 
highest_seen_seqno);
-            digest.setHighestDeliveredAndSeenSeqnos(mbr, new_highest_seqno, 
new_highest_seen_seqno);
-        }
-        if(log.isTraceEnabled()) {
-            sb.append("\nmy [").append(local_addr).append("] digest after: 
").append(digest).append("\n");
-            log.trace(sb);
-        }
-        return true;
-    }
-
-
-
-    private void resetDigest(Vector new_members) {
-        if(new_members == null || new_members.size() == 0)
-            return;
-        synchronized(heard_from) {
-            heard_from.clear();
-            heard_from.addAll(new_members);
-        }
-        synchronized(digest) {
-            digest.replace(latest_local_digest);
-        }
-    }
-
-    private boolean removeFromHeardFromList(Address mbr) {
-        synchronized(heard_from) {
-            heard_from.remove(mbr);
-            if(heard_from.size() == 0) {
-                resetDigest(this.mbrs);
-                return true;
-            }
-        }
-        return false;
-    }
-
-
-    void startStableTask() {
-        // Here, double-checked locking works: we don't want to synchronize if 
the task already runs (which is the case
-        // 99% of the time). If stable_task gets nulled after the condition 
check, we return anyways, but just miss
-        // 1 cycle: on the next message or view, we will start the task
-        if(stable_task != null)
-            return;
-        synchronized(stable_task_mutex) {
-            if(stable_task != null && stable_task.running()) {
-                return;  // already running
-            }
-            stable_task=new StableTask();
-            timer.add(stable_task, true); // fixed-rate scheduling
-        }
-        if(log.isTraceEnabled())
-            log.trace("stable task started");
-    }
-
-
-    void stopStableTask() {
-        // contrary to startStableTask(), we don't need double-checked locking 
here because this method is not
-        // called frequently
-        synchronized(stable_task_mutex) {
-            if(stable_task != null) {
-                stable_task.stop();
-                stable_task=null;
-            }
-        }
-    }
-
-
-    void startResumeTask(long max_suspend_time) {
-        max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
-
-        synchronized(resume_task_mutex) {
-            if(resume_task != null && resume_task.running()) {
-                return;  // already running
-            }
-            else {
-                resume_task=new ResumeTask(max_suspend_time);
-                timer.add(resume_task, true); // fixed-rate scheduling
-            }
-        }
-        if(log.isDebugEnabled())
-            log.debug("resume task started, max_suspend_time=" + 
max_suspend_time);
-    }
-
-
-    void stopResumeTask() {
-        synchronized(resume_task_mutex) {
-            if(resume_task != null) {
-                resume_task.stop();
-                resume_task=null;
-            }
-        }
-    }
-
-
-    void startStabilityTask(Digest d, long delay) {
-        synchronized(stability_mutex) {
-            if(stability_task != null && stability_task.running()) {
-                return;  // already running
-            }
-            else {
-                stability_task=new StabilitySendTask(d, delay); // runs only 
once
-                timer.add(stability_task, true);
-            }
-        }
-    }
-
-
-    void stopStabilityTask() {
-        synchronized(stability_mutex) {
-            if(stability_task != null) {
-                stability_task.stop();
-                stability_task=null;
-            }
-        }
-    }
-
-
-    /**
-     Digest d contains (a) the highest seqnos <em>deliverable</em> for each 
sender and (b) the highest seqnos
-     <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest 
seqno seen is 5, whereas the highest
-     seqno deliverable is 2). The minimum of all highest seqnos deliverable 
will be taken to send a stability
-     message, which results in garbage collection of messages lower than the 
ones in the stability vector. The
-     maximum of all seqnos will be taken to trigger possible retransmission of 
last missing seqno (see DESIGN
-     for details).
-     */
-    private void handleStableMessage(Address sender, Digest d) {
-        if(d == null || sender == null) {
-            if(log.isErrorEnabled()) log.error("digest or sender is null");
-            return;
-        }
-
-        if(!initialized) {
-            if(log.isTraceEnabled())
-                log.trace("STABLE message will not be handled as I'm not yet 
initialized");
-            return;
-        }
-
-        if(suspended) {
-            if(log.isTraceEnabled())
-                log.trace("STABLE message will not be handled as I'm 
suspended");
-            return;
-        }
-
-        if(log.isTraceEnabled())
-            log.trace(new StringBuffer("received stable msg from 
").append(sender).append(": ").append(d.printHighSeqnos()));
-        if(!heard_from.contains(sender)) {  // already received gossip from 
sender; discard it
-            if(log.isTraceEnabled()) log.trace("already received stable msg 
from " + sender);
-            return;
-        }
-
-        Digest copy;
-        synchronized(digest) {
-            boolean success=updateLocalDigest(d, sender);
-            if(!success) // we can only remove the sender from heard_from if 
*all* elements of my digest were updated
-                return;
-            copy=digest.copy();
-        }
-
-        boolean was_last=removeFromHeardFromList(sender);
-        if(was_last) {
-            sendStabilityMessage(copy);
-        }
-    }
-
-
-    /**
-     * Bcasts a STABLE message of the current digest to all members. Message 
contains highest seqnos of all members
-     * seen by this member. Highest seqnos are retrieved from the NAKACK layer 
below.
-     * @param Digest A <em>copy</em> of this.digest
-     */
-    private void sendStableMessage(Digest d) {
-        if(suspended) {
-            if(log.isTraceEnabled())
-                log.trace("will not send STABLE message as I'm suspended");
-            return;
-        }
-
-        if(d != null && d.size() > 0) {
-            if(log.isTraceEnabled())
-                log.trace("sending stable msg " + d.printHighSeqnos());
-            Message msg=new Message(); // mcast message
-            StableHeader hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
-            msg.putHeader(name, hdr);
-            num_gossips++;
-            passDown(new Event(Event.MSG, msg));
-        }
-    }
-
-
-
-    /**
-     Schedules a stability message to be mcast after a random number of 
milliseconds (range 1-5 secs).
-     The reason for waiting a random amount of time is that, in the worst 
case, all members receive a
-     STABLE_GOSSIP message from the last outstanding member at the same time 
and would therefore mcast the
-     STABILITY message at the same time too. To avoid this, each member waits 
random N msecs. If, before N
-     elapses, some other member sent the STABILITY message, we just cancel our 
own message. If, during
-     waiting for N msecs to send STABILITY message S1, another STABILITY 
message S2 is to be sent, we just
-     discard S2.
-     @param tmp A copy of te stability digest, so we don't need to copy it 
again
-     */
-    void sendStabilityMessage(Digest tmp) {
-        long delay;
-
-        // give other members a chance to mcast STABILITY message. if we 
receive STABILITY by the end of
-        // our random sleep, we will not send the STABILITY msg. this prevents 
that all mbrs mcast a
-        // STABILITY msg at the same time
-        delay=Util.random(stability_delay);
-        startStabilityTask(tmp, delay);
-    }
-
-
-    void handleStabilityMessage(Digest d, Address sender) {
-        if(d == null) {
-            if(log.isErrorEnabled()) log.error("stability digest is null");
-            return;
-        }
-
-        if(!initialized) {
-            if(log.isTraceEnabled())
-                log.trace("STABLE message will not be handled as I'm not yet 
initialized");
-            return;
-        }
-
-        if(suspended) {
-            if(log.isDebugEnabled()) {
-                log.debug("stability message will not be handled as I'm 
suspended");
-            }
-            return;
-        }
-
-        if(log.isTraceEnabled())
-            log.trace(new StringBuffer("received stability msg from 
").append(sender).append(": ").append(d.printHighSeqnos()));
-        stopStabilityTask();
-
-        // we won't handle the gossip d, if d's members don't match the 
membership in my own digest,
-        // this is part of the fix for the NAKACK problem (bugs #943480 and 
#938584)
-        if(!this.digest.sameSenders(d)) {
-            if(log.isDebugEnabled()) {
-                log.debug("received digest (digest=" + d + ") which does not 
match my own digest ("+
-                        this.digest + "): ignoring digest and re-initializing 
own digest");
-            }
-            return;
-        }
-
-        resetDigest(mbrs);
-
-        // pass STABLE event down the stack, so NAKACK can garbage collect old 
messages
-        passDown(new Event(Event.STABLE, d));
-    }
-
-
-
-    /* ------------------------------------End of Private Methods 
------------------------------------- */
-
-
-
-
-
-
-
-    public static class StableHeader extends Header implements Streamable {
-        public static final int STABLE_GOSSIP=1;
-        public static final int STABILITY=2;
-
-        int type=0;
-        // Digest digest=new Digest();  // used for both STABLE_GOSSIP and 
STABILITY message
-        Digest stableDigest=null; // changed by Bela April 4 2004
-
-        public StableHeader() {
-        } // used for externalizable
-
-
-        public StableHeader(int type, Digest digest) {
-            this.type=type;
-            this.stableDigest=digest;
-        }
-
-
-        static String type2String(int t) {
-            switch(t) {
-                case STABLE_GOSSIP:
-                    return "STABLE_GOSSIP";
-                case STABILITY:
-                    return "STABILITY";
-                default:
-                    return "<unknown>";
-            }
-        }
-
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append('[');
-            sb.append(type2String(type));
-            sb.append("]: digest is ");
-            sb.append(stableDigest);
-            return sb.toString();
-        }
-
-
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(type);
-            if(stableDigest == null) {
-                out.writeBoolean(false);
-                return;
-            }
-            out.writeBoolean(true);
-            stableDigest.writeExternal(out);
-        }
-
-
-        public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            type=in.readInt();
-            boolean digest_not_null=in.readBoolean();
-            if(digest_not_null) {
-                stableDigest=new Digest();
-                stableDigest.readExternal(in);
-            }
-        }
-
-        public long size() {
-            long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence 
for digest
-            if(stableDigest != null)
-                retval+=stableDigest.serializedSize();
-            return retval;
-        }
-
-        public void writeTo(DataOutputStream out) throws IOException {
-            out.writeInt(type);
-            Util.writeStreamable(stableDigest, out);
-        }
-
-        public void readFrom(DataInputStream in) throws IOException, 
IllegalAccessException, InstantiationException {
-            type=in.readInt();
-            stableDigest=(Digest)Util.readStreamable(Digest.class, in);
-        }
-
-
-    }
-
-
-
-
-    /**
-     Mcast periodic STABLE message. Interval between sends varies. Terminates 
after num_gossip_runs is 0.
-     However, UP or DOWN messages will reset num_gossip_runs to 
max_gossip_runs. This has the effect that the
-     stable_send task terminates only after a period of time within which no 
messages were either sent
-     or received
-     */
-    private class StableTask implements TimeScheduler.Task {
-        boolean stopped=false;
-
-        public void stop() {
-            stopped=true;
-        }
-
-        public boolean running() { // syntactic sugar
-            return !stopped;
-        }
-
-        public boolean cancelled() {
-            return stopped;
-        }
-
-        public long nextInterval() {
-            long interval=computeSleepTime();
-            if(interval <= 0)
-                return 10000;
-            else
-                return interval;
-        }
-
-
-        public void run() {
-            if(suspended) {
-                if(log.isTraceEnabled())
-                    log.trace("stable task will not run as suspended=" + 
suspended);
-                return;
-            }
-
-            // asks the NAKACK protocol for the current digest, reply event is 
GET_DIGEST_STABLE_OK (arg=digest)
-            passDown(new Event(Event.GET_DIGEST_STABLE));
-        }
-
-        long computeSleepTime() {
-            return getRandom((mbrs.size() * desired_avg_gossip * 2));
-        }
-
-        long getRandom(long range) {
-            return (long)((Math.random() * range) % range);
-        }
-    }
-
-
-
-
-
-    /**
-     * Multicasts a STABILITY message.
-     */
-    private class StabilitySendTask implements TimeScheduler.Task {
-        Digest   d=null;
-        boolean  stopped=false;
-        long     delay=2000;
-
-
-        public StabilitySendTask(Digest d, long delay) {
-            this.d=d;
-            this.delay=delay;
-        }
-
-        public boolean running() {
-            return !stopped;
-        }
-
-        public void stop() {
-            stopped=true;
-        }
-
-        public boolean cancelled() {
-            return stopped;
-        }
-
-
-        /** wait a random number of msecs (to give other a chance to send the 
STABILITY msg first) */
-        public long nextInterval() {
-            return delay;
-        }
-
-
-        public void run() {
-            Message msg;
-            StableHeader hdr;
-
-            if(suspended) {
-                if(log.isDebugEnabled()) {
-                    log.debug("STABILITY message will not be sent as 
suspended=" + suspended);
-                }
-                stopped=true;
-                return;
-            }
-
-            if(d != null && !stopped) {
-                msg=new Message();
-                hdr=new StableHeader(StableHeader.STABILITY, d);
-                msg.putHeader(STABLE.name, hdr);
-                if(log.isTraceEnabled()) log.trace("sending stability msg " + 
d.printHighSeqnos());
-                passDown(new Event(Event.MSG, msg));
-                d=null;
-            }
-            stopped=true; // run only once
-        }
-    }
-
-
-    private class ResumeTask implements TimeScheduler.Task {
-        boolean running=true;
-        long max_suspend_time=0;
-
-        ResumeTask(long max_suspend_time) {
-            this.max_suspend_time=max_suspend_time;
-        }
-
-        void stop() {
-            running=false;
-        }
-
-        public boolean running() {
-            return running;
-        }
-
-        public boolean cancelled() {
-            return running == false;
-        }
-
-        public long nextInterval() {
-            return max_suspend_time;
-        }
-
-        public void run() {
-            if(suspended)
-                log.warn("ResumeTask resumed message garbage collection - this 
should be done by a RESUME_STABLE event; " +
-                         "check why this event was not received (or increase 
max_suspend_time for large state transfers)");
-            resume();
-        }
-    }
-
-
-}

Reply via email to