http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java index 9c90d57..4afeb5a 100644 --- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java @@ -20,7 +20,6 @@ package org.apache.cassandra.locator; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -39,9 +38,11 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Pair; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class OldNetworkTopologyStrategyTest { + private List<Token> keyTokens; private TokenMetadata tmd; private Map<String, ArrayList<InetAddressAndPort>> expectedResults; @@ -53,7 +54,7 @@ public class OldNetworkTopologyStrategyTest } @Before - public void init() + public void init() throws Exception { keyTokens = new ArrayList<Token>(); tmd = new TokenMetadata(); @@ -160,11 +161,11 @@ public class OldNetworkTopologyStrategyTest { for (Token keyToken : keyTokens) { - List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyToken); - for (int j = 0; j < endpoints.size(); j++) + int j = 0; + for (InetAddressAndPort endpoint : strategy.getNaturalReplicasForToken(keyToken).endpoints()) { ArrayList<InetAddressAndPort> hostsExpected = expectedResults.get(keyToken.toString()); - assertEquals(endpoints.get(j), hostsExpected.get(j)); + assertEquals(endpoint, hostsExpected.get(j++)); } } } @@ -188,7 +189,6 @@ public class OldNetworkTopologyStrategyTest assertEquals(ranges.left.iterator().next().left, tokensAfterMove[movingNodeIdx]); assertEquals(ranges.left.iterator().next().right, tokens[movingNodeIdx]); assertEquals("No data should be fetched", ranges.right.size(), 0); - } @Test @@ -205,7 +205,6 @@ public class OldNetworkTopologyStrategyTest assertEquals("No data should be streamed", ranges.left.size(), 0); assertEquals(ranges.right.iterator().next().left, tokens[movingNodeIdx]); assertEquals(ranges.right.iterator().next().right, tokensAfterMove[movingNodeIdx]); - } @SuppressWarnings("unchecked") @@ -366,16 +365,21 @@ public class OldNetworkTopologyStrategyTest TokenMetadata tokenMetadataAfterMove = initTokenMetadata(tokensAfterMove); AbstractReplicationStrategy strategy = new OldNetworkTopologyStrategy("Keyspace1", tokenMetadataCurrent, endpointSnitch, optsWithRF(2)); - Collection<Range<Token>> currentRanges = strategy.getAddressRanges().get(movingNode); - Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode); - - Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = StorageService.instance.calculateStreamAndFetchRanges(currentRanges, updatedRanges); + RangesAtEndpoint currentRanges = strategy.getAddressReplicas().get(movingNode); + RangesAtEndpoint updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode); - return ranges; + return asRanges(StorageService.calculateStreamAndFetchRanges(currentRanges, updatedRanges)); } private static Map<String, String> optsWithRF(int rf) { return Collections.singletonMap("replication_factor", Integer.toString(rf)); } + + public static Pair<Set<Range<Token>>, Set<Range<Token>>> asRanges(Pair<RangesAtEndpoint, RangesAtEndpoint> replicas) + { + Set<Range<Token>> leftRanges = replicas.left.ranges(); + Set<Range<Token>> rightRanges = replicas.right.ranges(); + return Pair.create(leftRanges, rightRanges); + } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java index 56fd181..8e0bc00 100644 --- a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java +++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java @@ -26,7 +26,6 @@ import org.apache.cassandra.dht.Token; import org.junit.Test; import java.net.UnknownHostException; -import java.util.Collection; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -38,17 +37,29 @@ public class PendingRangeMapsTest { return new Range<Token>(new BigIntegerToken(left), new BigIntegerToken(right)); } + private static void addPendingRange(PendingRangeMaps pendingRangeMaps, Range<Token> range, String endpoint) + { + try + { + pendingRangeMaps.addPendingRange(range, Replica.fullReplica(InetAddressAndPort.getByName(endpoint), range)); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + } + @Test public void testPendingEndpoints() throws UnknownHostException { PendingRangeMaps pendingRangeMaps = new PendingRangeMaps(); - pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddressAndPort.getByName("127.0.0.1")); - pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddressAndPort.getByName("127.0.0.2")); - pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddressAndPort.getByName("127.0.0.3")); - pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddressAndPort.getByName("127.0.0.4")); - pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddressAndPort.getByName("127.0.0.5")); - pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddressAndPort.getByName("127.0.0.6")); + addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1"); + addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2"); + addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3"); + addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4"); + addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5"); + addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6"); assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size()); assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size()); @@ -61,8 +72,8 @@ public class PendingRangeMapsTest { assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size()); assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size()); - Collection<InetAddressAndPort> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")); - assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1"))); + EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")); + assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1"))); } @Test @@ -70,13 +81,13 @@ public class PendingRangeMapsTest { { PendingRangeMaps pendingRangeMaps = new PendingRangeMaps(); - pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddressAndPort.getByName("127.0.0.1")); - pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddressAndPort.getByName("127.0.0.2")); - pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddressAndPort.getByName("127.0.0.3")); - pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddressAndPort.getByName("127.0.0.4")); - pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddressAndPort.getByName("127.0.0.5")); - pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddressAndPort.getByName("127.0.0.6")); - pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddressAndPort.getByName("127.0.0.7")); + addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1"); + addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2"); + addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3"); + addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4"); + addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5"); + addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6"); + addPendingRange(pendingRangeMaps, genRange("65", "7"), "127.0.0.7"); assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size()); assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size()); @@ -90,8 +101,8 @@ public class PendingRangeMapsTest { assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size()); assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size()); - Collection<InetAddressAndPort> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6")); - assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1"))); - assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.7"))); + EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6")); + assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1"))); + assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.7"))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java new file mode 100644 index 0000000..66eff23 --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Predicates; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; +import org.apache.cassandra.utils.FBUtilities; +import org.junit.Assert; +import org.junit.Test; + +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; + +public class ReplicaCollectionTest +{ + + static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP; + static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE; + + static + { + try + { + EP1 = InetAddressAndPort.getByName("127.0.0.1"); + EP2 = InetAddressAndPort.getByName("127.0.0.2"); + EP3 = InetAddressAndPort.getByName("127.0.0.3"); + EP4 = InetAddressAndPort.getByName("127.0.0.4"); + EP5 = InetAddressAndPort.getByName("127.0.0.5"); + BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort(); + NULL_EP = InetAddressAndPort.getByName("127.255.255.255"); + R1 = range(0, 1); + R2 = range(1, 2); + R3 = range(2, 3); + R4 = range(3, 4); + R5 = range(4, 5); + BROADCAST_RANGE = range(10, 11); + NULL_RANGE = range(10000, 10001); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + static Token tk(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + static Range<Token> range(long left, long right) + { + return new Range<>(tk(left), tk(right)); + } + + static class TestCase<C extends AbstractReplicaCollection<C>> + { + final C test; + final List<Replica> canonicalList; + final Multimap<InetAddressAndPort, Replica> canonicalByEndpoint; + final Multimap<Range<Token>, Replica> canonicalByRange; + + TestCase(C test, List<Replica> canonicalList) + { + this.test = test; + this.canonicalList = canonicalList; + this.canonicalByEndpoint = HashMultimap.create(); + this.canonicalByRange = HashMultimap.create(); + for (Replica replica : canonicalList) + canonicalByEndpoint.put(replica.endpoint(), replica); + for (Replica replica : canonicalList) + canonicalByRange.put(replica.range(), replica); + } + + void testSize() + { + Assert.assertEquals(canonicalList.size(), test.size()); + } + + void testEquals() + { + Assert.assertTrue(Iterables.elementsEqual(canonicalList, test)); + } + + void testEndpoints() + { + // TODO: we should do more exhaustive tests of the collection + Assert.assertEquals(ImmutableSet.copyOf(canonicalByEndpoint.keySet()), ImmutableSet.copyOf(test.endpoints())); + try + { + test.endpoints().add(EP5); + Assert.fail(); + } catch (UnsupportedOperationException e) {} + try + { + test.endpoints().remove(EP5); + Assert.fail(); + } catch (UnsupportedOperationException e) {} + + Assert.assertTrue(test.endpoints().containsAll(canonicalByEndpoint.keySet())); + for (InetAddressAndPort ep : canonicalByEndpoint.keySet()) + Assert.assertTrue(test.endpoints().contains(ep)); + for (InetAddressAndPort ep : ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP)) + if (!canonicalByEndpoint.containsKey(ep)) + Assert.assertFalse(test.endpoints().contains(ep)); + } + + public void testOrderOfIteration() + { + Assert.assertEquals(canonicalList, ImmutableList.copyOf(test)); + Assert.assertEquals(canonicalList, test.stream().collect(Collectors.toList())); + Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints()); + } + + void testSelect(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + { + TestCase<C> allMatchZeroCapacity = new TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), Collections.emptyList()); + allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, selectDepth - 1); + + TestCase<C> noMatchFullCapacity = new TestCase<>(test.select().add(Predicates.alwaysFalse(), canonicalList.size()).get(), Collections.emptyList()); + noMatchFullCapacity.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1); + + if (canonicalList.size() <= 2) + return; + + List<Replica> newOrderList = ImmutableList.of(canonicalList.get(2), canonicalList.get(1), canonicalList.get(0)); + TestCase<C> newOrder = new TestCase<>( + test.select() + .add(r -> r == newOrderList.get(0), 3) + .add(r -> r == newOrderList.get(1), 3) + .add(r -> r == newOrderList.get(2), 3) + .get(), newOrderList + ); + newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1); + } + + private void assertSubList(C subCollection, int from, int to) + { + Assert.assertTrue(subCollection.isSnapshot); + if (from == to) + { + Assert.assertTrue(subCollection.isEmpty()); + } + else + { + List<Replica> subList = this.test.list.subList(from, to); + if (test.isSnapshot) + Assert.assertSame(subList.getClass(), subCollection.list.getClass()); + Assert.assertEquals(subList, subCollection.list); + } + } + + void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + { + if (test.isSnapshot) + Assert.assertSame(test, test.subList(0, test.size())); + + if (test.isEmpty()) + return; + + TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size())); + assertSubList(skipFront.test, 1, canonicalList.size()); + skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth); + TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1)); + assertSubList(skipBack.test, 0, canonicalList.size() - 1); + skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth); + } + + void testFilter(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + { + if (test.isSnapshot) + Assert.assertSame(test, test.filter(Predicates.alwaysTrue())); + + if (test.isEmpty()) + return; + // remove start + // we recurse on the same subset in testSubList, so just corroborate we have the correct list here + assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, canonicalList.size()); + + if (test.size() <= 1) + return; + // remove end + // we recurse on the same subset in testSubList, so just corroborate we have the correct list here + assertSubList(test.filter(r -> r != canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1); + + if (test.size() <= 2) + return; + Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2); + TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test))); + filtered.testAll(subListDepth, filterDepth - 1, sortDepth, selectDepth); + } + + void testContains() + { + for (Replica replica : canonicalList) + Assert.assertTrue(test.contains(replica)); + Assert.assertFalse(test.contains(fullReplica(NULL_EP, NULL_RANGE))); + } + + void testGet() + { + for (int i = 0 ; i < canonicalList.size() ; ++i) + Assert.assertEquals(canonicalList.get(i), test.get(i)); + } + + void testSort(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + { + final Comparator<Replica> comparator = (o1, o2) -> + { + boolean f1 = o1 == canonicalList.get(0); + boolean f2 = o2 == canonicalList.get(0); + return f1 == f2 ? 0 : f1 ? 1 : -1; + }; + TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList)); + sorted.testAll(subListDepth, filterDepth, sortDepth - 1, selectDepth); + } + + private void testAll(int subListDepth, int filterDepth, int sortDepth, int selectDepth) + { + testEndpoints(); + testOrderOfIteration(); + testContains(); + testGet(); + testEquals(); + testSize(); + if (subListDepth > 0) + testSubList(subListDepth, filterDepth, sortDepth, selectDepth); + if (filterDepth > 0) + testFilter(subListDepth, filterDepth, sortDepth, selectDepth); + if (sortDepth > 0) + testSort(subListDepth, filterDepth, sortDepth, selectDepth); + if (selectDepth > 0) + testSelect(subListDepth, filterDepth, sortDepth, selectDepth); + } + + public void testAll() + { + testAll(2, 2, 2, 2); + } + } + + static class RangesAtEndpointTestCase extends TestCase<RangesAtEndpoint> + { + RangesAtEndpointTestCase(RangesAtEndpoint test, List<Replica> canonicalList) + { + super(test, canonicalList); + } + + void testRanges() + { + Assert.assertEquals(ImmutableSet.copyOf(canonicalByRange.keySet()), ImmutableSet.copyOf(test.ranges())); + try + { + test.ranges().add(R5); + Assert.fail(); + } catch (UnsupportedOperationException e) {} + try + { + test.ranges().remove(R5); + Assert.fail(); + } catch (UnsupportedOperationException e) {} + + Assert.assertTrue(test.ranges().containsAll(canonicalByRange.keySet())); + for (Range<Token> range : canonicalByRange.keySet()) + Assert.assertTrue(test.ranges().contains(range)); + for (Range<Token> range : ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE)) + if (!canonicalByRange.containsKey(range)) + Assert.assertFalse(test.ranges().contains(range)); + } + + @Override + public void testOrderOfIteration() + { + super.testOrderOfIteration(); + Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges()); + } + + @Override + public void testAll() + { + super.testAll(); + testRanges(); + } + } + + private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = ImmutableList.of( + fullReplica(EP1, R1), + fullReplica(EP1, R2), + transientReplica(EP1, R3), + fullReplica(EP1, R4), + transientReplica(EP1, R5) + ); + + @Test + public void testRangesAtEndpoint() + { + ImmutableList<Replica> canonical = RANGES_AT_ENDPOINT; + new RangesAtEndpointTestCase( + RangesAtEndpoint.copyOf(canonical), canonical + ).testAll(); + } + + @Test + public void testMutableRangesAtEndpoint() + { + ImmutableList<Replica> canonical1 = RANGES_AT_ENDPOINT.subList(0, RANGES_AT_ENDPOINT.size()); + RangesAtEndpoint.Mutable test = new RangesAtEndpoint.Mutable(RANGES_AT_ENDPOINT.get(0).endpoint(), canonical1.size()); + test.addAll(canonical1, Conflict.NONE); + try + { // incorrect range + test.addAll(canonical1, Conflict.NONE); + Assert.fail(); + } catch (IllegalArgumentException e) { } + test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates + try + { // invalid endpoint; always error + test.add(fullReplica(EP2, BROADCAST_RANGE), Conflict.ALL); + Assert.fail(); + } catch (IllegalArgumentException e) { } + try + { // conflict on isFull/isTransient + test.add(fullReplica(EP1, R3), Conflict.DUPLICATE); + Assert.fail(); + } catch (IllegalArgumentException e) { } + test.add(fullReplica(EP1, R3), Conflict.ALL); + + new RangesAtEndpointTestCase(test, canonical1).testAll(); + + RangesAtEndpoint view = test.asImmutableView(); + RangesAtEndpoint snapshot = view.subList(0, view.size()); + + ImmutableList<Replica> canonical2 = RANGES_AT_ENDPOINT; + test.addAll(canonical2.reverse(), Conflict.DUPLICATE); + new TestCase<>(snapshot, canonical1).testAll(); + new TestCase<>(view, canonical2).testAll(); + new TestCase<>(test, canonical2).testAll(); + } + + private static final ImmutableList<Replica> ENDPOINTS_FOR_X = ImmutableList.of( + fullReplica(EP1, R1), + fullReplica(EP2, R1), + transientReplica(EP3, R1), + fullReplica(EP4, R1), + transientReplica(EP5, R1) + ); + + @Test + public void testEndpointsForRange() + { + ImmutableList<Replica> canonical = ENDPOINTS_FOR_X; + new TestCase<>( + EndpointsForRange.copyOf(canonical), canonical + ).testAll(); + } + + @Test + public void testMutableEndpointsForRange() + { + ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1); + EndpointsForRange.Mutable test = new EndpointsForRange.Mutable(R1, canonical1.size()); + test.addAll(canonical1, Conflict.NONE); + try + { // incorrect range + test.addAll(canonical1, Conflict.NONE); + Assert.fail(); + } catch (IllegalArgumentException e) { } + test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates + try + { // incorrect range + test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL); + Assert.fail(); + } catch (IllegalArgumentException e) { } + try + { // conflict on isFull/isTransient + test.add(transientReplica(EP1, R1), Conflict.DUPLICATE); + Assert.fail(); + } catch (IllegalArgumentException e) { } + test.add(transientReplica(EP1, R1), Conflict.ALL); + + new TestCase<>(test, canonical1).testAll(); + + EndpointsForRange view = test.asImmutableView(); + EndpointsForRange snapshot = view.subList(0, view.size()); + + ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X; + test.addAll(canonical2.reverse(), Conflict.DUPLICATE); + new TestCase<>(snapshot, canonical1).testAll(); + new TestCase<>(view, canonical2).testAll(); + new TestCase<>(test, canonical2).testAll(); + } + + @Test + public void testEndpointsForToken() + { + ImmutableList<Replica> canonical = ENDPOINTS_FOR_X; + new TestCase<>( + EndpointsForToken.copyOf(tk(1), canonical), canonical + ).testAll(); + } + + @Test + public void testMutableEndpointsForToken() + { + ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1); + EndpointsForToken.Mutable test = new EndpointsForToken.Mutable(tk(1), canonical1.size()); + test.addAll(canonical1, Conflict.NONE); + try + { // incorrect range + test.addAll(canonical1, Conflict.NONE); + Assert.fail(); + } catch (IllegalArgumentException e) { } + test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates + try + { // incorrect range + test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL); + Assert.fail(); + } catch (IllegalArgumentException e) { } + try + { // conflict on isFull/isTransient + test.add(transientReplica(EP1, R1), Conflict.DUPLICATE); + Assert.fail(); + } catch (IllegalArgumentException e) { } + test.add(transientReplica(EP1, R1), Conflict.ALL); + + new TestCase<>(test, canonical1).testAll(); + + EndpointsForToken view = test.asImmutableView(); + EndpointsForToken snapshot = view.subList(0, view.size()); + + ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X; + test.addAll(canonical2.reverse(), Conflict.DUPLICATE); + new TestCase<>(snapshot, canonical1).testAll(); + new TestCase<>(view, canonical2).testAll(); + new TestCase<>(test, canonical2).testAll(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java new file mode 100644 index 0000000..66f538f --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; + +public class ReplicaUtils +{ + public static final Range<Token> FULL_RANGE = new Range<>(Murmur3Partitioner.MINIMUM, Murmur3Partitioner.MINIMUM); + public static final AbstractBounds<PartitionPosition> FULL_BOUNDS = new Range<>(Murmur3Partitioner.MINIMUM.minKeyBound(), Murmur3Partitioner.MINIMUM.maxKeyBound()); + + public static Replica full(InetAddressAndPort endpoint) + { + return fullReplica(endpoint, FULL_RANGE); + } + + public static Replica trans(InetAddressAndPort endpoint) + { + return transientReplica(endpoint, FULL_RANGE); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java new file mode 100644 index 0000000..a0427db --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.gms.Gossiper; + +public class ReplicationFactorTest +{ + + @BeforeClass + public static void setupClass() + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + Gossiper.instance.start(1); + } + + private static void assertRfParseFailure(String s) + { + try + { + ReplicationFactor.fromString(s); + Assert.fail("Expected IllegalArgumentException"); + } + catch (IllegalArgumentException e) + { + // expected + } + } + + private static void assertRfParse(String s, int expectedReplicas, int expectedTrans) + { + ReplicationFactor rf = ReplicationFactor.fromString(s); + Assert.assertEquals(expectedReplicas, rf.allReplicas); + Assert.assertEquals(expectedTrans, rf.transientReplicas()); + Assert.assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas); + } + + @Test + public void parseTest() + { + assertRfParse("3", 3, 0); + assertRfParse("3/1", 3, 1); + + assertRfParse("5", 5, 0); + assertRfParse("5/2", 5, 2); + + assertRfParseFailure("-1"); + assertRfParseFailure("3/3"); + assertRfParseFailure("3/4"); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java index a8caa72..e6a9365 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java @@ -17,9 +17,7 @@ */ package org.apache.cassandra.locator; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -75,7 +73,7 @@ public class ReplicationStrategyEndpointCacheTest public void runEndpointsWereCachedTest(Class stratClass, Map<String, String> configOptions) throws Exception { setup(stratClass, configOptions); - assert strategy.getNaturalEndpoints(searchToken).equals(strategy.getNaturalEndpoints(searchToken)); + assert strategy.getNaturalReplicasForToken(searchToken).equals(strategy.getNaturalReplicasForToken(searchToken)); } @Test @@ -89,34 +87,34 @@ public class ReplicationStrategyEndpointCacheTest public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception { setup(stratClass, configOptions); - ArrayList<InetAddressAndPort> initial; - ArrayList<InetAddressAndPort> endpoints; + EndpointsForToken initial; + EndpointsForToken replicas; - endpoints = strategy.getNaturalEndpoints(searchToken); - assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); + replicas = strategy.getNaturalReplicasForToken(searchToken); + assert replicas.size() == 5 : StringUtils.join(replicas, ","); // test token addition, in DC2 before existing token - initial = strategy.getNaturalEndpoints(searchToken); + initial = strategy.getNaturalReplicasForToken(searchToken); tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddressAndPort.getByName("127.0.0.5")); - endpoints = strategy.getNaturalEndpoints(searchToken); - assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); - assert !endpoints.equals(initial); + replicas = strategy.getNaturalReplicasForToken(searchToken); + assert replicas.size() == 5 : StringUtils.join(replicas, ","); + assert !replicas.equals(initial); // test token removal, newly created token - initial = strategy.getNaturalEndpoints(searchToken); + initial = strategy.getNaturalReplicasForToken(searchToken); tmd.removeEndpoint(InetAddressAndPort.getByName("127.0.0.5")); - endpoints = strategy.getNaturalEndpoints(searchToken); - assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); - assert !endpoints.contains(InetAddressAndPort.getByName("127.0.0.5")); - assert !endpoints.equals(initial); + replicas = strategy.getNaturalReplicasForToken(searchToken); + assert replicas.size() == 5 : StringUtils.join(replicas, ","); + assert !replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.5")); + assert !replicas.equals(initial); // test token change - initial = strategy.getNaturalEndpoints(searchToken); + initial = strategy.getNaturalReplicasForToken(searchToken); //move .8 after search token but before other DC3 tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddressAndPort.getByName("127.0.0.8")); - endpoints = strategy.getNaturalEndpoints(searchToken); - assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); - assert !endpoints.equals(initial); + replicas = strategy.getNaturalReplicasForToken(searchToken); + assert replicas.size() == 5 : StringUtils.join(replicas, ","); + assert !replicas.equals(initial); } protected static class FakeSimpleStrategy extends SimpleStrategy @@ -128,11 +126,11 @@ public class ReplicationStrategyEndpointCacheTest super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; + assert !called : "calculateNaturalReplicas was already called, result should have been cached"; called = true; - return super.calculateNaturalEndpoints(token, metadata); + return super.calculateNaturalReplicas(token, metadata); } } @@ -145,11 +143,11 @@ public class ReplicationStrategyEndpointCacheTest super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; + assert !called : "calculateNaturalReplicas was already called, result should have been cached"; called = true; - return super.calculateNaturalEndpoints(token, metadata); + return super.calculateNaturalReplicas(token, metadata); } } @@ -162,11 +160,11 @@ public class ReplicationStrategyEndpointCacheTest super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; + assert !called : "calculateNaturalReplicas was already called, result should have been cached"; called = true; - return super.calculateNaturalEndpoints(token, metadata); + return super.calculateNaturalReplicas(token, metadata); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java index fe77b0e..1e0c152 100644 --- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java @@ -19,14 +19,22 @@ package org.apache.cassandra.locator; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.IPartitioner; @@ -53,6 +61,7 @@ public class SimpleStrategyTest { SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1)); + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); } @Test @@ -107,12 +116,12 @@ public class SimpleStrategyTest for (int i = 0; i < keyTokens.length; i++) { - List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyTokens[i]); - assertEquals(strategy.getReplicationFactor(), endpoints.size()); + EndpointsForToken replicas = strategy.getNaturalReplicasForToken(keyTokens[i]); + assertEquals(strategy.getReplicationFactor().allReplicas, replicas.size()); List<InetAddressAndPort> correctEndpoints = new ArrayList<>(); - for (int j = 0; j < endpoints.size(); j++) + for (int j = 0; j < replicas.size(); j++) correctEndpoints.add(hosts.get((i + j + 1) % hosts.size())); - assertEquals(new HashSet<>(correctEndpoints), new HashSet<>(endpoints)); + assertEquals(new HashSet<>(correctEndpoints), replicas.endpoints()); } } } @@ -154,30 +163,80 @@ public class SimpleStrategyTest PendingRangeCalculatorService.calculatePendingRanges(strategy, keyspaceName); - int replicationFactor = strategy.getReplicationFactor(); + int replicationFactor = strategy.getReplicationFactor().allReplicas; for (int i = 0; i < keyTokens.length; i++) { - Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i])); - assertTrue(endpoints.size() >= replicationFactor); + EndpointsForToken replicas = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalReplicasForToken(keyTokens[i])); + assertTrue(replicas.size() >= replicationFactor); for (int j = 0; j < replicationFactor; j++) { //Check that the old nodes are definitely included - assertTrue(endpoints.contains(hosts.get((i + j + 1) % hosts.size()))); + assertTrue(replicas.endpoints().contains(hosts.get((i + j + 1) % hosts.size()))); } // bootstrapEndpoint should be in the endpoints for i in MAX-RF to MAX, but not in any earlier ep. if (i < RING_SIZE - replicationFactor) - assertFalse(endpoints.contains(bootstrapEndpoint)); + assertFalse(replicas.endpoints().contains(bootstrapEndpoint)); else - assertTrue(endpoints.contains(bootstrapEndpoint)); + assertTrue(replicas.endpoints().contains(bootstrapEndpoint)); } } StorageServiceAccessor.setTokenMetadata(oldTmd); } + private static Token tk(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + private static Range<Token> range(long l, long r) + { + return new Range<>(tk(l), tk(r)); + } + + @Test + public void transientReplica() throws Exception + { + IEndpointSnitch snitch = new SimpleSnitch(); + DatabaseDescriptor.setEndpointSnitch(snitch); + + List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"), + InetAddressAndPort.getByName("127.0.0.2"), + InetAddressAndPort.getByName("127.0.0.3"), + InetAddressAndPort.getByName("127.0.0.4")); + + Multimap<InetAddressAndPort, Token> tokens = HashMultimap.create(); + tokens.put(endpoints.get(0), tk(100)); + tokens.put(endpoints.get(1), tk(200)); + tokens.put(endpoints.get(2), tk(300)); + tokens.put(endpoints.get(3), tk(400)); + TokenMetadata metadata = new TokenMetadata(); + metadata.updateNormalTokens(tokens); + + Map<String, String> configOptions = new HashMap<String, String>(); + configOptions.put("replication_factor", "3/1"); + + SimpleStrategy strategy = new SimpleStrategy("ks", metadata, snitch, configOptions); + + Range<Token> range1 = range(400, 100); + Assert.assertEquals(EndpointsForToken.of(range1.right, + Replica.fullReplica(endpoints.get(0), range1), + Replica.fullReplica(endpoints.get(1), range1), + Replica.transientReplica(endpoints.get(2), range1)), + strategy.getNaturalReplicasForToken(tk(99))); + + + Range<Token> range2 = range(100, 200); + Assert.assertEquals(EndpointsForToken.of(range2.right, + Replica.fullReplica(endpoints.get(1), range2), + Replica.fullReplica(endpoints.get(2), range2), + Replica.transientReplica(endpoints.get(3), range2)), + strategy.getNaturalReplicasForToken(tk(101))); + } + private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd) { KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java index b589d2d..ae8c011 100644 --- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java +++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java @@ -118,7 +118,7 @@ public class TokenMetadataTest } @Override - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } @@ -165,7 +165,7 @@ public class TokenMetadataTest } @Override - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } @@ -216,7 +216,7 @@ public class TokenMetadataTest } @Override - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } @@ -263,7 +263,7 @@ public class TokenMetadataTest } @Override - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java index d97cdb8..e226d32 100644 --- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java +++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java @@ -25,19 +25,20 @@ import org.junit.Test; import org.junit.Assert; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.MessagingService.Verb; import org.apache.cassandra.schema.MockSchema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.utils.ByteBufferUtil; +import static org.apache.cassandra.locator.ReplicaUtils.full; + public class WriteCallbackInfoTest { @BeforeClass @@ -65,7 +66,7 @@ public class WriteCallbackInfoTest ? new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build()) : new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build()); - WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddressAndPort.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints); + WriteCallbackInfo wcbi = new WriteCallbackInfo(full(InetAddressAndPort.getByName("192.168.1.1")), null, new MessageOut(verb, payload, null), null, cl, allowHints); Assert.assertEquals(expectHint, wcbi.shouldHint()); if (expectHint) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java index 6a8dc83..379031c 100644 --- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java +++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java @@ -46,6 +46,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractEndpointSnitch; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingServiceTest; @@ -172,7 +173,7 @@ public class OutboundMessagingConnectionTest return nodeToDc.get(endpoint); } - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java deleted file mode 100644 index 802a673..0000000 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import com.google.common.collect.Lists; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.streaming.StreamPlan; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; -import org.apache.cassandra.utils.MerkleTrees; -import org.apache.cassandra.utils.UUIDGen; - -import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class LocalSyncTaskTest extends AbstractRepairTest -{ - private static final IPartitioner partitioner = Murmur3Partitioner.instance; - public static final String KEYSPACE1 = "DifferencerTest"; - public static final String CF_STANDARD = "Standard1"; - public static ColumnFamilyStore cfs; - - @BeforeClass - public static void defineSchema() - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); - - TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id; - cfs = Schema.instance.getColumnFamilyStoreInstance(tid); - } - - /** - * When there is no difference between two, LocalSyncTask should return stats with 0 difference. - */ - @Test - public void testNoDifference() throws Throwable - { - final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1"); - final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1"); - - Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); - RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); - - MerkleTrees tree1 = createInitialTree(desc); - - MerkleTrees tree2 = createInitialTree(desc); - - // difference the trees - // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(ep1, tree1); - TreeResponse r2 = new TreeResponse(ep2, tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); - task.run(); - - assertEquals(0, task.get().numberOfDifferences); - } - - @Test - public void testDifference() throws Throwable - { - Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); - UUID parentRepairSession = UUID.randomUUID(); - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(), - Arrays.asList(cfs), Arrays.asList(range), false, - ActiveRepairService.UNREPAIRED_SSTABLE, false, - PreviewKind.NONE); - - RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); - - MerkleTrees tree1 = createInitialTree(desc); - - MerkleTrees tree2 = createInitialTree(desc); - - // change a range in one of the trees - Token token = partitioner.midpoint(range.left, range.right); - tree1.invalidate(token); - MerkleTree.TreeRange changed = tree1.get(token); - changed.hash("non-empty hash!".getBytes()); - - Set<Range<Token>> interesting = new HashSet<>(); - interesting.add(changed); - - // difference the trees - // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1); - TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); - task.run(); - - // ensure that the changed range was recorded - assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences); - } - - @Test - public void fullRepairStreamPlan() throws Exception - { - UUID sessionID = registerSession(cfs, true, true); - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); - RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); - - assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); - assertTrue(plan.getFlushBeforeTransfer()); - } - - @Test - public void incrementalRepairStreamPlan() throws Exception - { - UUID sessionID = registerSession(cfs, true, true); - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); - RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); - - assertEquals(desc.parentSessionId, plan.getPendingRepair()); - assertFalse(plan.getFlushBeforeTransfer()); - } - - private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner) - { - MerkleTrees tree = new MerkleTrees(partitioner); - tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges); - tree.init(); - for (MerkleTree.TreeRange r : tree.invalids()) - { - r.ensureHashInitialised(); - } - return tree; - } - - private MerkleTrees createInitialTree(RepairJobDesc desc) - { - return createInitialTree(desc, partitioner); - - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java index 2044106..418d7de 100644 --- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.Collections; import java.util.List; import java.util.Set; @@ -29,7 +28,6 @@ import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.repair.RepairRunnable.CommonRange; import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges; @@ -41,7 +39,7 @@ public class RepairRunnableTest extends AbstractRepairTest @Test public void filterCommonIncrementalRangesNotForced() throws Exception { - CommonRange cr = new CommonRange(PARTICIPANTS, ALL_RANGES); + CommonRange cr = new CommonRange(PARTICIPANTS, Collections.emptySet(), ALL_RANGES); List<CommonRange> expected = Lists.newArrayList(cr); List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false); @@ -52,13 +50,13 @@ public class RepairRunnableTest extends AbstractRepairTest @Test public void forceFilterCommonIncrementalRanges() throws Exception { - CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)); - CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3)); + CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)); + CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3)); Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded List<CommonRange> initial = Lists.newArrayList(cr1, cr2); - List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)), - new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3))); + List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)), + new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3))); List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true); Assert.assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 54f0511..e77d657 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.repair; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -62,9 +63,10 @@ public class RepairSessionTest IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote); - RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), + RepairSession session = new RepairSession(parentSessionId, sessionId, + new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)), "Keyspace1", RepairParallelism.SEQUENTIAL, - endpoints, false, false, false, + false, false, false, PreviewKind.NONE, false, "Standard1"); // perform convict http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java new file mode 100644 index 0000000..92ae172 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.StreamCoordinator; +import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SymmetricLocalSyncTaskTest extends AbstractRepairTest +{ + private static final IPartitioner partitioner = Murmur3Partitioner.instance; + public static final String KEYSPACE1 = "DifferencerTest"; + public static final String CF_STANDARD = "Standard1"; + public static ColumnFamilyStore cfs; + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); + + TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id; + cfs = Schema.instance.getColumnFamilyStoreInstance(tid); + } + + /** + * When there is no difference between two, SymmetricLocalSyncTask should return stats with 0 difference. + */ + @Test + public void testNoDifference() throws Throwable + { + final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1"); + final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1"); + + Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); + RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); + + MerkleTrees tree1 = createInitialTree(desc); + + MerkleTrees tree2 = createInitialTree(desc); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(ep1, tree1); + TreeResponse r2 = new TreeResponse(ep2, tree2); + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); + task.run(); + + assertEquals(0, task.get().numberOfDifferences); + } + + @Test + public void testDifference() throws Throwable + { + Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); + UUID parentRepairSession = UUID.randomUUID(); + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); + + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(), + Arrays.asList(cfs), Arrays.asList(range), false, + ActiveRepairService.UNREPAIRED_SSTABLE, false, + PreviewKind.NONE); + + RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); + + MerkleTrees tree1 = createInitialTree(desc); + + MerkleTrees tree2 = createInitialTree(desc); + + // change a range in one of the trees + Token token = partitioner.midpoint(range.left, range.right); + tree1.invalidate(token); + MerkleTree.TreeRange changed = tree1.get(token); + changed.hash("non-empty hash!".getBytes()); + + Set<Range<Token>> interesting = new HashSet<>(); + interesting.add(changed); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1); + TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); + DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1; + DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2); + try + { + task.run(); + } + finally + { + DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30); + DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 3; + } + + // ensure that the changed range was recorded + assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences); + } + + @Test + public void fullRepairStreamPlan() throws Exception + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); + + assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); + assertTrue(plan.getFlushBeforeTransfer()); + } + + private static void assertNumInOut(StreamPlan plan, int expectedIncoming, int expectedOutgoing) + { + StreamCoordinator coordinator = plan.getCoordinator(); + StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions()); + assertEquals(expectedIncoming, session.getNumRequests()); + assertEquals(expectedOutgoing, session.getNumTransfers()); + } + + @Test + public void incrementalRepairStreamPlan() throws Exception + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, desc.parentSessionId, false, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); + + assertEquals(desc.parentSessionId, plan.getPendingRepair()); + assertFalse(plan.getFlushBeforeTransfer()); + assertNumInOut(plan, 1, 1); + } + + /** + * Don't reciprocate streams if the other endpoint is a transient replica + */ + @Test + public void transientStreamPlan() + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, true, desc.parentSessionId, false, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(PARTICIPANT2, Lists.newArrayList(RANGE1)); + assertNumInOut(plan, 1, 0); + } + + private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner) + { + MerkleTrees tree = new MerkleTrees(partitioner); + tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges); + tree.init(); + for (MerkleTree.TreeRange r : tree.invalids()) + { + r.ensureHashInitialised(); + } + return tree; + } + + private MerkleTrees createInitialTree(RepairJobDesc desc) + { + return createInitialTree(desc, partitioner); + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java new file mode 100644 index 0000000..06f968f --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.List; + +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.UUIDGen; + +public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest +{ + private static final RepairJobDesc DESC = new RepairJobDesc(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID(), "ks", "tbl", ALL_RANGES); + private static final List<Range<Token>> RANGE_LIST = ImmutableList.of(RANGE1); + + private static class InstrumentedSymmetricRemoteSyncTask extends SymmetricRemoteSyncTask + { + public InstrumentedSymmetricRemoteSyncTask(InetAddressAndPort e1, InetAddressAndPort e2) + { + super(DESC, new TreeResponse(e1, null), new TreeResponse(e2, null), PreviewKind.NONE); + } + + RepairMessage sentMessage = null; + InetAddressAndPort sentTo = null; + + @Override + void sendRequest(RepairMessage request, InetAddressAndPort to) + { + Assert.assertNull(sentMessage); + Assert.assertNotNull(request); + Assert.assertNotNull(to); + sentMessage = request; + sentTo = to; + } + } + + @Test + public void normalSync() + { + InstrumentedSymmetricRemoteSyncTask syncTask = new InstrumentedSymmetricRemoteSyncTask(PARTICIPANT1, PARTICIPANT2); + syncTask.startSync(RANGE_LIST); + + Assert.assertNotNull(syncTask.sentMessage); + Assert.assertSame(SyncRequest.class, syncTask.sentMessage.getClass()); + Assert.assertEquals(PARTICIPANT1, syncTask.sentTo); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java index 3ea888d..a7e8272 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java @@ -44,12 +44,13 @@ public class LocalSessionAccessor ARS.consistent.local.putSessionUnsafe(session); } - public static void finalizeUnsafe(UUID sessionID) + public static long finalizeUnsafe(UUID sessionID) { LocalSession session = ARS.consistent.local.getSession(sessionID); assert session != null; session.setState(ConsistentSession.State.FINALIZED); ARS.consistent.local.save(session); + return session.repairedAt; } public static void failUnsafe(UUID sessionID) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index d368510..e387c41 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -40,10 +40,9 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.KeyspaceRepairManager; @@ -136,7 +135,11 @@ public class LocalSessionTest extends AbstractRepairTest boolean prepareSessionCalled = false; @Override - ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) + ListenableFuture prepareSession(KeyspaceRepairManager repairManager, + UUID sessionID, + Collection<ColumnFamilyStore> tables, + RangesAtEndpoint ranges, + ExecutorService executor) { prepareSessionCalled = true; if (prepareSessionFuture != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/schema/MockSchema.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java index 98bf9ca..7a6b011 100644 --- a/test/unit/org/apache/cassandra/schema/MockSchema.java +++ b/test/unit/org/apache/cassandra/schema/MockSchema.java @@ -127,7 +127,7 @@ public class MockSchema } SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList()); StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator) - .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header) + .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header) .get(MetadataType.STATS); SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org