This is an automated email from the ASF dual-hosted git repository.

paulo pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 8bb9c72f58 Add safeguard so cleanup fails when node has pending ranges
8bb9c72f58 is described below

commit 8bb9c72f582de6bcc39522ba9ade91fd5bc22f67
Author: lzurovchak1 <lzurovch...@bloomberg.net>
AuthorDate: Wed Dec 21 13:34:18 2022 -0500

    Add safeguard so cleanup fails when node has pending ranges
    
    Patch by Lindsey Zurovchak; Reviewed by Paulo Motta, Stefan Miklosovic for CASSANDRA-16418
    
    Closes #2061
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionManager.java |   6 +-
 .../apache/cassandra/service/StorageService.java   |   4 +
 .../cassandra/distributed/action/GossipHelper.java |  12 +++
 .../distributed/test/ring/CleanupFailureTest.java  | 111 +++++++++++++++++++++
 test/unit/org/apache/cassandra/db/CleanupTest.java |   6 +-
 6 files changed, 133 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a75ee17128..789d8f7fa8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.8
+ * Add safeguard so cleanup fails when node has pending ranges (CASSANDRA-16418)
  * Fix legacy clustering serialization for paging with compact storage (CASSANDRA-17507)
  * Add support for python 3.11 (CASSANDRA-18088)
  * Fix formatting of duration in cqlsh (CASSANDRA-18141)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 799eed3d0d..a5277356de 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -515,11 +515,7 @@ public class CompactionManager implements 
CompactionManagerMBean
     {
         assert !cfStore.isIndex();
         Keyspace keyspace = cfStore.keyspace;
-        if (!StorageService.instance.isJoined())
-        {
-            logger.info("Cleanup cannot run before a node has joined the 
ring");
-            return AllSSTableOpStatus.ABORTED;
-        }
+
         // if local ranges is empty, it means no data should remain
         final RangesAtEndpoint replicas = 
StorageService.instance.getLocalReplicas(keyspace.getName());
         final Set<Range<Token>> allRanges = replicas.ranges();
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index b70347a301..343f435f11 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -132,6 +132,7 @@ import static 
org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFami
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
 import static 
org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace;
+import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
 /**
  * This abstraction contains the token/identifier of this node
@@ -3634,6 +3635,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
             throw new RuntimeException("Cleanup of the system keyspace is 
neither necessary nor wise");
 
+        if (tokenMetadata.getPendingRanges(keyspaceName, 
getBroadcastAddressAndPort()).size() > 0)
+            throw new RuntimeException("Node is involved in cluster membership 
changes. Not safe to run cleanup.");
+
         CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
keyspaceName, tables))
         {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java 
b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index f4b5838a86..4f0d343f7b 100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -66,6 +66,18 @@ public class GossipHelper
         };
     }
 
+    public static InstanceAction statusToDecommission(IInvokableInstance 
newNode)
+    {
+        return (instance) ->
+        {
+            changeGossipState(instance,
+                              newNode,
+                              Arrays.asList(tokens(newNode),
+                                            statusLeaving(newNode),
+                                            statusWithPortLeaving(newNode)));
+        };
+    }
+
     public static InstanceAction statusToNormal(IInvokableInstance peer)
     {
         return (target) ->
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java
new file mode 100644
index 0000000000..9c8f71fde9
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.junit.Assert.assertEquals;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.statusToDecommission;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static 
org.apache.cassandra.distributed.test.ring.BootstrapTest.populate;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static 
org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
+
+public class CleanupFailureTest extends TestBaseImpl
+{
+    @Test
+    public void cleanupDuringDecommissionTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(2)
+                                        
.withTokenSupplier(evenlyDistributedTokens(2))
+                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
+                                        .start())
+        {
+            IInvokableInstance nodeToDecommission = cluster.get(1);
+            IInvokableInstance nodeToRemainInCluster = cluster.get(2);
+
+            // Start decommission on nodeToDecommission
+            cluster.forEach(statusToDecommission(nodeToDecommission));
+
+            // Add data to cluster while node is decommissioning
+            int NUM_ROWS = 100;
+            populate(cluster, 0, NUM_ROWS, 1, 1, ConsistencyLevel.ONE);
+            cluster.forEach(c -> c.flush(KEYSPACE));
+
+            // Check data before cleanup on nodeToRemainInCluster
+            assertEquals(100, nodeToRemainInCluster.executeInternal("SELECT * 
FROM " + KEYSPACE + ".tbl").length);
+
+            // Run cleanup on nodeToRemainInCluster
+            NodeToolResult result = 
nodeToRemainInCluster.nodetoolResult("cleanup");
+            result.asserts().failure();
+            result.asserts().stderrContains("Node is involved in cluster 
membership changes. Not safe to run cleanup.");
+
+            // Check data after cleanup on nodeToRemainInCluster
+            assertEquals(100, nodeToRemainInCluster.executeInternal("SELECT * 
FROM " + KEYSPACE + ".tbl").length);
+        }
+    }
+
+    @Test
+    public void cleanupDuringBootstrapTest() throws Throwable
+    {
+        int originalNodeCount = 1;
+        int expandedNodeCount = originalNodeCount + 1;
+
+        try (Cluster cluster = builder().withNodes(originalNodeCount)
+                                        
.withTokenSupplier(evenlyDistributedTokens(expandedNodeCount))
+                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
+                                        .start())
+        {
+            IInstanceConfig config = cluster.newInstanceConfig();
+            IInvokableInstance bootstrappingNode = cluster.bootstrap(config);
+            withProperty("cassandra.join_ring", false,
+                         () -> bootstrappingNode.startup(cluster));
+
+            // Start bootstrap on bootstrappingNode
+            cluster.forEach(statusToBootstrap(bootstrappingNode));
+
+            // Add data to cluster while node is bootstrapping
+            int NUM_ROWS = 100;
+            populate(cluster, 0, NUM_ROWS, 1, 2, ConsistencyLevel.ONE);
+            cluster.forEach(c -> c.flush(KEYSPACE));
+
+            // Check data before cleanup on bootstrappingNode
+            assertEquals(NUM_ROWS, bootstrappingNode.executeInternal("SELECT * 
FROM " + KEYSPACE + ".tbl").length);
+
+            // Run cleanup on bootstrappingNode
+            NodeToolResult result = 
bootstrappingNode.nodetoolResult("cleanup");
+            result.asserts().stderrContains("Node is involved in cluster 
membership changes. Not safe to run cleanup.");
+
+            // Check data after cleanup on bootstrappingNode
+            assertEquals(NUM_ROWS, bootstrappingNode.executeInternal("SELECT * 
FROM " + KEYSPACE + ".tbl").length);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java 
b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 6bb6433db1..f59bdd6e7f 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -116,9 +116,11 @@ public class CleanupTest
     }
 
     @Test
-    public void testCleanup() throws ExecutionException, InterruptedException
+    public void testCleanup() throws ExecutionException, InterruptedException, 
UnknownHostException
     {
-        StorageService.instance.getTokenMetadata().clearUnsafe();
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        tmd.updateNormalToken(token(new byte[]{ 50 }), 
InetAddressAndPort.getByName("127.0.0.1"));
 
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to