blerer commented on a change in pull request #493:
URL: https://github.com/apache/cassandra/pull/493#discussion_r428068126



##########
File path: src/java/org/apache/cassandra/service/reads/DataResolver.java
##########
@@ -103,6 +103,156 @@ public PartitionIterator resolve()
             });
         }
 
+        if (!needsReplicaFilteringProtection())
+        {
+            ResolveContext context = new ResolveContext(replicas);
+            return resolveWithReadRepair(context,
+                                         i -> shortReadProtectedResponse(i, context),
+                                         UnaryOperator.identity(),
+                                         repairedDataTracker);
+        }
+
+        return resolveWithReplicaFilteringProtection(replicas, repairedDataTracker);
+    }
+
+    private boolean needsReplicaFilteringProtection()
+    {
+        if (command.rowFilter().isEmpty())
+            return false;
+
+        IndexMetadata indexDef = command.indexMetadata();
+        if (indexDef != null && indexDef.isCustom())
+        {
+            String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
+            return !SASIIndex.class.getName().equals(className);
+        }
+
+        return true;
+    }
+
+    private class ResolveContext
+    {
+        private final E replicas;
+        private final DataLimits.Counter mergedResultCounter;
+
+        private ResolveContext(E replicas)
+        {
+            this.replicas = replicas;
+            this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+                                                                   true,
+                                                                   command.selectsFullPartition(),
+                                                                   enforceStrictLiveness);
+        }
+
+        private boolean needsReadRepair()
+        {
+            return replicas.size() > 1;
+        }
+
+        private boolean needShortReadProtection()
+        {
+            // If we have only one result, there is no read repair to do and we can't get short reads.
+            // Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a
+            // partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have a
+            // limit, don't bother protecting against short reads.
+            return replicas.size() > 1 && !command.limits().isUnlimited();
+        }
+    }
+
+    @FunctionalInterface
+    private interface ResponseProvider
+    {
+        UnfilteredPartitionIterator getResponse(int i);
+    }
+
+    private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context)
+    {
+        UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);
+
+        return context.needShortReadProtection()
+               ? ShortReadProtection.extend(context.replicas.get(i),
+                                            originalResponse,
+                                            command,
+                                            context.mergedResultCounter,
+                                            queryStartNanoTime,
+                                            enforceStrictLiveness)
+               : originalResponse;
+    }
+
+    private PartitionIterator resolveWithReadRepair(ResolveContext context,
+                                                    ResponseProvider responseProvider,
+                                                    UnaryOperator<PartitionIterator> preCountFilter,
+                                                    RepairedDataTracker repairedDataTracker)
+    {
+        UnfilteredPartitionIterators.MergeListener listener = null;
+        if (context.needsReadRepair())
+        {
+            P sources = replicaPlan.getWithContacts(context.replicas);
+            listener = wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker);
+        }
+
+        return resolveInternal(context, listener, responseProvider, preCountFilter);
+    }
+
+    @SuppressWarnings("resource")
+    private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, RepairedDataTracker repairedDataTracker)
+    {
+        // Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that
+        // wouldn't be removed by normal reconciliation because up-to-date replicas have filtered the up-to-date version
+        // of that row) works in 3 steps:
+        //   1) we read the full response just to collect rows that may be outdated (the ones we got from some
+        //      replica but didn't get any response from others; it could be that those other replicas have filtered
+        //      a more up-to-date result). In doing so, we do not count any such "potentially outdated" rows towards
+        //      the query limit. This simulates the worst case scenario where all those "potentially outdated" rows
+        //      are indeed outdated, and thus makes sure we are guaranteed to read enough results (thanks to short
+        //      read protection).
+        //   2) we query all the replicas/rows we need to rule out whether those "potentially outdated" rows are
+        //      outdated or not.
+        //   3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair,
+        //      but where for each replica we use their original response _plus_ the additional rows queried in the
+        //      previous step (and apply the command#rowFilter() on the full result). Since the first phase has
+        //      pessimistically collected enough results for the case where all potentially outdated results are
+        //      indeed outdated, we shouldn't need further short-read protection requests during this phase.
+
+        // We need separate contexts, as each context has its own counter.
+        ResolveContext firstPhaseContext = new ResolveContext(replicas);
+        ResolveContext secondPhaseContext = new ResolveContext(replicas);
+        ReplicaFilteringProtection<E> rfp = new ReplicaFilteringProtection<>(replicaPlan().keyspace(),
+                                                                             command,
+                                                                             replicaPlan().consistencyLevel(),
+                                                                             queryStartNanoTime,
+                                                                             firstPhaseContext.replicas);
+        PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
+                                                                 rfp.mergeController(),
+                                                                 i -> shortReadProtectedResponse(i, firstPhaseContext),
+                                                                 UnaryOperator.identity());
+
+        // Consume the fist phase partitions to populate the replica filtering protection with both those materialized
+        // partitions and the primarey keys to be fetched.

Review comment:
       primarey -> primary
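
For readers following the quoted hunk: the change resolves filtered reads in two
phases, as the long comment block above describes. Below is a minimal sketch of
that control flow; the names (Partitions, TwoPhaseResolveSketch,
fetchPotentiallyOutdated, and so on) are simplified stand-ins for illustration,
not the actual Cassandra APIs.

    // Minimal sketch of the two-phase replica filtering protection flow.
    // All names here are illustrative stand-ins, not real Cassandra types.
    import java.util.function.Supplier;
    import java.util.function.UnaryOperator;

    final class TwoPhaseResolveSketch
    {
        interface Partitions {} // stand-in for PartitionIterator

        static Partitions resolve(Supplier<Partitions> firstPhase,   // uncounted, pessimistic read
                                  Runnable fetchPotentiallyOutdated, // complete the suspect rows
                                  Supplier<Partitions> secondPhase,  // cached responses + fetched rows
                                  UnaryOperator<Partitions> rowFilter)
        {
            consume(firstPhase.get());                 // step 1: materialize responses without counting
                                                       // "potentially outdated" rows toward the limit
            fetchPotentiallyOutdated.run();            // step 2: query replicas for rows they filtered out
            return rowFilter.apply(secondPhase.get()); // step 3: merge with read-repair, then filter and count
        }

        private static void consume(Partitions p) { /* drain the iterator */ }
    }

The two Supplier arguments play the same role as the separate firstPhaseContext
and secondPhaseContext in the quoted code: each phase needs its own limit counter.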



