http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 110fed6..e8aa5d3 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -18,27 +18,40 @@
 package org.apache.cassandra.dht;
 
 import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsByReplica;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.LocalStrategy;
 
+import org.apache.cassandra.locator.EndpointsByRange;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -47,13 +60,25 @@ import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static com.google.common.base.Predicates.and;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.any;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+
 /**
- * Assists in streaming ranges to a node.
+ * Assists in streaming ranges to this node.
  */
 public class RangeStreamer
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
 
+    public static Predicate<Replica> ALIVE_PREDICATE = replica ->
+                                                             (!Gossiper.instance.isEnabled() ||
+                                                              (Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()) == null ||
+                                                               Gossiper.instance.getEndpointStateForEndpoint(replica.endpoint()).isAlive())) &&
+                                                             FailureDetector.instance.isAlive(replica.endpoint());
+
     /* bootstrap tokens. can be null if replacing the node. */
     private final Collection<Token> tokens;
     /* current token ring */
@@ -62,26 +87,59 @@ public class RangeStreamer
     private final InetAddressAndPort address;
     /* streaming description */
     private final String description;
-    private final Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = HashMultimap.create();
-    private final Set<ISourceFilter> sourceFilters = new HashSet<>();
+    private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create();
+    private final Set<Predicate<Replica>> sourceFilters = new HashSet<>();
     private final StreamPlan streamPlan;
     private final boolean useStrictConsistency;
     private final IEndpointSnitch snitch;
     private final StreamStateStore stateStore;
 
-    /**
-     * A filter applied to sources to stream from when constructing a fetch map.
-     */
-    public static interface ISourceFilter
+    public static class FetchReplica
     {
-        public boolean shouldInclude(InetAddressAndPort endpoint);
+        public final Replica local;
+        public final Replica remote;
+
+        public FetchReplica(Replica local, Replica remote)
+        {
+            Preconditions.checkNotNull(local);
+            Preconditions.checkNotNull(remote);
+            assert local.isLocal() && !remote.isLocal();
+            this.local = local;
+            this.remote = remote;
+        }
+
+        public String toString()
+        {
+            return "FetchReplica{" +
+                   "local=" + local +
+                   ", remote=" + remote +
+                   '}';
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            FetchReplica that = (FetchReplica) o;
+
+            if (!local.equals(that.local)) return false;
+            return remote.equals(that.remote);
+        }
+
+        public int hashCode()
+        {
+            int result = local.hashCode();
+            result = 31 * result + remote.hashCode();
+            return result;
+        }
     }
 
     /**
     * Source filter which excludes any endpoints that are not alive according to a
      * failure detector.
      */
-    public static class FailureDetectorSourceFilter implements ISourceFilter
+    public static class FailureDetectorSourceFilter implements Predicate<Replica>
     {
         private final IFailureDetector fd;
 
@@ -90,16 +148,16 @@ public class RangeStreamer
             this.fd = fd;
         }
 
-        public boolean shouldInclude(InetAddressAndPort endpoint)
+        public boolean apply(Replica replica)
         {
-            return fd.isAlive(endpoint);
+            return fd.isAlive(replica.endpoint());
         }
     }
 
     /**
     * Source filter which excludes any endpoints that are not in a specific data center.
      */
-    public static class SingleDatacenterFilter implements ISourceFilter
+    public static class SingleDatacenterFilter implements Predicate<Replica>
     {
         private final String sourceDc;
         private final IEndpointSnitch snitch;
@@ -110,27 +168,27 @@ public class RangeStreamer
             this.snitch = snitch;
         }
 
-        public boolean shouldInclude(InetAddressAndPort endpoint)
+        public boolean apply(Replica replica)
         {
-            return snitch.getDatacenter(endpoint).equals(sourceDc);
+            return snitch.getDatacenter(replica).equals(sourceDc);
         }
     }
 
     /**
      * Source filter which excludes the current node from source calculations
      */
-    public static class ExcludeLocalNodeFilter implements ISourceFilter
+    public static class ExcludeLocalNodeFilter implements Predicate<Replica>
     {
-        public boolean shouldInclude(InetAddressAndPort endpoint)
+        public boolean apply(Replica replica)
         {
-            return !FBUtilities.getBroadcastAddressAndPort().equals(endpoint);
+            return !replica.isLocal();
         }
     }
 
     /**
     * Source filter which only includes endpoints contained within a provided set.
      */
-    public static class WhitelistedSourcesFilter implements ISourceFilter
+    public static class WhitelistedSourcesFilter implements Predicate<Replica>
     {
         private final Set<InetAddressAndPort> whitelistedSources;
 
@@ -139,9 +197,9 @@ public class RangeStreamer
             this.whitelistedSources = whitelistedSources;
         }
 
-        public boolean shouldInclude(InetAddressAndPort endpoint)
+        public boolean apply(Replica replica)
         {
-            return whitelistedSources.contains(endpoint);
+            return whitelistedSources.contains(replica.endpoint());
         }
     }
 
@@ -167,7 +225,7 @@ public class RangeStreamer
         streamPlan.listeners(this.stateStore);
     }
 
-    public void addSourceFilter(ISourceFilter filter)
+    public void addSourceFilter(Predicate<Replica> filter)
     {
         sourceFilters.add(filter);
     }
@@ -176,80 +234,95 @@ public class RangeStreamer
      * Add ranges to be streamed for given keyspace.
      *
      * @param keyspaceName keyspace name
-     * @param ranges ranges to be streamed
+     * @param replicas ranges to be streamed
      */
-    public void addRanges(String keyspaceName, Collection<Range<Token>> ranges)
+    public void addRanges(String keyspaceName, ReplicaCollection<?> replicas)
     {
-        if(Keyspace.open(keyspaceName).getReplicationStrategy() instanceof LocalStrategy)
+        Keyspace keyspace = Keyspace.open(keyspaceName);
+        AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
+        if(strat instanceof LocalStrategy)
         {
             logger.info("Not adding ranges for Local Strategy keyspace={}", 
keyspaceName);
             return;
         }
 
-        boolean useStrictSource = useStrictSourcesForRanges(keyspaceName);
-        Multimap<Range<Token>, InetAddressAndPort> rangesForKeyspace = useStrictSource
-                ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges);
+        boolean useStrictSource = useStrictSourcesForRanges(strat);
+        EndpointsByReplica fetchMap = calculateRangesToFetchWithPreferredEndpoints(replicas, keyspace, useStrictSource);
 
-        for (Map.Entry<Range<Token>, InetAddressAndPort> entry : rangesForKeyspace.entries())
+        for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries())
             logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
 
-        AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
-        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMap = useStrictSource || strat == null || strat.getReplicationFactor() == 1
-                                                            ? getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency)
-                                                            : getOptimizedRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName);
 
-        for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet())
+        Multimap<InetAddressAndPort, FetchReplica> workMap;
+        //Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no
+        //transient replicas.
+        if (useStrictSource || strat == null || strat.getReplicationFactor().allReplicas == 1 || strat.getReplicationFactor().hasTransientReplicas())
+        {
+            workMap = convertPreferredEndpointsToWorkMap(fetchMap);
+        }
+        else
+        {
+            workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
+        }
+
+        toFetch.put(keyspaceName, workMap);
+        for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
         {
             if (logger.isTraceEnabled())
             {
-                for (Range<Token> r : entry.getValue())
-                    logger.trace("{}: range {} from source {} for keyspace {}", description, r, entry.getKey(), keyspaceName);
+                for (FetchReplica r : entry.getValue())
+                    logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName);
             }
-            toFetch.put(keyspaceName, entry);
         }
     }
 
     /**
-     * @param keyspaceName keyspace name to check
+     * @param strat AbstractReplicationStrategy of keyspace to check
     * @return true when the node is bootstrapping, useStrictConsistency is true and # of nodes in the cluster is more than # of replica
      */
-    private boolean useStrictSourcesForRanges(String keyspaceName)
+    private boolean useStrictSourcesForRanges(AbstractReplicationStrategy strat)
     {
-        AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
         return useStrictConsistency
                 && tokens != null
-                && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor();
+                && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor().allReplicas;
     }
 
     /**
-     * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
-     * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
-     *
-     * @throws java.lang.IllegalStateException when there is no source to get data streamed
+     * Wrapper method to assemble the arguments for invoking the implementation with RangeStreamer's parameters
+     * @param fetchRanges
+     * @param keyspace
+     * @param useStrictConsistency
+     * @return
      */
-    private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges)
+    private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency)
     {
-        AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
-        Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap());
+        AbstractReplicationStrategy strat = keyspace.getReplicationStrategy();
 
-        Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create();
-        for (Range<Token> desiredRange : desiredRanges)
-        {
-            for (Range<Token> range : rangeAddresses.keySet())
-            {
-                if (range.contains(desiredRange))
-                {
-                    List<InetAddressAndPort> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range));
-                    rangeSources.putAll(desiredRange, preferred);
-                    break;
-                }
-            }
+        TokenMetadata tmd = metadata.cloneOnlyTokenMap();
+
+        TokenMetadata tmdAfter = null;
 
-            if (!rangeSources.keySet().contains(desiredRange))
-                throw new IllegalStateException("No sources found for " + 
desiredRange);
+        if (tokens != null)
+        {
+            // Pending ranges
+            tmdAfter =  tmd.cloneOnlyTokenMap();
+            tmdAfter.updateNormalTokens(tokens, address);
         }
+        else if (useStrictConsistency)
+        {
+            throw new IllegalArgumentException("Can't ask for strict 
consistency and not supply tokens");
+        }
+
+        return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity,
+                                                                          strat,
+                                                                          fetchRanges,
+                                                                          useStrictConsistency,
+                                                                          tmd,
+                                                                          tmdAfter,
+                                                                          ALIVE_PREDICATE,
+                                                                          keyspace.getName(),
+                                                                          sourceFilters);
 
-        return rangeSources;
     }
 
     /**
@@ -257,129 +330,234 @@ public class RangeStreamer
     * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
      * consistency.
      *
-     * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found.
-     */
-    private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges)
+     **/
+     public static EndpointsByReplica
+     calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
+                                                  AbstractReplicationStrategy strat,
+                                                  ReplicaCollection<?> fetchRanges,
+                                                  boolean useStrictConsistency,
+                                                  TokenMetadata tmdBefore,
+                                                  TokenMetadata tmdAfter,
+                                                  Predicate<Replica> isAlive,
+                                                  String keyspace,
+                                                  Collection<Predicate<Replica>> sourceFilters)
     {
-        assert tokens != null;
-        AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
-
-        // Active ranges
-        TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
-        Multimap<Range<Token>, InetAddressAndPort> addressRanges = strat.getRangeAddresses(metadataClone);
+        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
 
-        // Pending ranges
-        metadataClone.updateNormalTokens(tokens, address);
-        Multimap<Range<Token>, InetAddressAndPort> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
+        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+        logger.debug ("Keyspace: {}", keyspace);
+        logger.debug("To fetch RN: {}", fetchRanges);
+        logger.debug("Fetch ranges: {}", rangeAddresses);
 
-        // Collects the source that will have its range moved to the new node
-        Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create();
+        Predicate<Replica> testSourceFilters = and(sourceFilters);
+        Function<EndpointsForRange, EndpointsForRange> sorted =
+                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
 
-        for (Range<Token> desiredRange : desiredRanges)
+        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
+        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
+        for (Replica toFetch : fetchRanges)
         {
-            for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> preEntry : addressRanges.asMap().entrySet())
+            //Replica that is sufficient to provide the data we need
+            //With strict consistency and transient replication we may end up with multiple types
+            //so this isn't used with strict consistency
+            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
+            Predicate<Replica> accept = r ->
+                       isSufficient.test(r)                 // is sufficient
+                    && !r.endpoint().equals(localAddress)   // is not self
+                    && isAlive.test(r);                     // is alive
+
+            logger.debug("To fetch {}", toFetch);
+            for (Range<Token> range : rangeAddresses.keySet())
             {
-                if (preEntry.getKey().contains(desiredRange))
+                if (range.contains(toFetch.range()))
                 {
-                    Set<InetAddressAndPort> oldEndpoints = Sets.newHashSet(preEntry.getValue());
-                    Set<InetAddressAndPort> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
+                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
 
-                    // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
-                    // So we need to be careful to only be strict when endpoints == RF
-                    if (oldEndpoints.size() == strat.getReplicationFactor())
+                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
+                    //It could be multiple endpoints and we must fetch from all of them if they are there
+                    //With transient replication and strict consistency this is to get the full data from a full replica and
+                    //transient data from the transient replica losing data
+                    EndpointsForRange sources;
+                    if (useStrictConsistency)
+                    {
+                        //Start with two sets of who replicates the range before and who replicates it after
+                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+                        logger.debug("Old endpoints {}", oldEndpoints);
+                        logger.debug("New endpoints {}", newEndpoints);
+
+                        //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
+                        //So we need to be careful to only be strict when endpoints == RF
+                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
+                        {
+                            Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
+                            // Remove new endpoints from old endpoints based on address
+                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
+
+                            if (!all(oldEndpoints, isAlive))
+                                throw new IllegalStateException("A node required to move the data consistently is down: "
+                                                                + oldEndpoints.filter(not(isAlive)));
+
+                            if (oldEndpoints.size() > 1)
+                                throw new AssertionError("Expected <= 1 
endpoint but found " + oldEndpoints);
+
+                            //If we are transitioning from transient to full 
and and the set of replicas for the range is not changing
+                            //we might end up with no endpoints to fetch from 
by address. In that case we can pick any full replica safely
+                            //since we are already a transient replica and the 
existing replica remains.
+                            //The old behavior where we might be asked to 
fetch ranges we don't need shouldn't occur anymore.
+                            //So it's an error if we don't find what we need.
+                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
+                            {
+                                throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
+                            }
+
+                            if (!any(oldEndpoints, isSufficient))
+                            {
+                                // need an additional replica
+                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
+                                // include all our filters, to ensure we include a matching node
+                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
+                                if (fullReplica.isPresent())
+                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
+                                else
+                                    throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
+                            }
+
+                            //We have to check the source filters here to see if they will remove any replicas
+                            //required for strict consistency
+                            if (!all(oldEndpoints, testSourceFilters))
+                                throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
+                        }
+                        else
+                        {
+                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
+                        }
+
+                        //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
+                        sources = oldEndpoints.filter(testSourceFilters);
+                    }
+                    else
                     {
-                        oldEndpoints.removeAll(newEndpoints);
-                        assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
+                        //Without strict consistency we have given up on correctness so no point in fetching from
+                        //a random full + transient replica since it's also likely to lose data
+                        //Also apply testSourceFilters that were given to us so we can safely select a single source
+                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
+                        //Limit it to just the first possible source, we don't need more than one and downstream
+                        //will fetch from every source we supply
+                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
                     }
 
-                    rangeSources.put(desiredRange, oldEndpoints.iterator().next());
+                    // storing range and preferred endpoint set
+                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
+                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
                 }
             }
 
-            // Validate
-            Collection<InetAddressAndPort> addressList = rangeSources.get(desiredRange);
-            if (addressList == null || addressList.isEmpty())
-                throw new IllegalStateException("No sources found for " + desiredRange);
-
-            if (addressList.size() > 1)
-                throw new IllegalStateException("Multiple endpoints found for " + desiredRange);
+            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
+            if (addressList == null)
+                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
+
+            /*
+             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
+             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
+             * and the other is a transient replica. So we need to fetch from two places in that case for the full range we gain.
+             * For a transient range we only need to fetch from one.
+             */
+            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
+                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
+
+            //We must have enough stuff to fetch from
+            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
+            {
+                if (strat.getReplicationFactor().allReplicas == 1)
+                {
+                    if (useStrictConsistency)
+                    {
+                        logger.warn("A node required to move the data 
consistently is down");
+                        throw new IllegalStateException("Unable to find 
sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace 
+ " with RF=1. " +
+                                                        "Ensure this keyspace 
contains replicas in the source datacenter.");
+                    }
+                    else
+                        logger.warn("Unable to find sufficient sources for 
streaming range {} in keyspace {} with RF=1. " +
+                                    "Keyspace might be missing data.", 
toFetch, keyspace);
 
-            InetAddressAndPort sourceIp = addressList.iterator().next();
-            EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp);
-            if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive()))
-                throw new RuntimeException("A node required to move the data consistently is down (" + sourceIp + "). " +
-                                           "If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false");
+                }
+                else
+                {
+                    if (useStrictConsistency)
+                        logger.warn("A node required to move the data 
consistently is down");
+                    throw new IllegalStateException("Unable to find sufficient 
sources for streaming range " + toFetch + " in keyspace " + keyspace);
+                }
+            }
         }
-
-        return rangeSources;
+        return rangesToFetchWithPreferredEndpoints.asImmutableView();
     }
 
     /**
-     * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
-     * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
-     *                      here, we always exclude ourselves.
-     * @param keyspace keyspace name
-     * @return Map of source endpoint to collection of ranges
+     * The preferred endpoint list is the wrong format because it is keyed by Replica (this node) rather than the source
+     * endpoint we will fetch from, which is what streaming wants.
+     * @param preferredEndpoints
+     * @return
      */
-    private static Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
-                                                                               Collection<ISourceFilter> sourceFilters, String keyspace,
-                                                                               boolean useStrictConsistency)
+    public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints)
     {
-        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = HashMultimap.create();
-        for (Range<Token> range : rangesWithSources.keySet())
+        Multimap<InetAddressAndPort, FetchReplica> workMap = HashMultimap.create();
+        for (Map.Entry<Replica, EndpointsForRange> e : preferredEndpoints.entrySet())
         {
-            boolean foundSource = false;
-
-            outer:
-            for (InetAddressAndPort address : rangesWithSources.get(range))
+            for (Replica source : e.getValue())
             {
-                for (ISourceFilter filter : sourceFilters)
-                {
-                    if (!filter.shouldInclude(address))
-                        continue outer;
-                }
+                assert (e.getKey()).isLocal();
+                assert !source.isLocal();
+                workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
+            }
+        }
+        logger.debug("Work map {}", workMap);
+        return workMap;
+    }
 
-                if (address.equals(FBUtilities.getBroadcastAddressAndPort()))
-                {
-                    // If localhost is a source, we have found one, but we don't add it to the map to avoid streaming locally
-                    foundSource = true;
-                    continue;
-                }
+    /**
+     * Optimized version that also outputs the final work map
+     */
+    private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources,
+                                                                                  Collection<Predicate<Replica>> sourceFilters, String keyspace)
+    {
+        //For now we just aren't going to use the optimized range fetch map with transient replication to shrink
+        //the surface area to test and introduce bugs.
+        //In the future it's possible we could run it twice, once for full ranges with only full replicas
+        //and once with transient ranges and all replicas. Then merge the result.
+        EndpointsByRange.Mutable unwrapped = new EndpointsByRange.Mutable();
+        for (Map.Entry<Replica, Replica> entry : rangesWithSources.flattenEntries())
+        {
+            Replicas.temporaryAssertFull(entry.getValue());
+            unwrapped.put(entry.getKey().range(), entry.getValue());
+        }
 
-                rangeFetchMapMap.put(address, range);
-                foundSource = true;
-                break; // ensure we only stream from one other node for each range
-            }
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrapped.asImmutableView(), sourceFilters, keyspace);
+        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
+        logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
+        validateRangeFetchMap(unwrapped.asImmutableView(), rangeFetchMapMap, keyspace);
 
-            if (!foundSource)
+        //Need to rewrap as Replicas
+        Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create();
+        for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries())
+        {
+            Replica toFetch = null;
+            for (Replica r : rangesWithSources.keySet())
             {
-                AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
-                if (strat != null && strat.getReplicationFactor() == 1)
+                if (r.range().equals(entry.getValue()))
                 {
-                    if (useStrictConsistency)
-                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1. " +
-                                                        "Ensure this keyspace contains replicas in the source datacenter.");
-                    else
-                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
-                                    "Keyspace might be missing data.", range, keyspace);
+                    if (toFetch != null)
+                        throw new AssertionError(String.format("There shouldn't be multiple replicas for range %s, replica %s and %s here", r.range(), r, toFetch));
+                    toFetch = r;
                 }
-                else
-                    throw new IllegalStateException("Unable to find sufficient 
sources for streaming range " + range + " in keyspace " + keyspace);
             }
+            if (toFetch == null)
+                throw new AssertionError("Shouldn't be possible for the 
Replica we fetch to be null here");
+            //Committing the cardinal sin of synthesizing a Replica, but it's 
ok because we assert earlier all of them
+            //are full and optimized range fetch map doesn't support transient 
replication yet.
+            wrapped.put(entry.getKey(), new FetchReplica(toFetch, 
fullReplica(entry.getKey(), entry.getValue())));
         }
 
-        return rangeFetchMapMap;
-    }
-
-
-    private static Multimap<InetAddressAndPort, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
-                                                                                        Collection<ISourceFilter> sourceFilters, String keyspace)
-    {
-        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, sourceFilters, keyspace);
-        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
-        logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
-        validateRangeFetchMap(rangesWithSources, rangeFetchMapMap, keyspace);
-        return rangeFetchMapMap;
+        return wrapped;
     }
 
     /**
@@ -388,7 +566,7 @@ public class RangeStreamer
      * @param rangeFetchMapMap
      * @param keyspace
      */
-    private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
+    private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
     {
         for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries())
         {
@@ -398,7 +576,7 @@ public class RangeStreamer
                                         + " in keyspace " + keyspace);
             }
 
-            if (!rangesWithSources.get(entry.getValue()).contains(entry.getKey()))
+            if (!rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey()))
             {
                 throw new IllegalStateException("Trying to stream from wrong endpoint. Range: " + entry.getValue()
                                                 + " in keyspace " + keyspace + " from endpoint: " + entry.getKey());
@@ -408,39 +586,70 @@ public class RangeStreamer
         }
     }
 
-    public static Multimap<InetAddressAndPort, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSourceTarget, String keyspace,
-                                                                        IFailureDetector fd, boolean useStrictConsistency)
-    {
-        return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace, useStrictConsistency);
-    }
-
     // For testing purposes
     @VisibleForTesting
-    Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch()
+    Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch()
     {
         return toFetch;
     }
 
     public StreamResultFuture fetchAsync()
     {
-        for (Map.Entry<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> entry : toFetch.entries())
-        {
-            String keyspace = entry.getKey();
-            InetAddressAndPort source = entry.getValue().getKey();
-            Collection<Range<Token>> ranges = entry.getValue().getValue();
+        toFetch.forEach((keyspace, sources) -> {
+            logger.debug("Keyspace {} Sources {}", keyspace, sources);
+            sources.asMap().forEach((source, fetchReplicas) -> {
 
-            // filter out already streamed ranges
-            Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
-            if (ranges.removeAll(availableRanges))
-            {
-                logger.info("Some ranges of {} are already available. Skipping 
streaming those ranges.", availableRanges);
-            }
+                // filter out already streamed ranges
+                RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
 
-            if (logger.isTraceEnabled())
-                logger.trace("{}ing from {} ranges {}", description, source, 
StringUtils.join(ranges, ", "));
-            /* Send messages to respective folks to stream data over to me */
-            streamPlan.requestRanges(source, keyspace, ranges);
-        }
+                Predicate<FetchReplica> isAvailable = fetch -> {
+                    Replica availableRange = available.byRange().get(fetch.local.range());
+                    if (availableRange == null)
+                        //Range is unavailable
+                        return false;
+                    if (fetch.local.isFull())
+                        //For full, pick only replicas with matching transientness
+                        return availableRange.isFull() == fetch.remote.isFull();
+
+                    // Any transient or full will do
+                    return true;
+                };
+
+                List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
+
+                if (remaining.size() < available.size())
+                {
+                    List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
+                    logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
+                                fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges());
+                }
+
+                if (logger.isTraceEnabled())
+                    logger.trace("{}ing from {} ranges {}", description, 
source, StringUtils.join(remaining, ", "));
+
+                //At the other end the distinction between full and transient is ignored; it just uses the transient status
+                //of the Replica objects we send to determine what to send. The real reason we have this split down to
+                //StreamRequest is that on completion StreamRequest is used to write to the system table tracking
+                //what has already been streamed. At that point we only have the local Replica instances, so we don't
+                //know what we got from the remote. We preserve that here by splitting based on the remote's transient
+                //status.
+                InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+                RangesAtEndpoint full = remaining.stream()
+                        .filter(pair -> pair.remote.isFull())
+                        .map(pair -> pair.local)
+                        .collect(RangesAtEndpoint.collector(self));
+                RangesAtEndpoint transientReplicas = remaining.stream()
+                        .filter(pair -> pair.remote.isTransient())
+                        .map(pair -> pair.local)
+                        .collect(RangesAtEndpoint.collector(self));
+
+                logger.debug("Source and our replicas {}", fetchReplicas);
+                logger.debug("Source {} Keyspace {}  streaming full {} 
transient {}", source, keyspace, full, transientReplicas);
+
+                /* Send messages to respective folks to stream data over to me */
+                streamPlan.requestRanges(source, keyspace, full, transientReplicas);
+            });
+        });
 
         return streamPlan.execute();
     }
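
The shape of the new data flow above is easier to see outside Cassandra's locator types. The following standalone sketch mirrors two steps of the patch with hypothetical stand-in types (SimpleReplica and Fetch are invented for illustration; only the grouping logic follows the diff): re-keying the preferred-endpoint map by source endpoint, as convertPreferredEndpointsToWorkMap does, then splitting each source's fetches by the remote replica's full/transient status, as fetchAsync does.

import java.util.*;
import java.util.stream.Collectors;

public class WorkMapSketch
{
    // Hypothetical stand-in for org.apache.cassandra.locator.Replica:
    // an endpoint plus a range and whether it is a full or transient replica.
    record SimpleReplica(String endpoint, String range, boolean full) {}
    // Stand-in for RangeStreamer.FetchReplica: local replica to populate, remote source.
    record Fetch(SimpleReplica local, SimpleReplica remote) {}

    public static void main(String[] args)
    {
        // Preferred endpoints keyed by the local replica we must populate,
        // mirroring EndpointsByReplica in the patch.
        Map<SimpleReplica, List<SimpleReplica>> preferred = Map.of(
                new SimpleReplica("self", "(0,100]", true),
                List.of(new SimpleReplica("10.0.0.1", "(0,100]", true),
                        new SimpleReplica("10.0.0.2", "(0,100]", false)));

        // Step 1: re-key by source endpoint, since streaming asks
        // "fetch these ranges from that node".
        Map<String, List<Fetch>> workMap = new HashMap<>();
        preferred.forEach((local, sources) ->
                sources.forEach(remote ->
                        workMap.computeIfAbsent(remote.endpoint(), k -> new ArrayList<>())
                               .add(new Fetch(local, remote))));

        // Step 2: per source, split by the remote's transient status so completion
        // bookkeeping remembers what kind of data was actually received.
        workMap.forEach((source, fetches) -> {
            List<String> full = fetches.stream().filter(f -> f.remote().full())
                                       .map(f -> f.local().range()).collect(Collectors.toList());
            List<String> trans = fetches.stream().filter(f -> !f.remote().full())
                                        .map(f -> f.local().range()).collect(Collectors.toList());
            System.out.printf("source=%s full=%s transient=%s%n", source, full, trans);
        });
    }
}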

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/Splitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java
index c63fe91..8578448 100644
--- a/src/java/org/apache/cassandra/dht/Splitter.java
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -117,32 +118,31 @@ public abstract class Splitter
         return new BigDecimal(elapsedTokens(token, range)).divide(new BigDecimal(tokensInRange(range)), 3, BigDecimal.ROUND_HALF_EVEN).doubleValue();
     }
 
-    public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges)
+    public List<Token> splitOwnedRanges(int parts, List<WeightedRange> weightedRanges, boolean dontSplitRanges)
     {
-        if (localRanges.isEmpty() || parts == 1)
+        if (weightedRanges.isEmpty() || parts == 1)
             return Collections.singletonList(partitioner.getMaximumToken());
 
         BigInteger totalTokens = BigInteger.ZERO;
-        for (Range<Token> r : localRanges)
+        for (WeightedRange weightedRange : weightedRanges)
         {
-            BigInteger right = valueForToken(token(r.right));
-            totalTokens = totalTokens.add(right.subtract(valueForToken(r.left)));
+            totalTokens = totalTokens.add(weightedRange.totalTokens(this));
         }
+
         BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts));
         // the range owned is so tiny we can't split it:
         if (perPart.equals(BigInteger.ZERO))
             return Collections.singletonList(partitioner.getMaximumToken());
 
         if (dontSplitRanges)
-            return splitOwnedRangesNoPartialRanges(localRanges, perPart, parts);
+            return splitOwnedRangesNoPartialRanges(weightedRanges, perPart, parts);
 
         List<Token> boundaries = new ArrayList<>();
         BigInteger sum = BigInteger.ZERO;
-        for (Range<Token> r : localRanges)
+        for (WeightedRange weightedRange : weightedRanges)
         {
-            Token right = token(r.right);
-            BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)).abs();
-            BigInteger left = valueForToken(r.left);
+            BigInteger currentRangeWidth = weightedRange.totalTokens(this);
+            BigInteger left = valueForToken(weightedRange.left());
             while (sum.add(currentRangeWidth).compareTo(perPart) >= 0)
             {
                 BigInteger withinRangeBoundary = perPart.subtract(sum);
@@ -155,26 +155,24 @@ public abstract class Splitter
         }
         boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken());
 
-        assert boundaries.size() == parts : boundaries.size() + "!=" + parts + " " + boundaries + ":" + localRanges;
+        assert boundaries.size() == parts : boundaries.size() + "!=" + parts + " " + boundaries + ":" + weightedRanges;
         return boundaries;
     }
 
-    private List<Token> splitOwnedRangesNoPartialRanges(List<Range<Token>> localRanges, BigInteger perPart, int parts)
+    private List<Token> splitOwnedRangesNoPartialRanges(List<WeightedRange> weightedRanges, BigInteger perPart, int parts)
     {
         List<Token> boundaries = new ArrayList<>(parts);
         BigInteger sum = BigInteger.ZERO;
 
         int i = 0;
-        final int rangesCount = localRanges.size();
+        final int rangesCount = weightedRanges.size();
         while (boundaries.size() < parts - 1 && i < rangesCount - 1)
         {
-            Range<Token> r = localRanges.get(i);
-            Range<Token> nextRange = localRanges.get(i + 1);
-            Token right = token(r.right);
-            Token nextRight = token(nextRange.right);
+            WeightedRange r = weightedRanges.get(i);
+            WeightedRange nextRange = weightedRanges.get(i + 1);
 
-            BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left));
-            BigInteger nextRangeWidth = valueForToken(nextRight).subtract(valueForToken(nextRange.left));
+            BigInteger currentRangeWidth = r.totalTokens(this);
+            BigInteger nextRangeWidth = nextRange.totalTokens(this);
             sum = sum.add(currentRangeWidth);
 
             // does this or next range take us beyond the per part limit?
@@ -187,7 +185,7 @@ public abstract class Splitter
                 if (diffNext.compareTo(diffCurrent) >= 0)
                 {
                     sum = BigInteger.ZERO;
-                    boundaries.add(right);
+                    boundaries.add(token(r.right()));
                 }
             }
             i++;
@@ -256,4 +254,61 @@ public abstract class Splitter
         }
         return subranges;
     }
+
+    public static class WeightedRange
+    {
+        private final double weight;
+        private final Range<Token> range;
+
+        public WeightedRange(double weight, Range<Token> range)
+        {
+            this.weight = weight;
+            this.range = range;
+        }
+
+        public BigInteger totalTokens(Splitter splitter)
+        {
+            BigInteger right = splitter.valueForToken(splitter.token(range.right));
+            BigInteger left = splitter.valueForToken(range.left);
+            BigInteger factor = BigInteger.valueOf(Math.max(1, (long) (1 / weight)));
+            BigInteger size = right.subtract(left);
+            return size.abs().divide(factor);
+        }
+
+        public Token left()
+        {
+            return range.left;
+        }
+
+        public Token right()
+        {
+            return range.right;
+        }
+
+        public Range<Token> range()
+        {
+            return range;
+        }
+
+        public String toString()
+        {
+            return "WeightedRange{" +
+                   "weight=" + weight +
+                   ", range=" + range +
+                   '}';
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof WeightedRange)) return false;
+            WeightedRange that = (WeightedRange) o;
+            return Objects.equals(range, that.range);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(weight, range);
+        }
+    }
 }
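
The weighting arithmetic in WeightedRange.totalTokens reduces a range's contribution by an integer factor of max(1, (long) (1 / weight)): weight 0.5 halves the token count, 0.25 quarters it, and any weight of 1 or more leaves it unchanged. Because the factor is truncated to a long, weights strictly between 0.5 and 1 also round down to factor 1. A minimal sketch of just that calculation, with made-up token values rather than real partitioner tokens:

import java.math.BigInteger;

public class WeightedRangeSketch
{
    // Mirrors the computation in Splitter.WeightedRange.totalTokens:
    // size.abs().divide(BigInteger.valueOf(Math.max(1, (long) (1 / weight))))
    static BigInteger weightedSize(BigInteger left, BigInteger right, double weight)
    {
        BigInteger factor = BigInteger.valueOf(Math.max(1, (long) (1 / weight)));
        return right.subtract(left).abs().divide(factor);
    }

    public static void main(String[] args)
    {
        BigInteger left = BigInteger.ZERO;
        BigInteger right = BigInteger.valueOf(1000);
        System.out.println(weightedSize(left, right, 1.0));  // 1000 (factor 1)
        System.out.println(weightedSize(left, right, 0.5));  // 500  (factor 2)
        System.out.println(weightedSize(left, right, 0.25)); // 250  (factor 4)
        System.out.println(weightedSize(left, right, 0.6));  // 1000 (1/0.6 truncates to 1)
    }
}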

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
index e3ea838..3144e81 100644
--- a/src/java/org/apache/cassandra/dht/StreamStateStore.java
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -19,38 +19,43 @@ package org.apache.cassandra.dht;
 
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamRequest;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Store and update available ranges (data already received) to system 
keyspace.
  */
 public class StreamStateStore implements StreamEventHandler
 {
-    public Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner)
+    private static final Logger logger = LoggerFactory.getLogger(StreamStateStore.class);
+
+    public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
     {
         return SystemKeyspace.getAvailableRanges(keyspace, partitioner);
     }
 
     /**
-     * Check if given token's data is available in this node.
+     * Check if given token's data is available in this node. This doesn't handle transientness in a useful way
+     * so it's only used by a legacy test
      *
      * @param keyspace keyspace name
      * @param token token to check
      * @return true if given token in the keyspace is already streamed and 
ready to be served.
      */
+    @VisibleForTesting
     public boolean isDataAvailable(String keyspace, Token token)
     {
-        Set<Range<Token>> availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
-        for (Range<Token> range : availableRanges)
-        {
-            if (range.contains(token))
-                return true;
-        }
-        return false;
+        RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
+        return availableRanges.ranges().stream().anyMatch(range -> range.contains(token));
     }
 
     /**
@@ -73,7 +78,7 @@ public class StreamStateStore implements StreamEventHandler
                 }
                 for (StreamRequest request : se.requests)
                 {
-                    SystemKeyspace.updateAvailableRanges(request.keyspace, request.ranges);
+                    SystemKeyspace.updateAvailableRanges(request.keyspace, request.full.ranges(), request.transientReplicas.ranges());
                 }
             }
         }
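
The rewritten isDataAvailable collapses the old loop into a single anyMatch over the available ranges. A standalone sketch of that check, using a simplified (left, right] interval in place of Cassandra's wrapping Range<Token>:

import java.util.List;

public class AvailabilitySketch
{
    // Simplified (left, right] interval; Cassandra's Range<Token> additionally
    // handles wrap-around, which this sketch deliberately ignores.
    record SimpleRange(long left, long right)
    {
        boolean contains(long token) { return token > left && token <= right; }
    }

    // Mirrors: availableRanges.ranges().stream().anyMatch(range -> range.contains(token))
    static boolean isDataAvailable(List<SimpleRange> available, long token)
    {
        return available.stream().anyMatch(r -> r.contains(token));
    }

    public static void main(String[] args)
    {
        List<SimpleRange> available = List.of(new SimpleRange(0, 100), new SimpleRange(200, 300));
        System.out.println(isDataAvailable(available, 50));  // true
        System.out.println(isDataAvailable(available, 150)); // false
    }
}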

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
index efd2766..36fc8c2 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
@@ -73,7 +73,7 @@ class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
         Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups);
         if (groups.size() < replicas)
         {
-            // We need at least replicas groups to do allocation correctly. If there aren't enough, 
+            // We need at least replicas groups to do allocation correctly. If there aren't enough,
             // use random allocation.
             // This part of the code should only be reached via the RATATest. StrategyAdapter should disallow
             // token allocation in this case as the algorithm is not able to cover the behavior of NetworkTopologyStrategy.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index 61082df..ef91fbb 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -113,7 +113,7 @@ public class TokenAllocation
     {
         double size = current.size(next);
         Token representative = current.getPartitioner().midpoint(current, next);
-        for (InetAddressAndPort n : rs.calculateNaturalEndpoints(representative, tokenMetadata))
+        for (InetAddressAndPort n : rs.calculateNaturalReplicas(representative, tokenMetadata).endpoints())
         {
             Double v = ownership.get(n);
             ownership.put(n, v != null ? v + size : size);
@@ -169,7 +169,7 @@ public class TokenAllocation
 
     static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddressAndPort endpoint)
     {
-        final int replicas = rs.getReplicationFactor();
+        final int replicas = rs.getReplicationFactor().allReplicas;
 
         return new StrategyAdapter()
         {
@@ -196,7 +196,7 @@ public class TokenAllocation
     static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddressAndPort endpoint)
     {
         final String dc = snitch.getDatacenter(endpoint);
-        final int replicas = rs.getReplicationFactor(dc);
+        final int replicas = rs.getReplicationFactor(dc).allReplicas;
 
         if (replicas == 0 || replicas == 1)
         {

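Both hunks swap a bare int replication factor for ReplicationFactor.allReplicas,
because with transient replication an RF is a pair of counts. A hedged sketch of
the accounting, assuming the ReplicationFactor fields in this branch and its
"total/transient" string form:

    // "3/1" = 3 replicas in total, 1 of them transient; token allocation
    // balances ownership across all replicas, so it wants the total count.
    ReplicationFactor rf = ReplicationFactor.fromString("3/1");
    int total = rf.allReplicas;                        // 3 -> what getStrategy uses
    int full = rf.fullReplicas;                        // 2 -> full (non-transient) replicas
    int transients = rf.allReplicas - rf.fullReplicas; // 1
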
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/exceptions/UnavailableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/UnavailableException.java b/src/java/org/apache/cassandra/exceptions/UnavailableException.java
index 7b4edd8..d6e8488 100644
--- a/src/java/org/apache/cassandra/exceptions/UnavailableException.java
+++ b/src/java/org/apache/cassandra/exceptions/UnavailableException.java
@@ -25,14 +25,26 @@ public class UnavailableException extends RequestExecutionException
     public final int required;
     public final int alive;
 
-    public UnavailableException(ConsistencyLevel consistency, int required, int alive)
+    public static UnavailableException create(ConsistencyLevel consistency, int required, int alive)
     {
-        this("Cannot achieve consistency level " + consistency, consistency, required, alive);
+        assert alive < required;
+        return create(consistency, required, 0, alive, 0);
     }
 
-    public UnavailableException(ConsistencyLevel consistency, String dc, int required, int alive)
+    public static UnavailableException create(ConsistencyLevel consistency, int required, int requiredFull, int alive, int aliveFull)
     {
-        this("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive);
+        if (required > alive)
+            return new UnavailableException("Cannot achieve consistency level " + consistency, consistency, required, alive);
+        assert aliveFull < requiredFull;
+        return new UnavailableException("Insufficient full replicas", consistency, required, alive);
+    }
+
+    public static UnavailableException create(ConsistencyLevel consistency, String dc, int required, int requiredFull, int alive, int aliveFull)
+    {
+        if (required > alive)
+            return new UnavailableException("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive);
+        assert aliveFull < requiredFull;
+        return new UnavailableException("Insufficient full replicas in DC " + dc, consistency, required, alive);
     }
 
     public UnavailableException(String msg, ConsistencyLevel consistency, int required, int alive)
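
Note the two-stage check: the factory first reports a plain shortfall in live
replicas, and only then a shortfall in full replicas (a request can see enough
live nodes overall yet have too many of them transient). A minimal caller-side
sketch, with made-up counts and a hypothetical helper name:

    // Hypothetical coordinator-side guard; the counts are illustrative only.
    static void ensureLive(ConsistencyLevel cl, int required, int requiredFull, int alive, int aliveFull)
    {
        // Either quota failing means the request cannot proceed.
        if (alive < required || aliveFull < requiredFull)
            throw UnavailableException.create(cl, required, requiredFull, alive, aliveFull);
    }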

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 5646bf6..8546a70 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -144,6 +144,11 @@ public class EndpointState
         return rpcState != null && Boolean.parseBoolean(rpcState.value);
     }
 
+    public boolean isNormalState()
+    {
+        return getStatus().equals(VersionedValue.STATUS_NORMAL);
+    }
+
     public String getStatus()
     {
         VersionedValue status = getApplicationState(ApplicationState.STATUS_WITH_PORT);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 0cd1278..c6ad3d9 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.hints;
 import java.io.File;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -39,6 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
@@ -46,9 +50,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 
-import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
-import static com.google.common.collect.Iterables.size;
 
 /**
  * A singleton-ish wrapper over various hints components:
@@ -151,7 +153,7 @@ public final class HintsService implements HintsServiceMBean
      * @param hostIds host ids of the hint's target nodes
      * @param hint the hint to store
      */
-    public void write(Iterable<UUID> hostIds, Hint hint)
+    public void write(Collection<UUID> hostIds, Hint hint)
     {
         if (isShutDown)
             throw new IllegalStateException("HintsService is shut down and can't accept new hints");
@@ -161,7 +163,7 @@ public final class HintsService implements HintsServiceMBean
 
         bufferPool.write(hostIds, hint);
 
-        StorageMetrics.totalHints.inc(size(hostIds));
+        StorageMetrics.totalHints.inc(hostIds.size());
     }
 
     /**
@@ -183,9 +185,14 @@ public final class HintsService implements HintsServiceMBean
         String keyspaceName = hint.mutation.getKeyspaceName();
         Token token = hint.mutation.key().getToken();
 
-        Iterable<UUID> hostIds =
-        transform(filter(StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token), StorageProxy::shouldHint),
-                  StorageService.instance::getHostIdForEndpoint);
+        EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token);
+
+        // judicious use of streams: eagerly materializing probably cheaper
+        // than performing filters / translations 2x extra via Iterables.filter/transform
+        List<UUID> hostIds = replicas.stream()
+                .filter(StorageProxy::shouldHint)
+                .map(replica -> StorageService.instance.getHostIdForEndpoint(replica.endpoint()))
+                .collect(Collectors.toList());
 
         write(hostIds, hint);
     }
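
The comment in that hunk states the general trade-off: a Guava filter/transform
view re-runs its functions on every traversal, while collecting once pays the
cost once. A self-contained sketch of the difference, using only JDK types:

    import java.util.List;
    import java.util.stream.Collectors;

    public class LazyVsEager
    {
        public static void main(String[] args)
        {
            List<Integer> source = List.of(1, 2, 3, 4, 5);

            // Eager: the predicate runs once per element, once in total.
            List<Integer> evens = source.stream()
                                        .filter(i -> i % 2 == 0)
                                        .collect(Collectors.toList());

            // Every later use traverses the materialized list; with a lazy
            // Iterables.filter view, each use would re-apply the predicate.
            System.out.println(evens.size());
            System.out.println(evens);
        }
    }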

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 044a00b..4eaf1fe 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -69,13 +69,14 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
         SerializationHeader header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
 
         if (makeRangeAware)
-            return SSTableTxnWriter.createRangeAware(metadata, 0,  ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, formatType, 0, header);
+            return SSTableTxnWriter.createRangeAware(metadata, 0,  ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, formatType, 0, header);
 
         return SSTableTxnWriter.create(metadata,
                                        createDescriptor(directory, metadata.keyspace, metadata.name, formatType),
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        ActiveRepairService.NO_PENDING_REPAIR,
+                                       false,
                                        0,
                                        header,
                                        Collections.emptySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index f2605fb..055bf24 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArraySet;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Sets;
@@ -46,6 +47,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
 /**
  * This class is built on top of the SequenceFile. It stores
  * data on disk in sorted fashion. However the sorting is upto
@@ -350,4 +354,13 @@ public abstract class SSTable
     {
         return AbstractBounds.bounds(first.getToken(), true, last.getToken(), true);
     }
+
+    public static void validateRepairedMetadata(long repairedAt, UUID pendingRepair, boolean isTransient)
+    {
+        Preconditions.checkArgument((pendingRepair == NO_PENDING_REPAIR) || (repairedAt == UNREPAIRED_SSTABLE),
+                                    "pendingRepair cannot be set on a repaired sstable");
+        Preconditions.checkArgument(!isTransient || (pendingRepair != NO_PENDING_REPAIR),
+                                    "isTransient can only be true for sstables pending repair");
+
+    }
 }
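
The two preconditions pin down the legal metadata states: a pending-repair id
only on unrepaired sstables, and the transient flag only alongside a pending
repair. A quick illustration (the session id is made up):

    UUID session = UUID.randomUUID();

    // OK: unrepaired, no pending session, not transient.
    SSTable.validateRepairedMetadata(ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false);

    // OK: unrepaired and pending repair; only here may isTransient be true.
    SSTable.validateRepairedMetadata(ActiveRepairService.UNREPAIRED_SSTABLE, session, true);

    // Throws IllegalArgumentException: transient without a pending repair.
    SSTable.validateRepairedMetadata(ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, true);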

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 4ba0533..ec2a700 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -126,7 +126,7 @@ public class SSTableLoader implements StreamEventHandler
                                              for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : ranges.entrySet())
                                              {
                                                  InetAddressAndPort endpoint = entry.getKey();
-                                                  Collection<Range<Token>> tokenRanges = entry.getValue();
+                                                  List<Range<Token>> tokenRanges = Range.normalize(entry.getValue());
 
                                                  List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                                                  long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
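
Range.normalize sorts the ranges and merges any that overlap or touch, which is
what getPositionsForRanges/estimatedKeysForRanges assume about their input. A
self-contained sketch of that semantics over toy integer ranges (Cassandra's
real version works on Tokens and also handles wraparound):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class NormalizeSketch
    {
        // Half-open (left, right], mirroring Cassandra's Range convention.
        record R(int left, int right) {}

        static List<R> normalize(List<R> in)
        {
            List<R> sorted = new ArrayList<>(in);
            sorted.sort(Comparator.comparingInt(R::left));
            List<R> out = new ArrayList<>();
            for (R r : sorted)
            {
                R last = out.isEmpty() ? null : out.get(out.size() - 1);
                if (last != null && r.left() <= last.right())   // overlap or adjacency: merge
                    out.set(out.size() - 1, new R(last.left(), Math.max(last.right(), r.right())));
                else
                    out.add(r);
            }
            return out;
        }

        public static void main(String[] args)
        {
            // (0,10], (5,15], (20,30]  ->  (0,15], (20,30]
            System.out.println(normalize(List.of(new R(0, 10), new R(5, 15), new R(20, 30))));
        }
    }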

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 60b8962..cfb1365 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -99,10 +99,10 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     }
 
     @SuppressWarnings("resource") // log and writer closed during doPostCleanup
-    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header)
     {
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
-        SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, txn);
+        SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 
@@ -112,6 +112,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
                                                     long keyCount,
                                                     long repairedAt,
                                                     UUID pendingRepair,
+                                                    boolean isTransient,
                                                     SSTableFormat.Type type,
                                                     int sstableLevel,
                                                     SerializationHeader header)
@@ -122,7 +123,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
         SSTableMultiWriter writer;
         try
         {
-            writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, type, sstableLevel, 0, txn, header);
+            writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, isTransient, type, sstableLevel, 0, txn, header);
         }
         catch (IOException e)
         {
@@ -140,6 +141,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
                                           long keyCount,
                                           long repairedAt,
                                           UUID pendingRepair,
+                                          boolean isTransient,
                                           int sstableLevel,
                                           SerializationHeader header,
                                           Collection<Index> indexes)
@@ -147,12 +149,12 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
         // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
         MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
-        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
+        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 
-    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, SerializationHeader header)
     {
-        return create(cfs, desc, keyCount, repairedAt, pendingRepair, 0, header);
+        return create(cfs, desc, keyCount, repairedAt, pendingRepair, isTransient, 0, header);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index a40ec18..eb5c5fe 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -111,13 +111,14 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
                                             long keyCount,
                                             long repairedAt,
                                             UUID pendingRepair,
+                                            boolean isTransient,
                                             TableMetadataRef metadata,
                                             MetadataCollector metadataCollector,
                                             SerializationHeader header,
                                             Collection<Index> indexes,
                                             LifecycleTransaction txn)
     {
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, indexes, txn);
+        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, indexes, txn);
         return new SimpleSSTableMultiWriter(writer, txn);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index f289fe3..29fa573 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -44,6 +44,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
     private final long estimatedKeys;
     private final long repairedAt;
     private final UUID pendingRepair;
+    private final boolean isTransient;
     private final SSTableFormat.Type format;
     private final SerializationHeader header;
     private final LifecycleTransaction txn;
@@ -53,7 +54,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
     private final List<SSTableReader> finishedReaders = new ArrayList<>();
     private SSTableMultiWriter currentWriter = null;
 
-    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
+    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, boolean isTransient, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
     {
         DiskBoundaries db = cfs.getDiskBoundaries();
         directories = db.directories;
@@ -62,6 +63,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
         this.estimatedKeys = estimatedKeys / directories.size();
         this.repairedAt = repairedAt;
         this.pendingRepair = pendingRepair;
+        this.isTransient = isTransient;
         this.format = format;
         this.txn = txn;
         this.header = header;
@@ -73,7 +75,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
                 throw new IOException(String.format("Insufficient disk space to store %s",
                                                     FBUtilities.prettyPrintMemory(totalSize)));
             Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
         }
     }
 
@@ -95,7 +97,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
                 finishedWriters.add(currentWriter);
 
             Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
         }
     }
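
For context on the hunk above: the writer tracks sorted disk boundaries and,
since keys arrive in token order, rolls to a fresh per-disk writer whenever a
key passes the current boundary. A simplified model of that rolling logic (all
names here are illustrative, not Cassandra APIs):

    import java.util.List;

    // boundaries.get(i) is the last token that belongs on disk i, ascending.
    class BoundaryRoller
    {
        private final List<Long> boundaries;
        private int currentIndex = -1;

        BoundaryRoller(List<Long> boundaries) { this.boundaries = boundaries; }

        // Returns true when the caller should open a writer on a new disk.
        boolean maybeSwitch(long token)
        {
            int previous = currentIndex;
            if (currentIndex < 0)
                currentIndex = 0;
            while (currentIndex < boundaries.size() - 1 && token > boundaries.get(currentIndex))
                currentIndex++;   // keys are sorted, so the index only moves forward
            return currentIndex != previous;
        }
    }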
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 2fade21..edb3afa 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1852,6 +1852,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return sstableMetadata.repairedAt;
     }
 
+    public boolean isTransient()
+    {
+        return sstableMetadata.isTransient;
+    }
+
     public boolean intersects(Collection<Range<Token>> ranges)
     {
         Bounds<Token> range = new Bounds<>(first.getToken(), last.getToken());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 1e183e2..cca59cf 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -55,6 +55,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
 {
     protected long repairedAt;
     protected UUID pendingRepair;
+    protected boolean isTransient;
     protected long maxDataAge = -1;
     protected final long keyCount;
     protected final MetadataCollector metadataCollector;
@@ -77,6 +78,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                             long keyCount,
                             long repairedAt,
                             UUID pendingRepair,
+                            boolean isTransient,
                             TableMetadataRef metadata,
                             MetadataCollector metadataCollector,
                             SerializationHeader header,
@@ -86,6 +88,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
         this.pendingRepair = pendingRepair;
+        this.isTransient = isTransient;
         this.metadataCollector = metadataCollector;
         this.header = header;
         this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header);
@@ -96,6 +99,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        Long keyCount,
                                        Long repairedAt,
                                        UUID pendingRepair,
+                                       boolean isTransient,
                                        TableMetadataRef metadata,
                                        MetadataCollector metadataCollector,
                                        SerializationHeader header,
@@ -103,20 +107,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        LifecycleTransaction txn)
     {
         Factory writerFactory = descriptor.getFormat().getWriterFactory();
-        return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
+        return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
     }
 
     public static SSTableWriter create(Descriptor descriptor,
                                        long keyCount,
                                        long repairedAt,
                                        UUID pendingRepair,
+                                       boolean isTransient,
                                        int sstableLevel,
                                        SerializationHeader header,
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
         TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
-        return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, indexes, txn);
+        return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, txn);
     }
 
     public static SSTableWriter create(TableMetadataRef metadata,
@@ -124,13 +129,14 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        long keyCount,
                                        long repairedAt,
                                        UUID pendingRepair,
+                                       boolean isTransient,
                                        int sstableLevel,
                                        SerializationHeader header,
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
         MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
-        return create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
+        return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn);
     }
 
     @VisibleForTesting
@@ -138,11 +144,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        long keyCount,
                                        long repairedAt,
                                        UUID pendingRepair,
+                                       boolean isTransient,
                                        SerializationHeader header,
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
-        return create(descriptor, keyCount, repairedAt, pendingRepair, 0, header, indexes, txn);
+        return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, txn);
     }
 
     private static Set<Component> components(TableMetadata metadata)
@@ -309,6 +316,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                                   metadata().params.bloomFilterFpChance,
                                                   repairedAt,
                                                   pendingRepair,
+                                                  isTransient,
                                                   header);
     }
 
@@ -338,6 +346,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                            long keyCount,
                                            long repairedAt,
                                            UUID pendingRepair,
+                                           boolean isTransient,
                                            TableMetadataRef metadata,
                                            MetadataCollector metadataCollector,
                                            SerializationHeader header,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 1d965ce..9b82c14 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -55,6 +55,8 @@ public abstract class Version
 
     public abstract boolean hasPendingRepair();
 
+    public abstract boolean hasIsTransient();
+
     public abstract boolean hasMetadataChecksum();
 
     /**

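Version is where format capabilities are gated: each flag usually reduces to
"this sstable was written at or after the version that introduced the field".
A hypothetical concrete implementation, assuming a lexicographic version string
(the "na" cutoff below is an assumption for illustration, not taken from this
patch):

    // In a concrete Version subclass; `version` is the short format string.
    @Override
    public boolean hasIsTransient()
    {
        // Version strings sort lexicographically, so this reads as
        // ">= the first version that wrote the isTransient field".
        return version.compareTo("na") >= 0;
    }

Readers of older sstables then default the flag to false instead of trying to
deserialize a field that was never written.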
