aweisberg commented on code in PR #4508:
URL: https://github.com/apache/cassandra/pull/4508#discussion_r2590365615


##########
test/unit/org/apache/cassandra/service/replication/migration/MigrationRouterTest.java:
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.accord.AccordFastPath;
+import org.apache.cassandra.service.accord.AccordStaleReplicas;
+import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.TokenMap;
+import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MigrationRouterTest
+{
+    private static final String TEST_KEYSPACE = "test_ks";
+    private static final String TEST_TABLE = "test_table";
+    private static final String SYSTEM_KEYSPACE = 
SchemaConstants.SYSTEM_KEYSPACE_NAME;
+    private static TableMetadata systemTable;
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        ServerTestUtils.prepareServer();
+
+        systemTable = TableMetadata.builder(SYSTEM_KEYSPACE, "system_table")
+                                   .addPartitionKeyColumn("pk", 
UTF8Type.instance)
+                                   .addRegularColumn("value", 
UTF8Type.instance)
+                                   .partitioner(Murmur3Partitioner.instance)
+                                   .build();
+    }
+
+    /**
+     * Helper method to create a PartitionRangeReadCommand for a specific 
token range.
+     */
+    private PartitionRangeReadCommand createRangeCommand(TableMetadata table, 
Token start, Token end)
+    {
+        Range<PartitionPosition> keyRange = new Range<>(start.minKeyBound(), 
end.maxKeyBound());
+        DataRange dataRange = DataRange.forKeyRange(keyRange);
+
+        return PartitionRangeReadCommand.create(table,
+                                                0, // nowInSec
+                                                ColumnFilter.all(table),
+                                                RowFilter.none(),
+                                                DataLimits.NONE,
+                                                dataRange);
+    }
+
+    private Token createToken(long value)
+    {
+        return new Murmur3Partitioner.LongToken(value);
+    }
+
+    /**
+     * Helper method to create KeyspaceMigrationInfo for testing routing logic.
+     */
+    private KeyspaceMigrationInfo createMigrationInfo(TableId tableId, 
List<Range<Token>> pendingRanges)
+    {
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable;
+
+        if (pendingRanges.isEmpty())
+        {
+            // Empty pending ranges means table has completed migration and is 
removed from map
+            // This matches the behavior of withRangesRepairedForTable
+            pendingRangesPerTable = Collections.emptyMap();
+        }
+        else
+        {
+            // Create normalized ranges and add to map
+            NormalizedRanges<Token> normalizedRanges = 
NormalizedRanges.normalizedRanges(pendingRanges);
+            pendingRangesPerTable = Collections.singletonMap(tableId, 
normalizedRanges);
+        }
+
+        return new KeyspaceMigrationInfo(
+            TEST_KEYSPACE,
+            pendingRangesPerTable,
+            Epoch.create(1));
+    }
+
+    /**
+     * Helper method to create ClusterMetadata with migration state and 
correct schema.
+     */
+    private ClusterMetadata createMetadata(boolean isTracked, 
List<Range<Token>> pendingRanges)
+    {
+

Review Comment:
   Extra whitespace?



##########
src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.virtual.VirtualMutation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Routes read and write requests based on schema and migration state.
+ *
+ * During migration in either direction, reads are untracked and writes are 
tracked
+ */
+public class MigrationRouter
+{
+    public static boolean shouldUseTracked(SinglePartitionReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return false;
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace);
+
+        if (migrationInfo == null)
+            return command.metadata().replicationType().isTracked();
+
+        Token token = command.partitionKey().getToken();
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        return migrationInfo.shouldUseTrackedForReads(isTracked, 
command.metadata().id(), token);
+    }
+
+    /**
+     * Wrapper for a range read command paired with its routing decision.
+     */
+    public static class RangeReadWithReplication
+    {
+        public final PartitionRangeReadCommand read;
+        public final boolean useTracked;
+
+        public RangeReadWithReplication(PartitionRangeReadCommand read, 
boolean useTracked)
+        {
+            this.read = read;
+            this.useTracked = useTracked;
+        }
+    }
+
+    /**
+     * Helper to create and add a range split to the result list.
+     */
+    private static void addSplit(List<RangeReadWithReplication> result,
+                                PartitionRangeReadCommand command,
+                                AbstractBounds<PartitionPosition> range,
+                                boolean isTracked)
+    {
+        boolean isFirst = result.isEmpty();
+        result.add(new RangeReadWithReplication(command.forSubRange(range, 
isFirst), isTracked));
+    }
+
+    /**
+     * Adds a split for the non-pending region before pendingRange, if one 
exists.
+     *
+     * @param isTracked the target replication type (TO_TRACKED=true, 
TO_UNTRACKED=false)
+     * @return true if remainder ends before pendingRange (no intersection 
possible)
+     */
+    private static boolean 
addNonPendingGapIfExists(List<RangeReadWithReplication> result,
+                                                    PartitionRangeReadCommand 
command,
+                                                    
AbstractBounds<PartitionPosition> remainder,
+                                                    Range<Token> pendingRange,
+                                                    boolean isTracked)
+    {
+        Token pendingStart = pendingRange.left;
+        Token remainderStart = remainder.left.getToken();
+        Token remainderEnd = remainder.right.getToken();
+
+        if (remainderStart.compareTo(pendingStart) >= 0)
+            return false; // No gap before pending range
+
+        // Check if remainder ends before pending range starts
+        if (remainderEnd.compareTo(pendingStart) <= 0)
+        {
+            // Entire remainder is before this pending range - no intersection
+            // Non-pending regions use the new protocol (isTracked)
+            addSplit(result, command, remainder, isTracked);
+            return true;
+        }
+
+        // Add the non-pending gap before pending range
+        AbstractBounds<PartitionPosition> gap = 
remainder.withNewRight(pendingStart.maxKeyBound());
+
+        if (!gap.left.equals(gap.right))
+            addSplit(result, command, gap, isTracked);
+
+        return false;
+    }
+
+    /**
+     * Split a range by pending ranges, creating sub-ranges for each 
contiguous region.
+     *
+     * If we're migrating to tracked replication, pending ranges use untracked 
reads, non-pending uses tracked
+     *
+     * If we're migrating to untracked replication, pending uses tracked 
reads, and non-pending uses untracked
+     */
+    private static List<RangeReadWithReplication> 
splitRangeByPendingRanges(PartitionRangeReadCommand command,
+                                                                            
AbstractBounds<PartitionPosition> keyRange,
+                                                                            
NormalizedRanges<Token> pendingRanges,
+                                                                            
boolean isTracked)
+    {
+        List<RangeReadWithReplication> result = new ArrayList<>();
+        AbstractBounds<PartitionPosition> remainder = keyRange;
+
+        for (Range<Token> pendingRange : pendingRanges)
+        {
+            // Add non-pending gap before this pending range (if exists)
+            if (addNonPendingGapIfExists(result, command, remainder, 
pendingRange, isTracked))
+            {
+                remainder = null;
+                break; // No more remainder to process
+            }
+
+            // Add intersection with pending range
+            Pair<AbstractBounds<PartitionPosition>, 
AbstractBounds<PartitionPosition>> split =
+                Range.intersectionAndRemainder(remainder, pendingRange);
+
+            // Pending regions use the old protocol (!isTracked)
+            if (split.left != null)
+                addSplit(result, command, split.left, !isTracked);
+
+            remainder = split.right;
+            if (remainder == null)
+                break;
+        }
+
+        // Add final non-pending remainder
+        if (remainder != null)
+            addSplit(result, command, remainder, isTracked);
+
+        return result;
+    }
+
+    /**
+     * Validate that splits are contiguous, cover the entire original range, 
and alternate protocols.
+     */
+    private static void validateSplitContiguity(PartitionRangeReadCommand 
originalCommand,
+                                               List<RangeReadWithReplication> 
splits)
+    {
+        checkState(!splits.isEmpty(), "Shouldn't have empty result");
+
+        // Validate coverage
+        checkState(splits.get(0).read.dataRange().startKey()
+                        .equals(originalCommand.dataRange().startKey()),
+                   "Split reads should encompass entire range");
+        checkState(splits.get(splits.size() - 1).read.dataRange().stopKey()
+                        .equals(originalCommand.dataRange().stopKey()),
+                   "Split reads should encompass entire range");
+
+        // Validate contiguity and alternating protocols
+        if (splits.size() > 1)
+        {
+            for (int i = 0; i < splits.size() - 1; i++)
+            {
+                checkState(splits.get(i).read.dataRange().stopKey()
+                                .equals(splits.get(i + 
1).read.dataRange().startKey()),
+                           "Split reads should all be adjacent");
+                checkState(splits.get(i).useTracked != splits.get(i + 
1).useTracked,
+                           "Split reads should be for different replication 
protocols");
+            }
+        }
+    }
+
+    /**
+     * Split a range read command into sub-ranges based on migration state.
+     */
+    public static List<RangeReadWithReplication> 
splitRangeRead(ClusterMetadata metadata,
+                                                                 
PartitionRangeReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return ImmutableList.of(new RangeReadWithReplication(command, 
false));
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              
.getKeyspaceInfo(command.metadata().keyspace);
+
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        // During migration, reads use untracked replication except for ranges 
that have
+        // completed migration to tracked. Therefore, we only need to split 
ranges when
+        // migrating to tracked replication. For untracked migrations, all 
reads use untracked.
+        if (!isTracked || migrationInfo == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // Get pending ranges for this table
+        NormalizedRanges<Token> tablePendingRanges = 
migrationInfo.pendingRangesPerTable.get(command.metadata().id());
+
+        // No pending ranges for this table - entire range uses current 
protocol
+        if (tablePendingRanges == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // split into pending (untracked) and non-pending (tracked) ranges
+        List<RangeReadWithReplication> result = splitRangeByPendingRanges(
+            command,
+            command.dataRange().keyRange(),
+            tablePendingRanges,
+            isTracked);
+
+        // Validate the splits
+        validateSplitContiguity(command, result);
+
+        return result;
+    }
+
+    public static boolean shouldUseTrackedForWrites(ClusterMetadata metadata, 
String keyspace, TableId tableId, Token token)
+    {
+        if (SchemaConstants.isSystemKeyspace(keyspace))
+            return false;
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              .getKeyspaceInfo(keyspace);
+
+        if (migrationInfo == null)
+            return 
metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked();
+
+        boolean isTracked = 
metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked();
+        return migrationInfo.shouldUseTrackedForWrites(isTracked, tableId, 
token);
+    }
+
+    public static class RoutedMutations
+    {
+        public final List<? extends IMutation> trackedMutations;
+        public final List<? extends IMutation> untrackedMutations;
+
+        public RoutedMutations(List<? extends IMutation> tracked, List<? 
extends IMutation> untracked)
+        {
+            this.trackedMutations = tracked;
+            this.untrackedMutations = untracked;
+        }
+    }
+
+    /**
+     * Route a list of mutations, splitting them into tracked and untracked 
groups.
+     */
+    public static RoutedMutations routeMutations(List<? extends IMutation> 
mutations)
+    {
+        List<IMutation> tracked = new ArrayList<>();
+        List<IMutation> untracked = new ArrayList<>();
+        ClusterMetadata cm = ClusterMetadata.current();
+
+        for (IMutation mutation : mutations)
+        {
+            if (mutation instanceof VirtualMutation)
+            {
+                untracked.add(mutation);
+                continue;
+            }
+            IMutation untrackedMutation = mutation.filter(tid -> 
!shouldUseTrackedForWrites(cm, mutation.getKeyspaceName(), tid, 
mutation.key().getToken()));
+            if (untrackedMutation != null)
+                untracked.add(mutation);
+
+            IMutation trackedMutation = mutation.filter(tid -> 
shouldUseTrackedForWrites(cm, mutation.getKeyspaceName(), tid, 
mutation.key().getToken()));
+            if (trackedMutation != null)
+                tracked.add(mutation);
+        }
+
+        return new RoutedMutations(tracked, untracked);
+    }
+
+    public static boolean isFullyTracked(IMutation mutation)

Review Comment:
   This has no tests?



##########
src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.virtual.VirtualMutation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Routes read and write requests based on schema and migration state.
+ *
+ * During migration in either direction, reads are untracked and writes are 
tracked
+ */
+public class MigrationRouter
+{
+    public static boolean shouldUseTracked(SinglePartitionReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return false;
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace);
+
+        if (migrationInfo == null)
+            return command.metadata().replicationType().isTracked();
+
+        Token token = command.partitionKey().getToken();
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        return migrationInfo.shouldUseTrackedForReads(isTracked, 
command.metadata().id(), token);
+    }
+
+    /**
+     * Wrapper for a range read command paired with its routing decision.
+     */
+    public static class RangeReadWithReplication
+    {
+        public final PartitionRangeReadCommand read;
+        public final boolean useTracked;
+
+        public RangeReadWithReplication(PartitionRangeReadCommand read, 
boolean useTracked)
+        {
+            this.read = read;
+            this.useTracked = useTracked;
+        }
+    }
+
+    /**
+     * Helper to create and add a range split to the result list.
+     */
+    private static void addSplit(List<RangeReadWithReplication> result,
+                                PartitionRangeReadCommand command,
+                                AbstractBounds<PartitionPosition> range,
+                                boolean isTracked)
+    {
+        boolean isFirst = result.isEmpty();
+        result.add(new RangeReadWithReplication(command.forSubRange(range, 
isFirst), isTracked));
+    }
+
+    /**
+     * Adds a split for the non-pending region before pendingRange, if one 
exists.
+     *
+     * @param isTracked the target replication type (TO_TRACKED=true, 
TO_UNTRACKED=false)
+     * @return true if remainder ends before pendingRange (no intersection 
possible)
+     */
+    private static boolean 
addNonPendingGapIfExists(List<RangeReadWithReplication> result,
+                                                    PartitionRangeReadCommand 
command,
+                                                    
AbstractBounds<PartitionPosition> remainder,
+                                                    Range<Token> pendingRange,
+                                                    boolean isTracked)
+    {
+        Token pendingStart = pendingRange.left;
+        Token remainderStart = remainder.left.getToken();
+        Token remainderEnd = remainder.right.getToken();
+
+        if (remainderStart.compareTo(pendingStart) >= 0)
+            return false; // No gap before pending range
+
+        // Check if remainder ends before pending range starts
+        if (remainderEnd.compareTo(pendingStart) <= 0)
+        {
+            // Entire remainder is before this pending range - no intersection
+            // Non-pending regions use the new protocol (isTracked)
+            addSplit(result, command, remainder, isTracked);
+            return true;
+        }
+
+        // Add the non-pending gap before pending range
+        AbstractBounds<PartitionPosition> gap = 
remainder.withNewRight(pendingStart.maxKeyBound());
+
+        if (!gap.left.equals(gap.right))
+            addSplit(result, command, gap, isTracked);
+
+        return false;
+    }
+
+    /**
+     * Split a range by pending ranges, creating sub-ranges for each 
contiguous region.
+     *
+     * If we're migrating to tracked replication, pending ranges use untracked 
reads, non-pending uses tracked
+     *
+     * If we're migrating to untracked replication, pending uses tracked 
reads, and non-pending uses untracked
+     */
+    private static List<RangeReadWithReplication> 
splitRangeByPendingRanges(PartitionRangeReadCommand command,
+                                                                            
AbstractBounds<PartitionPosition> keyRange,
+                                                                            
NormalizedRanges<Token> pendingRanges,
+                                                                            
boolean isTracked)
+    {
+        List<RangeReadWithReplication> result = new ArrayList<>();
+        AbstractBounds<PartitionPosition> remainder = keyRange;
+
+        for (Range<Token> pendingRange : pendingRanges)
+        {
+            // Add non-pending gap before this pending range (if exists)
+            if (addNonPendingGapIfExists(result, command, remainder, 
pendingRange, isTracked))
+            {
+                remainder = null;
+                break; // No more remainder to process
+            }
+
+            // Add intersection with pending range
+            Pair<AbstractBounds<PartitionPosition>, 
AbstractBounds<PartitionPosition>> split =
+                Range.intersectionAndRemainder(remainder, pendingRange);
+
+            // Pending regions use the old protocol (!isTracked)
+            if (split.left != null)
+                addSplit(result, command, split.left, !isTracked);
+
+            remainder = split.right;
+            if (remainder == null)
+                break;
+        }
+
+        // Add final non-pending remainder
+        if (remainder != null)
+            addSplit(result, command, remainder, isTracked);
+
+        return result;
+    }
+
+    /**
+     * Validate that splits are contiguous, cover the entire original range, 
and alternate protocols.
+     */
+    private static void validateSplitContiguity(PartitionRangeReadCommand 
originalCommand,
+                                               List<RangeReadWithReplication> 
splits)
+    {
+        checkState(!splits.isEmpty(), "Shouldn't have empty result");
+
+        // Validate coverage
+        checkState(splits.get(0).read.dataRange().startKey()
+                        .equals(originalCommand.dataRange().startKey()),
+                   "Split reads should encompass entire range");
+        checkState(splits.get(splits.size() - 1).read.dataRange().stopKey()
+                        .equals(originalCommand.dataRange().stopKey()),
+                   "Split reads should encompass entire range");
+
+        // Validate contiguity and alternating protocols
+        if (splits.size() > 1)
+        {
+            for (int i = 0; i < splits.size() - 1; i++)
+            {
+                checkState(splits.get(i).read.dataRange().stopKey()
+                                .equals(splits.get(i + 
1).read.dataRange().startKey()),
+                           "Split reads should all be adjacent");
+                checkState(splits.get(i).useTracked != splits.get(i + 
1).useTracked,
+                           "Split reads should be for different replication 
protocols");
+            }
+        }
+    }
+
+    /**
+     * Split a range read command into sub-ranges based on migration state.
+     */
+    public static List<RangeReadWithReplication> 
splitRangeRead(ClusterMetadata metadata,
+                                                                 
PartitionRangeReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return ImmutableList.of(new RangeReadWithReplication(command, 
false));
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              
.getKeyspaceInfo(command.metadata().keyspace);
+
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        // During migration, reads use untracked replication except for ranges 
that have
+        // completed migration to tracked. Therefore, we only need to split 
ranges when
+        // migrating to tracked replication. For untracked migrations, all 
reads use untracked.
+        if (!isTracked || migrationInfo == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // Get pending ranges for this table
+        NormalizedRanges<Token> tablePendingRanges = 
migrationInfo.pendingRangesPerTable.get(command.metadata().id());
+
+        // No pending ranges for this table - entire range uses current 
protocol
+        if (tablePendingRanges == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // split into pending (untracked) and non-pending (tracked) ranges
+        List<RangeReadWithReplication> result = splitRangeByPendingRanges(
+            command,
+            command.dataRange().keyRange(),
+            tablePendingRanges,
+            isTracked);
+
+        // Validate the splits
+        validateSplitContiguity(command, result);
+
+        return result;
+    }
+
+    public static boolean shouldUseTrackedForWrites(ClusterMetadata metadata, 
String keyspace, TableId tableId, Token token)
+    {
+        if (SchemaConstants.isSystemKeyspace(keyspace))
+            return false;
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              .getKeyspaceInfo(keyspace);
+
+        if (migrationInfo == null)
+            return 
metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked();
+
+        boolean isTracked = 
metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked();
+        return migrationInfo.shouldUseTrackedForWrites(isTracked, tableId, 
token);
+    }
+
+    public static class RoutedMutations
+    {
+        public final List<? extends IMutation> trackedMutations;
+        public final List<? extends IMutation> untrackedMutations;
+
+        public RoutedMutations(List<? extends IMutation> tracked, List<? 
extends IMutation> untracked)
+        {
+            this.trackedMutations = tracked;
+            this.untrackedMutations = untracked;
+        }
+    }
+
+    /**
+     * Route a list of mutations, splitting them into tracked and untracked 
groups.
+     */
+    public static RoutedMutations routeMutations(List<? extends IMutation> 
mutations)

Review Comment:
   There are no tests of `routeMutations` AFAICT.



##########
src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedMapSize;
+
+/**
+ * TCM state tracking mutation tracking migration for a keyspace. Since repair 
advances the migration, and
+ * and repair sessions operate against tables, this class tracks repairs on 
every table that existed in the
+ * keyspace when the migration started.
+ * At the beginning of a migration, the full range is added to the 
pendingRanges, and as repairs are completed, the
+ * repaired ranges are subtracted from the pending ranges. When the pending 
range list is empty, the migration is finished.
+ */
+public class KeyspaceMigrationInfo
+{
+    @Nonnull public final String keyspace;
+    @Nonnull public final Map<TableId, NormalizedRanges<Token>> 
pendingRangesPerTable;
+    @Nonnull public final Epoch startedAtEpoch;
+
+    public KeyspaceMigrationInfo(@Nonnull String keyspace,
+                                 @Nonnull Map<TableId, 
NormalizedRanges<Token>> pendingRangesPerTable,
+                                 @Nonnull Epoch startedAtEpoch)
+    {
+        this.keyspace = Objects.requireNonNull(keyspace);
+        this.pendingRangesPerTable = 
ImmutableMap.copyOf(pendingRangesPerTable);
+        this.startedAtEpoch = Objects.requireNonNull(startedAtEpoch);
+    }
+
+    /**
+     * Reverse migration direction. Since unfinished migrations can be 
aborted, ranges that have not completed migrating
+     * in the previous direction are immediately rolled back. For ranges that 
did complete migration, or tables that were
+     * added since migration started, migration in the other direction is now 
required, so they're marked pending.
+     */
+    public KeyspaceMigrationInfo withDirectionReversed(@Nonnull 
Collection<TableId> allTableIds,
+                                                       @Nonnull Epoch epoch)
+    {
+        Token minimumToken = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+        // Reset all tables to full ring pending (includes tables currently 
migrating, added during migration, or already migrated)
+        ImmutableMap.Builder<TableId, NormalizedRanges<Token>> 
reversedPendingBuilder = ImmutableMap.builder();
+
+        for (TableId tableId : allTableIds)
+        {
+            Range<Token> fullRing = new Range<>(minimumToken, minimumToken);
+            NormalizedRanges<Token> reversedRanges = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+
+            NormalizedRanges<Token> existingPending = 
pendingRangesPerTable.get(tableId);
+            if (existingPending != null)
+            {
+                Set<Range<Token>> ranges = 
Range.subtract(Collections.singletonList(fullRing), existingPending);
+                reversedRanges = NormalizedRanges.normalizedRanges(ranges);
+            }
+
+            if (!reversedRanges.isEmpty())
+                reversedPendingBuilder.put(tableId, reversedRanges);
+        }
+
+        return new KeyspaceMigrationInfo(

Review Comment:
   OK I see my misunderstanding here. It defaults to full range and then if 
there are existing pending ranges it uses that to narrow it further.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to