Repository: cassandra Updated Branches: refs/heads/trunk 8554d6b35 -> 914c66685
Introduce RangesAtEndpoint.unwrap; simplify StreamSession.addTransferRanges Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/914c6668 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/914c6668 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/914c6668 Branch: refs/heads/trunk Commit: 914c66685c5bebe1624d827a9b4562b73a08c297 Parents: 8554d6b Author: Benedict Elliott Smith <bened...@apple.com> Authored: Tue Sep 18 13:17:15 2018 +0100 Committer: Benedict Elliott Smith <bened...@apple.com> Committed: Wed Sep 26 11:12:12 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/locator/RangesAtEndpoint.java | 31 +++++++++++++ .../cassandra/streaming/StreamSession.java | 11 +---- .../locator/ReplicaCollectionTest.java | 46 +++++++++++++++----- 4 files changed, 69 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9139822..e227c40 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770) * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735) * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780) * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763) http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java index f57c28e..8319d92 100644 --- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java +++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java @@ -165,6 +165,37 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint return Collections.unmodifiableMap(byRange); } + /** + * @return if there are no wrap around ranges contained in this RangesAtEndpoint, return self; + * otherwise, return a RangesAtEndpoint covering the same logical portions of the ring, but with those ranges unwrapped + */ + public RangesAtEndpoint unwrap() + { + int wrapAroundCount = 0; + for (Replica replica : this) + { + if (replica.range().isWrapAround()) + ++wrapAroundCount; + } + + assert wrapAroundCount <= 1; + if (wrapAroundCount == 0) + return snapshot(); + + RangesAtEndpoint.Builder builder = builder(endpoint, size() + wrapAroundCount); + for (Replica replica : this) + { + if (!replica.range().isWrapAround()) + { + builder.add(replica); + continue; + } + for (Range<Token> range : replica.range().unwrap()) + builder.add(replica.decorateSubrange(range)); + } + return builder.build(); + } + public static Collector<Replica, Builder, RangesAtEndpoint> collector(InetAddressAndPort endpoint) { return collector(ImmutableSet.of(), () -> new Builder(endpoint)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index d7d0836..80fcebb 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -335,15 +335,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber //Was it safe to remove this normalize, sorting seems not to matter, merging? Maybe we should have? //Do we need to unwrap here also or is that just making it worse? //Range and if it's transient - RangesAtEndpoint.Builder unwrappedRanges = RangesAtEndpoint.builder(replicas.endpoint(), replicas.size()); - for (Replica replica : replicas) - { - for (Range<Token> unwrapped : replica.range().unwrap()) - { - unwrappedRanges.add(new Replica(replica.endpoint(), unwrapped, replica.isFull())); - } - } - List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges.build(), stores, pendingRepair, previewKind); + RangesAtEndpoint unwrappedRanges = replicas.unwrap(); + List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges, stores, pendingRepair, previewKind); addTransferStreams(streams); Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace); if (toBeUpdated == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/914c6668/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java index f937f96..c289d50 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Test; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Comparator; import java.util.LinkedHashSet; import java.util.List; @@ -65,7 +66,7 @@ public class ReplicaCollectionTest R2 = range(1, 2); R3 = range(2, 3); R4 = range(3, 4); - R5 = range(4, 5); + R5 = range(4, 0); BROADCAST_RANGE = range(10, 11); NULL_RANGE = range(10000, 10001); } @@ -187,7 +188,7 @@ public class ReplicaCollectionTest // remove start // we recurse on the same subset in testSubList, so just corroborate we have the correct list here { - Predicate<Replica> removeFirst = r -> r != canonicalList.get(0); + Predicate<Replica> removeFirst = r -> !r.equals(canonicalList.get(0)); assertSubList(test.filter(removeFirst), 1, canonicalList.size()); assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2)); } @@ -199,14 +200,14 @@ public class ReplicaCollectionTest // we recurse on the same subset in testSubList, so just corroborate we have the correct list here { int last = canonicalList.size() - 1; - Predicate<Replica> removeLast = r -> r != canonicalList.get(last); + Predicate<Replica> removeLast = r -> !r.equals(canonicalList.get(last)); assertSubList(test.filter(removeLast), 0, last); } if (test.size() <= 2) return; - Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2); + Predicate<Replica> removeMiddle = r -> !r.equals(canonicalList.get(canonicalList.size() / 2)); TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test))); filtered.testAll(subListDepth, filterDepth - 1, sortDepth); } @@ -224,7 +225,7 @@ public class ReplicaCollectionTest for (int i = 0 ; i < canonicalList.size() ; ++i) { Replica discount = canonicalList.get(i); - Assert.assertEquals(canonicalList.size() - 1, test.count(r -> r != discount)); + Assert.assertEquals(canonicalList.size() - 1, test.count(r -> !r.equals(discount))); } } @@ -245,15 +246,15 @@ public class ReplicaCollectionTest { final Comparator<Replica> comparator = (o1, o2) -> { - boolean f1 = o1 == canonicalList.get(0); - boolean f2 = o2 == canonicalList.get(0); + boolean f1 = o1.equals(canonicalList.get(0)); + boolean f2 = o2.equals(canonicalList.get(0)); return f1 == f2 ? 0 : f1 ? 1 : -1; }; TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList)); sorted.testAll(subListDepth, filterDepth, sortDepth - 1); } - private void testAll(int subListDepth, int filterDepth, int sortDepth) + void testAll(int subListDepth, int filterDepth, int sortDepth) { testEndpoints(); testOrderOfIteration(); @@ -312,12 +313,35 @@ public class ReplicaCollectionTest Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges()); } - @Override - public void testAll() + public void testUnwrap(int subListDepth, int filterDepth, int sortDepth) + { + List<Replica> canonUnwrap = new ArrayList<>(); + for (Replica replica : canonicalList) + for (Range<Token> range : replica.range().unwrap()) + canonUnwrap.add(replica.decorateSubrange(range)); + RangesAtEndpoint testUnwrap = test.unwrap(); + if (testUnwrap == test) + { + Assert.assertEquals(canonicalList, canonUnwrap); + } + else + { + new RangesAtEndpointTestCase(testUnwrap, canonUnwrap) + .testAllExceptUnwrap(subListDepth, filterDepth, sortDepth); + } + } + + void testAllExceptUnwrap(int subListDepth, int filterDepth, int sortDepth) { - super.testAll(); + super.testAll(subListDepth, filterDepth, sortDepth); testRanges(); } + + void testAll(int subListDepth, int filterDepth, int sortDepth) + { + testAllExceptUnwrap(subListDepth, filterDepth, sortDepth); + testUnwrap(subListDepth, filterDepth, sortDepth); + } } private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = ImmutableList.of( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org