This is an automated email from the ASF dual-hosted git repository. paulo pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new 8bb9c72f58 Add safeguard so cleanup fails when node has pending ranges 8bb9c72f58 is described below commit 8bb9c72f582de6bcc39522ba9ade91fd5bc22f67 Author: lzurovchak1 <lzurovch...@bloomberg.net> AuthorDate: Wed Dec 21 13:34:18 2022 -0500 Add safeguard so cleanup fails when node has pending ranges Patch by Lindsey Zurovchak; Reviewed by Paulo Motta, Stefan Miklosovic for CASSANDRA-16418 Closes #2061 --- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionManager.java | 6 +- .../apache/cassandra/service/StorageService.java | 4 + .../cassandra/distributed/action/GossipHelper.java | 12 +++ .../distributed/test/ring/CleanupFailureTest.java | 111 +++++++++++++++++++++ test/unit/org/apache/cassandra/db/CleanupTest.java | 6 +- 6 files changed, 133 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index a75ee17128..789d8f7fa8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.8 + * Add safeguard so cleanup fails when node has pending ranges (CASSANDRA-16418) * Fix legacy clustering serialization for paging with compact storage (CASSANDRA-17507) * Add support for python 3.11 (CASSANDRA-18088) * Fix formatting of duration in cqlsh (CASSANDRA-18141) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 799eed3d0d..a5277356de 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -515,11 +515,7 @@ public class CompactionManager implements CompactionManagerMBean { assert !cfStore.isIndex(); Keyspace keyspace = cfStore.keyspace; - if (!StorageService.instance.isJoined()) - { - logger.info("Cleanup cannot run before a node has joined the ring"); - return AllSSTableOpStatus.ABORTED; - } + // if local ranges is empty, it means no data should remain final 
RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName()); final Set<Range<Token>> allRanges = replicas.ranges(); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index b70347a301..343f435f11 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -132,6 +132,7 @@ import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFami import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; import static org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace; +import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; /** * This abstraction contains the token/identifier of this node @@ -3634,6 +3635,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (SchemaConstants.isLocalSystemKeyspace(keyspaceName)) throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise"); + if (tokenMetadata.getPendingRanges(keyspaceName, getBroadcastAddressAndPort()).size() > 0) + throw new RuntimeException("Node is involved in cluster membership changes. 
Not safe to run cleanup."); + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, tables)) { diff --git a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java index f4b5838a86..4f0d343f7b 100644 --- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java @@ -66,6 +66,18 @@ public class GossipHelper }; } + public static InstanceAction statusToDecommission(IInvokableInstance newNode) + { + return (instance) -> + { + changeGossipState(instance, + newNode, + Arrays.asList(tokens(newNode), + statusLeaving(newNode), + statusWithPortLeaving(newNode))); + }; + } + public static InstanceAction statusToNormal(IInvokableInstance peer) { return (target) -> diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java new file mode 100644 index 0000000000..9c8f71fde9 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.ring; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +import static org.junit.Assert.assertEquals; +import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap; +import static org.apache.cassandra.distributed.action.GossipHelper.statusToDecommission; +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; +import static org.apache.cassandra.distributed.test.ring.BootstrapTest.populate; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens; + +public class CleanupFailureTest extends TestBaseImpl +{ + @Test + public void cleanupDuringDecommissionTest() throws Throwable + { + try (Cluster cluster = builder().withNodes(2) + .withTokenSupplier(evenlyDistributedTokens(2)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + IInvokableInstance nodeToDecommission = cluster.get(1); + 
IInvokableInstance nodeToRemainInCluster = cluster.get(2); + + // Start decommission on nodeToDecommission + cluster.forEach(statusToDecommission(nodeToDecommission)); + + // Add data to cluster while node is decommissioning + int NUM_ROWS = 100; + populate(cluster, 0, NUM_ROWS, 1, 1, ConsistencyLevel.ONE); + cluster.forEach(c -> c.flush(KEYSPACE)); + + // Check data before cleanup on nodeToRemainInCluster + assertEquals(100, nodeToRemainInCluster.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length); + + // Run cleanup on nodeToRemainInCluster + NodeToolResult result = nodeToRemainInCluster.nodetoolResult("cleanup"); + result.asserts().failure(); + result.asserts().stderrContains("Node is involved in cluster membership changes. Not safe to run cleanup."); + + // Check data after cleanup on nodeToRemainInCluster + assertEquals(100, nodeToRemainInCluster.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length); + } + } + + @Test + public void cleanupDuringBootstrapTest() throws Throwable + { + int originalNodeCount = 1; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + IInstanceConfig config = cluster.newInstanceConfig(); + IInvokableInstance bootstrappingNode = cluster.bootstrap(config); + withProperty("cassandra.join_ring", false, + () -> bootstrappingNode.startup(cluster)); + + // Start bootstrap on bootstrappingNode + cluster.forEach(statusToBootstrap(bootstrappingNode)); + + // Add data to cluster while node is bootstrapping + int NUM_ROWS = 100; + populate(cluster, 0, NUM_ROWS, 1, 2, ConsistencyLevel.ONE); + cluster.forEach(c -> c.flush(KEYSPACE)); + + // Check data before cleanup on bootstrappingNode + assertEquals(NUM_ROWS, 
bootstrappingNode.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length); + + // Run cleanup on bootstrappingNode + NodeToolResult result = bootstrappingNode.nodetoolResult("cleanup"); + result.asserts().stderrContains("Node is involved in cluster membership changes. Not safe to run cleanup."); + + // Check data after cleanup on bootstrappingNode + assertEquals(NUM_ROWS, bootstrappingNode.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length); + } + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java index 6bb6433db1..f59bdd6e7f 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTest.java @@ -116,9 +116,11 @@ public class CleanupTest } @Test - public void testCleanup() throws ExecutionException, InterruptedException + public void testCleanup() throws ExecutionException, InterruptedException, UnknownHostException { - StorageService.instance.getTokenMetadata().clearUnsafe(); + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.clearUnsafe(); + tmd.updateNormalToken(token(new byte[]{ 50 }), InetAddressAndPort.getByName("127.0.0.1")); Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org