This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new e5c3d08  Operational improvements and hardening for replica filtering 
protection patch by Caleb Rackliffe; reviewed by Andrés de la Peña for 
CASSANDRA-15907
e5c3d08 is described below

commit e5c3d08a1428d378b6690f0419a2b25724b9736e
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Mon Jun 29 13:54:23 2020 -0500

    Operational improvements and hardening for replica filtering protection
    patch by Caleb Rackliffe; reviewed by Andrés de la Peña for CASSANDRA-15907
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +-
 conf/cassandra.yaml                                |  20 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  20 +
 .../config/ReplicaFilteringProtectionOptions.java  |  28 ++
 .../db/partitions/PartitionIterators.java          |  49 ++-
 .../partitions/UnfilteredPartitionIterators.java   |  19 -
 .../apache/cassandra/db/rows/EncodingStats.java    |  24 ++
 .../org/apache/cassandra/metrics/TableMetrics.java |  22 +-
 .../org/apache/cassandra/service/DataResolver.java |  89 +++--
 .../service/ReplicaFilteringProtection.java        | 423 ++++++++++++---------
 .../apache/cassandra/service/StorageService.java   |  27 +-
 .../cassandra/service/StorageServiceMBean.java     |  12 +
 .../org/apache/cassandra/utils/FBUtilities.java    |   4 +-
 .../cassandra/utils/concurrent/Accumulator.java    |   9 +-
 .../cassandra/distributed/impl/Coordinator.java    |  10 +
 .../apache/cassandra/distributed/impl/RowUtil.java |   7 +-
 .../test/ReplicaFilteringProtectionTest.java       | 244 ++++++++++++
 .../cassandra/db/rows/EncodingStatsTest.java       | 145 +++++++
 .../utils/concurrent/AccumulatorTest.java          |  34 +-
 21 files changed, 924 insertions(+), 267 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a755b7e..182dca3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.22:
+ * Operational improvements and hardening for replica filtering protection 
(CASSANDRA-15907)
  * stop_paranoid disk failure policy is ignored on CorruptSSTableException 
after node is up (CASSANDRA-15191)
  * 3.x fails to start if commit log has range tombstones from a column which 
is also deleted (CASSANDRA-15970)
  * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
diff --git a/build.xml b/build.xml
index 6492767..6c1d148 100644
--- a/build.xml
+++ b/build.xml
@@ -398,7 +398,7 @@
           </dependency>
           <dependency groupId="junit" artifactId="junit" version="4.6" />
           <dependency groupId="org.mockito" artifactId="mockito-core" 
version="3.2.4" />
-          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.3" />
+          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.4" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" 
version="0.10">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c321a72..bb96f18 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -661,6 +661,26 @@ auto_snapshot: true
 tombstone_warn_threshold: 1000
 tombstone_failure_threshold: 100000
 
+# Filtering and secondary index queries at read consistency levels above 
ONE/LOCAL_ONE use a
+# mechanism called replica filtering protection to ensure that results from 
stale replicas do
+# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more 
details.) This 
+# mechanism materializes replica results by partition on-heap at the 
coordinator. The more possibly
+# stale results returned by the replicas, the more rows materialized during 
the query.
+replica_filtering_protection:
+    # These thresholds exist to limit the damage severely out-of-date replicas 
can cause during these
+    # queries. They limit the number of rows from all replicas individual 
index and filtering queries
+    # can materialize on-heap to return correct results at the desired read 
consistency level.
+    #
+    # "cached_replica_rows_warn_threshold" is the per-query threshold at which 
a warning will be logged.
+    # "cached_replica_rows_fail_threshold" is the per-query threshold at which 
the query will fail.
+    #
+    # These thresholds may also be adjusted at runtime using the 
StorageService mbean.
+    # 
+    # If the failure threshold is breached, it is likely that either the 
current page/fetch size
+    # is too large or one or more replicas is severely out-of-sync and in need 
of repair.
+    cached_rows_warn_threshold: 2000
+    cached_rows_fail_threshold: 32000
+
 # Granularity of the collation index of rows within a partition.
 # Increase if your rows are large, or if you have a very large
 # number of rows per partition.  The competing goals are these:
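
The comment above notes that both thresholds can also be adjusted at runtime through the
StorageService MBean. A minimal JMX sketch of doing that, assuming the default JMX port (7199)
and that the thresholds surface as read-write attributes named after the getCachedReplicaRows*/
setCachedReplicaRows* getter/setter pairs this patch adds to StorageServiceMBean (the class name
below is illustrative only):

    import javax.management.Attribute;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class AdjustRfpThresholds
    {
        public static void main(String[] args) throws Exception
        {
            // Connect to the node's JMX endpoint (7199 is the default port).
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");

            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");

                // Attribute names are assumed from the new getter/setter pairs on StorageServiceMBean.
                connection.setAttribute(storageService, new Attribute("CachedReplicaRowsWarnThreshold", 4000));
                connection.setAttribute(storageService, new Attribute("CachedReplicaRowsFailThreshold", 64000));

                System.out.println("warn threshold now " +
                                   connection.getAttribute(storageService, "CachedReplicaRowsWarnThreshold"));
            }
        }
    }
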
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 6003bd1..2218ee2 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -280,6 +280,8 @@ public class Config
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
+    public final ReplicaFilteringProtectionOptions 
replica_filtering_protection = new ReplicaFilteringProtectionOptions();
+
     public volatile Long index_summary_capacity_in_mb;
     public volatile int index_summary_resize_interval_in_minutes = 60;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4b732c2..9369229 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1368,6 +1368,26 @@ public class DatabaseDescriptor
         conf.tombstone_failure_threshold = threshold;
     }
 
+    public static int getCachedReplicaRowsWarnThreshold()
+    {
+        return conf.replica_filtering_protection.cached_rows_warn_threshold;
+    }
+
+    public static void setCachedReplicaRowsWarnThreshold(int threshold)
+    {
+        conf.replica_filtering_protection.cached_rows_warn_threshold = 
threshold;
+    }
+
+    public static int getCachedReplicaRowsFailThreshold()
+    {
+        return conf.replica_filtering_protection.cached_rows_fail_threshold;
+    }
+
+    public static void setCachedReplicaRowsFailThreshold(int threshold)
+    {
+        conf.replica_filtering_protection.cached_rows_fail_threshold = 
threshold;
+    }
+
     /**
      * size of commitlog segments to allocate
      */
diff --git 
a/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java 
b/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java
new file mode 100644
index 0000000..7a755ab
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+public class ReplicaFilteringProtectionOptions
+{
+    public static final int DEFAULT_WARN_THRESHOLD = 2000;
+    public static final int DEFAULT_FAIL_THRESHOLD = 32000;
+
+    public volatile int cached_rows_warn_threshold = DEFAULT_WARN_THRESHOLD;
+    public volatile int cached_rows_fail_threshold = DEFAULT_FAIL_THRESHOLD;
+}
diff --git 
a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index a3cf746..c7f20c5 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -18,10 +18,8 @@
 package org.apache.cassandra.db.partitions;
 
 import java.util.*;
-import java.security.MessageDigest;
 
 import org.apache.cassandra.db.EmptyIterators;
-import org.apache.cassandra.db.transform.FilteredPartitions;
 import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.AbstractIterator;
@@ -98,6 +96,21 @@ public abstract class PartitionIterators
     }
 
     /**
+     * Consumes all rows in the next partition of the provided partition 
iterator.
+     */
+    public static void consumeNext(PartitionIterator iterator)
+    {
+        if (iterator.hasNext())
+        {
+            try (RowIterator partition = iterator.next())
+            {
+                while (partition.hasNext())
+                    partition.next();
+            }
+        }
+    }
+
+    /**
      * Wraps the provided iterator so it logs the returned rows for debugging 
purposes.
      * <p>
      * Note that this is only meant for debugging as this can log a very large 
amount of
@@ -116,6 +129,38 @@ public abstract class PartitionIterators
         return Transformation.apply(iterator, new Logger());
     }
 
+    /**
+     * Wraps the provided iterator to run a specified action on close. Note 
that the action will be
+     * run even if closure of the provided iterator throws an exception.
+     */
+    public static PartitionIterator doOnClose(PartitionIterator delegate, 
Runnable action)
+    {
+        return new PartitionIterator()
+        {
+            public void close()
+            {
+                try
+                {
+                    delegate.close();
+                }
+                finally
+                {
+                    action.run();
+                }
+            }
+
+            public boolean hasNext()
+            {
+                return delegate.hasNext();
+            }
+
+            public RowIterator next()
+            {
+                return delegate.next();
+            }
+        };
+    }
+
     private static class SingletonPartitionIterator extends 
AbstractIterator<RowIterator> implements PartitionIterator
     {
         private final RowIterator iterator;
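
The doOnClose helper above is a decorate-on-close wrapper: it closes the delegate first and runs
the action in a finally block, so the action fires even when the delegate's close() throws. A
self-contained sketch of the same shape, using a simplified closeable iterator in place of
PartitionIterator so it compiles without the rest of the tree (both types below are illustrative):

    import java.util.Iterator;

    // Simplified stand-in for PartitionIterator: an Iterator whose close() throws no checked exception.
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
    {
        @Override
        void close();
    }

    final class OnCloseExample
    {
        static <T> CloseableIterator<T> doOnClose(CloseableIterator<T> delegate, Runnable action)
        {
            return new CloseableIterator<T>()
            {
                public void close()
                {
                    try
                    {
                        delegate.close();
                    }
                    finally
                    {
                        action.run(); // runs whether or not delegate.close() throws
                    }
                }

                public boolean hasNext()
                {
                    return delegate.hasNext();
                }

                public T next()
                {
                    return delegate.next();
                }
            };
        }
    }
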
diff --git 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index bff910e..4af53e2 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.FilteredPartitions;
-import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -78,24 +77,6 @@ public abstract class UnfilteredPartitionIterators
         return Transformation.apply(toReturn, new Close());
     }
 
-    public static UnfilteredPartitionIterator concat(final 
List<UnfilteredPartitionIterator> iterators)
-    {
-        if (iterators.size() == 1)
-            return iterators.get(0);
-
-        class Extend implements MorePartitions<UnfilteredPartitionIterator>
-        {
-            int i = 1;
-            public UnfilteredPartitionIterator moreContents()
-            {
-                if (i >= iterators.size())
-                    return null;
-                return iterators.get(i++);
-            }
-        }
-        return MorePartitions.extend(iterators.get(0), new Extend());
-    }
-
     public static PartitionIterator filter(final UnfilteredPartitionIterator 
iterator, final int nowInSec)
     {
         return FilteredPartitions.filter(iterator, nowInSec);
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java 
b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index 955ffc7..c1235e4 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Function;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
@@ -107,6 +108,29 @@ public class EncodingStats
         return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
+    /**
+     * Merge one or more EncodingStats that are lazily materialized from a 
list of arbitrary type by the provided function.

+     */
+    public static <V, F extends Function<V, EncodingStats>> EncodingStats 
merge(List<V> values, F function)
+    {
+        if (values.size() == 1)
+            return function.apply(values.get(0));
+
+        Collector collector = new Collector();
+        for (int i = 0, iSize = values.size(); i < iSize; i++)
+        {
+            V v = values.get(i);
+            EncodingStats stats = function.apply(v);
+            if (stats.minTimestamp != TIMESTAMP_EPOCH)
+                collector.updateTimestamp(stats.minTimestamp);
+            if (stats.minLocalDeletionTime != DELETION_TIME_EPOCH)
+                collector.updateLocalDeletionTime(stats.minLocalDeletionTime);
+            if (stats.minTTL != TTL_EPOCH)
+                collector.updateTTL(stats.minTTL);
+        }
+        return collector.get();
+    }
+
     @Override
     public boolean equals(Object o)
     {
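
A short sketch of how the new merge overload is consumed. It mirrors the NULL_TO_NO_STATS function
that ReplicaFilteringProtection defines later in this patch (a replica that sent no version for a
partition shows up as a null iterator), and assumes the classes above are on the classpath; the
wrapper class name is illustrative:

    import java.util.List;
    import java.util.function.Function;

    import org.apache.cassandra.db.rows.EncodingStats;
    import org.apache.cassandra.db.rows.UnfilteredRowIterator;

    final class EncodingStatsMergeExample
    {
        // Map missing replica responses to NO_STATS so the merge can walk the whole list
        // without materializing per-element stats up front.
        private static final Function<UnfilteredRowIterator, EncodingStats> NULL_TO_NO_STATS =
            iterator -> iterator == null ? EncodingStats.NO_STATS : iterator.stats();

        static EncodingStats statsForVersions(List<UnfilteredRowIterator> versions)
        {
            // A single element just has the function applied; otherwise the collector keeps the
            // minimum timestamp, local deletion time, and TTL across all non-sentinel values.
            return EncodingStats.merge(versions, NULL_TO_NO_STATS);
        }
    }
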
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 1f4803e..a551a78 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
 import com.google.common.collect.Maps;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Memtable;
@@ -153,7 +154,16 @@ public class TableMetrics
 
     public final Meter readRepairRequests;
     public final Meter shortReadProtectionRequests;
-    public final Meter replicaSideFilteringProtectionRequests;
+    
+    public final Meter replicaFilteringProtectionRequests;
+    
+    /**
+     * This histogram records the maximum number of rows {@link 
org.apache.cassandra.service.ReplicaFilteringProtection}
+     * caches at a point in time per query. With no replica divergence, this 
is equivalent to the maximum number of
+     * cached rows in a single partition during a query. It can be helpful 
when choosing appropriate values for the
+     * replica_filtering_protection thresholds in cassandra.yaml. 
+     */
+    public final Histogram rfpRowsCachedPerQuery;
 
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
@@ -652,7 +662,8 @@ public class TableMetrics
 
         readRepairRequests = createTableMeter("ReadRepairRequests");
         shortReadProtectionRequests = 
createTableMeter("ShortReadProtectionRequests");
-        replicaSideFilteringProtectionRequests = 
createTableMeter("ReplicaSideFilteringProtectionRequests");
+        replicaFilteringProtectionRequests = 
createTableMeter("ReplicaFilteringProtectionRequests");
+        rfpRowsCachedPerQuery = 
createHistogram("ReplicaFilteringProtectionRowsCachedPerQuery", true);
     }
 
     public void updateSSTableIterated(int count)
@@ -771,6 +782,13 @@ public class TableMetrics
         register(name, alias, tableMeter);
         return tableMeter;
     }
+    
+    private Histogram createHistogram(String name, boolean considerZeroes)
+    {
+        Histogram histogram = 
Metrics.histogram(factory.createMetricName(name), 
aliasFactory.createMetricName(name), considerZeroes);
+        register(name, name, histogram);
+        return histogram;
+    }
 
     /**
      * Create a histogram-like interface that will register both a CF, 
keyspace and global level
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java 
b/src/java/org/apache/cassandra/service/DataResolver.java
index 02d355e..1d0bb47 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver
         Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
 
     @VisibleForTesting
-    final List<AsyncOneResponse> repairResults = 
Collections.synchronizedList(new ArrayList<>());
+    final List<AsyncOneResponse<?>> repairResults = 
Collections.synchronizedList(new ArrayList<>());
 
     private final boolean enforceStrictLiveness;
 
@@ -135,7 +135,7 @@ public class DataResolver extends ResponseResolver
         UnfilteredPartitionIterator originalResponse = 
responses.get(i).payload.makeIterator(command);
 
         return context.needShortReadProtection()
-               ? extendWithShortReadProtection(originalResponse, 
context.sources[i], context.mergedResultCounter)
+               ? extendWithShortReadProtection(originalResponse, context, i)
                : originalResponse;
     }
 
@@ -150,20 +150,20 @@ public class DataResolver extends ResponseResolver
     {
         // Protecting against inconsistent replica filtering (some replica 
returning a row that is outdated but that
         // wouldn't be removed by normal reconciliation because up-to-date 
replica have filtered the up-to-date version
-        // of that row) works in 3 steps:
-        //   1) we read the full response just to collect rows that may be 
outdated (the ones we got from some
-        //      replica but didn't got any response for other; it could be 
those other replica have filtered a more
-        //      up-to-date result). In doing so, we do not count any of such 
"potentially outdated" row towards the
-        //      query limit. This simulate the worst case scenario where all 
those "potentially outdated" rows are
-        //      indeed outdated, and thus make sure we are guaranteed to read 
enough results (thanks to short read
-        //      protection).
-        //   2) we query all the replica/rows we need to rule out whether 
those "potentially outdated" rows are outdated
-        //      or not.
-        //   3) we re-read cached copies of each replica response using the 
"normal" read path merge with read-repair,
-        //      but where for each replica we use their original response 
_plus_ the additional rows queried in the
-        //      previous step (and apply the command#rowFilter() on the full 
result). Since the first phase has
-        //      pessimistically collected enough results for the case where 
all potentially outdated results are indeed
-        //      outdated, we shouldn't need further short-read protection 
requests during this phase.
+        // of that row) involves 3 main elements:
+        //   1) We combine short-read protection and a merge listener that 
identifies potentially "out-of-date"
+        //      rows to create an iterator that is guaranteed to produce 
enough valid row results to satisfy the query 
+        //      limit if enough actually exist. A row is considered 
out-of-date if its merged form is non-empty and we 
+        //      receive no response from at least one replica. In this case, 
it is possible that filtering at the
+        //      "silent" replica has produced a more up-to-date result.
+        //   2) This iterator is passed to the standard resolution process 
with read-repair, but is first wrapped in a 
+        //      response provider that lazily "completes" potentially 
out-of-date rows by directly querying them on the
+        //      replicas that were previously silent. As this iterator is 
consumed, it caches valid data for potentially
+        //      out-of-date rows, and this cached data is merged with the 
fetched data as rows are requested. If there
+        //      is no replica divergence, only rows in the partition being 
evaluated will be cached (then released
+        //      when the partition is consumed).
+        //   3) After a "complete" row is materialized, it must pass the row 
filter supplied by the original query 
+        //      before it counts against the limit.
 
         // We could get more responses while this method runs, which is ok 
(we're happy to ignore any response not here
         // at the beginning of this method), so grab the response count once 
and use that through the method.
@@ -171,25 +171,25 @@ public class DataResolver extends ResponseResolver
         // We need separate contexts, as each context has his own counter
         ResolveContext firstPhaseContext = new ResolveContext(count);
         ResolveContext secondPhaseContext = new ResolveContext(count);
-        ReplicaFilteringProtection rfp = new 
ReplicaFilteringProtection(keyspace, command, consistency, 
firstPhaseContext.sources);
+
+        ReplicaFilteringProtection rfp = new 
ReplicaFilteringProtection(keyspace,
+                                                                        
command,
+                                                                        
consistency,
+                                                                        
firstPhaseContext.sources,
+                                                                        
DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
+                                                                        
DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
+
         PartitionIterator firstPhasePartitions = 
resolveInternal(firstPhaseContext,
                                                                  
rfp.mergeController(),
                                                                  i -> 
shortReadProtectedResponse(i, firstPhaseContext),
                                                                  
UnaryOperator.identity());
-
-        // Consume the first phase partitions to populate the replica 
filtering protection with both those materialized
-        // partitions and the primary keys to be fetched.
-        PartitionIterators.consume(firstPhasePartitions);
-        firstPhasePartitions.close();
-
-        // After reading the entire query results the protection helper should 
have cached all the partitions so we can
-        // clear the responses accumulator for the sake of memory usage, given 
that the second phase might take long if
-        // it needs to query replicas.
-        responses.clearUnsafe();
-
-        return resolveWithReadRepair(secondPhaseContext,
-                                     rfp::queryProtectedPartitions,
-                                     results -> 
command.rowFilter().filter(results, command.metadata(), command.nowInSec()));
+        
+        PartitionIterator completedPartitions = 
resolveWithReadRepair(secondPhaseContext,
+                                                                      i -> 
rfp.queryProtectedPartitions(firstPhasePartitions, i),
+                                                                      results 
-> command.rowFilter().filter(results, command.metadata(), command.nowInSec()));
+        
+        // Ensure that the RFP instance has a chance to record metrics when 
the iterator closes.
+        return PartitionIterators.doOnClose(completedPartitions, 
firstPhasePartitions::close);
     }
 
     private PartitionIterator resolveInternal(ResolveContext context,
@@ -217,8 +217,8 @@ public class DataResolver extends ResponseResolver
          */
 
         UnfilteredPartitionIterator merged = 
UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener);
-        FilteredPartitions filtered =
-        FilteredPartitions.filter(merged, new Filter(command.nowInSec(), 
command.metadata().enforceStrictLiveness()));
+        Filter filter = new Filter(command.nowInSec(), 
command.metadata().enforceStrictLiveness());
+        FilteredPartitions filtered = FilteredPartitions.filter(merged, 
filter);
         PartitionIterator counted = 
Transformation.apply(preCountFilter.apply(filtered), 
context.mergedResultCounter);
 
         return command.isForThrift()
@@ -598,14 +598,17 @@ public class DataResolver extends ResponseResolver
     }
 
     private UnfilteredPartitionIterator 
extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
-                                                                      
InetAddress source,
-                                                                      
DataLimits.Counter mergedResultCounter)
+                                                                      
ResolveContext context,
+                                                                      int i)
     {
         DataLimits.Counter singleResultCounter =
             command.limits().newCounter(command.nowInSec(), false, 
command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
 
-        ShortReadPartitionsProtection protection =
-            new ShortReadPartitionsProtection(source, singleResultCounter, 
mergedResultCounter);
+        // The pre-fetch callback used here makes the initial round of 
responses for this replica collectable.
+        ShortReadPartitionsProtection protection = new 
ShortReadPartitionsProtection(context.sources[i],
+                                                                               
      () -> responses.clearUnsafe(i),
+                                                                               
      singleResultCounter,
+                                                                               
      context.mergedResultCounter);
 
         /*
          * The order of extention and transformations is important here. 
Extending with more partitions has to happen
@@ -642,6 +645,7 @@ public class DataResolver extends ResponseResolver
     private class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowIterator> implements 
MorePartitions<UnfilteredPartitionIterator>
     {
         private final InetAddress source;
+        private final Runnable preFetchCallback; // called immediately before 
fetching more contents
 
         private final DataLimits.Counter singleResultCounter; // unmerged 
per-source counter
         private final DataLimits.Counter mergedResultCounter; // merged 
end-result counter
@@ -650,9 +654,13 @@ public class DataResolver extends ResponseResolver
 
         private boolean partitionsFetched; // whether we've seen any new 
partitions since iteration start or last moreContents() call
 
-        private ShortReadPartitionsProtection(InetAddress source, 
DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+        private ShortReadPartitionsProtection(InetAddress source,
+                                              Runnable preFetchCallback,
+                                              DataLimits.Counter 
singleResultCounter,
+                                              DataLimits.Counter 
mergedResultCounter)
         {
             this.source = source;
+            this.preFetchCallback = preFetchCallback;
             this.singleResultCounter = singleResultCounter;
             this.mergedResultCounter = mergedResultCounter;
         }
@@ -723,6 +731,9 @@ public class DataResolver extends ResponseResolver
             
ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
             Tracing.trace("Requesting {} extra rows from {} for short read 
protection", toQuery, source);
 
+            // If we've arrived here, all responses have been consumed, and 
we're about to request more.
+            preFetchCallback.run();
+            
             PartitionRangeReadCommand cmd = 
makeFetchAdditionalPartitionReadCommand(toQuery);
             return executeReadCommand(cmd);
         }
@@ -742,7 +753,7 @@ public class DataResolver extends ResponseResolver
             return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
         }
 
-        private class ShortReadRowsProtection extends Transformation 
implements MoreRows<UnfilteredRowIterator>
+        private class ShortReadRowsProtection extends 
Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator>
         {
             private final CFMetaData metadata;
             private final DecoratedKey partitionKey;
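
The net effect of the changes above is that the second phase no longer needs the whole first-phase
result in memory: the completed iterator pulls the first phase forward only as far as it must to
produce the next partition. A standalone sketch of that pull-through shape (plain Java, not the
Cassandra types; the class below is hypothetical, and consuming the upstream is assumed to queue
elements as a side effect, the way the RFP merge listener queues PartitionBuilders):

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;

    final class PullThroughQueue<T>
    {
        private final Iterator<?> upstream;              // advancing this populates 'pending'
        private final Queue<T> pending = new ArrayDeque<>();

        PullThroughQueue(Iterator<?> upstream)
        {
            this.upstream = upstream;
        }

        void enqueue(T element)
        {
            pending.add(element); // called from the upstream side as it is consumed
        }

        boolean hasNext()
        {
            // Advance the upstream one step at a time until something is queued or it runs out,
            // so cached state stays bounded by what the next downstream element requires.
            while (pending.isEmpty() && upstream.hasNext())
                upstream.next();
            return !pending.isEmpty();
        }

        T next()
        {
            return pending.poll();
        }
    }
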
diff --git 
a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java 
b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
index 36d51cc..0a57e66 100644
--- a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@ -19,15 +19,15 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
+import java.util.Queue;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +49,8 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.EncodingStats;
@@ -58,11 +60,13 @@ import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 /**
@@ -74,11 +78,15 @@ import org.apache.cassandra.utils.btree.BTreeSet;
  * the rows in a replica response that don't have a corresponding row in other 
replica responses, and requests them by
  * primary key to the "silent" replicas in a second fetch round.
  * <p>
- * See CASSANDRA-8272 and CASSANDRA-8273 for further details.
+ * See CASSANDRA-8272, CASSANDRA-8273, and CASSANDRA-15907 for further details.
  */
 class ReplicaFilteringProtection
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ReplicaFilteringProtection.class);
+    private static final NoSpamLogger oneMinuteLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    private static final Function<UnfilteredRowIterator, EncodingStats> 
NULL_TO_NO_STATS =
+        rowIterator -> rowIterator == null ? EncodingStats.NO_STATS : 
rowIterator.stats();
 
     private final Keyspace keyspace;
     private final ReadCommand command;
@@ -86,106 +94,42 @@ class ReplicaFilteringProtection
     private final InetAddress[] sources;
     private final TableMetrics tableMetrics;
 
-    /**
-     * Per-source primary keys of the rows that might be outdated so they need 
to be fetched.
-     * For outdated static rows we use an empty builder to signal it has to be 
queried.
-     */
-    private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> 
rowsToFetch;
+    private final int cachedRowsWarnThreshold;
+    private final int cachedRowsFailThreshold;
+    
+    /** Tracks whether or not we've already hit the warning threshold while 
evaluating a partition. */
+    private boolean hitWarningThreshold = false;
+
+    private int currentRowsCached = 0; // tracks the current number of cached 
rows
+    private int maxRowsCached = 0; // tracks the high watermark for the number 
of cached rows
 
     /**
-     * Per-source list of all the partitions seen by the merge listener, to be 
merged with the extra fetched rows.
+     * Per-source list of the pending partitions seen by the merge listener, 
to be merged with the extra fetched rows.
      */
-    private final List<List<PartitionBuilder>> originalPartitions;
+    private final List<Queue<PartitionBuilder>> originalPartitions;
 
     ReplicaFilteringProtection(Keyspace keyspace,
                                ReadCommand command,
                                ConsistencyLevel consistency,
-                               InetAddress[] sources)
+                               InetAddress[] sources,
+                               int cachedRowsWarnThreshold,
+                               int cachedRowsFailThreshold)
     {
         this.keyspace = keyspace;
         this.command = command;
         this.consistency = consistency;
         this.sources = sources;
-        this.rowsToFetch = new ArrayList<>(sources.length);
         this.originalPartitions = new ArrayList<>(sources.length);
 
-        for (InetAddress ignored : sources)
+        for (int i = 0; i < sources.length; i++)
         {
-            rowsToFetch.add(new TreeMap<>());
-            originalPartitions.add(new ArrayList<>());
+            originalPartitions.add(new ArrayDeque<>());
         }
 
         tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
-    }
 
-    private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, 
DecoratedKey partitionKey)
-    {
-        return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> 
BTreeSet.builder(command.metadata().comparator));
-    }
-
-    /**
-     * Returns the protected results for the specified replica. These are 
generated fetching the extra rows and merging
-     * them with the cached original filtered results for that replica.
-     *
-     * @param source the source
-     * @return the protected results for the specified replica
-     */
-    UnfilteredPartitionIterator queryProtectedPartitions(int source)
-    {
-        UnfilteredPartitionIterator original = 
makeIterator(originalPartitions.get(source));
-        SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = 
rowsToFetch.get(source);
-
-        if (toFetch.isEmpty())
-            return original;
-
-        // TODO: this would be more efficient if we had multi-key queries 
internally
-        List<UnfilteredPartitionIterator> fetched = toFetch.keySet()
-                                                           .stream()
-                                                           .map(k -> 
querySourceOnKey(source, k))
-                                                           
.collect(Collectors.toList());
-
-        return UnfilteredPartitionIterators.merge(Arrays.asList(original, 
UnfilteredPartitionIterators.concat(fetched)),
-                                                  command.nowInSec(), null);
-    }
-
-    private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey 
key)
-    {
-        BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key);
-        assert builder != null; // We're calling this on the result of 
rowsToFetch.get(i).keySet()
-
-        InetAddress source = sources[i];
-        NavigableSet<Clustering> clusterings = builder.build();
-        tableMetrics.replicaSideFilteringProtectionRequests.mark();
-        if (logger.isTraceEnabled())
-            logger.trace("Requesting rows {} in partition {} from {} for 
replica-side filtering protection",
-                         clusterings, key, source);
-        Tracing.trace("Requesting {} rows in partition {} from {} for 
replica-side filtering protection",
-                      clusterings.size(), key, source);
-
-        // build the read command taking into account that we could be 
requesting only in the static row
-        DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : 
DataLimits.NONE;
-        ClusteringIndexFilter filter = new 
ClusteringIndexNamesFilter(clusterings, command.isReversed());
-        SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(command.metadata(),
-                                                                           
command.nowInSec(),
-                                                                           
command.columnFilter(),
-                                                                           
RowFilter.NONE,
-                                                                           
limits,
-                                                                           key,
-                                                                           
filter);
-        try
-        {
-            return executeReadCommand(cmd, source);
-        }
-        catch (ReadTimeoutException e)
-        {
-            int blockFor = consistency.blockFor(keyspace);
-            throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
-        }
-        catch (UnavailableException e)
-        {
-            int blockFor = consistency.blockFor(keyspace);
-            throw new UnavailableException(consistency, blockFor, blockFor - 
1);
-        }
+        this.cachedRowsWarnThreshold = cachedRowsWarnThreshold;
+        this.cachedRowsFailThreshold = cachedRowsFailThreshold;
     }
 
     private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, 
InetAddress source)
@@ -212,83 +156,124 @@ class ReplicaFilteringProtection
      * <p>
      * The listener will track both the accepted data and the primary keys of 
the rows that are considered as outdated.
      * That way, once the query results would have been merged using this 
listener, further calls to
-     * {@link #queryProtectedPartitions(int)} will use the collected data to 
return a copy of the
+     * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the 
collected data to return a copy of the
      * data originally collected from the specified replica, completed with 
the potentially outdated rows.
      */
     UnfilteredPartitionIterators.MergeListener mergeController()
     {
-        return (partitionKey, versions) -> {
-
-            PartitionBuilder[] builders = new PartitionBuilder[sources.length];
-
-            for (int i = 0; i < sources.length; i++)
-                builders[i] = new PartitionBuilder(partitionKey, 
columns(versions), stats(versions));
+        return new UnfilteredPartitionIterators.MergeListener()
+        {
+            @Override
+            public void close()
+            {
+                // If we hit the failure threshold before consuming a single 
partition, record the current rows cached.
+                
tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, 
maxRowsCached));
+            }
 
-            return new UnfilteredRowIterators.MergeListener()
+            @Override
+            public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
             {
-                @Override
-                public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
+                PartitionBuilder[] builders = new 
PartitionBuilder[sources.length];
+                PartitionColumns columns = columns(versions);
+                EncodingStats stats = EncodingStats.merge(versions, 
NULL_TO_NO_STATS);
+                
+                for (int i = 0; i < sources.length; i++)
+                    builders[i] = new PartitionBuilder(partitionKey, 
sources[i], columns, stats);
+
+                return new UnfilteredRowIterators.MergeListener()
                 {
-                    // cache the deletion time versions to be able to 
regenerate the original row iterator
-                    for (int i = 0; i < versions.length; i++)
-                        builders[i].setDeletionTime(versions[i]);
-                }
+                    @Override
+                    public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
+                    {
+                        // cache the deletion time versions to be able to 
regenerate the original row iterator
+                        for (int i = 0; i < versions.length; i++)
+                            builders[i].setDeletionTime(versions[i]);
+                    }
 
-                @Override
-                public Row onMergedRows(Row merged, Row[] versions)
-                {
-                    // cache the row versions to be able to regenerate the 
original row iterator
-                    for (int i = 0; i < versions.length; i++)
-                        builders[i].addRow(versions[i]);
+                    @Override
+                    public Row onMergedRows(Row merged, Row[] versions)
+                    {
+                        // cache the row versions to be able to regenerate the 
original row iterator
+                        for (int i = 0; i < versions.length; i++)
+                            builders[i].addRow(versions[i]);
 
-                    if (merged.isEmpty())
-                        return merged;
+                        if (merged.isEmpty())
+                            return merged;
 
-                    boolean isPotentiallyOutdated = false;
-                    boolean isStatic = merged.isStatic();
-                    for (int i = 0; i < versions.length; i++)
-                    {
-                        Row version = versions[i];
-                        if (version == null || (isStatic && version.isEmpty()))
+                        boolean isPotentiallyOutdated = false;
+                        boolean isStatic = merged.isStatic();
+                        for (int i = 0; i < versions.length; i++)
                         {
-                            isPotentiallyOutdated = true;
-                            BTreeSet.Builder<Clustering> toFetch = 
getOrCreateToFetch(i, partitionKey);
-                            // Note that for static, we shouldn't add the 
clustering to the clustering set (the
-                            // ClusteringIndexNamesFilter we'll build from 
this later does not expect it), but the fact
-                            // we created a builder in the first place will 
act as a marker that the static row must be
-                            // fetched, even if no other rows are added for 
this partition.
-                            if (!isStatic)
-                                toFetch.add(merged.clustering());
+                            Row version = versions[i];
+                            if (version == null || (isStatic && 
version.isEmpty()))
+                            {
+                                isPotentiallyOutdated = true;
+                                builders[i].addToFetch(merged);
+                            }
                         }
-                    }
 
-                    // If the row is potentially outdated (because some 
replica didn't send anything and so it _may_ be
-                    // an outdated result that is only present because other 
replica have filtered the up-to-date result
-                    // out), then we skip the row. In other words, the results 
of the initial merging of results by this
-                    // protection assume the worst case scenario where every 
row that might be outdated actually is.
-                    // This ensures that during this first phase (collecting 
additional row to fetch) we are guaranteed
-                    // to look at enough data to ultimately fulfill the query 
limit.
-                    return isPotentiallyOutdated ? null : merged;
-                }
+                        // If the row is potentially outdated (because some 
replica didn't send anything and so it _may_ be
+                        // an outdated result that is only present because 
other replica have filtered the up-to-date result
+                        // out), then we skip the row. In other words, the 
results of the initial merging of results by this
+                        // protection assume the worst case scenario where 
every row that might be outdated actually is.
+                        // This ensures that during this first phase 
(collecting additional row to fetch) we are guaranteed
+                        // to look at enough data to ultimately fulfill the 
query limit.
+                        return isPotentiallyOutdated ? null : merged;
+                    }
 
-                @Override
-                public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker 
merged, RangeTombstoneMarker[] versions)
-                {
-                    // cache the marker versions to be able to regenerate the 
original row iterator
-                    for (int i = 0; i < versions.length; i++)
-                        builders[i].addRangeTombstoneMarker(versions[i]);
-                }
+                    @Override
+                    public void 
onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
+                    {
+                        // cache the marker versions to be able to regenerate 
the original row iterator
+                        for (int i = 0; i < versions.length; i++)
+                            builders[i].addRangeTombstoneMarker(versions[i]);
+                    }
 
-                @Override
-                public void close()
-                {
-                    for (int i = 0; i < sources.length; i++)
-                        originalPartitions.get(i).add(builders[i]);
-                }
-            };
+                    @Override
+                    public void close()
+                    {
+                        for (int i = 0; i < sources.length; i++)
+                            originalPartitions.get(i).add(builders[i]);
+                    }
+                };
+            }
         };
     }
 
+    private void incrementCachedRows()
+    {
+        currentRowsCached++;
+        
+        if (currentRowsCached == cachedRowsFailThreshold + 1)
+        {
+            String message = String.format("Replica filtering protection has 
cached over %d rows during query %s. " +
+                                           "(See 
'cached_replica_rows_fail_threshold' in cassandra.yaml.)",
+                                           cachedRowsFailThreshold, 
command.toCQLString());
+
+            logger.error(message);
+            Tracing.trace(message);
+            throw new OverloadedException(message);
+        }
+        else if (currentRowsCached == cachedRowsWarnThreshold + 1 && 
!hitWarningThreshold)
+        {
+            hitWarningThreshold = true;
+            
+            String message = String.format("Replica filtering protection has 
cached over %d rows during query %s. " +
+                                           "(See 
'cached_replica_rows_warn_threshold' in cassandra.yaml.)",
+                                           cachedRowsWarnThreshold, 
command.toCQLString());
+
+            ClientWarn.instance.warn(message);
+            oneMinuteLogger.warn(message);
+            Tracing.trace(message);
+        }
+    }
+
+    private void releaseCachedRows(int count)
+    {
+        maxRowsCached = Math.max(maxRowsCached, currentRowsCached);
+        currentRowsCached -= count;
+    }
+
     private static PartitionColumns columns(List<UnfilteredRowIterator> 
versions)
     {
         Columns statics = Columns.NONE;
@@ -305,24 +290,19 @@ class ReplicaFilteringProtection
         return new PartitionColumns(statics, regulars);
     }
 
-    private static EncodingStats stats(List<UnfilteredRowIterator> iterators)
-    {
-        EncodingStats stats = EncodingStats.NO_STATS;
-        for (UnfilteredRowIterator iter : iterators)
-        {
-            if (iter == null)
-                continue;
-
-            stats = stats.mergeWith(iter.stats());
-        }
-        return stats;
-    }
-
-    private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> 
builders)
+    /**
+     * Returns the protected results for the specified replica. These are 
generated by fetching the extra rows and merging
+     * them with the cached original filtered results for that replica.
+     *
+     * @param merged the first-phase partitions, which should have been 
read using the {@link #mergeController()}
+     * @param source the source
+     * @return the protected results for the specified replica
+     */
+    UnfilteredPartitionIterator queryProtectedPartitions(PartitionIterator 
merged, int source)
     {
         return new UnfilteredPartitionIterator()
         {
-            final Iterator<PartitionBuilder> iterator = builders.iterator();
+            final Queue<PartitionBuilder> partitions = 
originalPartitions.get(source);
 
             @Override
             public boolean isForThrift()
@@ -337,37 +317,48 @@ class ReplicaFilteringProtection
             }
 
             @Override
-            public void close()
-            {
-                // nothing to do here
-            }
+            public void close() { }
 
             @Override
             public boolean hasNext()
             {
-                return iterator.hasNext();
+                // If there are no cached partition builders for this source, 
advance the first phase iterator, which
+                // will force the RFP merge listener to load at least the next 
protected partition. Note that this may
+                // load more than one partition if any divergence between 
replicas is discovered by the merge listener.
+                if (partitions.isEmpty())
+                {
+                    PartitionIterators.consumeNext(merged);
+                }
+                
+                return !partitions.isEmpty();
             }
 
             @Override
             public UnfilteredRowIterator next()
             {
-                return iterator.next().build();
+                PartitionBuilder builder = partitions.poll();
+                assert builder != null;
+                return builder.protectedPartition();
             }
         };
     }
 
     private class PartitionBuilder
     {
-        private final DecoratedKey partitionKey;
+        private final DecoratedKey key;
+        private final InetAddress source;
         private final PartitionColumns columns;
         private final EncodingStats stats;
         private DeletionTime deletionTime;
         private Row staticRow = Rows.EMPTY_STATIC_ROW;
-        private final List<Unfiltered> contents = new ArrayList<>();
+        private final Queue<Unfiltered> contents = new ArrayDeque<>();
+        private BTreeSet.Builder<Clustering> toFetch;
+        private int partitionRowsCached;
 
-        private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns 
columns, EncodingStats stats)
+        private PartitionBuilder(DecoratedKey key, InetAddress source, 
PartitionColumns columns, EncodingStats stats)
         {
-            this.partitionKey = partitionKey;
+            this.key = key;
+            this.source = source;
             this.columns = columns;
             this.stats = stats;
         }
@@ -379,6 +370,12 @@ class ReplicaFilteringProtection
 
         private void addRow(Row row)
         {
+            partitionRowsCached++;
+
+            incrementCachedRows();
+
+            // Note that even null rows are counted against the row caching 
limit. The assumption is that
+            // a subsequent protection query will later fetch the row onto the 
heap anyway.
             if (row == null)
                 return;
 
@@ -394,12 +391,23 @@ class ReplicaFilteringProtection
                 contents.add(marker);
         }
 
-        private UnfilteredRowIterator build()
+        private void addToFetch(Row row)
+        {
+            if (toFetch == null)
+                toFetch = BTreeSet.builder(command.metadata().comparator);
+
+            // Note that for static, we shouldn't add the clustering to the 
clustering set (the
+            // ClusteringIndexNamesFilter we'll build from this later does not 
expect it), but the fact
+            // we created a builder in the first place will act as a marker 
that the static row must be
+            // fetched, even if no other rows are added for this partition.
+            if (!row.isStatic())
+                toFetch.add(row.clustering());
+        }
+
+        private UnfilteredRowIterator originalPartition()
         {
             return new UnfilteredRowIterator()
             {
-                final Iterator<Unfiltered> iterator = contents.iterator();
-
                 @Override
                 public DeletionTime partitionLevelDeletion()
                 {
@@ -433,7 +441,7 @@ class ReplicaFilteringProtection
                 @Override
                 public DecoratedKey partitionKey()
                 {
-                    return partitionKey;
+                    return key;
                 }
 
                 @Override
@@ -445,21 +453,82 @@ class ReplicaFilteringProtection
                 @Override
                 public void close()
                 {
-                    // nothing to do here
+                    releaseCachedRows(partitionRowsCached);
                 }
 
                 @Override
                 public boolean hasNext()
                 {
-                    return iterator.hasNext();
+                    return !contents.isEmpty();
                 }
 
                 @Override
                 public Unfiltered next()
                 {
-                    return iterator.next();
+                    return contents.poll();
                 }
             };
         }
+
+        private UnfilteredRowIterator protectedPartition()
+        {
+            UnfilteredRowIterator original = originalPartition();
+
+            if (toFetch != null)
+            {
+                try (UnfilteredPartitionIterator partitions = 
fetchFromSource())
+                {
+                    if (partitions.hasNext())
+                    {
+                        try (UnfilteredRowIterator fetchedRows = 
partitions.next())
+                        {
+                            return 
UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows), 
command.nowInSec());
+                        }
+                    }
+                }
+            }
+
+            return original;
+        }
+
+        private UnfilteredPartitionIterator fetchFromSource()
+        {
+            assert toFetch != null;
+
+            NavigableSet<Clustering> clusterings = toFetch.build();
+            tableMetrics.replicaFilteringProtectionRequests.mark();
+            
+            if (logger.isTraceEnabled())
+                logger.trace("Requesting rows {} in partition {} from {} for 
replica filtering protection",
+                             clusterings, key, source);
+            
+            Tracing.trace("Requesting {} rows in partition {} from {} for 
replica filtering protection",
+                          clusterings.size(), key, source);
+
+            // build the read command taking into account that we could be 
requesting only in the static row
+            DataLimits limits = clusterings.isEmpty() ? 
DataLimits.cqlLimits(1) : DataLimits.NONE;
+            ClusteringIndexFilter filter = new 
ClusteringIndexNamesFilter(clusterings, command.isReversed());
+            SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(command.metadata(),
+                                                                               
command.nowInSec(),
+                                                                               
command.columnFilter(),
+                                                                               
RowFilter.NONE,
+                                                                               
limits,
+                                                                               
key,
+                                                                               
filter);
+            try
+            {
+                return executeReadCommand(cmd, source);
+            }
+            catch (ReadTimeoutException e)
+            {
+                int blockFor = consistency.blockFor(keyspace);
+                throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+            }
+            catch (UnavailableException e)
+            {
+                int blockFor = consistency.blockFor(keyspace);
+                throw new UnavailableException(consistency, blockFor, blockFor - 1);
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d287788..f0b183d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4799,6 +4799,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void setTombstoneWarnThreshold(int threshold)
     {
         DatabaseDescriptor.setTombstoneWarnThreshold(threshold);
+        logger.info("updated tombstone_warn_threshold to {}", threshold);
     }
 
     public int getTombstoneFailureThreshold()
@@ -4809,6 +4810,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void setTombstoneFailureThreshold(int threshold)
     {
         DatabaseDescriptor.setTombstoneFailureThreshold(threshold);
+        logger.info("updated tombstone_failure_threshold to {}", threshold);
+    }
+
+    public int getCachedReplicaRowsWarnThreshold()
+    {
+        return DatabaseDescriptor.getCachedReplicaRowsWarnThreshold();
+    }
+
+    public void setCachedReplicaRowsWarnThreshold(int threshold)
+    {
+        DatabaseDescriptor.setCachedReplicaRowsWarnThreshold(threshold);
+        logger.info("updated 
replica_filtering_protection.cached_rows_warn_threshold to {}", threshold);
+    }
+
+    public int getCachedReplicaRowsFailThreshold()
+    {
+        return DatabaseDescriptor.getCachedReplicaRowsFailThreshold();
+    }
+
+    public void setCachedReplicaRowsFailThreshold(int threshold)
+    {
+        DatabaseDescriptor.setCachedReplicaRowsFailThreshold(threshold);
+        logger.info("updated 
replica_filtering_protection.cached_rows_fail_threshold to {}", threshold);
     }
 
     public int getBatchSizeFailureThreshold()
@@ -4819,12 +4843,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void setBatchSizeFailureThreshold(int threshold)
     {
         DatabaseDescriptor.setBatchSizeFailThresholdInKB(threshold);
+        logger.info("updated batch_size_fail_threshold_in_kb to {}", 
threshold);
     }
 
     public void setHintedHandoffThrottleInKB(int throttleInKB)
     {
         DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
-        logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
+        logger.info("updated hinted_handoff_throttle_in_kb to {}", throttleInKB);
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e22b094..1afa48e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -604,6 +604,18 @@ public interface StorageServiceMBean extends NotificationEmitter
     /** Sets the threshold for abandoning queries with many tombstones */
     public void setTombstoneFailureThreshold(int tombstoneDebugThreshold);
 
+    /** Returns the number of rows cached at the coordinator before filtering/index queries log a warning. */
+    public int getCachedReplicaRowsWarnThreshold();
+
+    /** Sets the number of rows cached at the coordinator before filtering/index queries log a warning. */
+    public void setCachedReplicaRowsWarnThreshold(int threshold);
+
+    /** Returns the number of rows cached at the coordinator before filtering/index queries fail outright. */
+    public int getCachedReplicaRowsFailThreshold();
+
+    /** Sets the number of rows cached at the coordinator before filtering/index queries fail outright. */
+    public void setCachedReplicaRowsFailThreshold(int threshold);
+
     /** Returns the threshold for rejecting queries due to a large batch size */
     public int getBatchSizeFailureThreshold();
     /** Sets the threshold for rejecting queries due to a large batch size */
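
The warn/fail thresholds declared above are writable at runtime through the StorageService MBean,
so operators can adjust them on a live node without editing cassandra.yaml and restarting. A minimal
sketch of doing that over JMX follows; the object name "org.apache.cassandra.db:type=StorageService"
is Cassandra's standard StorageService MBean name, while the JMX endpoint and the example threshold
values are assumptions to adapt to a given deployment:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class AdjustRfpThresholds
    {
        public static void main(String[] args) throws Exception
        {
            // Assumes the default local JMX endpoint; adjust host/port as needed.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");

                // Invoke the setters declared in StorageServiceMBean above (example values).
                mbs.invoke(storageService, "setCachedReplicaRowsWarnThreshold",
                           new Object[]{ 2000 }, new String[]{ "int" });
                mbs.invoke(storageService, "setCachedReplicaRowsFailThreshold",
                           new Object[]{ 32000 }, new String[]{ "int" });
            }
            finally
            {
                connector.close();
            }
        }
    }
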
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index b560adf..d3633fd 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -437,9 +437,9 @@ public class FBUtilities
         }
     }
 
-    public static void waitOnFutures(List<AsyncOneResponse> results, long ms) throws TimeoutException
+    public static void waitOnFutures(List<AsyncOneResponse<?>> results, long ms) throws TimeoutException
     {
-        for (AsyncOneResponse result : results)
+        for (AsyncOneResponse<?> result : results)
             result.get(ms, TimeUnit.MILLISECONDS);
     }
 
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index ca9bb09..15afdbe 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -18,7 +18,6 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -138,14 +137,12 @@ public class Accumulator<E> implements Iterable<E>
     }
 
     /**
-     * Removes all of the elements from this accumulator.
+     * Removes the element at the specified index from this accumulator.
      *
     * This method is not thread-safe when used concurrently with {@link #add(Object)}.
      */
-    public void clearUnsafe()
+    public void clearUnsafe(int i)
     {
-        nextIndexUpdater.set(this, 0);
-        presentCountUpdater.set(this, 0);
-        Arrays.fill(values, null);
+        values[i] = null;
     }
 }
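
Note that the reworked clearUnsafe(int) only nulls out the value at the given index; it does not
shift elements or change the accumulator's size, which is exactly what the updated AccumulatorTest
further below asserts. A minimal usage sketch (illustrative only):

    Accumulator<String> acc = new Accumulator<>(4);
    acc.add("a");
    acc.add("b");
    acc.add("c");

    acc.clearUnsafe(1);        // drops the reference at index 1 only

    assert acc.size() == 3;    // size and indices are unchanged
    assert acc.get(1) == null; // but the cleared slot now reads as null
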
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 2f2b525..51a08e6 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.distributed.api.QueryResult;
 import org.apache.cassandra.distributed.api.QueryResults;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.transport.Server;
@@ -91,6 +92,11 @@ public class Coordinator implements ICoordinator
             boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
 
         prepared.validate(QueryState.forInternalCalls().getClientState());
+
+        // Start capturing warnings on this thread. Note that this will implicitly clear out any
+        // previous warnings as it sets a new State instance on the ThreadLocal.
+        ClientWarn.instance.captureWarnings();
+        
         ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
                                              QueryOptions.create(toCassandraCL(consistencyLevel),
                                                                  boundBBValues,
@@ -100,6 +106,10 @@ public class Coordinator implements ICoordinator
                                                                  null,
                                                                  Server.CURRENT_VERSION));
 
+        // Collect warnings reported during the query.
+        if (res != null)
+            res.setWarnings(ClientWarn.instance.getWarnings());
+
         return RowUtil.toQueryResult(res);
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
index 50d501e..876ce29 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -41,7 +42,11 @@ public class RowUtil
             ResultMessage.Rows rows = (ResultMessage.Rows) res;
             String[] names = getColumnNames(rows.result.metadata.names);
             Object[][] results = RowUtil.toObjects(rows);
-            return new SimpleQueryResult(names, results);
+            
+            // Warnings may be null here, due to ClientWarn#getWarnings() handling of empty warning lists.
+            List<String> warnings = res.getWarnings();
+
+            return new SimpleQueryResult(names, results, warnings == null ? Collections.emptyList() : warnings);
         }
         else
         {
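
With warnings now flowing from ClientWarn through ResultMessage and into SimpleQueryResult, in-JVM
dtests can assert on them directly. A small sketch of the pattern, mirroring what the new
ReplicaFilteringProtectionTest below does; the query string and warning substring are illustrative:

    SimpleQueryResult result = cluster.coordinator(1)
                                      .executeWithResult("SELECT * FROM ks.tbl WHERE v = ? ALLOW FILTERING",
                                                         ConsistencyLevel.ALL, "value");
    List<String> warnings = result.warnings();
    assertTrue(warnings.stream().anyMatch(w -> w.contains("cached_replica_rows_warn_threshold")));
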
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
new file mode 100644
index 0000000..2a847bf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_FAIL_THRESHOLD;
+import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_WARN_THRESHOLD;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises the functionality of {@link org.apache.cassandra.service.ReplicaFilteringProtection}, the
+ * mechanism that ensures distributed index and filtering queries at read consistency levels > ONE/LOCAL_ONE
+ * avoid stale replica results.
+ */
+public class ReplicaFilteringProtectionTest extends TestBaseImpl
+{
+    private static final int REPLICAS = 2;
+    private static final int ROWS = 3;
+
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setup() throws IOException
+    {
+        cluster = init(Cluster.build()
+                              .withNodes(REPLICAS)
+                              .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                          .set("commitlog_sync", "batch")
+                                                          .set("num_tokens", 1)).start());
+
+        // Make sure we start w/ the correct defaults:
+        cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_WARN_THRESHOLD, StorageService.instance.getCachedReplicaRowsWarnThreshold()));
+        cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_FAIL_THRESHOLD, StorageService.instance.getCachedReplicaRowsFailThreshold()));
+    }
+
+    @AfterClass
+    public static void teardown()
+    {
+        cluster.close();
+    }
+
+    @Test
+    public void testMissedUpdatesBelowCachingWarnThreshold()
+    {
+        String tableName = "missed_updates_no_warning";
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)"));
+
+        // The warning threshold provided is one more than the total number of rows returned
+        // to the coordinator from all replicas and therefore should not be triggered.
+        testMissedUpdates(tableName, REPLICAS * ROWS, Integer.MAX_VALUE, false);
+    }
+
+    @Test
+    public void testMissedUpdatesAboveCachingWarnThreshold()
+    {
+        String tableName = "missed_updates_cache_warn";
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)"));
+
+        // The warning threshold provided is one less than the total number of rows returned
+        // to the coordinator from all replicas and therefore should be triggered but not fail the query.
+        testMissedUpdates(tableName, REPLICAS * ROWS - 1, Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testMissedUpdatesAroundCachingFailThreshold()
+    {
+        String tableName = "missed_updates_cache_fail";
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)"));
+
+        // The failure threshold provided is exactly the total number of rows returned
+        // to the coordinator from all replicas and therefore should just warn.
+        testMissedUpdates(tableName, 1, REPLICAS * ROWS, true);
+
+        try
+        {
+            // The failure threshold provided is one less than the total number of rows returned
+            // to the coordinator from all replicas and therefore should fail the query.
+            testMissedUpdates(tableName, 1, REPLICAS * ROWS - 1, true);
+        }
+        catch (RuntimeException e)
+        {
+            assertEquals(e.getCause().getClass().getName(), OverloadedException.class.getName());
+        }
+    }
+
+    private void testMissedUpdates(String tableName, int warnThreshold, int failThreshold, boolean shouldWarn)
+    {
+        cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold));
+        cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold));
+
+        String fullTableName = KEYSPACE + '.' + tableName;
+
+        // Case 1: Insert and query rows at ALL to verify base line.
+        for (int i = 0; i < ROWS; i++)
+        {
+            cluster.coordinator(1).execute("INSERT INTO " + fullTableName + "(k, v) VALUES (?, 'old')", ALL, i);
+        }
+
+        long histogramSampleCount = rowsCachedPerQueryCount(cluster.get(1), tableName);
+
+        String query = "SELECT * FROM " + fullTableName + " WHERE v = ? LIMIT ? ALLOW FILTERING";
+
+        Object[][] initialRows = cluster.coordinator(1).execute(query, ALL, "old", ROWS);
+        assertRows(initialRows, row(1, "old"), row(0, "old"), row(2, "old"));
+
+        // Make sure only one sample was recorded for the query.
+        assertEquals(histogramSampleCount + 1, rowsCachedPerQueryCount(cluster.get(1), tableName));
+
+        // Case 2: Update all rows on only one replica, leaving the entire dataset of the remaining replica out-of-date.
+        updateAllRowsOn(1, fullTableName, "new");
+
+        // The replica that missed the updates creates a mismatch at every row, and we therefore cache a version
+        // of that row for all replicas.
+        SimpleQueryResult oldResult = cluster.coordinator(1).executeWithResult(query, ALL, "old", ROWS);
+        assertRows(oldResult.toObjectArrays());
+        verifyWarningState(shouldWarn, oldResult);
+
+        // We should have made 3 row "completion" requests.
+        assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName));
+
+        // In all cases above, the queries should be caching 1 row per partition per replica, but
+        // 6 for the whole query, given every row is potentially stale.
+        assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName));
+
+        // Make sure only one more sample was recorded for the query.
+        assertEquals(histogramSampleCount + 2, rowsCachedPerQueryCount(cluster.get(1), tableName));
+
+        // Case 3: Observe the effects of blocking read-repair.
+        
+        // The previous query performs a blocking read-repair, which removes replica divergence. This
+        // will only warn, therefore, if the warning threshold is actually below the number of replicas.
+        // (i.e. the row cache counter is decremented/reset as each partition is consumed.)
+        SimpleQueryResult newResult = cluster.coordinator(1).executeWithResult(query, ALL, "new", ROWS);
+        Object[][] newRows = newResult.toObjectArrays();
+        assertRows(newRows, row(1, "new"), row(0, "new"), row(2, "new"));
+
+        verifyWarningState(warnThreshold < REPLICAS, newResult);
+
+        // We still should only have made 3 row "completion" requests, with no replica divergence in the last query.
+        assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName));
+
+        // With no replica divergence, we only cache a single partition at a time across 2 replicas.
+        assertEquals(REPLICAS, minRowsCachedPerQuery(cluster.get(1), tableName));
+
+        // Make sure only one more sample was recorded for the query.
+        assertEquals(histogramSampleCount + 3, rowsCachedPerQueryCount(cluster.get(1), tableName));
+
+        // Case 4: Introduce another mismatch by updating all rows on only one replica.
+
+        updateAllRowsOn(1, fullTableName, "future");
+
+        // Another mismatch is introduced, and we once again cache a version of each row during resolution.
+        SimpleQueryResult futureResult = cluster.coordinator(1).executeWithResult(query, ALL, "future", ROWS);
+        Object[][] futureRows = futureResult.toObjectArrays();
+        assertRows(futureRows, row(1, "future"), row(0, "future"), row(2, "future"));
+
+        verifyWarningState(shouldWarn, futureResult);
+
+        // We should have made 3 more row "completion" requests.
+        assertEquals(ROWS * 2, protectionQueryCount(cluster.get(1), tableName));
+
+        // In all cases above, the queries should be caching 1 row per partition, but 6 for the
+        // whole query, given every row is potentially stale.
+        assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName));
+
+        // Make sure only one more sample was recorded for the query.
+        assertEquals(histogramSampleCount + 4, rowsCachedPerQueryCount(cluster.get(1), tableName));
+    }
+    
+    private void updateAllRowsOn(int node, String table, String value)
+    {
+        for (int i = 0; i < ROWS; i++)
+        {
+            cluster.get(node).executeInternal("UPDATE " + table + " SET v = ? WHERE k = ?", value, i);
+        }
+    }
+    
+    private void verifyWarningState(boolean shouldWarn, SimpleQueryResult futureResult)
+    {
+        List<String> futureWarnings = futureResult.warnings();
+        assertEquals(shouldWarn, futureWarnings.stream().anyMatch(w -> w.contains("cached_replica_rows_warn_threshold")));
+        assertEquals(shouldWarn ? 1 : 0, futureWarnings.size());
+    }
+
+    private long protectionQueryCount(IInvokableInstance instance, String tableName)
+    {
+        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                     .getColumnFamilyStore(tableName)
+                                                     .metric.replicaFilteringProtectionRequests.getCount());
+    }
+
+    private long maxRowsCachedPerQuery(IInvokableInstance instance, String tableName)
+    {
+        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                     .getColumnFamilyStore(tableName)
+                                                     .metric.rfpRowsCachedPerQuery.getSnapshot().getMax());
+    }
+
+    private long minRowsCachedPerQuery(IInvokableInstance instance, String tableName)
+    {
+        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                     .getColumnFamilyStore(tableName)
+                                                     .metric.rfpRowsCachedPerQuery.getSnapshot().getMin());
+    }
+
+    private long rowsCachedPerQueryCount(IInvokableInstance instance, String tableName)
+    {
+        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                     .getColumnFamilyStore(tableName)
+                                                     .metric.rfpRowsCachedPerQuery.getCount());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
new file mode 100644
index 0000000..559ce10
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.LivenessInfo;
+
+public class EncodingStatsTest
+{
+    @Test
+    public void testCollectWithNoStats()
+    {
+        EncodingStats none = EncodingStats.merge(ImmutableList.of(
+        EncodingStats.NO_STATS,
+        EncodingStats.NO_STATS,
+        EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(none, EncodingStats.NO_STATS);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithEmpty()
+    {
+        EncodingStats none = EncodingStats.merge(ImmutableList.of(
+        EncodingStats.NO_STATS,
+        EncodingStats.NO_STATS,
+        new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 0)
+        ), Function.identity());
+        Assert.assertEquals(none, EncodingStats.NO_STATS);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithTimestamp()
+    {
+        EncodingStats single = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        EncodingStats.NO_STATS,
+        EncodingStats.NO_STATS,
+        single,
+        EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(single, result);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithExpires()
+    {
+        EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        EncodingStats.NO_STATS,
+        single,
+        EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(single, result);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithTTL()
+    {
+        EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        EncodingStats.NO_STATS,
+        single,
+        EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(single, result);
+    }
+
+    @Test
+    public void testCollectOneEach()
+    {
+        EncodingStats tsp = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats exp = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+        EncodingStats ttl = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        tsp,
+        exp,
+        ttl
+        ), Function.identity());
+        Assert.assertEquals(new EncodingStats(1, 1, 1), result);
+    }
+
+    @Test
+    public void testTimestamp()
+    {
+        EncodingStats one = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats two = new EncodingStats(2, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats thr = new EncodingStats(3, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        one,
+        two,
+        thr
+        ), Function.identity());
+        Assert.assertEquals(one, result);
+    }
+
+    @Test
+    public void testExpires()
+    {
+        EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP,1, 0);
+        EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP,2, 0);
+        EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP,3, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        one,
+        two,
+        thr
+        ), Function.identity());
+        Assert.assertEquals(one, result);
+    }
+
+    @Test
+    public void testTTL()
+    {
+        EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+        EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 2);
+        EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 3);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        thr,
+        one,
+        two
+        ), Function.identity());
+        Assert.assertEquals(one, result);
+    }
+}
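
A note on the merge(...) API exercised above: it accepts a list of arbitrary carrier values plus a
function that extracts the EncodingStats from each, so callers can merge without first materializing
a List<EncodingStats>. A small sketch under that assumption (the Holder type is hypothetical and
imports are omitted for brevity):

    class Holder
    {
        final EncodingStats stats;
        Holder(EncodingStats stats) { this.stats = stats; }
    }

    List<Holder> holders = ImmutableList.of(new Holder(EncodingStats.NO_STATS),
                                            new Holder(new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0)));

    // Same merge as in the tests above, but extracting the stats from each carrier object.
    EncodingStats merged = EncodingStats.merge(holders, h -> h.stats);
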
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
index 33daca7..ce0fe27 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
@@ -29,7 +29,7 @@ public class AccumulatorTest
     @Test
     public void testAddMoreThanCapacity()
     {
-        Accumulator<Integer> accu = new Accumulator(4);
+        Accumulator<Integer> accu = new Accumulator<>(4);
 
         accu.add(1);
         accu.add(2);
@@ -50,7 +50,7 @@ public class AccumulatorTest
     @Test
     public void testIsEmptyAndSize()
     {
-        Accumulator<Integer> accu = new Accumulator(4);
+        Accumulator<Integer> accu = new Accumulator<>(4);
 
         assertTrue(accu.isEmpty());
         assertEquals(0, accu.size());
@@ -58,20 +58,20 @@ public class AccumulatorTest
         accu.add(1);
         accu.add(2);
 
-        assertTrue(!accu.isEmpty());
+        assertFalse(accu.isEmpty());
         assertEquals(2, accu.size());
 
         accu.add(3);
         accu.add(4);
 
-        assertTrue(!accu.isEmpty());
+        assertFalse(accu.isEmpty());
         assertEquals(4, accu.size());
     }
 
     @Test
     public void testGetAndIterator()
     {
-        Accumulator<String> accu = new Accumulator(4);
+        Accumulator<String> accu = new Accumulator<>(4);
 
         accu.add("3");
         accu.add("2");
@@ -99,32 +99,32 @@ public class AccumulatorTest
     @Test
     public void testClearUnsafe()
     {
-        Accumulator<String> accu = new Accumulator<>(3);
+        Accumulator<String> accu = new Accumulator<>(5);
 
         accu.add("1");
         accu.add("2");
         accu.add("3");
 
-        accu.clearUnsafe();
+        accu.clearUnsafe(1);
 
-        assertEquals(0, accu.size());
-        assertFalse(accu.iterator().hasNext());
-        assertOutOfBonds(accu, 0);
+        assertEquals(3, accu.size());
+        assertTrue(accu.iterator().hasNext());
 
         accu.add("4");
         accu.add("5");
 
-        assertEquals(2, accu.size());
+        assertEquals(5, accu.size());
 
-        assertEquals("4", accu.get(0));
-        assertEquals("5", accu.get(1));
-        assertOutOfBonds(accu, 2);
+        assertEquals("4", accu.get(3));
+        assertEquals("5", accu.get(4));
+        assertOutOfBonds(accu, 5);
 
         Iterator<String> iter = accu.iterator();
         assertTrue(iter.hasNext());
-        assertEquals("4", iter.next());
-        assertEquals("5", iter.next());
-        assertFalse(iter.hasNext());
+        assertEquals("1", iter.next());
+        assertNull(iter.next());
+        assertTrue(iter.hasNext());
+        assertEquals("3", iter.next());
     }
 
     private static void assertOutOfBonds(Accumulator<String> accumulator, int index)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
