dcapwell commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1929148218
##########
test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java:
##########
@@ -240,12 +248,14 @@ public static class QueryDelayHelper
static void install(ClassLoader cl, int nodeNumber)
{
+
checkState(Arrays.stream(Mutation.class.getDeclaredMethods()).anyMatch(method
-> method.getName().equals("apply") && method.getParameterCount() == 3));
Review Comment:
i really wish BB would fail in these cases... 😢
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2576,6 +2664,7 @@ protected void runMayThrow()
boolean readRejected = false;
long deadline =
requestTime.computeDeadline(verb.expiresAfterNanos());
+ long now = Clock.Global.nanoTime();
Review Comment:
dead code?
##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -234,123 +227,125 @@ public EndpointsForToken
forNonLocalStrategyTokenRead(ClusterMetadata doNotUse,
public void sendReadCommand(Message<ReadCommand> message,
InetAddressAndPort to, RequestCallback<ReadResponse> callback)
{
Node.Id id = endpointMapper.mappedId(to);
- SinglePartitionReadCommand command = (SinglePartitionReadCommand)
message.payload;
+ // TODO (nicetohave): It would be better to re-use the command
from the transaction but it's fragile
+ // to try and figure out exactly what changed for things like read
repair and short read protection
+ // Also this read scope doesn't reflect the contents of this
particular read and is larger than it needs to be
// TODO (required): understand interop and whether StableFastPath is
appropriate
- AccordInteropStableThenRead commit = new
AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath,
executeAt, txn, deps, route, command);
+ AccordInteropStableThenRead commit = new
AccordInteropStableThenRead(id, allTopologies, txnId, Kind.StableFastPath,
executeAt, txn, deps, route, message.payload);
node.send(id, commit, executor, new AccordInteropRead.ReadCallback(id,
to, message, callback, this));
}
@Override
public void sendReadRepairMutation(Message<Mutation> message,
InetAddressAndPort to, RequestCallback<Object> callback)
{
- checkArgument(message.payload.allowsPotentialTransactionConflicts());
+ checkArgument(message.payload.potentialTxnConflicts().allowed);
+ checkArgument(message.payload.getTableIds().size() == 1);
Node.Id id = endpointMapper.mappedId(to);
+ Participants<?> readScope =
Participants.singleton(txn.read().keys().domain(), new
TokenKey(message.payload.getTableIds().iterator().next(),
message.payload.key().getToken()));
AccordInteropReadRepair readRepair = new AccordInteropReadRepair(id,
executes, txnId, readScope, executeAt.epoch(), message.payload);
node.send(id, readRepair, executor, new
AccordInteropReadRepair.ReadRepairCallback(id, to, message, callback, this));
}
- private List<AsyncChain<Data>> keyReadChains(int nowInSeconds,
Dispatcher.RequestTime requestTime)
+ private List<AsyncChain<Data>> readChains(Dispatcher.RequestTime
requestTime)
{
- TxnKeyRead read = (TxnKeyRead) txn.read();
- List<AsyncChain<Data>> results = new ArrayList<>();
+ TxnRead read = (TxnRead) txn.read();
Seekables<?, ?> keys = txn.read().keys();
+ switch (keys.domain())
+ {
+ case Key:
+ return keyReadChains(read, keys, requestTime);
+ case Range:
+ return rangeReadChains(read, keys, requestTime);
+ default:
+ throw new IllegalStateException("Unhandled domain " +
keys.domain());
+ }
+ }
+
+ private List<AsyncChain<Data>> keyReadChains(TxnRead read, Seekables<?, ?>
keys, Dispatcher.RequestTime requestTime)
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ List<AsyncChain<Data>> results = new ArrayList<>();
+ keys.forEach(key -> {
+ read.forEachWithKey((PartitionKey) key, fragment -> {
+ SinglePartitionReadCommand command =
(SinglePartitionReadCommand) fragment.command();
+
+ // This should only rarely occur when
coordinators start a transaction in a migrating range
+ // because they haven't yet updated their cluster
metadata.
+ // It would be harmless to do the read, because
it will be rejected in `TxnQuery` anyways,
+ // but it's faster to skip the read
+ AccordClientRequestMetrics metrics =
txn.kind().isWrite() ? accordWriteMetrics : accordReadMetrics;
+ // TODO (required): This doesn't use the metadata
from the correct epoch
Review Comment:
this pattern is all over the place, so i do wonder about the intended
semantic... if a range is part of accord at epoch=N but not epoch=N+1 and we
are executing in epoch=N but locally we know epoch=N+1 should we allow
progress? If so we need similar logic as drop tables that makes the TCM change
multi staged and blocks waiting on everything allowed for that epoch.
##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -187,6 +193,47 @@ static int computeConcurrencyFactor(int totalRangeCount,
int rangesQueried, int
return concurrencyFactor;
}
+ private PartitionIterator executeAccord(ClusterMetadata cm,
PartitionRangeReadCommand rangeCommand, ConsistencyLevel cl)
+ {
+ //TODO (nicetohave):
https://issues.apache.org/jira/browse/CASSANDRA-20210 More efficient reads
across command stores
+ AsyncTxnResult result = StorageProxy.readWithAccord(cm, rangeCommand,
rangeCommand.dataRange().keyRange(), cl, requestTime);
+ return new AccordRangeResponse(result, rangeCommand.isReversed(), cl,
requestTime);
+ }
+
+ private SingleRangeResponse executeNormal(ReplicaPlan.ForRangeRead
replicaPlan, PartitionRangeReadCommand rangeCommand, ReadCoordinator
readCoordinator)
+ {
+ rangeCommand = (PartitionRangeReadCommand)
readCoordinator.maybeAllowOutOfRangeReads(rangeCommand,
replicaPlan.consistencyLevel());
+ // If enabled, request repaired data tracking info from full replicas,
but
+ // only if there are multiple full replicas to compare results from.
+ boolean trackRepairedStatus =
DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+ &&
replicaPlan.contacts().filter(Replica::isFull).size() > 1;
+
+ ReplicaPlan.SharedForRangeRead sharedReplicaPlan =
ReplicaPlan.shared(replicaPlan);
+ ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair =
+ ReadRepair.create(readCoordinator, command, sharedReplicaPlan,
requestTime);
+ DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
+ new DataResolver<>(readCoordinator, rangeCommand, sharedReplicaPlan,
readRepair, requestTime, trackRepairedStatus);
+ ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
+ new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan,
requestTime);
+
+ if (replicaPlan.contacts().size() == 1 &&
replicaPlan.contacts().get(0).isSelf() && readCoordinator.localReadSupported())
Review Comment:
why `replicaPlan.contacts().size() == 1`? don't we normally handle this in
the loop when `replicaPlan.contacts().get(0).isSelf() &&
readCoordinator.localReadSupported()`?
##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java:
##########
@@ -108,27 +132,76 @@ public LocalReadData deserialize(DataInputPlus in, int
version) throws IOExcepti
@Override
public long serializedSize(LocalReadData data, int version)
{
- return ReadResponse.serializer.serializedSize(data.response,
version);
+ data.ensureRemoteResponse();
+ return
ReadResponse.serializer.serializedSize(data.remoteResponse, version);
}
};
- final ReadResponse response;
+ // Will be null at coordinator
+ List<Pair<AccordRoutingKey, ReadResponse>> localResponses;
+ // Will be null at coordinator
+ final ReadCommand readCommand;
+ // Will be non-null at the coordinator, but null at the node creating the
response until it is serialized
+ ReadResponse remoteResponse;
- public LocalReadData(ReadResponse response)
+ public LocalReadData(@Nullable AccordRoutingKey start, @Nonnull
ReadResponse response, @Nonnull ReadCommand readCommand)
{
- this.response = response;
+ checkNotNull(response, "response is null");
+ checkNotNull(readCommand, "readCommand is null");
+ localResponses = ImmutableList.of(Pair.create(start, response));
+ this.readCommand = readCommand;
+ this.remoteResponse = null;
+ }
+
+ public LocalReadData(@Nonnull ReadResponse remoteResponse)
+ {
+ checkNotNull(remoteResponse);
+ this.remoteResponse = remoteResponse;
+ readCommand = null;
}
@Override
public String toString()
{
- return "LocalReadData{" + response + '}';
+ if (localResponses != null)
+ return "LocalReadData{" + localResponses + '}';
+ else
+ return "LocalReadData{" + remoteResponse + '}';
}
@Override
public Data merge(Data data)
{
- throw new IllegalStateException("Should only ever be a single
partition");
+ checkState(remoteResponse == null, "Already serialized");
+ checkState(readCommand.isRangeRequest(), "Should only ever be a
single partition");
+ LocalReadData other = (LocalReadData)data;
+ checkState(readCommand == other.readCommand, "Should share the
same ReadCommand");
+ if (localResponses.size() == 1)
Review Comment:
isn't this nullable? this condition looks to try to handle `public
LocalReadData(@Nullable AccordRoutingKey start, @Nonnull ReadResponse response,
@Nonnull ReadCommand readCommand)` where it does
```
localResponses = ImmutableList.of(Pair.create(start, response));
```
but it doesn't make too much sense in other code paths? wouldn't it be
less code to just make that constructor use a mutable list so we could avoid
this condition?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]