bdeggleston commented on code in PR #4508: URL: https://github.com/apache/cassandra/pull/4508#discussion_r2600274081
########## src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.replication.migration; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.virtual.VirtualMutation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.NormalizedRanges; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.Pair; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Routes read and write requests based on schema and migration state. + * + * During migration in either direction, reads are untracked and writes are tracked + */ +public class MigrationRouter +{ + public static boolean shouldUseTracked(SinglePartitionReadCommand command) + { + // System keyspaces never use tracked replication + if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace)) + return false; + + ClusterMetadata metadata = ClusterMetadata.current(); + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace); + + if (migrationInfo == null) + return command.metadata().replicationType().isTracked(); + + Token token = command.partitionKey().getToken(); + boolean isTracked = command.metadata().replicationType().isTracked(); + + return migrationInfo.shouldUseTrackedForReads(isTracked, command.metadata().id(), token); + } + + /** + * Wrapper for a range read command paired with its routing decision. + */ + public static class RangeReadWithReplication + { + public final PartitionRangeReadCommand read; + public final boolean useTracked; + + public RangeReadWithReplication(PartitionRangeReadCommand read, boolean useTracked) + { + this.read = read; + this.useTracked = useTracked; + } + } + + /** + * Helper to create and add a range split to the result list. + */ + private static void addSplit(List<RangeReadWithReplication> result, + PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> range, + boolean isTracked) + { + boolean isFirst = result.isEmpty(); + result.add(new RangeReadWithReplication(command.forSubRange(range, isFirst), isTracked)); + } + + /** + * Adds a split for the non-pending region before pendingRange, if one exists. + * + * @param isTracked the target replication type (TO_TRACKED=true, TO_UNTRACKED=false) + * @return true if remainder ends before pendingRange (no intersection possible) + */ + private static boolean addNonPendingGapIfExists(List<RangeReadWithReplication> result, + PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> remainder, + Range<Token> pendingRange, + boolean isTracked) + { + Token pendingStart = pendingRange.left; + Token remainderStart = remainder.left.getToken(); + Token remainderEnd = remainder.right.getToken(); + + if (remainderStart.compareTo(pendingStart) >= 0) + return false; // No gap before pending range + + // Check if remainder ends before pending range starts + if (remainderEnd.compareTo(pendingStart) <= 0) + { + // Entire remainder is before this pending range - no intersection + // Non-pending regions use the new protocol (isTracked) + addSplit(result, command, remainder, isTracked); + return true; + } + + // Add the non-pending gap before pending range + AbstractBounds<PartitionPosition> gap = remainder.withNewRight(pendingStart.maxKeyBound()); + + if (!gap.left.equals(gap.right)) + addSplit(result, command, gap, isTracked); + + return false; + } + + /** + * Split a range by pending ranges, creating sub-ranges for each contiguous region. + * + * If we're migrating to tracked replication, pending ranges use untracked reads, non-pending uses tracked + * + * If we're migrating to untracked replication, pending uses tracked reads, and non-pending uses untracked + */ + private static List<RangeReadWithReplication> splitRangeByPendingRanges(PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> keyRange, + NormalizedRanges<Token> pendingRanges, + boolean isTracked) + { + List<RangeReadWithReplication> result = new ArrayList<>(); + AbstractBounds<PartitionPosition> remainder = keyRange; + + for (Range<Token> pendingRange : pendingRanges) + { + // Add non-pending gap before this pending range (if exists) + if (addNonPendingGapIfExists(result, command, remainder, pendingRange, isTracked)) + { + remainder = null; + break; // No more remainder to process + } + + // Add intersection with pending range + Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> split = + Range.intersectionAndRemainder(remainder, pendingRange); + + // Pending regions use the old protocol (!isTracked) + if (split.left != null) + addSplit(result, command, split.left, !isTracked); + + remainder = split.right; + if (remainder == null) + break; + } + + // Add final non-pending remainder + if (remainder != null) + addSplit(result, command, remainder, isTracked); + + return result; + } + + /** + * Validate that splits are contiguous, cover the entire original range, and alternate protocols. + */ + private static void validateSplitContiguity(PartitionRangeReadCommand originalCommand, + List<RangeReadWithReplication> splits) + { + checkState(!splits.isEmpty(), "Shouldn't have empty result"); + + // Validate coverage + checkState(splits.get(0).read.dataRange().startKey() + .equals(originalCommand.dataRange().startKey()), + "Split reads should encompass entire range"); + checkState(splits.get(splits.size() - 1).read.dataRange().stopKey() + .equals(originalCommand.dataRange().stopKey()), + "Split reads should encompass entire range"); + + // Validate contiguity and alternating protocols + if (splits.size() > 1) + { + for (int i = 0; i < splits.size() - 1; i++) + { + checkState(splits.get(i).read.dataRange().stopKey() + .equals(splits.get(i + 1).read.dataRange().startKey()), + "Split reads should all be adjacent"); + checkState(splits.get(i).useTracked != splits.get(i + 1).useTracked, + "Split reads should be for different replication protocols"); + } + } + } + + /** + * Split a range read command into sub-ranges based on migration state. + */ + public static List<RangeReadWithReplication> splitRangeRead(ClusterMetadata metadata, + PartitionRangeReadCommand command) + { + // System keyspaces never use tracked replication + if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace)) + return ImmutableList.of(new RangeReadWithReplication(command, false)); + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState + .getKeyspaceInfo(command.metadata().keyspace); + + boolean isTracked = command.metadata().replicationType().isTracked(); + + // During migration, reads use untracked replication except for ranges that have + // completed migration to tracked. Therefore, we only need to split ranges when + // migrating to tracked replication. For untracked migrations, all reads use untracked. + if (!isTracked || migrationInfo == null) + return ImmutableList.of(new RangeReadWithReplication(command, isTracked)); + + // Get pending ranges for this table + NormalizedRanges<Token> tablePendingRanges = migrationInfo.pendingRangesPerTable.get(command.metadata().id()); + + // No pending ranges for this table - entire range uses current protocol + if (tablePendingRanges == null) + return ImmutableList.of(new RangeReadWithReplication(command, isTracked)); + + // split into pending (untracked) and non-pending (tracked) ranges + List<RangeReadWithReplication> result = splitRangeByPendingRanges( + command, + command.dataRange().keyRange(), + tablePendingRanges, + isTracked); + + // Validate the splits + validateSplitContiguity(command, result); + + return result; + } + + public static boolean shouldUseTrackedForWrites(ClusterMetadata metadata, String keyspace, TableId tableId, Token token) + { + if (SchemaConstants.isSystemKeyspace(keyspace)) + return false; + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState + .getKeyspaceInfo(keyspace); + + if (migrationInfo == null) + return metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked(); + + boolean isTracked = metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked(); Review Comment: `DistributedSchema#getKeyspaceMetadata` will throw a no such element exception if there's no keyspace. We have to throw something here if we've raced w/ a dropped keyspace. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

