aweisberg commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915654176


##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -196,72 +252,115 @@ static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int
      * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
      * that it's the query that "continues" whatever we're previously queried).
      */
-    private IRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator, boolean isFirst)
+    private PartitionIterator query(ClusterMetadata cm, ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator, List<ReadRepair<?, ?>> readRepairs, boolean isFirst)
     {
         PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
-
-        // If enabled, request repaired data tracking info from full replicas, but
-        // only if there are multiple full replicas to compare results from.
-        boolean trackRepairedStatus = DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
-                                      && replicaPlan.contacts().filter(Replica::isFull).size() > 1;
 
-        ClusterMetadata cm = ClusterMetadata.current();
-        TableMetadata metadata = command.metadata();
-        TableParams tableParams = metadata.params;
-        TransactionalMode transactionalMode = tableParams.transactionalMode;
-        TransactionalMigrationFromMode transactionalMigrationFromMode = tableParams.transactionalMigrationFrom;
-        if (transactionalMigrationFromMode != TransactionalMigrationFromMode.none && transactionalMode.nonSerialReadsThroughAccord && transactionalMigrationFromMode.nonSerialWritesThroughAccord() && transactionalMigrationFromMode.nonSerialReadsThroughAccord())
-            throw new UnsupportedOperationException("Live migration is not supported, can't safely read when migrating from " + transactionalMigrationFromMode + " to " + transactionalMode);
-        if (transactionalMode.nonSerialReadsThroughAccord && readCoordinator.isEventuallyConsistent())
+        // Accord interop execution should always be coordinated through the C* plumbing
+        if (!readCoordinator.isEventuallyConsistent())
         {
-            //TODO (nicetohave): This is very inefficient because it will not map the the command store owned ranges
-            // so every command store will return results and most will be discarded due to the limit
-            // Really we want to split the ranges by command stores owned ranges and then query one at a time
-            AsyncTxnResult result = StorageProxy.readWithAccord(cm, rangeCommand, ImmutableList.of(rangeCommand.dataRange().keyRange()), replicaPlan.consistencyLevel(), requestTime);
-            return new AccordRangeResponse(result, rangeCommand.isReversed(), replicaPlan.consistencyLevel(), requestTime);
+            SingleRangeResponse response = executeNormal(replicaPlan, rangeCommand, readCoordinator);
+            readRepairs.add(response.getReadRepair());
+            return response;
         }
-        else
+
+        List<RangeReadWithTarget> reads = ConsensusRequestRouter.splitReadIntoAccordAndNormal(cm, rangeCommand, readCoordinator, requestTime);
+        // Special case returning directly to avoid wrapping the iterator and applying the limits an extra time
+        if (reads.size() == 1)
         {
-            ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
-            ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair =
-            ReadRepair.create(ReadCoordinator.DEFAULT, command, sharedReplicaPlan, requestTime);
-            DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
-            new DataResolver<>(ReadCoordinator.DEFAULT, rangeCommand, sharedReplicaPlan, readRepair, requestTime, trackRepairedStatus);
-            ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
-            new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, requestTime);
-
-            if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
+            RangeReadWithTarget rangeReadWithTarget = reads.get(0);
+            checkState(rangeReadWithTarget.read.dataRange().keyRange().equals(rangeCommand.dataRange().keyRange()));
+            if (rangeReadWithTarget.target == RangeReadTarget.accord && readCoordinator.isEventuallyConsistent())
             {
-                Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime, trackRepairedStatus));
+                return executeAccord(cm,
+                                     rangeReadWithTarget.read,
+                                     replicaPlan.consistencyLevel());
             }
             else
            {
-                for (Replica replica : replicaPlan.contacts())
-                {
-                    Tracing.trace("Enqueuing request to {}", replica);
-                    ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
-                    Message<ReadCommand> message = command.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
-                    MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
-                }
+                SingleRangeResponse response = executeNormal(replicaPlan, rangeReadWithTarget.read, readCoordinator);
+                readRepairs.add(response.getReadRepair());
+                return response;
             }
-            return new CassandraRangeResponse(resolver, handler, readRepair);
         }
+
+        // TODO (review): Should this be reworked to execute the queries serially from the iterator? It would respect

Review Comment:
   Yeah, I will remove this now that you have reviewed it. I didn't do it because I thought it would cause timeouts, especially as the number of command stores (and thus independent txns) increases. Doing it serially from a single txn would require more work in Accord to allow it without unnecessarily causing reads to block other transactions.
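   
   To illustrate the latency tradeoff being weighed here, a rough generic sketch follows (the `readSubRange` helper and the generic range/result types are hypothetical placeholders, not the Cassandra or Accord API): firing the per-command-store sub-reads concurrently bounds the wait by the slowest sub-read, whereas draining them serially from the iterator sums their latencies, which is why serial execution gets harder to fit inside the request timeout as the number of command stores grows.
   
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Function;
   import java.util.stream.Collectors;
   
   final class ConcurrencySketch
   {
       // Hypothetical: fire all sub-reads up front, then collect results.
       // Total latency is roughly max(sub-read latencies) rather than their sum.
       static <R, T> List<T> readConcurrently(List<R> subRanges, Function<R, CompletableFuture<T>> readSubRange)
       {
           List<CompletableFuture<T>> inFlight = subRanges.stream()
                                                          .map(readSubRange)
                                                          .collect(Collectors.toList());
           return inFlight.stream()
                          .map(CompletableFuture::join)
                          .collect(Collectors.toList());
       }
   
       // Hypothetical: issue sub-reads one at a time, as an iterator would.
       // Total latency is roughly sum(sub-read latencies), so the timeout
       // window becomes harder to meet as the number of sub-reads grows.
       static <R, T> List<T> readSerially(List<R> subRanges, Function<R, CompletableFuture<T>> readSubRange)
       {
           return subRanges.stream()
                           .map(r -> readSubRange.apply(r).join())
                           .collect(Collectors.toList());
       }
   }
   ```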



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

