http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java 
b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 294731a..4f7cde0 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.service;
 
 import java.util.*;
 
-import javax.xml.crypto.Data;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.junit.Assert;
@@ -36,13 +34,13 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -107,13 +105,13 @@ public class ActiveRepairServiceTest
     public void testGetNeighborsPlusOne() throws Throwable
     {
         // generate rf+1 nodes, and ensure that all nodes are returned
-        Set<InetAddressAndPort> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        Set<InetAddressAndPort> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
         expected.remove(FBUtilities.getBroadcastAddressAndPort());
-        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
         Set<InetAddressAndPort> neighbors = new HashSet<>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, null, null));
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, null, null).endpoints());
         }
         assertEquals(expected, neighbors);
     }
@@ -124,19 +122,19 @@ public class ActiveRepairServiceTest
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
         // generate rf*2 nodes, and ensure that only neighbors specified by 
the ARS are returned
-        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
         AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
         Set<InetAddressAndPort> expected = new HashSet<>();
-        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort()))
+        for (Replica replica : 
ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort()))
         {
-            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replica.range()).endpoints());
         }
         expected.remove(FBUtilities.getBroadcastAddressAndPort());
-        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
         Set<InetAddressAndPort> neighbors = new HashSet<>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, null, null));
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, null, null).endpoints());
         }
         assertEquals(expected, neighbors);
     }
@@ -147,18 +145,18 @@ public class ActiveRepairServiceTest
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
         // generate rf+1 nodes, and ensure that all nodes are returned
-        Set<InetAddressAndPort> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        Set<InetAddressAndPort> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
         expected.remove(FBUtilities.getBroadcastAddressAndPort());
         // remove remote endpoints
         TokenMetadata.Topology topology = 
tmd.cloneOnlyTokenMap().getTopology();
         HashSet<InetAddressAndPort> localEndpoints = 
Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
         expected = Sets.intersection(expected, localEndpoints);
 
-        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
         Set<InetAddressAndPort> neighbors = new HashSet<>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), 
null).endpoints());
         }
         assertEquals(expected, neighbors);
     }
@@ -169,12 +167,12 @@ public class ActiveRepairServiceTest
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
         // generate rf*2 nodes, and ensure that only neighbors specified by 
the ARS are returned
-        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
         AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
         Set<InetAddressAndPort> expected = new HashSet<>();
-        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort()))
+        for (Replica replica : 
ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort()))
         {
-            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replica.range()).endpoints());
         }
         expected.remove(FBUtilities.getBroadcastAddressAndPort());
         // remove remote endpoints
@@ -182,11 +180,11 @@ public class ActiveRepairServiceTest
         HashSet<InetAddressAndPort> localEndpoints = 
Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
         expected = Sets.intersection(expected, localEndpoints);
 
-        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
         Set<InetAddressAndPort> neighbors = new HashSet<>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), 
null).endpoints());
         }
         assertEquals(expected, neighbors);
     }
@@ -197,30 +195,30 @@ public class ActiveRepairServiceTest
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
         // generate rf*2 nodes, and ensure that only neighbors specified by 
the hosts are returned
-        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
         AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
         List<InetAddressAndPort> expected = new ArrayList<>();
-        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort()))
+        for (Replica replicas : 
ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort()))
         {
-            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicas.range()).endpoints());
         }
 
         expected.remove(FBUtilities.getBroadcastAddressAndPort());
         Collection<String> hosts = 
Arrays.asList(FBUtilities.getBroadcastAddressAndPort().toString(),expected.get(0).toString());
-        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
 
         assertEquals(expected.get(0), 
ActiveRepairService.getNeighbors(KEYSPACE5, ranges,
                                                                        
ranges.iterator().next(),
-                                                                       null, 
hosts).iterator().next());
+                                                                       null, 
hosts).endpoints().iterator().next());
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws 
Throwable
     {
-        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas);
         //Dont give local endpoint
         Collection<String> hosts = Arrays.asList("127.0.0.3");
-        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE5).ranges();
         ActiveRepairService.getNeighbors(KEYSPACE5, ranges, 
ranges.iterator().next(), null, hosts);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java 
b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
new file mode 100644
index 0000000..63973ea
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static 
org.apache.cassandra.service.StorageServiceTest.assertMultimapEqualsIgnoreOrder;
+
+/**
+ * This also fairly effectively tests source retrieval for bootstrap, 
since RangeStreamer
+ * is used to calculate the endpoints to fetch from and check they are alive 
for both move (RangeRelocator) and
+ * bootstrap (BootStrapper).
+ */
+public class BootstrapTransientTest
+{
+    static InetAddressAndPort aAddress;
+    static InetAddressAndPort bAddress;
+    static InetAddressAndPort cAddress;
+    static InetAddressAndPort dAddress;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception
+    {
+        aAddress = InetAddressAndPort.getByName("127.0.0.1");
+        bAddress = InetAddressAndPort.getByName("127.0.0.2");
+        cAddress = InetAddressAndPort.getByName("127.0.0.3");
+        dAddress = InetAddressAndPort.getByName("127.0.0.4");
+    }
+
+    private final List<InetAddressAndPort> downNodes = new ArrayList<>();
+    Predicate<Replica> alivePredicate = replica -> 
!downNodes.contains(replica.endpoint());
+
+    private final List<InetAddressAndPort> sourceFilterDownNodes = new 
ArrayList<>();
+    private final Collection<Predicate<Replica>> sourceFilters = 
Collections.singleton(replica -> 
!sourceFilterDownNodes.contains(replica.endpoint()));
+
+    @After
+    public void clearDownNode()
+    {
+        downNodes.clear();
+        sourceFilterDownNodes.clear();
+    }
+
+    @BeforeClass
+    public static void setupDD()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    Token tenToken = new OrderPreservingPartitioner.StringToken("00010");
+    Token twentyToken = new OrderPreservingPartitioner.StringToken("00020");
+    Token thirtyToken = new OrderPreservingPartitioner.StringToken("00030");
+    Token fourtyToken = new OrderPreservingPartitioner.StringToken("00040");
+
+    Range<Token> aRange = new Range<>(thirtyToken, tenToken);
+    Range<Token> bRange = new Range<>(tenToken, twentyToken);
+    Range<Token> cRange = new Range<>(twentyToken, thirtyToken);
+    Range<Token> dRange = new Range<>(thirtyToken, fourtyToken);
+
+    RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(dAddress, 
dRange, true),
+                                                   new Replica(dAddress, 
cRange, true),
+                                                   new Replica(dAddress, 
bRange, false));
+
+    @Test
+    public void testRangeStreamerRangesToFetch() throws Exception
+    {
+        EndpointsByReplica expectedResult = new 
EndpointsByReplica(ImmutableMap.of(
+        fullReplica(dAddress, dRange), 
EndpointsForRange.builder(aRange).add(fullReplica(bAddress, 
aRange)).add(transientReplica(cAddress, aRange)).build(),
+        fullReplica(dAddress, cRange), 
EndpointsForRange.builder(cRange).add(fullReplica(cAddress, 
cRange)).add(transientReplica(bAddress, cRange)).build(),
+        transientReplica(dAddress, bRange), 
EndpointsForRange.builder(bRange).add(transientReplica(aAddress, 
bRange)).build()));
+
+        invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, 
constructTMDs(), expectedResult);
+    }
+
+    private Pair<TokenMetadata, TokenMetadata> constructTMDs()
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        tmd.updateNormalToken(aRange.right, aAddress);
+        tmd.updateNormalToken(bRange.right, bAddress);
+        tmd.updateNormalToken(cRange.right, cAddress);
+        TokenMetadata updated = tmd.cloneOnlyTokenMap();
+        updated.updateNormalToken(dRange.right, dAddress);
+
+        return Pair.create(tmd, updated);
+    }
+
+    private void 
invokeCalculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> toFetch,
+                                                                    
Pair<TokenMetadata, TokenMetadata> tmds,
+                                                                    
EndpointsByReplica expectedResult)
+    {
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+
+        EndpointsByReplica result = 
RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) 
-> replicas,
+                                                                               
                                    simpleStrategy(tmds.left),
+                                                                               
                                    toFetch,
+                                                                               
                                    true,
+                                                                               
                                    tmds.left,
+                                                                               
                                    tmds.right,
+                                                                               
                                    alivePredicate,
+                                                                               
                                    "OldNetworkTopologyStrategyTest",
+                                                                               
                                    sourceFilters);
+        result.asMap().forEach((replica, list) -> System.out.printf("Replica 
%s, sources %s%n", replica, list));
+        assertMultimapEqualsIgnoreOrder(expectedResult, result);
+
+    }
+
+    private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
+    {
+        IEndpointSnitch snitch = new AbstractEndpointSnitch()
+        {
+            public int compareEndpoints(InetAddressAndPort target, Replica r1, 
Replica r2)
+            {
+                return 0;
+            }
+
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return "R1";
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                return "DC1";
+            }
+        };
+
+        return new SimpleStrategy("MoveTransientTest",
+                                  tmd,
+                                  snitch,
+                                  
com.google.common.collect.ImmutableMap.of("replication_factor", "3/1"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java 
b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 8ddc4f0..3c4748e 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -130,10 +130,10 @@ public class LeaveAndBootstrapTest
             strategy = getStrategy(keyspaceName, tmd);
             for (Token token : keyTokens)
             {
-                int replicationFactor = strategy.getReplicationFactor();
+                int replicationFactor = 
strategy.getReplicationFactor().allReplicas;
 
-                HashSet<InetAddressAndPort> actual = new 
HashSet<>(tmd.getWriteEndpoints(token, keyspaceName, 
strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap())));
-                HashSet<InetAddressAndPort> expected = new HashSet<>();
+                Set<InetAddressAndPort> actual = tmd.getWriteEndpoints(token, 
keyspaceName, strategy.calculateNaturalReplicas(token, 
tmd.cloneOnlyTokenMap()).forToken(token)).endpoints();
+                Set<InetAddressAndPort> expected = new HashSet<>();
 
                 for (int i = 0; i < replicationFactor; i++)
                 {
@@ -198,8 +198,6 @@ public class LeaveAndBootstrapTest
                     ApplicationState.STATUS,
                     
valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7))));
 
-        Collection<InetAddressAndPort> endpoints = null;
-
         /* don't require test update every time a new keyspace is added to 
test/conf/cassandra.yaml */
         Map<String, AbstractReplicationStrategy> keyspaceStrategyMap = new 
HashMap<String, AbstractReplicationStrategy>();
         for (int i=1; i<=4; i++)
@@ -263,18 +261,18 @@ public class LeaveAndBootstrapTest
 
             for (int i = 0; i < keyTokens.size(); i++)
             {
-                endpoints = tmd.getWriteEndpoints(keyTokens.get(i), 
keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+                Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
                 
assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), 
endpoints.size());
                 
assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints));
             }
 
             // just to be sure that things still work according to the old 
tests, run them:
-            if (strategy.getReplicationFactor() != 3)
+            if (strategy.getReplicationFactor().allReplicas != 3)
                 continue;
             // tokens 5, 15 and 25 should go three nodes
             for (int i=0; i<3; ++i)
             {
-                endpoints = tmd.getWriteEndpoints(keyTokens.get(i), 
keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+                Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
                 assertEquals(3, endpoints.size());
                 assertTrue(endpoints.contains(hosts.get(i+1)));
                 assertTrue(endpoints.contains(hosts.get(i+2)));
@@ -282,7 +280,7 @@ public class LeaveAndBootstrapTest
             }
 
             // token 35 should go to nodes 4, 5, 6, 7 and boot1
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(3)));
+            Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(3))).endpoints();
             assertEquals(5, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(4)));
             assertTrue(endpoints.contains(hosts.get(5)));
@@ -291,7 +289,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(boot1));
 
             // token 45 should go to nodes 5, 6, 7, 0, boot1 and boot2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(4)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(4))).endpoints();
             assertEquals(6, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(5)));
             assertTrue(endpoints.contains(hosts.get(6)));
@@ -301,7 +299,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(boot2));
 
             // token 55 should go to nodes 6, 7, 8, 0, 1, boot1 and boot2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(5)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(5))).endpoints();
             assertEquals(7, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(6)));
             assertTrue(endpoints.contains(hosts.get(7)));
@@ -312,7 +310,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(boot2));
 
             // token 65 should go to nodes 7, 8, 9, 0, 1 and boot2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(6)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(6))).endpoints();
             assertEquals(6, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(7)));
             assertTrue(endpoints.contains(hosts.get(8)));
@@ -322,7 +320,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(boot2));
 
             // token 75 should to go nodes 8, 9, 0, 1, 2 and boot2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(7)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(7))).endpoints();
             assertEquals(6, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(8)));
             assertTrue(endpoints.contains(hosts.get(9)));
@@ -332,7 +330,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(boot2));
 
             // token 85 should go to nodes 9, 0, 1 and 2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(8)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(8))).endpoints();
             assertEquals(4, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(9)));
             assertTrue(endpoints.contains(hosts.get(0)));
@@ -340,7 +338,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(hosts.get(2)));
 
             // token 95 should go to nodes 0, 1 and 2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(9)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(9))).endpoints();
             assertEquals(3, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(0)));
             assertTrue(endpoints.contains(hosts.get(1)));
@@ -385,18 +383,18 @@ public class LeaveAndBootstrapTest
 
             for (int i = 0; i < keyTokens.size(); i++)
             {
-                endpoints = tmd.getWriteEndpoints(keyTokens.get(i), 
keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+                Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
                 
assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), 
endpoints.size());
                 
assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints));
             }
 
-            if (strategy.getReplicationFactor() != 3)
+            if (strategy.getReplicationFactor().allReplicas != 3)
                 continue;
             // leave this stuff in to guarantee the old tests work the way 
they were supposed to.
             // tokens 5, 15 and 25 should go three nodes
             for (int i=0; i<3; ++i)
             {
-                endpoints = tmd.getWriteEndpoints(keyTokens.get(i), 
keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
+                Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints();
                 assertEquals(3, endpoints.size());
                 assertTrue(endpoints.contains(hosts.get(i+1)));
                 assertTrue(endpoints.contains(hosts.get(i+2)));
@@ -404,21 +402,21 @@ public class LeaveAndBootstrapTest
             }
 
             // token 35 goes to nodes 4, 5 and boot1
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(3)));
+            Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(3))).endpoints();
             assertEquals(3, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(4)));
             assertTrue(endpoints.contains(hosts.get(5)));
             assertTrue(endpoints.contains(boot1));
 
             // token 45 goes to nodes 5, boot1 and node7
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(4)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(4))).endpoints();
             assertEquals(3, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(5)));
             assertTrue(endpoints.contains(boot1));
             assertTrue(endpoints.contains(hosts.get(7)));
 
             // token 55 goes to boot1, 7, boot2, 8 and 0
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(5)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(5))).endpoints();
             assertEquals(5, endpoints.size());
             assertTrue(endpoints.contains(boot1));
             assertTrue(endpoints.contains(hosts.get(7)));
@@ -427,7 +425,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(hosts.get(0)));
 
             // token 65 goes to nodes 7, boot2, 8, 0 and 1
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(6)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(6))).endpoints();
             assertEquals(5, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(7)));
             assertTrue(endpoints.contains(boot2));
@@ -436,7 +434,7 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(hosts.get(1)));
 
             // token 75 goes to nodes boot2, 8, 0, 1 and 2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(7)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(7))).endpoints();
             assertEquals(5, endpoints.size());
             assertTrue(endpoints.contains(boot2));
             assertTrue(endpoints.contains(hosts.get(8)));
@@ -445,14 +443,14 @@ public class LeaveAndBootstrapTest
             assertTrue(endpoints.contains(hosts.get(2)));
 
             // token 85 goes to nodes 0, 1 and 2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(8)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(8))).endpoints();
             assertEquals(3, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(0)));
             assertTrue(endpoints.contains(hosts.get(1)));
             assertTrue(endpoints.contains(hosts.get(2)));
 
             // token 95 goes to nodes 0, 1 and 2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(9)));
+            endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(9))).endpoints();
             assertEquals(3, endpoints.size());
             assertTrue(endpoints.contains(hosts.get(0)));
             assertTrue(endpoints.contains(hosts.get(1)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java 
b/test/unit/org/apache/cassandra/service/MoveTest.java
index 7321fba..731a25d 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -23,13 +23,21 @@ import java.net.UnknownHostException;
 import java.util.*;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -492,24 +500,25 @@ public class MoveTest
         tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), 
host);
     }
 
-    private Map.Entry<Range<Token>, Collection<InetAddressAndPort>> 
generatePendingMapEntry(int start, int end, String... endpoints) throws 
UnknownHostException
+    private Map.Entry<Range<Token>, EndpointsForRange> 
generatePendingMapEntry(int start, int end, String... endpoints) throws 
UnknownHostException
     {
-        Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges = new 
HashMap<>();
-        pendingRanges.put(generateRange(start, end), makeAddrs(endpoints));
+        Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>();
+        Range<Token> range = generateRange(start, end);
+        pendingRanges.put(range, makeReplicas(range, endpoints));
         return pendingRanges.entrySet().iterator().next();
     }
 
-    private Map<Range<Token>, Collection<InetAddressAndPort>> 
generatePendingRanges(Map.Entry<Range<Token>, 
Collection<InetAddressAndPort>>... entries)
+    private Map<Range<Token>, EndpointsForRange> 
generatePendingRanges(Map.Entry<Range<Token>, EndpointsForRange>... entries)
     {
-        Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges = new 
HashMap<>();
-        for(Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : 
entries)
+        Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>();
+        for(Map.Entry<Range<Token>, EndpointsForRange> entry : entries)
         {
             pendingRanges.put(entry.getKey(), entry.getValue());
         }
         return pendingRanges;
     }
 
-    private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>,  
Collection<InetAddressAndPort>> pendingRanges, String keyspaceName) throws 
ConfigurationException
+    private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, 
EndpointsForRange> pendingRanges, String keyspaceName) throws 
ConfigurationException
     {
         boolean keyspaceFound = false;
         for (String nonSystemKeyspaceName : 
Schema.instance.getNonLocalStrategyKeyspaces())
@@ -523,15 +532,15 @@ public class MoveTest
         assert keyspaceFound;
     }
 
-    private void assertMaps(Map<Range<Token>, Collection<InetAddressAndPort>> 
expected, PendingRangeMaps actual)
+    private void assertMaps(Map<Range<Token>, EndpointsForRange> expected, 
PendingRangeMaps actual)
     {
         int sizeOfActual = 0;
-        Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator = 
actual.iterator();
+        Iterator<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> iterator 
= actual.iterator();
         while(iterator.hasNext())
         {
-            Map.Entry<Range<Token>, List<InetAddressAndPort>> actualEntry = 
iterator.next();
+            Map.Entry<Range<Token>, EndpointsForRange.Mutable> actualEntry = 
iterator.next();
             assertNotNull(expected.get(actualEntry.getKey()));
-            assertEquals(new HashSet<>(expected.get(actualEntry.getKey())), 
new HashSet<>(actualEntry.getValue()));
+            
assertEquals(ImmutableSet.copyOf(expected.get(actualEntry.getKey())), 
ImmutableSet.copyOf(actualEntry.getValue()));
             sizeOfActual++;
         }
 
@@ -589,9 +598,9 @@ public class MoveTest
             int numMoved = 0;
             for (Token token : keyTokens)
             {
-                int replicationFactor = strategy.getReplicationFactor();
+                int replicationFactor = 
strategy.getReplicationFactor().allReplicas;
 
-                HashSet<InetAddressAndPort> actual = new 
HashSet<>(tmd.getWriteEndpoints(token, keyspaceName, 
strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap())));
+                EndpointsForToken actual = tmd.getWriteEndpoints(token, 
keyspaceName, strategy.calculateNaturalReplicas(token, 
tmd.cloneOnlyTokenMap()).forToken(token));
                 HashSet<InetAddressAndPort> expected = new HashSet<>();
 
                 for (int i = 0; i < replicationFactor; i++)
@@ -600,10 +609,10 @@ public class MoveTest
                 }
 
                 if (expected.size() == actual.size()) {
-                       assertEquals("mismatched endpoint sets", expected, 
actual);
+                       assertEquals("mismatched endpoint sets", expected, 
actual.endpoints());
                 } else {
                        expected.add(hosts.get(MOVING_NODE));
-                       assertEquals("mismatched endpoint sets", expected, 
actual);
+                       assertEquals("mismatched endpoint sets", expected, 
actual.endpoints());
                        numMoved++;
                 }
             }
@@ -648,8 +657,6 @@ public class MoveTest
             newTokens.put(movingIndex, newToken);
         }
 
-        Collection<InetAddressAndPort> endpoints;
-
         tmd = tmd.cloneAfterAllSettled();
         ss.setTokenMetadataUnsafe(tmd);
 
@@ -693,37 +700,18 @@ public class MoveTest
         *  }
         */
 
-        Multimap<InetAddressAndPort, Range<Token>> keyspace1ranges = 
keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressRanges();
-        Collection<Range<Token>> ranges1 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.1"));
-        assertEquals(1, collectionSize(ranges1));
-        assertEquals(generateRange(97, 0), ranges1.iterator().next());
-        Collection<Range<Token>> ranges2 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.2"));
-        assertEquals(1, collectionSize(ranges2));
-        assertEquals(generateRange(0, 10), ranges2.iterator().next());
-        Collection<Range<Token>> ranges3 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.3"));
-        assertEquals(1, collectionSize(ranges3));
-        assertEquals(generateRange(10, 20), ranges3.iterator().next());
-        Collection<Range<Token>> ranges4 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.4"));
-        assertEquals(1, collectionSize(ranges4));
-        assertEquals(generateRange(20, 30), ranges4.iterator().next());
-        Collection<Range<Token>> ranges5 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.5"));
-        assertEquals(1, collectionSize(ranges5));
-        assertEquals(generateRange(30, 40), ranges5.iterator().next());
-        Collection<Range<Token>> ranges6 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.6"));
-        assertEquals(1, collectionSize(ranges6));
-        assertEquals(generateRange(40, 50), ranges6.iterator().next());
-        Collection<Range<Token>> ranges7 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.7"));
-        assertEquals(1, collectionSize(ranges7));
-        assertEquals(generateRange(50, 67), ranges7.iterator().next());
-        Collection<Range<Token>> ranges8 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.8"));
-        assertEquals(1, collectionSize(ranges8));
-        assertEquals(generateRange(67, 70), ranges8.iterator().next());
-        Collection<Range<Token>> ranges9 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.9"));
-        assertEquals(1, collectionSize(ranges9));
-        assertEquals(generateRange(70, 87), ranges9.iterator().next());
-        Collection<Range<Token>> ranges10 = 
keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.10"));
-        assertEquals(1, collectionSize(ranges10));
-        assertEquals(generateRange(87, 97), ranges10.iterator().next());
+        RangesByEndpoint keyspace1ranges = 
keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressReplicas();
+
+        assertRanges(keyspace1ranges, "127.0.0.1", 97, 0);
+        assertRanges(keyspace1ranges, "127.0.0.2", 0, 10);
+        assertRanges(keyspace1ranges, "127.0.0.3", 10, 20);
+        assertRanges(keyspace1ranges, "127.0.0.4", 20, 30);
+        assertRanges(keyspace1ranges, "127.0.0.5", 30, 40);
+        assertRanges(keyspace1ranges, "127.0.0.6", 40, 50);
+        assertRanges(keyspace1ranges, "127.0.0.7", 50, 67);
+        assertRanges(keyspace1ranges, "127.0.0.8", 67, 70);
+        assertRanges(keyspace1ranges, "127.0.0.9", 70, 87);
+        assertRanges(keyspace1ranges, "127.0.0.10", 87, 97);
 
 
         /**
@@ -742,37 +730,17 @@ public class MoveTest
         * }
         */
 
-        Multimap<InetAddressAndPort, Range<Token>> keyspace3ranges = 
keyspaceStrategyMap.get(KEYSPACE3).getAddressRanges();
-        ranges1 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.1"));
-        assertEquals(collectionSize(ranges1), 5);
-        assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 50, 67, 87, 
97, 67, 70)));
-        ranges2 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.2"));
-        assertEquals(collectionSize(ranges2), 5);
-        assertTrue(ranges2.equals(generateRanges(97, 0, 70, 87, 87, 97, 0, 10, 
67, 70)));
-        ranges3 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.3"));
-        assertEquals(collectionSize(ranges3), 5);
-        assertTrue(ranges3.equals(generateRanges(97, 0, 70, 87, 87, 97, 0, 10, 
10, 20)));
-        ranges4 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.4"));
-        assertEquals(collectionSize(ranges4), 5);
-        assertTrue(ranges4.equals(generateRanges(97, 0, 20, 30, 87, 97, 0, 10, 
10, 20)));
-        ranges5 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.5"));
-        assertEquals(collectionSize(ranges5), 5);
-        assertTrue(ranges5.equals(generateRanges(97, 0, 30, 40, 20, 30, 0, 10, 
10, 20)));
-        ranges6 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.6"));
-        assertEquals(collectionSize(ranges6), 5);
-        assertTrue(ranges6.equals(generateRanges(40, 50, 30, 40, 20, 30, 0, 
10, 10, 20)));
-        ranges7 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.7"));
-        assertEquals(collectionSize(ranges7), 5);
-        assertTrue(ranges7.equals(generateRanges(40, 50, 30, 40, 50, 67, 20, 
30, 10, 20)));
-        ranges8 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.8"));
-        assertEquals(collectionSize(ranges8), 5);
-        assertTrue(ranges8.equals(generateRanges(40, 50, 30, 40, 50, 67, 20, 
30, 67, 70)));
-        ranges9 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.9"));
-        assertEquals(collectionSize(ranges9), 5);
-        assertTrue(ranges9.equals(generateRanges(40, 50, 70, 87, 30, 40, 50, 
67, 67, 70)));
-        ranges10 = 
keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.10"));
-        assertEquals(collectionSize(ranges10), 5);
-        assertTrue(ranges10.equals(generateRanges(40, 50, 70, 87, 50, 67, 87, 
97, 67, 70)));
+        RangesByEndpoint keyspace3ranges = 
keyspaceStrategyMap.get(KEYSPACE3).getAddressReplicas();
+        assertRanges(keyspace3ranges, "127.0.0.1", 97, 0, 70, 87, 50, 67, 87, 
97, 67, 70);
+        assertRanges(keyspace3ranges, "127.0.0.2", 97, 0, 70, 87, 87, 97, 0, 
10, 67, 70);
+        assertRanges(keyspace3ranges, "127.0.0.3", 97, 0, 70, 87, 87, 97, 0, 
10, 10, 20);
+        assertRanges(keyspace3ranges, "127.0.0.4", 97, 0, 20, 30, 87, 97, 0, 
10, 10, 20);
+        assertRanges(keyspace3ranges, "127.0.0.5", 97, 0, 30, 40, 20, 30, 0, 
10, 10, 20);
+        assertRanges(keyspace3ranges, "127.0.0.6", 40, 50, 30, 40, 20, 30, 0, 
10, 10, 20);
+        assertRanges(keyspace3ranges, "127.0.0.7", 40, 50, 30, 40, 50, 67, 20, 
30, 10, 20);
+        assertRanges(keyspace3ranges, "127.0.0.8", 40, 50, 30, 40, 50, 67, 20, 
30, 67, 70);
+        assertRanges(keyspace3ranges, "127.0.0.9", 40, 50, 70, 87, 30, 40, 50, 
67, 67, 70);
+        assertRanges(keyspace3ranges, "127.0.0.10", 40, 50, 70, 87, 50, 67, 
87, 97, 67, 70);
 
 
         /**
@@ -790,37 +758,18 @@ public class MoveTest
          *      /127.0.0.10=[(70,87], (87,97], (67,70]]
          *  }
          */
-        Multimap<InetAddressAndPort, Range<Token>> keyspace4ranges = 
keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressRanges();
-        ranges1 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.1"));
-        assertEquals(collectionSize(ranges1), 3);
-        assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 87, 97)));
-        ranges2 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.2"));
-        assertEquals(collectionSize(ranges2), 3);
-        assertTrue(ranges2.equals(generateRanges(97, 0, 87, 97, 0, 10)));
-        ranges3 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.3"));
-        assertEquals(collectionSize(ranges3), 3);
-        assertTrue(ranges3.equals(generateRanges(97, 0, 0, 10, 10, 20)));
-        ranges4 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.4"));
-        assertEquals(collectionSize(ranges4), 3);
-        assertTrue(ranges4.equals(generateRanges(20, 30, 0, 10, 10, 20)));
-        ranges5 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.5"));
-        assertEquals(collectionSize(ranges5), 3);
-        assertTrue(ranges5.equals(generateRanges(30, 40, 20, 30, 10, 20)));
-        ranges6 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.6"));
-        assertEquals(collectionSize(ranges6), 3);
-        assertTrue(ranges6.equals(generateRanges(40, 50, 30, 40, 20, 30)));
-        ranges7 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.7"));
-        assertEquals(collectionSize(ranges7), 3);
-        assertTrue(ranges7.equals(generateRanges(40, 50, 30, 40, 50, 67)));
-        ranges8 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.8"));
-        assertEquals(collectionSize(ranges8), 3);
-        assertTrue(ranges8.equals(generateRanges(40, 50, 50, 67, 67, 70)));
-        ranges9 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.9"));
-        assertEquals(collectionSize(ranges9), 3);
-        assertTrue(ranges9.equals(generateRanges(70, 87, 50, 67, 67, 70)));
-        ranges10 = 
keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.10"));
-        assertEquals(collectionSize(ranges10), 3);
-        assertTrue(ranges10.equals(generateRanges(70, 87, 87, 97, 67, 70)));
+        RangesByEndpoint keyspace4ranges = 
keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressReplicas();
+
+        assertRanges(keyspace4ranges, "127.0.0.1", 97, 0, 70, 87, 87, 97);
+        assertRanges(keyspace4ranges, "127.0.0.2", 97, 0, 87, 97, 0, 10);
+        assertRanges(keyspace4ranges, "127.0.0.3", 97, 0, 0, 10, 10, 20);
+        assertRanges(keyspace4ranges, "127.0.0.4", 20, 30, 0, 10, 10, 20);
+        assertRanges(keyspace4ranges, "127.0.0.5", 30, 40, 20, 30, 10, 20);
+        assertRanges(keyspace4ranges, "127.0.0.6", 40, 50, 30, 40, 20, 30);
+        assertRanges(keyspace4ranges, "127.0.0.7", 40, 50, 30, 40, 50, 67);
+        assertRanges(keyspace4ranges, "127.0.0.8", 40, 50, 50, 67, 67, 70);
+        assertRanges(keyspace4ranges, "127.0.0.9", 70, 87, 50, 67, 67, 70);
+        assertRanges(keyspace4ranges, "127.0.0.10", 70, 87, 87, 97, 67, 70);
 
         // pre-calculate the results.
         Map<String, Multimap<Token, InetAddressAndPort>> expectedEndpoints = 
new HashMap<>();
@@ -876,79 +825,80 @@ public class MoveTest
 
             for (Token token : keyTokens)
             {
-                endpoints = tmd.getWriteEndpoints(token, keyspaceName, 
strategy.getNaturalEndpoints(token));
+                Collection<InetAddressAndPort> endpoints = 
tmd.getWriteEndpoints(token, keyspaceName, 
strategy.getNaturalReplicasForToken(token)).endpoints();
                 
assertEquals(expectedEndpoints.get(keyspaceName).get(token).size(), 
endpoints.size());
                 
assertTrue(expectedEndpoints.get(keyspaceName).get(token).containsAll(endpoints));
             }
 
             // just to be sure that things still work according to the old 
tests, run them:
-            if (strategy.getReplicationFactor() != 3)
+            if (strategy.getReplicationFactor().allReplicas != 3)
                 continue;
 
+            ReplicaCollection<?> replicas = null;
             // tokens 5, 15 and 25 should go three nodes
             for (int i = 0; i < 3; i++)
             {
-                endpoints = tmd.getWriteEndpoints(keyTokens.get(i), 
keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i)));
-                assertEquals(3, endpoints.size());
-                assertTrue(endpoints.contains(hosts.get(i+1)));
-                assertTrue(endpoints.contains(hosts.get(i+2)));
-                assertTrue(endpoints.contains(hosts.get(i+3)));
+                replicas = tmd.getWriteEndpoints(keyTokens.get(i), 
keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i)));
+                assertEquals(3, replicas.size());
+                assertTrue(replicas.endpoints().contains(hosts.get(i + 1)));
+                assertTrue(replicas.endpoints().contains(hosts.get(i + 2)));
+                assertTrue(replicas.endpoints().contains(hosts.get(i + 3)));
             }
 
             // token 35 should go to nodes 4, 5, 6 and boot1
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(3)));
-            assertEquals(4, endpoints.size());
-            assertTrue(endpoints.contains(hosts.get(4)));
-            assertTrue(endpoints.contains(hosts.get(5)));
-            assertTrue(endpoints.contains(hosts.get(6)));
-            assertTrue(endpoints.contains(boot1));
+            replicas = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(3)));
+            assertEquals(4, replicas.size());
+            assertTrue(replicas.endpoints().contains(hosts.get(4)));
+            assertTrue(replicas.endpoints().contains(hosts.get(5)));
+            assertTrue(replicas.endpoints().contains(hosts.get(6)));
+            assertTrue(replicas.endpoints().contains(boot1));
 
             // token 45 should go to nodes 5, 6, 7 boot1
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(4)));
-            assertEquals(4, endpoints.size());
-            assertTrue(endpoints.contains(hosts.get(5)));
-            assertTrue(endpoints.contains(hosts.get(6)));
-            assertTrue(endpoints.contains(hosts.get(7)));
-            assertTrue(endpoints.contains(boot1));
+            replicas = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(4)));
+            assertEquals(4, replicas.size());
+            assertTrue(replicas.endpoints().contains(hosts.get(5)));
+            assertTrue(replicas.endpoints().contains(hosts.get(6)));
+            assertTrue(replicas.endpoints().contains(hosts.get(7)));
+            assertTrue(replicas.endpoints().contains(boot1));
 
             // token 55 should go to nodes 6, 7, 8 boot1 and boot2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(5)));
-            assertEquals(5, endpoints.size());
-            assertTrue(endpoints.contains(hosts.get(6)));
-            assertTrue(endpoints.contains(hosts.get(7)));
-            assertTrue(endpoints.contains(hosts.get(8)));
-            assertTrue(endpoints.contains(boot1));
-            assertTrue(endpoints.contains(boot2));
+            replicas = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(5)));
+            assertEquals(5, replicas.size());
+            assertTrue(replicas.endpoints().contains(hosts.get(6)));
+            assertTrue(replicas.endpoints().contains(hosts.get(7)));
+            assertTrue(replicas.endpoints().contains(hosts.get(8)));
+            assertTrue(replicas.endpoints().contains(boot1));
+            assertTrue(replicas.endpoints().contains(boot2));
 
             // token 65 should go to nodes 6, 7, 8 and boot2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(6)));
-            assertEquals(4, endpoints.size());
-            assertTrue(endpoints.contains(hosts.get(6)));
-            assertTrue(endpoints.contains(hosts.get(7)));
-            assertTrue(endpoints.contains(hosts.get(8)));
-            assertTrue(endpoints.contains(boot2));
+            replicas = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(6)));
+            assertEquals(4, replicas.size());
+            assertTrue(replicas.endpoints().contains(hosts.get(6)));
+            assertTrue(replicas.endpoints().contains(hosts.get(7)));
+            assertTrue(replicas.endpoints().contains(hosts.get(8)));
+            assertTrue(replicas.endpoints().contains(boot2));
 
             // token 75 should to go nodes 8, 9, 0 and boot2
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(7)));
-            assertEquals(4, endpoints.size());
-            assertTrue(endpoints.contains(hosts.get(8)));
-            assertTrue(endpoints.contains(hosts.get(9)));
-            assertTrue(endpoints.contains(hosts.get(0)));
-            assertTrue(endpoints.contains(boot2));
+            replicas = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(7)));
+            assertEquals(4, replicas.size());
+            assertTrue(replicas.endpoints().contains(hosts.get(8)));
+            assertTrue(replicas.endpoints().contains(hosts.get(9)));
+            assertTrue(replicas.endpoints().contains(hosts.get(0)));
+            assertTrue(replicas.endpoints().contains(boot2));
 
             // token 85 should go to nodes 8, 9 and 0
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(8)));
-            assertEquals(3, endpoints.size());
-            assertTrue(endpoints.contains(hosts.get(8)));
-            assertTrue(endpoints.contains(hosts.get(9)));
-            assertTrue(endpoints.contains(hosts.get(0)));
+            replicas = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(8)));
+            assertEquals(3, replicas.size());
+            assertTrue(replicas.endpoints().contains(hosts.get(8)));
+            assertTrue(replicas.endpoints().contains(hosts.get(9)));
+            assertTrue(replicas.endpoints().contains(hosts.get(0)));
 
             // token 95 should go to nodes 9, 0 and 1
-            endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, 
strategy.getNaturalEndpoints(keyTokens.get(9)));
-            assertEquals(3, endpoints.size());
-            assertTrue(endpoints.contains(hosts.get(9)));
-            assertTrue(endpoints.contains(hosts.get(0)));
-            assertTrue(endpoints.contains(hosts.get(1)));
+            replicas = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, 
strategy.getNaturalReplicasForToken(keyTokens.get(9)));
+            assertEquals(3, replicas.size());
+            assertTrue(replicas.endpoints().contains(hosts.get(9)));
+            assertTrue(replicas.endpoints().contains(hosts.get(0)));
+            assertTrue(replicas.endpoints().contains(hosts.get(1)));
         }
 
         // all moving nodes are back to the normal state
@@ -1009,6 +959,14 @@ public class MoveTest
         return addrs;
     }
 
+    private static EndpointsForRange makeReplicas(Range<Token> range, 
String... hosts) throws UnknownHostException
+    {
+        EndpointsForRange.Builder replicas = EndpointsForRange.builder(range, 
hosts.length);
+        for (String host : hosts)
+            
replicas.add(Replica.fullReplica(InetAddressAndPort.getByName(host), range));
+        return replicas.build();
+    }
+
     private AbstractReplicationStrategy getStrategy(String keyspaceName, 
TokenMetadata tmd)
     {
         KeyspaceMetadata ksmd = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
@@ -1025,7 +983,7 @@ public class MoveTest
         return new BigIntegerToken(String.valueOf(10 * position + 7));
     }
 
-    private int collectionSize(Collection<?> collection)
+    private static int collectionSize(Collection<?> collection)
     {
         if (collection.isEmpty())
             return 0;
@@ -1057,8 +1015,52 @@ public class MoveTest
         return ranges;
     }
 
-    private Range<Token> generateRange(int left, int right)
+    private static Token tk(int v)
+    {
+        return new BigIntegerToken(String.valueOf(v));
+    }
+
+    private static Range<Token> generateRange(int left, int right)
+    {
+        return new Range<Token>(tk(left), tk(right));
+    }
+
+    private static Replica replica(InetAddressAndPort endpoint, int left, int 
right, boolean full)
+    {
+        return new Replica(endpoint, tk(left), tk(right), full);
+    }
+
+    private static InetAddressAndPort inet(String name)
     {
-        return new Range<Token>(new BigIntegerToken(String.valueOf(left)), new 
BigIntegerToken(String.valueOf(right)));
+        try
+        {
+            return InetAddressAndPort.getByName(name);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    private static Replica replica(InetAddressAndPort endpoint, int left, int 
right)
+    {
+        return replica(endpoint, left, right, true);
+    }
+
+    private static void assertRanges(RangesByEndpoint epReplicas, String 
endpoint, int... rangePairs)
+    {
+        if (rangePairs.length % 2 == 1)
+            throw new RuntimeException("assertRanges argument count should be 
even");
+
+        InetAddressAndPort ep = inet(endpoint);
+        List<Replica> expected = new ArrayList<>(rangePairs.length/2);
+        for (int i=0; i<rangePairs.length; i+=2)
+            expected.add(replica(ep, rangePairs[i], rangePairs[i+1]));
+
+        RangesAtEndpoint actual = epReplicas.get(ep);
+        assertEquals(expected.size(), actual.size());
+        for (Replica replica : expected)
+            if (!actual.contains(replica))
+                assertEquals(RangesAtEndpoint.copyOf(expected), actual);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/MoveTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTransientTest.java 
b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
new file mode 100644
index 0000000..1e24735
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/MoveTransientTest.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Predicate;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static 
org.apache.cassandra.service.StorageServiceTest.assertMultimapEqualsIgnoreOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This is also fairly effectively testing source retrieval for bootstrap as 
well since RangeStreamer
+ * is used to calculate the endpoints to fetch from and check they are alive 
for both RangeRelocator (move) and
+ * bootstrap (RangeRelocator).
+ */
+public class MoveTransientTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MoveTransientTest.class);
+
+    static InetAddressAndPort aAddress;
+    static InetAddressAndPort bAddress;
+    static InetAddressAndPort cAddress;
+    static InetAddressAndPort dAddress;
+    static InetAddressAndPort eAddress;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception
+    {
+        aAddress = InetAddressAndPort.getByName("127.0.0.1");
+        bAddress = InetAddressAndPort.getByName("127.0.0.2");
+        cAddress = InetAddressAndPort.getByName("127.0.0.3");
+        dAddress = InetAddressAndPort.getByName("127.0.0.4");
+        eAddress = InetAddressAndPort.getByName("127.0.0.5");
+    }
+
+    //Endpoints the alive predicate reports as down; reset after every test by clearDownNode()
+    private final List<InetAddressAndPort> downNodes = new ArrayList<>();
+    Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint());
+
+    //Endpoints rejected by the source filter; reset after every test by clearDownNode()
+    private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>();
+    private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint()));
+
+    @After
+    public void clearDownNode()
+    {
+        downNodes.clear();
+        sourceFilterDownNodes.clear();
+    }
+
+    @BeforeClass
+    public static void setupDD()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    Token oneToken = new RandomPartitioner.BigIntegerToken("1");
+    Token twoToken = new RandomPartitioner.BigIntegerToken("2");
+    Token threeToken = new RandomPartitioner.BigIntegerToken("3");
+    Token fourToken = new RandomPartitioner.BigIntegerToken("4");
+    Token sixToken = new RandomPartitioner.BigIntegerToken("6");
+    Token sevenToken = new RandomPartitioner.BigIntegerToken("7");
+    Token nineToken = new RandomPartitioner.BigIntegerToken("9");
+    Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
+    Token fourteenToken = new RandomPartitioner.BigIntegerToken("14");
+
+    //Initial ring used by every scenario: A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+    Range<Token> aRange = new Range<>(oneToken, threeToken);
+    Range<Token> bRange = new Range<>(threeToken, sixToken);
+    Range<Token> cRange = new Range<>(sixToken, nineToken);
+    Range<Token> dRange = new Range<>(nineToken, elevenToken);
+    Range<Token> eRange = new Range<>(elevenToken, oneToken);
+
+
+    RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(aAddress, 
aRange, true),
+                                       new Replica(aAddress, eRange, true),
+                                       new Replica(aAddress, dRange, false));
+
+
+    /**
+     * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+     * A's token moves from 3 to 4.
+     * <p>
+     * Result is A gains some range
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCalculateStreamAndFetchRangesMoveForward() throws Exception
+    {
+        calculateStreamAndFetchRangesMoveForward();
+    }
+
+    private Pair<RangesAtEndpoint, RangesAtEndpoint> 
calculateStreamAndFetchRangesMoveForward() throws Exception
+    {
+        Range<Token> aPrimeRange = new Range<>(oneToken, fourToken);
+
+        RangesAtEndpoint updated = RangesAtEndpoint.of(
+                new Replica(aAddress, aPrimeRange, true),
+                new Replica(aAddress, eRange, true),
+                new Replica(aAddress, dRange, false)
+        );
+
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = 
StorageService.calculateStreamAndFetchRanges(current, updated);
+        assertContentsIgnoreOrder(result.left);
+        assertContentsIgnoreOrder(result.right, fullReplica(aAddress, 
threeToken, fourToken));
+        return result;
+    }
+
+    /**
+     * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+     * A's token moves from 3 to 14
+     * <p>
+     * Result is A loses range and it must be streamed
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCalculateStreamAndFetchRangesMoveBackwardBetween() throws 
Exception
+    {
+        calculateStreamAndFetchRangesMoveBackwardBetween();
+    }
+
+    public Pair<RangesAtEndpoint, RangesAtEndpoint> 
calculateStreamAndFetchRangesMoveBackwardBetween() throws Exception
+    {
+        Range<Token> aPrimeRange = new Range<>(elevenToken, fourteenToken);
+
+        RangesAtEndpoint updated = RangesAtEndpoint.of(
+            new Replica(aAddress, aPrimeRange, true),
+            new Replica(aAddress, dRange, true),
+            new Replica(aAddress, cRange, false)
+        );
+
+
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = 
StorageService.calculateStreamAndFetchRanges(current, updated);
+        assertContentsIgnoreOrder(result.left, fullReplica(aAddress, oneToken, 
threeToken), fullReplica(aAddress, fourteenToken, oneToken));
+        assertContentsIgnoreOrder(result.right, transientReplica(aAddress, 
sixToken, nineToken), fullReplica(aAddress, nineToken, elevenToken));
+        return result;
+    }
+
+    /**
+     * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+     * A's token moves from 3 to 2
+     *
+     * Result is A loses range and it must be streamed
+     * @throws Exception
+     */
+    @Test
+    public void testCalculateStreamAndFetchRangesMoveBackward() throws 
Exception
+    {
+        calculateStreamAndFetchRangesMoveBackward();
+    }
+
+    private Pair<RangesAtEndpoint, RangesAtEndpoint> 
calculateStreamAndFetchRangesMoveBackward() throws Exception
+    {
+        Range<Token> aPrimeRange = new Range<>(oneToken, twoToken);
+
+        RangesAtEndpoint updated = RangesAtEndpoint.of(
+            new Replica(aAddress, aPrimeRange, true),
+            new Replica(aAddress, eRange, true),
+            new Replica(aAddress, dRange, false)
+        );
+
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = 
StorageService.calculateStreamAndFetchRanges(current, updated);
+
+        //Moving backwards has no impact on any replica. We already fully 
replicate counter clockwise
+        //The transient replica does transiently replicate slightly more, but 
that is addressed by cleanup
+        assertContentsIgnoreOrder(result.left, fullReplica(aAddress, twoToken, 
threeToken));
+        assertContentsIgnoreOrder(result.right);
+
+        return result;
+    }
+
+    /**
+     * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+     * A's token moves from 3 to 7
+     *
+     * @throws Exception
+     */
+    private Pair<RangesAtEndpoint, RangesAtEndpoint> 
calculateStreamAndFetchRangesMoveForwardBetween() throws Exception
+    {
+        Range<Token> aPrimeRange = new Range<>(sixToken, sevenToken);
+        Range<Token> bPrimeRange = new Range<>(oneToken, sixToken);
+
+
+        RangesAtEndpoint updated = RangesAtEndpoint.of(
+            new Replica(aAddress, aPrimeRange, true),
+            new Replica(aAddress, bPrimeRange, true),
+            new Replica(aAddress, eRange, false)
+        );
+
+        Pair<RangesAtEndpoint, RangesAtEndpoint> result = 
StorageService.calculateStreamAndFetchRanges(current, updated);
+
+        assertContentsIgnoreOrder(result.left, fullReplica(aAddress, 
elevenToken, oneToken), transientReplica(aAddress, nineToken, elevenToken));
+        assertContentsIgnoreOrder(result.right, fullReplica(aAddress, 
threeToken, sixToken), fullReplica(aAddress, sixToken, sevenToken));
+        return result;
+    }
+
+    /**
+     * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1
+     * A's token moves from 3 to 7
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCalculateStreamAndFetchRangesMoveForwardBetween() throws 
Exception
+    {
+        calculateStreamAndFetchRangesMoveForwardBetween();
+    }
+
+    /**
+     * Construct the ring state for 
calculateStreamAndFetchRangesMoveBackwardBetween
+     * where A moves from 3 to 14
+     * @return
+     */
+    private Pair<TokenMetadata, TokenMetadata> 
constructTMDsMoveBackwardBetween()
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        tmd.updateNormalToken(aRange.right, aAddress);
+        tmd.updateNormalToken(bRange.right, bAddress);
+        tmd.updateNormalToken(cRange.right, cAddress);
+        tmd.updateNormalToken(dRange.right, dAddress);
+        tmd.updateNormalToken(eRange.right, eAddress);
+        tmd.addMovingEndpoint(fourteenToken, aAddress);
+        TokenMetadata updated = tmd.cloneAfterAllSettled();
+
+        return Pair.create(tmd, updated);
+    }
+
+
+    /**
+     * Construct the ring state for 
calculateStreamAndFetchRangesMoveForwardBetween
+     * where A moves from 3 to 7
+     * @return
+     */
+    private Pair<TokenMetadata, TokenMetadata> 
constructTMDsMoveForwardBetween()
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        tmd.updateNormalToken(aRange.right, aAddress);
+        tmd.updateNormalToken(bRange.right, bAddress);
+        tmd.updateNormalToken(cRange.right, cAddress);
+        tmd.updateNormalToken(dRange.right, dAddress);
+        tmd.updateNormalToken(eRange.right, eAddress);
+        tmd.addMovingEndpoint(sevenToken, aAddress);
+        TokenMetadata updated = tmd.cloneAfterAllSettled();
+
+        return Pair.create(tmd, updated);
+    }
+
+    private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackward()
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        tmd.updateNormalToken(aRange.right, aAddress);
+        tmd.updateNormalToken(bRange.right, bAddress);
+        tmd.updateNormalToken(cRange.right, cAddress);
+        tmd.updateNormalToken(dRange.right, dAddress);
+        tmd.updateNormalToken(eRange.right, eAddress);
+        tmd.addMovingEndpoint(twoToken, aAddress);
+        TokenMetadata updated = tmd.cloneAfterAllSettled();
+
+        return Pair.create(tmd, updated);
+    }
+
+    private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForward()
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        tmd.updateNormalToken(aRange.right, aAddress);
+        tmd.updateNormalToken(bRange.right, bAddress);
+        tmd.updateNormalToken(cRange.right, cAddress);
+        tmd.updateNormalToken(dRange.right, dAddress);
+        tmd.updateNormalToken(eRange.right, eAddress);
+        tmd.addMovingEndpoint(fourToken, aAddress);
+        TokenMetadata updated = tmd.cloneAfterAllSettled();
+
+        return Pair.create(tmd, updated);
+    }
+
+
+    @Test
+    public void 
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws 
Exception
+    {
+        EndpointsByReplica.Mutable expectedResult = new 
EndpointsByReplica.Mutable();
+
+        InetAddressAndPort cOrB = (downNodes.contains(cAddress) || 
sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress;
+
+        //Need to pull the full replica and the transient replica that is 
losing the range
+        expectedResult.put(fullReplica(aAddress, sixToken, sevenToken),  
fullReplica(dAddress, sixToken, nineToken));
+        expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), 
transientReplica(eAddress, sixToken, nineToken));
+
+        //Same need both here as well
+        expectedResult.put(fullReplica(aAddress, threeToken, sixToken), 
fullReplica(cOrB, threeToken, sixToken));
+        expectedResult.put(fullReplica(aAddress, threeToken, sixToken), 
transientReplica(dAddress, threeToken, sixToken));
+
+        
invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().right,
+                                                           
constructTMDsMoveForwardBetween(),
+                                                           
expectedResult.asImmutableView());
+    }
+
+    @Test
+    public void 
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() 
throws Exception
+    {
+        for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, 
eAddress})
+        {
+            downNodes.clear();
+            downNodes.add(downNode);
+            boolean threw = false;
+            try
+            {
+                
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+            }
+            catch (IllegalStateException ise)
+            {
+                ise.printStackTrace();
+                assertTrue(downNode.toString(),
+                           ise.getMessage().startsWith("A node required to 
move the data consistently is down:")
+                                    && 
ise.getMessage().contains(downNode.toString()));
+                threw = true;
+            }
+            assertTrue("Didn't throw for " + downNode, threw);
+        }
+
+        //Shouldn't throw because another full replica is available
+        downNodes.clear();
+        downNodes.add(cAddress);
+        testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+    }
+
+    @Test
+    public void 
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter()
 throws Exception
+    {
+        for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, 
eAddress})
+        {
+            sourceFilterDownNodes.clear();
+            sourceFilterDownNodes.add(downNode);
+            boolean threw = false;
+            try
+            {
+                
testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+            }
+            catch (IllegalStateException ise)
+            {
+                ise.printStackTrace();
+                assertTrue(downNode.toString(),
+                           ise.getMessage().startsWith("Necessary replicas for 
strict consistency were removed by source filters:")
+                           && ise.getMessage().contains(downNode.toString()));
+                threw = true;
+            }
+            assertTrue("Didn't throw for " + downNode, threw);
+        }
+
+        //Shouldn't throw because another full replica is available
+        sourceFilterDownNodes.clear();
+        sourceFilterDownNodes.add(cAddress);
+        testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+    }
+
+    @Test
+    public void 
testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws 
Exception
+    {
+        EndpointsByReplica.Mutable expectedResult = new 
EndpointsByReplica.Mutable();
+
+        //Need to pull the full replica and the transient replica that is 
losing the range
+        expectedResult.put(fullReplica(aAddress, nineToken, elevenToken), 
fullReplica(eAddress, nineToken, elevenToken));
+        expectedResult.put(transientReplica(aAddress, sixToken, nineToken), 
transientReplica(eAddress, sixToken, nineToken));
+
+        
invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().right,
+                                                           
constructTMDsMoveBackwardBetween(),
+                                                           
expectedResult.asImmutableView());
+
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void 
testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() 
throws Exception
+    {
+        //Any replica can be the full replica so this will always fail on the 
transient range
+        downNodes.add(eAddress);
+        testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void 
testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter()
 throws Exception
+    {
+        //Any replica can be the full replica so this will always fail on the 
transient range
+        sourceFilterDownNodes.add(eAddress);
+        testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints();
+    }
+
+
+    //There is no down node version of this test because nothing needs to be 
fetched
+    @Test
+    public void testMoveBackwardCalculateRangesToFetchWithPreferredEndpoints() 
throws Exception
+    {
+        //Moving backwards should fetch nothing and fetch ranges is empty so 
this doesn't test a ton
+        EndpointsByReplica.Mutable expectedResult = new 
EndpointsByReplica.Mutable();
+
+        
invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().right,
+                                                           
constructTMDsMoveBackward(),
+                                                           
expectedResult.asImmutableView());
+
+    }
+
+    @Test
+    public void testMoveForwardCalculateRangesToFetchWithPreferredEndpoints() 
throws Exception
+    {
+        EndpointsByReplica.Mutable expectedResult = new 
EndpointsByReplica.Mutable();
+
+        InetAddressAndPort cOrBAddress = (downNodes.contains(cAddress) || 
sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress;
+
+        //Need to pull the full replica and the transient replica that is 
losing the range
+        expectedResult.put(fullReplica(aAddress, threeToken, fourToken), 
fullReplica(cOrBAddress, threeToken, sixToken));
+        expectedResult.put(fullReplica(aAddress, threeToken, fourToken), 
transientReplica(dAddress, threeToken, sixToken));
+
+        
invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().right,
+                                                           
constructTMDsMoveForward(),
+                                                           
expectedResult.asImmutableView());
+
+    }
+
+    /**
+     * Checks source selection for the simple forward move (A's token moves from 3 to 4) when
+     * nodes are reported down by the alive predicate.
+     * <p>
+     * With D down the calculation is expected to fail with an {@link IllegalStateException} whose
+     * message names D. With only C down the same forward-move scenario must still succeed; the
+     * delegate test expects B as the full source whenever C is listed in {@code downNodes}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception
+    {
+        downNodes.add(dAddress);
+        boolean threw = false;
+        try
+        {
+            testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+        }
+        catch (IllegalStateException ise)
+        {
+            ise.printStackTrace();
+            assertTrue(dAddress.toString(),
+                       ise.getMessage().startsWith("A node required to move the data consistently is down:")
+                       && ise.getMessage().contains(dAddress.toString()));
+            threw = true;
+        }
+        assertTrue("Didn't throw for " + dAddress, threw);
+
+        //Shouldn't throw because another full replica is available
+        downNodes.clear();
+        downNodes.add(cAddress);
+        //Re-run the forward-move scenario itself (not the "between" one) so the C-down path is covered here
+        testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+    }
+
+    /**
+     * Checks source selection for the simple forward move (A's token moves from 3 to 4) when
+     * nodes are excluded by the source filter rather than by the alive predicate.
+     * <p>
+     * With D filtered out the calculation is expected to fail with an {@link IllegalStateException}
+     * whose message names D. With only C filtered out the same forward-move scenario must still
+     * succeed; the delegate test expects B as the full source whenever C is listed in
+     * {@code sourceFilterDownNodes}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception
+    {
+        sourceFilterDownNodes.add(dAddress);
+        boolean threw = false;
+        try
+        {
+            testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+        }
+        catch (IllegalStateException ise)
+        {
+            ise.printStackTrace();
+            assertTrue(dAddress.toString(),
+                       ise.getMessage().startsWith("Necessary replicas for strict consistency were removed by source filters:")
+                       && ise.getMessage().contains(dAddress.toString()));
+            threw = true;
+        }
+        assertTrue("Didn't throw for " + dAddress, threw);
+
+        //Shouldn't throw because another full replica is available
+        sourceFilterDownNodes.clear();
+        sourceFilterDownNodes.add(cAddress);
+        //Re-run the forward-move scenario itself (not the "between" one) so the C-filtered path is covered here
+        testMoveForwardCalculateRangesToFetchWithPreferredEndpoints();
+    }
+
+    private void 
invokeCalculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint toFetch,
+                                                                    
Pair<TokenMetadata, TokenMetadata> tmds,
+                                                                    
EndpointsByReplica expectedResult)
+    {
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+
+        EndpointsByReplica result = 
RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) 
-> replicas.sorted((a, b) -> b.endpoint().compareTo(a.endpoint())),
+                                                                               
                                    simpleStrategy(tmds.left),
+                                                                               
                                    toFetch,
+                                                                               
                                    true,
+                                                                               
                                    tmds.left,
+                                                                               
                                    tmds.right,
+                                                                               
                                    alivePredicate,
+                                                                               
                                    "OldNetworkTopologyStrategyTest",
+                                                                               
                                    sourceFilters);
+        logger.info("Ranges to fetch with preferred endpoints");
+        logger.info(result.toString());
+        assertMultimapEqualsIgnoreOrder(expectedResult, result);
+
+    }
+
+    private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
+    {
+        IEndpointSnitch snitch = new AbstractEndpointSnitch()
+        {
+            public int compareEndpoints(InetAddressAndPort target, Replica r1, 
Replica r2)
+            {
+                return 0;
+            }
+
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return "R1";
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                return "DC1";
+            }
+        };
+
+        return new SimpleStrategy("MoveTransientTest",
+                                  tmd,
+                                  snitch,
+                                  
com.google.common.collect.ImmutableMap.of("replication_factor", "3/1"));
+    }
+
+    @Test
+    public void 
testMoveForwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws 
Exception
+    {
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+        RangesByEndpoint.Mutable expectedResult = new 
RangesByEndpoint.Mutable();
+
+        //Need to pull the full replica and the transient replica that is 
losing the range
+        expectedResult.put(bAddress, transientReplica(bAddress, nineToken, 
elevenToken));
+        expectedResult.put(bAddress, fullReplica(bAddress, elevenToken, 
oneToken));
+
+        
invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().left,
+                                                            
constructTMDsMoveForwardBetween(),
+                                                            
expectedResult.asImmutableView());
+    }
+
+    @Test
+    public void 
testMoveBackwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws 
Exception
+    {
+        RangesByEndpoint.Mutable expectedResult = new 
RangesByEndpoint.Mutable();
+
+        expectedResult.put(bAddress, fullReplica(bAddress, fourteenToken, 
oneToken));
+
+        expectedResult.put(dAddress, transientReplica(dAddress, oneToken, 
threeToken));
+
+        expectedResult.put(cAddress, fullReplica(cAddress, oneToken, 
threeToken));
+        expectedResult.put(cAddress, transientReplica(cAddress, fourteenToken, 
oneToken));
+
+        
invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().left,
+                                                            
constructTMDsMoveBackwardBetween(),
+                                                            
expectedResult.asImmutableView());
+    }
+
+    @Test
+    public void 
testMoveBackwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception
+    {
+        RangesByEndpoint.Mutable expectedResult = new 
RangesByEndpoint.Mutable();
+        expectedResult.put(cAddress, fullReplica(cAddress, twoToken, 
threeToken));
+        expectedResult.put(dAddress, transientReplica(dAddress, twoToken, 
threeToken));
+
+        
invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().left,
+                                                            
constructTMDsMoveBackward(),
+                                                            
expectedResult.asImmutableView());
+    }
+
+    @Test
+    public void testMoveForwardCalculateRangesToStreamWithPreferredEndpoints() 
throws Exception
+    {
+        //Nothing to stream moving forward because we are acquiring more range 
not losing range
+        RangesByEndpoint.Mutable expectedResult = new 
RangesByEndpoint.Mutable();
+
+        
invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().left,
+                                                            
constructTMDsMoveForward(),
+                                                            
expectedResult.asImmutableView());
+    }
+
+    private void 
invokeCalculateRangesToStreamWithPreferredEndpoints(RangesAtEndpoint toStream,
+                                                                     
Pair<TokenMetadata, TokenMetadata> tmds,
+                                                                     
RangesByEndpoint expectedResult)
+    {
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+        StorageService.RangeRelocator relocator = new 
StorageService.RangeRelocator();
+        RangesByEndpoint result = 
relocator.calculateRangesToStreamWithEndpoints(toStream,
+                                                                               
  simpleStrategy(tmds.left),
+                                                                               
  tmds.left,
+                                                                               
  tmds.right);
+        logger.info("Ranges to stream by endpoint");
+        logger.info(result.toString());
+        assertMultimapEqualsIgnoreOrder(expectedResult, result);
+    }
+
+    /**
+     * Asserts that {@code ranges} has as many entries as {@code replicas} and contains each of
+     * them, regardless of order.
+     * <p>
+     * When an expected replica is missing, the two collections are compared as a whole so that
+     * the failure message shows both sides.
+     *
+     * @param ranges the actual replicas produced by the code under test
+     * @param replicas the expected replicas
+     */
+    private static void assertContentsIgnoreOrder(RangesAtEndpoint ranges, Replica ... replicas)
+    {
+        //assertEquals takes (expected, actual); the expected count comes from the varargs
+        assertEquals(replicas.length, ranges.size());
+        for (Replica replica : replicas)
+            if (!ranges.contains(replica))
+                assertEquals(RangesAtEndpoint.of(replicas), ranges);
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to