dcapwell commented on code in PR #3568:
URL: https://github.com/apache/cassandra/pull/3568#discussion_r1779889761
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2141,28 +2149,77 @@ private static PartitionIterator
readWithConsensus(SinglePartitionReadCommand.Gr
return lastResult.serialReadResult;
}
- private static ConsensusAttemptResult
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
+ private static ConsistencyLevel
consistencyLevelForAccordRead(ClusterMetadata cm,
SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel
consistencyLevel)
+ {
+ // Null means no specific consistency behavior is required from
Accord, it's functionally similar to
+ // reading at ONE if you are reading data that wasn't written via
Accord
+ if (consistencyLevel == null)
+ return null;
+
+ TableId tableId = group.queries.get(0).metadata().id;
+ TableParams tableParams = getTableMetadata(cm, tableId).params;
+ TransactionalMode mode = tableParams.transactionalMode;
+ TransactionalMigrationFromMode migrationFromMode =
tableParams.transactionalMigrationFrom;
+ for (SinglePartitionReadCommand command : group.queries)
+ {
+ // readCLForStrategy should return either null or the supplied
consistency level
+ // in which case we will read everything at that CL since Accord
doesn't support per table
+ // read consistency
+ ConsistencyLevel commitCL =
mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId,
command.partitionKey().getToken());
+ if (commitCL != null)
+ return commitCL;
+ }
+ return null;
+ }
+
+ private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm,
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
{
- if (group.queries.size() > 1)
- throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency
may only be requested for one partition at a time");
- SinglePartitionReadCommand readCommand = group.queries.get(0);
// If the non-SERIAL write strategy is sending all writes through
Accord there is no need to use the supplied consistency
// level since Accord will manage reading safely
TransactionalMode transactionalMode =
group.metadata().params.transactionalMode;
- consistencyLevel =
transactionalMode.readCLForStrategy(consistencyLevel);
- TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
- Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are
only strict-serializable for single partition reads");
- Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead :
Read, read.keys(), read, TxnQuery.ALL, null);
- IAccordService accordService = AccordService.instance();
- TxnResult txnResult = accordService.coordinate(txn, consistencyLevel,
requestTime);
+ consistencyLevel = consistencyLevelForAccordRead(cm, group,
consistencyLevel);
+ TxnRead read = TxnRead.createSerialRead(group.queries,
consistencyLevel);
+ Txn.Kind kind = Read;
+ if (transactionalMode == TransactionalMode.full &&
DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() &&
group.queries.size() == 1)
+ kind = EphemeralRead;
+ Txn txn = new Txn.InMemory(kind, read.keys(), read, TxnQuery.ALL,
null);
+ AsyncTxnResult asyncTxnResult =
AccordService.instance().coordinateAsync(txn, consistencyLevel, requestTime);
+ return getConsensusAttemptResultFromAsyncTxnResult(asyncTxnResult,
group.queries.size(), index -> group.queries.get(index).isReversed(),
consistencyLevel, requestTime);
+ }
+
+ /*
+ * Used for both the SERIAL and non-SERIAL read path into Accord
+ */
+ public static ConsensusAttemptResult
getConsensusAttemptResultFromAsyncTxnResult(AsyncTxnResult asyncTxnResult, int
numQueries, IntPredicate isQueryReversed, ConsistencyLevel cl,
Dispatcher.RequestTime requestTime)
+ {
+ TxnResult txnResult =
AccordService.instance().getTxnResult(asyncTxnResult, false, cl, requestTime);
+ // TODO (required): Converge on a single approach to
RETRY_NEW_PROTOCOL, this works for now because reads don't support it anyways
if (txnResult.kind() == retry_new_protocol)
return RETRY_NEW_PROTOCOL;
- TxnData data = (TxnData)txnResult;
- FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
- if (partition != null)
- return
serialReadResult(PartitionIterators.singletonIterator(partition.rowIterator(readCommand.isReversed())));
- else
+ TxnData data = (TxnData) txnResult;
+
+ if (data.isEmpty())
+ {
return serialReadResult(EmptyIterators.partition());
+ }
+ else if (data.size() == 1)
+ {
+ FilteredPartition value = data.values().iterator().next();
+ return
serialReadResult(PartitionIterators.singletonIterator(value.rowIterator(isQueryReversed.test(0))));
+ }
+ else
+ {
+ // TODO (review): 95% sure this isn't actually needed and the
consumer is going consume these by DecoratedKey not iteration order, but the
non-transactional path does preserve the order of the iterators
Review Comment:
Not a blocker for this patch, but we should find tests that hit this case
and make sure we maintain the existing behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]