aweisberg commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915654176
##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -196,72 +252,115 @@ static int computeConcurrencyFactor(int totalRangeCount,
int rangesQueried, int
* {@code DataLimits}) may have "state" information and that state may
only be valid for the first query (in
* that it's the query that "continues" whatever we're previously queried).
*/
- private IRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan,
ReadCoordinator readCoordinator, boolean isFirst)
+ private PartitionIterator query(ClusterMetadata cm,
ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator,
List<ReadRepair<?, ?>> readRepairs, boolean isFirst)
{
PartitionRangeReadCommand rangeCommand =
command.forSubRange(replicaPlan.range(), isFirst);
-
- // If enabled, request repaired data tracking info from full replicas,
but
- // only if there are multiple full replicas to compare results from.
- boolean trackRepairedStatus =
DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
- &&
replicaPlan.contacts().filter(Replica::isFull).size() > 1;
- ClusterMetadata cm = ClusterMetadata.current();
- TableMetadata metadata = command.metadata();
- TableParams tableParams = metadata.params;
- TransactionalMode transactionalMode = tableParams.transactionalMode;
- TransactionalMigrationFromMode transactionalMigrationFromMode =
tableParams.transactionalMigrationFrom;
- if (transactionalMigrationFromMode !=
TransactionalMigrationFromMode.none &&
transactionalMode.nonSerialReadsThroughAccord &&
transactionalMigrationFromMode.nonSerialWritesThroughAccord() &&
transactionalMigrationFromMode.nonSerialReadsThroughAccord())
- throw new UnsupportedOperationException("Live migration is not
supported, can't safely read when migrating from " +
transactionalMigrationFromMode + " to " + transactionalMode);
- if (transactionalMode.nonSerialReadsThroughAccord &&
readCoordinator.isEventuallyConsistent())
+ // Accord interop execution should always be coordinated through the
C* plumbing
+ if (!readCoordinator.isEventuallyConsistent())
{
- //TODO (nicetohave): This is very inefficient because it will not
map the the command store owned ranges
- // so every command store will return results and most will be
discarded due to the limit
- // Really we want to split the ranges by command stores owned
ranges and then query one at a time
- AsyncTxnResult result = StorageProxy.readWithAccord(cm,
rangeCommand, ImmutableList.of(rangeCommand.dataRange().keyRange()),
replicaPlan.consistencyLevel(), requestTime);
- return new AccordRangeResponse(result, rangeCommand.isReversed(),
replicaPlan.consistencyLevel(), requestTime);
+ SingleRangeResponse response = executeNormal(replicaPlan,
rangeCommand, readCoordinator);
+ readRepairs.add(response.getReadRepair());
+ return response;
}
- else
+
+ List<RangeReadWithTarget> reads =
ConsensusRequestRouter.splitReadIntoAccordAndNormal(cm, rangeCommand,
readCoordinator, requestTime);
+ // Special case returning directly to avoid wrapping the iterator and
applying the limits an extra time
+ if (reads.size() == 1)
{
- ReplicaPlan.SharedForRangeRead sharedReplicaPlan =
ReplicaPlan.shared(replicaPlan);
- ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
=
- ReadRepair.create(ReadCoordinator.DEFAULT, command,
sharedReplicaPlan, requestTime);
- DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
=
- new DataResolver<>(ReadCoordinator.DEFAULT, rangeCommand,
sharedReplicaPlan, readRepair, requestTime, trackRepairedStatus);
- ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
- new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan,
requestTime);
-
- if (replicaPlan.contacts().size() == 1 &&
replicaPlan.contacts().get(0).isSelf())
+ RangeReadWithTarget rangeReadWithTarget = reads.get(0);
+
checkState(rangeReadWithTarget.read.dataRange().keyRange().equals(rangeCommand.dataRange().keyRange()));
+ if (rangeReadWithTarget.target == RangeReadTarget.accord &&
readCoordinator.isEventuallyConsistent())
{
- Stage.READ.execute(new
StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime,
trackRepairedStatus));
+ return executeAccord(cm,
+ rangeReadWithTarget.read,
+ replicaPlan.consistencyLevel());
}
else
{
- for (Replica replica : replicaPlan.contacts())
- {
- Tracing.trace("Enqueuing request to {}", replica);
- ReadCommand command = replica.isFull() ? rangeCommand :
rangeCommand.copyAsTransientQuery(replica);
- Message<ReadCommand> message =
command.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
- MessagingService.instance().sendWithCallback(message,
replica.endpoint(), handler);
- }
+ SingleRangeResponse response = executeNormal(replicaPlan,
rangeReadWithTarget.read, readCoordinator);
+ readRepairs.add(response.getReadRepair());
+ return response;
}
- return new CassandraRangeResponse(resolver, handler, readRepair);
}
+
+ // TODO (review): Should this be reworked to execute the queries
serially from the iterator? It would respect
Review Comment:
Yeah, I will remove this now that you have reviewed. I didn't do it because I
thought it would cause timeouts, especially as the number of command stores (and
thus the number of independent txns) increases. Trying to do it serially from a
single txn would require more work in Accord to allow it without unnecessarily
causing reads to block other transactions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]