This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 0974a3656d Optionally avoid hint transfer during decommission
0974a3656d is described below

commit 0974a3656dd4fd98b527264a763b50980f49be24
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Fri Apr 5 15:26:39 2024 -0500

    Optionally avoid hint transfer during decommission
    
    patch by Paul Chandler; reviewed by Caleb Rackliffe and Brandon Williams 
for CASSANDRA-19525
---
 CHANGES.txt                                        |  1 +
 conf/cassandra.yaml                                |  5 +++
 src/java/org/apache/cassandra/config/Config.java   |  1 +
 .../cassandra/config/DatabaseDescriptor.java       | 10 +++++
 .../cassandra/hints/HintsDispatchExecutor.java     | 20 ++++++----
 .../apache/cassandra/service/StorageService.java   | 29 +++++++++++++--
 .../cassandra/service/StorageServiceMBean.java     |  3 ++
 .../test/HintedHandoffAddRemoveNodesTest.java      | 43 ++++++++++++++++++++++
 8 files changed, 102 insertions(+), 10 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 20f4fd47ea..b71ca9254f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.13
+ * Optionally avoid hint transfer during decommission (CASSANDRA-19525)
  * Change logging to TRACE when failing to get peer certificate 
(CASSANDRA-19508)
  * Push LocalSessions info logs to debug (CASSANDRA-18335)
  * Filter remote DC replicas out when constructing the initial replica plan 
for the local read repair (CASSANDRA-19120)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7f162749d2..b5e6af8767 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -86,6 +86,11 @@ hints_flush_period_in_ms: 10000
 # Maximum size for a single hints file, in megabytes.
 max_hints_file_size_in_mb: 128
 
+# Enable/disable transferring hints to a peer during decommission. Even when enabled, this does not guarantee
+# consistency for logged batches, and it may delay decommission when coupled with a strict hinted_handoff_throttle.
+# Default: true
+#transfer_hints_on_decommission: true
+
 # Compression to apply to the hint files. If omitted, hints files
 # will be written uncompressed. LZ4, Snappy, and Deflate compressors
 # are supported.
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index d7517124df..dc17639f98 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -296,6 +296,7 @@ public class Config
     public int hints_flush_period_in_ms = 10000;
     public int max_hints_file_size_in_mb = 128;
     public ParameterizedClass hints_compression;
+    public volatile boolean transfer_hints_on_decommission = true;
 
     public volatile boolean incremental_backups = false;
     public boolean trickle_fsync = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 377f67117d..561fc24116 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2596,6 +2596,16 @@ public class DatabaseDescriptor
         conf.hints_compression = parameterizedClass;
     }
 
+    public static boolean getTransferHintsOnDecommission()
+    {
+        return conf.transfer_hints_on_decommission;
+    }
+
+    public static void setTransferHintsOnDecommission(boolean enabled)
+    {
+        conf.transfer_hints_on_decommission = enabled;
+    }
+
     public static boolean isIncrementalBackupsEnabled()
     {
         return conf.incremental_backups;
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java 
b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index b5eb0b1fac..54e13f428b 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -182,7 +182,7 @@ final class HintsDispatchExecutor
         private boolean transfer(UUID hostId)
         {
             catalog.stores()
-                   .map(store -> new DispatchHintsTask(store, hostId))
+                   .map(store -> new DispatchHintsTask(store, hostId, true))
                    .forEach(Runnable::run);
 
             return !catalog.hasFiles();
@@ -195,21 +195,27 @@ final class HintsDispatchExecutor
         private final UUID hostId;
         private final RateLimiter rateLimiter;
 
-        DispatchHintsTask(HintsStore store, UUID hostId)
+        DispatchHintsTask(HintsStore store, UUID hostId, boolean isTransfer)
         {
             this.store = store;
             this.hostId = hostId;
 
-            // rate limit is in bytes per second. Uses Double.MAX_VALUE if 
disabled (set to 0 in cassandra.yaml).
-            // max rate is scaled by the number of nodes in the cluster 
(CASSANDRA-5272).
-            // the goal is to bound maximum hints traffic going towards a 
particular node from the rest of the cluster,
-            // not total outgoing hints traffic from this node - this is why 
the rate limiter is not shared between
+            // Rate limit is in bytes per second. Uses Double.MAX_VALUE if 
disabled (set to 0 in cassandra.yaml).
+            // Max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272), unless we are transferring
+            // hints during decommission rather than dispatching them to their final destination.
+            // The goal is to bound maximum hints traffic going towards a 
particular node from the rest of the cluster,
+            // not total outgoing hints traffic from this node. This is why 
the rate limiter is not shared between
             // all the dispatch tasks (as there will be at most one dispatch 
task for a particular host id at a time).
-            int nodesCount = Math.max(1, 
StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
+            int nodesCount = isTransfer ? 1 : Math.max(1, 
StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
             double throttleInBytes = 
DatabaseDescriptor.getHintedHandoffThrottleInKB() * 1024.0 / nodesCount;
             this.rateLimiter = RateLimiter.create(throttleInBytes == 0 ? 
Double.MAX_VALUE : throttleInBytes);
         }
 
+        DispatchHintsTask(HintsStore store, UUID hostId)
+        {
+            this(store, hostId, false);
+        }
+
         public void run()
         {
             try
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 18bd6c17fd..081e133db8 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -351,7 +351,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     /* the probability for tracing any particular request, 0 disables tracing 
and 1 enables for all */
     private double traceProbability = 0.0;
 
-    private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, 
DECOMMISSIONED, MOVING, DRAINING, DRAINED }
+    public static enum Mode { STARTING, NORMAL, JOINING, LEAVING, 
DECOMMISSIONED, MOVING, DRAINING, DRAINED }
     private volatile Mode operationMode = Mode.STARTING;
 
     /* Used for tracking drain progress */
@@ -4720,9 +4720,21 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         logger.debug("waiting for batch log processing.");
         batchlogReplay.get();
 
-        setMode(Mode.LEAVING, "streaming hints to other nodes", true);
+        Future<?> hintsSuccess = Futures.immediateFuture(null);
+
+        if (DatabaseDescriptor.getTransferHintsOnDecommission())
+        {
+            setMode(Mode.LEAVING, "streaming hints to other nodes", true);
+            hintsSuccess = streamHints();
+        }
+        else
+        {
+            setMode(Mode.LEAVING, "pausing dispatch and deleting hints", true);
+            DatabaseDescriptor.setHintedHandoffEnabled(false);
+            HintsService.instance.pauseDispatch();
+            HintsService.instance.deleteAllHints();
+        }
 
-        Future hintsSuccess = streamHints();
 
         // wait for the transfer runnables to signal the latch.
         logger.debug("waiting for stream acks.");
@@ -5962,6 +5974,17 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
     }
 
+    public boolean getTransferHintsOnDecommission()
+    {
+        return DatabaseDescriptor.getTransferHintsOnDecommission();
+    }
+
+    public void setTransferHintsOnDecommission(boolean enabled)
+    {
+        DatabaseDescriptor.setTransferHintsOnDecommission(enabled);
+        logger.info("updated transfer_hints_on_decommission to {}", enabled);
+    }
+
     public void setHintedHandoffThrottleInKB(int throttleInKB)
     {
         DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c61e45e7a4..31dc6243a4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -767,6 +767,9 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     /** Sets the hinted handoff throttle in kb per second, per delivery 
thread. */
     public void setHintedHandoffThrottleInKB(int throttleInKB);
 
+    public boolean getTransferHintsOnDecommission();
+    public void setTransferHintsOnDecommission(boolean enabled);
+
     /**
      * Resume bootstrap streaming when there is failed data streaming.
      *
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
index 5cf1ab66df..a846a2eebb 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
@@ -27,14 +27,18 @@ import 
org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.metrics.HintsServiceMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.StorageService;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 
 import static 
org.apache.cassandra.distributed.action.GossipHelper.decommission;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.TWO;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
@@ -46,6 +50,39 @@ import static 
org.apache.cassandra.distributed.shared.AssertUtils.row;
  */
 public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
 {
+    @SuppressWarnings("Convert2MethodRef")
+    @Test
+    public void shouldAvoidHintTransferOnDecommission() throws Exception
+    {
+        try (Cluster cluster = init(builder().withNodes(3)
+                                             .withConfig(config -> 
config.set("transfer_hints_on_decommission", false).with(GOSSIP))
+                                             .withoutVNodes()
+                                             .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE 
%s.decom_no_hints_test (key int PRIMARY KEY, value int)"));
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO 
%s.decom_no_hints_test (key, value) VALUES (?, ?)"), ALL, 0, 0);
+            long hintsBeforeShutdown = countTotalHints(cluster.get(1));
+            assertThat(hintsBeforeShutdown).isEqualTo(0);
+            long hintsDelivered = countHintsDelivered(cluster.get(1));
+            assertThat(hintsDelivered).isEqualTo(0);
+
+            // Shutdown node 3 so hints can be written against it.
+            cluster.get(3).shutdown().get();
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO 
%s.decom_no_hints_test (key, value) VALUES (?, ?)"), TWO, 0, 0);
+            Awaitility.await().until(() -> countTotalHints(cluster.get(1)) > 
0);
+            long hintsAfterShutdown = countTotalHints(cluster.get(1));
+            assertThat(hintsAfterShutdown).isEqualTo(1);
+
+            cluster.get(1).nodetoolResult("decommission", 
"--force").asserts().success();
+            long hintsDeliveredByDecom = countHintsDelivered(cluster.get(1));
+            String mode = cluster.get(1).callOnInstance(() -> 
StorageService.instance.getOperationMode());
+            assertEquals(StorageService.Mode.DECOMMISSIONED.toString(), mode);
+            assertThat(hintsDeliveredByDecom).isEqualTo(0);
+        }
+    }
+
     /**
      * Replaces Python dtest {@code 
hintedhandoff_test.py:TestHintedHandoff.test_hintedhandoff_decom()}.
      */
@@ -130,6 +167,12 @@ public class HintedHandoffAddRemoveNodesTest extends 
TestBaseImpl
         return instance.callOnInstance(() -> 
StorageMetrics.totalHints.getCount());
     }
 
+    @SuppressWarnings("Convert2MethodRef")
+    private long countHintsDelivered(IInvokableInstance instance)
+    {
+        return instance.callOnInstance(() -> 
HintsServiceMetrics.hintsSucceeded.getCount());
+    }
+
     @SuppressWarnings("SameParameterValue")
     private void populate(Cluster cluster, String table, int coordinator, int 
start, int count, ConsistencyLevel cl)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to