http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
index 9c90d57..4afeb5a 100644
--- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.locator;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -39,9 +38,11 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class OldNetworkTopologyStrategyTest
 {
+
     private List<Token> keyTokens;
     private TokenMetadata tmd;
     private Map<String, ArrayList<InetAddressAndPort>> expectedResults;
@@ -53,7 +54,7 @@ public class OldNetworkTopologyStrategyTest
     }
 
     @Before
-    public void init()
+    public void init() throws Exception
     {
         keyTokens = new ArrayList<Token>();
         tmd = new TokenMetadata();
@@ -160,11 +161,11 @@ public class OldNetworkTopologyStrategyTest
     {
         for (Token keyToken : keyTokens)
         {
-            List<InetAddressAndPort> endpoints = 
strategy.getNaturalEndpoints(keyToken);
-            for (int j = 0; j < endpoints.size(); j++)
+            int j = 0;
+            for (InetAddressAndPort endpoint : 
strategy.getNaturalReplicasForToken(keyToken).endpoints())
             {
                 ArrayList<InetAddressAndPort> hostsExpected = 
expectedResults.get(keyToken.toString());
-                assertEquals(endpoints.get(j), hostsExpected.get(j));
+                assertEquals(endpoint, hostsExpected.get(j++));
             }
         }
     }
@@ -188,7 +189,6 @@ public class OldNetworkTopologyStrategyTest
         assertEquals(ranges.left.iterator().next().left, 
tokensAfterMove[movingNodeIdx]);
         assertEquals(ranges.left.iterator().next().right, 
tokens[movingNodeIdx]);
         assertEquals("No data should be fetched", ranges.right.size(), 0);
-
     }
 
     @Test
@@ -205,7 +205,6 @@ public class OldNetworkTopologyStrategyTest
         assertEquals("No data should be streamed", ranges.left.size(), 0);
         assertEquals(ranges.right.iterator().next().left, 
tokens[movingNodeIdx]);
         assertEquals(ranges.right.iterator().next().right, 
tokensAfterMove[movingNodeIdx]);
-
     }
 
     @SuppressWarnings("unchecked")
@@ -366,16 +365,21 @@ public class OldNetworkTopologyStrategyTest
         TokenMetadata tokenMetadataAfterMove = 
initTokenMetadata(tokensAfterMove);
         AbstractReplicationStrategy strategy = new 
OldNetworkTopologyStrategy("Keyspace1", tokenMetadataCurrent, endpointSnitch, 
optsWithRF(2));
 
-        Collection<Range<Token>> currentRanges = 
strategy.getAddressRanges().get(movingNode);
-        Collection<Range<Token>> updatedRanges = 
strategy.getPendingAddressRanges(tokenMetadataAfterMove, 
tokensAfterMove[movingNodeIdx], movingNode);
-
-        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = 
StorageService.instance.calculateStreamAndFetchRanges(currentRanges, 
updatedRanges);
+        RangesAtEndpoint currentRanges = 
strategy.getAddressReplicas().get(movingNode);
+        RangesAtEndpoint updatedRanges = 
strategy.getPendingAddressRanges(tokenMetadataAfterMove, 
tokensAfterMove[movingNodeIdx], movingNode);
 
-        return ranges;
+        return 
asRanges(StorageService.calculateStreamAndFetchRanges(currentRanges, 
updatedRanges));
     }
 
     private static Map<String, String> optsWithRF(int rf)
     {
         return Collections.singletonMap("replication_factor", 
Integer.toString(rf));
     }
+
+    public static Pair<Set<Range<Token>>, Set<Range<Token>>> 
asRanges(Pair<RangesAtEndpoint, RangesAtEndpoint> replicas)
+    {
+        Set<Range<Token>> leftRanges = replicas.left.ranges();
+        Set<Range<Token>> rightRanges = replicas.right.ranges();
+        return Pair.create(leftRanges, rightRanges);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java 
b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
index 56fd181..8e0bc00 100644
--- a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
+++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.dht.Token;
 import org.junit.Test;
 
 import java.net.UnknownHostException;
-import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -38,17 +37,29 @@ public class PendingRangeMapsTest {
         return new Range<Token>(new BigIntegerToken(left), new 
BigIntegerToken(right));
     }
 
+    private static void addPendingRange(PendingRangeMaps pendingRangeMaps, 
Range<Token> range, String endpoint)
+    {
+        try
+        {
+            pendingRangeMaps.addPendingRange(range, 
Replica.fullReplica(InetAddressAndPort.getByName(endpoint), range));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
     @Test
     public void testPendingEndpoints() throws UnknownHostException
     {
         PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
 
-        pendingRangeMaps.addPendingRange(genRange("5", "15"), 
InetAddressAndPort.getByName("127.0.0.1"));
-        pendingRangeMaps.addPendingRange(genRange("15", "25"), 
InetAddressAndPort.getByName("127.0.0.2"));
-        pendingRangeMaps.addPendingRange(genRange("25", "35"), 
InetAddressAndPort.getByName("127.0.0.3"));
-        pendingRangeMaps.addPendingRange(genRange("35", "45"), 
InetAddressAndPort.getByName("127.0.0.4"));
-        pendingRangeMaps.addPendingRange(genRange("45", "55"), 
InetAddressAndPort.getByName("127.0.0.5"));
-        pendingRangeMaps.addPendingRange(genRange("45", "65"), 
InetAddressAndPort.getByName("127.0.0.6"));
+        addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1");
+        addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2");
+        addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3");
+        addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4");
+        addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5");
+        addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6");
 
         assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("0")).size());
         assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("5")).size());
@@ -61,8 +72,8 @@ public class PendingRangeMapsTest {
         assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("55")).size());
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("65")).size());
 
-        Collection<InetAddressAndPort> endpoints = 
pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
-        
assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1")));
+        EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("15"));
+        
assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1")));
     }
 
     @Test
@@ -70,13 +81,13 @@ public class PendingRangeMapsTest {
     {
         PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
 
-        pendingRangeMaps.addPendingRange(genRange("5", "15"), 
InetAddressAndPort.getByName("127.0.0.1"));
-        pendingRangeMaps.addPendingRange(genRange("15", "25"), 
InetAddressAndPort.getByName("127.0.0.2"));
-        pendingRangeMaps.addPendingRange(genRange("25", "35"), 
InetAddressAndPort.getByName("127.0.0.3"));
-        pendingRangeMaps.addPendingRange(genRange("35", "45"), 
InetAddressAndPort.getByName("127.0.0.4"));
-        pendingRangeMaps.addPendingRange(genRange("45", "55"), 
InetAddressAndPort.getByName("127.0.0.5"));
-        pendingRangeMaps.addPendingRange(genRange("45", "65"), 
InetAddressAndPort.getByName("127.0.0.6"));
-        pendingRangeMaps.addPendingRange(genRange("65", "7"), 
InetAddressAndPort.getByName("127.0.0.7"));
+        addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1");
+        addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2");
+        addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3");
+        addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4");
+        addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5");
+        addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6");
+        addPendingRange(pendingRangeMaps, genRange("65", "7"), "127.0.0.7");
 
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("0")).size());
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("5")).size());
@@ -90,8 +101,8 @@ public class PendingRangeMapsTest {
         assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("55")).size());
         assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("65")).size());
 
-        Collection<InetAddressAndPort> endpoints = 
pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
-        
assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1")));
-        
assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.7")));
+        EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new 
BigIntegerToken("6"));
+        
assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1")));
+        
assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.7")));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java 
b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
new file mode 100644
index 0000000..66eff23
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
+public class ReplicaCollectionTest
+{
+
+    static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, 
NULL_EP;
+    static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE;
+
+    static
+    {
+        try
+        {
+            EP1 = InetAddressAndPort.getByName("127.0.0.1");
+            EP2 = InetAddressAndPort.getByName("127.0.0.2");
+            EP3 = InetAddressAndPort.getByName("127.0.0.3");
+            EP4 = InetAddressAndPort.getByName("127.0.0.4");
+            EP5 = InetAddressAndPort.getByName("127.0.0.5");
+            BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
+            NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
+            R1 = range(0, 1);
+            R2 = range(1, 2);
+            R3 = range(2, 3);
+            R4 = range(3, 4);
+            R5 = range(4, 5);
+            BROADCAST_RANGE = range(10, 11);
+            NULL_RANGE = range(10000, 10001);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    static Range<Token> range(long left, long right)
+    {
+        return new Range<>(tk(left), tk(right));
+    }
+
+    static class TestCase<C extends AbstractReplicaCollection<C>>
+    {
+        final C test;
+        final List<Replica> canonicalList;
+        final Multimap<InetAddressAndPort, Replica> canonicalByEndpoint;
+        final Multimap<Range<Token>, Replica> canonicalByRange;
+
+        TestCase(C test, List<Replica> canonicalList)
+        {
+            this.test = test;
+            this.canonicalList = canonicalList;
+            this.canonicalByEndpoint = HashMultimap.create();
+            this.canonicalByRange = HashMultimap.create();
+            for (Replica replica : canonicalList)
+                canonicalByEndpoint.put(replica.endpoint(), replica);
+            for (Replica replica : canonicalList)
+                canonicalByRange.put(replica.range(), replica);
+        }
+
+        void testSize()
+        {
+            Assert.assertEquals(canonicalList.size(), test.size());
+        }
+
+        void testEquals()
+        {
+            Assert.assertTrue(Iterables.elementsEqual(canonicalList, test));
+        }
+
+        void testEndpoints()
+        {
+            // TODO: we should do more exhaustive tests of the collection
+            
Assert.assertEquals(ImmutableSet.copyOf(canonicalByEndpoint.keySet()), 
ImmutableSet.copyOf(test.endpoints()));
+            try
+            {
+                test.endpoints().add(EP5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+            try
+            {
+                test.endpoints().remove(EP5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+
+            
Assert.assertTrue(test.endpoints().containsAll(canonicalByEndpoint.keySet()));
+            for (InetAddressAndPort ep : canonicalByEndpoint.keySet())
+                Assert.assertTrue(test.endpoints().contains(ep));
+            for (InetAddressAndPort ep : ImmutableList.of(EP1, EP2, EP3, EP4, 
EP5, BROADCAST_EP))
+                if (!canonicalByEndpoint.containsKey(ep))
+                    Assert.assertFalse(test.endpoints().contains(ep));
+        }
+
+        public void testOrderOfIteration()
+        {
+            Assert.assertEquals(canonicalList, ImmutableList.copyOf(test));
+            Assert.assertEquals(canonicalList, 
test.stream().collect(Collectors.toList()));
+            Assert.assertEquals(new 
LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), 
test.endpoints());
+        }
+
+        void testSelect(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
+        {
+            TestCase<C> allMatchZeroCapacity = new 
TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), 
Collections.emptyList());
+            allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, 
selectDepth - 1);
+
+            TestCase<C> noMatchFullCapacity = new 
TestCase<>(test.select().add(Predicates.alwaysFalse(), 
canonicalList.size()).get(), Collections.emptyList());
+            noMatchFullCapacity.testAll(subListDepth, filterDepth, 
sortDepth,selectDepth - 1);
+
+            if (canonicalList.size() <= 2)
+                return;
+
+            List<Replica> newOrderList = 
ImmutableList.of(canonicalList.get(2), canonicalList.get(1), 
canonicalList.get(0));
+            TestCase<C> newOrder = new TestCase<>(
+                    test.select()
+                            .add(r -> r == newOrderList.get(0), 3)
+                            .add(r -> r == newOrderList.get(1), 3)
+                            .add(r -> r == newOrderList.get(2), 3)
+                            .get(), newOrderList
+            );
+            newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth 
- 1);
+        }
+
+        private void assertSubList(C subCollection, int from, int to)
+        {
+            Assert.assertTrue(subCollection.isSnapshot);
+            if (from == to)
+            {
+                Assert.assertTrue(subCollection.isEmpty());
+            }
+            else
+            {
+                List<Replica> subList = this.test.list.subList(from, to);
+                if (test.isSnapshot)
+                    Assert.assertSame(subList.getClass(), 
subCollection.list.getClass());
+                Assert.assertEquals(subList, subCollection.list);
+            }
+        }
+
+        void testSubList(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
+        {
+            if (test.isSnapshot)
+                Assert.assertSame(test, test.subList(0, test.size()));
+
+            if (test.isEmpty())
+                return;
+
+            TestCase<C> skipFront = new TestCase<>(test.subList(1, 
test.size()), canonicalList.subList(1, canonicalList.size()));
+            assertSubList(skipFront.test, 1, canonicalList.size());
+            skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, 
selectDepth);
+            TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() 
- 1), canonicalList.subList(0, canonicalList.size() - 1));
+            assertSubList(skipBack.test, 0, canonicalList.size() - 1);
+            skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, 
selectDepth);
+        }
+
+        void testFilter(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
+        {
+            if (test.isSnapshot)
+                Assert.assertSame(test, test.filter(Predicates.alwaysTrue()));
+
+            if (test.isEmpty())
+                return;
+            // remove start
+            // we recurse on the same subset in testSubList, so just 
corroborate we have the correct list here
+            assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, 
canonicalList.size());
+
+            if (test.size() <= 1)
+                return;
+            // remove end
+            // we recurse on the same subset in testSubList, so just 
corroborate we have the correct list here
+            assertSubList(test.filter(r -> r != 
canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1);
+
+            if (test.size() <= 2)
+                return;
+            Predicate<Replica> removeMiddle = r -> r != 
canonicalList.get(canonicalList.size() / 2);
+            TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), 
ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test)));
+            filtered.testAll(subListDepth, filterDepth - 1, sortDepth, 
selectDepth);
+        }
+
+        void testContains()
+        {
+            for (Replica replica : canonicalList)
+                Assert.assertTrue(test.contains(replica));
+            Assert.assertFalse(test.contains(fullReplica(NULL_EP, 
NULL_RANGE)));
+        }
+
+        void testGet()
+        {
+            for (int i = 0 ; i < canonicalList.size() ; ++i)
+                Assert.assertEquals(canonicalList.get(i), test.get(i));
+        }
+
+        void testSort(int subListDepth, int filterDepth, int sortDepth, int 
selectDepth)
+        {
+            final Comparator<Replica> comparator = (o1, o2) ->
+            {
+                boolean f1 = o1 == canonicalList.get(0);
+                boolean f2 = o2 == canonicalList.get(0);
+                return f1 == f2 ? 0 : f1 ? 1 : -1;
+            };
+            TestCase<C> sorted = new TestCase<>(test.sorted(comparator), 
ImmutableList.sortedCopyOf(comparator, canonicalList));
+            sorted.testAll(subListDepth, filterDepth, sortDepth - 1, 
selectDepth);
+        }
+
+        private void testAll(int subListDepth, int filterDepth, int sortDepth, 
int selectDepth)
+        {
+            testEndpoints();
+            testOrderOfIteration();
+            testContains();
+            testGet();
+            testEquals();
+            testSize();
+            if (subListDepth > 0)
+                testSubList(subListDepth, filterDepth, sortDepth, selectDepth);
+            if (filterDepth > 0)
+                testFilter(subListDepth, filterDepth, sortDepth, selectDepth);
+            if (sortDepth > 0)
+                testSort(subListDepth, filterDepth, sortDepth, selectDepth);
+            if (selectDepth > 0)
+                testSelect(subListDepth, filterDepth, sortDepth, selectDepth);
+        }
+
+        public void testAll()
+        {
+            testAll(2, 2, 2, 2);
+        }
+    }
+
+    static class RangesAtEndpointTestCase extends TestCase<RangesAtEndpoint>
+    {
+        RangesAtEndpointTestCase(RangesAtEndpoint test, List<Replica> 
canonicalList)
+        {
+            super(test, canonicalList);
+        }
+
+        void testRanges()
+        {
+            
Assert.assertEquals(ImmutableSet.copyOf(canonicalByRange.keySet()), 
ImmutableSet.copyOf(test.ranges()));
+            try
+            {
+                test.ranges().add(R5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+            try
+            {
+                test.ranges().remove(R5);
+                Assert.fail();
+            } catch (UnsupportedOperationException e) {}
+
+            
Assert.assertTrue(test.ranges().containsAll(canonicalByRange.keySet()));
+            for (Range<Token> range : canonicalByRange.keySet())
+                Assert.assertTrue(test.ranges().contains(range));
+            for (Range<Token> range : ImmutableList.of(R1, R2, R3, R4, R5, 
BROADCAST_RANGE))
+                if (!canonicalByRange.containsKey(range))
+                    Assert.assertFalse(test.ranges().contains(range));
+        }
+
+        @Override
+        public void testOrderOfIteration()
+        {
+            super.testOrderOfIteration();
+            Assert.assertEquals(new 
LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges());
+        }
+
+        @Override
+        public void testAll()
+        {
+            super.testAll();
+            testRanges();
+        }
+    }
+
+    private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = 
ImmutableList.of(
+            fullReplica(EP1, R1),
+            fullReplica(EP1, R2),
+            transientReplica(EP1, R3),
+            fullReplica(EP1, R4),
+            transientReplica(EP1, R5)
+    );
+
+    @Test
+    public void testRangesAtEndpoint()
+    {
+        ImmutableList<Replica> canonical = RANGES_AT_ENDPOINT;
+        new RangesAtEndpointTestCase(
+                RangesAtEndpoint.copyOf(canonical), canonical
+        ).testAll();
+    }
+
+    @Test
+    public void testMutableRangesAtEndpoint()
+    {
+        ImmutableList<Replica> canonical1 = RANGES_AT_ENDPOINT.subList(0, 
RANGES_AT_ENDPOINT.size());
+        RangesAtEndpoint.Mutable test = new 
RangesAtEndpoint.Mutable(RANGES_AT_ENDPOINT.get(0).endpoint(), 
canonical1.size());
+        test.addAll(canonical1, Conflict.NONE);
+        try
+        {   // incorrect range
+            test.addAll(canonical1, Conflict.NONE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact 
duplicates
+        try
+        {   // invalid endpoint; always error
+            test.add(fullReplica(EP2, BROADCAST_RANGE), Conflict.ALL);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        try
+        {   // conflict on isFull/isTransient
+            test.add(fullReplica(EP1, R3), Conflict.DUPLICATE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.add(fullReplica(EP1, R3), Conflict.ALL);
+
+        new RangesAtEndpointTestCase(test, canonical1).testAll();
+
+        RangesAtEndpoint view = test.asImmutableView();
+        RangesAtEndpoint snapshot = view.subList(0, view.size());
+
+        ImmutableList<Replica> canonical2 = RANGES_AT_ENDPOINT;
+        test.addAll(canonical2.reverse(), Conflict.DUPLICATE);
+        new TestCase<>(snapshot, canonical1).testAll();
+        new TestCase<>(view, canonical2).testAll();
+        new TestCase<>(test, canonical2).testAll();
+    }
+
+    private static final ImmutableList<Replica> ENDPOINTS_FOR_X = 
ImmutableList.of(
+            fullReplica(EP1, R1),
+            fullReplica(EP2, R1),
+            transientReplica(EP3, R1),
+            fullReplica(EP4, R1),
+            transientReplica(EP5, R1)
+    );
+
+    @Test
+    public void testEndpointsForRange()
+    {
+        ImmutableList<Replica> canonical = ENDPOINTS_FOR_X;
+        new TestCase<>(
+                EndpointsForRange.copyOf(canonical), canonical
+        ).testAll();
+    }
+
+    @Test
+    public void testMutableEndpointsForRange()
+    {
+        ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, 
ENDPOINTS_FOR_X.size() - 1);
+        EndpointsForRange.Mutable test = new EndpointsForRange.Mutable(R1, 
canonical1.size());
+        test.addAll(canonical1, Conflict.NONE);
+        try
+        {   // incorrect range
+            test.addAll(canonical1, Conflict.NONE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact 
duplicates
+        try
+        {   // incorrect range
+            test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        try
+        {   // conflict on isFull/isTransient
+            test.add(transientReplica(EP1, R1), Conflict.DUPLICATE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.add(transientReplica(EP1, R1), Conflict.ALL);
+
+        new TestCase<>(test, canonical1).testAll();
+
+        EndpointsForRange view = test.asImmutableView();
+        EndpointsForRange snapshot = view.subList(0, view.size());
+
+        ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X;
+        test.addAll(canonical2.reverse(), Conflict.DUPLICATE);
+        new TestCase<>(snapshot, canonical1).testAll();
+        new TestCase<>(view, canonical2).testAll();
+        new TestCase<>(test, canonical2).testAll();
+    }
+
+    @Test
+    public void testEndpointsForToken()
+    {
+        ImmutableList<Replica> canonical = ENDPOINTS_FOR_X;
+        new TestCase<>(
+                EndpointsForToken.copyOf(tk(1), canonical), canonical
+        ).testAll();
+    }
+
+    @Test
+    public void testMutableEndpointsForToken()
+    {
+        ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, 
ENDPOINTS_FOR_X.size() - 1);
+        EndpointsForToken.Mutable test = new EndpointsForToken.Mutable(tk(1), 
canonical1.size());
+        test.addAll(canonical1, Conflict.NONE);
+        try
+        {   // incorrect range
+            test.addAll(canonical1, Conflict.NONE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact 
duplicates
+        try
+        {   // incorrect range
+            test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        try
+        {   // conflict on isFull/isTransient
+            test.add(transientReplica(EP1, R1), Conflict.DUPLICATE);
+            Assert.fail();
+        } catch (IllegalArgumentException e) { }
+        test.add(transientReplica(EP1, R1), Conflict.ALL);
+
+        new TestCase<>(test, canonical1).testAll();
+
+        EndpointsForToken view = test.asImmutableView();
+        EndpointsForToken snapshot = view.subList(0, view.size());
+
+        ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X;
+        test.addAll(canonical2.reverse(), Conflict.DUPLICATE);
+        new TestCase<>(snapshot, canonical1).testAll();
+        new TestCase<>(view, canonical2).testAll();
+        new TestCase<>(test, canonical2).testAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java 
b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
new file mode 100644
index 0000000..66f538f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
+public class ReplicaUtils
+{
+    public static final Range<Token> FULL_RANGE = new 
Range<>(Murmur3Partitioner.MINIMUM, Murmur3Partitioner.MINIMUM);
+    public static final AbstractBounds<PartitionPosition> FULL_BOUNDS = new 
Range<>(Murmur3Partitioner.MINIMUM.minKeyBound(), 
Murmur3Partitioner.MINIMUM.maxKeyBound());
+
+    public static Replica full(InetAddressAndPort endpoint)
+    {
+        return fullReplica(endpoint, FULL_RANGE);
+    }
+
+    public static Replica trans(InetAddressAndPort endpoint)
+    {
+        return transientReplica(endpoint, FULL_RANGE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java 
b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
new file mode 100644
index 0000000..a0427db
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+
+public class ReplicationFactorTest
+{
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+        Gossiper.instance.start(1);
+    }
+
+    private static void assertRfParseFailure(String s)
+    {
+        try
+        {
+            ReplicationFactor.fromString(s);
+            Assert.fail("Expected IllegalArgumentException");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // expected
+        }
+    }
+
+    private static void assertRfParse(String s, int expectedReplicas, int 
expectedTrans)
+    {
+        ReplicationFactor rf = ReplicationFactor.fromString(s);
+        Assert.assertEquals(expectedReplicas, rf.allReplicas);
+        Assert.assertEquals(expectedTrans, rf.transientReplicas());
+        Assert.assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas);
+    }
+
+    @Test
+    public void parseTest()
+    {
+        assertRfParse("3", 3, 0);
+        assertRfParse("3/1", 3, 1);
+
+        assertRfParse("5", 5, 0);
+        assertRfParse("5/2", 5, 2);
+
+        assertRfParseFailure("-1");
+        assertRfParseFailure("3/3");
+        assertRfParseFailure("3/4");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
 
b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index a8caa72..e6a9365 100644
--- 
a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ 
b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -17,9 +17,7 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
@@ -75,7 +73,7 @@ public class ReplicationStrategyEndpointCacheTest
     public void runEndpointsWereCachedTest(Class stratClass, Map<String, 
String> configOptions) throws Exception
     {
         setup(stratClass, configOptions);
-        assert 
strategy.getNaturalEndpoints(searchToken).equals(strategy.getNaturalEndpoints(searchToken));
+        assert 
strategy.getNaturalReplicasForToken(searchToken).equals(strategy.getNaturalReplicasForToken(searchToken));
     }
 
     @Test
@@ -89,34 +87,34 @@ public class ReplicationStrategyEndpointCacheTest
     public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, 
String> configOptions) throws Exception
     {
         setup(stratClass, configOptions);
-        ArrayList<InetAddressAndPort> initial;
-        ArrayList<InetAddressAndPort> endpoints;
+        EndpointsForToken initial;
+        EndpointsForToken replicas;
 
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
 
         // test token addition, in DC2 before existing token
-        initial = strategy.getNaturalEndpoints(searchToken);
+        initial = strategy.getNaturalReplicasForToken(searchToken);
         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), 
InetAddressAndPort.getByName("127.0.0.5"));
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
-        assert !endpoints.equals(initial);
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+        assert !replicas.equals(initial);
 
         // test token removal, newly created token
-        initial = strategy.getNaturalEndpoints(searchToken);
+        initial = strategy.getNaturalReplicasForToken(searchToken);
         tmd.removeEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
-        assert !endpoints.contains(InetAddressAndPort.getByName("127.0.0.5"));
-        assert !endpoints.equals(initial);
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+        assert 
!replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.5"));
+        assert !replicas.equals(initial);
 
         // test token change
-        initial = strategy.getNaturalEndpoints(searchToken);
+        initial = strategy.getNaturalReplicasForToken(searchToken);
         //move .8 after search token but before other DC3
         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), 
InetAddressAndPort.getByName("127.0.0.8"));
-        endpoints = strategy.getNaturalEndpoints(searchToken);
-        assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
-        assert !endpoints.equals(initial);
+        replicas = strategy.getNaturalReplicasForToken(searchToken);
+        assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+        assert !replicas.equals(initial);
     }
 
     protected static class FakeSimpleStrategy extends SimpleStrategy
@@ -128,11 +126,11 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, 
TokenMetadata metadata)
+        public EndpointsForRange calculateNaturalReplicas(Token token, 
TokenMetadata metadata)
         {
-            assert !called : "calculateNaturalEndpoints was already called, 
result should have been cached";
+            assert !called : "calculateNaturalReplicas was already called, 
result should have been cached";
             called = true;
-            return super.calculateNaturalEndpoints(token, metadata);
+            return super.calculateNaturalReplicas(token, metadata);
         }
     }
 
@@ -145,11 +143,11 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, 
TokenMetadata metadata)
+        public EndpointsForRange calculateNaturalReplicas(Token token, 
TokenMetadata metadata)
         {
-            assert !called : "calculateNaturalEndpoints was already called, 
result should have been cached";
+            assert !called : "calculateNaturalReplicas was already called, 
result should have been cached";
             called = true;
-            return super.calculateNaturalEndpoints(token, metadata);
+            return super.calculateNaturalReplicas(token, metadata);
         }
     }
 
@@ -162,11 +160,11 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, 
TokenMetadata metadata)
+        public EndpointsForRange calculateNaturalReplicas(Token token, 
TokenMetadata metadata)
         {
-            assert !called : "calculateNaturalEndpoints was already called, 
result should have been cached";
+            assert !called : "calculateNaturalReplicas was already called, 
result should have been cached";
             called = true;
-            return super.calculateNaturalEndpoints(token, metadata);
+            return super.calculateNaturalReplicas(token, metadata);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index fe77b0e..1e0c152 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -19,14 +19,22 @@ package org.apache.cassandra.locator;
 
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.IPartitioner;
@@ -53,6 +61,7 @@ public class SimpleStrategyTest
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1));
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
     }
 
     @Test
@@ -107,12 +116,12 @@ public class SimpleStrategyTest
 
             for (int i = 0; i < keyTokens.length; i++)
             {
-                List<InetAddressAndPort> endpoints = 
strategy.getNaturalEndpoints(keyTokens[i]);
-                assertEquals(strategy.getReplicationFactor(), 
endpoints.size());
+                EndpointsForToken replicas = 
strategy.getNaturalReplicasForToken(keyTokens[i]);
+                assertEquals(strategy.getReplicationFactor().allReplicas, 
replicas.size());
                 List<InetAddressAndPort> correctEndpoints = new ArrayList<>();
-                for (int j = 0; j < endpoints.size(); j++)
+                for (int j = 0; j < replicas.size(); j++)
                     correctEndpoints.add(hosts.get((i + j + 1) % 
hosts.size()));
-                assertEquals(new HashSet<>(correctEndpoints), new 
HashSet<>(endpoints));
+                assertEquals(new HashSet<>(correctEndpoints), 
replicas.endpoints());
             }
         }
     }
@@ -154,30 +163,80 @@ public class SimpleStrategyTest
 
             PendingRangeCalculatorService.calculatePendingRanges(strategy, 
keyspaceName);
 
-            int replicationFactor = strategy.getReplicationFactor();
+            int replicationFactor = 
strategy.getReplicationFactor().allReplicas;
 
             for (int i = 0; i < keyTokens.length; i++)
             {
-                Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(keyTokens[i], keyspaceName, 
strategy.getNaturalEndpoints(keyTokens[i]));
-                assertTrue(endpoints.size() >= replicationFactor);
+                EndpointsForToken replicas = 
tmd.getWriteEndpoints(keyTokens[i], keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens[i]));
+                assertTrue(replicas.size() >= replicationFactor);
 
                 for (int j = 0; j < replicationFactor; j++)
                 {
                     //Check that the old nodes are definitely included
-                    assertTrue(endpoints.contains(hosts.get((i + j + 1) % 
hosts.size())));
+                   assertTrue(replicas.endpoints().contains(hosts.get((i + j + 
1) % hosts.size())));
                 }
 
                 // bootstrapEndpoint should be in the endpoints for i in 
MAX-RF to MAX, but not in any earlier ep.
                 if (i < RING_SIZE - replicationFactor)
-                    assertFalse(endpoints.contains(bootstrapEndpoint));
+                    
assertFalse(replicas.endpoints().contains(bootstrapEndpoint));
                 else
-                    assertTrue(endpoints.contains(bootstrapEndpoint));
+                    
assertTrue(replicas.endpoints().contains(bootstrapEndpoint));
             }
         }
 
         StorageServiceAccessor.setTokenMetadata(oldTmd);
     }
 
+    private static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    private static Range<Token> range(long l, long r)
+    {
+        return new Range<>(tk(l), tk(r));
+    }
+
+    @Test
+    public void transientReplica() throws Exception
+    {
+        IEndpointSnitch snitch = new SimpleSnitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+
+        List<InetAddressAndPort> endpoints = 
Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"),
+                                                                
InetAddressAndPort.getByName("127.0.0.2"),
+                                                                
InetAddressAndPort.getByName("127.0.0.3"),
+                                                                
InetAddressAndPort.getByName("127.0.0.4"));
+
+        Multimap<InetAddressAndPort, Token> tokens = HashMultimap.create();
+        tokens.put(endpoints.get(0), tk(100));
+        tokens.put(endpoints.get(1), tk(200));
+        tokens.put(endpoints.get(2), tk(300));
+        tokens.put(endpoints.get(3), tk(400));
+        TokenMetadata metadata = new TokenMetadata();
+        metadata.updateNormalTokens(tokens);
+
+        Map<String, String> configOptions = new HashMap<String, String>();
+        configOptions.put("replication_factor", "3/1");
+
+        SimpleStrategy strategy = new SimpleStrategy("ks", metadata, snitch, 
configOptions);
+
+        Range<Token> range1 = range(400, 100);
+        Assert.assertEquals(EndpointsForToken.of(range1.right,
+                                                 
Replica.fullReplica(endpoints.get(0), range1),
+                                                 
Replica.fullReplica(endpoints.get(1), range1),
+                                                 
Replica.transientReplica(endpoints.get(2), range1)),
+                            strategy.getNaturalReplicasForToken(tk(99)));
+
+
+        Range<Token> range2 = range(100, 200);
+        Assert.assertEquals(EndpointsForToken.of(range2.right,
+                                                 
Replica.fullReplica(endpoints.get(1), range2),
+                                                 
Replica.fullReplica(endpoints.get(2), range2),
+                                                 
Replica.transientReplica(endpoints.get(3), range2)),
+                            strategy.getNaturalReplicasForToken(tk(101)));
+    }
+
     private AbstractReplicationStrategy getStrategy(String keyspaceName, 
TokenMetadata tmd)
     {
         KeyspaceMetadata ksmd = 
Schema.instance.getKeyspaceMetadata(keyspaceName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java 
b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index b589d2d..ae8c011 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -118,7 +118,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, 
InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, 
Replica a2)
             {
                 return 0;
             }
@@ -165,7 +165,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, 
InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, 
Replica a2)
             {
                 return 0;
             }
@@ -216,7 +216,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, 
InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, 
Replica a2)
             {
                 return 0;
             }
@@ -263,7 +263,7 @@ public class TokenMetadataTest
             }
 
             @Override
-            public int compareEndpoints(InetAddressAndPort target, 
InetAddressAndPort a1, InetAddressAndPort a2)
+            public int compareEndpoints(InetAddressAndPort target, Replica a1, 
Replica a2)
             {
                 return 0;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java 
b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index d97cdb8..e226d32 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -25,19 +25,20 @@ import org.junit.Test;
 
 import org.junit.Assert;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+
 public class WriteCallbackInfoTest
 {
     @BeforeClass
@@ -65,7 +66,7 @@ public class WriteCallbackInfoTest
                          ? new Commit(UUID.randomUUID(), new 
PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, 
RegularAndStaticColumns.NONE, 1).build())
                          : new 
Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
 
-        WriteCallbackInfo wcbi = new 
WriteCallbackInfo(InetAddressAndPort.getByName("192.168.1.1"), null, new 
MessageOut(verb, payload, null), null, cl, allowHints);
+        WriteCallbackInfo wcbi = new 
WriteCallbackInfo(full(InetAddressAndPort.getByName("192.168.1.1")), null, new 
MessageOut(verb, payload, null), null, cl, allowHints);
         Assert.assertEquals(expectHint, wcbi.shouldHint());
         if (expectHint)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java 
b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index 6a8dc83..379031c 100644
--- 
a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ 
b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractEndpointSnitch;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceTest;
@@ -172,7 +173,7 @@ public class OutboundMessagingConnectionTest
             return nodeToDc.get(endpoint);
         }
 
-        public int compareEndpoints(InetAddressAndPort target, 
InetAddressAndPort a1, InetAddressAndPort a2)
+        public int compareEndpoints(InetAddressAndPort target, Replica a1, 
Replica a2)
         {
             return 0;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
deleted file mode 100644
index 802a673..0000000
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.Lists;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.MerkleTrees;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static 
org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LocalSyncTaskTest extends AbstractRepairTest
-{
-    private static final IPartitioner partitioner = 
Murmur3Partitioner.instance;
-    public static final String KEYSPACE1 = "DifferencerTest";
-    public static final String CF_STANDARD = "Standard1";
-    public static ColumnFamilyStore cfs;
-
-    @BeforeClass
-    public static void defineSchema()
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD));
-
-        TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, 
CF_STANDARD).id;
-        cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
-    }
-
-    /**
-     * When there is no difference between two, LocalSyncTask should return 
stats with 0 difference.
-     */
-    @Test
-    public void testNoDifference() throws Throwable
-    {
-        final InetAddressAndPort ep1 = 
InetAddressAndPort.getByName("127.0.0.1");
-        final InetAddressAndPort ep2 = 
InetAddressAndPort.getByName("127.0.0.1");
-
-        Range<Token> range = new Range<>(partitioner.getMinimumToken(), 
partitioner.getRandomToken());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
-        MerkleTrees tree1 = createInitialTree(desc);
-
-        MerkleTrees tree2 = createInitialTree(desc);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine 
here
-        TreeResponse r1 = new TreeResponse(ep1, tree1);
-        TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        task.run();
-
-        assertEquals(0, task.get().numberOfDifferences);
-    }
-
-    @Test
-    public void testDifference() throws Throwable
-    {
-        Range<Token> range = new Range<>(partitioner.getMinimumToken(), 
partitioner.getRandomToken());
-        UUID parentRepairSession = UUID.randomUUID();
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
FBUtilities.getBroadcastAddressAndPort(),
-                                                                 
Arrays.asList(cfs), Arrays.asList(range), false,
-                                                                 
ActiveRepairService.UNREPAIRED_SSTABLE, false,
-                                                                 
PreviewKind.NONE);
-
-        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
-        MerkleTrees tree1 = createInitialTree(desc);
-
-        MerkleTrees tree2 = createInitialTree(desc);
-
-        // change a range in one of the trees
-        Token token = partitioner.midpoint(range.left, range.right);
-        tree1.invalidate(token);
-        MerkleTree.TreeRange changed = tree1.get(token);
-        changed.hash("non-empty hash!".getBytes());
-
-        Set<Range<Token>> interesting = new HashSet<>();
-        interesting.add(changed);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine 
here
-        TreeResponse r1 = new 
TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
-        TreeResponse r2 = new 
TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        task.run();
-
-        // ensure that the changed range was recorded
-        assertEquals("Wrong differing ranges", interesting.size(), 
task.getCurrentStat().numberOfDifferences);
-    }
-
-    @Test
-    public void fullRepairStreamPlan() throws Exception
-    {
-        UUID sessionID = registerSession(cfs, true, true);
-        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
-        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
-        TreeResponse r1 = new TreeResponse(PARTICIPANT1, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-        TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, 
Lists.newArrayList(RANGE1));
-
-        assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
-        assertTrue(plan.getFlushBeforeTransfer());
-    }
-
-    @Test
-    public void incrementalRepairStreamPlan() throws Exception
-    {
-        UUID sessionID = registerSession(cfs, true, true);
-        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
-        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
-        TreeResponse r1 = new TreeResponse(PARTICIPANT1, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-        TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
desc.parentSessionId, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, 
Lists.newArrayList(RANGE1));
-
-        assertEquals(desc.parentSessionId, plan.getPendingRepair());
-        assertFalse(plan.getFlushBeforeTransfer());
-    }
-
-    private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner 
partitioner)
-    {
-        MerkleTrees tree = new MerkleTrees(partitioner);
-        tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
-        tree.init();
-        for (MerkleTree.TreeRange r : tree.invalids())
-        {
-            r.ensureHashInitialised();
-        }
-        return tree;
-    }
-
-    private MerkleTrees createInitialTree(RepairJobDesc desc)
-    {
-        return createInitialTree(desc, partitioner);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java 
b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
index 2044106..418d7de 100644
--- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -29,7 +28,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.repair.RepairRunnable.CommonRange;
 
 import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
 
@@ -41,7 +39,7 @@ public class RepairRunnableTest extends AbstractRepairTest
     @Test
     public void filterCommonIncrementalRangesNotForced() throws Exception
     {
-        CommonRange cr = new CommonRange(PARTICIPANTS, ALL_RANGES);
+        CommonRange cr = new CommonRange(PARTICIPANTS, Collections.emptySet(), 
ALL_RANGES);
 
         List<CommonRange> expected = Lists.newArrayList(cr);
         List<CommonRange> actual = filterCommonRanges(expected, 
Collections.emptySet(), false);
@@ -52,13 +50,13 @@ public class RepairRunnableTest extends AbstractRepairTest
     @Test
     public void forceFilterCommonIncrementalRanges() throws Exception
     {
-        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, 
PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2));
-        CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3));
+        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, 
PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2));
+        CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3));
         Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, 
PARTICIPANT3); // PARTICIPANT1 is excluded
 
         List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
-        List<CommonRange> expected = Lists.newArrayList(new 
CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)),
-                                                        new 
CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), 
Sets.newHashSet(RANGE3)));
+        List<CommonRange> expected = Lists.newArrayList(new 
CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), 
Sets.newHashSet(RANGE1, RANGE2)),
+                                                        new 
CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), 
Collections.emptySet(), Sets.newHashSet(RANGE3)));
         List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, 
true);
 
         Assert.assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java 
b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 54f0511..e77d657 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -62,9 +63,10 @@ public class RepairSessionTest
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new 
Range<>(p.getToken(ByteBufferUtil.bytes(0)), 
p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote);
-        RepairSession session = new RepairSession(parentSessionId, sessionId, 
Arrays.asList(repairRange),
+        RepairSession session = new RepairSession(parentSessionId, sessionId,
+                                                  new CommonRange(endpoints, 
Collections.emptySet(), Arrays.asList(repairRange)),
                                                   "Keyspace1", 
RepairParallelism.SEQUENTIAL,
-                                                  endpoints, false, false, 
false,
+                                                  false, false, false,
                                                   PreviewKind.NONE, false, 
"Standard1");
 
         // perform convict

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
new file mode 100644
index 0000000..92ae172
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static 
org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SymmetricLocalSyncTaskTest extends AbstractRepairTest
+{
+    private static final IPartitioner partitioner = 
Murmur3Partitioner.instance;
+    public static final String KEYSPACE1 = "DifferencerTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD));
+
+        TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, 
CF_STANDARD).id;
+        cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+    }
+
+    /**
+     * When there is no difference between two, SymmetricLocalSyncTask should 
return stats with 0 difference.
+     */
+    @Test
+    public void testNoDifference() throws Throwable
+    {
+        final InetAddressAndPort ep1 = 
InetAddressAndPort.getByName("127.0.0.1");
+        final InetAddressAndPort ep2 = 
InetAddressAndPort.getByName("127.0.0.1");
+
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), 
partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
+
+        MerkleTrees tree2 = createInitialTree(desc);
+
+        // difference the trees
+        // note: we reuse the same endpoint which is bogus in theory but fine 
here
+        TreeResponse r1 = new TreeResponse(ep1, tree1);
+        TreeResponse r2 = new TreeResponse(ep2, tree2);
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, 
false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+        task.run();
+
+        assertEquals(0, task.get().numberOfDifferences);
+    }
+
+    @Test
+    public void testDifference() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), 
partitioner.getRandomToken());
+        UUID parentRepairSession = UUID.randomUUID();
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+
+        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
FBUtilities.getBroadcastAddressAndPort(),
+                                                                 
Arrays.asList(cfs), Arrays.asList(range), false,
+                                                                 
ActiveRepairService.UNREPAIRED_SSTABLE, false,
+                                                                 
PreviewKind.NONE);
+
+        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
+
+        MerkleTrees tree2 = createInitialTree(desc);
+
+        // change a range in one of the trees
+        Token token = partitioner.midpoint(range.left, range.right);
+        tree1.invalidate(token);
+        MerkleTree.TreeRange changed = tree1.get(token);
+        changed.hash("non-empty hash!".getBytes());
+
+        Set<Range<Token>> interesting = new HashSet<>();
+        interesting.add(changed);
+
+        // difference the trees
+        // note: we reuse the same endpoint which is bogus in theory but fine 
here
+        TreeResponse r1 = new 
TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
+        TreeResponse r2 = new 
TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, 
false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+        DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1;
+        DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = 
TimeUnit.SECONDS.toNanos(2);
+        try
+        {
+            task.run();
+        }
+        finally
+        {
+            DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = 
TimeUnit.SECONDS.toNanos(30);
+            DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 3;
+        }
+
+        // ensure that the changed range was recorded
+        assertEquals("Wrong differing ranges", interesting.size(), 
task.getCurrentStat().numberOfDifferences);
+    }
+
+    @Test
+    public void fullRepairStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, 
false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, 
Lists.newArrayList(RANGE1));
+
+        assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
+        assertTrue(plan.getFlushBeforeTransfer());
+    }
+
+    private static void assertNumInOut(StreamPlan plan, int expectedIncoming, 
int expectedOutgoing)
+    {
+        StreamCoordinator coordinator = plan.getCoordinator();
+        StreamSession session = 
Iterables.getOnlyElement(coordinator.getAllStreamSessions());
+        assertEquals(expectedIncoming, session.getNumRequests());
+        assertEquals(expectedOutgoing, session.getNumTransfers());
+    }
+
+    @Test
+    public void incrementalRepairStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, 
false, desc.parentSessionId, false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, 
Lists.newArrayList(RANGE1));
+
+        assertEquals(desc.parentSessionId, plan.getPendingRepair());
+        assertFalse(plan.getFlushBeforeTransfer());
+        assertNumInOut(plan, 1, 1);
+    }
+
+    /**
+     * Don't reciprocate streams if the other endpoint is a transient replica
+     */
+    @Test
+    public void transientStreamPlan()
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, 
true, desc.parentSessionId, false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT2, 
Lists.newArrayList(RANGE1));
+        assertNumInOut(plan, 1, 0);
+    }
+
+    private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner 
partitioner)
+    {
+        MerkleTrees tree = new MerkleTrees(partitioner);
+        tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
+        tree.init();
+        for (MerkleTree.TreeRange r : tree.invalids())
+        {
+            r.ensureHashInitialised();
+        }
+        return tree;
+    }
+
+    private MerkleTrees createInitialTree(RepairJobDesc desc)
+    {
+        return createInitialTree(desc, partitioner);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
new file mode 100644
index 0000000..06f968f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest
+{
+    private static final RepairJobDesc DESC = new 
RepairJobDesc(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID(), "ks", "tbl", 
ALL_RANGES);
+    private static final List<Range<Token>> RANGE_LIST = 
ImmutableList.of(RANGE1);
+
+    private static class InstrumentedSymmetricRemoteSyncTask extends 
SymmetricRemoteSyncTask
+    {
+        public InstrumentedSymmetricRemoteSyncTask(InetAddressAndPort e1, 
InetAddressAndPort e2)
+        {
+            super(DESC, new TreeResponse(e1, null), new TreeResponse(e2, 
null), PreviewKind.NONE);
+        }
+
+        RepairMessage sentMessage = null;
+        InetAddressAndPort sentTo = null;
+
+        @Override
+        void sendRequest(RepairMessage request, InetAddressAndPort to)
+        {
+            Assert.assertNull(sentMessage);
+            Assert.assertNotNull(request);
+            Assert.assertNotNull(to);
+            sentMessage = request;
+            sentTo = to;
+        }
+    }
+
+    @Test
+    public void normalSync()
+    {
+        InstrumentedSymmetricRemoteSyncTask syncTask = new 
InstrumentedSymmetricRemoteSyncTask(PARTICIPANT1, PARTICIPANT2);
+        syncTask.startSync(RANGE_LIST);
+
+        Assert.assertNotNull(syncTask.sentMessage);
+        Assert.assertSame(SyncRequest.class, syncTask.sentMessage.getClass());
+        Assert.assertEquals(PARTICIPANT1, syncTask.sentTo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
index 3ea888d..a7e8272 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -44,12 +44,13 @@ public class LocalSessionAccessor
         ARS.consistent.local.putSessionUnsafe(session);
     }
 
-    public static void finalizeUnsafe(UUID sessionID)
+    public static long finalizeUnsafe(UUID sessionID)
     {
         LocalSession session = ARS.consistent.local.getSession(sessionID);
         assert session != null;
         session.setState(ConsistentSession.State.FINALIZED);
         ARS.consistent.local.save(session);
+        return session.repairedAt;
     }
 
     public static void failUnsafe(UUID sessionID)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index d368510..e387c41 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -40,10 +40,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.KeyspaceRepairManager;
@@ -136,7 +135,11 @@ public class LocalSessionTest extends AbstractRepairTest
         boolean prepareSessionCalled = false;
 
         @Override
-        ListenableFuture prepareSession(KeyspaceRepairManager repairManager, 
UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> 
ranges, ExecutorService executor)
+        ListenableFuture prepareSession(KeyspaceRepairManager repairManager,
+                                        UUID sessionID,
+                                        Collection<ColumnFamilyStore> tables,
+                                        RangesAtEndpoint ranges,
+                                        ExecutorService executor)
         {
             prepareSessionCalled = true;
             if (prepareSessionFuture != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/schema/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java 
b/test/unit/org/apache/cassandra/schema/MockSchema.java
index 98bf9ca..7a6b011 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -127,7 +127,7 @@ public class MockSchema
         }
         SerializationHeader header = SerializationHeader.make(cfs.metadata(), 
Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new 
MetadataCollector(cfs.metadata().comparator)
-                                                 
.finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 
0.01f, -1, null, header)
+                                                 
.finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 
0.01f, -1, null, false, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor, 
components, cfs.metadata,
                                                           
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), 
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to