This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new 0974a3656d Optionally avoid hint transfer during decommission 0974a3656d is described below commit 0974a3656dd4fd98b527264a763b50980f49be24 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Fri Apr 5 15:26:39 2024 -0500 Optionally avoid hint transfer during decommission patch by Paul Chandler; reviewed by Caleb Rackliffe and Brandon Williams for CASSANDRA-19525 --- CHANGES.txt | 1 + conf/cassandra.yaml | 5 +++ src/java/org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 10 +++++ .../cassandra/hints/HintsDispatchExecutor.java | 20 ++++++---- .../apache/cassandra/service/StorageService.java | 29 +++++++++++++-- .../cassandra/service/StorageServiceMBean.java | 3 ++ .../test/HintedHandoffAddRemoveNodesTest.java | 43 ++++++++++++++++++++++ 8 files changed, 102 insertions(+), 10 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 20f4fd47ea..b71ca9254f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.13 + * Optionally avoid hint transfer during decommission (CASSANDRA-19525) * Change logging to TRACE when failing to get peer certificate (CASSANDRA-19508) * Push LocalSessions info logs to debug (CASSANDRA-18335) * Filter remote DC replicas out when constructing the initial replica plan for the local read repair (CASSANDRA-19120) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 7f162749d2..b5e6af8767 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -86,6 +86,11 @@ hints_flush_period_in_ms: 10000 # Maximum size for a single hints file, in megabytes. max_hints_file_size_in_mb: 128 +# Enable/disable transferring hints to a peer during decommission. Even when enabled, this does not guarantee +# consistency for logged batches, and it may delay decommission when coupled with a strict hinted_handoff_throttle_in_kb. 
+# Default: true +#transfer_hints_on_decommission: true + # Compression to apply to the hint files. If omitted, hints files # will be written uncompressed. LZ4, Snappy, and Deflate compressors # are supported. diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index d7517124df..dc17639f98 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -296,6 +296,7 @@ public class Config public int hints_flush_period_in_ms = 10000; public int max_hints_file_size_in_mb = 128; public ParameterizedClass hints_compression; + public volatile boolean transfer_hints_on_decommission = true; public volatile boolean incremental_backups = false; public boolean trickle_fsync = false; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 377f67117d..561fc24116 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2596,6 +2596,16 @@ public class DatabaseDescriptor conf.hints_compression = parameterizedClass; } + public static boolean getTransferHintsOnDecommission() + { + return conf.transfer_hints_on_decommission; + } + + public static void setTransferHintsOnDecommission(boolean enabled) + { + conf.transfer_hints_on_decommission = enabled; + } + public static boolean isIncrementalBackupsEnabled() { return conf.incremental_backups; diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index b5eb0b1fac..54e13f428b 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@ -182,7 +182,7 @@ final class HintsDispatchExecutor private boolean transfer(UUID hostId) { catalog.stores() - .map(store -> new 
DispatchHintsTask(store, hostId)) + .map(store -> new DispatchHintsTask(store, hostId, true)) .forEach(Runnable::run); return !catalog.hasFiles(); @@ -195,21 +195,27 @@ final class HintsDispatchExecutor private final UUID hostId; private final RateLimiter rateLimiter; - DispatchHintsTask(HintsStore store, UUID hostId) + DispatchHintsTask(HintsStore store, UUID hostId, boolean isTransfer) { this.store = store; this.hostId = hostId; - // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). - // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272). - // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster, - // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between + // Rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // Max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272), unless we are transferring + // hints during decommission rather than dispatching them to their final destination. + // The goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster, + // not total outgoing hints traffic from this node. This is why the rate limiter is not shared between // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time). - int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1); + int nodesCount = isTransfer ? 1 : Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1); double throttleInBytes = DatabaseDescriptor.getHintedHandoffThrottleInKB() * 1024.0 / nodesCount; this.rateLimiter = RateLimiter.create(throttleInBytes == 0 ? 
Double.MAX_VALUE : throttleInBytes); } + DispatchHintsTask(HintsStore store, UUID hostId) + { + this(store, hostId, false); + } + public void run() { try diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 18bd6c17fd..081e133db8 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -351,7 +351,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */ private double traceProbability = 0.0; - private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED } + public static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED } private volatile Mode operationMode = Mode.STARTING; /* Used for tracking drain progress */ @@ -4720,9 +4720,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.debug("waiting for batch log processing."); batchlogReplay.get(); - setMode(Mode.LEAVING, "streaming hints to other nodes", true); + Future<?> hintsSuccess = Futures.immediateFuture(null); + + if (DatabaseDescriptor.getTransferHintsOnDecommission()) + { + setMode(Mode.LEAVING, "streaming hints to other nodes", true); + hintsSuccess = streamHints(); + } + else + { + setMode(Mode.LEAVING, "pausing dispatch and deleting hints", true); + DatabaseDescriptor.setHintedHandoffEnabled(false); + HintsService.instance.pauseDispatch(); + HintsService.instance.deleteAllHints(); + } - Future hintsSuccess = streamHints(); // wait for the transfer runnables to signal the latch. 
logger.debug("waiting for stream acks."); @@ -5962,6 +5974,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } + public boolean getTransferHintsOnDecommission() + { + return DatabaseDescriptor.getTransferHintsOnDecommission(); + } + + public void setTransferHintsOnDecommission(boolean enabled) + { + DatabaseDescriptor.setTransferHintsOnDecommission(enabled); + logger.info("updated transfer_hints_on_decommission to {}", enabled); + } + public void setHintedHandoffThrottleInKB(int throttleInKB) { DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index c61e45e7a4..31dc6243a4 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -767,6 +767,9 @@ public interface StorageServiceMBean extends NotificationEmitter /** Sets the hinted handoff throttle in kb per second, per delivery thread. */ public void setHintedHandoffThrottleInKB(int throttleInKB); + public boolean getTransferHintsOnDecommission(); + public void setTransferHintsOnDecommission(boolean enabled); + /** * Resume bootstrap streaming when there is failed data streaming. 
* diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java index 5cf1ab66df..a846a2eebb 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java @@ -27,14 +27,18 @@ import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.metrics.HintsServiceMetrics; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.StorageService; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.apache.cassandra.distributed.action.GossipHelper.decommission; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.TWO; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; @@ -46,6 +50,39 @@ import static org.apache.cassandra.distributed.shared.AssertUtils.row; */ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl { + @SuppressWarnings("Convert2MethodRef") + @Test + public void shouldAvoidHintTransferOnDecommission() throws Exception + { + try (Cluster cluster = init(builder().withNodes(3) + .withConfig(config -> config.set("transfer_hints_on_decommission", false).with(GOSSIP)) + .withoutVNodes() + .start())) + { + 
cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_no_hints_test (key int PRIMARY KEY, value int)")); + + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), ALL, 0, 0); + long hintsBeforeShutdown = countTotalHints(cluster.get(1)); + assertThat(hintsBeforeShutdown).isEqualTo(0); + long hintsDelivered = countHintsDelivered(cluster.get(1)); + assertThat(hintsDelivered).isEqualTo(0); + + // Shutdown node 3 so hints can be written against it. + cluster.get(3).shutdown().get(); + + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), TWO, 0, 0); + Awaitility.await().until(() -> countTotalHints(cluster.get(1)) > 0); + long hintsAfterShutdown = countTotalHints(cluster.get(1)); + assertThat(hintsAfterShutdown).isEqualTo(1); + + cluster.get(1).nodetoolResult("decommission", "--force").asserts().success(); + long hintsDeliveredByDecom = countHintsDelivered(cluster.get(1)); + String mode = cluster.get(1).callOnInstance(() -> StorageService.instance.getOperationMode()); + assertEquals(StorageService.Mode.DECOMMISSIONED.toString(), mode); + assertThat(hintsDeliveredByDecom).isEqualTo(0); + } + } + /** * Replaces Python dtest {@code hintedhandoff_test.py:TestHintedHandoff.test_hintedhandoff_decom()}. 
*/ @@ -130,6 +167,12 @@ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl return instance.callOnInstance(() -> StorageMetrics.totalHints.getCount()); } + @SuppressWarnings("Convert2MethodRef") + private long countHintsDelivered(IInvokableInstance instance) + { + return instance.callOnInstance(() -> HintsServiceMetrics.hintsSucceeded.getCount()); + } + @SuppressWarnings("SameParameterValue") private void populate(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org