http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 9766454..afe628b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -112,6 +113,7 @@ public class CompactionStrategyManager implements INotificationConsumer
     /**
      * Variables guarded by read and write lock above
      */
+    private final PendingRepairHolder transientRepairs;
     private final PendingRepairHolder pendingRepairs;
     private final CompactionStrategyHolder repaired;
     private final CompactionStrategyHolder unrepaired;
@@ -156,10 +158,11 @@ public class CompactionStrategyManager implements INotificationConsumer
                 return compactionStrategyIndexForDirectory(descriptor);
             }
         };
-        pendingRepairs = new PendingRepairHolder(cfs, router);
+        transientRepairs = new PendingRepairHolder(cfs, router, true);
+        pendingRepairs = new PendingRepairHolder(cfs, router, false);
         repaired = new CompactionStrategyHolder(cfs, router, true);
         unrepaired = new CompactionStrategyHolder(cfs, router, false);
-        holders = ImmutableList.of(pendingRepairs, repaired, unrepaired);
+        holders = ImmutableList.of(transientRepairs, pendingRepairs, repaired, unrepaired);
 
         cfs.getTracker().subscribe(this);
         logger.trace("{} subscribed to the data tracker.", this);
@@ -176,7 +179,6 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Return the next background task
      *
      * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
-     *
      */
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
@@ -188,18 +190,16 @@ public class CompactionStrategyManager implements INotificationConsumer
                 return null;
 
             int numPartitions = getNumTokenPartitions();
+
             // first try to promote/demote sstables from completed repairs
-            List<TaskSupplier> repairFinishedSuppliers = pendingRepairs.getRepairFinishedTaskSuppliers();
-            if (!repairFinishedSuppliers.isEmpty())
-            {
-                Collections.sort(repairFinishedSuppliers);
-                for (TaskSupplier supplier : repairFinishedSuppliers)
-                {
-                    AbstractCompactionTask task = supplier.getTask();
-                    if (task != null)
-                        return task;
-                }
-            }
+            AbstractCompactionTask repairFinishedTask;
+            repairFinishedTask = pendingRepairs.getNextRepairFinishedTask();
+            if (repairFinishedTask != null)
+                return repairFinishedTask;
+
+            repairFinishedTask = transientRepairs.getNextRepairFinishedTask();
+            if (repairFinishedTask != null)
+                return repairFinishedTask;
 
             // sort compaction task suppliers by remaining tasks descending
             List<TaskSupplier> suppliers = new ArrayList<>(numPartitions * holders.size());
@@ -393,64 +393,28 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
-
-
     @VisibleForTesting
-    List<AbstractCompactionStrategy> getRepaired()
+    CompactionStrategyHolder getRepairedUnsafe()
     {
-        readLock.lock();
-        try
-        {
-            return Lists.newArrayList(repaired.allStrategies());
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        return repaired;
     }
 
     @VisibleForTesting
-    List<AbstractCompactionStrategy> getUnrepaired()
+    CompactionStrategyHolder getUnrepairedUnsafe()
     {
-        readLock.lock();
-        try
-        {
-            return Lists.newArrayList(unrepaired.allStrategies());
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        return unrepaired;
     }
 
     @VisibleForTesting
-    Iterable<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID)
+    PendingRepairHolder getPendingRepairsUnsafe()
     {
-        readLock.lock();
-        try
-        {
-            return pendingRepairs.getStrategiesFor(sessionID);
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        return pendingRepairs;
     }
 
     @VisibleForTesting
-    Set<UUID> pendingRepairs()
+    PendingRepairHolder getTransientRepairsUnsafe()
     {
-        readLock.lock();
-        try
-        {
-            Set<UUID> ids = new HashSet<>();
-            pendingRepairs.getManagers().forEach(p -> ids.addAll(p.getSessions()));
-            return ids;
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        return transientRepairs;
     }
 
     public boolean hasDataForPendingRepair(UUID sessionID)
@@ -458,8 +422,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-            return Iterables.any(pendingRepairs.getManagers(),
-                                 prm -> prm.hasDataForSession(sessionID));
+            return pendingRepairs.hasDataForSession(sessionID) || transientRepairs.hasDataForSession(sessionID);
         }
         finally
         {
@@ -682,18 +645,19 @@ public class CompactionStrategyManager implements INotificationConsumer
         throw new IllegalStateException("No holder claimed " + sstable);
     }
 
-    private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair)
+    private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair, boolean isTransient)
     {
         return getHolder(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE,
-                         pendingRepair != ActiveRepairService.NO_PENDING_REPAIR);
+                         pendingRepair != ActiveRepairService.NO_PENDING_REPAIR,
+                         isTransient);
     }
 
     @VisibleForTesting
-    AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair)
+    AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
     {
         for (AbstractStrategyHolder holder : holders)
         {
-            if (holder.managesRepairedGroup(isRepaired, isPendingRepair))
+            if (holder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient))
                 return holder;
         }
 
@@ -1146,16 +1110,26 @@ public class CompactionStrategyManager implements INotificationConsumer
                                                        long keyCount,
                                                        long repairedAt,
                                                        UUID pendingRepair,
+                                                       boolean isTransient,
                                                        MetadataCollector collector,
                                                        SerializationHeader header,
                                                        Collection<Index> indexes,
                                                        LifecycleTransaction txn)
     {
+        SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
         maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
-            return getHolder(repairedAt, pendingRepair).createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, collector, header, indexes, txn);
+            return getHolder(repairedAt, pendingRepair, isTransient).createSSTableMultiWriter(descriptor,
+                                                                                              keyCount,
+                                                                                              repairedAt,
+                                                                                              pendingRepair,
+                                                                                              isTransient,
+                                                                                              collector,
+                                                                                              header,
+                                                                                              indexes,
+                                                                                              txn);
         }
         finally
         {
@@ -1220,7 +1194,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races
      * with other processes between when the metadata is changed and when sstables are moved between strategies.
       */
-    public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair) throws IOException
+    public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
     {
         Set<SSTableReader> changed = new HashSet<>();
 
@@ -1229,7 +1203,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             for (SSTableReader sstable: sstables)
             {
-                sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+                sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
                 sstable.reloadSSTableMetadata();
                 changed.add(sstable);
             }

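For readers following the holder changes above: sstables are now routed to one of four buckets (transient pending repair, pending repair, repaired, unrepaired) by the three flags passed to getHolder. The following standalone sketch uses made-up stand-in types rather than Cassandra's AbstractStrategyHolder API, and assumes (as the patch does) that only the pending-repair buckets distinguish the transient flag.

import java.util.List;

// Illustrative sketch of the three-flag routing used by getHolder(...) above.
// The Holder type here is a stand-in, not Cassandra's real class hierarchy.
class HolderRoutingSketch
{
    interface Holder
    {
        // Mirrors AbstractStrategyHolder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient)
        boolean manages(boolean isRepaired, boolean isPendingRepair, boolean isTransient);
    }

    // Pending-repair buckets are split by the transient flag, like the patch's
    // transientRepairs / pendingRepairs pair.
    static Holder pendingRepair(boolean transientBucket)
    {
        return (repaired, pending, isTransient) -> {
            if (pending && repaired)
                throw new IllegalArgumentException("SSTables cannot be both repaired and pending repair");
            return pending && isTransient == transientBucket;
        };
    }

    // Finalized buckets (repaired / unrepaired) are keyed only on the repaired flag here.
    static Holder finalized(boolean repairedBucket)
    {
        return (repaired, pending, isTransient) -> !pending && repaired == repairedBucket;
    }

    static Holder route(List<Holder> holders, boolean repaired, boolean pending, boolean isTransient)
    {
        for (Holder h : holders)
            if (h.manages(repaired, pending, isTransient))
                return h;
        throw new IllegalStateException("No holder claimed the sstable");
    }

    public static void main(String[] args)
    {
        List<Holder> holders = List.of(pendingRepair(true), pendingRepair(false),
                                       finalized(true), finalized(false));
        // A transiently replicated sstable in an active repair session routes to the first bucket.
        System.out.println(route(holders, false, true, true) == holders.get(0)); // true
    }
}

In the patch itself the equivalent check lives in each holder's managesRepairedGroup override, shown further down in PendingRepairHolder.java.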
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 662384c..591b7c4 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -29,20 +29,19 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
-import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -339,6 +338,23 @@ public class CompactionTask extends AbstractCompactionTask
         return ids.iterator().next();
     }
 
+    public static boolean getIsTransient(Set<SSTableReader> sstables)
+    {
+        if (sstables.isEmpty())
+        {
+            return false;
+        }
+
+        boolean isTransient = sstables.iterator().next().isTransient();
+
+        if (!Iterables.all(sstables, sstable -> sstable.isTransient() == isTransient))
+        {
+            throw new RuntimeException("Attempting to compact transient sstables with non transient sstables");
+        }
+
+        return isTransient;
+    }
+
 
     /*
     * Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until

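getIsTransient above derives a single flag from a set of sstables and rejects mixed inputs, the same pattern CompactionTask already uses for repairedAt and pendingRepair. A minimal generic version of that guard, with a hypothetical helper name that is not part of the patch:

import java.util.Set;
import java.util.function.Predicate;

// Illustrative guard: derive a boolean attribute from a set while asserting it is uniform,
// mirroring what CompactionTask.getIsTransient does for SSTableReader.isTransient().
final class UniformFlagSketch
{
    static <T> boolean uniformFlag(Set<T> items, Predicate<T> flag, String errorMessage)
    {
        if (items.isEmpty())
            return false;

        boolean expected = flag.test(items.iterator().next());
        for (T item : items)
            if (flag.test(item) != expected)
                throw new IllegalStateException(errorMessage);
        return expected;
    }

    public static void main(String[] args)
    {
        // Both strings agree on the predicate, so the shared flag (true) is returned.
        System.out.println(uniformFlag(Set.of("ab", "cd"), s -> s.length() > 1,
                                       "mixed flags in one compaction"));
    }
}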
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 7b9123f..92e44a7 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -43,10 +44,12 @@ import org.apache.cassandra.service.ActiveRepairService;
 public class PendingRepairHolder extends AbstractStrategyHolder
 {
     private final List<PendingRepairManager> managers = new ArrayList<>();
+    private final boolean isTransient;
 
-    public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router)
+    public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isTransient)
     {
         super(cfs, router);
+        this.isTransient = isTransient;
     }
 
     @Override
@@ -66,15 +69,15 @@ public class PendingRepairHolder extends AbstractStrategyHolder
     {
         managers.clear();
         for (int i = 0; i < numTokenPartitions; i++)
-            managers.add(new PendingRepairManager(cfs, params));
+            managers.add(new PendingRepairManager(cfs, params, isTransient));
     }
 
     @Override
-    public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair)
+    public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
     {
         Preconditions.checkArgument(!isPendingRepair || !isRepaired,
                                     "SSTables cannot be both repaired and 
pending repair");
-        return isPendingRepair;
+        return isPendingRepair && (this.isTransient == isTransient);
     }
 
     @Override
@@ -145,7 +148,23 @@ public class PendingRepairHolder extends AbstractStrategyHolder
         return tasks;
     }
 
-    public ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers()
+    AbstractCompactionTask getNextRepairFinishedTask()
+    {
+        List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
+        if (!repairFinishedSuppliers.isEmpty())
+        {
+            Collections.sort(repairFinishedSuppliers);
+            for (TaskSupplier supplier : repairFinishedSuppliers)
+            {
+                AbstractCompactionTask task = supplier.getTask();
+                if (task != null)
+                    return task;
+            }
+        }
+        return null;
+    }
+
+    private ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers()
     {
         ArrayList<TaskSupplier> suppliers = new ArrayList<>(managers.size());
         for (PendingRepairManager manager : managers)
@@ -218,6 +237,7 @@ public class PendingRepairHolder extends AbstractStrategyHolder
                                                        long keyCount,
                                                        long repairedAt,
                                                        UUID pendingRepair,
+                                                       boolean isTransient,
                                                        MetadataCollector collector,
                                                        SerializationHeader header,
                                                        Collection<Index> indexes,
@@ -233,6 +253,7 @@ public class PendingRepairHolder extends AbstractStrategyHolder
                                                  keyCount,
                                                  repairedAt,
                                                  pendingRepair,
+                                                 isTransient,
                                                  collector,
                                                  header,
                                                  indexes,
@@ -249,4 +270,15 @@ public class PendingRepairHolder extends AbstractStrategyHolder
         }
         return -1;
     }
+
+    public boolean hasDataForSession(UUID sessionID)
+    {
+        return Iterables.any(managers, prm -> prm.hasDataForSession(sessionID));
+    }
+
+    @Override
+    public boolean containsSSTable(SSTableReader sstable)
+    {
+        return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index edc9a2f..6763abf 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -30,7 +30,9 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
 import org.slf4j.Logger;
@@ -62,6 +64,7 @@ class PendingRepairManager
 
     private final ColumnFamilyStore cfs;
     private final CompactionParams params;
+    private final boolean isTransient;
     private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies = ImmutableMap.of();
 
     /**
@@ -75,10 +78,11 @@ class PendingRepairManager
         }
     }
 
-    PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params)
+    PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params, boolean isTransient)
     {
         this.cfs = cfs;
         this.params = params;
+        this.isTransient = isTransient;
     }
 
     private ImmutableMap.Builder<UUID, AbstractCompactionStrategy> mapBuilder()
@@ -156,6 +160,7 @@ class PendingRepairManager
 
     synchronized void addSSTable(SSTableReader sstable)
     {
+        Preconditions.checkArgument(sstable.isTransient() == isTransient);
         getOrCreate(sstable).addSSTable(sstable);
     }
 
@@ -389,6 +394,15 @@ class PendingRepairManager
         return strategies.keySet().contains(sessionID);
     }
 
+    boolean containsSSTable(SSTableReader sstable)
+    {
+        if (!sstable.isPendingRepair())
+            return false;
+
+        AbstractCompactionStrategy strategy = strategies.get(sstable.getPendingRepair());
+        return strategy != null && strategy.getSSTables().contains(sstable);
+    }
+
     public Collection<AbstractCompactionTask> createUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
     {
         Map<UUID, List<SSTableReader>> group = sstables.stream().collect(Collectors.groupingBy(s -> s.getSSTableMetadata().pendingRepair));
@@ -419,18 +433,35 @@ class PendingRepairManager
         protected void runMayThrow() throws Exception
         {
             boolean completed = false;
+            boolean obsoleteSSTables = isTransient && repairedAt > 0;
             try
             {
-                logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID);
-                cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+                if (obsoleteSSTables)
+                {
+                    logger.info("Obsoleting transient repaired sstables");
+                    Preconditions.checkState(Iterables.all(transaction.originals(), SSTableReader::isTransient));
+                    transaction.obsoleteOriginals();
+                }
+                else
+                {
+                    logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID);
+                    cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
                 completed = true;
             }
             finally
             {
-                // we always abort because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll
-                // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other
-                // compactions from marking these sstables compacting, and unmarking them when we're done
-                transaction.abort();
+                if (obsoleteSSTables)
+                {
+                    transaction.finish();
+                }
+                else
+                {
+                    // we abort here because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll
+                    // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other
+                    // compactions from marking these sstables compacting, and unmarking them when we're done
+                    transaction.abort();
+                }
                 if (completed)
                 {
                     removeSession(sessionID);

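The RepairFinishedCompactionTask hunk above introduces two finish paths: transient sstables from a successfully repaired session are obsoleted (so the transaction must be finished for the deletion to commit), while full sstables only have their repair metadata promoted (so the transaction is aborted, since the metadata mutation is not transactional). A condensed sketch of that control flow, using a stand-in Txn interface instead of LifecycleTransaction and a Runnable in place of mutateRepaired:

// Condensed illustration of the finish-path decision in RepairFinishedCompactionTask.
final class RepairFinishSketch
{
    interface Txn
    {
        void obsoleteOriginals(); // delete transient copies of now-repaired data
        void finish();            // commit the obsoletion
        void abort();             // metadata mutation is not transactional, so just release
    }

    static void onRepairFinished(Txn txn, boolean isTransient, long repairedAt, Runnable promote)
    {
        boolean obsolete = isTransient && repairedAt > 0;
        try
        {
            if (obsolete)
                txn.obsoleteOriginals();   // transient replicas drop the data once repair succeeds
            else
                promote.run();             // full replicas mark the sstables repaired
        }
        finally
        {
            if (obsolete)
                txn.finish();
            else
                txn.abort();
        }
    }

    public static void main(String[] args)
    {
        Txn txn = new Txn()
        {
            public void obsoleteOriginals() { System.out.println("obsolete originals"); }
            public void finish()            { System.out.println("finish"); }
            public void abort()             { System.out.println("abort"); }
        };
        // Transient sstables from a successful session are deleted rather than promoted.
        onRepairFinished(txn, true, 42L, () -> System.out.println("promote to repaired"));
    }
}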
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index f97b693..aa41051 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -170,7 +170,7 @@ public class Scrubber implements Closeable
             }
 
             StatsMetadata metadata = sstable.getSSTableMetadata();
-            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, transaction));
+            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, transaction));
 
             DecoratedKey prevKey = null;
 
@@ -277,7 +277,7 @@ public class Scrubber implements Closeable
                 // out of order rows, but no bad rows found - we can keep our repairedAt time
                 long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : metadata.repairedAt;
                 SSTableReader newInOrderSstable;
-                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, metadata.pendingRepair, sstable, transaction))
+                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, transaction))
                 {
                     for (Partition partition : outOfOrder)
                         inOrderWriter.append(partition.unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 80453ef..e1406aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -68,14 +68,15 @@ public class Upgrader
         this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
     }
 
-    private SSTableWriter createCompactionWriter(long repairedAt, UUID parentRepair)
+    private SSTableWriter createCompactionWriter(StatsMetadata metadata)
     {
         MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
         return SSTableWriter.create(cfs.newSSTableDescriptor(directory),
                                     estimatedRows,
-                                    repairedAt,
-                                    parentRepair,
+                                    metadata.repairedAt,
+                                    metadata.pendingRepair,
+                                    metadata.isTransient,
                                     cfs.metadata,
                                     sstableMetadataCollector,
                                     SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)),
@@ -91,8 +92,7 @@ public class Upgrader
              AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
              CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
-            StatsMetadata metadata = sstable.getSSTableMetadata();
-            writer.switchWriter(createCompactionWriter(metadata.repairedAt, metadata.pendingRepair));
+            writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata()));
             while (iter.hasNext())
                 writer.append(iter.next());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index db49369..446d527 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -350,6 +350,7 @@ public class Verifier implements Closeable
         public RangeOwnHelper(List<Range<Token>> normalizedRanges)
         {
             this.normalizedRanges = normalizedRanges;
+            Range.assertNormalized(normalizedRanges);
         }
 
         /**
@@ -457,7 +458,7 @@ public class Verifier implements Closeable
         {
             try
             {
-                sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().pendingRepair);
+                sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
                 sstable.reloadSSTableMetadata();
                 
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 5ddd99c..d72b236 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -57,6 +57,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected final long maxAge;
     protected final long minRepairedAt;
     protected final UUID pendingRepair;
+    protected final boolean isTransient;
 
     protected final SSTableRewriter sstableWriter;
     protected final LifecycleTransaction txn;
@@ -91,6 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
         minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
         pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables);
+        isTransient = CompactionTask.getIsTransient(nonExpiredSSTables);
         DiskBoundaries db = cfs.getDiskBoundaries();
         diskBoundaries = db.positions;
         locations = db.directories;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index cda7e38..6180f96 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -72,6 +72,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
                                                     pendingRepair,
+                                                    isTransient,
                                                     cfs.metadata,
                                                     new MetadataCollector(txn.originals(), cfs.metadata().comparator, sstableLevel),
                                                     SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 3959b4b..2b93eb4 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -108,6 +108,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                 keysPerSSTable,
                 minRepairedAt,
                 pendingRepair,
+                isTransient,
                 cfs.metadata,
                 new MetadataCollector(txn.originals(), cfs.metadata().comparator, currentLevel),
                 SerializationHeader.make(cfs.metadata(), txn.originals()),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index c4f84e8..df7eeaf 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -111,6 +111,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
                                                     pendingRepair,
+                                                    isTransient,
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata().comparator, level),
                                                     SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index a4af783..7533f1d 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -107,6 +107,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
                                                     currentPartitionsToWrite,
                                                     minRepairedAt,
                                                     pendingRepair,
+                                                    isTransient,
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata().comparator, 0),
                                                     SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 9064b0f..bed0958 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -82,18 +82,6 @@ public abstract class PartitionIterators
         return new SingletonPartitionIterator(iterator);
     }
 
-    public static void consume(PartitionIterator iterator)
-    {
-        while (iterator.hasNext())
-        {
-            try (RowIterator partition = iterator.next())
-            {
-                while (partition.hasNext())
-                    partition.next();
-            }
-        }
-    }
-
     /**
      * Wraps the provided iterator so it logs the returned rows for debugging purposes.
      * <p>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
index 5f2e5a0..fa2e653 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
@@ -26,8 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.repair.KeyspaceRepairManager;
 
 public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager
@@ -40,9 +39,12 @@ public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager
     }
 
     @Override
-    public ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+    public ListenableFuture prepareIncrementalRepair(UUID sessionID,
+                                                     Collection<ColumnFamilyStore> tables,
+                                                     RangesAtEndpoint tokenRanges,
+                                                     ExecutorService executor)
     {
-        PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, ranges, executor);
+        PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, tokenRanges, executor);
         return pac.run();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 4e0f13d..a205c3c 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
@@ -126,17 +127,17 @@ public class PendingAntiCompaction
     static class AcquisitionCallback implements AsyncFunction<List<AcquireResult>, Object>
     {
         private final UUID parentRepairSession;
-        private final Collection<Range<Token>> ranges;
+        private final RangesAtEndpoint tokenRanges;
 
-        public AcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges)
+        public AcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint tokenRanges)
         {
             this.parentRepairSession = parentRepairSession;
-            this.ranges = ranges;
+            this.tokenRanges = tokenRanges;
         }
 
         ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
         {
-            return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, result.refs, result.txn, parentRepairSession);
+            return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, tokenRanges, result.refs, result.txn, parentRepairSession);
         }
 
         public ListenableFuture apply(List<AcquireResult> results) throws Exception
@@ -177,14 +178,17 @@ public class PendingAntiCompaction
 
     private final UUID prsId;
     private final Collection<ColumnFamilyStore> tables;
-    private final Collection<Range<Token>> ranges;
+    private final RangesAtEndpoint tokenRanges;
     private final ExecutorService executor;
 
-    public PendingAntiCompaction(UUID prsId, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+    public PendingAntiCompaction(UUID prsId,
+                                 Collection<ColumnFamilyStore> tables,
+                                 RangesAtEndpoint tokenRanges,
+                                 ExecutorService executor)
     {
         this.prsId = prsId;
         this.tables = tables;
-        this.ranges = ranges;
+        this.tokenRanges = tokenRanges;
         this.executor = executor;
     }
 
@@ -194,12 +198,12 @@ public class PendingAntiCompaction
         for (ColumnFamilyStore cfs : tables)
         {
             cfs.forceBlockingFlush();
-            ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId));
+            ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
             executor.submit(task);
             tasks.add(task);
         }
         ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks);
-        ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, ranges), MoreExecutors.directExecutor());
+        ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, tokenRanges), MoreExecutors.directExecutor());
         return compactionResult;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 5252187..c688fdf 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -68,17 +68,18 @@ public class CassandraOutgoingFile implements OutgoingStream
     private final ComponentManifest manifest;
     private Boolean isFullyContained;
 
-    private final List<Range<Token>> ranges;
+    private final List<Range<Token>> normalizedRanges;
 
     public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
-                                 List<SSTableReader.PartitionPositionBounds> sections, Collection<Range<Token>> ranges,
+                                 List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges,
                                  long estimatedKeys)
     {
         Preconditions.checkNotNull(ref.get());
+        Range.assertNormalized(normalizedRanges);
         this.ref = ref;
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
-        this.ranges = ImmutableList.copyOf(ranges);
+        this.normalizedRanges = ImmutableList.copyOf(normalizedRanges);
         this.filename = ref.get().getFilename();
         this.manifest = getComponentManifest(ref.get());
 
@@ -194,7 +195,7 @@ public class CassandraOutgoingFile implements OutgoingStream
                                                            .getCompactionStrategyFor(ref.get());
 
         if (compactionStrategy instanceof LeveledCompactionStrategy)
-            return contained(ranges, ref.get());
+            return contained(normalizedRanges, ref.get());
 
         return false;
     }
@@ -251,6 +252,6 @@ public class CassandraOutgoingFile implements OutgoingStream
     @Override
     public String toString()
     {
-        return "CassandraOutgoingFile{" + ref.get().getFilename() + '}';
+        return "CassandraOutgoingFile{" + filename + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index 43667d0..6c2631c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -18,19 +18,10 @@
 
 package org.apache.cassandra.db.streaming;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
@@ -39,6 +30,8 @@ import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.IncomingStream;
 import org.apache.cassandra.streaming.OutgoingStream;
@@ -49,6 +42,14 @@ import org.apache.cassandra.streaming.TableStreamManager;
 import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.Refs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 
 /**
  * Implements the streaming interface for the native cassandra storage engine.
@@ -96,14 +97,14 @@ public class CassandraStreamManager implements TableStreamManager
     }
 
     @Override
-    public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind)
+    public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, UUID pendingRepair, PreviewKind previewKind)
     {
         Refs<SSTableReader> refs = new Refs<>();
         try
         {
-            final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size());
-            for (Range<Token> range : ranges)
-                keyRanges.add(Range.makeRowRange(range));
+            final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(replicas.size());
+            for (Replica replica : replicas)
+                keyRanges.add(Range.makeRowRange(replica.range()));
             refs.addAll(cfs.selectAndReference(view -> {
                 Set<SSTableReader> sstables = Sets.newHashSet();
                 SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
@@ -141,11 +142,16 @@ public class CassandraStreamManager implements TableStreamManager
             }).refs);
 
 
+            List<Range<Token>> normalizedFullRanges = Range.normalize(replicas.filter(Replica::isFull).ranges());
+            List<Range<Token>> normalizedAllRanges = Range.normalize(replicas.ranges());
+            //Create outgoing file streams for ranges possibly skipping repaired ranges in sstables
             List<OutgoingStream> streams = new ArrayList<>(refs.size());
-            for (SSTableReader sstable: refs)
+            for (SSTableReader sstable : refs)
             {
-                Ref<SSTableReader> ref = refs.get(sstable);
+                List<Range<Token>> ranges = sstable.isRepaired() ? 
normalizedFullRanges : normalizedAllRanges;
                 List<SSTableReader.PartitionPositionBounds> sections = 
sstable.getPositionsForRanges(ranges);
+
+                Ref<SSTableReader> ref = refs.get(sstable);
                 if (sections.isEmpty())
                 {
                     ref.release();

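The createOutgoingStreams change above picks a per-sstable range set: already-repaired sstables are limited to the normalized ranges of full replicas, while unrepaired and pending-repair data covers all requested ranges. A simplified sketch of that selection, using a stand-in Replica class rather than Cassandra's locator types and plain strings in place of token ranges:

import java.util.List;
import java.util.stream.Collectors;

// Simplified illustration of the range selection in createOutgoingStreams: repaired data
// does not need to reach transient replicas, so it streams only over full ranges.
final class StreamRangeSelectionSketch
{
    static final class Replica
    {
        final String range;
        final boolean full;
        Replica(String range, boolean full) { this.range = range; this.full = full; }
    }

    static List<String> rangesToStream(boolean sstableIsRepaired, List<Replica> requested)
    {
        List<String> allRanges = requested.stream().map(r -> r.range).collect(Collectors.toList());
        List<String> fullRanges = requested.stream().filter(r -> r.full).map(r -> r.range).collect(Collectors.toList());
        return sstableIsRepaired ? fullRanges : allRanges;
    }

    public static void main(String[] args)
    {
        List<Replica> requested = List.of(new Replica("(0,100]", true), new Replica("(100,200]", false));
        System.out.println(rangesToStream(true, requested));  // repaired data skips the transient range
        System.out.println(rangesToStream(false, requested)); // everything else streams over all ranges
    }
}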
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index fccabfe..572c648 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -156,7 +156,7 @@ public class CassandraStreamReader implements IStreamReader
         Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver);
         LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction();
 
-        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
+        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
         return writer;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index d35457e..09490e8 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -60,6 +60,11 @@ public class TableViews extends AbstractCollection<View>
         baseTableMetadata = Schema.instance.getTableMetadataRef(id);
     }
 
+    public boolean hasViews()
+    {
+        return !views.isEmpty();
+    }
+
     public int size()
     {
         return views.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index c727f63..6717297 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.repair.SystemDistributedKeyspace;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -135,14 +137,15 @@ class ViewBuilder
         }
 
         // Get the local ranges for which the view hasn't already been built nor it's building
-        Set<Range<Token>> newRanges = StorageService.instance.getLocalRanges(ksName)
-                                                             .stream()
-                                                             .map(r -> r.subtractAll(builtRanges))
-                                                             .flatMap(Set::stream)
-                                                             .map(r -> r.subtractAll(pendingRanges.keySet()))
-                                                             .flatMap(Set::stream)
-                                                             .collect(Collectors.toSet());
-
+        RangesAtEndpoint replicatedRanges = StorageService.instance.getLocalReplicas(ksName);
+        Replicas.temporaryAssertFull(replicatedRanges);
+        Set<Range<Token>> newRanges = replicatedRanges.ranges()
+                                                      .stream()
+                                                      .map(r -> r.subtractAll(builtRanges))
+                                                      .flatMap(Set::stream)
+                                                      .map(r -> r.subtractAll(pendingRanges.keySet()))
+                                                      .flatMap(Set::stream)
+                                                      .collect(Collectors.toSet());
         // If there are no new nor pending ranges we should finish the build
         if (newRanges.isEmpty() && pendingRanges.isEmpty())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 000477d..7e3ea1b 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -79,7 +79,7 @@ public class ViewManager
             {
                 assert keyspace.getName().equals(update.metadata().keyspace);
 
-                if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
+                if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor().allReplicas == 1)
                     continue;
 
                 if (!forTable(update.metadata().id).updatedViews(update).isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index df16943..ad10d9d 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -18,16 +18,17 @@
 
 package org.apache.cassandra.db.view;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
+import java.util.function.Predicate;
 
+import com.google.common.collect.Iterables;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.utils.FBUtilities;
 
 public final class ViewUtils
@@ -58,46 +59,51 @@ public final class ViewUtils
      *
      * @return Optional.empty() if this method is called using a base token which does not belong to this replica
      */
-    public static Optional<InetAddressAndPort> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+    public static Optional<Replica> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
     {
         AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
 
         String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-        List<InetAddressAndPort> baseEndpoints = new ArrayList<>();
-        List<InetAddressAndPort> viewEndpoints = new ArrayList<>();
-        for (InetAddressAndPort baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
-        {
-            // An endpoint is local if we're not using Net
-            if (!(replicationStrategy instanceof NetworkTopologyStrategy) ||
-                DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
-                baseEndpoints.add(baseEndpoint);
-        }
+        EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken);
+        EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken);
 
-        for (InetAddressAndPort viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
-        {
-            // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
-            if (viewEndpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
-                return Optional.of(viewEndpoint);
+        Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isLocal).toJavaUtil();
+        if (localReplica.isPresent())
+            return localReplica;
 
-            // We have to remove any endpoint which is shared between the base and the view, as it will select itself
-            // and throw off the counts otherwise.
-            if (baseEndpoints.contains(viewEndpoint))
-                baseEndpoints.remove(viewEndpoint);
-            else if (!(replicationStrategy instanceof NetworkTopologyStrategy) ||
-                     DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
-                viewEndpoints.add(viewEndpoint);
-        }
+        // We only select replicas from our own DC
+        // TODO: this is poor encapsulation, leaking implementation details of replication strategy
+        Predicate<Replica> isLocalDC = r -> !(replicationStrategy instanceof NetworkTopologyStrategy)
+                || DatabaseDescriptor.getEndpointSnitch().getDatacenter(r).equals(localDataCenter);
+
+        // We have to remove any endpoint which is shared between the base and the view, as it will select itself
+        // and throw off the counts otherwise.
+        EndpointsForToken baseReplicas = naturalBaseReplicas.filter(
+                r -> !naturalViewReplicas.endpoints().contains(r.endpoint()) && isLocalDC.test(r)
+        );
+        EndpointsForToken viewReplicas = naturalViewReplicas.filter(
+                r -> !naturalBaseReplicas.endpoints().contains(r.endpoint()) && isLocalDC.test(r)
+        );
 
         // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
         // Since the same replication strategy is used, the same placement should be used and we should get the same
         // number of replicas for all of the tokens in the ring.
-        assert baseEndpoints.size() == viewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
-        int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddressAndPort());
+        assert baseReplicas.size() == viewReplicas.size() : "Replication strategy should have the same number of endpoints for the base and the view";
+
+        int baseIdx = -1;
+        for (int i=0; i<baseReplicas.size(); i++)
+        {
+            if (baseReplicas.get(i).isLocal())
+            {
+                baseIdx = i;
+                break;
+            }
+        }
 
         if (baseIdx < 0)
             //This node is not a base replica of this key, so we return empty
             return Optional.empty();
 
-        return Optional.of(viewEndpoints.get(baseIdx));
+        return Optional.of(viewReplicas.get(baseIdx));
     }
 }
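
The rewritten getViewNaturalEndpoint above boils down to positional pairing: both replica lists are filtered the same way (local DC only, shared endpoints removed), asserted to be the same size, and the local node's index in the base list selects the paired view replica. A minimal, self-contained sketch of that pairing rule, using plain lists and a caller-supplied "is local" predicate rather than the real EndpointsForToken/Replica types:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Predicate;

    // Sketch only: illustrates the index-pairing rule, not the real Cassandra API.
    final class ViewPairingSketch
    {
        static <R> Optional<R> pairedViewReplica(List<R> baseReplicas, List<R> viewReplicas, Predicate<R> isLocal)
        {
            // Both lists are assumed to be pre-filtered identically, so their sizes match.
            assert baseReplicas.size() == viewReplicas.size();
            for (int i = 0; i < baseReplicas.size(); i++)
            {
                if (isLocal.test(baseReplicas.get(i)))
                    return Optional.of(viewReplicas.get(i));   // same position on the view side
            }
            return Optional.empty();   // this node is not a base replica for the token
        }

        public static void main(String[] args)
        {
            List<String> base = Arrays.asList("127.0.0.1", "127.0.0.2");
            List<String> view = Arrays.asList("127.0.0.3", "127.0.0.4");
            // If this node is 127.0.0.2 (index 1 in the base list), it pairs with 127.0.0.4.
            System.out.println(pairedViewReplica(base, view, "127.0.0.2"::equals));   // Optional[127.0.0.4]
        }
    }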

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 974d08e..e03c5ec 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.dht;
 
 import java.io.Serializable;
 import java.util.*;
+import java.util.function.Predicate;
 
 import org.apache.commons.lang3.ObjectUtils;
 
@@ -529,7 +530,7 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
     /**
      * Helper class to check if a token is contained within a given collection of ranges
      */
-    public static class OrderedRangeContainmentChecker
+    public static class OrderedRangeContainmentChecker implements Predicate<Token>
     {
         private final Iterator<Range<Token>> normalizedRangesIterator;
         private Token lastToken = null;
@@ -550,7 +551,8 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
          * @param t token to check, must be larger than or equal to the last token passed
          * @return true if the token is contained within the ranges given to the constructor.
          */
-        public boolean contains(Token t)
+        @Override
+        public boolean test(Token t)
         {
             assert lastToken == null || lastToken.compareTo(t) <= 0;
             lastToken = t;
@@ -567,4 +569,25 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
             }
         }
     }
+
+    public static <T extends RingPosition<T>> void assertNormalized(List<Range<T>> ranges)
+    {
+        Range<T> lastRange = null;
+        for (Range<T> range : ranges)
+        {
+            if (lastRange == null)
+            {
+                lastRange = range;
+            }
+            else if (lastRange.left.compareTo(range.left) >= 0 || lastRange.intersects(range))
+            {
+                throw new AssertionError(String.format("Ranges aren't properly normalized. lastRange %s, range %s, compareTo %d, intersects %b, all ranges %s%n",
+                                                       lastRange,
+                                                       range,
+                                                       lastRange.compareTo(range),
+                                                       lastRange.intersects(range),
+                                                       ranges));
+                                                       ranges));
+            }
+        }
+    }
 }
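
Since OrderedRangeContainmentChecker now implements java.util.function.Predicate<Token>, it can be handed directly to predicate-based APIs such as Stream.filter instead of being called through a bespoke contains() method. A short usage sketch, assuming the existing constructor that takes the collection of ranges and Murmur3Partitioner.LongToken for building tokens (both pre-existing, not introduced by this patch); tokens must be tested in non-decreasing order, per the class contract:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;

    public class ContainmentCheckerSketch
    {
        public static void main(String[] args)
        {
            // Ranges are (left, right] and already normalized (sorted, non-overlapping).
            List<Range<Token>> ranges = Arrays.asList(new Range<Token>(new LongToken(0L), new LongToken(100L)),
                                                      new Range<Token>(new LongToken(200L), new LongToken(300L)));
            Range.assertNormalized(ranges);   // the new helper throws AssertionError otherwise

            Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);

            // Tokens are fed in non-decreasing order; the checker is used directly as a Predicate.
            List<Token> tokens = Arrays.asList(new LongToken(50L), new LongToken(150L), new LongToken(250L));
            List<Token> contained = tokens.stream().filter(checker).collect(Collectors.toList());
            System.out.println(contained);   // tokens 50 and 250
        }
    }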

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index b90bc96..4b98b97 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -19,25 +19,27 @@
 package org.apache.cassandra.dht;
 
 import java.math.BigInteger;
-import java.net.InetAddress;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.locator.EndpointsByRange;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.locator.Replica;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.Replicas;
 import org.psjava.algo.graph.flownetwork.FordFulkersonAlgorithm;
 import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithm;
 import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithmResult;
@@ -73,20 +75,20 @@ public class RangeFetchMapCalculator
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeFetchMapCalculator.class);
     private static final long TRIVIAL_RANGE_LIMIT = 1000;
-    private final Multimap<Range<Token>, InetAddressAndPort> rangesWithSources;
-    private final Collection<RangeStreamer.ISourceFilter> sourceFilters;
+    private final EndpointsByRange rangesWithSources;
+    private final Predicate<Replica> sourceFilters;
     private final String keyspace;
     //We need two Vertices to act as source and destination in the algorithm
     private final Vertex sourceVertex = OuterVertex.getSourceVertex();
     private final Vertex destinationVertex = OuterVertex.getDestinationVertex();
     private final Set<Range<Token>> trivialRanges;
 
-    public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
-                                   Collection<RangeStreamer.ISourceFilter> sourceFilters,
+    public RangeFetchMapCalculator(EndpointsByRange rangesWithSources,
+                                   Collection<Predicate<Replica>> sourceFilters,
                                    String keyspace)
     {
         this.rangesWithSources = rangesWithSources;
-        this.sourceFilters = sourceFilters;
+        this.sourceFilters = Predicates.and(sourceFilters);
         this.keyspace = keyspace;
         this.trivialRanges = rangesWithSources.keySet()
                                               .stream()
@@ -158,14 +160,15 @@ public class RangeFetchMapCalculator
             boolean localDCCheck = true;
             while (!added)
             {
-                List<InetAddressAndPort> srcs = new ArrayList<>(rangesWithSources.get(trivialRange));
                 // sort with the endpoint having the least number of streams first:
-                srcs.sort(Comparator.comparingInt(o -> optimisedMap.get(o).size()));
-                for (InetAddressAndPort src : srcs)
+                EndpointsForRange replicas = rangesWithSources.get(trivialRange)
+                        .sorted(Comparator.comparingInt(o -> optimisedMap.get(o.endpoint()).size()));
+                Replicas.temporaryAssertFull(replicas);
+                for (Replica replica : replicas)
                 {
-                    if (passFilters(src, localDCCheck))
+                    if (passFilters(replica, localDCCheck))
                     {
-                        fetchMap.put(src, trivialRange);
+                        fetchMap.put(replica.endpoint(), trivialRange);
                         added = true;
                         break;
                     }
@@ -347,15 +350,16 @@ public class RangeFetchMapCalculator
     private boolean addEndpoints(MutableCapacityGraph<Vertex, Integer> capacityGraph, RangeVertex rangeVertex, boolean localDCCheck)
     {
         boolean sourceFound = false;
-        for (InetAddressAndPort endpoint : rangesWithSources.get(rangeVertex.getRange()))
+        Replicas.temporaryAssertFull(rangesWithSources.get(rangeVertex.getRange()));
+        for (Replica replica : rangesWithSources.get(rangeVertex.getRange()))
         {
-            if (passFilters(endpoint, localDCCheck))
+            if (passFilters(replica, localDCCheck))
             {
                 sourceFound = true;
                 // if we pass filters, it means that we don't filter away localhost and we can count it as a source:
-                if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+                if (replica.isLocal())
                     continue; // but don't add localhost to the graph to avoid streaming locally
-                final Vertex endpointVertex = new EndpointVertex(endpoint);
+                final Vertex endpointVertex = new EndpointVertex(replica.endpoint());
                 capacityGraph.insertVertex(rangeVertex);
                 capacityGraph.insertVertex(endpointVertex);
                 capacityGraph.addEdge(rangeVertex, endpointVertex, Integer.MAX_VALUE);
@@ -364,26 +368,20 @@ public class RangeFetchMapCalculator
         return sourceFound;
     }
 
-    private boolean isInLocalDC(InetAddressAndPort endpoint)
+    private boolean isInLocalDC(Replica replica)
     {
-        return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
+        return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica));
     }
 
     /**
      *
-     * @param endpoint   Endpoint to check
+     * @param replica   Replica to check
      * @param localDCCheck Allow endpoints with local DC
      * @return   True if filters pass this endpoint
      */
-    private boolean passFilters(final InetAddressAndPort endpoint, boolean localDCCheck)
+    private boolean passFilters(final Replica replica, boolean localDCCheck)
     {
-        for (RangeStreamer.ISourceFilter filter : sourceFilters)
-        {
-            if (!filter.shouldInclude(endpoint))
-                return false;
-        }
-
-        return !localDCCheck || isInLocalDC(endpoint);
+        return sourceFilters.apply(replica) && (!localDCCheck || isInLocalDC(replica));
     }
 
     private static abstract class Vertex
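
The constructor above collapses the collection of source filters into a single Guava predicate with Predicates.and, so passFilters reduces to one apply() call instead of looping over ISourceFilter instances. A self-contained sketch of that composition, using a stand-in replica type rather than org.apache.cassandra.locator.Replica:

    import java.util.Arrays;
    import java.util.Collection;

    import com.google.common.base.Predicate;
    import com.google.common.base.Predicates;

    // Sketch only: FakeReplica stands in for the real Replica; the composition is the point.
    public class SourceFilterSketch
    {
        static final class FakeReplica
        {
            final String dc;
            final boolean alive;
            FakeReplica(String dc, boolean alive) { this.dc = dc; this.alive = alive; }
        }

        public static void main(String[] args)
        {
            Predicate<FakeReplica> isAlive = r -> r.alive;
            Predicate<FakeReplica> inDc1 = r -> "dc1".equals(r.dc);
            Collection<Predicate<FakeReplica>> filters = Arrays.asList(isAlive, inDc1);

            // Predicates.and(...) accepts the whole collection and is true only when
            // every component filter accepts the replica -- the old loop, composed once.
            Predicate<FakeReplica> combined = Predicates.and(filters);

            System.out.println(combined.apply(new FakeReplica("dc1", true)));    // true
            System.out.println(combined.apply(new FakeReplica("dc2", true)));    // false: wrong DC
        }
    }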

