http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 9467c9a..7f4ae14 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.MatchResult;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import javax.annotation.Nullable;
@@ -41,9 +42,12 @@ import javax.management.openmbean.TabularDataSupport;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
+import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
 import org.apache.commons.lang3.StringUtils;
 
 import org.slf4j.Logger;
@@ -110,6 +114,8 @@ import 
org.apache.cassandra.utils.progress.ProgressEventType;
 import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
 import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
 
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Iterables.tryFind;
 import static java.util.Arrays.asList;
 import static java.util.stream.Collectors.toList;
 import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
@@ -164,9 +170,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return isShutdown;
     }
 
-    public Collection<Range<Token>> getLocalRanges(String keyspaceName)
+    public RangesAtEndpoint getLocalReplicas(String keyspaceName)
     {
-        return getRangesForEndpoint(keyspaceName, 
FBUtilities.getBroadcastAddressAndPort());
+        return getReplicasForEndpoint(keyspaceName, 
FBUtilities.getBroadcastAddressAndPort());
     }
 
     public List<Range<Token>> getLocalAndPendingRanges(String ks)
@@ -174,9 +180,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         InetAddressAndPort broadcastAddress = 
FBUtilities.getBroadcastAddressAndPort();
         Keyspace keyspace = Keyspace.open(ks);
         List<Range<Token>> ranges = new ArrayList<>();
-        ranges.addAll(keyspace.getReplicationStrategy().getAddressRanges().get(broadcastAddress));
-        ranges.addAll(getTokenMetadata().getPendingRanges(ks, 
broadcastAddress));
-        return Range.normalize(ranges);
+        for (Replica r : 
keyspace.getReplicationStrategy().getAddressReplicas(broadcastAddress))
+            ranges.add(r.range());
+        for (Replica r : getTokenMetadata().getPendingRanges(ks, 
broadcastAddress))
+            ranges.add(r.range());
+        return ranges;
     }
 
     public Collection<Range<Token>> getPrimaryRanges(String keyspace)
@@ -1225,11 +1233,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             if (keyspace == null)
             {
                 for (String keyspaceName : 
Schema.instance.getNonLocalStrategyKeyspaces())
-                    streamer.addRanges(keyspaceName, 
getLocalRanges(keyspaceName));
+                    streamer.addRanges(keyspaceName, 
getLocalReplicas(keyspaceName));
             }
             else if (tokens == null)
             {
-                streamer.addRanges(keyspace, getLocalRanges(keyspace));
+                streamer.addRanges(keyspace, getLocalReplicas(keyspace));
             }
             else
             {
@@ -1251,14 +1259,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 }
 
                 // Ensure all specified ranges are actually ranges owned by 
this host
-                Collection<Range<Token>> localRanges = 
getLocalRanges(keyspace);
+                RangesAtEndpoint localReplicas = getLocalReplicas(keyspace);
+                RangesAtEndpoint.Builder streamRanges = new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort(), ranges.size());
                 for (Range<Token> specifiedRange : ranges)
                 {
                     boolean foundParentRange = false;
-                    for (Range<Token> localRange : localRanges)
+                    for (Replica localReplica : localReplicas)
                     {
-                        if (localRange.contains(specifiedRange))
+                        if (localReplica.contains(specifiedRange))
                         {
+                            streamRanges.add(localReplica.decorateSubrange(specifiedRange));
                             foundParentRange = true;
                             break;
                         }
@@ -1292,7 +1302,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     streamer.addSourceFilter(new 
RangeStreamer.WhitelistedSourcesFilter(sources));
                 }
 
-                streamer.addRanges(keyspace, ranges);
+                streamer.addRanges(keyspace, streamRanges.build());
             }
 
             StreamResultFuture resultFuture = streamer.fetchAsync();
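
The rebuild path above no longer hands bare token ranges to the streamer; each operator-specified range is matched to a local Replica and narrowed with decorateSubrange so its full/transient status travels with it. A minimal sketch of that pattern, using only the RangesAtEndpoint, Replica and Range calls visible in this diff (localReplicas and requested are hypothetical inputs):

    // Sketch only: hypothetical caller of the builder pattern shown above.
    RangesAtEndpoint.Builder builder = new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort(), requested.size());
    for (Range<Token> wanted : requested)
    {
        for (Replica local : localReplicas)
        {
            if (local.contains(wanted))
            {
                builder.add(local.decorateSubrange(wanted));   // keeps the replica's full/transient flag
                break;
            }
        }
    }
    RangesAtEndpoint toFetch = builder.build();
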
@@ -1700,9 +1710,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     {
         /* All the ranges for the tokens */
         Map<List<String>, List<String>> map = new HashMap<>();
-        for (Map.Entry<Range<Token>,List<InetAddressAndPort>> entry : 
getRangeToAddressMap(keyspace).entrySet())
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : 
getRangeToAddressMap(keyspace).entrySet())
         {
-            map.put(entry.getKey().asList(), stringify(entry.getValue(), 
withPort));
+            map.put(entry.getKey().asList(), 
Replicas.stringify(entry.getValue(), withPort));
         }
         return map;
     }
@@ -1753,12 +1763,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     {
         /* All the ranges for the tokens */
         Map<List<String>, List<String>> map = new HashMap<>();
-        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : 
getRangeToAddressMap(keyspace).entrySet())
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : 
getRangeToAddressMap(keyspace).entrySet())
         {
             List<String> rpcaddrs = new ArrayList<>(entry.getValue().size());
-            for (InetAddressAndPort endpoint: entry.getValue())
+            for (Replica replica : entry.getValue())
             {
-                rpcaddrs.add(getNativeaddress(endpoint, withPort));
+                rpcaddrs.add(getNativeaddress(replica.endpoint(), withPort));
             }
             map.put(entry.getKey().asList(), rpcaddrs);
         }
@@ -1783,38 +1793,31 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             keyspace = Schema.instance.getNonLocalStrategyKeyspaces().get(0);
 
         Map<List<String>, List<String>> map = new HashMap<>();
-        for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : 
tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet())
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : 
tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet())
         {
-            List<InetAddressAndPort> l = new ArrayList<>(entry.getValue());
-            map.put(entry.getKey().asList(), stringify(l, withPort));
+            map.put(entry.getKey().asList(), 
Replicas.stringify(entry.getValue(), withPort));
         }
         return map;
     }
 
-    public Map<Range<Token>, List<InetAddressAndPort>> 
getRangeToAddressMap(String keyspace)
+    public EndpointsByRange getRangeToAddressMap(String keyspace)
     {
         return getRangeToAddressMap(keyspace, tokenMetadata.sortedTokens());
     }
 
-    public Map<Range<Token>, List<InetAddressAndPort>> 
getRangeToAddressMapInLocalDC(String keyspace)
+    public EndpointsByRange getRangeToAddressMapInLocalDC(String keyspace)
     {
-        Predicate<InetAddressAndPort> isLocalDC = new 
Predicate<InetAddressAndPort>()
-        {
-            public boolean apply(InetAddressAndPort address)
-            {
-                return isLocalDC(address);
-            }
-        };
+        Predicate<Replica> isLocalDC = replica -> 
isLocalDC(replica.endpoint());
 
-        Map<Range<Token>, List<InetAddressAndPort>> origMap = 
getRangeToAddressMap(keyspace, getTokensInLocalDC());
-        Map<Range<Token>, List<InetAddressAndPort>> filteredMap = 
Maps.newHashMap();
-        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : 
origMap.entrySet())
+        EndpointsByRange origMap = getRangeToAddressMap(keyspace, 
getTokensInLocalDC());
+        Map<Range<Token>, EndpointsForRange> filteredMap = Maps.newHashMap();
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : 
origMap.entrySet())
         {
-            List<InetAddressAndPort> endpointsInLocalDC = 
Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC));
+            EndpointsForRange endpointsInLocalDC = 
entry.getValue().filter(isLocalDC);
             filteredMap.put(entry.getKey(), endpointsInLocalDC);
         }
 
-        return filteredMap;
+        return new EndpointsByRange(filteredMap);
     }
 
     private List<Token> getTokensInLocalDC()
@@ -1836,7 +1839,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return remoteDC.equals(localDC);
     }
 
-    private Map<Range<Token>, List<InetAddressAndPort>> 
getRangeToAddressMap(String keyspace, List<Token> sortedTokens)
+    private EndpointsByRange getRangeToAddressMap(String keyspace, List<Token> 
sortedTokens)
     {
         // some people just want to get a visual representation of things. 
Allow null and set it to the first
         // non-system keyspace.
@@ -1917,13 +1920,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         List<TokenRange> ranges = new ArrayList<>();
         Token.TokenFactory tf = getTokenFactory();
 
-        Map<Range<Token>, List<InetAddressAndPort>> rangeToAddressMap =
+        EndpointsByRange rangeToAddressMap =
                 includeOnlyLocalDC
                         ? getRangeToAddressMapInLocalDC(keyspace)
                         : getRangeToAddressMap(keyspace);
 
-        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : 
rangeToAddressMap.entrySet())
-            ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue(), 
withPort));
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : 
rangeToAddressMap.entrySet())
+            ranges.add(TokenRange.create(tf, entry.getKey(), 
ImmutableList.copyOf(entry.getValue().endpoints()), withPort));
 
         return ranges;
     }
@@ -2010,14 +2013,14 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * @param ranges
      * @return mapping of ranges to the replicas responsible for them.
     */
-    private Map<Range<Token>, List<InetAddressAndPort>> 
constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
+    private EndpointsByRange constructRangeToEndpointMap(String keyspace, 
List<Range<Token>> ranges)
     {
-        Map<Range<Token>, List<InetAddressAndPort>> rangeToEndpointMap = new 
HashMap<>(ranges.size());
+        Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new 
HashMap<>(ranges.size());
         for (Range<Token> range : ranges)
         {
-            rangeToEndpointMap.put(range, 
Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
+            rangeToEndpointMap.put(range, 
Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right));
         }
-        return rangeToEndpointMap;
+        return new EndpointsByRange(rangeToEndpointMap);
     }
 
     public void beforeChange(InetAddressAndPort endpoint, EndpointState 
currentState, ApplicationState newStateKey, VersionedValue newValue)
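
constructRangeToEndpointMap now returns an EndpointsByRange, which wraps a per-range EndpointsForRange rather than a bare list of addresses. A minimal sketch of building one, assuming only the calls shown in this diff (strategy and ranges are hypothetical inputs):

    // Sketch only: per-range replica sets collected into a map, then wrapped.
    Map<Range<Token>, EndpointsForRange> byRange = new HashMap<>(ranges.size());
    for (Range<Token> range : ranges)
        byRange.put(range, strategy.getNaturalReplicas(range.right));   // replicas, not just endpoints
    EndpointsByRange result = new EndpointsByRange(byRange);
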
@@ -2735,32 +2738,56 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * Finds living endpoints responsible for the given ranges
      *
      * @param keyspaceName the keyspace ranges belong to
-     * @param ranges the ranges to find sources for
+     * @param leavingReplicas the leaving node's replicas paired with this node's new replicas, to find sources for
      * @return multimap of addresses to ranges the address is responsible for
      */
-    private Multimap<InetAddressAndPort, Range<Token>> 
getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges)
+    private Multimap<InetAddressAndPort, FetchReplica> 
getNewSourceReplicas(String keyspaceName, Set<LeavingReplica> leavingReplicas)
     {
         InetAddressAndPort myAddress = 
FBUtilities.getBroadcastAddressAndPort();
-        Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = 
Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
-        Multimap<InetAddressAndPort, Range<Token>> sourceRanges = 
HashMultimap.create();
+        EndpointsByRange rangeReplicas = 
Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
+        Multimap<InetAddressAndPort, FetchReplica> sourceRanges = 
HashMultimap.create();
         IFailureDetector failureDetector = FailureDetector.instance;
 
+        logger.debug("Getting new source replicas for {}", leavingReplicas);
+
         // find alive sources for our new ranges
-        for (Range<Token> range : ranges)
-        {
-            Collection<InetAddressAndPort> possibleRanges = 
rangeAddresses.get(range);
+        for (LeavingReplica leaver : leavingReplicas)
+        {
+            //We need this to find the replicas from before leaving to supply 
the data
+            Replica leavingReplica = leaver.leavingReplica;
+            //We need this to know what to fetch and what the transient status 
is
+            Replica ourReplica = leaver.ourReplica;
+            //If we are going to be a full replica only consider full replicas
+            Predicate<Replica> replicaFilter = ourReplica.isFull() ? 
Replica::isFull : Predicates.alwaysTrue();
+            Predicate<Replica> notSelf = replica -> 
!replica.endpoint().equals(myAddress);
+            EndpointsForRange possibleReplicas = 
rangeReplicas.get(leavingReplica.range());
+            logger.info("Possible replicas for newReplica {} are {}", 
ourReplica, possibleReplicas);
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            List<InetAddressAndPort> sources = 
snitch.getSortedListByProximity(myAddress, possibleRanges);
+            EndpointsForRange sortedPossibleReplicas = 
snitch.sortedByProximity(myAddress, possibleReplicas);
+            logger.info("Sorted possible replicas starts as {}", 
sortedPossibleReplicas);
+            Optional<Replica> myCurrentReplica = tryFind(possibleReplicas, replica -> replica.endpoint().equals(myAddress)).toJavaUtil();
 
-            assert (!sources.contains(myAddress));
+            boolean transientToFull = myCurrentReplica.isPresent() && myCurrentReplica.get().isTransient() && ourReplica.isFull();
+            assert !sortedPossibleReplicas.endpoints().contains(myAddress) || transientToFull : String.format("My address %s, sortedPossibleReplicas %s, myCurrentReplica %s, myNewReplica %s", myAddress, sortedPossibleReplicas, myCurrentReplica, ourReplica);
 
-            for (InetAddressAndPort source : sources)
+            //Originally this didn't log if it couldn't restore replication 
and that seems wrong
+            boolean foundLiveReplica = false;
+            for (Replica possibleReplica : 
sortedPossibleReplicas.filter(Predicates.and(replicaFilter, notSelf)))
             {
-                if (failureDetector.isAlive(source))
+                if (failureDetector.isAlive(possibleReplica.endpoint()))
                 {
-                    sourceRanges.put(source, range);
+                    foundLiveReplica = true;
+                    sourceRanges.put(possibleReplica.endpoint(), new 
FetchReplica(ourReplica, possibleReplica));
                     break;
                 }
+                else
+                {
+                    logger.debug("Skipping down replica {}", possibleReplica);
+                }
+            }
+            if (!foundLiveReplica)
+            {
+                logger.warn("Didn't find live replica to restore replication 
for " + ourReplica);
             }
         }
         return sourceRanges;
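
getNewSourceReplicas pairs the replica this node will own (local) with the replica a live source currently holds (remote) in a FetchReplica; when this node will become a full replica, transient sources are filtered out. A hedged sketch of that pairing, assuming only the FetchReplica constructor and Replica accessors visible above (ourReplica and source are hypothetical):

    // Sketch only: a full target must fetch from a full source; transient targets can use either.
    Multimap<InetAddressAndPort, FetchReplica> work = HashMultimap.create();
    if (ourReplica.isTransient() || source.isFull())
        work.put(source.endpoint(), new FetchReplica(ourReplica, source));
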
@@ -2793,6 +2820,49 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
     }
 
+    private static class LeavingReplica
+    {
+        //The node that is leaving
+        private final Replica leavingReplica;
+
+        //Our range and transient status
+        private final Replica ourReplica;
+
+        public LeavingReplica(Replica leavingReplica, Replica ourReplica)
+        {
+            Preconditions.checkNotNull(leavingReplica);
+            Preconditions.checkNotNull(ourReplica);
+            this.leavingReplica = leavingReplica;
+            this.ourReplica = ourReplica;
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            LeavingReplica that = (LeavingReplica) o;
+
+            if (!leavingReplica.equals(that.leavingReplica)) return false;
+            return ourReplica.equals(that.ourReplica);
+        }
+
+        public int hashCode()
+        {
+            int result = leavingReplica.hashCode();
+            result = 31 * result + ourReplica.hashCode();
+            return result;
+        }
+
+        public String toString()
+        {
+            return "LeavingReplica{" +
+                   "leavingReplica=" + leavingReplica +
+                   ", ourReplica=" + ourReplica +
+                   '}';
+        }
+    }
+
     /**
      * Called when an endpoint is removed from the ring. This function checks
      * whether this node becomes responsible for new ranges as a
@@ -2805,38 +2875,52 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      */
     private void restoreReplicaCount(InetAddressAndPort endpoint, final 
InetAddressAndPort notifyEndpoint)
     {
-        Multimap<String, Map.Entry<InetAddressAndPort, 
Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
+        Map<String, Multimap<InetAddressAndPort, FetchReplica>> 
replicasToFetch = new HashMap<>();
 
         InetAddressAndPort myAddress = 
FBUtilities.getBroadcastAddressAndPort();
 
         for (String keyspaceName : 
Schema.instance.getNonLocalStrategyKeyspaces())
         {
-            Multimap<Range<Token>, InetAddressAndPort> changedRanges = 
getChangedRangesForLeaving(keyspaceName, endpoint);
-            Set<Range<Token>> myNewRanges = new HashSet<>();
-            for (Map.Entry<Range<Token>, InetAddressAndPort> entry : 
changedRanges.entries())
-            {
-                if (entry.getValue().equals(myAddress))
-                    myNewRanges.add(entry.getKey());
-            }
-            Multimap<InetAddressAndPort, Range<Token>> sourceRanges = 
getNewSourceRanges(keyspaceName, myNewRanges);
-            for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry 
: sourceRanges.asMap().entrySet())
+            logger.debug("Restoring replica count for keyspace {}", 
keyspaceName);
+            EndpointsByReplica changedReplicas = getChangedReplicasForLeaving(keyspaceName, endpoint, tokenMetadata, Keyspace.open(keyspaceName).getReplicationStrategy());
+            Set<LeavingReplica> myNewReplicas = new HashSet<>();
+            for (Map.Entry<Replica, Replica> entry : 
changedReplicas.flattenEntries())
             {
-                rangesToFetch.put(keyspaceName, entry);
+                Replica replica = entry.getValue();
+                if (replica.endpoint().equals(myAddress))
+                {
+                    //Maybe we don't technically need to fetch transient data 
from somewhere
+                    //but it's probably not a lot and it probably makes things 
a hair more resilient to people
+                    //not running repair when they should.
+                    myNewReplicas.add(new LeavingReplica(entry.getKey(), 
entry.getValue()));
+                }
             }
+            logger.debug("Changed replicas for leaving {}, myNewReplicas {}", 
changedReplicas, myNewReplicas);
+            replicasToFetch.put(keyspaceName, 
getNewSourceReplicas(keyspaceName, myNewReplicas));
         }
 
         StreamPlan stream = new 
StreamPlan(StreamOperation.RESTORE_REPLICA_COUNT);
-        for (String keyspaceName : rangesToFetch.keySet())
-        {
-            for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry 
: rangesToFetch.get(keyspaceName))
-            {
-                InetAddressAndPort source = entry.getKey();
-                Collection<Range<Token>> ranges = entry.getValue();
+        replicasToFetch.forEach((keyspaceName, sources) -> {
+            logger.debug("Requesting keyspace {} sources", keyspaceName);
+            sources.asMap().forEach((sourceAddress, fetchReplicas) -> {
+                logger.debug("Source and our replicas are {}", fetchReplicas);
+                //Remember whether the source node is providing the full or transient replica for this range. We are going
+                //to pass streaming the local instance of Replica for the range, which doesn't tell us anything about the source,
+                //so by encoding the work as two separate sets we retain this information about the source.
+                RangesAtEndpoint full = fetchReplicas.stream()
+                                                     .filter(f -> f.remote.isFull())
+                                                     .map(f -> f.local)
+                                                     .collect(RangesAtEndpoint.collector(myAddress));
+                RangesAtEndpoint transientReplicas = fetchReplicas.stream()
+                                                                  .filter(f -> f.remote.isTransient())
+                                                                  .map(f -> f.local)
+                                                                  .collect(RangesAtEndpoint.collector(myAddress));
                 if (logger.isDebugEnabled())
-                    logger.debug("Requesting from {} ranges {}", source, 
StringUtils.join(ranges, ", "));
-                stream.requestRanges(source, keyspaceName, ranges);
-            }
-        }
+                    logger.debug("Requesting from {} full replicas {} 
transient replicas {}", sourceAddress, StringUtils.join(full, ", "), 
StringUtils.join(transientReplicas, ", "));
+
+                stream.requestRanges(sourceAddress, keyspaceName, full, 
transientReplicas);
+            });
+        });
         StreamResultFuture future = stream.execute();
         Futures.addCallback(future, new FutureCallback<StreamState>()
         {
@@ -2854,21 +2938,36 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         });
     }
 
+    /**
+     * This is used in three contexts: graceful decommission, restoreReplicaCount, and removeNode.
+     * Graceful decommission should never lose data, and it is important that transient data
+     * is streamed to at least one other node from this one for each range.
+     *
+     * For ranges this node replicates, its removal should cause a new replica, either transient or full,
+     * to be selected for every range. The current code therefore doesn't need to do anything special:
+     * it will stream every range it replicates to at least one other node, which propagates the
+     * transient data that was here. Working the removal cases through on paper, the result looks correct
+     * and there are no issues such as this node needing to create a full replica for a range it
+     * transiently replicates; what is created is just another transient replica to replace this node.
+     * @param keyspaceName
+     * @param endpoint
+     * @return
+     */
     // needs to be modified to accept either a keyspace or ARS.
-    private Multimap<Range<Token>, InetAddressAndPort> 
getChangedRangesForLeaving(String keyspaceName, InetAddressAndPort endpoint)
+    static EndpointsByReplica getChangedReplicasForLeaving(String keyspaceName, InetAddressAndPort endpoint, TokenMetadata tokenMetadata, AbstractReplicationStrategy strat)
     {
         // First get all ranges the leaving endpoint is responsible for
-        Collection<Range<Token>> ranges = getRangesForEndpoint(keyspaceName, 
endpoint);
+        RangesAtEndpoint replicas = strat.getAddressReplicas(endpoint);
 
         if (logger.isDebugEnabled())
-            logger.debug("Node {} ranges [{}]", endpoint, 
StringUtils.join(ranges, ", "));
+            logger.debug("Node {} replicas [{}]", endpoint, 
StringUtils.join(replicas, ", "));
 
-        Map<Range<Token>, List<InetAddressAndPort>> currentReplicaEndpoints = 
new HashMap<>(ranges.size());
+        Map<Replica, EndpointsForRange> currentReplicaEndpoints = 
Maps.newHashMapWithExpectedSize(replicas.size());
 
         // Find (for each range) all nodes that store replicas for these 
ranges as well
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); // don't 
do this in the loop! #7758
-        for (Range<Token> range : ranges)
-            currentReplicaEndpoints.put(range, 
Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right,
 metadata));
+        for (Replica replica : replicas)
+            currentReplicaEndpoints.put(replica, 
strat.calculateNaturalReplicas(replica.range().right, metadata));
 
         TokenMetadata temp = tokenMetadata.cloneAfterAllLeft();
 
@@ -2877,26 +2976,43 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         if (temp.isMember(endpoint))
             temp.removeEndpoint(endpoint);
 
-        Multimap<Range<Token>, InetAddressAndPort> changedRanges = 
HashMultimap.create();
+        EndpointsByReplica.Mutable changedRanges = new 
EndpointsByReplica.Mutable();
 
         // Go through the ranges and for each range check who will be
         // storing replicas for these ranges when the leaving endpoint
         // is gone. Whoever is present in newReplicaEndpoints list, but
         // not in the currentReplicaEndpoints list, will be needing the
         // range.
-        for (Range<Token> range : ranges)
-        {
-            Collection<InetAddressAndPort> newReplicaEndpoints = 
Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right,
 temp);
-            newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
+        for (Replica replica : replicas)
+        {
+            EndpointsForRange newReplicaEndpoints = 
strat.calculateNaturalReplicas(replica.range().right, temp);
+            newReplicaEndpoints = newReplicaEndpoints.filter(newReplica -> {
+                Optional<Replica> currentReplicaOptional =
+                    tryFind(currentReplicaEndpoints.get(replica),
+                            currentReplica -> 
newReplica.endpoint().equals(currentReplica.endpoint())
+                    ).toJavaUtil();
+                //If it is newly replicating then yes we must do something to 
get the data there
+                if (!currentReplicaOptional.isPresent())
+                    return true;
+
+                Replica currentReplica = currentReplicaOptional.get();
+                //This transition requires streaming to occur
+                //Full -> transient is handled by nodetool cleanup
+                //transient -> transient and full -> full don't require any 
action
+                if (currentReplica.isTransient() && newReplica.isFull())
+                    return true;
+                return false;
+            });
+
             if (logger.isDebugEnabled())
                 if (newReplicaEndpoints.isEmpty())
-                    logger.debug("Range {} already in all replicas", range);
+                    logger.debug("Replica {} already in all replicas", 
replica);
                 else
-                    logger.debug("Range {} will be responsibility of {}", 
range, StringUtils.join(newReplicaEndpoints, ", "));
-            changedRanges.putAll(range, newReplicaEndpoints);
+                    logger.debug("Replica {} will be responsibility of {}", 
replica, StringUtils.join(newReplicaEndpoints, ", "));
+            changedRanges.putAll(replica, newReplicaEndpoints, Conflict.NONE);
         }
 
-        return changedRanges;
+        return changedRanges.asImmutableView();
     }
 
     public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
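
getChangedReplicasForLeaving now returns an EndpointsByReplica keyed by the leaving node's Replica, with the new replicas that must receive data as values. A small sketch of how a caller such as restoreReplicaCount consumes it, using only flattenEntries() and the LeavingReplica pairing shown above (ks and leaving are hypothetical inputs):

    // Sketch only: keep the (leaving replica -> new replica) pairs where the new replica is this node.
    EndpointsByReplica changed = getChangedReplicasForLeaving(ks, leaving, tokenMetadata, Keyspace.open(ks).getReplicationStrategy());
    Set<LeavingReplica> mine = new HashSet<>();
    for (Map.Entry<Replica, Replica> e : changed.flattenEntries())
        if (e.getValue().endpoint().equals(FBUtilities.getBroadcastAddressAndPort()))
            mine.add(new LeavingReplica(e.getKey(), e.getValue()));
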
@@ -3602,10 +3718,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             }
             else
             {
-                option.getRanges().addAll(getLocalRanges(keyspace));
+                Iterables.addAll(option.getRanges(), 
getLocalReplicas(keyspace).filter(Replica::isFull).ranges());
             }
         }
-        if (option.getRanges().isEmpty() || 
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
+        if (option.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor().allReplicas < 2)
             return 0;
 
         int cmd = nextRepairCommand.incrementAndGet();
@@ -3703,7 +3819,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * Get the "primary ranges" for the specified keyspace and endpoint.
      * "Primary ranges" are the ranges that the node is responsible for 
storing replica primarily.
      * The node that stores replica primarily is defined as the first node 
returned
-     * by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}.
+     * by {@link AbstractReplicationStrategy#calculateNaturalReplicas}.
      *
      * @param keyspace Keyspace name to check primary ranges
      * @param ep endpoint we are interested in.
@@ -3716,9 +3832,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
         for (Token token : metadata.sortedTokens())
         {
-            List<InetAddressAndPort> endpoints = 
strategy.calculateNaturalEndpoints(token, metadata);
-            if (endpoints.size() > 0 && endpoints.get(0).equals(ep))
+            EndpointsForRange replicas = 
strategy.calculateNaturalReplicas(token, metadata);
+            if (replicas.size() > 0 && replicas.get(0).endpoint().equals(ep))
+            {
+                Preconditions.checkState(replicas.get(0).isFull());
                 primaryRanges.add(new Range<>(metadata.getPredecessor(token), 
token));
+            }
         }
         return primaryRanges;
     }
@@ -3741,12 +3860,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
         for (Token token : metadata.sortedTokens())
         {
-            List<InetAddressAndPort> endpoints = 
strategy.calculateNaturalEndpoints(token, metadata);
-            for (InetAddressAndPort endpoint : endpoints)
+            EndpointsForRange replicas = 
strategy.calculateNaturalReplicas(token, metadata);
+            for (Replica replica : replicas)
             {
-                if (localDcNodes.contains(endpoint))
+                if (localDcNodes.contains(replica.endpoint()))
                 {
-                    if (endpoint.equals(referenceEndpoint))
+                    if (replica.endpoint().equals(referenceEndpoint))
                     {
                         localDCPrimaryRanges.add(new 
Range<>(metadata.getPredecessor(token), token));
                     }
@@ -3763,9 +3882,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * @param ep endpoint we are interested in.
      * @return ranges for the specified endpoint.
      */
-    Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, 
InetAddressAndPort ep)
+    RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, 
InetAddressAndPort ep)
     {
-        return 
Keyspace.open(keyspaceName).getReplicationStrategy().getAddressRanges().get(ep);
+        return 
Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep);
     }
 
     /**
@@ -3806,40 +3925,53 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     @Deprecated
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, String 
cf, String key)
     {
-        KeyspaceMetadata ksMetaData = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
-        if (ksMetaData == null)
-            throw new IllegalArgumentException("Unknown keyspace '" + 
keyspaceName + "'");
-
-        TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf);
-        if (metadata == null)
-            throw new IllegalArgumentException("Unknown table '" + cf + "' in 
keyspace '" + keyspaceName + "'");
-
-        return getNaturalEndpoints(keyspaceName, 
tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))).stream().map(i
 -> i.address).collect(toList());
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, 
cf, key);
+        List<InetAddress> inetList = new ArrayList<>(replicas.size());
+        replicas.forEach(r -> inetList.add(r.endpoint().address));
+        return inetList;
     }
 
     public List<String> getNaturalEndpointsWithPort(String keyspaceName, 
String cf, String key)
     {
-        KeyspaceMetadata ksMetaData = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
-        if (ksMetaData == null)
-            throw new IllegalArgumentException("Unknown keyspace '" + 
keyspaceName + "'");
-
-        TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf);
-        if (metadata == null)
-            throw new IllegalArgumentException("Unknown table '" + cf + "' in 
keyspace '" + keyspaceName + "'");
-
-        return stringify(getNaturalEndpoints(keyspaceName, 
tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))), 
true);
+        return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, 
key), true);
     }
 
 
     @Deprecated
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, 
ByteBuffer key)
     {
-        return getNaturalEndpoints(keyspaceName, 
tokenMetadata.partitioner.getToken(key)).stream().map(i -> 
i.address).collect(toList());
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, 
tokenMetadata.partitioner.getToken(key));
+        List<InetAddress> inetList = new ArrayList<>(replicas.size());
+        replicas.forEach(r -> inetList.add(r.endpoint().address));
+        return inetList;
     }
 
     public List<String> getNaturalEndpointsWithPort(String keyspaceName, 
ByteBuffer key)
     {
-        return stringify(getNaturalEndpoints(keyspaceName, 
tokenMetadata.partitioner.getToken(key)), true);
+        return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, 
tokenMetadata.partitioner.getToken(key)), true);
+    }
+
+    public List<String> getReplicas(String keyspaceName, String cf, String key)
+    {
+        List<String> res = new ArrayList<>();
+        for (Replica replica : getNaturalReplicasForToken(keyspaceName, cf, 
key))
+        {
+            res.add(replica.toString());
+        }
+        return res;
+    }
+
+    public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, 
String cf, String key)
+    {
+        KeyspaceMetadata ksMetaData = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
+        if (ksMetaData == null)
+            throw new IllegalArgumentException("Unknown keyspace '" + 
keyspaceName + "'");
+
+        TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf);
+        if (metadata == null)
+            throw new IllegalArgumentException("Unknown table '" + cf + "' in 
keyspace '" + keyspaceName + "'");
+
+        return getNaturalReplicasForToken(keyspaceName, 
tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
     }
 
     /**
@@ -3850,17 +3982,25 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * @param pos position for which we need to find the endpoint
      * @return the endpoint responsible for this token
      */
-    public List<InetAddressAndPort> getNaturalEndpoints(String keyspaceName, 
RingPosition pos)
+    public static EndpointsForToken getNaturalReplicasForToken(String 
keyspaceName, RingPosition pos)
     {
-        return 
Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalEndpoints(pos);
+        return 
Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos);
     }
 
     /**
      * Returns the endpoints currently responsible for storing the token plus 
pending ones
      */
-    public Iterable<InetAddressAndPort> getNaturalAndPendingEndpoints(String 
keyspaceName, Token token)
+    public EndpointsForToken getNaturalAndPendingReplicasForToken(String 
keyspaceName, Token token)
     {
-        return Iterables.concat(getNaturalEndpoints(keyspaceName, token), 
tokenMetadata.pendingEndpointsFor(token, keyspaceName));
+        // TODO: race condition to fetch these. implications??
+        EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, 
token);
+        EndpointsForToken pending = 
tokenMetadata.pendingEndpointsForToken(token, keyspaceName);
+        if (Endpoints.haveConflicts(natural, pending))
+        {
+            natural = Endpoints.resolveConflictsInNatural(natural, pending);
+            pending = Endpoints.resolveConflictsInPending(natural, pending);
+        }
+        return Endpoints.concat(natural, pending);
     }
 
     /**
@@ -3868,19 +4008,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * specified key i.e for replication.
      *
      * @param keyspace keyspace name also known as keyspace
-     * @param key key for which we need to find the endpoint
-     * @return the endpoint responsible for this key
+     * @param pos position for which we need to find the endpoint
      */
-    public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, 
ByteBuffer key)
-    {
-        return getLiveNaturalEndpoints(keyspace, 
tokenMetadata.decorateKey(key));
-    }
-
-    public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, 
RingPosition pos)
+    public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, 
RingPosition pos)
     {
-        List<InetAddressAndPort> liveEps = new ArrayList<>();
-        getLiveNaturalEndpoints(keyspace, pos, liveEps);
-        return liveEps;
+        return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken());
     }
 
     /**
@@ -3889,17 +4021,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      *
      * @param keyspace keyspace name also known as keyspace
      * @param pos position for which we need to find the endpoint
-     * @param liveEps the list of endpoints to mutate
      */
-    public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, 
List<InetAddressAndPort> liveEps)
+    public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, 
RingPosition pos)
     {
-        List<InetAddressAndPort> endpoints = 
keyspace.getReplicationStrategy().getNaturalEndpoints(pos);
-
-        for (InetAddressAndPort endpoint : endpoints)
-        {
-            if (FailureDetector.instance.isAlive(endpoint))
-                liveEps.add(endpoint);
-        }
+        EndpointsForRange replicas = 
keyspace.getReplicationStrategy().getNaturalReplicas(pos);
+        return replicas.filter(r -> 
FailureDetector.instance.isAlive(r.endpoint()));
     }
 
     public void setLoggingLevel(String classQualifier, String rawLevel) throws 
Exception
@@ -4019,13 +4145,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                         if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
                         {
                             NetworkTopologyStrategy strategy = 
(NetworkTopologyStrategy) keyspace.getReplicationStrategy();
-                            rf = strategy.getReplicationFactor(dc);
+                            rf = strategy.getReplicationFactor(dc).allReplicas;
                             numNodes = 
metadata.getTopology().getDatacenterEndpoints().get(dc).size();
                         }
                         else
                         {
                             numNodes = metadata.getAllEndpoints().size();
-                            rf = 
keyspace.getReplicationStrategy().getReplicationFactor();
+                            rf = 
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
                         }
 
                         if (numNodes <= rf)
@@ -4033,6 +4159,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                                                                     + 
keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")."
                                                                     + " 
Perform a forceful decommission to ignore.");
                     }
+                    // TODO: do we care about fixing transient/full 
self-movements here? probably
                     if (tokenMetadata.getPendingRanges(keyspaceName, 
FBUtilities.getBroadcastAddressAndPort()).size() > 0)
                         throw new UnsupportedOperationException("data is 
currently moving to this node; unable to leave the ring");
                 }
@@ -4095,11 +4222,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     private void unbootstrap(Runnable onFinish) throws ExecutionException, 
InterruptedException
     {
-        Map<String, Multimap<Range<Token>, InetAddressAndPort>> rangesToStream 
= new HashMap<>();
+        Map<String, EndpointsByReplica> rangesToStream = new HashMap<>();
 
         for (String keyspaceName : 
Schema.instance.getNonLocalStrategyKeyspaces())
         {
-            Multimap<Range<Token>, InetAddressAndPort> rangesMM = 
getChangedRangesForLeaving(keyspaceName, 
FBUtilities.getBroadcastAddressAndPort());
+            EndpointsByReplica rangesMM = getChangedReplicasForLeaving(keyspaceName, FBUtilities.getBroadcastAddressAndPort(), tokenMetadata, Keyspace.open(keyspaceName).getReplicationStrategy());
 
             if (logger.isDebugEnabled())
                 logger.debug("Ranges needing transfer are [{}]", 
StringUtils.join(rangesMM.keySet(), ","));
@@ -4135,20 +4262,22 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return 
HintsService.instance.transferHints(this::getPreferredHintsStreamTarget);
     }
 
+    private static EndpointsForRange getStreamCandidates(Collection<InetAddressAndPort> endpoints)
+    {
+        endpoints = endpoints.stream()
+                             .filter(endpoint -> FailureDetector.instance.isAlive(endpoint) && !FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
+                             .collect(Collectors.toList());
+
+        return EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints));
+    }
     /**
      * Find the best target to stream hints to. Currently the closest peer 
according to the snitch
      */
     private UUID getPreferredHintsStreamTarget()
     {
-        List<InetAddressAndPort> candidates = new 
ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
-        candidates.remove(FBUtilities.getBroadcastAddressAndPort());
-        for (Iterator<InetAddressAndPort> iter = candidates.iterator(); 
iter.hasNext(); )
-        {
-            InetAddressAndPort address = iter.next();
-            if (!FailureDetector.instance.isAlive(address))
-                iter.remove();
-        }
+        Set<InetAddressAndPort> endpoints = 
StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints();
 
+        EndpointsForRange candidates = getStreamCandidates(endpoints);
         if (candidates.isEmpty())
         {
             logger.warn("Unable to stream hints since no live endpoints seen");
@@ -4157,8 +4286,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         else
         {
             // stream to the closest peer as chosen by the snitch
-            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddressAndPort(), candidates);
-            InetAddressAndPort hintsDestinationHost = candidates.get(0);
+            candidates = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), candidates);
+            InetAddressAndPort hintsDestinationHost = candidates.get(0).endpoint();
             return tokenMetadata.getHostId(hintsDestinationHost);
         }
     }
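
Unlike the old sortByProximity, which mutated the supplied list in place, the replica-aware sortedByProximity returns a new EndpointsForRange, so the closest candidate can be read directly. A minimal sketch, assuming only the snitch and EndpointsForRange calls used above (candidates is a hypothetical input):

    // Sketch only: sortedByProximity returns a fresh, ordered collection of replicas.
    EndpointsForRange sorted = DatabaseDescriptor.getEndpointSnitch()
                                                 .sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), candidates);
    InetAddressAndPort closest = sorted.get(0).endpoint();
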
@@ -4207,6 +4336,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         // checking if data is moving to this node
         for (String keyspaceName : keyspacesToProcess)
         {
+            // TODO: do we care about fixing transient/full self-movements 
here?
             if (tokenMetadata.getPendingRanges(keyspaceName, 
localAddress).size() > 0)
                 throw new UnsupportedOperationException("data is currently 
moving to this node; unable to leave the ring");
         }
@@ -4218,7 +4348,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         setMode(Mode.MOVING, String.format("Sleeping %s ms before start 
streaming/fetching ranges", RING_DELAY), true);
         Uninterruptibles.sleepUninterruptibly(RING_DELAY, 
TimeUnit.MILLISECONDS);
 
-        RangeRelocator relocator = new 
RangeRelocator(Collections.singleton(newToken), keyspacesToProcess);
+        RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), keyspacesToProcess, tokenMetadata);
+        relocator.calculateToFromStreams();
 
         if (relocator.streamsNeeded())
         {
@@ -4243,131 +4374,191 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             logger.debug("Successfully moved to new token {}", 
getLocalTokens().iterator().next());
     }
 
-    private class RangeRelocator
+    @VisibleForTesting
+    public static class RangeRelocator
     {
         private final StreamPlan streamPlan = new 
StreamPlan(StreamOperation.RELOCATION);
-
-        private RangeRelocator(Collection<Token> tokens, List<String> 
keyspaceNames)
-        {
-            calculateToFromStreams(tokens, keyspaceNames);
-        }
-
-        private void calculateToFromStreams(Collection<Token> newTokens, 
List<String> keyspaceNames)
-        {
-            InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            TokenMetadata tokenMetaCloneAllSettled = 
tokenMetadata.cloneAfterAllSettled();
-            // clone to avoid concurrent modification in 
calculateNaturalEndpoints
-            TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
-
-            for (String keyspace : keyspaceNames)
+        private final InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
+        private final TokenMetadata tokenMetaCloneAllSettled;
+        // clone to avoid concurrent modification in calculateNaturalReplicas
+        private final TokenMetadata tokenMetaClone;
+        private final Collection<Token> tokens;
+        private final List<String> keyspaceNames;
+
+
+        private RangeRelocator(Collection<Token> tokens, List<String> 
keyspaceNames, TokenMetadata tmd)
+        {
+            this.tokens = tokens;
+            this.keyspaceNames = keyspaceNames;
+            this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
+            // clone to avoid concurrent modification in 
calculateNaturalReplicas
+            this.tokenMetaClone = tmd.cloneOnlyTokenMap();
+        }
+
+        @VisibleForTesting
+        public RangeRelocator()
+        {
+            this.tokens = null;
+            this.keyspaceNames = null;
+            this.tokenMetaCloneAllSettled = null;
+            this.tokenMetaClone = null;
+        }
+
+        /**
+         * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
+         */
+        private Multimap<InetAddressAndPort, FetchReplica> calculateRangesToFetchWithPreferredEndpoints(AbstractReplicationStrategy strategy, RangesAtEndpoint fetchRanges, String keyspace)
+        {
+            EndpointsByReplica preferredEndpoints =
+            RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> DatabaseDescriptor.getEndpointSnitch().sortedByProximity(address, replicas),
+                                                                       strategy,
+                                                                       fetchRanges,
+                                                                       useStrictConsistency,
+                                                                       tokenMetaClone,
+                                                                       tokenMetaCloneAllSettled,
+                                                                       RangeStreamer.ALIVE_PREDICATE,
+                                                                       keyspace,
+                                                                       Collections.emptyList());
+            return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
+        }
+
+        /**
+         * Calculate the endpoints to stream current ranges to, if needed;
+         * in some situations the node will handle current ranges as part of the new ranges.
+         **/
+        public RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
+                                                                     AbstractReplicationStrategy strat,
+                                                                     TokenMetadata tmdBefore,
+                                                                     TokenMetadata tmdAfter)
+        {
+            RangesByEndpoint.Mutable endpointRanges = new 
RangesByEndpoint.Mutable();
+            for (Replica toStream : streamRanges)
             {
-                // replication strategy of the current keyspace
-                AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
-                Multimap<InetAddressAndPort, Range<Token>> endpointToRanges = 
strategy.getAddressRanges();
-
-                logger.debug("Calculating ranges to stream and request for 
keyspace {}", keyspace);
-                for (Token newToken : newTokens)
+                //If the range we are sending is full only send it to the new 
full replica
+                //There will also be a new transient replica we need to send 
the data to, but not
+                //the repaired data
+                EndpointsForRange currentEndpoints = 
strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
+                EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
+                logger.debug("Need to stream {}, current endpoints {}, new 
endpoints {}", toStream, currentEndpoints, newEndpoints);
+
+                for (Replica current : currentEndpoints)
                 {
-                    // getting collection of the currently used ranges by this 
keyspace
-                    Collection<Range<Token>> currentRanges = 
endpointToRanges.get(localAddress);
-                    // collection of ranges which this node will serve after 
move to the new token
-                    Collection<Range<Token>> updatedRanges = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
-
-                    // ring ranges and endpoints associated with them
-                    // this used to determine what nodes should we ping about 
range data
-                    Multimap<Range<Token>, InetAddressAndPort> rangeAddresses 
= strategy.getRangeAddresses(tokenMetaClone);
-
-                    // calculated parts of the ranges to request/stream 
from/to nodes in the ring
-                    Pair<Set<Range<Token>>, Set<Range<Token>>> 
rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
-
-                    /**
-                     * In this loop we are going through all ranges "to fetch" 
and determining
-                     * nodes in the ring responsible for data we are 
interested in
-                     */
-                    Multimap<Range<Token>, InetAddressAndPort> 
rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create();
-                    for (Range<Token> toFetch : rangesPerKeyspace.right)
+                    for (Replica updated : newEndpoints)
                     {
-                        for (Range<Token> range : rangeAddresses.keySet())
+                        if (current.endpoint().equals(updated.endpoint()))
                         {
-                            if (range.contains(toFetch))
-                            {
-                                List<InetAddressAndPort> endpoints = null;
+                            //Nothing to do
+                            if (current.equals(updated))
+                                break;
 
-                                if (useStrictConsistency)
+                            //In these two (really three) cases the existing 
data is sufficient and we should subtract whatever is already replicated
+                            if (current.isFull() == updated.isFull() || 
current.isFull())
+                            {
+                                //First subtract what we already have
+                                Set<Range<Token>> subsToStream = 
toStream.range().subtract(current.range());
+                                //Now we only stream what is still replicated
+                                subsToStream = subsToStream.stream().flatMap(range -> range.intersectionWith(updated.range()).stream()).collect(Collectors.toSet());
+                                for (Range<Token> subrange : subsToStream)
                                 {
-                                    Set<InetAddressAndPort> oldEndpoints = 
Sets.newHashSet(rangeAddresses.get(range));
-                                    Set<InetAddressAndPort> newEndpoints = 
Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, 
tokenMetaCloneAllSettled));
-
-                                    //Due to CASSANDRA-5953 we can have a 
higher RF then we have endpoints.
-                                    //So we need to be careful to only be 
strict when endpoints == RF
-                                    if (oldEndpoints.size() == 
strategy.getReplicationFactor())
+                                    //Only stream what intersects with what is 
in the new world
+                                    Set<Range<Token>> intersections = 
subrange.intersectionWith(updated.range());
+                                    for (Range<Token> intersection : 
intersections)
                                     {
-                                        oldEndpoints.removeAll(newEndpoints);
-
-                                        //No relocation required
-                                        if (oldEndpoints.isEmpty())
-                                            continue;
-
-                                        assert oldEndpoints.size() == 1 : 
"Expected 1 endpoint but found " + oldEndpoints.size();
+                                        endpointRanges.put(updated.endpoint(), 
updated.decorateSubrange(intersection));
                                     }
-
-                                    endpoints = 
Lists.newArrayList(oldEndpoints.iterator().next());
                                 }
-                                else
+                            }
+                            else
+                            {
+                                for (Range<Token> intersection : 
toStream.range().intersectionWith(updated.range()))
                                 {
-                                    endpoints = 
snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range));
+                                    endpointRanges.put(updated.endpoint(), 
updated.decorateSubrange(intersection));
                                 }
-
-                                // storing range and preferred endpoint set
-                                
rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints);
                             }
                         }
+                    }
+                }
 
-                        Collection<InetAddressAndPort> addressList = 
rangesToFetchWithPreferredEndpoints.get(toFetch);
-                        if (addressList == null || addressList.isEmpty())
-                            continue;
-
-                        if (useStrictConsistency)
+                for (Replica updated : newEndpoints)
+                {
+                    if 
(!currentEndpoints.byEndpoint().containsKey(updated.endpoint()))
+                    {
+                        // Completely new range for this endpoint
+                        if (toStream.isTransient() && updated.isFull())
                         {
-                            if (addressList.size() > 1)
-                                throw new IllegalStateException("Multiple 
strict sources found for " + toFetch);
-
-                            InetAddressAndPort sourceIp = 
addressList.iterator().next();
-                            if (Gossiper.instance.isEnabled() && 
!Gossiper.instance.getEndpointStateForEndpoint(sourceIp).isAlive())
-                                throw new RuntimeException("A node required to 
move the data consistently is down ("+sourceIp+").  If you wish to move the 
data from a potentially inconsistent replica, restart the node with 
-Dcassandra.consistent.rangemovement=false");
+                            throw new AssertionError(String.format("Need to 
stream %s, but only have %s which is transient and not full", updated, 
toStream));
+                        }
+                        for (Range<Token> intersection : 
updated.range().intersectionWith(toStream.range()))
+                        {
+                            endpointRanges.put(updated.endpoint(), 
updated.decorateSubrange(intersection));
                         }
                     }
+                }
+            }
+            return endpointRanges.asImmutableView();
+        }
 
-                    // calculating endpoints to stream current ranges to if 
needed
-                    // in some situations node will handle current ranges as 
part of the new ranges
-                    Multimap<InetAddressAndPort, Range<Token>> endpointRanges 
= HashMultimap.create();
-                    for (Range<Token> toStream : rangesPerKeyspace.left)
+        private void calculateToFromStreams()
+        {
+            logger.debug("Current tmd " + tokenMetaClone);
+            logger.debug("Updated tmd " + tokenMetaCloneAllSettled);
+            for (String keyspace : keyspaceNames)
+            {
+                // replication strategy of the current keyspace
+                AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
+                // getting collection of the currently used ranges by this 
keyspace
+                RangesAtEndpoint currentReplicas = 
strategy.getAddressReplicas(localAddress);
+
+                logger.info("Calculating ranges to stream and request for 
keyspace {}", keyspace);
+                //From what I have seen we only ever call this with a single 
token from StorageService.move(Token)
+                for (Token newToken : tokens)
+                {
+                    Collection<Token> currentTokens = 
tokenMetaClone.getTokens(localAddress);
+                    if (currentTokens.size() > 1 || currentTokens.isEmpty())
                     {
-                        Set<InetAddressAndPort> currentEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, 
tokenMetaClone));
-                        Set<InetAddressAndPort> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, 
tokenMetaCloneAllSettled));
-                        logger.debug("Range: {} Current endpoints: {} New 
endpoints: {}", toStream, currentEndpoints, newEndpoints);
-                        for (InetAddressAndPort address : 
Sets.difference(newEndpoints, currentEndpoints))
-                        {
-                            logger.debug("Range {} has new owner {}", 
toStream, address);
-                            endpointRanges.put(address, toStream);
-                        }
+                        throw new AssertionError("Unexpected current tokens: " 
+ currentTokens);
                     }
 
+                    // collection of ranges which this node will serve after 
move to the new token
+                    RangesAtEndpoint updatedReplicas = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+
+                    // calculated parts of the ranges to request/stream 
from/to nodes in the ring
+                    Pair<RangesAtEndpoint, RangesAtEndpoint> 
streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), 
RangesAtEndpoint.empty(localAddress));
+                    //In the single node token move there is nothing to do and 
Range subtraction is broken
+                    //so it's easier to just identify this case up front.
+                    if 
(tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
+)).size() > 1)
+                    {
+                        streamAndFetchOwnRanges = 
calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
+                    }
+
+                    Multimap<InetAddressAndPort, FetchReplica> workMap = 
calculateRangesToFetchWithPreferredEndpoints(strategy, 
streamAndFetchOwnRanges.right, keyspace);
+
+                    RangesByEndpoint endpointRanges = 
calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, 
tokenMetaClone, tokenMetaCloneAllSettled);
+
+                    logger.info("Endpoint ranges to stream to " + 
endpointRanges);
+
                     // stream ranges
                     for (InetAddressAndPort address : endpointRanges.keySet())
                     {
                         logger.debug("Will stream range {} of keyspace {} to 
endpoint {}", endpointRanges.get(address), keyspace, address);
-                        streamPlan.transferRanges(address, keyspace, 
endpointRanges.get(address));
+                        RangesAtEndpoint ranges = endpointRanges.get(address);
+                        streamPlan.transferRanges(address, keyspace, ranges);
                     }
 
                     // stream requests
-                    Multimap<InetAddressAndPort, Range<Token>> workMap = 
RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, 
FailureDetector.instance, useStrictConsistency);
-                    for (InetAddressAndPort address : workMap.keySet())
-                    {
+                    workMap.asMap().forEach((address, sourceAndOurReplicas) -> 
{
+                        RangesAtEndpoint full = sourceAndOurReplicas.stream()
+                                .filter(pair -> pair.remote.isFull())
+                                .map(pair -> pair.local)
+                                
.collect(RangesAtEndpoint.collector(localAddress));
+                        RangesAtEndpoint transientReplicas = 
sourceAndOurReplicas.stream()
+                                .filter(pair -> pair.remote.isTransient())
+                                .map(pair -> pair.local)
+                                
.collect(RangesAtEndpoint.collector(localAddress));
                         logger.debug("Will request range {} of keyspace {} 
from endpoint {}", workMap.get(address), keyspace, address);
-                        streamPlan.requestRanges(address, keyspace, 
workMap.get(address));
-                    }
+                        streamPlan.requestRanges(address, keyspace, full, 
transientReplicas);
+                    });
 
                     logger.debug("Keyspace {}: work map {}.", keyspace, 
workMap);
                 }
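
The requestRanges call in the hunk above now receives two groups per source
endpoint: the ranges whose remote source is a full replica and those whose
source is transient. A minimal, JDK-only sketch of that split; FetchPair and
the string ranges are illustrative stand-ins, not Cassandra's FetchReplica or
RangesAtEndpoint types.

    import java.util.List;
    import java.util.stream.Collectors;

    public class RequestRangesSplitSketch
    {
        // Stand-in for RangeStreamer.FetchReplica: the remote source replica's
        // full/transient status plus the local range it will provide.
        static final class FetchPair
        {
            final boolean remoteIsFull;
            final String localRange;
            FetchPair(boolean remoteIsFull, String localRange)
            {
                this.remoteIsFull = remoteIsFull;
                this.localRange = localRange;
            }
        }

        public static void main(String[] args)
        {
            // everything this node will fetch from one source endpoint
            List<FetchPair> sourceAndOurReplicas = List.of(new FetchPair(true, "(0,100]"),
                                                           new FetchPair(false, "(100,200]"),
                                                           new FetchPair(true, "(200,300]"));

            // mirrors the two stream() filters in the lambda above
            List<String> full = sourceAndOurReplicas.stream()
                                                    .filter(p -> p.remoteIsFull)
                                                    .map(p -> p.localRange)
                                                    .collect(Collectors.toList());
            List<String> transientRanges = sourceAndOurReplicas.stream()
                                                               .filter(p -> !p.remoteIsFull)
                                                               .map(p -> p.localRange)
                                                               .collect(Collectors.toList());

            System.out.println("full=" + full + " transient=" + transientRanges);
        }
    }

In the patch itself the two groups are built with
RangesAtEndpoint.collector(localAddress) and passed together in a single
streamPlan.requestRanges(address, keyspace, full, transientReplicas) call.
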
@@ -4486,14 +4677,14 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         for (String keyspaceName : 
Schema.instance.getNonLocalStrategyKeyspaces())
         {
             // if the replication factor is 1 the data is lost so we shouldn't 
wait for confirmation
-            if 
(Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor() == 
1)
+            if 
(Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor().allReplicas
 == 1)
                 continue;
 
             // get all ranges that change ownership (that is, a node needs
             // to take responsibility for new range)
-            Multimap<Range<Token>, InetAddressAndPort> changedRanges = 
getChangedRangesForLeaving(keyspaceName, endpoint);
+            EndpointsByReplica changedRanges = 
getChangedReplicasForLeaving(keyspaceName, endpoint, tokenMetadata, 
Keyspace.open(keyspaceName).getReplicationStrategy());
             IFailureDetector failureDetector = FailureDetector.instance;
-            for (InetAddressAndPort ep : changedRanges.values())
+            for (InetAddressAndPort ep : 
transform(changedRanges.flattenValues(), Replica::endpoint))
             {
                 if (failureDetector.isAlive(ep))
                     replicatingNodes.add(ep);
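
The loop above collects the endpoints that must confirm replication before the
leaving node departs; with EndpointsByReplica it first flattens the Replica
values and maps them to endpoints before applying the liveness check. A small
JDK-only sketch of that map-then-filter shape, with a placeholder predicate
standing in for IFailureDetector.isAlive:

    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class ReplicatingNodesSketch
    {
        // Illustrative stand-in for Replica: only the endpoint matters here.
        static final class ReplicaStub
        {
            final String endpoint;
            ReplicaStub(String endpoint) { this.endpoint = endpoint; }
            String endpoint() { return endpoint; }
        }

        public static void main(String[] args)
        {
            // changedRanges.flattenValues(): replicas gaining a range from the leaving node
            List<ReplicaStub> changed = List.of(new ReplicaStub("10.0.0.2"),
                                                new ReplicaStub("10.0.0.3"),
                                                new ReplicaStub("10.0.0.2"));
            Predicate<String> isAlive = ep -> !ep.equals("10.0.0.3"); // placeholder failure detector

            Set<String> replicatingNodes = changed.stream()
                                                  .map(ReplicaStub::endpoint)
                                                  .filter(isAlive)
                                                  .collect(Collectors.toSet());

            System.out.println(replicatingNodes); // [10.0.0.2]
        }
    }
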
@@ -4903,15 +5094,14 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
         Collection<Collection<InetAddressAndPort>> endpointsGroupedByDc = new 
ArrayList<>();
         // mapping of dc's to nodes, use sorted map so that we get dcs sorted
-        SortedMap<String, Collection<InetAddressAndPort>> sortedDcsToEndpoints 
= new TreeMap<>();
-        
sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap());
+        SortedMap<String, Collection<InetAddressAndPort>> sortedDcsToEndpoints 
= new TreeMap<>(metadata.getTopology().getDatacenterEndpoints().asMap());
         for (Collection<InetAddressAndPort> endpoints : 
sortedDcsToEndpoints.values())
             endpointsGroupedByDc.add(endpoints);
 
         Map<Token, Float> tokenOwnership = 
tokenMetadata.partitioner.describeOwnership(tokenMetadata.sortedTokens());
         LinkedHashMap<InetAddressAndPort, Float> finalOwnership = 
Maps.newLinkedHashMap();
 
-        Multimap<InetAddressAndPort, Range<Token>> endpointToRanges = 
strategy.getAddressRanges();
+        RangesByEndpoint endpointToRanges = strategy.getAddressReplicas();
         // calculate ownership per dc
         for (Collection<InetAddressAndPort> endpoints : endpointsGroupedByDc)
         {
@@ -4919,10 +5109,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             for (InetAddressAndPort endpoint : endpoints)
             {
                 float ownership = 0.0f;
-                for (Range<Token> range : endpointToRanges.get(endpoint))
+                for (Replica replica : endpointToRanges.get(endpoint))
                 {
-                    if (tokenOwnership.containsKey(range.right))
-                        ownership += tokenOwnership.get(range.right);
+                    if (tokenOwnership.containsKey(replica.range().right))
+                        ownership += tokenOwnership.get(replica.range().right);
                 }
                 finalOwnership.put(endpoint, ownership);
             }
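
The ownership hunk above accumulates, per endpoint, the partitioner's
describeOwnership fraction for the right token of every replica range that
endpoint holds. A tiny JDK-only illustration of that accumulation; the tokens
are plain longs and the ownership values are made up:

    import java.util.List;
    import java.util.Map;

    public class OwnershipSketch
    {
        public static void main(String[] args)
        {
            // describeOwnership-style map: right token of a range -> fraction of the ring
            Map<Long, Float> tokenOwnership = Map.of(100L, 0.25f, 200L, 0.25f, 300L, 0.5f);

            // right tokens of the replica ranges held by one endpoint
            List<Long> replicaRangeRights = List.of(100L, 300L, 999L); // 999 has no entry

            float ownership = 0.0f;
            for (long right : replicaRangeRights)
                if (tokenOwnership.containsKey(right))
                    ownership += tokenOwnership.get(right);

            System.out.println(ownership); // 0.75
        }
    }
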
@@ -4974,9 +5164,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             UUID hostId = entry.getValue();
             InetAddressAndPort endpoint = entry.getKey();
             result.put(endpoint.toString(withPort),
-                       coreViewStatus.containsKey(hostId)
-                       ? coreViewStatus.get(hostId)
-                       : "UNKNOWN");
+                       coreViewStatus.getOrDefault(hostId, "UNKNOWN"));
         }
 
         return Collections.unmodifiableMap(result);
@@ -5079,69 +5267,63 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Seed data to the endpoints that will be responsible for it at the future
+     * Send data to the endpoints that will be responsible for it in the future
      *
      * @param rangesToStreamByKeyspace keyspaces and data ranges with 
endpoints included for each
      * @return async Future for whether stream was success
      */
-    private Future<StreamState> streamRanges(Map<String, 
Multimap<Range<Token>, InetAddressAndPort>> rangesToStreamByKeyspace)
+    private Future<StreamState> streamRanges(Map<String, EndpointsByReplica> 
rangesToStreamByKeyspace)
     {
         // First, we build a list of ranges to stream to each host, per table
-        Map<String, Map<InetAddressAndPort, List<Range<Token>>>> 
sessionsToStreamByKeyspace = new HashMap<>();
+        Map<String, RangesByEndpoint> sessionsToStreamByKeyspace = new 
HashMap<>();
 
-        for (Map.Entry<String, Multimap<Range<Token>, InetAddressAndPort>> 
entry : rangesToStreamByKeyspace.entrySet())
+        for (Map.Entry<String, EndpointsByReplica> entry : 
rangesToStreamByKeyspace.entrySet())
         {
             String keyspace = entry.getKey();
-            Multimap<Range<Token>, InetAddressAndPort> rangesWithEndpoints = 
entry.getValue();
+            EndpointsByReplica rangesWithEndpoints = entry.getValue();
 
             if (rangesWithEndpoints.isEmpty())
                 continue;
 
+            //Description is always Unbootstrap? Is that right?
             Map<InetAddressAndPort, Set<Range<Token>>> 
transferredRangePerKeyspace = SystemKeyspace.getTransferredRanges("Unbootstrap",
                                                                                
                                          keyspace,
                                                                                
                                          
StorageService.instance.getTokenMetadata().partitioner);
-            Map<InetAddressAndPort, List<Range<Token>>> rangesPerEndpoint = 
new HashMap<>();
-            for (Map.Entry<Range<Token>, InetAddressAndPort> endPointEntry : 
rangesWithEndpoints.entries())
+            RangesByEndpoint.Mutable replicasPerEndpoint = new 
RangesByEndpoint.Mutable();
+            for (Map.Entry<Replica, Replica> endPointEntry : 
rangesWithEndpoints.flattenEntries())
             {
-                Range<Token> range = endPointEntry.getKey();
-                InetAddressAndPort endpoint = endPointEntry.getValue();
-
-                Set<Range<Token>> transferredRanges = 
transferredRangePerKeyspace.get(endpoint);
-                if (transferredRanges != null && 
transferredRanges.contains(range))
+                Replica local = endPointEntry.getKey();
+                Replica remote = endPointEntry.getValue();
+                Set<Range<Token>> transferredRanges = 
transferredRangePerKeyspace.get(remote.endpoint());
+                if (transferredRanges != null && 
transferredRanges.contains(local.range()))
                 {
-                    logger.debug("Skipping transferred range {} of keyspace 
{}, endpoint {}", range, keyspace, endpoint);
+                    logger.debug("Skipping transferred range {} of keyspace 
{}, endpoint {}", local, keyspace, remote);
                     continue;
                 }
 
-                List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
-                if (curRanges == null)
-                {
-                    curRanges = new LinkedList<>();
-                    rangesPerEndpoint.put(endpoint, curRanges);
-                }
-                curRanges.add(range);
+                replicasPerEndpoint.put(remote.endpoint(), 
remote.decorateSubrange(local.range()));
             }
 
-            sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint);
+            sessionsToStreamByKeyspace.put(keyspace, 
replicasPerEndpoint.asImmutableView());
         }
 
         StreamPlan streamPlan = new StreamPlan(StreamOperation.DECOMMISSION);
 
-        // Vinculate StreamStateStore to current StreamPlan to update 
transferred ranges per StreamSession
+        // Vinculate StreamStateStore to current StreamPlan to update 
transferred ranges per StreamSession
         streamPlan.listeners(streamStateStore);
 
-        for (Map.Entry<String, Map<InetAddressAndPort, List<Range<Token>>>> 
entry : sessionsToStreamByKeyspace.entrySet())
+        for (Map.Entry<String, RangesByEndpoint> entry : 
sessionsToStreamByKeyspace.entrySet())
         {
             String keyspaceName = entry.getKey();
-            Map<InetAddressAndPort, List<Range<Token>>> rangesPerEndpoint = 
entry.getValue();
+            RangesByEndpoint replicasPerEndpoint = entry.getValue();
 
-            for (Map.Entry<InetAddressAndPort, List<Range<Token>>> rangesEntry 
: rangesPerEndpoint.entrySet())
+            for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> rangesEntry : 
replicasPerEndpoint.asMap().entrySet())
             {
-                List<Range<Token>> ranges = rangesEntry.getValue();
+                RangesAtEndpoint replicas = rangesEntry.getValue();
                 InetAddressAndPort newEndpoint = rangesEntry.getKey();
 
                 // TODO each call to transferRanges re-flushes, this is 
potentially a lot of waste
-                streamPlan.transferRanges(newEndpoint, keyspaceName, ranges);
+                streamPlan.transferRanges(newEndpoint, keyspaceName, replicas);
             }
         }
         return streamPlan.execute();
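
streamRanges above flattens each keyspace's EndpointsByReplica into
(local, remote) pairs, skips subranges that SystemKeyspace already recorded as
transferred, and regroups the rest per destination endpoint via
remote.decorateSubrange(local.range()). A hedged, JDK-only sketch of that
skip-and-regroup step; endpoints and ranges are plain strings and
decorateSubrange is modelled as simply keeping the local range:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class StreamRangesGroupingSketch
    {
        public static void main(String[] args)
        {
            // flattenEntries(): (local replica range, destination endpoint) pairs
            List<Map.Entry<String, String>> localToRemote = List.of(
                    Map.entry("(0,100]",   "10.0.0.2"),
                    Map.entry("(100,200]", "10.0.0.2"),
                    Map.entry("(200,300]", "10.0.0.3"));

            // ranges already recorded as transferred, per endpoint
            Map<String, Set<String>> transferred = Map.of("10.0.0.2", Set.of("(100,200]"));

            Map<String, List<String>> toStreamPerEndpoint = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : localToRemote)
            {
                String localRange = e.getKey();
                String endpoint = e.getValue();
                Set<String> done = transferred.get(endpoint);
                if (done != null && done.contains(localRange))
                    continue; // skip: streamed by an earlier, interrupted decommission
                toStreamPerEndpoint.computeIfAbsent(endpoint, k -> new ArrayList<>()).add(localRange);
            }

            System.out.println(toStreamPerEndpoint); // {10.0.0.2=[(0,100]], 10.0.0.3=[(200,300]]}
        }
    }
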
@@ -5151,53 +5333,109 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * Calculate pair of ranges to stream/fetch for given two range collections
      * (current ranges for keyspace and ranges after move to new token)
      *
+     * With transient replication the added wrinkle is that if a range 
transitions from full to transient then
+     * we need to stream the range despite the fact that we are retaining it 
as transient. Some replica
+     * somewhere needs to transition from transient to full and we will be 
the source.
+     *
+     * If the range is transient and is transitioning to full then always 
fetch, even if the range was already held transiently,
+     * since a transient replica obviously needs to fetch data to become 
full.
+     *
+     * This is why intersection alone is not treated as sufficient reason to 
do the subtraction: we might need
+     * to stream/fetch the data anyway.
+     *
      * @param current collection of the ranges by current token
      * @param updated collection of the ranges after token is changed
      * @return pair of ranges to stream/fetch for given current and updated 
range collections
      */
-    public Pair<Set<Range<Token>>, Set<Range<Token>>> 
calculateStreamAndFetchRanges(Collection<Range<Token>> current, 
Collection<Range<Token>> updated)
+    public static Pair<RangesAtEndpoint, RangesAtEndpoint> 
calculateStreamAndFetchRanges(RangesAtEndpoint current, RangesAtEndpoint 
updated)
     {
-        Set<Range<Token>> toStream = new HashSet<>();
-        Set<Range<Token>> toFetch  = new HashSet<>();
-
+        // FIXME: transient replication
+        // this should always be the local node, except for tests. TODO: assert 
this
+        RangesAtEndpoint.Builder toStream = 
RangesAtEndpoint.builder(current.endpoint());
+        RangesAtEndpoint.Builder toFetch  = 
RangesAtEndpoint.builder(current.endpoint());
 
-        for (Range<Token> r1 : current)
+        logger.debug("Calculating toStream");
+        for (Replica r1 : current)
         {
             boolean intersect = false;
-            for (Range<Token> r2 : updated)
+            RangesAtEndpoint.Mutable remainder = null;
+            for (Replica r2 : updated)
             {
-                if (r1.intersects(r2))
+                logger.debug("Comparing {} and {}", r1, r2);
+                //If we will end up transiently replicating send the entire 
thing and don't subtract
+                if (r1.intersectsOnRange(r2) && !(r1.isFull() && 
r2.isTransient()))
                 {
-                    // adding difference ranges to fetch from a ring
-                    toStream.addAll(r1.subtract(r2));
+                    RangesAtEndpoint.Mutable oldRemainder = remainder;
+                    remainder = new 
RangesAtEndpoint.Mutable(current.endpoint());
+                    if (oldRemainder != null)
+                    {
+                        for (Replica replica : oldRemainder)
+                        {
+                            
remainder.addAll(replica.subtractIgnoreTransientStatus(r2.range()));
+                        }
+                    }
+                    else
+                    {
+                        
remainder.addAll(r1.subtractIgnoreTransientStatus(r2.range()));
+                    }
+                    logger.debug("    Intersects adding {}", remainder);
                     intersect = true;
                 }
             }
             if (!intersect)
             {
-                toStream.add(r1); // should seed whole old range
+                logger.debug("    Doesn't intersect adding {}", r1);
+                toStream.add(r1); // should stream whole old range
+            }
+            else
+            {
+                toStream.addAll(remainder);
             }
         }
 
-        for (Range<Token> r2 : updated)
+        logger.debug("Calculating toFetch");
+        for (Replica r2 : updated)
         {
             boolean intersect = false;
-            for (Range<Token> r1 : current)
+            RangesAtEndpoint.Mutable remainder = null;
+            for (Replica r1 : current)
             {
-                if (r2.intersects(r1))
+                logger.info("Comparing {} and {}", r2, r1);
+                //Transitioning from transient to full means fetch everything 
so intersection doesn't matter.
+                if (r2.intersectsOnRange(r1) && !(r1.isTransient() && 
r2.isFull()))
                 {
-                    // adding difference ranges to fetch from a ring
-                    toFetch.addAll(r2.subtract(r1));
+                    RangesAtEndpoint.Mutable oldRemainder = remainder;
+                    remainder = new 
RangesAtEndpoint.Mutable(current.endpoint());
+                    if (oldRemainder != null)
+                    {
+                        for (Replica replica : oldRemainder)
+                        {
+                            
remainder.addAll(replica.subtractIgnoreTransientStatus(r1.range()));
+                        }
+                    }
+                    else
+                    {
+                        
remainder.addAll(r2.subtractIgnoreTransientStatus(r1.range()));
+                    }
+                    logger.debug("    Intersects adding {}", remainder);
                     intersect = true;
                 }
             }
             if (!intersect)
             {
+                logger.debug("    Doesn't intersect adding {}", r2);
                 toFetch.add(r2); // should fetch whole old range
             }
+            else
+            {
+                toFetch.addAll(remainder);
+            }
         }
 
-        return Pair.create(toStream, toFetch);
+        logger.debug("To stream {}", toStream);
+        logger.debug("To fetch {}", toFetch);
+
+        return Pair.create(toStream.build(), toFetch.build());
     }
 
     public void bulkLoad(String directory)
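
The javadoc and the two loops above encode the transient-replication rules for
a token move: when old and new ranges intersect, subtract what is already
covered, except that a full-to-transient transition still streams the whole
range and a transient-to-full transition still fetches the whole range. A
self-contained sketch of those predicates on simplified integer ranges; the R
class and its interval arithmetic are assumptions for illustration, not
Cassandra's Replica, intersectsOnRange or subtractIgnoreTransientStatus:

    import java.util.List;

    public class StreamFetchRulesSketch
    {
        // Simplified replica: one (left, right] interval plus a full/transient flag.
        static final class R
        {
            final long left, right;
            final boolean full;
            R(long left, long right, boolean full) { this.left = left; this.right = right; this.full = full; }
            boolean intersects(R o) { return left < o.right && o.left < right; }
            public String toString() { return "(" + left + "," + right + (full ? "] full" : "] transient"); }
        }

        public static void main(String[] args)
        {
            List<R> current = List.of(new R(0, 100, true));    // held fully today
            List<R> updated = List.of(new R(50, 150, false));  // held transiently after the move

            for (R r1 : current)
                for (R r2 : updated)
                {
                    // stream side: only subtract if we are NOT transitioning full -> transient
                    boolean subtractForStream = r1.intersects(r2) && !(r1.full && !r2.full);
                    // fetch side: only subtract if we are NOT transitioning transient -> full
                    boolean subtractForFetch = r2.intersects(r1) && !(!r1.full && r2.full);
                    System.out.println(r1 + " vs " + r2
                                       + " -> subtractForStream=" + subtractForStream
                                       + ", subtractForFetch=" + subtractForFetch);
                }
            // Here subtractForStream is false (stream the whole old range, since some
            // replica must become full from it) while subtractForFetch is true (only
            // fetch the part we do not already have).
        }
    }
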
@@ -5233,10 +5471,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 this.keyspace = keyspace;
                 try
                 {
-                    for (Map.Entry<Range<Token>, List<InetAddressAndPort>> 
entry : StorageService.instance.getRangeToAddressMap(keyspace).entrySet())
+                    for (Map.Entry<Range<Token>, EndpointsForRange> entry : 
StorageService.instance.getRangeToAddressMap(keyspace).entrySet())
                     {
                         Range<Token> range = entry.getKey();
-                        for (InetAddressAndPort endpoint : entry.getValue())
+                        EndpointsForRange replicas = entry.getValue();
+                        Replicas.temporaryAssertFull(replicas);
+                        for (InetAddressAndPort endpoint : 
replicas.endpoints())
                             addRangeForEndpoint(range, endpoint);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0f4c7dd..4e6295a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -217,6 +217,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     @Deprecated public List<InetAddress> getNaturalEndpoints(String 
keyspaceName, ByteBuffer key);
     public List<String> getNaturalEndpointsWithPort(String keysapceName, 
ByteBuffer key);
 
+    public List<String> getReplicas(String keyspaceName, String cf, String 
key);
+
     /**
      * @deprecated use {@link #takeSnapshot(String tag, Map options, String... 
entities)} instead.
      */
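
The new getReplicas operation can be invoked over JMX like the rest of
StorageServiceMBean. A hedged client sketch: the object name
org.apache.cassandra.db:type=StorageService and the default JMX port 7199
follow Cassandra's usual conventions rather than anything introduced by this
patch, and the keyspace/table/key arguments are placeholders.

    import java.util.List;
    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.service.StorageServiceMBean;

    public class GetReplicasClient
    {
        public static void main(String[] args) throws Exception
        {
            // assumed local node with the default JMX port; adjust as needed
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                StorageServiceMBean proxy = JMX.newMBeanProxy(mbs,
                        new ObjectName("org.apache.cassandra.db:type=StorageService"),
                        StorageServiceMBean.class);

                // new in this patch: replica descriptions (not just addresses) for a key
                List<String> replicas = proxy.getReplicas("my_keyspace", "my_table", "some_key");
                replicas.forEach(System.out::println);
            }
        }
    }
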

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 65efeff..a07aae6 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -17,18 +17,15 @@
  */
 package org.apache.cassandra.service;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.ReplicaLayout;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
 
 /**
@@ -42,26 +39,18 @@ public class WriteResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
     private static final AtomicIntegerFieldUpdater<WriteResponseHandler> 
responsesUpdater
             = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, 
"responses");
 
-    public WriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints,
-                                Collection<InetAddressAndPort> 
pendingEndpoints,
-                                ConsistencyLevel consistencyLevel,
-                                Keyspace keyspace,
+    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
                                 Runnable callback,
                                 WriteType writeType,
                                 long queryStartNanoTime)
     {
-        super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, 
callback, writeType, queryStartNanoTime);
+        super(replicaLayout, callback, writeType, queryStartNanoTime);
         responses = totalBlockFor();
     }
 
-    public WriteResponseHandler(InetAddressAndPort endpoint, WriteType 
writeType, Runnable callback, long queryStartNanoTime)
-    {
-        this(Arrays.asList(endpoint), 
Collections.<InetAddressAndPort>emptyList(), ConsistencyLevel.ONE, null, 
callback, writeType, queryStartNanoTime);
-    }
-
-    public WriteResponseHandler(InetAddressAndPort endpoint, WriteType 
writeType, long queryStartNanoTime)
+    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, 
WriteType writeType, long queryStartNanoTime)
     {
-        this(endpoint, writeType, null, queryStartNanoTime);
+        this(replicaLayout, null, writeType, queryStartNanoTime);
     }
 
     public void response(MessageIn<T> m)

