Broadcast shutdown message when cleanly stopping gossip.
Patch by brandonwilliams, reviewed by vijay for CASSANDRA-3936


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/190e27bb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/190e27bb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/190e27bb

Branch: refs/heads/trunk
Commit: 190e27bb5cc910d020b3828c1fbe83f009534b89
Parents: 27c999a
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Thu Mar 29 13:03:41 2012 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Thu Mar 29 13:03:41 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/FailureDetector.java  |    9 ++
 .../cassandra/gms/GossipShutdownMessage.java       |   63 +++++++++++++++
 .../cassandra/gms/GossipShutdownVerbHandler.java   |   42 ++++++++++
 src/java/org/apache/cassandra/gms/Gossiper.java    |   50 +++++++++++-
 .../org/apache/cassandra/gms/IFailureDetector.java |    5 +
 .../apache/cassandra/service/StorageService.java   |    3 +
 .../org/apache/cassandra/dht/BootStrapperTest.java |    1 +
 7 files changed, 171 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index c8ba43a..0a66cb6 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -195,6 +195,15 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
         }
     }
 
+    public void forceConviction(InetAddress ep)
+    {
+        logger.debug("Forcing conviction of {}", ep);
+        for ( IFailureDetectionEventListener listener : fdEvntListeners )
+        {
+            listener.convict(ep, phiConvictThreshold);
+        }
+    }
+
     public void remove(InetAddress ep)
     {
         arrivalSamples_.remove(ep);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java
new file mode 100644
index 0000000..3122986
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This message indicates the gossiper is shutting down
+ */
+
+class GossipShutdownMessage
+{
+    private static final IVersionedSerializer<GossipShutdownMessage> 
serializer;
+    static
+    {
+        serializer = new GossipShutdownMessageSerializer();
+    }
+
+    static IVersionedSerializer<GossipShutdownMessage> serializer()
+    {
+        return serializer;
+    }
+
+    GossipShutdownMessage()
+    {
+    }
+}
+
+class GossipShutdownMessageSerializer implements 
IVersionedSerializer<GossipShutdownMessage>
+{
+    public void serialize(GossipShutdownMessage gShutdownMessage, DataOutput 
dos, int version) throws IOException
+    {
+    }
+
+    public GossipShutdownMessage deserialize(DataInput dis, int version) 
throws IOException
+    {
+        return new GossipShutdownMessage();
+    }
+
+    public long serializedSize(GossipShutdownMessage gossipShutdownMessage, 
int version)
+    {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
new file mode 100644
index 0000000..352e229
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+
+public class GossipShutdownVerbHandler implements IVerbHandler
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(GossipShutdownVerbHandler.class);
+
+    public void doVerb(Message message, String id)
+    {
+        InetAddress from = message.getFrom();
+        if (!Gossiper.instance.isEnabled())
+        {
+            logger.debug("Ignoring shutdown message from {} because gossip is 
disabled", from);
+            return;
+        }
+        FailureDetector.instance.forceConviction(from);
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index d6237a2..e8cf5a8 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -54,6 +54,8 @@ import org.apache.cassandra.service.StorageService;
  * sends node A a GossipDigestAckMessage. On receipt of this message node A 
sends node B a
  * GossipDigestAck2Message which completes a round of Gossip. This module as 
and when it hears one
  * of the three above mentioned messages updates the Failure Detector with the 
liveness information.
+ * Upon hearing a GossipShutdownMessage, this module will instantly mark the 
remote node as down in
+ * the Failure Detector.
  */
 
 public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
@@ -294,7 +296,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         unreachableEndpoints.remove(endpoint);
         endpointStateMap.remove(endpoint);
         expireTimeEndpointMap.remove(endpoint);
-        justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        quarantineEndpoint(endpoint);
         if (logger.isDebugEnabled())
             logger.debug("evicting " + endpoint + " from gossip");
     }
@@ -313,12 +315,21 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         // do not remove endpointState until the quarantine expires
         FailureDetector.instance.remove(endpoint);
         versions.remove(endpoint);
-        justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        quarantineEndpoint(endpoint);
         if (logger.isDebugEnabled())
             logger.debug("removing endpoint " + endpoint);
     }
 
     /**
+     * Quarantines the endpoint for QUARANTINE_DELAY
+     * @param endpoint
+     */
+    private void quarantineEndpoint(InetAddress endpoint)
+    {
+        justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+    }
+
+    /**
      * Remove the Endpoint and evict immediately, to avoid gossiping about 
this node.
      * This should only be called when a token is taken over by a new IP 
address.
      * @param endpoint The endpoint that has been replaced
@@ -522,6 +533,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, 
dos, version);
         return new Message(FBUtilities.getBroadcastAddress(), 
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
     }
+    
+    Message makeGossipShutdownMessage(int version) throws IOException
+    {
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        GossipShutdownMessage.serializer().serialize(new 
GossipShutdownMessage(), dos, version);
+        return new Message(FBUtilities.getBroadcastAddress(), 
StorageService.Verb.GOSSIP_SHUTDOWN, bos.toByteArray(), version);
+    }
 
     /**
      * Returns true if the chosen target was also a seed. False otherwise
@@ -1111,6 +1130,33 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     public void stop()
     {
         scheduledGossipTask.cancel(false);
+        logger.info("Announcing shutdown");
+        try
+        {
+            Thread.sleep(intervalInMillis);
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        MessageProducer prod = new MessageProducer()
+        {
+            public Message getMessage(Integer version) throws IOException
+            {
+                return makeGossipShutdownMessage(version);
+            }
+        };
+        for (InetAddress ep : liveEndpoints)
+        {
+            try
+            {
+                
MessagingService.instance().sendOneWay(prod.getMessage(getVersion(ep)), ep);
+            }
+            catch (IOException ex)
+            {
+                // keep going
+            }
+        }
     }
 
     public boolean isEnabled()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/IFailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/IFailureDetector.java b/src/java/org/apache/cassandra/gms/IFailureDetector.java
index d5bf9ec..734047b 100644
--- a/src/java/org/apache/cassandra/gms/IFailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/IFailureDetector.java
@@ -68,6 +68,11 @@ public interface IFailureDetector
     public void remove(InetAddress ep);
 
     /**
+     * force conviction of endpoint in the failure detector
+     */
+    public void forceConviction(InetAddress ep);
+
+    /**
      * Register interest for Failure Detector events.
      * @param listener implementation of an application provided 
IFailureDetectionEventListener
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d3105a9..3e1034a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -114,6 +114,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         STREAMING_REPAIR_RESPONSE,
         SNAPSHOT, // Similar to nt snapshot
         MIGRATION_REQUEST,
+        GOSSIP_SHUTDOWN,
         // use as padding for backwards compatability where a previous version 
needs to validate a verb from the future.
         UNUSED_1,
         UNUSED_2,
@@ -142,6 +143,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
+        put(Verb.GOSSIP_SHUTDOWN, Stage.GOSSIP);
         put(Verb.DEFINITIONS_UPDATE, Stage.MIGRATION);
         put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
         put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
@@ -286,6 +288,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new 
GossipDigestSynVerbHandler());
         
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new 
GossipDigestAckVerbHandler());
         
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new 
GossipDigestAck2VerbHandler());
+        MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_SHUTDOWN, 
new GossipShutdownVerbHandler());
 
         
MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE, new 
DefinitionsUpdateVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK, 
new SchemaCheckVerbHandler());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 4b19505..30c0254 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -178,6 +178,7 @@ public class BootStrapperTest extends SchemaLoader
             public void 
registerFailureDetectionEventListener(IFailureDetectionEventListener listener) 
{ throw new UnsupportedOperationException(); }
             public void 
unregisterFailureDetectionEventListener(IFailureDetectionEventListener 
listener) { throw new UnsupportedOperationException(); }
             public void remove(InetAddress ep) { throw new 
UnsupportedOperationException(); }
+            public void forceConviction(InetAddress ep) { throw new 
UnsupportedOperationException(); }
             public void clear(InetAddress ep) { throw new 
UnsupportedOperationException(); }
         };
         s.addSourceFilter(new 
RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));

Reply via email to