dcapwell commented on code in PR #3568:
URL: https://github.com/apache/cassandra/pull/3568#discussion_r1779062043
##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,14 +121,25 @@ public ConsistencyLevel
commitCLForStrategy(ConsistencyLevel consistencyLevel)
return consistencyLevel;
}
- // TODO (required): This won't work for migration directly from none to
full because there is no safe system to read from
- // during the first phase (repair). Accord won't read correctly beacuse it
won't honor the CL and miss non-transactional writes that haven't been repaired
and non-transactional
- // reads will miss all the writes being routed through Accord since they
occur asynchronously. Something has to give here where either writes routed
through are Accord are synchronous at CL
- // or reads are routed through Accord and read at quorum as long as the
range has not completed the first phase (repair).
- public ConsistencyLevel readCLForStrategy(ConsistencyLevel
consistencyLevel)
+ private boolean ignoresSuppliedReadCL()
{
- if (ignoresSuppliedConsistencyLevel)
- return null;
+ return writesThroughAccord && blockingReadRepairThroughAccord;
Review Comment:
With this change, do you still need `public final boolean
ignoresSuppliedConsistencyLevel`? Also, wouldn't reads need to go through
Accord as well? (I'm only going by what is in the enum.)
##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,14 +121,25 @@ public ConsistencyLevel
commitCLForStrategy(ConsistencyLevel consistencyLevel)
return consistencyLevel;
}
- // TODO (required): This won't work for migration directly from none to
full because there is no safe system to read from
- // during the first phase (repair). Accord won't read correctly beacuse it
won't honor the CL and miss non-transactional writes that haven't been repaired
and non-transactional
- // reads will miss all the writes being routed through Accord since they
occur asynchronously. Something has to give here where either writes routed
through are Accord are synchronous at CL
- // or reads are routed through Accord and read at quorum as long as the
range has not completed the first phase (repair).
- public ConsistencyLevel readCLForStrategy(ConsistencyLevel
consistencyLevel)
+ private boolean ignoresSuppliedReadCL()
{
- if (ignoresSuppliedConsistencyLevel)
- return null;
+ return writesThroughAccord && blockingReadRepairThroughAccord;
+ }
+
+ public ConsistencyLevel readCLForStrategy(TransactionalMigrationFromMode
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId
tableId, Token token)
+ {
+ if (ignoresSuppliedReadCL())
+ {
+ TableMigrationState tms =
cm.consensusMigrationState.tableStates.get(tableId);
+ checkState(tms != null || fromMode ==
TransactionalMigrationFromMode.none);
Review Comment:
Is this true? If you enable migration and this is the first time it's
touched, does a `TableMigrationState` exist?
##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,14 +121,25 @@ public ConsistencyLevel
commitCLForStrategy(ConsistencyLevel consistencyLevel)
return consistencyLevel;
}
- // TODO (required): This won't work for migration directly from none to
full because there is no safe system to read from
- // during the first phase (repair). Accord won't read correctly beacuse it
won't honor the CL and miss non-transactional writes that haven't been repaired
and non-transactional
- // reads will miss all the writes being routed through Accord since they
occur asynchronously. Something has to give here where either writes routed
through are Accord are synchronous at CL
- // or reads are routed through Accord and read at quorum as long as the
range has not completed the first phase (repair).
- public ConsistencyLevel readCLForStrategy(ConsistencyLevel
consistencyLevel)
+ private boolean ignoresSuppliedReadCL()
{
- if (ignoresSuppliedConsistencyLevel)
- return null;
+ return writesThroughAccord && blockingReadRepairThroughAccord;
+ }
+
+ public ConsistencyLevel readCLForStrategy(TransactionalMigrationFromMode
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId
tableId, Token token)
+ {
+ if (ignoresSuppliedReadCL())
+ {
+ TableMigrationState tms =
cm.consensusMigrationState.tableStates.get(tableId);
+ checkState(tms != null || fromMode ==
TransactionalMigrationFromMode.none);
Review Comment:
```suggestion
checkState(tms != null || fromMode ==
TransactionalMigrationFromMode.none, "%s == null or %s != none", tms, fromMode);
```
If this happens, it would be good to know why.
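To show what the extra arguments buy us, here is a minimal, self-contained sketch. `CheckStateSketch`, its `checkState` helper, and the `FromMode` enum are hypothetical stand-ins, not the real Cassandra or Guava classes; they only mimic a `%s`-template check so the failure message can be seen.

```java
public class CheckStateSketch
{
    // Hypothetical stand-in for TransactionalMigrationFromMode
    enum FromMode { none, full }

    // Hypothetical helper that mimics a %s-template checkState
    static void checkState(boolean condition, String template, Object... args)
    {
        if (!condition)
            throw new IllegalStateException(String.format(template, args));
    }

    // Same shape as the check under review: passes when a migration state
    // exists or when the table is not migrating from another mode
    static void validate(Object tms, FromMode fromMode)
    {
        checkState(tms != null || fromMode == FromMode.none,
                   "%s == null or %s != none", tms, fromMode);
    }

    public static void main(String[] args)
    {
        validate(new Object(), FromMode.full); // passes: state exists
        validate(null, FromMode.none);         // passes: no migration
        try
        {
            validate(null, FromMode.full);     // fails: migrating without state
        }
        catch (IllegalStateException e)
        {
            // The message now carries both values involved in the failure
            System.out.println(e.getMessage());
        }
    }
}
```

With the template in place, the exception reports both values rather than a bare `IllegalStateException`.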
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -356,6 +361,7 @@ public static RowIterator cas(String keyspaceName,
ConsensusAttemptResult lastAttemptResult;
do
{
+ ClusterMetadata cm = ClusterMetadata.current();
Review Comment:
Should we fetch the `TableMetadata` again from this CM?
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2267,7 +2324,28 @@ public static PartitionIterator
readRegular(SinglePartitionReadCommand.Group gro
long start = nanoTime();
try
{
- PartitionIterator result = fetchRows(group.queries,
consistencyLevel, coordinator, requestTime);
+ ClusterMetadata cm = ClusterMetadata.current();
+ TableId tableId = group.queries.get(0).metadata().id;
+ // Returns null for local tables
+ TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+ if (tableMetadata == null)
+ tableMetadata =
Schema.instance.localKeyspaces().getTableOrViewNullable(tableId);
+ TableParams tableParams = tableMetadata.params;
+
+ TransactionalMode transactionalMode =
tableParams.transactionalMode;
+ TransactionalMigrationFromMode transactionalMigrationFromMode =
tableParams.transactionalMigrationFrom;
+ if (transactionalMigrationFromMode !=
TransactionalMigrationFromMode.none && transactionalMode.readsThroughAccord &&
transactionalMigrationFromMode.writesThroughAccord() &&
transactionalMigrationFromMode.readsThroughAccord())
Review Comment:
What case is this? If the reads/writes *from* are Accord... then are we
migrating from `full` to `full`?
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2141,28 +2149,77 @@ private static PartitionIterator
readWithConsensus(SinglePartitionReadCommand.Gr
return lastResult.serialReadResult;
}
- private static ConsensusAttemptResult
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
+ private static ConsistencyLevel
consistencyLevelForAccordRead(ClusterMetadata cm,
SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel
consistencyLevel)
+ {
+ // Null means no specific consistency behavior is required from
Accord, it's functionally similar to
+ // reading at ONE if you are reading data that wasn't written via
Accord
+ if (consistencyLevel == null)
+ return null;
+
+ TableId tableId = group.queries.get(0).metadata().id;
+ TableParams tableParams = getTableMetadata(cm, tableId).params;
+ TransactionalMode mode = tableParams.transactionalMode;
+ TransactionalMigrationFromMode migrationFromMode =
tableParams.transactionalMigrationFrom;
+ for (SinglePartitionReadCommand command : group.queries)
+ {
+ // readCLForStrategy should return either null or the supplied
consistency level
+ // in which case we will read everything at that CL since Accord
doesn't support per table
+ // read consistency
+ ConsistencyLevel commitCL =
mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId,
command.partitionKey().getToken());
+ if (commitCL != null)
+ return commitCL;
+ }
+ return null;
+ }
+
+ private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm,
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
{
- if (group.queries.size() > 1)
- throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency
may only be requested for one partition at a time");
- SinglePartitionReadCommand readCommand = group.queries.get(0);
// If the non-SERIAL write strategy is sending all writes through
Accord there is no need to use the supplied consistency
// level since Accord will manage reading safely
TransactionalMode transactionalMode =
group.metadata().params.transactionalMode;
- consistencyLevel =
transactionalMode.readCLForStrategy(consistencyLevel);
- TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
- Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are
only strict-serializable for single partition reads");
- Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead :
Read, read.keys(), read, TxnQuery.ALL, null);
- IAccordService accordService = AccordService.instance();
- TxnResult txnResult = accordService.coordinate(txn, consistencyLevel,
requestTime);
+ consistencyLevel = consistencyLevelForAccordRead(cm, group,
consistencyLevel);
+ TxnRead read = TxnRead.createSerialRead(group.queries,
consistencyLevel);
Review Comment:
If `consistencyLevel == null`, can't we do a normal Accord read rather than a
serial read?
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2141,28 +2149,77 @@ private static PartitionIterator
readWithConsensus(SinglePartitionReadCommand.Gr
return lastResult.serialReadResult;
}
- private static ConsensusAttemptResult
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
+ private static ConsistencyLevel
consistencyLevelForAccordRead(ClusterMetadata cm,
SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel
consistencyLevel)
+ {
+ // Null means no specific consistency behavior is required from
Accord, it's functionally similar to
+ // reading at ONE if you are reading data that wasn't written via
Accord
+ if (consistencyLevel == null)
+ return null;
+
+ TableId tableId = group.queries.get(0).metadata().id;
+ TableParams tableParams = getTableMetadata(cm, tableId).params;
+ TransactionalMode mode = tableParams.transactionalMode;
+ TransactionalMigrationFromMode migrationFromMode =
tableParams.transactionalMigrationFrom;
+ for (SinglePartitionReadCommand command : group.queries)
+ {
+ // readCLForStrategy should return either null or the supplied
consistency level
+ // in which case we will read everything at that CL since Accord
doesn't support per table
+ // read consistency
+ ConsistencyLevel commitCL =
mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId,
command.partitionKey().getToken());
+ if (commitCL != null)
+ return commitCL;
+ }
+ return null;
+ }
+
+ private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm,
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
{
- if (group.queries.size() > 1)
- throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency
may only be requested for one partition at a time");
- SinglePartitionReadCommand readCommand = group.queries.get(0);
// If the non-SERIAL write strategy is sending all writes through
Accord there is no need to use the supplied consistency
// level since Accord will manage reading safely
TransactionalMode transactionalMode =
group.metadata().params.transactionalMode;
- consistencyLevel =
transactionalMode.readCLForStrategy(consistencyLevel);
- TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
- Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are
only strict-serializable for single partition reads");
- Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead :
Read, read.keys(), read, TxnQuery.ALL, null);
- IAccordService accordService = AccordService.instance();
- TxnResult txnResult = accordService.coordinate(txn, consistencyLevel,
requestTime);
+ consistencyLevel = consistencyLevelForAccordRead(cm, group,
consistencyLevel);
+ TxnRead read = TxnRead.createSerialRead(group.queries,
consistencyLevel);
+ Txn.Kind kind = Read;
+ if (transactionalMode == TransactionalMode.full &&
DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() &&
group.queries.size() == 1)
+ kind = EphemeralRead;
+ Txn txn = new Txn.InMemory(kind, read.keys(), read, TxnQuery.ALL,
null);
+ AsyncTxnResult asyncTxnResult =
AccordService.instance().coordinateAsync(txn, consistencyLevel, requestTime);
+ return getConsensusAttemptResultFromAsyncTxnResult(asyncTxnResult,
group.queries.size(), index -> group.queries.get(index).isReversed(),
consistencyLevel, requestTime);
+ }
+
+ /*
+ * Used for both the SERIAL and non-SERIAL read path into Accord
+ */
+ public static ConsensusAttemptResult
getConsensusAttemptResultFromAsyncTxnResult(AsyncTxnResult asyncTxnResult, int
numQueries, IntPredicate isQueryReversed, ConsistencyLevel cl,
Dispatcher.RequestTime requestTime)
+ {
+ TxnResult txnResult =
AccordService.instance().getTxnResult(asyncTxnResult, false, cl, requestTime);
+ // TODO (required): Converge on a single approach to
RETRY_NEW_PROTOCOL, this works for now because reads don't support it anyways
if (txnResult.kind() == retry_new_protocol)
return RETRY_NEW_PROTOCOL;
- TxnData data = (TxnData)txnResult;
- FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
- if (partition != null)
- return
serialReadResult(PartitionIterators.singletonIterator(partition.rowIterator(readCommand.isReversed())));
- else
+ TxnData data = (TxnData) txnResult;
+
+ if (data.isEmpty())
+ {
return serialReadResult(EmptyIterators.partition());
+ }
+ else if (data.size() == 1)
+ {
+ FilteredPartition value = data.values().iterator().next();
+ return
serialReadResult(PartitionIterators.singletonIterator(value.rowIterator(isQueryReversed.test(0))));
+ }
+ else
+ {
+ // TODO (review): 95% sure this isn't actually needed and the
consumer is going consume these by DecoratedKey not iteration order, but the
non-transactional path does preserve the order of the iterators
Review Comment:
```
SELECT * FROM ... WHERE pk IN (?, ?);
```
I would expect the rows to be in token order, so I do think this is needed.
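To make the expectation concrete, here is a minimal sketch of ordering partitions by token before handing them to the consumer. Everything in it is hypothetical: `TokenOrderSketch`, the `Partition` class, and the `long` token are illustrative stand-ins, not the actual `StorageProxy`, `FilteredPartition`, or `Token` types, and token order is only my expectation for the `IN` query above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TokenOrderSketch
{
    // Hypothetical stand-in for a partition result; `token` plays the role
    // of the partition key's token
    static final class Partition
    {
        final String key;
        final long token;

        Partition(String key, long token)
        {
            this.key = key;
            this.token = token;
        }
    }

    // Returns a copy of the partitions sorted by token, regardless of the
    // order in which they arrived
    static List<Partition> inTokenOrder(List<Partition> unordered)
    {
        List<Partition> sorted = new ArrayList<>(unordered);
        sorted.sort(Comparator.comparingLong((Partition p) -> p.token));
        return sorted;
    }

    public static void main(String[] args)
    {
        List<Partition> arrived = new ArrayList<>();
        arrived.add(new Partition("b", 42L));
        arrived.add(new Partition("a", -7L));
        for (Partition p : inTokenOrder(arrived))
            System.out.println(p.key + " @ " + p.token);
    }
}
```

If the consumer really does look partitions up by `DecoratedKey`, the sort is harmless; if it iterates, the sort is what keeps the output matching the non-transactional path.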
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]