aweisberg commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1929248969


##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -234,123 +227,125 @@ public EndpointsForToken 
forNonLocalStrategyTokenRead(ClusterMetadata doNotUse,
     public void sendReadCommand(Message<ReadCommand> message, 
InetAddressAndPort to, RequestCallback<ReadResponse> callback)
     {
         Node.Id id = endpointMapper.mappedId(to);
-        SinglePartitionReadCommand command = (SinglePartitionReadCommand) 
message.payload;
+        // TODO (nicetohave): It would be better to use the re-use the command 
from the transaction but it's fragile
+        // to try and figure out exactly what changed for things like read 
repair and short read protection
+        // Also this read scope doesn't reflect the contents of this 
particular read and is larger than it needs to be
         // TODO (required): understand interop and whether StableFastPath is 
appropriate
-        AccordInteropStableThenRead commit = new 
AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath, 
executeAt, txn, deps, route, command);
+        AccordInteropStableThenRead commit = new 
AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath, 
executeAt, txn, deps, route, message.payload);
         node.send(id, commit, executor, new AccordInteropRead.ReadCallback(id, 
to, message, callback, this));
     }
 
     @Override
     public void sendReadRepairMutation(Message<Mutation> message, 
InetAddressAndPort to, RequestCallback<Object> callback)
     {
-        checkArgument(message.payload.allowsPotentialTransactionConflicts());
+        checkArgument(message.payload.potentialTxnConflicts().allowed);
+        checkArgument(message.payload.getTableIds().size() == 1);
         Node.Id id = endpointMapper.mappedId(to);
+        Participants<?> readScope = 
Participants.singleton(txn.read().keys().domain(), new 
TokenKey(message.payload.getTableIds().iterator().next(), 
message.payload.key().getToken()));
         AccordInteropReadRepair readRepair = new AccordInteropReadRepair(id, 
executes, txnId, readScope, executeAt.epoch(), message.payload);
         node.send(id, readRepair, executor, new 
AccordInteropReadRepair.ReadRepairCallback(id, to, message, callback, this));
     }
 
-    private List<AsyncChain<Data>> keyReadChains(int nowInSeconds, 
Dispatcher.RequestTime requestTime)
+    private List<AsyncChain<Data>> readChains(Dispatcher.RequestTime 
requestTime)
     {
-        TxnKeyRead read = (TxnKeyRead) txn.read();
-        List<AsyncChain<Data>> results = new ArrayList<>();
+        TxnRead read = (TxnRead) txn.read();
         Seekables<?, ?> keys = txn.read().keys();
+        switch (keys.domain())
+        {
+            case Key:
+                return keyReadChains(read, keys, requestTime);
+            case Range:
+                return rangeReadChains(read, keys, requestTime);
+            default:
+                throw new IllegalStateException("Unhandled domain " + 
keys.domain());
+        }
+    }
+
+    private List<AsyncChain<Data>> keyReadChains(TxnRead read, Seekables<?, ?> 
keys, Dispatcher.RequestTime requestTime)
+    {
+        ClusterMetadata cm = ClusterMetadata.current();
+        List<AsyncChain<Data>> results = new ArrayList<>();
+        keys.forEach(key -> {
+                         read.forEachWithKey((PartitionKey) key, fragment -> {
+                             SinglePartitionReadCommand command = 
(SinglePartitionReadCommand) fragment.command();
+
+                             // This should only rarely occur when 
coordinators start a transaction in a migrating range
+                             // because they haven't yet updated their cluster 
metadata.
+                             // It would be harmless to do the read, because 
it will be rejected in `TxnQuery` anyways,
+                             // but it's faster to skip the read
+                             AccordClientRequestMetrics metrics = 
txn.kind().isWrite() ? accordWriteMetrics : accordReadMetrics;
+                             // TODO (required): This doesn't use the metadata 
from the correct epoch

Review Comment:
   Live migration is built around the decision making for whether to execute on 
Accord or not being correctly tied to TCM epochs and the `TableMigrationState` 
stored in TCM containing what ranges are migrating. It's not like add/drop 
table because there is no secondary set of data that it is dependent on like 
`Schema.instance` it's completely self contained in that in the cluster 
metadata.
   
   As long as you can get access to the correct version of the 
`TableMigrationState` then Accord can make the correct decision. There is 
actually now a way to get older epochs it's just not very efficient, but it's 
probably fast enough for the few times this ends up being an issue. I just 
haven't switched it over yet.
   
   Accord picks an execution timestamp and that determine the epoch of a 
transaction. Paxos has each node vote on it as part of running Paxos. 
Non-Paxos/Non-Accord writes are both voting on whether the write needs to be 
retried on Accord while also blocking local application of the write at nodes 
where Accord may have already started reading.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to