aweisberg commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1929248969
##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -234,123 +227,125 @@ public EndpointsForToken forNonLocalStrategyTokenRead(ClusterMetadata doNotUse,
public void sendReadCommand(Message<ReadCommand> message, InetAddressAndPort to, RequestCallback<ReadResponse> callback)
{
Node.Id id = endpointMapper.mappedId(to);
- SinglePartitionReadCommand command = (SinglePartitionReadCommand) message.payload;
+ // TODO (nicetohave): It would be better to use the re-use the command from the transaction but it's fragile
+ // to try and figure out exactly what changed for things like read repair and short read protection
+ // Also this read scope doesn't reflect the contents of this particular read and is larger than it needs to be
// TODO (required): understand interop and whether StableFastPath is appropriate
- AccordInteropStableThenRead commit = new AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath, executeAt, txn, deps, route, command);
+ AccordInteropStableThenRead commit = new AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath, executeAt, txn, deps, route, message.payload);
node.send(id, commit, executor, new AccordInteropRead.ReadCallback(id, to, message, callback, this));
}
@Override
public void sendReadRepairMutation(Message<Mutation> message, InetAddressAndPort to, RequestCallback<Object> callback)
{
- checkArgument(message.payload.allowsPotentialTransactionConflicts());
+ checkArgument(message.payload.potentialTxnConflicts().allowed);
+ checkArgument(message.payload.getTableIds().size() == 1);
Node.Id id = endpointMapper.mappedId(to);
+ Participants<?> readScope = Participants.singleton(txn.read().keys().domain(), new TokenKey(message.payload.getTableIds().iterator().next(), message.payload.key().getToken()));
AccordInteropReadRepair readRepair = new AccordInteropReadRepair(id, executes, txnId, readScope, executeAt.epoch(), message.payload);
node.send(id, readRepair, executor, new AccordInteropReadRepair.ReadRepairCallback(id, to, message, callback, this));
}
- private List<AsyncChain<Data>> keyReadChains(int nowInSeconds, Dispatcher.RequestTime requestTime)
+ private List<AsyncChain<Data>> readChains(Dispatcher.RequestTime requestTime)
{
- TxnKeyRead read = (TxnKeyRead) txn.read();
- List<AsyncChain<Data>> results = new ArrayList<>();
+ TxnRead read = (TxnRead) txn.read();
Seekables<?, ?> keys = txn.read().keys();
+ switch (keys.domain())
+ {
+ case Key:
+ return keyReadChains(read, keys, requestTime);
+ case Range:
+ return rangeReadChains(read, keys, requestTime);
+ default:
+ throw new IllegalStateException("Unhandled domain " + keys.domain());
+ }
+ }
+
+ private List<AsyncChain<Data>> keyReadChains(TxnRead read, Seekables<?, ?> keys, Dispatcher.RequestTime requestTime)
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ List<AsyncChain<Data>> results = new ArrayList<>();
+ keys.forEach(key -> {
+ read.forEachWithKey((PartitionKey) key, fragment -> {
+ SinglePartitionReadCommand command = (SinglePartitionReadCommand) fragment.command();
+
+ // This should only rarely occur when coordinators start a transaction in a migrating range
+ // because they haven't yet updated their cluster metadata.
+ // It would be harmless to do the read, because it will be rejected in `TxnQuery` anyways,
+ // but it's faster to skip the read
+ AccordClientRequestMetrics metrics = txn.kind().isWrite() ? accordWriteMetrics : accordReadMetrics;
+ // TODO (required): This doesn't use the metadata from the correct epoch
Review Comment:
Live migration is built around the decision of whether to execute on Accord being correctly tied to TCM epochs and to the `TableMigrationState` stored in TCM, which records which ranges are migrating. It's not like add/drop table, because there is no secondary set of data it depends on (such as `Schema.instance`); it's completely self-contained in the cluster metadata.
As long as you can access the correct version of the `TableMigrationState`, Accord can make the correct decision. There is now a way to get older epochs; it's not very efficient, but it's probably fast enough for the few times this ends up being an issue. I just haven't switched over to it yet.
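To make the "correct version" point concrete, here is a minimal sketch, assuming a simplified model in which each epoch's migration state is just a list of migrating token ranges for one table. `MigrationHistory`, `MigrationSnapshot`, `TokenRange` and their methods are hypothetical names for illustration, not Cassandra's TCM or `TableMigrationState` API. The decision is made against the state in effect at the transaction's epoch (the latest snapshot at or before it) rather than against whatever metadata is current:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: these names are illustrative only, not Cassandra's
// TCM or TableMigrationState API.
public class MigrationHistory
{
    /** Half-open token range (start, end] that is migrating to Accord. */
    record TokenRange(long start, long end)
    {
        boolean contains(long token)
        {
            return token > start && token <= end;
        }
    }

    /** Migrating ranges for one table, as of the epoch in which they took effect. */
    record MigrationSnapshot(long epoch, List<TokenRange> migratingRanges)
    {
        boolean isMigrating(long token)
        {
            return migratingRanges.stream().anyMatch(r -> r.contains(token));
        }
    }

    // Snapshots keyed by the epoch at which they took effect.
    private final TreeMap<Long, MigrationSnapshot> byEpoch = new TreeMap<>();

    void put(MigrationSnapshot snapshot)
    {
        byEpoch.put(snapshot.epoch(), snapshot);
    }

    /** State in effect at the given epoch: the latest snapshot at or before it. */
    MigrationSnapshot at(long epoch)
    {
        Map.Entry<Long, MigrationSnapshot> entry = byEpoch.floorEntry(epoch);
        if (entry == null)
            throw new IllegalStateException("No migration state for epoch " + epoch);
        return entry.getValue();
    }

    /** Decide using the transaction's epoch, not the node's current epoch. */
    boolean isMigratingAt(long txnEpoch, long token)
    {
        return at(txnEpoch).isMigrating(token);
    }

    public static void main(String[] args)
    {
        MigrationHistory history = new MigrationHistory();
        history.put(new MigrationSnapshot(10, List.of()));
        history.put(new MigrationSnapshot(12, List.of(new TokenRange(0, 100))));
        // A transaction executing in epoch 11 must not see the range as migrating,
        // even if this node's current metadata is already at epoch 12.
        System.out.println(history.isMigratingAt(11, 50)); // false
        System.out.println(history.isMigratingAt(12, 50)); // true
    }
}
```

In this toy model a `TreeMap` with `floorEntry` makes historical lookups cheap; the real cost of fetching older cluster metadata may be quite different.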
Accord picks an execution timestamp, and that determines the epoch of a transaction. Paxos has each node vote on it as part of running Paxos. For non-Paxos/non-Accord writes, the replicas both vote on whether the write needs to be retried on Accord and block local application of the write at nodes where Accord may have already started reading.
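The vote for non-Paxos/non-Accord writes can be sketched roughly as follows, assuming each replica knows, at its own epoch, which token ranges Accord may already be reading. `NonAccordWriteVote`, `ReplicaState`, `Vote` and the coordinator rule are hypothetical simplifications, not Cassandra's actual write path: a replica that finds the token in such a range refuses to apply the write locally and votes for a retry, and the coordinator re-runs the write on Accord if any replica does so.

```java
import java.util.List;

// Hypothetical sketch: NonAccordWriteVote, ReplicaState and Vote are illustrative
// simplifications, not Cassandra's actual non-transactional write path.
public class NonAccordWriteVote
{
    enum Vote { APPLIED, RETRY_ON_ACCORD }

    /** Half-open token range (start, end]. */
    record TokenRange(long start, long end)
    {
        boolean contains(long token)
        {
            return token > start && token <= end;
        }
    }

    /** A replica's view, at its own epoch, of the ranges Accord may already be reading. */
    record ReplicaState(long epoch, List<TokenRange> accordRanges) {}

    /** Replica side: a RETRY_ON_ACCORD vote also means the write is not applied locally. */
    static Vote onWrite(ReplicaState replica, long token)
    {
        boolean accordMayBeReading = replica.accordRanges().stream().anyMatch(r -> r.contains(token));
        if (accordMayBeReading)
            return Vote.RETRY_ON_ACCORD; // block local application of the write
        // Local application of the mutation would happen here (elided).
        return Vote.APPLIED;
    }

    /** Coordinator side: any retry vote means the write must be re-run through Accord. */
    static boolean mustRetryOnAccord(List<Vote> votes)
    {
        return votes.contains(Vote.RETRY_ON_ACCORD);
    }

    public static void main(String[] args)
    {
        ReplicaState stale = new ReplicaState(11, List.of());
        ReplicaState fresh = new ReplicaState(12, List.of(new TokenRange(0, 100)));
        List<Vote> votes = List.of(onWrite(stale, 50), onWrite(fresh, 50));
        System.out.println(votes);                    // [APPLIED, RETRY_ON_ACCORD]
        System.out.println(mustRetryOnAccord(votes)); // true
    }
}
```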
--
This is an automated message from the Apache Git Service.