http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java 
b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 72b5e2a..acfe71a 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -19,8 +19,9 @@
 package org.apache.cassandra.db;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +31,8 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Splitter;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
@@ -68,7 +71,7 @@ public class DiskBoundaryManager
 
     private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
     {
-        Collection<Range<Token>> localRanges;
+        RangesAtEndpoint localRanges;
 
         long ringVersion;
         TokenMetadata tmd;
@@ -87,7 +90,7 @@ public class DiskBoundaryManager
                 // Reason we use use the future settled TMD is that if we 
decommission a node, we want to stream
                 // from that node to the correct location on disk, if we 
didn't, we would put new files in the wrong places.
                 // We do this to minimize the amount of data we need to move 
in rebalancedisks once everything settled
-                localRanges = 
cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddressAndPort());
+                localRanges = 
cfs.keyspace.getReplicationStrategy().getAddressReplicas(tmd.cloneAfterAllSettled(),
 FBUtilities.getBroadcastAddressAndPort());
             }
             logger.debug("Got local ranges {} (ringVersion = {})", 
localRanges, ringVersion);
         }
@@ -106,9 +109,18 @@ public class DiskBoundaryManager
         if (localRanges == null || localRanges.isEmpty())
             return new DiskBoundaries(dirs, null, ringVersion, 
directoriesVersion);
 
-        List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
+        // note that Range.sort unwraps any wraparound ranges, so we need to 
sort them here
+        List<Range<Token>> fullLocalRanges = Range.sort(localRanges.stream()
+                                                                   
.filter(Replica::isFull)
+                                                                   
.map(Replica::range)
+                                                                   
.collect(Collectors.toList()));
+        List<Range<Token>> transientLocalRanges = 
Range.sort(localRanges.stream()
+                                                                        
.filter(Replica::isTransient)
+                                                                        
.map(Replica::range)
+                                                                        
.collect(Collectors.toList()));
+
+        List<PartitionPosition> positions = getDiskBoundaries(fullLocalRanges, 
transientLocalRanges, cfs.getPartitioner(), dirs);
 
-        List<PartitionPosition> positions = 
getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
         return new DiskBoundaries(dirs, positions, ringVersion, 
directoriesVersion);
     }
 
@@ -121,15 +133,26 @@ public class DiskBoundaryManager
      *
      * The final entry in the returned list will always be the partitioner 
maximum tokens upper key bound
      */
-    private static List<PartitionPosition> 
getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner 
partitioner, Directories.DataDirectory[] dataDirectories)
+    private static List<PartitionPosition> 
getDiskBoundaries(List<Range<Token>> fullRanges, List<Range<Token>> 
transientRanges, IPartitioner partitioner, Directories.DataDirectory[] 
dataDirectories)
     {
         assert partitioner.splitter().isPresent();
+
         Splitter splitter = partitioner.splitter().get();
         boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
-        List<Token> boundaries = 
splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, 
dontSplitRanges);
+
+        List<Splitter.WeightedRange> weightedRanges = new 
ArrayList<>(fullRanges.size() + transientRanges.size());
+        for (Range<Token> r : fullRanges)
+            weightedRanges.add(new Splitter.WeightedRange(1.0, r));
+
+        for (Range<Token> r : transientRanges)
+            weightedRanges.add(new Splitter.WeightedRange(0.1, r));
+
+        
weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left));
+
+        List<Token> boundaries = 
splitter.splitOwnedRanges(dataDirectories.length, weightedRanges, 
dontSplitRanges);
         // If we can't split by ranges, split evenly to ensure utilisation of 
all disks
         if (dontSplitRanges && boundaries.size() < dataDirectories.length)
-            boundaries = splitter.splitOwnedRanges(dataDirectories.length, 
sortedLocalRanges, false);
+            boundaries = splitter.splitOwnedRanges(dataDirectories.length, 
weightedRanges, false);
 
         List<PartitionPosition> diskBoundaries = new ArrayList<>();
         for (int i = 0; i < boundaries.size() - 1; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java 
b/src/java/org/apache/cassandra/db/Memtable.java
index c162697..436b7ef 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -503,6 +503,7 @@ public class Memtable implements Comparable<Memtable>
                                                 toFlush.size(),
                                                 
ActiveRepairService.UNREPAIRED_SSTABLE,
                                                 
ActiveRepairService.NO_PENDING_REPAIR,
+                                                false,
                                                 sstableMetadataCollector,
                                                 new SerializationHeader(true, 
cfs.metadata(), columns, stats), txn);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 8386048..9660f65 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -38,7 +37,7 @@ public class MutationVerbHandler implements 
IVerbHandler<Mutation>
         Tracing.trace("Payload application resulted in WriteTimeout, not 
replying");
     }
 
-    public void doVerb(MessageIn<Mutation> message, int id)  throws IOException
+    public void doVerb(MessageIn<Mutation> message, int id)
     {
         // Check if there were any forwarding headers in this message
         InetAddressAndPort from = 
(InetAddressAndPort)message.parameters.get(ParameterType.FORWARD_FROM);
@@ -69,7 +68,7 @@ public class MutationVerbHandler implements 
IVerbHandler<Mutation>
         }
     }
 
-    private static void forwardToLocalNodes(Mutation mutation, 
MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort 
from) throws IOException
+    private static void forwardToLocalNodes(Mutation mutation, 
MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort 
from)
     {
         // tell the recipients who to send their ack to
         MessageOut<Mutation> message = new MessageOut<>(verb, mutation, 
Mutation.serializer).withParameter(ParameterType.FORWARD_FROM, from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2bfb434..7eab016 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -24,7 +24,6 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;
@@ -61,6 +60,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
 
     private PartitionRangeReadCommand(boolean isDigest,
                                      int digestVersion,
+                                     boolean acceptsTransient,
                                      TableMetadata metadata,
                                      int nowInSec,
                                      ColumnFilter columnFilter,
@@ -69,7 +69,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
                                      DataRange dataRange,
                                      IndexMetadata index)
     {
-        super(Kind.PARTITION_RANGE, isDigest, digestVersion, metadata, 
nowInSec, columnFilter, rowFilter, limits, index);
+        super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, 
metadata, nowInSec, columnFilter, rowFilter, limits, index);
         this.dataRange = dataRange;
     }
 
@@ -82,6 +82,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
     {
         return new PartitionRangeReadCommand(false,
                                              0,
+                                             false,
                                              metadata,
                                              nowInSec,
                                              columnFilter,
@@ -103,6 +104,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
     {
         return new PartitionRangeReadCommand(false,
                                              0,
+                                             false,
                                              metadata,
                                              nowInSec,
                                              ColumnFilter.all(metadata),
@@ -151,6 +153,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
         // on the ring.
         return new PartitionRangeReadCommand(isDigestQuery(),
                                              digestVersion(),
+                                             acceptsTransient(),
                                              metadata(),
                                              nowInSec(),
                                              columnFilter(),
@@ -164,6 +167,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
     {
         return new PartitionRangeReadCommand(isDigestQuery(),
                                              digestVersion(),
+                                             acceptsTransient(),
                                              metadata(),
                                              nowInSec(),
                                              columnFilter(),
@@ -177,6 +181,21 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
     {
         return new PartitionRangeReadCommand(true,
                                              digestVersion(),
+                                             false,
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             dataRange(),
+                                             indexMetadata());
+    }
+
+    public PartitionRangeReadCommand copyAsTransientQuery()
+    {
+        return new PartitionRangeReadCommand(false,
+                                             0,
+                                             true,
                                              metadata(),
                                              nowInSec(),
                                              columnFilter(),
@@ -191,6 +210,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
     {
         return new PartitionRangeReadCommand(isDigestQuery(),
                                              digestVersion(),
+                                             acceptsTransient(),
                                              metadata(),
                                              nowInSec(),
                                              columnFilter(),
@@ -205,6 +225,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
     {
         return new PartitionRangeReadCommand(isDigestQuery(),
                                              digestVersion(),
+                                             acceptsTransient(),
                                              metadata(),
                                              nowInSec(),
                                              columnFilter(),
@@ -406,6 +427,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
                                        int version,
                                        boolean isDigest,
                                        int digestVersion,
+                                       boolean acceptsTransient,
                                        TableMetadata metadata,
                                        int nowInSec,
                                        ColumnFilter columnFilter,
@@ -415,7 +437,7 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, 
metadata);
-            return new PartitionRangeReadCommand(isDigest, digestVersion, 
metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
+            return new PartitionRangeReadCommand(isDigest, digestVersion, 
acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, 
index);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 0262140..736e3a3 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.db.transform.RTBoundCloser;
 import org.apache.cassandra.db.transform.RTBoundValidator;
 import org.apache.cassandra.db.transform.StoppingTransformation;
 import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.UnknownIndexException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
@@ -68,6 +67,7 @@ public abstract class ReadCommand extends AbstractReadQuery
     private final Kind kind;
 
     private final boolean isDigestQuery;
+    private final boolean acceptsTransient;
     // if a digest query, the version for which the digest is expected. 
Ignored if not a digest.
     private int digestVersion;
 
@@ -80,6 +80,7 @@ public abstract class ReadCommand extends AbstractReadQuery
                                                 int version,
                                                 boolean isDigest,
                                                 int digestVersion,
+                                                boolean acceptsTransient,
                                                 TableMetadata metadata,
                                                 int nowInSec,
                                                 ColumnFilter columnFilter,
@@ -104,6 +105,7 @@ public abstract class ReadCommand extends AbstractReadQuery
     protected ReadCommand(Kind kind,
                           boolean isDigestQuery,
                           int digestVersion,
+                          boolean acceptsTransient,
                           TableMetadata metadata,
                           int nowInSec,
                           ColumnFilter columnFilter,
@@ -115,6 +117,7 @@ public abstract class ReadCommand extends AbstractReadQuery
         this.kind = kind;
         this.isDigestQuery = isDigestQuery;
         this.digestVersion = digestVersion;
+        this.acceptsTransient = acceptsTransient;
         this.index = index;
     }
 
@@ -176,6 +179,14 @@ public abstract class ReadCommand extends AbstractReadQuery
     }
 
     /**
+     * @return Whether this query expects only a transient data response, or a 
full response
+     */
+    public boolean acceptsTransient()
+    {
+        return acceptsTransient;
+    }
+
+    /**
      * Index (metadata) chosen for this query. Can be null.
      *
      * @return index (metadata) chosen for this query
@@ -210,6 +221,7 @@ public abstract class ReadCommand extends AbstractReadQuery
      * Returns a copy of this command with isDigestQuery set to true.
      */
     public abstract ReadCommand copyAsDigestQuery();
+    public abstract ReadCommand copyAsTransientQuery();
 
     protected abstract UnfilteredPartitionIterator 
queryStorage(ColumnFamilyStore cfs, ReadExecutionController 
executionController);
 
@@ -569,6 +581,16 @@ public abstract class ReadCommand extends AbstractReadQuery
             return (flags & 0x01) != 0;
         }
 
+        private static boolean acceptsTransient(int flags)
+        {
+            return (flags & 0x08) != 0;
+        }
+
+        private static int acceptsTransientFlag(boolean acceptsTransient)
+        {
+            return acceptsTransient ? 0x08 : 0;
+        }
+
         // We don't set this flag anymore, but still look if we receive a
         // command with it set in case someone is using thrift a mixed 3.0/4.0+
         // cluster (which is unsupported). This is also a reminder for not
@@ -592,7 +614,11 @@ public abstract class ReadCommand extends AbstractReadQuery
         public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
         {
             out.writeByte(command.kind.ordinal());
-            out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null 
!= command.indexMetadata()));
+            out.writeByte(
+                    digestFlag(command.isDigestQuery())
+                    | indexFlag(null != command.indexMetadata())
+                    | acceptsTransientFlag(command.acceptsTransient())
+            );
             if (command.isDigestQuery())
                 out.writeUnsignedVInt(command.digestVersion());
             command.metadata().id.serialize(out);
@@ -611,6 +637,7 @@ public abstract class ReadCommand extends AbstractReadQuery
             Kind kind = Kind.values()[in.readByte()];
             int flags = in.readByte();
             boolean isDigest = isDigest(flags);
+            boolean acceptsTransient = acceptsTransient(flags);
             // Shouldn't happen or it's a user error (see comment above) but
             // better complain loudly than doing the wrong thing.
             if (isForThrift(flags))
@@ -628,7 +655,7 @@ public abstract class ReadCommand extends AbstractReadQuery
             DataLimits limits = DataLimits.serializer.deserialize(in, version, 
 metadata.comparator);
             IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, 
version, metadata) : null;
 
-            return kind.selectionDeserializer.deserialize(in, version, 
isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, 
index);
+            return kind.selectionDeserializer.deserialize(in, version, 
isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, 
rowFilter, limits, index);
         }
 
         private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int 
version, TableMetadata metadata) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SSTableImporter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java 
b/src/java/org/apache/cassandra/db/SSTableImporter.java
index c919d25..7597f82 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -349,9 +349,9 @@ public class SSTableImporter
             }
             if (options.clearRepaired)
             {
-                descriptor.getMetadataSerializer().mutateRepaired(descriptor,
-                                                                  
ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                  null);
+                
descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, 
ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                        null,
+                                                                        false);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 97ab210..c81185e 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
@@ -71,6 +70,7 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
     @VisibleForTesting
     protected SinglePartitionReadCommand(boolean isDigest,
                                          int digestVersion,
+                                         boolean acceptsTransient,
                                          TableMetadata metadata,
                                          int nowInSec,
                                          ColumnFilter columnFilter,
@@ -80,7 +80,7 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
                                          ClusteringIndexFilter 
clusteringIndexFilter,
                                          IndexMetadata index)
     {
-        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, metadata, 
nowInSec, columnFilter, rowFilter, limits, index);
+        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, 
acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
         assert partitionKey.getPartitioner() == metadata.partitioner;
         this.partitionKey = partitionKey;
         this.clusteringIndexFilter = clusteringIndexFilter;
@@ -111,6 +111,7 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
     {
         return new SinglePartitionReadCommand(false,
                                               0,
+                                              false,
                                               metadata,
                                               nowInSec,
                                               columnFilter,
@@ -286,6 +287,7 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
     {
         return new SinglePartitionReadCommand(isDigestQuery(),
                                               digestVersion(),
+                                              acceptsTransient(),
                                               metadata(),
                                               nowInSec(),
                                               columnFilter(),
@@ -300,6 +302,22 @@ public class SinglePartitionReadCommand extends 
ReadCommand implements SinglePar
     {
         return new SinglePartitionReadCommand(true,
                                               digestVersion(),
+                                              acceptsTransient(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              clusteringIndexFilter(),
+                                              indexMetadata());
+    }
+
+    public SinglePartitionReadCommand copyAsTransientQuery()
+    {
+        return new SinglePartitionReadCommand(false,
+                                              0,
+                                              true,
                                               metadata(),
                                               nowInSec(),
                                               columnFilter(),
@@ -315,6 +333,7 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
     {
         return new SinglePartitionReadCommand(isDigestQuery(),
                                               digestVersion(),
+                                              acceptsTransient(),
                                               metadata(),
                                               nowInSec(),
                                               columnFilter(),
@@ -1064,6 +1083,7 @@ public class SinglePartitionReadCommand extends 
ReadCommand implements SinglePar
                                        int version,
                                        boolean isDigest,
                                        int digestVersion,
+                                       boolean acceptsTransient,
                                        TableMetadata metadata,
                                        int nowInSec,
                                        ColumnFilter columnFilter,
@@ -1074,7 +1094,7 @@ public class SinglePartitionReadCommand extends 
ReadCommand implements SinglePar
         {
             DecoratedKey key = 
metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, 
DatabaseDescriptor.getMaxValueSize()));
             ClusteringIndexFilter filter = 
ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
-            return new SinglePartitionReadCommand(isDigest, digestVersion, 
metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
+            return new SinglePartitionReadCommand(isDigest, digestVersion, 
acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, 
filter, index);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index fb9e889..ff070a3 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -29,13 +29,15 @@ import java.util.stream.StreamSupport;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +73,8 @@ import static java.util.Collections.singletonMap;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
 
 public final class SystemKeyspace
 {
@@ -95,12 +99,10 @@ public final class SystemKeyspace
     public static final String LOCAL = "local";
     public static final String PEERS_V2 = "peers_v2";
     public static final String PEER_EVENTS_V2 = "peer_events_v2";
-    public static final String RANGE_XFERS = "range_xfers";
     public static final String COMPACTION_HISTORY = "compaction_history";
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
-    public static final String AVAILABLE_RANGES = "available_ranges";
-    public static final String TRANSFERRED_RANGES = "transferred_ranges";
+    public static final String AVAILABLE_RANGES_V2 = "available_ranges_v2";
     public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
     public static final String VIEW_BUILDS_IN_PROGRESS = 
"view_builds_in_progress";
     public static final String BUILT_VIEWS = "built_views";
@@ -110,6 +112,8 @@ public final class SystemKeyspace
     @Deprecated public static final String LEGACY_PEERS = "peers";
     @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
     @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = 
"transferred_ranges";
+    @Deprecated public static final String LEGACY_AVAILABLE_RANGES = 
"available_ranges";
+
 
     public static final TableMetadata Batches =
         parse(BATCHES,
@@ -207,15 +211,6 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((peer), peer_port))")
                 .build();
 
-    private static final TableMetadata RangeXfers =
-        parse(RANGE_XFERS,
-                "ranges requested for transfer",
-                "CREATE TABLE %s ("
-                + "token_bytes blob,"
-                + "requested_at timestamp,"
-                + "PRIMARY KEY ((token_bytes)))")
-                .build();
-
     private static final TableMetadata CompactionHistory =
         parse(COMPACTION_HISTORY,
                 "week-long compaction history",
@@ -256,14 +251,15 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name, range_start, 
range_end))")
                 .build();
 
-    private static final TableMetadata AvailableRanges =
-        parse(AVAILABLE_RANGES,
-                "available keyspace/ranges during bootstrap/replace that are 
ready to be served",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "ranges set<blob>,"
-                + "PRIMARY KEY ((keyspace_name)))")
-                .build();
+    private static final TableMetadata AvailableRangesV2 =
+    parse(AVAILABLE_RANGES_V2,
+          "available keyspace/ranges during bootstrap/replace that are ready 
to be served",
+          "CREATE TABLE %s ("
+          + "keyspace_name text,"
+          + "full_ranges set<blob>,"
+          + "transient_ranges set<blob>,"
+          + "PRIMARY KEY ((keyspace_name)))")
+    .build();
 
     private static final TableMetadata TransferredRangesV2 =
         parse(TRANSFERRED_RANGES_V2,
@@ -366,6 +362,16 @@ public final class SystemKeyspace
             + "PRIMARY KEY ((operation, keyspace_name), peer))")
             .build();
 
+    @Deprecated
+    private static final TableMetadata LegacyAvailableRanges =
+        parse(LEGACY_AVAILABLE_RANGES,
+              "available keyspace/ranges during bootstrap/replace that are 
ready to be served",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "ranges set<blob>,"
+              + "PRIMARY KEY ((keyspace_name)))")
+        .build();
+
     private static TableMetadata.Builder parse(String table, String 
description, String cql)
     {
         return CreateTableStatement.parse(format(cql, table), 
SchemaConstants.SYSTEM_KEYSPACE_NAME)
@@ -390,11 +396,11 @@ public final class SystemKeyspace
                          LegacyPeers,
                          PeerEventsV2,
                          LegacyPeerEvents,
-                         RangeXfers,
                          CompactionHistory,
                          SSTableActivity,
                          SizeEstimates,
-                         AvailableRanges,
+                         AvailableRangesV2,
+                         LegacyAvailableRanges,
                          TransferredRangesV2,
                          LegacyTransferredRanges,
                          ViewBuildsInProgress,
@@ -1270,36 +1276,38 @@ public final class SystemKeyspace
         executeInternal(cql, keyspace, table);
     }
 
-    public static synchronized void updateAvailableRanges(String keyspace, 
Collection<Range<Token>> completedRanges)
+    public static synchronized void updateAvailableRanges(String keyspace, 
Collection<Range<Token>> completedFullRanges, Collection<Range<Token>> 
completedTransientRanges)
     {
-        String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE 
keyspace_name = ?";
-        Set<ByteBuffer> rangesToUpdate = new HashSet<>(completedRanges.size());
-        for (Range<Token> range : completedRanges)
-        {
-            rangesToUpdate.add(rangeToBytes(range));
-        }
-        executeInternal(format(cql, AVAILABLE_RANGES), rangesToUpdate, 
keyspace);
+        String cql = "UPDATE system.%s SET full_ranges = full_ranges + ?, 
transient_ranges = transient_ranges + ? WHERE keyspace_name = ?";
+        executeInternal(format(cql, AVAILABLE_RANGES_V2),
+                        
completedFullRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
+                        
completedTransientRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
+                        keyspace);
     }
 
-    public static synchronized Set<Range<Token>> getAvailableRanges(String 
keyspace, IPartitioner partitioner)
+    public static synchronized RangesAtEndpoint getAvailableRanges(String 
keyspace, IPartitioner partitioner)
     {
-        Set<Range<Token>> result = new HashSet<>();
         String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
-        UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES), 
keyspace);
+        UntypedResultSet rs = executeInternal(format(query, 
AVAILABLE_RANGES_V2), keyspace);
+        InetAddressAndPort endpoint = InetAddressAndPort.getLocalHost();
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
         for (UntypedResultSet.Row row : rs)
         {
-            Set<ByteBuffer> rawRanges = row.getSet("ranges", 
BytesType.instance);
-            for (ByteBuffer rawRange : rawRanges)
-            {
-                result.add(byteBufferToRange(rawRange, partitioner));
-            }
+            Optional.ofNullable(row.getSet("full_ranges", BytesType.instance))
+                    .ifPresent(full_ranges -> full_ranges.stream()
+                            .map(buf -> byteBufferToRange(buf, partitioner))
+                            .forEach(range -> 
builder.add(fullReplica(endpoint, range))));
+            Optional.ofNullable(row.getSet("transient_ranges", 
BytesType.instance))
+                    .ifPresent(transient_ranges -> transient_ranges.stream()
+                            .map(buf -> byteBufferToRange(buf, partitioner))
+                            .forEach(range -> 
builder.add(transientReplica(endpoint, range))));
         }
-        return ImmutableSet.copyOf(result);
+        return builder.build();
     }
 
     public static void resetAvailableRanges()
     {
-        ColumnFamilyStore availableRanges = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(AVAILABLE_RANGES);
+        ColumnFamilyStore availableRanges = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(AVAILABLE_RANGES_V2);
         availableRanges.truncateBlocking();
     }
 
@@ -1405,7 +1413,13 @@ public final class SystemKeyspace
         return result.one().getString("release_version");
     }
 
-    private static ByteBuffer rangeToBytes(Range<Token> range)
+    @VisibleForTesting
+    public static Set<Range<Token>> rawRangesToRangeSet(Set<ByteBuffer> 
rawRanges, IPartitioner partitioner)
+    {
+        return rawRanges.stream().map(buf -> byteBufferToRange(buf, 
partitioner)).collect(Collectors.toSet());
+    }
+
+    static ByteBuffer rangeToBytes(Range<Token> range)
     {
         try (DataOutputBuffer out = new DataOutputBuffer())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java 
b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
index ea5ff59..e0a58ba 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.db;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +50,9 @@ public class SystemKeyspaceMigrator40
     static final String peerEventsName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
     static final String legacyTransferredRangesName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
     static final String transferredRangesName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
+    static final String legacyAvailableRangesName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_AVAILABLE_RANGES);
+    static final String availableRangesName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.AVAILABLE_RANGES_V2);
+
 
     private static final Logger logger = 
LoggerFactory.getLogger(SystemKeyspaceMigrator40.class);
 
@@ -55,6 +63,7 @@ public class SystemKeyspaceMigrator40
         migratePeers();
         migratePeerEvents();
         migrateTransferredRanges();
+        migrateAvailableRanges();
     }
 
     private static void migratePeers()
@@ -181,4 +190,40 @@ public class SystemKeyspaceMigrator40
         logger.info("Migrated {} rows from legacy {} to {}", transferred, 
legacyTransferredRangesName, transferredRangesName);
     }
 
+    static void migrateAvailableRanges()
+    {
+        ColumnFamilyStore newAvailableRanges = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.AVAILABLE_RANGES_V2);
+
+        if (!newAvailableRanges.isEmpty())
+            return;
+
+        logger.info("{} table was empty, migrating legacy {} to {}", 
availableRangesName, legacyAvailableRangesName, availableRangesName);
+
+        String query = String.format("SELECT * FROM %s",
+                                     legacyAvailableRangesName);
+
+        String insert = String.format("INSERT INTO %s ("
+                                      + "keyspace_name, "
+                                      + "full_ranges, "
+                                      + "transient_ranges) "
+                                      + " values ( ?, ?, ? )",
+                                      availableRangesName);
+
+        UntypedResultSet rows = 
QueryProcessor.executeInternalWithPaging(query, 1000);
+        int transferred = 0;
+        for (UntypedResultSet.Row row : rows)
+        {
+            logger.debug("Transferring row {}", transferred);
+            String keyspace = row.getString("keyspace_name");
+            Set<ByteBuffer> ranges = Optional.ofNullable(row.getSet("ranges", 
BytesType.instance)).orElse(Collections.emptySet());
+            QueryProcessor.executeInternal(insert,
+                                           keyspace,
+                                           ranges,
+                                           Collections.emptySet());
+            transferred++;
+        }
+
+        logger.info("Migrated {} rows from legacy {} to {}", transferred, 
legacyAvailableRangesName, availableRangesName);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/TableCQLHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TableCQLHelper.java 
b/src/java/org/apache/cassandra/db/TableCQLHelper.java
index 550a6d6..f97bebc 100644
--- a/src/java/org/apache/cassandra/db/TableCQLHelper.java
+++ b/src/java/org/apache/cassandra/db/TableCQLHelper.java
@@ -310,6 +310,7 @@ public class TableCQLHelper
         builder.append("\n\tAND max_index_interval = 
").append(tableParams.maxIndexInterval);
         builder.append("\n\tAND memtable_flush_period_in_ms = 
").append(tableParams.memtableFlushPeriodInMs);
         builder.append("\n\tAND speculative_retry = 
'").append(tableParams.speculativeRetry).append("'");
+        builder.append("\n\tAND speculative_write_threshold = 
'").append(tableParams.speculativeWriteThreshold).append("'");
         builder.append("\n\tAND comment = 
").append(singleQuote(tableParams.comment));
         builder.append("\n\tAND caching = 
").append(toCQL(tableParams.caching.asMap()));
         builder.append("\n\tAND compaction = 
").append(toCQL(tableParams.compaction.asMap()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 59bdce6..28ea90a 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -530,12 +530,13 @@ public abstract class AbstractCompactionStrategy
                                                        long keyCount,
                                                        long repairedAt,
                                                        UUID pendingRepair,
+                                                       boolean isTransient,
                                                        MetadataCollector meta,
                                                        SerializationHeader 
header,
                                                        Collection<Index> 
indexes,
                                                        LifecycleTransaction 
txn)
     {
-        return SimpleSSTableMultiWriter.create(descriptor, keyCount, 
repairedAt, pendingRepair, cfs.metadata, meta, header, indexes, txn);
+        return SimpleSSTableMultiWriter.create(descriptor, keyCount, 
repairedAt, pendingRepair, isTransient, cfs.metadata, meta, header, indexes, 
txn);
     }
 
     public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index dc16261..24bea06 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -158,11 +158,11 @@ public abstract class AbstractStrategyHolder
      * groups they deal with. IOW, if one holder returns true for a given 
isRepaired/isPendingRepair combo,
      * none of the others should.
      */
-    public abstract boolean managesRepairedGroup(boolean isRepaired, boolean 
isPendingRepair);
+    public abstract boolean managesRepairedGroup(boolean isRepaired, boolean 
isPendingRepair, boolean isTransient);
 
     public boolean managesSSTable(SSTableReader sstable)
     {
-        return managesRepairedGroup(sstable.isRepaired(), 
sstable.isPendingRepair());
+        return managesRepairedGroup(sstable.isRepaired(), 
sstable.isPendingRepair(), sstable.isTransient());
     }
 
     public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader 
sstable);
@@ -193,6 +193,7 @@ public abstract class AbstractStrategyHolder
                                                                 long keyCount,
                                                                 long 
repairedAt,
                                                                 UUID 
pendingRepair,
+                                                                boolean 
isTransient,
                                                                 
MetadataCollector collector,
                                                                 
SerializationHeader header,
                                                                 
Collection<Index> indexes,
@@ -203,4 +204,6 @@ public abstract class AbstractStrategyHolder
      * if it's not held by this holder
      */
     public abstract int getStrategyIndex(AbstractCompactionStrategy strategy);
+
+    public abstract boolean containsSSTable(SSTableReader sstable);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a872fea..2a56650 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -23,7 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.LongPredicate;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -34,6 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
+
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +46,6 @@ import 
org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.repair.ValidationPartitionIterator;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -71,7 +73,6 @@ import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.metrics.TableMetrics;
-import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
@@ -81,6 +82,8 @@ import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 import static java.util.Collections.singleton;
+import static 
org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static 
org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 
 /**
  * <p>
@@ -509,7 +512,10 @@ public class CompactionManager implements 
CompactionManagerMBean
             return AllSSTableOpStatus.ABORTED;
         }
         // if local ranges is empty, it means no data should remain
-        final Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(keyspace.getName());
+        final RangesAtEndpoint replicas = 
StorageService.instance.getLocalReplicas(keyspace.getName());
+        final Set<Range<Token>> allRanges = replicas.ranges();
+        final Set<Range<Token>> transientRanges = 
replicas.filter(Replica::isTransient).ranges();
+        final Set<Range<Token>> fullRanges = 
replicas.filter(Replica::isFull).ranges();
         final boolean hasIndexes = cfStore.indexManager.hasIndexes();
 
         return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
@@ -525,8 +531,8 @@ public class CompactionManager implements 
CompactionManagerMBean
             @Override
             public void execute(LifecycleTransaction txn) throws IOException
             {
-                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, 
ranges, FBUtilities.nowInSeconds());
-                doCleanupOne(cfStore, txn, cleanupStrategy, ranges, 
hasIndexes);
+                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, 
allRanges, transientRanges, txn.onlyOne().isRepaired(), 
FBUtilities.nowInSeconds());
+                doCleanupOne(cfStore, txn, cleanupStrategy, replicas.ranges(), 
fullRanges, transientRanges, hasIndexes);
             }
         }, jobs, OperationType.CLEANUP);
     }
@@ -574,9 +580,8 @@ public class CompactionManager implements 
CompactionManagerMBean
             logger.info("Partitioner does not support splitting");
             return AllSSTableOpStatus.ABORTED;
         }
-        final Collection<Range<Token>> r = 
StorageService.instance.getLocalRanges(cfs.keyspace.getName());
 
-        if (r.isEmpty())
+        if 
(StorageService.instance.getLocalReplicas(cfs.keyspace.getName()).isEmpty())
         {
             logger.info("Relocate cannot run before a node has joined the 
ring");
             return AllSSTableOpStatus.ABORTED;
@@ -643,7 +648,11 @@ public class CompactionManager implements 
CompactionManagerMBean
     /**
      * Splits the given token ranges of the given sstables into a pending 
repair silo
      */
-    public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore 
cfs, Collection<Range<Token>> ranges, Refs<SSTableReader> sstables, 
LifecycleTransaction txn, UUID sessionId)
+    public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore 
cfs,
+                                                           RangesAtEndpoint 
tokenRanges,
+                                                           Refs<SSTableReader> 
sstables,
+                                                           
LifecycleTransaction txn,
+                                                           UUID sessionId)
     {
         Runnable runnable = new WrappedRunnable()
         {
@@ -651,7 +660,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             {
                 try (TableMetrics.TableTimer.Context ctx = 
cfs.metric.anticompactionTime.time())
                 {
-                    performAnticompaction(cfs, ranges, sstables, txn, 
ActiveRepairService.UNREPAIRED_SSTABLE, sessionId, sessionId);
+                    performAnticompaction(cfs, tokenRanges, sstables, txn, 
sessionId);
                 }
             }
         };
@@ -673,48 +682,69 @@ public class CompactionManager implements 
CompactionManagerMBean
     }
 
     /**
+     * for sstables that are fully contained in the given ranges, just rewrite 
their metadata with
+     * the pending repair id and remove them from the transaction
+     */
+    private static void mutateFullyContainedSSTables(ColumnFamilyStore cfs,
+                                                     Refs<SSTableReader> refs,
+                                                     Iterator<SSTableReader> 
sstableIterator,
+                                                     Collection<Range<Token>> 
ranges,
+                                                     LifecycleTransaction txn,
+                                                     UUID sessionID,
+                                                     boolean isTransient) 
throws IOException
+    {
+        if (ranges.isEmpty())
+            return;
+
+        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+
+        Set<SSTableReader> fullyContainedSSTables = 
findSSTablesToAnticompact(sstableIterator, normalizedRanges, sessionID);
+
+        
cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables));
+        
cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, 
UNREPAIRED_SSTABLE, sessionID, isTransient);
+        // since we're just re-writing the sstable metadata for the fully 
contained sstables, we don't want
+        // them obsoleted when the anti-compaction is complete. So they're 
removed from the transaction here
+        txn.cancel(fullyContainedSSTables);
+        refs.release(fullyContainedSSTables);
+    }
+
+    /**
      * Make sure the {validatedForRepair} are marked for compaction before 
calling this.
      *
      * Caller must reference the validatedForRepair sstables (via 
ParentRepairSession.getActiveRepairedSSTableRefs(..)).
      *
      * @param cfs
-     * @param ranges Ranges that the repair was carried out on
+     * @param ranges token ranges to be repaired
      * @param validatedForRepair SSTables containing the repaired ranges. 
Should be referenced before passing them.
-     * @param parentRepairSession parent repair session ID
+     * @param sessionID the repair session we're anti-compacting for
      * @throws InterruptedException
      * @throws IOException
      */
     public void performAnticompaction(ColumnFamilyStore cfs,
-                                      Collection<Range<Token>> ranges,
+                                      RangesAtEndpoint ranges,
                                       Refs<SSTableReader> validatedForRepair,
                                       LifecycleTransaction txn,
-                                      long repairedAt,
-                                      UUID pendingRepair,
-                                      UUID parentRepairSession) throws 
InterruptedException, IOException
+                                      UUID sessionID) throws IOException
     {
         try
         {
-            ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(parentRepairSession);
+            ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
             Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact 
for previews");
+            Preconditions.checkArgument(!ranges.isEmpty(), "No ranges to 
anti-compact");
 
             if (logger.isInfoEnabled())
-                logger.info("{} Starting anticompaction for {}.{} on {}/{} 
sstables", PreviewKind.NONE.logPrefix(parentRepairSession), 
cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), 
cfs.getLiveSSTables().size());
+                logger.info("{} Starting anticompaction for {}.{} on {}/{} 
sstables", PreviewKind.NONE.logPrefix(sessionID), cfs.keyspace.getName(), 
cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
             if (logger.isTraceEnabled())
-                logger.trace("{} Starting anticompaction for ranges {}", 
PreviewKind.NONE.logPrefix(parentRepairSession), ranges);
-            Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-
-            Iterator<SSTableReader> sstableIterator = sstables.iterator();
-            List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+                logger.trace("{} Starting anticompaction for ranges {}", 
PreviewKind.NONE.logPrefix(sessionID), ranges);
 
-            Set<SSTableReader> fullyContainedSSTables = 
findSSTablesToAnticompact(sstableIterator, normalizedRanges, 
parentRepairSession);
+            Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
+            validateSSTableBoundsForAnticompaction(sessionID, sstables, 
ranges);
+            mutateFullyContainedSSTables(cfs, validatedForRepair, 
sstables.iterator(), ranges.fullRanges(), txn, sessionID, false);
+            mutateFullyContainedSSTables(cfs, validatedForRepair, 
sstables.iterator(), ranges.transientRanges(), txn, sessionID, true);
 
-            
cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables));
-            
cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, 
repairedAt, pendingRepair);
-            txn.cancel(fullyContainedSSTables);
-            validatedForRepair.release(fullyContainedSSTables);
             assert txn.originals().equals(sstables);
             if (!sstables.isEmpty())
-                doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair);
+                doAntiCompaction(cfs, ranges, txn, sessionID);
             txn.finish();
         }
         finally
@@ -723,7 +753,28 @@ public class CompactionManager implements 
CompactionManagerMBean
             txn.close();
         }
 
-        logger.info("{} Completed anticompaction successfully", 
PreviewKind.NONE.logPrefix(parentRepairSession));
+        logger.info("{} Completed anticompaction successfully", 
PreviewKind.NONE.logPrefix(sessionID));
+    }
+
+    static void validateSSTableBoundsForAnticompaction(UUID sessionID,
+                                                       
Collection<SSTableReader> sstables,
+                                                       RangesAtEndpoint ranges)
+    {
+        List<Range<Token>> normalizedRanges = Range.normalize(ranges.ranges());
+        for (SSTableReader sstable : sstables)
+        {
+            Bounds<Token> bounds = new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken());
+
+            if (!Iterables.any(normalizedRanges, r -> (r.contains(bounds.left) 
&& r.contains(bounds.right)) || r.intersects(bounds)))
+            {
+                // this should never happen - in 
PendingAntiCompaction#getSSTables we select all sstables that intersect the 
repaired ranges, that can't have changed here
+                String message = String.format("%s SSTable %s (%s) does not 
intersect repaired ranges %s, this sstable should not have been included.",
+                                               
PreviewKind.NONE.logPrefix(sessionID), sstable, bounds, normalizedRanges);
+                logger.error(message);
+                throw new IllegalStateException(message);
+            }
+        }
+
     }
 
     @VisibleForTesting
@@ -736,8 +787,6 @@ public class CompactionManager implements 
CompactionManagerMBean
 
             Bounds<Token> sstableBounds = new 
Bounds<>(sstable.first.getToken(), sstable.last.getToken());
 
-            boolean shouldAnticompact = false;
-
             for (Range<Token> r : normalizedRanges)
             {
                 // ranges are normalized - no wrap around - if first and last 
are contained we know that all tokens are contained in the range
@@ -746,23 +795,13 @@ public class CompactionManager implements 
CompactionManagerMBean
                     logger.info("{} SSTable {} fully contained in range {}, 
mutating repairedAt instead of anticompacting", 
PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r);
                     fullyContainedSSTables.add(sstable);
                     sstableIterator.remove();
-                    shouldAnticompact = true;
                     break;
                 }
                 else if (r.intersects(sstableBounds))
                 {
                     logger.info("{} SSTable {} ({}) will be anticompacted on 
range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, 
sstableBounds, r);
-                    shouldAnticompact = true;
                 }
             }
-
-            if (!shouldAnticompact)
-            {
-                // this should never happen - in 
PendingAntiCompaction#getSSTables we select all sstables that intersect the 
repaired ranges, that can't have changed here
-                String message = String.format("%s SSTable %s (%s) does not 
intersect repaired ranges %s, this sstable should not have been included.", 
PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, 
normalizedRanges);
-                logger.error(message);
-                throw new IllegalStateException(message);
-            }
         }
         return fullyContainedSSTables;
     }
@@ -914,7 +953,10 @@ public class CompactionManager implements 
CompactionManagerMBean
         {
             ColumnFamilyStore cfs = entry.getKey();
             Keyspace keyspace = cfs.keyspace;
-            Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(keyspace.getName());
+            final RangesAtEndpoint replicas = 
StorageService.instance.getLocalReplicas(keyspace.getName());
+            final Set<Range<Token>> allRanges = replicas.ranges();
+            final Set<Range<Token>> transientRanges = 
replicas.filter(Replica::isTransient).ranges();
+            final Set<Range<Token>> fullRanges = 
replicas.filter(Replica::isFull).ranges();
             boolean hasIndexes = cfs.indexManager.hasIndexes();
             SSTableReader sstable = lookupSSTable(cfs, entry.getValue());
 
@@ -924,10 +966,10 @@ public class CompactionManager implements 
CompactionManagerMBean
             }
             else
             {
-                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, 
ranges, FBUtilities.nowInSeconds());
+                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, 
allRanges, transientRanges, sstable.isRepaired(), FBUtilities.nowInSeconds());
                 try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstable, OperationType.CLEANUP))
                 {
-                    doCleanupOne(cfs, txn, cleanupStrategy, ranges, 
hasIndexes);
+                    doCleanupOne(cfs, txn, cleanupStrategy, allRanges, 
fullRanges, transientRanges, hasIndexes);
                 }
                 catch (IOException e)
                 {
@@ -1104,22 +1146,33 @@ public class CompactionManager implements 
CompactionManagerMBean
      *
      * @throws IOException
      */
-    private void doCleanupOne(final ColumnFamilyStore cfs, 
LifecycleTransaction txn, CleanupStrategy cleanupStrategy, 
Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
+    private void doCleanupOne(final ColumnFamilyStore cfs,
+                              LifecycleTransaction txn,
+                              CleanupStrategy cleanupStrategy,
+                              Collection<Range<Token>> allRanges,
+                              Collection<Range<Token>> fullRanges,
+                              Collection<Range<Token>> transientRanges,
+                              boolean hasIndexes) throws IOException
     {
         assert !cfs.isIndex();
 
         SSTableReader sstable = txn.onlyOne();
 
         // if the sstable does not intersect any of allRanges and there is no index, the entire sstable is discarded
-        if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges))
+        if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(allRanges))
         {
             txn.obsoleteOriginals();
             txn.finish();
             return;
         }
-        if (!needsCleanup(sstable, ranges))
+
+        boolean needsCleanupFull = needsCleanup(sstable, fullRanges);
+        boolean needsCleanupTransient = needsCleanup(sstable, transientRanges);
+        //Nothing to clean up: cleanup is not needed for the full ranges, and for the transient
+        //ranges it is either not needed or the sstable is not repaired.
+        if (!needsCleanupFull && (!needsCleanupTransient || 
!sstable.isRepaired()))
         {
-            logger.trace("Skipping {} for cleanup; all rows should be kept", 
sstable);
+            logger.trace("Skipping {} for cleanup; all rows should be kept. 
Needs cleanup full ranges: {} Needs cleanup transient ranges: {} Repaired: {}", 
sstable, needsCleanupFull, needsCleanupTransient, sstable.isRepaired());
             return;
         }
 
@@ -1150,7 +1203,7 @@ public class CompactionManager implements 
CompactionManagerMBean
              CompactionIterator ci = new 
CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), 
controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
             StatsMetadata metadata = sstable.getSSTableMetadata();
-            writer.switchWriter(createWriter(cfs, compactionFileLocation, 
expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, 
txn));
+            writer.switchWriter(createWriter(cfs, compactionFileLocation, 
expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, 
metadata.isTransient, sstable, txn));
             long lastBytesScanned = 0;
 
             while (ci.hasNext())
@@ -1218,11 +1271,18 @@ public class CompactionManager implements 
CompactionManagerMBean
             this.nowInSec = nowInSec;
         }
 
-        public static CleanupStrategy get(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, int nowInSec)
+        public static CleanupStrategy get(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, 
boolean isRepaired, int nowInSec)
         {
-            return cfs.indexManager.hasIndexes()
-                 ? new Full(cfs, ranges, nowInSec)
-                 : new Bounded(cfs, ranges, nowInSec);
+            if (cfs.indexManager.hasIndexes())
+            {
+                if (!transientRanges.isEmpty())
+                {
+                    //Shouldn't have been possible to create this situation
+                    throw new AssertionError("Can't have indexes and transient 
ranges");
+                }
+                return new Full(cfs, ranges, nowInSec);
+            }
+            return new Bounded(cfs, ranges, transientRanges, isRepaired, 
nowInSec);
         }
 
         public abstract ISSTableScanner getScanner(SSTableReader sstable);
@@ -1230,7 +1290,10 @@ public class CompactionManager implements 
CompactionManagerMBean
 
         private static final class Bounded extends CleanupStrategy
         {
-            public Bounded(final ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, int nowInSec)
+            private final Collection<Range<Token>> transientRanges;
+            private final boolean isRepaired;
+
+            public Bounded(final ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, 
boolean isRepaired, int nowInSec)
             {
                 super(ranges, nowInSec);
                 cacheCleanupExecutor.submit(new Runnable()
@@ -1241,12 +1304,23 @@ public class CompactionManager implements 
CompactionManagerMBean
                         cfs.cleanupCache();
                     }
                 });
+                this.transientRanges = transientRanges;
+                this.isRepaired = isRepaired;
             }
 
             @Override
             public ISSTableScanner getScanner(SSTableReader sstable)
             {
-                return sstable.getScanner(ranges);
+                //If transient replication is enabled and there are transient 
ranges
+                //then cleanup should remove any partitions that are repaired 
and in the transient range
+                //as they should already be synchronized at other full 
replicas.
+                //So just don't scan the portion of the table containing the 
repaired transient ranges
+                Collection<Range<Token>> rangesToScan = ranges;
+                if (isRepaired)
+                {
+                    rangesToScan = Collections2.filter(ranges, range -> 
!transientRanges.contains(range));
+                }
+                return sstable.getScanner(rangesToScan);
             }
 
             @Override
@@ -1291,6 +1365,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                              long expectedBloomFilterSize,
                                              long repairedAt,
                                              UUID pendingRepair,
+                                             boolean isTransient,
                                              SSTableReader sstable,
                                              LifecycleTransaction txn)
     {
@@ -1301,6 +1376,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                     expectedBloomFilterSize,
                                     repairedAt,
                                     pendingRepair,
+                                    isTransient,
                                     sstable.getSSTableLevel(),
                                     sstable.header,
                                     cfs.indexManager.listIndexes(),
@@ -1312,6 +1388,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                                               int 
expectedBloomFilterSize,
                                                               long repairedAt,
                                                               UUID 
pendingRepair,
+                                                              boolean 
isTransient,
                                                               
Collection<SSTableReader> sstables,
                                                               
LifecycleTransaction txn)
     {
@@ -1335,6 +1412,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                     (long) expectedBloomFilterSize,
                                     repairedAt,
                                     pendingRepair,
+                                    isTransient,
                                     cfs.metadata,
                                     new MetadataCollector(sstables, 
cfs.metadata().comparator, minLevel),
                                     SerializationHeader.make(cfs.metadata(), 
sstables),
@@ -1347,16 +1425,19 @@ public class CompactionManager implements 
CompactionManagerMBean
      * will store the non-repaired ranges. Once anticompaction is completed, 
the original sstable is marked as compacted
      * and subsequently deleted.
      * @param cfs
-     * @param repaired a transaction over the repaired sstables to anticompacy
-     * @param ranges Repaired ranges to be placed into one of the new 
sstables. The repaired table will be tracked via
-     * the {@link 
org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
+     * @param txn a transaction over the repaired sstables to anticompact
+     * @param ranges full and transient ranges to be placed into one of the 
new sstables. The repaired table will be tracked via
+     *   the {@link 
org.apache.cassandra.io.sstable.metadata.StatsMetadata#pendingRepair} field.
      */
-    private void doAntiCompaction(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, LifecycleTransaction repaired, long 
repairedAt, UUID pendingRepair)
+    private void doAntiCompaction(ColumnFamilyStore cfs,
+                                  RangesAtEndpoint ranges,
+                                  LifecycleTransaction txn,
+                                  UUID pendingRepair)
     {
-        logger.info("Performing anticompaction on {} sstables", 
repaired.originals().size());
+        logger.info("Performing anticompaction on {} sstables", 
txn.originals().size());
 
         //Group SSTables
-        Set<SSTableReader> sstables = repaired.originals();
+        Set<SSTableReader> sstables = txn.originals();
 
         // Repairs can take place on both unrepaired (incremental + full) and 
repaired (full) data.
         // Although anti-compaction could work on repaired sstables as well 
and would result in having more accurate
@@ -1366,101 +1447,111 @@ public class CompactionManager implements 
CompactionManagerMBean
         
cfs.metric.bytesAnticompacted.inc(SSTableReader.getTotalBytes(unrepairedSSTables));
         Collection<Collection<SSTableReader>> groupedSSTables = 
cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables);
 
-        // iterate over sstables to check if the repaired / unrepaired ranges 
intersect them.
+        // iterate over sstables to check if the full / transient / unrepaired 
ranges intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
         {
-            try (LifecycleTransaction txn = repaired.split(sstableGroup))
+            try (LifecycleTransaction groupTxn = txn.split(sstableGroup))
             {
-                int antiCompacted = antiCompactGroup(cfs, ranges, txn, 
repairedAt, pendingRepair);
+                int antiCompacted = antiCompactGroup(cfs, ranges, groupTxn, 
pendingRepair);
                 antiCompactedSSTableCount += antiCompacted;
             }
         }
 
         String format = "Anticompaction completed successfully, anticompacted 
from {} to {} sstable(s).";
-        logger.info(format, repaired.originals().size(), 
antiCompactedSSTableCount);
+        logger.info(format, txn.originals().size(), antiCompactedSSTableCount);
     }
 
-    private int antiCompactGroup(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges,
-                             LifecycleTransaction anticompactionGroup, long 
repairedAt, UUID pendingRepair)
+    private int antiCompactGroup(ColumnFamilyStore cfs,
+                                 RangesAtEndpoint ranges,
+                                 LifecycleTransaction txn,
+                                 UUID pendingRepair)
     {
+        Preconditions.checkArgument(!ranges.isEmpty(), "need at least one full 
or transient range");
         long groupMaxDataAge = -1;
 
-        for (Iterator<SSTableReader> i = 
anticompactionGroup.originals().iterator(); i.hasNext();)
+        for (Iterator<SSTableReader> i = txn.originals().iterator(); 
i.hasNext();)
         {
             SSTableReader sstable = i.next();
             if (groupMaxDataAge < sstable.maxDataAge)
                 groupMaxDataAge = sstable.maxDataAge;
         }
 
-        if (anticompactionGroup.originals().size() == 0)
+        if (txn.originals().size() == 0)
         {
             logger.info("No valid anticompactions for this group, All sstables 
were compacted and are no longer available");
             return 0;
         }
 
-        logger.info("Anticompacting {}", anticompactionGroup);
-        Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
+        logger.info("Anticompacting {}", txn);
+        Set<SSTableReader> sstableAsSet = txn.originals();
 
         File destination = 
cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet,
 OperationType.ANTICOMPACTION));
-        long repairedKeyCount = 0;
-        long unrepairedKeyCount = 0;
         int nowInSec = FBUtilities.nowInSeconds();
 
         CompactionStrategyManager strategy = 
cfs.getCompactionStrategyManager();
-        try (SSTableRewriter repairedSSTableWriter = 
SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, 
groupMaxDataAge);
-             SSTableRewriter unRepairedSSTableWriter = 
SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, 
groupMaxDataAge);
-             AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(anticompactionGroup.originals());
+        try (SSTableRewriter fullWriter = 
SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
+             SSTableRewriter transWriter = 
SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
+             SSTableRewriter unrepairedWriter = 
SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
+
+             AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(txn.originals());
              CompactionController controller = new CompactionController(cfs, 
sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
              CompactionIterator ci = new 
CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, 
nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
             int expectedBloomFilterSize = 
Math.max(cfs.metadata().params.minIndexInterval, 
(int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
-            
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, repairedAt, pendingRepair, sstableAsSet, 
anticompactionGroup));
-            
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
null, sstableAsSet, anticompactionGroup));
-            Range.OrderedRangeContainmentChecker containmentChecker = new 
Range.OrderedRangeContainmentChecker(ranges);
+            
fullWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, 
destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, false, 
sstableAsSet, txn));
+            
transWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, 
destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, true, 
sstableAsSet, txn));
+            
unrepairedWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, 
false, sstableAsSet, txn));
+
+            Predicate<Token> fullChecker = !ranges.fullRanges().isEmpty() ? 
new Range.OrderedRangeContainmentChecker(ranges.fullRanges()) : t -> false;
+            Predicate<Token> transChecker = 
!ranges.transientRanges().isEmpty() ? new 
Range.OrderedRangeContainmentChecker(ranges.transientRanges()) : t -> false;
             while (ci.hasNext())
             {
                 try (UnfilteredRowIterator partition = ci.next())
                 {
-                    // if current range from sstable is repaired, save it into 
the new repaired sstable
-                    if 
(containmentChecker.contains(partition.partitionKey().getToken()))
+                    Token token = partition.partitionKey().getToken();
+                    // if this partition's token is contained in the full or transient 
ranges, append it to the appropriate sstable
+                    if (fullChecker.test(token))
                     {
-                        repairedSSTableWriter.append(partition);
-                        repairedKeyCount++;
+                        fullWriter.append(partition);
+                    }
+                    else if (transChecker.test(token))
+                    {
+                        transWriter.append(partition);
                     }
-                    // otherwise save into the new 'non-repaired' table
                     else
                     {
-                        unRepairedSSTableWriter.append(partition);
-                        unrepairedKeyCount++;
+                        // otherwise, append it to the unrepaired sstable
+                        unrepairedWriter.append(partition);
                     }
                 }
             }
 
             List<SSTableReader> anticompactedSSTables = new ArrayList<>();
-            // since both writers are operating over the same Transaction, we 
cannot use the convenience Transactional.finish() method,
+            // since all writers are operating over the same Transaction, we 
cannot use the convenience Transactional.finish() method,
             // as on the second finish() we would prepareToCommit() on a 
Transaction that has already been committed, which is forbidden by the API
             // (since it indicates misuse). We call permitRedundantTransitions 
so that calls that transition to a state already occupied are permitted.
-            anticompactionGroup.permitRedundantTransitions();
-            repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
-            unRepairedSSTableWriter.prepareToCommit();
-            anticompactedSSTables.addAll(repairedSSTableWriter.finished());
-            anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
-            repairedSSTableWriter.commit();
-            unRepairedSSTableWriter.commit();
-
-            logger.trace("Repaired {} keys out of {} for {}/{} in {}", 
repairedKeyCount,
-                                                                       
repairedKeyCount + unrepairedKeyCount,
-                                                                       
cfs.keyspace.getName(),
-                                                                       
cfs.getTableName(),
-                                                                       
anticompactionGroup);
+            txn.permitRedundantTransitions();
+
+            fullWriter.prepareToCommit();
+            transWriter.prepareToCommit();
+            unrepairedWriter.prepareToCommit();
+
+            anticompactedSSTables.addAll(fullWriter.finished());
+            anticompactedSSTables.addAll(transWriter.finished());
+            anticompactedSSTables.addAll(unrepairedWriter.finished());
+
+            fullWriter.commit();
+            transWriter.commit();
+            unrepairedWriter.commit();
+
             return anticompactedSSTables.size();
         }
         catch (Throwable e)
         {
             JVMStabilityInspector.inspectThrowable(e);
-            logger.error("Error anticompacting " + anticompactionGroup, e);
+            logger.error("Error anticompacting " + txn, e);
         }
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 8fba121..8ce93fa 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.UUID;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
@@ -71,11 +72,19 @@ public class CompactionStrategyHolder extends 
AbstractStrategyHolder
     }
 
     @Override
-    public boolean managesRepairedGroup(boolean isRepaired, boolean 
isPendingRepair)
+    public boolean managesRepairedGroup(boolean isRepaired, boolean 
isPendingRepair, boolean isTransient)
     {
-        Preconditions.checkArgument(!isPendingRepair || !isRepaired,
-                                    "SSTables cannot be both repaired and 
pending repair");
-        return !isPendingRepair && (isRepaired == this.isRepaired);
+        if (!isPendingRepair)
+        {
+            Preconditions.checkArgument(!isTransient, "isTransient can only be 
true for sstables pending repairs");
+            return this.isRepaired == isRepaired;
+        }
+        else
+        {
+            Preconditions.checkArgument(!isRepaired, "SSTables cannot be both 
repaired and pending repair");
+            return false;
+
+        }
     }
 
     @Override
@@ -206,7 +215,15 @@ public class CompactionStrategyHolder extends 
AbstractStrategyHolder
     }
 
     @Override
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector 
collector, SerializationHeader header, Collection<Index> indexes, 
LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       UUID pendingRepair,
+                                                       boolean isTransient,
+                                                       MetadataCollector 
collector,
+                                                       SerializationHeader 
header,
+                                                       Collection<Index> 
indexes,
+                                                       LifecycleTransaction 
txn)
     {
         if (isRepaired)
         {
@@ -226,6 +243,7 @@ public class CompactionStrategyHolder extends 
AbstractStrategyHolder
                                                  keyCount,
                                                  repairedAt,
                                                  pendingRepair,
+                                                 isTransient,
                                                  collector,
                                                  header,
                                                  indexes,
@@ -237,4 +255,10 @@ public class CompactionStrategyHolder extends 
AbstractStrategyHolder
     {
         return strategies.indexOf(strategy);
     }
+
+    @Override
+    public boolean containsSSTable(SSTableReader sstable)
+    {
+        return Iterables.any(strategies, acs -> 
acs.getSSTables().contains(sstable));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to