aweisberg commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915648188
##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java:
##########
@@ -340,4 +512,410 @@ private static ConsensusRoutingDecision pickPaxos()
{
return Paxos.useV2() ? paxosV2 : paxosV1;
}
+
+ public static void validateSafeToReadNonTransactionally(ReadCommand
command)
+ {
+ if (command.allowsPotentialTxnConflicts())
+ return;
+
+ String keyspace = command.metadata().keyspace;
+ // System keyspaces are never managed by Accord
+ if (SchemaConstants.isSystemKeyspace(keyspace))
+ return;
+
+ // Local keyspaces are never managed by Accord
+ if (Schema.instance.localKeyspaces().containsKeyspace(keyspace))
+ return;
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ TableId tableId = command.metadata().id;
+ TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+ // Null for local tables
+ if (tableMetadata == null)
+ return;
+
+ TransactionalMode transactionalMode =
tableMetadata.params.transactionalMode;
+ TransactionalMigrationFromMode transactionalMigrationFromMode =
tableMetadata.params.transactionalMigrationFrom;
+ if (!transactionalMode.nonSerialReadsThroughAccord &&
!transactionalMigrationFromMode.nonSerialReadsThroughAccord())
+ return;
+
+ TableMigrationState tms =
cm.consensusMigrationState.tableStates.get(tableId);
+
+ // Null with a transaction mode that reads through Accord indicates a
completed migration or table created
+ // to use Accord initially
+ if (tms == null)
+ {
+ checkState(transactionalMigrationFromMode ==
TransactionalMigrationFromMode.none);
+ if (transactionalMode.nonSerialReadsThroughAccord)
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ if (cfs != null)
+ cfs.metric.readsRejectedOnWrongSystem.mark();
+ throw new RetryOnDifferentSystemException();
+ }
+ }
+
+ boolean isExclusivelyReadableFromAccord;
+ if (command.isRangeRequest())
+ isExclusivelyReadableFromAccord =
isBoundsExclusivelyManagedByAccordForRead(transactionalMode,
transactionalMigrationFromMode, tms, command.dataRange().keyRange());
+ else
+ isExclusivelyReadableFromAccord =
isTokenExclusivelyManagedByAccordForRead(transactionalMode,
transactionalMigrationFromMode, tms,
((SinglePartitionReadCommand)command).partitionKey().getToken());
+
+ if (isExclusivelyReadableFromAccord)
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ if (cfs != null)
+ cfs.metric.readsRejectedOnWrongSystem.mark();
+ throw new RetryOnDifferentSystemException();
+ }
+ }
+
+    /**
+     * Decides whether a single token can only be read through Accord.
+     * <p>
+     * When Accord is enabled for the table the answer is {@code true} if no migration is in progress, if
+     * the migration is away from Accord, or if the token lies in a range that has been migrated.
+     * When Accord is not enabled the answer is {@code true} only while migrating away from Accord and the
+     * token is not yet in a migrating or migrated range.
+     *
+     * @param transactionalMode the table's current transactional mode
+     * @param migrationFrom the mode the table is migrating from
+     * @param tms the table's migration state
+     * @param token the token being read
+     * @return {@code true} if the token must be read through Accord
+     * @throws NullPointerException if any argument is null
+     */
+    private static boolean isTokenExclusivelyManagedByAccordForRead(@Nonnull TransactionalMode transactionalMode,
+                                                                    @Nonnull TransactionalMigrationFromMode migrationFrom,
+                                                                    @Nonnull TableMigrationState tms,
+                                                                    @Nonnull Token token)
+    {
+        checkNotNull(transactionalMode, "transactionalMode is null");
+        checkNotNull(migrationFrom, "migrationFrom is null");
+        checkNotNull(tms, "tms (TableMigrationState) is null");
+        // The message previously said "bounds is null", copied from the bounds variant of this check
+        checkNotNull(token, "token is null");
+
+        if (transactionalMode.accordIsEnabled)
+        {
+            // Accord is exclusive when there is no migration, when migrating away from Accord, or once
+            // the range containing the token is fully migrated to Accord
+            return !migrationFrom.isMigrating()
+                   || migrationFrom.migratingFromAccord()
+                   || tms.migratedRanges.intersects(token);
+        }
+
+        // Once the migration starts only barriers are allowed to run for the key in Accord
+        return migrationFrom.migratingFromAccord()
+               && !tms.migratingAndMigratedRanges.intersects(token);
+    }
+
+    /**
+     * Decides whether a key range can only be read through Accord.
+     * <p>
+     * A bound whose left and right tokens are equal, with an exclusive left and inclusive right end, is
+     * delegated to {@link #isTokenExclusivelyManagedByAccordForRead}.
+     * <p>
+     * When Accord is enabled for the table the answer is {@code true} if no migration is in progress, if
+     * the migration is away from Accord, or if any part of the bound intersects a migrated range.
+     * When Accord is not enabled the answer is {@code true} only while migrating away from Accord and no
+     * part of the bound intersects a migrating or migrated range.
+     *
+     * @param transactionalMode the table's current transactional mode
+     * @param migrationFrom the mode the table is migrating from
+     * @param tms the table's migration state
+     * @param bounds the key range being read
+     * @return {@code true} if the bound must be read through Accord
+     * @throws NullPointerException if any argument is null
+     */
+    private static boolean isBoundsExclusivelyManagedByAccordForRead(@Nonnull TransactionalMode transactionalMode,
+                                                                     @Nonnull TransactionalMigrationFromMode migrationFrom,
+                                                                     @Nonnull TableMigrationState tms,
+                                                                     @Nonnull AbstractBounds<PartitionPosition> bounds)
+    {
+        checkNotNull(transactionalMode, "transactionalMode is null");
+        checkNotNull(migrationFrom, "migrationFrom is null");
+        checkNotNull(tms, "tms (TableMigrationState) is null");
+        checkNotNull(bounds, "bounds is null");
+
+        BiPredicate<AbstractBounds<PartitionPosition>, NormalizedRanges<Token>> intersects = (testBounds, testRanges) -> {
+            // TODO (nicetohave): Efficiency of this intersection
+            for (org.apache.cassandra.dht.Range<Token> range : testRanges)
+            {
+                Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> intersectionAndRemainder = Range.intersectionAndRemainder(testBounds, range);
+                // Keep scanning when this range does not intersect: returning unconditionally here
+                // would only ever test the first range and miss intersections with later ones
+                if (intersectionAndRemainder.left != null)
+                    return true;
+            }
+            return false;
+        };
+
+        boolean sameToken = bounds.left.getToken().equals(bounds.right.getToken());
+        if (sameToken && !bounds.inclusiveLeft() && bounds.inclusiveRight())
+            return isTokenExclusivelyManagedByAccordForRead(transactionalMode, migrationFrom, tms, bounds.left.getToken());
+
+        if (transactionalMode.accordIsEnabled)
+        {
+            // Accord is exclusive when there is no migration, when migrating away from Accord, or once
+            // some part of the bound is fully migrated to Accord
+            return !migrationFrom.isMigrating()
+                   || migrationFrom.migratingFromAccord()
+                   || intersects.test(bounds, tms.migratedRanges);
+        }
+
+        // Once the migration starts only barriers are allowed to run for the key in Accord
+        return migrationFrom.migratingFromAccord()
+               && !intersects.test(bounds, tms.migratingAndMigratedRanges);
+    }
+
+    /**
+     * The system a (sub-)range read is to be executed on.
+     */
+    public enum RangeReadTarget
+    {
+        /** The read goes through Accord. */
+        accord,
+        /** The read does not go through Accord. */
+        normal
+    }
+
+    /**
+     * Immutable pairing of a partition range read with the system it should be executed on.
+     */
+    public static class RangeReadWithTarget
+    {
+        /** The range read to execute. */
+        public final PartitionRangeReadCommand read;
+        /** The system {@link #read} is to be executed on. */
+        public final RangeReadTarget target;
+
+        private RangeReadWithTarget(PartitionRangeReadCommand read, RangeReadTarget target)
+        {
+            this.read = read;
+            this.target = target;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder("RangeReadWithTarget{");
+            sb.append("read=").append(read);
+            sb.append(", target=").append(target);
+            sb.append('}');
+            return sb.toString();
+        }
+    }
+
+ /**
+ * While it's possible to map the Accord read to a single txn it doesn't
seem worth it since it's a pretty unusual
+ * scenario where we do this during migration and have a lot of different
read commands.
+ */
+ public static List<RangeReadWithTarget>
splitReadIntoAccordAndNormal(ClusterMetadata cm, PartitionRangeReadCommand
read, ReadCoordinator readCoordinator, Dispatcher.RequestTime requestTime)
+ {
+ if (!readCoordinator.isEventuallyConsistent())
+ return ImmutableList.of(new RangeReadWithTarget(read,
RangeReadTarget.normal));
+ TableMetadata tm = getTableMetadata(cm, read.metadata().id);
+ if (tm == null ||
(!tm.params.transactionalMode.nonSerialReadsThroughAccord &&
!tm.params.transactionalMigrationFrom.nonSerialReadsThroughAccord()))
+ return ImmutableList.of(new RangeReadWithTarget(read,
RangeReadTarget.normal));
+
+ List<RangeReadWithTarget> result = null;
+ TransactionalMode transactionalMode = tm.params.transactionalMode;
+ TransactionalMigrationFromMode transactionalMigrationFromMode =
tm.params.transactionalMigrationFrom;
+ boolean transactionalModeReadsThroughAccord =
transactionalMode.nonSerialReadsThroughAccord;
+ RangeReadTarget migrationToTarget =
transactionalModeReadsThroughAccord ? RangeReadTarget.accord :
RangeReadTarget.normal;
+ boolean migrationFromReadsThroughAccord =
transactionalMigrationFromMode.nonSerialReadsThroughAccord();
+ RangeReadTarget migrationFromTarget = migrationFromReadsThroughAccord
? RangeReadTarget.accord : RangeReadTarget.normal;
+ TableMigrationState tms =
cm.consensusMigrationState.tableStates.get(tm.id);
+ if (tms == null)
+ {
+ if (transactionalMigrationFromMode ==
TransactionalMigrationFromMode.none)
+ // There is no migration and no TMS so do what the schema says
since no migration should be required
+ return ImmutableList.of(new RangeReadWithTarget(read,
transactionalModeReadsThroughAccord ? RangeReadTarget.accord :
RangeReadTarget.normal));
+ else
+ // If we are migrating from something and there is no
migration state the migration hasn't begun
+ // so continue to do what we are migrating from does until the
range is marked as migrating
+ return ImmutableList.of(new RangeReadWithTarget(read,
migrationFromReadsThroughAccord ? RangeReadTarget.accord :
RangeReadTarget.normal));
+ }
+
+
+ // AbstractBounds can potentially be left/right inclusive while Range
used to track migration is only right inclusive
+ // The right way to tackle this seems to be to find the tokens that
intersect the key range and then split
+ // until nothing intersects
+ AbstractBounds<PartitionPosition> keyRange =
read.dataRange().keyRange();
+ AbstractBounds<PartitionPosition> remainder = keyRange;
+
+ // Migrating to Accord we only read through Accord when the range is
fully migrated, but migrating back
+ // we stop reading from Accord as soon as the range is marked
migrating and do key migration on read
+ NormalizedRanges<Token> migratedRanges =
transactionalModeReadsThroughAccord ? tms.migratedRanges :
tms.migratingAndMigratedRanges;
+
+ // Add the preceding range if any
+ if (!migratedRanges.isEmpty())
+ {
+ Token firstMigratingToken = migratedRanges.get(0).left.getToken();
+ int leftCmp =
keyRange.left.getToken().compareTo(firstMigratingToken);
+ int rightCmp = compareRightToken(keyRange.right.getToken(),
firstMigratingToken);
+ if (leftCmp <= 0)
+ {
+ if (rightCmp <= 0)
+ return ImmutableList.of(new RangeReadWithTarget(read,
migrationFromTarget));
+ result = new ArrayList<>();
+ AbstractBounds<PartitionPosition> precedingRange =
keyRange.withNewRight(rightCmp <= 0 ? keyRange.right :
firstMigratingToken.maxKeyBound());
+ // Could be an empty bound, it's fine to let a min KeyBound
and max KeyBound through as that isn't empty
+ if (!precedingRange.left.equals(precedingRange.right))
+ result.add(new
RangeReadWithTarget(read.forSubRange(precedingRange, false),
migrationFromTarget));
+ }
+ }
+
+ boolean hadAccordReads = false;
+ for (Range<Token> r : migratedRanges)
+ {
+ Pair<AbstractBounds<PartitionPosition>,
AbstractBounds<PartitionPosition>> intersectionAndRemainder =
Range.intersectionAndRemainder(remainder, r);
+ if (intersectionAndRemainder.left != null)
+ {
+ if (result == null)
+ result = new ArrayList<>();
+ PartitionRangeReadCommand subRead =
read.forSubRange(intersectionAndRemainder.left, result.isEmpty() ? true :
false);
+ result.add(new RangeReadWithTarget(subRead,
migrationToTarget));
+ hadAccordReads = true;
+ }
+ remainder = intersectionAndRemainder.right;
+ if (remainder == null)
+ break;
+ }
+
+ if (remainder != null)
+ {
+ if (result != null)
+ result.add(new RangeReadWithTarget(read.forSubRange(remainder,
true), migrationFromTarget));
+ else
+ return ImmutableList.of(new
RangeReadWithTarget(read.forSubRange(remainder, false), migrationFromTarget));
+ }
+
+ checkState(result != null && !result.isEmpty(), "Shouldn't have null
or empty result");
+
checkState(result.get(0).read.dataRange().startKey().equals(read.dataRange().startKey()),
"Split reads should encompass entire range");
+ checkState(result.get(result.size() -
1).read.dataRange().stopKey().equals(read.dataRange().stopKey()), "Split reads
should encompass entire range");
+ if (result.size() > 1)
+ {
+ for (int i = 0; i < result.size() - 1; i++)
+ {
+
checkState(result.get(i).read.dataRange().stopKey().equals(result.get(i +
1).read.dataRange().startKey()), "Split reads should all be adjacent");
+ checkState(result.get(i).target != result.get(i + 1).target,
"Split reads should be for different targets");
+ }
+ }
+
+ //TODO (later): The range reads need a barrier for now only going to
provide READ_COMMITTED
Review Comment:
I will create a JIRA.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]