dcapwell commented on code in PR #3694:
URL: https://github.com/apache/cassandra/pull/3694#discussion_r1847295190
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2181,18 +2183,18 @@ private static ConsistencyLevel consistencyLevelForAccordRead(ClusterMetadata cm
         return null;
     }
-
-    public static AsyncTxnResult readWithAccord(ClusterMetadata cm, PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
-    {
+    public static AsyncTxnResult readWithAccord(ClusterMetadata cm, PartitionRangeReadCommand command, List<AbstractBounds<PartitionPosition>> ranges, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
     {
         if (consistencyLevel != null && !IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
             throw new InvalidRequestException(consistencyLevel + " is not supported by Accord");
         TableMetadata tableMetadata = getTableMetadata(cm, command.metadata().id);
         TableParams tableParams = tableMetadata.params;
         Range<Token> readRange = new Range<>(command.dataRange().startKey().getToken(), command.dataRange().stopKey().getToken());
         consistencyLevel = tableParams.transactionalMode.readCLForStrategy(tableParams.transactionalMigrationFrom, consistencyLevel, cm, tableMetadata.id, readRange);
-        TxnRead read = new TxnRangeRead(command, consistencyLevel);
-        Txn.Kind kind = EphemeralRead;
+        TxnRead read = new TxnRangeRead(command, ranges, consistencyLevel);
+        Txn.Kind kind = Read;
+        if (tableParams.transactionalMode == TransactionalMode.full && getAccordEphemeralReadEnabledEnabled() && tableParams.transactionalMigrationFrom == none)
Review Comment:
`tableParams.transactionalMode == TransactionalMode.full && getAccordEphemeralReadEnabledEnabled() && tableParams.transactionalMigrationFrom == none`
this really isn't the only place that deals with this, so I wonder if it's best to make a reusable method? `allowEphemeralReads(tableMetadata)`?
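A minimal, self-contained sketch of the suggested helper. `TransactionalMode`, `TransactionalMigrationFrom`, and the enabled flag here are stand-ins for the real Cassandra types and config accessor; only the combination of the three checks is taken from the diff:

```java
// Stand-ins for the real org.apache.cassandra types; only the boolean logic mirrors the diff.
class EphemeralReadPolicy
{
    enum TransactionalMode { off, full }
    enum TransactionalMigrationFrom { none, paxos }

    // Stand-in for the getAccordEphemeralReadEnabledEnabled() config accessor.
    static boolean ephemeralReadsEnabled = true;

    // The reusable helper suggested in the review: one place for all three checks.
    static boolean allowEphemeralReads(TransactionalMode mode, TransactionalMigrationFrom migratingFrom)
    {
        return mode == TransactionalMode.full
               && ephemeralReadsEnabled
               && migratingFrom == TransactionalMigrationFrom.none;
    }
}
```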
##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java:
##########
@@ -79,20 +80,31 @@ public final int compareTo(AccordRoutableKey that)
if (cmp != 0)
return cmp;
-        if (this.getClass() == SentinelKey.class || that.getClass() == SentinelKey.class)
+        Class<?> thisClass = this.getClass();
+        Class<?> thatClass = that.getClass();
+        if (thisClass== SentinelKey.class || thatClass == SentinelKey.class)
Review Comment:
```suggestion
if (thisClass == SentinelKey.class || thatClass == SentinelKey.class)
```
##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java:
##########
@@ -318,6 +319,119 @@ public long serializedSize(TokenKey key, int version)
}
}
+    // Allows the creation of a Range that is begin inclusive or end exclusive for a given Token
+ public static final class MinTokenKey extends TokenKey
+ {
+ private static final long EMPTY_SIZE;
+
+ @Override
+ public Range asRange()
+ {
+ AccordRoutingKey before = token.isMinimum()
+ ? new SentinelKey(table, true)
+                                      : new MinTokenKey(table, token.decreaseSlightly());
+
+ return new TokenRange(before, this);
+ }
+
+ static
+ {
+ EMPTY_SIZE = ObjectSizes.measure(new MinTokenKey(null, null));
+ }
Review Comment:
shouldn't this be higher? before `asRange`?
##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java:
##########
@@ -318,6 +319,119 @@ public long serializedSize(TokenKey key, int version)
}
}
+    // Allows the creation of a Range that is begin inclusive or end exclusive for a given Token
+ public static final class MinTokenKey extends TokenKey
+ {
+ private static final long EMPTY_SIZE;
+
+ @Override
+ public Range asRange()
+ {
+ AccordRoutingKey before = token.isMinimum()
+ ? new SentinelKey(table, true)
+                                      : new MinTokenKey(table, token.decreaseSlightly());
Review Comment:
shouldn't this be a `TokenKey`? as start is exclusive?
##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -290,6 +291,66 @@ private AsyncChain<Data> readChains()
}));
});
});
+ return results;
+ }
+
+    private List<AsyncChain<Data>> rangeReadChains(long nowInSeconds, Dispatcher.RequestTime requestTime)
+ {
+ TxnRangeRead read = (TxnRangeRead) txn.read();
+ Seekables<?, ?> keys = txn.read().keys();
Review Comment:
nit: `ranges`?
##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,7 +112,10 @@ public enum TransactionalMode
     * Execute writes through Accord skipping StorageProxy's normal write path. Ignores the provided consistency level
     * which makes Accord commit writes at ANY similar to Paxos with commit consistency level ANY.
     */
- full(true, true, true, true, true);
+ full(true, true, true, true, true),
+
+ // For tests, Accord will read and be forced to do interop reads
+ test_interop_read(true, false, false, true, true);
Review Comment:
more states worry me... and even at Community over Code the feedback was to limit this enum...
##########
test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java:
##########
@@ -337,7 +337,7 @@ public long[] deflateRegularColumns(Object[] regulars)
public boolean isWriteTimeFromAccord()
{
-        return transactionalMode.isPresent() && transactionalMode.get().nonSerialWritesThroughAccord;
+ return writeTimeFromAccord;
Review Comment:
I was so confused by this, then looked at the git history, and it looks like a race condition between the merge and CI not being run with the most recent rebase...
##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -290,6 +291,66 @@ private AsyncChain<Data> readChains()
}));
});
});
+ return results;
+ }
+
+    private List<AsyncChain<Data>> rangeReadChains(long nowInSeconds, Dispatcher.RequestTime requestTime)
+    {
+        TxnRangeRead read = (TxnRangeRead) txn.read();
+        Seekables<?, ?> keys = txn.read().keys();
+        List<AsyncChain<Data>> results = new ArrayList<>();
+        keys.forEach(key -> {
+            TokenRange range = (TokenRange)key;
+            PartitionRangeReadCommand command = read.commandForSubrange(range, nowInSeconds);
+
+            // This should only rarely occur when coordinators start a transaction in a migrating range
+            // because they haven't yet updated their cluster metadata.
+            // It would be harmless to do the read, because it will be rejected in `TxnQuery` anyways,
+            // but it's faster to skip the read
+            // TODO (required): To make migration work we need to validate that the range is all on Accord
+            // if any part isn't we should reject the read
+//            TableMigrationState tms = ConsensusTableMigration.getTableMigrationState(command.metadata().id);
+//            AccordClientRequestMetrics metrics = txn.kind().isWrite() ? accordWriteMetrics : accordReadMetrics;
+//            if (ConsensusRequestRouter.instance.isKeyInMigratingOrMigratedRangeFromAccord(command.metadata(), tms, command.partitionKey()))
+//            {
+//                metrics.migrationSkippedReads.mark();
+//                results.add(AsyncChains.success(TxnData.emptyPartition(fragment.txnDataName(), command)));
+//                return;
+//            }
+
+            results.add(AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+                TxnData result = new TxnData();
+                try (PartitionIterator iterator = StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))
Review Comment:
is this supposed to avoid going through Accord? I thought the integration is in `StorageProxy` in several cases, so it feels unclear while reading this?
##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -290,6 +291,66 @@ private AsyncChain<Data> readChains()
}));
});
});
+ return results;
+ }
+
+    private List<AsyncChain<Data>> rangeReadChains(long nowInSeconds, Dispatcher.RequestTime requestTime)
+    {
+        TxnRangeRead read = (TxnRangeRead) txn.read();
+        Seekables<?, ?> keys = txn.read().keys();
+        List<AsyncChain<Data>> results = new ArrayList<>();
+        keys.forEach(key -> {
+            TokenRange range = (TokenRange)key;
+            PartitionRangeReadCommand command = read.commandForSubrange(range, nowInSeconds);
+
+            // This should only rarely occur when coordinators start a transaction in a migrating range
+            // because they haven't yet updated their cluster metadata.
+            // It would be harmless to do the read, because it will be rejected in `TxnQuery` anyways,
+            // but it's faster to skip the read
+            // TODO (required): To make migration work we need to validate that the range is all on Accord
+            // if any part isn't we should reject the read
Review Comment:
so if part of the range isn't on accord, then we don't start the migration
and sequence the read after the migration? Rejecting the read would cause the
user to see an error, so we would break SLO?
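The validation the TODO asks for amounts to a coverage check over token ranges. A self-contained sketch, with simple half-open long spans standing in for Cassandra's `Range<Token>` and the migration state; it assumes the Accord-owned spans are sorted and non-overlapping:

```java
import java.util.List;

class AccordRangeCheck
{
    // Half-open (start, end] span over long tokens; a stand-in for Range<Token>.
    record TokenSpan(long start, long end) {}

    // True iff `query` is fully covered by the sorted, non-overlapping Accord-owned spans.
    // If this returns false, the coordinator would reject (or re-route) the read per the TODO.
    static boolean fullyOnAccord(TokenSpan query, List<TokenSpan> accordOwned)
    {
        long covered = query.start();
        for (TokenSpan span : accordOwned)
        {
            // Extend coverage only if this span overlaps the current frontier.
            if (span.start() <= covered && span.end() > covered)
                covered = span.end();
            if (covered >= query.end())
                return true;
        }
        return covered >= query.end();
    }
}
```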
##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java:
##########
@@ -318,6 +319,119 @@ public long serializedSize(TokenKey key, int version)
}
}
+    // Allows the creation of a Range that is begin inclusive or end exclusive for a given Token
+ public static final class MinTokenKey extends TokenKey
+ {
+ private static final long EMPTY_SIZE;
+
+ @Override
+ public Range asRange()
+ {
+ AccordRoutingKey before = token.isMinimum()
+ ? new SentinelKey(table, true)
+                                      : new MinTokenKey(table, token.decreaseSlightly());
+
+ return new TokenRange(before, this);
+ }
+
+ static
+ {
+ EMPTY_SIZE = ObjectSizes.measure(new MinTokenKey(null, null));
+ }
+
+ public MinTokenKey(TableId tableId, Token token)
+ {
+ super(tableId, token);
+ }
+
+ public MinTokenKey withToken(Token token)
+ {
+ return new MinTokenKey(table, token);
+ }
+
+ @Override
+ public Token token()
+ {
+ return token;
+ }
+
+ @Override
+ public RoutingKeyKind kindOfRoutingKey()
+ {
+ return RoutingKeyKind.MIN_TOKEN;
+ }
+
+ @Override
+ public String suffix()
+ {
+ return token.toString();
+ }
+
+ public long estimatedSizeOnHeap()
+ {
+ return EMPTY_SIZE + token().getHeapSize();
+ }
+
+ public AccordRoutingKey withTable(TableId table)
+ {
+ return new MinTokenKey(table, token);
+ }
+
+        public static final MinTokenKey.Serializer serializer = new MinTokenKey.Serializer();
+        public static class Serializer implements AccordKeySerializer<MinTokenKey>
Review Comment:
this is copy/paste of `TokenKey.Serializer`, why not just extend that to
avoid this duplication?
```
public static class ExtendedTokenKey extends TokenKey
{
    public static class Serializer extends TokenKey.Serializer implements AccordKeySerializer<ExtendedTokenKey>
    {
        @Override
        public void serialize(ExtendedTokenKey key, DataOutputPlus out, int version) throws IOException
        {
            super.serialize(key, out, version);
        }

        @Override
        public ExtendedTokenKey deserialize(DataInputPlus in, int version) throws IOException
        {
            return super.deserialize(in, version);
        }

        @Override
        public ExtendedTokenKey fromBytes(ByteBuffer bytes, IPartitioner partitioner)
        {
            return super.fromBytes(bytes, partitioner);
        }

        @Override
        public long serializedSize(ExtendedTokenKey key, int version)
        {
            return super.serializedSize(key, version);
        }
    }

    public ExtendedTokenKey(TableId tableId, Token token)
    {
        super(tableId, token);
    }
}
```
The other option would be to make the `new` calls go through a factory method so both classes can define their own.
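The factory option could look roughly like this. `TokenKey` and its serializer are simplified stand-ins (no real I/O), but they show the shape: the subclass overrides a single `newKey` factory and inherits the rest of the serializer unchanged:

```java
// Simplified stand-ins; the real classes encode/decode table id + token bytes.
class TokenKey
{
    final String table;
    final long token;

    TokenKey(String table, long token)
    {
        this.table = table;
        this.token = token;
    }

    static class Serializer
    {
        // Subclasses override only this factory; the shared decoding stays here.
        protected TokenKey newKey(String table, long token)
        {
            return new TokenKey(table, token);
        }

        TokenKey deserialize(String table, long token)
        {
            // ...shared decoding would happen here...
            return newKey(table, token);
        }
    }
}

class MinTokenKey extends TokenKey
{
    MinTokenKey(String table, long token)
    {
        super(table, token);
    }

    static class Serializer extends TokenKey.Serializer
    {
        @Override
        protected MinTokenKey newKey(String table, long token)
        {
            // Covariant override: same decoding, different concrete key type.
            return new MinTokenKey(table, token);
        }
    }
}
```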
##########
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java:
##########
@@ -95,52 +97,60 @@ public static void before() throws Throwable
sut = new InJvmSut(cluster);
}
- public SingleNodeSAITestBase(boolean withAccord)
+ public SingleNodeSAITestBase(TransactionalMode transactionalMode)
{
- this.withAccord = withAccord;
+ this.transactionalMode = transactionalMode;
}
@Test
public void basicSaiTest()
+ {
Review Comment:
FYI @ifesdjeen is rewriting this to be a random schema. So there is a conflict, and I feel your patch will likely go in first... but just wanted to point out this pending conflict.
##########
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java:
##########
@@ -287,6 +294,15 @@ else if (i % FLUSH_SKIP == 0)
{
                        logger.debug("Partition index = {}, run = {}, j = {}, i = {}", partitionIndex, run, j, i);
+                        // Much more succinct output for debugging
+//                        logger.info(query.toSelectStatement(columns, !query.schemaSpec.isWriteTimeFromAccord()).toString());
+//                        logger.info("Expected:");
+//                        modelState.rows().values().stream().forEach(row -> logger.info(row.toString(schema)));
+//                        List<ResultSetRow> rows = SelectHelper.execute(sut, history.clock(), query);
+//                        logger.info("Found:");
+//                        rows.stream().forEach(row -> logger.info(row.toString(schema)));
+//                        fail();
Review Comment:
remove?
##########
test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java:
##########
@@ -173,6 +173,11 @@ public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator
{
try
{
+ // Easy way to get a lot of statements being run
+// String bound = statement;
+// for (Object bind : bindings)
+// bound = bound.replaceFirst("\\?", bind.toString());
+// logger.info(bound);
Review Comment:
remove?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]