This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f53bc8  Bootstrap replace produce correct pending range calculations
4f53bc8 is described below

commit 4f53bc87261b470adf292fdf37ed4e81bb6f8704
Author: Sam Tunnicliffe <s...@beobal.com>
AuthorDate: Wed Feb 27 12:33:32 2019 +0000

    Bootstrap replace produce correct pending range calculations
    
    Patch by Sam Tunnicliffe; reviewed by Benedict Elliott Smith for 
CASSANDRA-14802
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/locator/TokenMetadata.java    |  16 +-
 .../cassandra/locator/PendingRangesTest.java       | 260 +++++++++++++++++++++
 3 files changed, 271 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7e1b5eb..2254452 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802)
  * Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033)
  * Add Zstd compressor (CASSANDRA-14482)
  * Fix IR prepare anti-compaction race (CASSANDRA-15027)
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index cb189c8..c16538b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -906,19 +906,23 @@ public class TokenMetadata
         // At this stage newPendingRanges has been updated according to leave 
operations. We can
         // now continue the calculation by checking bootstrapping nodes.
 
-        // For each of the bootstrapping nodes, simply add and remove them one 
by one to
-        // allLeftMetadata and check in between what their ranges would be.
+        // For each of the bootstrapping nodes, simply add to the 
allLeftMetadata and check what their
+        // ranges would be. We actually need to clone allLeftMetadata each 
time as resetting its state
+        // after getting the new pending ranges is not as simple as just 
removing the bootstrapping
+        // endpoint. If the bootstrapping endpoint constitutes a replacement, 
removing it after checking
+        // the newly pending ranges means there are now fewer endpoints than 
there were originally and
+        // causes its next neighbour to take over its primary range which 
affects the next RF endpoints
+        // in the ring.
         Multimap<InetAddressAndPort, Token> bootstrapAddresses = 
bootstrapTokens.inverse();
         for (InetAddressAndPort endpoint : bootstrapAddresses.keySet())
         {
             Collection<Token> tokens = bootstrapAddresses.get(endpoint);
-
-            allLeftMetadata.updateNormalTokens(tokens, endpoint);
-            for (Replica replica : 
strategy.getAddressReplicas(allLeftMetadata, endpoint))
+            TokenMetadata cloned = allLeftMetadata.cloneOnlyTokenMap();
+            cloned.updateNormalTokens(tokens, endpoint);
+            for (Replica replica : strategy.getAddressReplicas(cloned, 
endpoint))
             {
                 newPendingRanges.addPendingRange(replica.range(), replica);
             }
-            allLeftMetadata.removeEndpoint(endpoint);
         }
 
         // At this stage newPendingRanges has been updated according to 
leaving and bootstrapping nodes.
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangesTest.java 
b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
new file mode 100644
index 0000000..48bf546
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import com.google.common.collect.*;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PendingRangesTest
+{
+    private static final String RACK1 = "RACK1";
+    private static final String DC1 = "DC1";
+    private static final String KEYSPACE = "ks";
+    private static final InetAddressAndPort PEER1 = peer(1);
+    private static final InetAddressAndPort PEER2 = peer(2);
+    private static final InetAddressAndPort PEER3 = peer(3);
+    private static final InetAddressAndPort PEER4 = peer(4);
+    private static final InetAddressAndPort PEER5 = peer(5);
+    private static final InetAddressAndPort PEER6 = peer(6);
+
+    private static final InetAddressAndPort PEER1A = peer(11);
+    private static final InetAddressAndPort PEER4A = peer(14);
+
+    private static final Token TOKEN1 = token(0);
+    private static final Token TOKEN2 = token(10);
+    private static final Token TOKEN3 = token(20);
+    private static final Token TOKEN4 = token(30);
+    private static final Token TOKEN5 = token(40);
+    private static final Token TOKEN6 = token(50);
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable
+    {
+        DatabaseDescriptor.daemonInitialization();
+        IEndpointSnitch snitch = snitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+    }
+
+    @Test
+    public void calculatePendingRangesForConcurrentReplacements()
+    {
+        /*
+         * As described in CASSANDRA-14802, concurrent range movements can 
generate pending ranges
+         * which are far larger than strictly required, which in turn can 
impact availability.
+         *
+         * In the narrow case of straight replacement, the pending ranges 
should mirror the owned ranges
+         * of the nodes being replaced.
+         *
+         * Note: the following example is purely illustrative as the iteration 
order for processing
+         * bootstrapping endpoints is not guaranteed. Because of this, 
precisely which endpoints' pending
+         * ranges are correct/incorrect depends on the specifics of the ring. 
Concretely, the bootstrap tokens
+         * are ultimately backed by a HashMap, so iteration of bootstrapping 
nodes is based on the hashcodes
+         * of the endpoints.
+         *
+         * E.g. a 6 node cluster with tokens:
+         *
+         * nodeA : 0
+         * nodeB : 10
+         * nodeC : 20
+         * nodeD : 30
+         * nodeE : 40
+         * nodeF : 50
+         *
+         * with an RF of 3, this gives an initial ring of :
+         *
+         * nodeA : (50, 0], (40, 50], (30, 40]
+         * nodeB : (0, 10], (50, 0], (40, 50]
+         * nodeC : (10, 20], (0, 10], (50, 0]
+         * nodeD : (20, 30], (10, 20], (0, 10]
+         * nodeE : (30, 40], (20, 30], (10, 20]
+         * nodeF : (40, 50], (30, 40], (20, 30]
+         *
+         * If nodeA is replaced by node1A, then the pending ranges map should 
be:
+         * {
+         *   (50, 0]  : [node1A],
+         *   (40, 50] : [node1A],
+         *   (30, 40] : [node1A]
+         * }
+         *
+         * Starting a second concurrent replacement of a node with 
non-overlapping ranges
+         * (i.e. node4A for node4) should result in a pending range map of:
+         * {
+         *   (50, 0]  : [node1A],
+         *   (40, 50] : [node1A],
+         *   (30, 40] : [node1A],
+         *   (20, 30] : [node4A],
+         *   (10, 20] : [node4A],
+         *   (0, 10]  : [node4A]
+         * }
+         *
+         * But, the bug in CASSANDRA-14802 causes it to be:
+         * {
+         *   (50, 0]  : [node1A],
+         *   (40, 50] : [node1A],
+         *   (30, 40] : [node1A],
+         *   (20, 30] : [node4A],
+         *   (10, 20] : [node4A],
+         *   (50, 10] : [node4A]
+         * }
+         *
+         * so node4A incorrectly becomes a pending endpoint for an additional 
sub-range: (50, 0].
+         *
+         */
+        TokenMetadata tm = new TokenMetadata();
+        AbstractReplicationStrategy replicationStrategy = simpleStrategy(tm, 
3);
+
+        // setup initial ring
+        addNode(tm, PEER1, TOKEN1);
+        addNode(tm, PEER2, TOKEN2);
+        addNode(tm, PEER3, TOKEN3);
+        addNode(tm, PEER4, TOKEN4);
+        addNode(tm, PEER5, TOKEN5);
+        addNode(tm, PEER6, TOKEN6);
+
+        // no pending ranges before any replacements
+        tm.calculatePendingRanges(replicationStrategy, KEYSPACE);
+        assertEquals(0, 
Iterators.size(tm.getPendingRanges(KEYSPACE).iterator()));
+
+        // Ranges initially owned by PEER1 and PEER4
+        RangesAtEndpoint peer1Ranges = 
replicationStrategy.getAddressReplicas(tm).get(PEER1);
+        RangesAtEndpoint peer4Ranges = 
replicationStrategy.getAddressReplicas(tm).get(PEER4);
+        // Replace PEER1 with PEER1A
+        replace(PEER1, PEER1A, TOKEN1, tm, replicationStrategy);
+        // The only pending ranges should be the ones previously belonging to 
PEER1
+        // and these should have a single pending endpoint, PEER1A
+        RangesByEndpoint.Builder b1 = new RangesByEndpoint.Builder();
+        peer1Ranges.iterator().forEachRemaining(replica -> b1.put(PEER1A, new 
Replica(PEER1A, replica.range(), replica.isFull())));
+        RangesByEndpoint expected = b1.build();
+        assertPendingRanges(tm.getPendingRanges(KEYSPACE), expected);
+        // Also verify the Multimap variant of getPendingRanges
+        assertPendingRanges(tm.getPendingRangesMM(KEYSPACE), expected);
+
+        // Replace PEER4 with PEER4A
+        replace(PEER4, PEER4A, TOKEN4, tm, replicationStrategy);
+        // Pending ranges should now include the ranges originally belonging
+        // to PEER1 (now pending for PEER1A) and the ranges originally 
belonging to PEER4
+        // (now pending for PEER4A).
+        RangesByEndpoint.Builder b2 = new RangesByEndpoint.Builder();
+        peer1Ranges.iterator().forEachRemaining(replica -> b2.put(PEER1A, new 
Replica(PEER1A, replica.range(), replica.isFull())));
+        peer4Ranges.iterator().forEachRemaining(replica -> b2.put(PEER4A, new 
Replica(PEER4A, replica.range(), replica.isFull())));
+        expected = b2.build();
+        assertPendingRanges(tm.getPendingRanges(KEYSPACE), expected);
+        assertPendingRanges(tm.getPendingRangesMM(KEYSPACE), expected);
+    }
+
+
+    private void assertPendingRanges(PendingRangeMaps pending, 
RangesByEndpoint expected)
+    {
+        RangesByEndpoint.Builder actual = new RangesByEndpoint.Builder();
+        pending.iterator().forEachRemaining(pendingRange -> {
+            Replica replica = 
Iterators.getOnlyElement(pendingRange.getValue().iterator());
+            actual.put(replica.endpoint(), replica);
+        });
+        assertRangesByEndpoint(expected, actual.build());
+    }
+
+    private void assertPendingRanges(EndpointsByRange pending, 
RangesByEndpoint expected)
+    {
+        RangesByEndpoint.Builder actual = new RangesByEndpoint.Builder();
+        pending.flattenEntries().forEach(entry -> 
actual.put(entry.getValue().endpoint(), entry.getValue()));
+        assertRangesByEndpoint(expected, actual.build());
+    }
+
+
+    private void assertRangesByEndpoint(RangesByEndpoint expected, 
RangesByEndpoint actual)
+    {
+        assertEquals(expected.keySet(), actual.keySet());
+        for (InetAddressAndPort endpoint : expected.keySet())
+        {
+            RangesAtEndpoint expectedReplicas = expected.get(endpoint);
+            RangesAtEndpoint actualReplicas = actual.get(endpoint);
+            assertEquals(expectedReplicas.size(), actualReplicas.size());
+            assertTrue(Iterables.all(expectedReplicas, 
actualReplicas::contains));
+        }
+    }
+
+    private void addNode(TokenMetadata tm, InetAddressAndPort replica, Token 
token)
+    {
+        tm.updateNormalTokens(Collections.singleton(token), replica);
+    }
+
+    private void replace(InetAddressAndPort toReplace,
+                         InetAddressAndPort replacement,
+                         Token token,
+                         TokenMetadata tm,
+                         AbstractReplicationStrategy replicationStrategy)
+    {
+        assertEquals(toReplace, tm.getEndpoint(token));
+        tm.addReplaceTokens(Collections.singleton(token), replacement, 
toReplace);
+        tm.calculatePendingRanges(replicationStrategy, KEYSPACE);
+    }
+
+    private static Token token(long token)
+    {
+        return 
Murmur3Partitioner.instance.getTokenFactory().fromString(Long.toString(token));
+    }
+
+    private static InetAddressAndPort peer(int addressSuffix)
+    {
+        try
+        {
+            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, 
(byte) addressSuffix});
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static IEndpointSnitch snitch()
+    {
+        return new AbstractNetworkTopologySnitch()
+        {
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return RACK1;
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                return DC1;
+            }
+        };
+    }
+
+    private static AbstractReplicationStrategy simpleStrategy(TokenMetadata 
tokenMetadata, int replicationFactor)
+    {
+        return new SimpleStrategy(KEYSPACE,
+                                  tokenMetadata,
+                                  DatabaseDescriptor.getEndpointSnitch(),
+                                  
Collections.singletonMap("replication_factor", 
Integer.toString(replicationFactor)));
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to