belliottsmith commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915760303


##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -196,72 +252,115 @@ static int computeConcurrencyFactor(int totalRangeCount, 
int rangesQueried, int
      * {@code DataLimits}) may have "state" information and that state may 
only be valid for the first query (in
      * that it's the query that "continues" whatever we're previously queried).
      */
-    private IRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, 
ReadCoordinator readCoordinator, boolean isFirst)
+    private PartitionIterator query(ClusterMetadata cm, 
ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator, 
List<ReadRepair<?, ?>> readRepairs, boolean isFirst)
     {
         PartitionRangeReadCommand rangeCommand = 
command.forSubRange(replicaPlan.range(), isFirst);
-        
-        // If enabled, request repaired data tracking info from full replicas, 
but
-        // only if there are multiple full replicas to compare results from.
-        boolean trackRepairedStatus = 
DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
-                                      && 
replicaPlan.contacts().filter(Replica::isFull).size() > 1;
 
-        ClusterMetadata cm = ClusterMetadata.current();
-        TableMetadata metadata = command.metadata();
-        TableParams tableParams = metadata.params;
-        TransactionalMode transactionalMode = tableParams.transactionalMode;
-        TransactionalMigrationFromMode transactionalMigrationFromMode = 
tableParams.transactionalMigrationFrom;
-        if (transactionalMigrationFromMode != 
TransactionalMigrationFromMode.none && 
transactionalMode.nonSerialReadsThroughAccord && 
transactionalMigrationFromMode.nonSerialWritesThroughAccord() && 
transactionalMigrationFromMode.nonSerialReadsThroughAccord())
-            throw new UnsupportedOperationException("Live migration is not 
supported, can't safely read when migrating from " + 
transactionalMigrationFromMode + " to " + transactionalMode);
-        if (transactionalMode.nonSerialReadsThroughAccord && 
readCoordinator.isEventuallyConsistent())
+        // Accord interop execution should always be coordinated through the 
C* plumbing
+        if (!readCoordinator.isEventuallyConsistent())
         {
-            //TODO (nicetohave): This is very inefficient because it will not 
map the the command store owned ranges
-            // so every command store will return results and most will be 
discarded due to the limit
-            // Really we want to split the ranges by command stores owned 
ranges and then query one at a time
-            AsyncTxnResult result = StorageProxy.readWithAccord(cm, 
rangeCommand, ImmutableList.of(rangeCommand.dataRange().keyRange()), 
replicaPlan.consistencyLevel(), requestTime);
-            return new AccordRangeResponse(result, rangeCommand.isReversed(), 
replicaPlan.consistencyLevel(), requestTime);
+            SingleRangeResponse response = executeNormal(replicaPlan, 
rangeCommand, readCoordinator);
+            readRepairs.add(response.getReadRepair());
+            return response;
         }
-        else
+
+        List<RangeReadWithTarget> reads = 
ConsensusRequestRouter.splitReadIntoAccordAndNormal(cm, rangeCommand, 
readCoordinator, requestTime);
+        // Special case returning directly to avoid wrapping the iterator and 
applying the limits an extra time
+        if (reads.size() == 1)
         {
-            ReplicaPlan.SharedForRangeRead sharedReplicaPlan = 
ReplicaPlan.shared(replicaPlan);
-            ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair 
=
-            ReadRepair.create(ReadCoordinator.DEFAULT, command, 
sharedReplicaPlan, requestTime);
-            DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver 
=
-            new DataResolver<>(ReadCoordinator.DEFAULT, rangeCommand, 
sharedReplicaPlan, readRepair, requestTime, trackRepairedStatus);
-            ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
-            new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, 
requestTime);
-
-            if (replicaPlan.contacts().size() == 1 && 
replicaPlan.contacts().get(0).isSelf())
+            RangeReadWithTarget rangeReadWithTarget = reads.get(0);
+            
checkState(rangeReadWithTarget.read.dataRange().keyRange().equals(rangeCommand.dataRange().keyRange()));
+            if (rangeReadWithTarget.target == RangeReadTarget.accord && 
readCoordinator.isEventuallyConsistent())
             {
-                Stage.READ.execute(new 
StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime, 
trackRepairedStatus));
+                return executeAccord(cm,
+                                     rangeReadWithTarget.read,
+                                     replicaPlan.consistencyLevel());
             }
             else
             {
-                for (Replica replica : replicaPlan.contacts())
-                {
-                    Tracing.trace("Enqueuing request to {}", replica);
-                    ReadCommand command = replica.isFull() ? rangeCommand : 
rangeCommand.copyAsTransientQuery(replica);
-                    Message<ReadCommand> message = 
command.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
-                    MessagingService.instance().sendWithCallback(message, 
replica.endpoint(), handler);
-                }
+                SingleRangeResponse response = executeNormal(replicaPlan, 
rangeReadWithTarget.read, readCoordinator);
+                readRepairs.add(response.getReadRepair());
+                return response;
             }
-            return new CassandraRangeResponse(resolver, handler, readRepair);
         }
+
+        // TODO (review): Should this be reworked to execute the queries 
serially from the iterator? It would respect

Review Comment:
   I think this should be fairly straightforward to do in Accord actually; we 
don't unblock other transactions until this one notifies via distributed Apply 
message anyway, so performing the read serially doesn't really change that. The 
only reason this might slow things down is we aren't loading transaction data 
from as many command stores in one go. But for a range query this is probably 
acceptable and maybe desirable? You don't really want to query all N command 
stores when you expect the first matching one to return enough data...?
   
   That said, we're running out of time so probably still something for the 
future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to