dcapwell commented on code in PR #3694:
URL: https://github.com/apache/cassandra/pull/3694#discussion_r1853094742
##########
src/java/org/apache/cassandra/service/accord/txn/TxnRangeRead.java:
##########
@@ -87,44 +88,50 @@ public class TxnRangeRead extends
AbstractSerialized<ReadCommand> implements Txn
{
private static final Logger logger =
LoggerFactory.getLogger(TxnRangeRead.class);
- public static final TxnRangeRead EMPTY = new TxnRangeRead(null, null,
null);
+ public static final TxnRangeRead EMPTY = new TxnRangeRead(null, null,
(Ranges)null);
private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY);
@Nonnull
private final ConsistencyLevel cassandraConsistencyLevel;
@Nonnull
private final Ranges covering;
- public TxnRangeRead(@Nonnull PartitionRangeReadCommand command, @Nonnull
ConsistencyLevel cassandraConsistencyLevel)
+ public TxnRangeRead(@Nonnull PartitionRangeReadCommand command, @Nonnull
List<AbstractBounds<PartitionPosition>> ranges, @Nonnull ConsistencyLevel
cassandraConsistencyLevel)
{
super(command);
checkArgument(cassandraConsistencyLevel == null ||
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel),
"Unsupported consistency level for read");
this.cassandraConsistencyLevel = cassandraConsistencyLevel;
TableId tableId = command.metadata().id;
- AbstractBounds<PartitionPosition> range =
command.dataRange().keyRange();
-
- // Read commands can contain a mix of different kinds of bounds to
facilitate paging
- // and we need to communicate that to Accord as its own ranges. We
will use
- // TokenKey and Sentinel key and stick exclusively with left
exclusive/right inclusive
- // ranges rather add more types of ranges to the mix
- boolean inclusiveLeft = range.inclusiveLeft();
- Token startToken = range.left.getToken();
- AccordRoutingKey startAccordRoutingKey;
- if (startToken.isMinimum() && inclusiveLeft)
- startAccordRoutingKey = SentinelKey.min(tableId);
- else
- startAccordRoutingKey = new TokenKey(tableId, startToken);
-
- boolean inclusiveRight = range.inclusiveRight();
- Token stopToken = range.right.getToken();
- AccordRoutingKey stopAccordRoutingKey;
- if (inclusiveRight)
- stopAccordRoutingKey = new TokenKey(tableId, stopToken);
- else
- stopAccordRoutingKey = new TokenKey(tableId,
stopToken.decreaseSlightly());
-
- covering = Ranges.of(new TokenRange(startAccordRoutingKey,
stopAccordRoutingKey));
+ TokenRange[] accordRanges = new TokenRange[ranges.size()];
+ for (int i = 0; i < ranges.size(); i++)
+ {
+ AbstractBounds<PartitionPosition> range = ranges.get(i);
+ // Read commands can contain a mix of different kinds of bounds to
facilitate paging
+ // and we need to communicate that to Accord as its own ranges.
This uses
+ // TokenKey, SentinelKey, and MinTokenKey and sticks exclusively
with left exclusive/right inclusive
+ // ranges rather add more types of ranges to the mix
+ // MinTokenKey allows emulating inclusive left and exclusive right
with Range
+ boolean inclusiveLeft = range.inclusiveLeft();
+ Token startToken = range.left.getToken();
+ AccordRoutingKey startAccordRoutingKey;
+ if (startToken.isMinimum() && inclusiveLeft)
Review Comment:
confirmed, this logic isn't safe.
```
java.lang.IllegalArgumentException:
1b255f4d-ef25-40a6-0000-000000000005:9223372036854775807 >=
1b255f4d-ef25-40a6-0000-000000000005:-9223372036854775808
at accord.primitives.Range.<init>(Range.java:212)
at accord.primitives.Range$EndInclusive.<init>(Range.java:44)
at
org.apache.cassandra.service.accord.TokenRange.<init>(TokenRange.java:43)
at
org.apache.cassandra.service.accord.txn.TxnRangeRead.<init>(TxnRangeRead.java:132)
at
org.apache.cassandra.service.StorageProxy.readWithAccord(StorageProxy.java:2192)
at
org.apache.cassandra.service.reads.range.RangeCommandIterator.query(RangeCommandIterator.java:220)
at
org.apache.cassandra.service.reads.range.RangeCommandIterator.sendNextRequests(RangeCommandIterator.java:262)
at
org.apache.cassandra.service.reads.range.RangeCommandIterator.computeNext(RangeCommandIterator.java:132)
at
org.apache.cassandra.service.reads.range.RangeCommandIterator.computeNext(RangeCommandIterator.java:67)
```
test to repro this
```
@Test
public void simplified() throws IOException
{
try (Cluster cluster = Cluster.build(1)
.start())
{
init(cluster);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk blob
primary key) WITH " + TransactionalMode.full.asCqlParam()));
ICoordinator node = cluster.coordinator(1);
long token = Long.MIN_VALUE;
node.executeWithResult(withKeyspace("INSERT INTO %s.tbl (pk)
VALUES (?)"), QUORUM, token(token));
node.executeWithResult(withKeyspace("SELECT * FROM %s.tbl WHERE
token(pk) >= token(?)"), QUORUM, token(token));
}
}
private static ByteBuffer token(long token)
{
return Murmur3Partitioner.LongToken.keyForToken(token);
}
```
##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -290,6 +291,66 @@ private AsyncChain<Data> readChains()
}));
});
});
+ return results;
+ }
+
+ private List<AsyncChain<Data>> rangeReadChains(long nowInSeconds,
Dispatcher.RequestTime requestTime)
+ {
+ TxnRangeRead read = (TxnRangeRead) txn.read();
+ Seekables<?, ?> keys = txn.read().keys();
+ List<AsyncChain<Data>> results = new ArrayList<>();
+ keys.forEach(key -> {
+ TokenRange range = (TokenRange)key;
+ PartitionRangeReadCommand command = read.commandForSubrange(range,
nowInSeconds);
+
+ // This should only rarely occur when coordinators start a
transaction in a migrating range
+ // because they haven't yet updated their cluster metadata.
+ // It would be harmless to do the read, because it will be
rejected in `TxnQuery` anyways,
+ // but it's faster to skip the read
+ // TODO (required): To make migration work we need to validate
that the range is all on Accord
+ // if any part isn't we should reject the read
+// TableMigrationState tms =
ConsensusTableMigration.getTableMigrationState(command.metadata().id);
+// AccordClientRequestMetrics metrics = txn.kind().isWrite() ?
accordWriteMetrics : accordReadMetrics;
+// if
(ConsensusRequestRouter.instance.isKeyInMigratingOrMigratedRangeFromAccord(command.metadata(),
tms, command.partitionKey()))
+// {
+// metrics.migrationSkippedReads.mark();
+//
results.add(AsyncChains.success(TxnData.emptyPartition(fragment.txnDataName(),
command)));
+// return;
+// }
+
+
results.add(AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+ TxnData result = new TxnData();
+ try (PartitionIterator iterator =
StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))
+ {
+ TxnDataRangeValue value = new TxnDataRangeValue();
+ while (iterator.hasNext())
+ {
+ try (RowIterator partition = iterator.next())
+ {
+ FilteredPartition filtered =
FilteredPartition.create(partition);
+ if (filtered.hasRows() ||
command.selectsFullPartition())
+ value.add(filtered);
+ }
+ }
+ result.put(TxnData.txnDataName(TxnDataNameKind.USER),
value);
Review Comment:
Since this is interop, we don't need to worry about multi-table reads, right? Batch
doesn't allow SELECT, so this should be fine...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]