This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 4f53bc8 Bootstrap replace produce correct pending range calculations 4f53bc8 is described below commit 4f53bc87261b470adf292fdf37ed4e81bb6f8704 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Wed Feb 27 12:33:32 2019 +0000 Bootstrap replace produce correct pending range calculations Patch by Sam Tunnicliffe; reviewed by Benedict Elliott Smith for CASSANDRA-14802 --- CHANGES.txt | 1 + .../apache/cassandra/locator/TokenMetadata.java | 16 +- .../cassandra/locator/PendingRangesTest.java | 260 +++++++++++++++++++++ 3 files changed, 271 insertions(+), 6 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7e1b5eb..2254452 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802) * Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033) * Add Zstd compressor (CASSANDRA-14482) * Fix IR prepare anti-compaction race (CASSANDRA-15027) diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index cb189c8..c16538b 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -906,19 +906,23 @@ public class TokenMetadata // At this stage newPendingRanges has been updated according to leave operations. We can // now continue the calculation by checking bootstrapping nodes. - // For each of the bootstrapping nodes, simply add and remove them one by one to - // allLeftMetadata and check in between what their ranges would be. + // For each of the bootstrapping nodes, simply add to the allLeftMetadata and check what their + // ranges would be. We actually need to clone allLeftMetadata each time as resetting its state + // after getting the new pending ranges is not as simple as just removing the bootstrapping + // endpoint. 
If the bootstrapping endpoint constitutes a replacement, removing it after checking + the newly pending ranges means there are now fewer endpoints than there were originally and + causes its next neighbour to take over its primary range which affects the next RF endpoints + in the ring. Multimap<InetAddressAndPort, Token> bootstrapAddresses = bootstrapTokens.inverse(); for (InetAddressAndPort endpoint : bootstrapAddresses.keySet()) { Collection<Token> tokens = bootstrapAddresses.get(endpoint); - - allLeftMetadata.updateNormalTokens(tokens, endpoint); - for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint)) + TokenMetadata cloned = allLeftMetadata.cloneOnlyTokenMap(); + cloned.updateNormalTokens(tokens, endpoint); + for (Replica replica : strategy.getAddressReplicas(cloned, endpoint)) { newPendingRanges.addPendingRange(replica.range(), replica); } - allLeftMetadata.removeEndpoint(endpoint); } // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. diff --git a/test/unit/org/apache/cassandra/locator/PendingRangesTest.java b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java new file mode 100644 index 0000000..48bf546 --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import java.net.UnknownHostException; +import java.util.Collections; + +import com.google.common.collect.*; +import org.junit.BeforeClass; +import org.junit.Test; + + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PendingRangesTest +{ + private static final String RACK1 = "RACK1"; + private static final String DC1 = "DC1"; + private static final String KEYSPACE = "ks"; + private static final InetAddressAndPort PEER1 = peer(1); + private static final InetAddressAndPort PEER2 = peer(2); + private static final InetAddressAndPort PEER3 = peer(3); + private static final InetAddressAndPort PEER4 = peer(4); + private static final InetAddressAndPort PEER5 = peer(5); + private static final InetAddressAndPort PEER6 = peer(6); + + private static final InetAddressAndPort PEER1A = peer(11); + private static final InetAddressAndPort PEER4A = peer(14); + + private static final Token TOKEN1 = token(0); + private static final Token TOKEN2 = token(10); + private static final Token TOKEN3 = token(20); + private static final Token TOKEN4 = token(30); + private static final Token TOKEN5 = token(40); + private static final Token TOKEN6 = token(50); + + @BeforeClass + public static void beforeClass() throws Throwable + { + DatabaseDescriptor.daemonInitialization(); + IEndpointSnitch snitch = snitch(); + 
DatabaseDescriptor.setEndpointSnitch(snitch); + } + + @Test + public void calculatePendingRangesForConcurrentReplacements() + { + /* + * As described in CASSANDRA-14802, concurrent range movements can generate pending ranges + * which are far larger than strictly required, which in turn can impact availability. + * + * In the narrow case of straight replacement, the pending ranges should mirror the owned ranges + * of the nodes being replaced. + * + * Note: the following example is purely illustrative as the iteration order for processing + * bootstrapping endpoints is not guaranteed. Because of this, precisely which endpoints' pending + * ranges are correct/incorrect depends on the specifics of the ring. Concretely, the bootstrap tokens + * are ultimately backed by a HashMap, so iteration of bootstrapping nodes is based on the hashcodes + * of the endpoints. + * + * E.g. a 6 node cluster with tokens: + * + * nodeA : 0 + * nodeB : 10 + * nodeC : 20 + * nodeD : 30 + * nodeE : 40 + * nodeF : 50 + * + * with an RF of 3, this gives an initial ring of : + * + * nodeA : (50, 0], (40, 50], (30, 40] + * nodeB : (0, 10], (50, 0], (40, 50] + * nodeC : (10, 20], (0, 10], (50, 0] + * nodeD : (20, 30], (10, 20], (0, 10] + * nodeE : (30, 40], (20, 30], (10, 20] + * nodeF : (40, 50], (30, 40], (20, 30] + * + * If nodeA is replaced by node1A, then the pending ranges map should be: + * { + * (50, 0] : [node1A], + * (40, 50] : [node1A], + * (30, 40] : [node1A] + * } + * + * Starting a second concurrent replacement of a node with non-overlapping ranges + * (i.e. 
node4 for node4A) should result in a pending range map of: + * { + * (50, 0] : [node1A], + * (40, 50] : [node1A], + * (30, 40] : [node1A], + * (20, 30] : [node4A], + * (10, 20] : [node4A], + * (0, 10] : [node4A] + * } + * + * But, the bug in CASSANDRA-14802 causes it to be: + * { + * (50, 0] : [node1A], + * (40, 50] : [node1A], + * (30, 40] : [node1A], + * (20, 30] : [node4A], + * (10, 20] : [node4A], + * (50, 10] : [node4A] + * } + * + * so node4A incorrectly becomes a pending endpoint for an additional sub-range: (50, 0). + * + */ + TokenMetadata tm = new TokenMetadata(); + AbstractReplicationStrategy replicationStrategy = simpleStrategy(tm, 3); + + // setup initial ring + addNode(tm, PEER1, TOKEN1); + addNode(tm, PEER2, TOKEN2); + addNode(tm, PEER3, TOKEN3); + addNode(tm, PEER4, TOKEN4); + addNode(tm, PEER5, TOKEN5); + addNode(tm, PEER6, TOKEN6); + + // no pending ranges before any replacements + tm.calculatePendingRanges(replicationStrategy, KEYSPACE); + assertEquals(0, Iterators.size(tm.getPendingRanges(KEYSPACE).iterator())); + + // Ranges initially owned by PEER1 and PEER4 + RangesAtEndpoint peer1Ranges = replicationStrategy.getAddressReplicas(tm).get(PEER1); + RangesAtEndpoint peer4Ranges = replicationStrategy.getAddressReplicas(tm).get(PEER4); + // Replace PEER1 with PEER1A + replace(PEER1, PEER1A, TOKEN1, tm, replicationStrategy); + // The only pending ranges should be the ones previously belonging to PEER1 + // and these should have a single pending endpoint, PEER1A + RangesByEndpoint.Builder b1 = new RangesByEndpoint.Builder(); + peer1Ranges.iterator().forEachRemaining(replica -> b1.put(PEER1A, new Replica(PEER1A, replica.range(), replica.isFull()))); + RangesByEndpoint expected = b1.build(); + assertPendingRanges(tm.getPendingRanges(KEYSPACE), expected); + // Also verify the Multimap variant of getPendingRanges + assertPendingRanges(tm.getPendingRangesMM(KEYSPACE), expected); + + // Replace PEER4 with PEER4A + replace(PEER4, PEER4A, TOKEN4, tm, 
replicationStrategy); + // Pending ranges should now include the ranges originally belonging + // to PEER1 (now pending for PEER1A) and the ranges originally belonging to PEER4 + // (now pending for PEER4A). + RangesByEndpoint.Builder b2 = new RangesByEndpoint.Builder(); + peer1Ranges.iterator().forEachRemaining(replica -> b2.put(PEER1A, new Replica(PEER1A, replica.range(), replica.isFull()))); + peer4Ranges.iterator().forEachRemaining(replica -> b2.put(PEER4A, new Replica(PEER4A, replica.range(), replica.isFull()))); + expected = b2.build(); + assertPendingRanges(tm.getPendingRanges(KEYSPACE), expected); + assertPendingRanges(tm.getPendingRangesMM(KEYSPACE), expected); + } + + + private void assertPendingRanges(PendingRangeMaps pending, RangesByEndpoint expected) + { + RangesByEndpoint.Builder actual = new RangesByEndpoint.Builder(); + pending.iterator().forEachRemaining(pendingRange -> { + Replica replica = Iterators.getOnlyElement(pendingRange.getValue().iterator()); + actual.put(replica.endpoint(), replica); + }); + assertRangesByEndpoint(expected, actual.build()); + } + + private void assertPendingRanges(EndpointsByRange pending, RangesByEndpoint expected) + { + RangesByEndpoint.Builder actual = new RangesByEndpoint.Builder(); + pending.flattenEntries().forEach(entry -> actual.put(entry.getValue().endpoint(), entry.getValue())); + assertRangesByEndpoint(expected, actual.build()); + } + + + private void assertRangesByEndpoint(RangesByEndpoint expected, RangesByEndpoint actual) + { + assertEquals(expected.keySet(), actual.keySet()); + for (InetAddressAndPort endpoint : expected.keySet()) + { + RangesAtEndpoint expectedReplicas = expected.get(endpoint); + RangesAtEndpoint actualReplicas = actual.get(endpoint); + assertEquals(expectedReplicas.size(), actualReplicas.size()); + assertTrue(Iterables.all(expectedReplicas, actualReplicas::contains)); + } + } + + private void addNode(TokenMetadata tm, InetAddressAndPort replica, Token token) + { + 
tm.updateNormalTokens(Collections.singleton(token), replica); + } + + private void replace(InetAddressAndPort toReplace, + InetAddressAndPort replacement, + Token token, + TokenMetadata tm, + AbstractReplicationStrategy replicationStrategy) + { + assertEquals(toReplace, tm.getEndpoint(token)); + tm.addReplaceTokens(Collections.singleton(token), replacement, toReplace); + tm.calculatePendingRanges(replicationStrategy, KEYSPACE); + } + + private static Token token(long token) + { + return Murmur3Partitioner.instance.getTokenFactory().fromString(Long.toString(token)); + } + + private static InetAddressAndPort peer(int addressSuffix) + { + try + { + return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix}); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + private static IEndpointSnitch snitch() + { + return new AbstractNetworkTopologySnitch() + { + public String getRack(InetAddressAndPort endpoint) + { + return RACK1; + } + + public String getDatacenter(InetAddressAndPort endpoint) + { + return DC1; + } + }; + } + + private static AbstractReplicationStrategy simpleStrategy(TokenMetadata tokenMetadata, int replicationFactor) + { + return new SimpleStrategy(KEYSPACE, + tokenMetadata, + DatabaseDescriptor.getEndpointSnitch(), + Collections.singletonMap("replication_factor", Integer.toString(replicationFactor))); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org