dcapwell commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1929148218


##########
test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java:
##########
@@ -240,12 +248,14 @@ public static class QueryDelayHelper
 
         static void install(ClassLoader cl, int nodeNumber)
         {
+            
checkState(Arrays.stream(Mutation.class.getDeclaredMethods()).anyMatch(method 
-> method.getName().equals("apply") && method.getParameterCount() == 3));

Review Comment:
   i really wish BB would fail in these cases... 😢 



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2576,6 +2664,7 @@ protected void runMayThrow()
 
                 boolean readRejected = false;
                 long deadline = 
requestTime.computeDeadline(verb.expiresAfterNanos());
+                long now = Clock.Global.nanoTime();

Review Comment:
   dead code?



##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -234,123 +227,125 @@ public EndpointsForToken 
forNonLocalStrategyTokenRead(ClusterMetadata doNotUse,
     public void sendReadCommand(Message<ReadCommand> message, 
InetAddressAndPort to, RequestCallback<ReadResponse> callback)
     {
         Node.Id id = endpointMapper.mappedId(to);
-        SinglePartitionReadCommand command = (SinglePartitionReadCommand) 
message.payload;
+        // TODO (nicetohave): It would be better to use the re-use the command 
from the transaction but it's fragile
+        // to try and figure out exactly what changed for things like read 
repair and short read protection
+        // Also this read scope doesn't reflect the contents of this 
particular read and is larger than it needs to be
         // TODO (required): understand interop and whether StableFastPath is 
appropriate
-        AccordInteropStableThenRead commit = new 
AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath, 
executeAt, txn, deps, route, command);
+        AccordInteropStableThenRead commit = new 
AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath, 
executeAt, txn, deps, route, message.payload);
         node.send(id, commit, executor, new AccordInteropRead.ReadCallback(id, 
to, message, callback, this));
     }
 
     @Override
     public void sendReadRepairMutation(Message<Mutation> message, 
InetAddressAndPort to, RequestCallback<Object> callback)
     {
-        checkArgument(message.payload.allowsPotentialTransactionConflicts());
+        checkArgument(message.payload.potentialTxnConflicts().allowed);
+        checkArgument(message.payload.getTableIds().size() == 1);
         Node.Id id = endpointMapper.mappedId(to);
+        Participants<?> readScope = 
Participants.singleton(txn.read().keys().domain(), new 
TokenKey(message.payload.getTableIds().iterator().next(), 
message.payload.key().getToken()));
         AccordInteropReadRepair readRepair = new AccordInteropReadRepair(id, 
executes, txnId, readScope, executeAt.epoch(), message.payload);
         node.send(id, readRepair, executor, new 
AccordInteropReadRepair.ReadRepairCallback(id, to, message, callback, this));
     }
 
-    private List<AsyncChain<Data>> keyReadChains(int nowInSeconds, 
Dispatcher.RequestTime requestTime)
+    private List<AsyncChain<Data>> readChains(Dispatcher.RequestTime 
requestTime)
     {
-        TxnKeyRead read = (TxnKeyRead) txn.read();
-        List<AsyncChain<Data>> results = new ArrayList<>();
+        TxnRead read = (TxnRead) txn.read();
         Seekables<?, ?> keys = txn.read().keys();
+        switch (keys.domain())
+        {
+            case Key:
+                return keyReadChains(read, keys, requestTime);
+            case Range:
+                return rangeReadChains(read, keys, requestTime);
+            default:
+                throw new IllegalStateException("Unhandled domain " + 
keys.domain());
+        }
+    }
+
+    private List<AsyncChain<Data>> keyReadChains(TxnRead read, Seekables<?, ?> 
keys, Dispatcher.RequestTime requestTime)
+    {
+        ClusterMetadata cm = ClusterMetadata.current();
+        List<AsyncChain<Data>> results = new ArrayList<>();
+        keys.forEach(key -> {
+                         read.forEachWithKey((PartitionKey) key, fragment -> {
+                             SinglePartitionReadCommand command = 
(SinglePartitionReadCommand) fragment.command();
+
+                             // This should only rarely occur when 
coordinators start a transaction in a migrating range
+                             // because they haven't yet updated their cluster 
metadata.
+                             // It would be harmless to do the read, because 
it will be rejected in `TxnQuery` anyways,
+                             // but it's faster to skip the read
+                             AccordClientRequestMetrics metrics = 
txn.kind().isWrite() ? accordWriteMetrics : accordReadMetrics;
+                             // TODO (required): This doesn't use the metadata 
from the correct epoch

Review Comment:
   this pattern is all over the place, so i do wonder about the intended 
semantic... if a range is part of accord at epoch=N but not epoch=N+1 and we 
are executing in epoch=N but locally we know epoch=N+1 should we allow 
progress?  If so we need similar logic as drop tables that makes the TCM change 
multi staged and blocks waiting on everything allowed for that epoch.



##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -187,6 +193,47 @@ static int computeConcurrencyFactor(int totalRangeCount, 
int rangesQueried, int
         return concurrencyFactor;
     }
 
+    private PartitionIterator executeAccord(ClusterMetadata cm, 
PartitionRangeReadCommand rangeCommand, ConsistencyLevel cl)
+    {
+        //TODO (nicetohave): 
https://issues.apache.org/jira/browse/CASSANDRA-20210 More efficient reads 
across command stores
+        AsyncTxnResult result = StorageProxy.readWithAccord(cm, rangeCommand, 
rangeCommand.dataRange().keyRange(), cl, requestTime);
+        return new AccordRangeResponse(result, rangeCommand.isReversed(), cl, 
requestTime);
+    }
+
+    private SingleRangeResponse executeNormal(ReplicaPlan.ForRangeRead 
replicaPlan, PartitionRangeReadCommand rangeCommand, ReadCoordinator 
readCoordinator)
+    {
+        rangeCommand = (PartitionRangeReadCommand) 
readCoordinator.maybeAllowOutOfRangeReads(rangeCommand, 
replicaPlan.consistencyLevel());
+        // If enabled, request repaired data tracking info from full replicas, 
but
+        // only if there are multiple full replicas to compare results from.
+        boolean trackRepairedStatus = 
DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+                                      && 
replicaPlan.contacts().filter(Replica::isFull).size() > 1;
+
+        ReplicaPlan.SharedForRangeRead sharedReplicaPlan = 
ReplicaPlan.shared(replicaPlan);
+        ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair =
+        ReadRepair.create(readCoordinator, command, sharedReplicaPlan, 
requestTime);
+        DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
+        new DataResolver<>(readCoordinator, rangeCommand, sharedReplicaPlan, 
readRepair, requestTime, trackRepairedStatus);
+        ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
+        new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, 
requestTime);
+
+        if (replicaPlan.contacts().size() == 1 && 
replicaPlan.contacts().get(0).isSelf() && readCoordinator.localReadSupported())

Review Comment:
   why `replicaPlan.contacts().size() == 1`?  don't we normally handle this in 
the loop when `replicaPlan.contacts().get(0).isSelf() && 
readCoordinator.localReadSupported()`?



##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java:
##########
@@ -108,27 +132,76 @@ public LocalReadData deserialize(DataInputPlus in, int 
version) throws IOExcepti
             @Override
             public long serializedSize(LocalReadData data, int version)
             {
-                return ReadResponse.serializer.serializedSize(data.response, 
version);
+                data.ensureRemoteResponse();
+                return 
ReadResponse.serializer.serializedSize(data.remoteResponse, version);
             }
         };
 
-        final ReadResponse response;
+        // Will be null at coordinator
+        List<Pair<AccordRoutingKey, ReadResponse>> localResponses;
+        // Will be null at coordinator
+        final ReadCommand readCommand;
+        // Will be not null at coordinator, but null at the node creating the 
response until it serialized
+        ReadResponse remoteResponse;
 
-        public LocalReadData(ReadResponse response)
+        public LocalReadData(@Nullable AccordRoutingKey start, @Nonnull 
ReadResponse response, @Nonnull ReadCommand readCommand)
         {
-            this.response = response;
+            checkNotNull(response, "response is null");
+            checkNotNull(readCommand, "readCommand is null");
+            localResponses = ImmutableList.of(Pair.create(start, response));
+            this.readCommand = readCommand;
+            this.remoteResponse = null;
+        }
+
+        public LocalReadData(@Nonnull ReadResponse remoteResponse)
+        {
+            checkNotNull(remoteResponse);
+            this.remoteResponse = remoteResponse;
+            readCommand = null;
         }
 
         @Override
         public String toString()
         {
-            return "LocalReadData{" + response + '}';
+            if (localResponses != null)
+               return "LocalReadData{" + localResponses + '}';
+            else
+                return "LocalReadData{" + remoteResponse + '}';
         }
 
         @Override
         public Data merge(Data data)
         {
-            throw new IllegalStateException("Should only ever be a single 
partition");
+            checkState(remoteResponse == null, "Already serialized");
+            checkState(readCommand.isRangeRequest(), "Should only ever be a 
single partition");
+            LocalReadData other = (LocalReadData)data;
+            checkState(readCommand == other.readCommand, "Should share the 
same ReadCommand");
+            if (localResponses.size() == 1)

Review Comment:
   isn't this nullable?  this condition looks to try to handle `public 
LocalReadData(@Nullable AccordRoutingKey start, @Nonnull ReadResponse response, 
@Nonnull ReadCommand readCommand)` where it does
   
   ```
   localResponses = ImmutableList.of(Pair.create(start, response));
   ```
   
   but it doesn't make too much sense in other code paths?   wouldn't it be 
less code to just make that constructor use a mutable list so we could avoid 
this condition?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to