belliottsmith commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915760303
##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -196,72 +252,115 @@ static int computeConcurrencyFactor(int totalRangeCount,
int rangesQueried, int
* {@code DataLimits}) may have "state" information and that state may
only be valid for the first query (in
* that it's the query that "continues" whatever we're previously queried).
*/
- private IRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan,
ReadCoordinator readCoordinator, boolean isFirst)
+ private PartitionIterator query(ClusterMetadata cm,
ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator,
List<ReadRepair<?, ?>> readRepairs, boolean isFirst)
{
PartitionRangeReadCommand rangeCommand =
command.forSubRange(replicaPlan.range(), isFirst);
-
- // If enabled, request repaired data tracking info from full replicas,
but
- // only if there are multiple full replicas to compare results from.
- boolean trackRepairedStatus =
DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
- &&
replicaPlan.contacts().filter(Replica::isFull).size() > 1;
- ClusterMetadata cm = ClusterMetadata.current();
- TableMetadata metadata = command.metadata();
- TableParams tableParams = metadata.params;
- TransactionalMode transactionalMode = tableParams.transactionalMode;
- TransactionalMigrationFromMode transactionalMigrationFromMode =
tableParams.transactionalMigrationFrom;
- if (transactionalMigrationFromMode !=
TransactionalMigrationFromMode.none &&
transactionalMode.nonSerialReadsThroughAccord &&
transactionalMigrationFromMode.nonSerialWritesThroughAccord() &&
transactionalMigrationFromMode.nonSerialReadsThroughAccord())
- throw new UnsupportedOperationException("Live migration is not
supported, can't safely read when migrating from " +
transactionalMigrationFromMode + " to " + transactionalMode);
- if (transactionalMode.nonSerialReadsThroughAccord &&
readCoordinator.isEventuallyConsistent())
+ // Accord interop execution should always be coordinated through the
C* plumbing
+ if (!readCoordinator.isEventuallyConsistent())
{
- //TODO (nicetohave): This is very inefficient because it will not
map the the command store owned ranges
- // so every command store will return results and most will be
discarded due to the limit
- // Really we want to split the ranges by command stores owned
ranges and then query one at a time
- AsyncTxnResult result = StorageProxy.readWithAccord(cm,
rangeCommand, ImmutableList.of(rangeCommand.dataRange().keyRange()),
replicaPlan.consistencyLevel(), requestTime);
- return new AccordRangeResponse(result, rangeCommand.isReversed(),
replicaPlan.consistencyLevel(), requestTime);
+ SingleRangeResponse response = executeNormal(replicaPlan,
rangeCommand, readCoordinator);
+ readRepairs.add(response.getReadRepair());
+ return response;
}
- else
+
+ List<RangeReadWithTarget> reads =
ConsensusRequestRouter.splitReadIntoAccordAndNormal(cm, rangeCommand,
readCoordinator, requestTime);
+ // Special case returning directly to avoid wrapping the iterator and
applying the limits an extra time
+ if (reads.size() == 1)
{
- ReplicaPlan.SharedForRangeRead sharedReplicaPlan =
ReplicaPlan.shared(replicaPlan);
- ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
=
- ReadRepair.create(ReadCoordinator.DEFAULT, command,
sharedReplicaPlan, requestTime);
- DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
=
- new DataResolver<>(ReadCoordinator.DEFAULT, rangeCommand,
sharedReplicaPlan, readRepair, requestTime, trackRepairedStatus);
- ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
- new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan,
requestTime);
-
- if (replicaPlan.contacts().size() == 1 &&
replicaPlan.contacts().get(0).isSelf())
+ RangeReadWithTarget rangeReadWithTarget = reads.get(0);
+
checkState(rangeReadWithTarget.read.dataRange().keyRange().equals(rangeCommand.dataRange().keyRange()));
+ if (rangeReadWithTarget.target == RangeReadTarget.accord &&
readCoordinator.isEventuallyConsistent())
{
- Stage.READ.execute(new
StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime,
trackRepairedStatus));
+ return executeAccord(cm,
+ rangeReadWithTarget.read,
+ replicaPlan.consistencyLevel());
}
else
{
- for (Replica replica : replicaPlan.contacts())
- {
- Tracing.trace("Enqueuing request to {}", replica);
- ReadCommand command = replica.isFull() ? rangeCommand :
rangeCommand.copyAsTransientQuery(replica);
- Message<ReadCommand> message =
command.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
- MessagingService.instance().sendWithCallback(message,
replica.endpoint(), handler);
- }
+ SingleRangeResponse response = executeNormal(replicaPlan,
rangeReadWithTarget.read, readCoordinator);
+ readRepairs.add(response.getReadRepair());
+ return response;
}
- return new CassandraRangeResponse(resolver, handler, readRepair);
}
+
+ // TODO (review): Should this be reworked to execute the queries
serially from the iterator? It would respect
Review Comment:
I think this should actually be fairly straightforward to do in Accord; we
don't unblock other transactions until this one notifies via a distributed Apply
message anyway, so performing the read serially doesn't really change that. The
only reason this might slow things down is we aren't loading transaction data
from as many command stores in one go. But for a range query this is probably
acceptable and maybe desirable? You don't really want to query all N command
stores when you expect the first matching one to return enough data...?
That said, we're running out of time, so this is probably still something for
the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]