aweisberg commented on code in PR #4508: URL: https://github.com/apache/cassandra/pull/4508#discussion_r2590365615
########## test/unit/org/apache/cassandra/service/replication/migration/MigrationRouterTest.java: ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.replication.migration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.NormalizedRanges; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.KeyspaceMetadata; +import 
org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.ReplicationType; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.service.accord.AccordFastPath; +import org.apache.cassandra.service.accord.AccordStaleReplicas; +import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.ownership.TokenMap; +import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.sequences.LockedRanges; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class MigrationRouterTest +{ + private static final String TEST_KEYSPACE = "test_ks"; + private static final String TEST_TABLE = "test_table"; + private static final String SYSTEM_KEYSPACE = SchemaConstants.SYSTEM_KEYSPACE_NAME; + private static TableMetadata systemTable; + + @BeforeClass + public static void setUpClass() + { + DatabaseDescriptor.daemonInitialization(); + ServerTestUtils.prepareServer(); + + systemTable = TableMetadata.builder(SYSTEM_KEYSPACE, "system_table") + .addPartitionKeyColumn("pk", UTF8Type.instance) + .addRegularColumn("value", UTF8Type.instance) + .partitioner(Murmur3Partitioner.instance) + .build(); + } + + /** + * Helper method to create a PartitionRangeReadCommand for a specific token range. 
+ */ + private PartitionRangeReadCommand createRangeCommand(TableMetadata table, Token start, Token end) + { + Range<PartitionPosition> keyRange = new Range<>(start.minKeyBound(), end.maxKeyBound()); + DataRange dataRange = DataRange.forKeyRange(keyRange); + + return PartitionRangeReadCommand.create(table, + 0, // nowInSec + ColumnFilter.all(table), + RowFilter.none(), + DataLimits.NONE, + dataRange); + } + + private Token createToken(long value) + { + return new Murmur3Partitioner.LongToken(value); + } + + /** + * Helper method to create KeyspaceMigrationInfo for testing routing logic. + */ + private KeyspaceMigrationInfo createMigrationInfo(TableId tableId, List<Range<Token>> pendingRanges) + { + Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable; + + if (pendingRanges.isEmpty()) + { + // Empty pending ranges means table has completed migration and is removed from map + // This matches the behavior of withRangesRepairedForTable + pendingRangesPerTable = Collections.emptyMap(); + } + else + { + // Create normalized ranges and add to map + NormalizedRanges<Token> normalizedRanges = NormalizedRanges.normalizedRanges(pendingRanges); + pendingRangesPerTable = Collections.singletonMap(tableId, normalizedRanges); + } + + return new KeyspaceMigrationInfo( + TEST_KEYSPACE, + pendingRangesPerTable, + Epoch.create(1)); + } + + /** + * Helper method to create ClusterMetadata with migration state and correct schema. + */ + private ClusterMetadata createMetadata(boolean isTracked, List<Range<Token>> pendingRanges) + { + Review Comment: Extra whitespace? ########## src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.replication.migration; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.virtual.VirtualMutation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.NormalizedRanges; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.Pair; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Routes read and write requests based on schema and migration state. 
+ * + * During migration in either direction, reads are untracked and writes are tracked + */ +public class MigrationRouter +{ + public static boolean shouldUseTracked(SinglePartitionReadCommand command) + { + // System keyspaces never use tracked replication + if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace)) + return false; + + ClusterMetadata metadata = ClusterMetadata.current(); + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace); + + if (migrationInfo == null) + return command.metadata().replicationType().isTracked(); + + Token token = command.partitionKey().getToken(); + boolean isTracked = command.metadata().replicationType().isTracked(); + + return migrationInfo.shouldUseTrackedForReads(isTracked, command.metadata().id(), token); + } + + /** + * Wrapper for a range read command paired with its routing decision. + */ + public static class RangeReadWithReplication + { + public final PartitionRangeReadCommand read; + public final boolean useTracked; + + public RangeReadWithReplication(PartitionRangeReadCommand read, boolean useTracked) + { + this.read = read; + this.useTracked = useTracked; + } + } + + /** + * Helper to create and add a range split to the result list. + */ + private static void addSplit(List<RangeReadWithReplication> result, + PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> range, + boolean isTracked) + { + boolean isFirst = result.isEmpty(); + result.add(new RangeReadWithReplication(command.forSubRange(range, isFirst), isTracked)); + } + + /** + * Adds a split for the non-pending region before pendingRange, if one exists. 
+ * + * @param isTracked the target replication type (TO_TRACKED=true, TO_UNTRACKED=false) + * @return true if remainder ends before pendingRange (no intersection possible) + */ + private static boolean addNonPendingGapIfExists(List<RangeReadWithReplication> result, + PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> remainder, + Range<Token> pendingRange, + boolean isTracked) + { + Token pendingStart = pendingRange.left; + Token remainderStart = remainder.left.getToken(); + Token remainderEnd = remainder.right.getToken(); + + if (remainderStart.compareTo(pendingStart) >= 0) + return false; // No gap before pending range + + // Check if remainder ends before pending range starts + if (remainderEnd.compareTo(pendingStart) <= 0) + { + // Entire remainder is before this pending range - no intersection + // Non-pending regions use the new protocol (isTracked) + addSplit(result, command, remainder, isTracked); + return true; + } + + // Add the non-pending gap before pending range + AbstractBounds<PartitionPosition> gap = remainder.withNewRight(pendingStart.maxKeyBound()); + + if (!gap.left.equals(gap.right)) + addSplit(result, command, gap, isTracked); + + return false; + } + + /** + * Split a range by pending ranges, creating sub-ranges for each contiguous region. 
+ * + * If we're migrating to tracked replication, pending ranges use untracked reads, non-pending uses tracked + * + * If we're migrating to untracked replication, pending uses tracked reads, and non-pending uses untracked + */ + private static List<RangeReadWithReplication> splitRangeByPendingRanges(PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> keyRange, + NormalizedRanges<Token> pendingRanges, + boolean isTracked) + { + List<RangeReadWithReplication> result = new ArrayList<>(); + AbstractBounds<PartitionPosition> remainder = keyRange; + + for (Range<Token> pendingRange : pendingRanges) + { + // Add non-pending gap before this pending range (if exists) + if (addNonPendingGapIfExists(result, command, remainder, pendingRange, isTracked)) + { + remainder = null; + break; // No more remainder to process + } + + // Add intersection with pending range + Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> split = + Range.intersectionAndRemainder(remainder, pendingRange); + + // Pending regions use the old protocol (!isTracked) + if (split.left != null) + addSplit(result, command, split.left, !isTracked); + + remainder = split.right; + if (remainder == null) + break; + } + + // Add final non-pending remainder + if (remainder != null) + addSplit(result, command, remainder, isTracked); + + return result; + } + + /** + * Validate that splits are contiguous, cover the entire original range, and alternate protocols. 
+ */ + private static void validateSplitContiguity(PartitionRangeReadCommand originalCommand, + List<RangeReadWithReplication> splits) + { + checkState(!splits.isEmpty(), "Shouldn't have empty result"); + + // Validate coverage + checkState(splits.get(0).read.dataRange().startKey() + .equals(originalCommand.dataRange().startKey()), + "Split reads should encompass entire range"); + checkState(splits.get(splits.size() - 1).read.dataRange().stopKey() + .equals(originalCommand.dataRange().stopKey()), + "Split reads should encompass entire range"); + + // Validate contiguity and alternating protocols + if (splits.size() > 1) + { + for (int i = 0; i < splits.size() - 1; i++) + { + checkState(splits.get(i).read.dataRange().stopKey() + .equals(splits.get(i + 1).read.dataRange().startKey()), + "Split reads should all be adjacent"); + checkState(splits.get(i).useTracked != splits.get(i + 1).useTracked, + "Split reads should be for different replication protocols"); + } + } + } + + /** + * Split a range read command into sub-ranges based on migration state. + */ + public static List<RangeReadWithReplication> splitRangeRead(ClusterMetadata metadata, + PartitionRangeReadCommand command) + { + // System keyspaces never use tracked replication + if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace)) + return ImmutableList.of(new RangeReadWithReplication(command, false)); + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState + .getKeyspaceInfo(command.metadata().keyspace); + + boolean isTracked = command.metadata().replicationType().isTracked(); + + // During migration, reads use untracked replication except for ranges that have + // completed migration to tracked. Therefore, we only need to split ranges when + // migrating to tracked replication. For untracked migrations, all reads use untracked. 
+ if (!isTracked || migrationInfo == null) + return ImmutableList.of(new RangeReadWithReplication(command, isTracked)); + + // Get pending ranges for this table + NormalizedRanges<Token> tablePendingRanges = migrationInfo.pendingRangesPerTable.get(command.metadata().id()); + + // No pending ranges for this table - entire range uses current protocol + if (tablePendingRanges == null) + return ImmutableList.of(new RangeReadWithReplication(command, isTracked)); + + // split into pending (untracked) and non-pending (tracked) ranges + List<RangeReadWithReplication> result = splitRangeByPendingRanges( + command, + command.dataRange().keyRange(), + tablePendingRanges, + isTracked); + + // Validate the splits + validateSplitContiguity(command, result); + + return result; + } + + public static boolean shouldUseTrackedForWrites(ClusterMetadata metadata, String keyspace, TableId tableId, Token token) + { + if (SchemaConstants.isSystemKeyspace(keyspace)) + return false; + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState + .getKeyspaceInfo(keyspace); + + if (migrationInfo == null) + return metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked(); + + boolean isTracked = metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked(); + return migrationInfo.shouldUseTrackedForWrites(isTracked, tableId, token); + } + + public static class RoutedMutations + { + public final List<? extends IMutation> trackedMutations; + public final List<? extends IMutation> untrackedMutations; + + public RoutedMutations(List<? extends IMutation> tracked, List<? extends IMutation> untracked) + { + this.trackedMutations = tracked; + this.untrackedMutations = untracked; + } + } + + /** + * Route a list of mutations, splitting them into tracked and untracked groups. + */ + public static RoutedMutations routeMutations(List<? 
extends IMutation> mutations) + { + List<IMutation> tracked = new ArrayList<>(); + List<IMutation> untracked = new ArrayList<>(); + ClusterMetadata cm = ClusterMetadata.current(); + + for (IMutation mutation : mutations) + { + if (mutation instanceof VirtualMutation) + { + untracked.add(mutation); + continue; + } + IMutation untrackedMutation = mutation.filter(tid -> !shouldUseTrackedForWrites(cm, mutation.getKeyspaceName(), tid, mutation.key().getToken())); + if (untrackedMutation != null) + untracked.add(mutation); + + IMutation trackedMutation = mutation.filter(tid -> shouldUseTrackedForWrites(cm, mutation.getKeyspaceName(), tid, mutation.key().getToken())); + if (trackedMutation != null) + tracked.add(mutation); + } + + return new RoutedMutations(tracked, untracked); + } + + public static boolean isFullyTracked(IMutation mutation) Review Comment: This has no tests? ########## src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.replication.migration; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.virtual.VirtualMutation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.NormalizedRanges; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.Pair; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Routes read and write requests based on schema and migration state. + * + * During migration in either direction, reads are untracked and writes are tracked + */ +public class MigrationRouter +{ + public static boolean shouldUseTracked(SinglePartitionReadCommand command) + { + // System keyspaces never use tracked replication + if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace)) + return false; + + ClusterMetadata metadata = ClusterMetadata.current(); + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace); + + if (migrationInfo == null) + return command.metadata().replicationType().isTracked(); + + Token token = command.partitionKey().getToken(); + boolean isTracked = command.metadata().replicationType().isTracked(); + + return migrationInfo.shouldUseTrackedForReads(isTracked, command.metadata().id(), token); + } + + /** + * Wrapper for a range read command paired with its routing decision. 
+ */ + public static class RangeReadWithReplication + { + public final PartitionRangeReadCommand read; + public final boolean useTracked; + + public RangeReadWithReplication(PartitionRangeReadCommand read, boolean useTracked) + { + this.read = read; + this.useTracked = useTracked; + } + } + + /** + * Helper to create and add a range split to the result list. + */ + private static void addSplit(List<RangeReadWithReplication> result, + PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> range, + boolean isTracked) + { + boolean isFirst = result.isEmpty(); + result.add(new RangeReadWithReplication(command.forSubRange(range, isFirst), isTracked)); + } + + /** + * Adds a split for the non-pending region before pendingRange, if one exists. + * + * @param isTracked the target replication type (TO_TRACKED=true, TO_UNTRACKED=false) + * @return true if remainder ends before pendingRange (no intersection possible) + */ + private static boolean addNonPendingGapIfExists(List<RangeReadWithReplication> result, + PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> remainder, + Range<Token> pendingRange, + boolean isTracked) + { + Token pendingStart = pendingRange.left; + Token remainderStart = remainder.left.getToken(); + Token remainderEnd = remainder.right.getToken(); + + if (remainderStart.compareTo(pendingStart) >= 0) + return false; // No gap before pending range + + // Check if remainder ends before pending range starts + if (remainderEnd.compareTo(pendingStart) <= 0) + { + // Entire remainder is before this pending range - no intersection + // Non-pending regions use the new protocol (isTracked) + addSplit(result, command, remainder, isTracked); + return true; + } + + // Add the non-pending gap before pending range + AbstractBounds<PartitionPosition> gap = remainder.withNewRight(pendingStart.maxKeyBound()); + + if (!gap.left.equals(gap.right)) + addSplit(result, command, gap, isTracked); + + return false; + } + + /** + * Split a range 
by pending ranges, creating sub-ranges for each contiguous region. + * + * If we're migrating to tracked replication, pending ranges use untracked reads, non-pending uses tracked + * + * If we're migrating to untracked replication, pending uses tracked reads, and non-pending uses untracked + */ + private static List<RangeReadWithReplication> splitRangeByPendingRanges(PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> keyRange, + NormalizedRanges<Token> pendingRanges, + boolean isTracked) + { + List<RangeReadWithReplication> result = new ArrayList<>(); + AbstractBounds<PartitionPosition> remainder = keyRange; + + for (Range<Token> pendingRange : pendingRanges) + { + // Add non-pending gap before this pending range (if exists) + if (addNonPendingGapIfExists(result, command, remainder, pendingRange, isTracked)) + { + remainder = null; + break; // No more remainder to process + } + + // Add intersection with pending range + Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> split = + Range.intersectionAndRemainder(remainder, pendingRange); + + // Pending regions use the old protocol (!isTracked) + if (split.left != null) + addSplit(result, command, split.left, !isTracked); + + remainder = split.right; + if (remainder == null) + break; + } + + // Add final non-pending remainder + if (remainder != null) + addSplit(result, command, remainder, isTracked); + + return result; + } + + /** + * Validate that splits are contiguous, cover the entire original range, and alternate protocols. 
+ */ + private static void validateSplitContiguity(PartitionRangeReadCommand originalCommand, + List<RangeReadWithReplication> splits) + { + checkState(!splits.isEmpty(), "Shouldn't have empty result"); + + // Validate coverage + checkState(splits.get(0).read.dataRange().startKey() + .equals(originalCommand.dataRange().startKey()), + "Split reads should encompass entire range"); + checkState(splits.get(splits.size() - 1).read.dataRange().stopKey() + .equals(originalCommand.dataRange().stopKey()), + "Split reads should encompass entire range"); + + // Validate contiguity and alternating protocols + if (splits.size() > 1) + { + for (int i = 0; i < splits.size() - 1; i++) + { + checkState(splits.get(i).read.dataRange().stopKey() + .equals(splits.get(i + 1).read.dataRange().startKey()), + "Split reads should all be adjacent"); + checkState(splits.get(i).useTracked != splits.get(i + 1).useTracked, + "Split reads should be for different replication protocols"); + } + } + } + + /** + * Split a range read command into sub-ranges based on migration state. + */ + public static List<RangeReadWithReplication> splitRangeRead(ClusterMetadata metadata, + PartitionRangeReadCommand command) + { + // System keyspaces never use tracked replication + if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace)) + return ImmutableList.of(new RangeReadWithReplication(command, false)); + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState + .getKeyspaceInfo(command.metadata().keyspace); + + boolean isTracked = command.metadata().replicationType().isTracked(); + + // During migration, reads use untracked replication except for ranges that have + // completed migration to tracked. Therefore, we only need to split ranges when + // migrating to tracked replication. For untracked migrations, all reads use untracked. 
+ if (!isTracked || migrationInfo == null) + return ImmutableList.of(new RangeReadWithReplication(command, isTracked)); + + // Get pending ranges for this table + NormalizedRanges<Token> tablePendingRanges = migrationInfo.pendingRangesPerTable.get(command.metadata().id()); + + // No pending ranges for this table - entire range uses current protocol + if (tablePendingRanges == null) + return ImmutableList.of(new RangeReadWithReplication(command, isTracked)); + + // split into pending (untracked) and non-pending (tracked) ranges + List<RangeReadWithReplication> result = splitRangeByPendingRanges( + command, + command.dataRange().keyRange(), + tablePendingRanges, + isTracked); + + // Validate the splits + validateSplitContiguity(command, result); + + return result; + } + + public static boolean shouldUseTrackedForWrites(ClusterMetadata metadata, String keyspace, TableId tableId, Token token) + { + if (SchemaConstants.isSystemKeyspace(keyspace)) + return false; + + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState + .getKeyspaceInfo(keyspace); + + if (migrationInfo == null) + return metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked(); + + boolean isTracked = metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked(); + return migrationInfo.shouldUseTrackedForWrites(isTracked, tableId, token); + } + + public static class RoutedMutations + { + public final List<? extends IMutation> trackedMutations; + public final List<? extends IMutation> untrackedMutations; + + public RoutedMutations(List<? extends IMutation> tracked, List<? extends IMutation> untracked) + { + this.trackedMutations = tracked; + this.untrackedMutations = untracked; + } + } + + /** + * Route a list of mutations, splitting them into tracked and untracked groups. + */ + public static RoutedMutations routeMutations(List<? extends IMutation> mutations) Review Comment: There are no tests of `routeMutations` AFAICT. 
########## src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.replication.migration; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.annotation.Nonnull; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.NormalizedRanges; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; + +import static org.apache.cassandra.db.TypeSizes.sizeof; +import static org.apache.cassandra.utils.CollectionSerializers.deserializeList; +import static 
org.apache.cassandra.utils.CollectionSerializers.deserializeMap; +import static org.apache.cassandra.utils.CollectionSerializers.serializeCollection; +import static org.apache.cassandra.utils.CollectionSerializers.serializeMap; +import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize; +import static org.apache.cassandra.utils.CollectionSerializers.serializedMapSize; + +/** + * TCM state tracking mutation tracking migration for a keyspace. Since repair advances the migration, and + * repair sessions operate against tables, this class tracks repairs on every table that existed in the + * keyspace when the migration started. + * At the beginning of a migration, the full range is added to the pendingRanges, and as repairs are completed, the + * repaired ranges are subtracted from the pending ranges. When the pending range list is empty, the migration is finished. + */ +public class KeyspaceMigrationInfo +{ + @Nonnull public final String keyspace; + @Nonnull public final Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable; + @Nonnull public final Epoch startedAtEpoch; + + public KeyspaceMigrationInfo(@Nonnull String keyspace, + @Nonnull Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable, + @Nonnull Epoch startedAtEpoch) + { + this.keyspace = Objects.requireNonNull(keyspace); + this.pendingRangesPerTable = ImmutableMap.copyOf(pendingRangesPerTable); + this.startedAtEpoch = Objects.requireNonNull(startedAtEpoch); + } + + /** + * Reverse migration direction. Since unfinished migrations can be aborted, ranges that have not completed migrating + * in the previous direction are immediately rolled back. For ranges that did complete migration, or tables that were + * added since migration started, migration in the other direction is now required, so they're marked pending. 
+ */ + public KeyspaceMigrationInfo withDirectionReversed(@Nonnull Collection<TableId> allTableIds, + @Nonnull Epoch epoch) + { + Token minimumToken = DatabaseDescriptor.getPartitioner().getMinimumToken(); + + // Reset all tables to full ring pending (includes tables currently migrating, added during migration, or already migrated) + ImmutableMap.Builder<TableId, NormalizedRanges<Token>> reversedPendingBuilder = ImmutableMap.builder(); + + for (TableId tableId : allTableIds) + { + Range<Token> fullRing = new Range<>(minimumToken, minimumToken); + NormalizedRanges<Token> reversedRanges = NormalizedRanges.normalizedRanges(Collections.singleton(fullRing)); + + NormalizedRanges<Token> existingPending = pendingRangesPerTable.get(tableId); + if (existingPending != null) + { + Set<Range<Token>> ranges = Range.subtract(Collections.singletonList(fullRing), existingPending); + reversedRanges = NormalizedRanges.normalizedRanges(ranges); + } + + if (!reversedRanges.isEmpty()) + reversedPendingBuilder.put(tableId, reversedRanges); + } + + return new KeyspaceMigrationInfo( Review Comment: OK I see my misunderstanding here. It defaults to full range and then if there are existing pending ranges it uses that to narrow it further. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

