aweisberg commented on code in PR #4508:
URL: https://github.com/apache/cassandra/pull/4508#discussion_r2586166789


##########
src/java/org/apache/cassandra/journal/Segments.java:
##########
@@ -157,6 +157,10 @@ void select(long minTimestamp, long maxTimestamp, 
Collection<Segment<K, V>> into
         }
     }
 
+    /**
+     * Find index of first segment with timestamp >= given timestamp.
+     * Returns sorted.size() if timestamp greater than all segments.
+     */
     int findIdxFor(long timestamp)

Review Comment:
   Java already has binary search algorithms built in? Really doesn't seem like 
we should be implementing it here.



##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java:
##########
@@ -275,21 +270,27 @@ public SplitMutation(@Nullable T accordMutation, 
@Nullable T untrackedMutation,
 
     public <T extends IMutation> SplitMutation<T> splitMutation(T mutation, 
ClusterMetadata cm)
     {
-        boolean isTracked = isTrackedMutation(mutation);
+        Token token = mutation.key().getToken();
+        Predicate<TableId> isTrackedUpdate = tableId -> 
MigrationRouter.shouldUseTrackedForWrites(cm, mutation.getKeyspaceName(), 
tableId, token);
+        Predicate<TableId> isUntrackedUpdate = not(isTrackedUpdate);
         if (mutation.potentialTxnConflicts().allowed)
-            return new SplitMutation<>(null, isTracked ? null : mutation, 
isTracked ? mutation : null);
+        {

Review Comment:
   This check for potentialTxnConflicts is odd. Looks like I added it, but it 
barely makes sense and is misleading because I don't think you would ever split 
mutations from a context where txn conflicts are allowed?
   
   That flag is set on mutations from Paxos and Accord when the mutation is for 
commit and it tells the checks in `ColumnFamilyStore` to not generate an error 
if there could be a potential transaction conflict.
   
   Maybe remove this and add an illegal state exception if transaction 
conflicts are allowed with a message saying it's not needed/supported.



##########
src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedMapSize;
+
+/**
+ * TCM state tracking mutation tracking migration for a keyspace. Since repair 
advances the migration, and
+ * and repair sessions operate against tables, this class tracks repairs on 
every table that existed in the
+ * keyspace when the migration started.
+ * At the beginning of a migration, the full range is added to the 
pendingRanges, and as repairs are completed, the
+ * repaired ranges are subtracted from the pending ranges. When the pending 
range list is empty, the migration is finished.
+ */
+public class KeyspaceMigrationInfo
+{
+    @Nonnull public final String keyspace;
+    @Nonnull public final Map<TableId, NormalizedRanges<Token>> 
pendingRangesPerTable;
+    @Nonnull public final Epoch startedAtEpoch;
+
+    public KeyspaceMigrationInfo(@Nonnull String keyspace,
+                                 @Nonnull Map<TableId, 
NormalizedRanges<Token>> pendingRangesPerTable,
+                                 @Nonnull Epoch startedAtEpoch)
+    {
+        this.keyspace = Objects.requireNonNull(keyspace);
+        this.pendingRangesPerTable = 
ImmutableMap.copyOf(pendingRangesPerTable);
+        this.startedAtEpoch = Objects.requireNonNull(startedAtEpoch);
+    }
+
+    /**
+     * Reverse migration direction. Since unfinished migrations can be 
aborted, ranges that have not completed migrating
+     * in the previous direction are immediately rolled back. For ranges that 
did complete migration, or tables that were
+     * added since migration started, migration in the other direction is now 
required, so they're marked pending.
+     */
+    public KeyspaceMigrationInfo withDirectionReversed(@Nonnull 
Collection<TableId> allTableIds,
+                                                       @Nonnull Epoch epoch)
+    {
+        Token minimumToken = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+        // Reset all tables to full ring pending (includes tables currently 
migrating, added during migration, or already migrated)
+        ImmutableMap.Builder<TableId, NormalizedRanges<Token>> 
reversedPendingBuilder = ImmutableMap.builder();
+
+        for (TableId tableId : allTableIds)
+        {
+            Range<Token> fullRing = new Range<>(minimumToken, minimumToken);
+            NormalizedRanges<Token> reversedRanges = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+
+            NormalizedRanges<Token> existingPending = 
pendingRangesPerTable.get(tableId);
+            if (existingPending != null)
+            {
+                Set<Range<Token>> ranges = 
Range.subtract(Collections.singletonList(fullRing), existingPending);
+                reversedRanges = NormalizedRanges.normalizedRanges(ranges);
+            }
+
+            if (!reversedRanges.isEmpty())
+                reversedPendingBuilder.put(tableId, reversedRanges);
+        }
+
+        return new KeyspaceMigrationInfo(
+            keyspace,
+            reversedPendingBuilder.build(),
+            epoch
+        );
+    }
+
+    /**
+     * Remove tables from migration state. Returns null if all tables removed.
+     */
+    public KeyspaceMigrationInfo withTablesRemoved(@Nonnull Set<TableId> 
tablesToRemove)
+    {
+        if (tablesToRemove.isEmpty())
+            return this;
+
+        ImmutableMap.Builder<TableId, NormalizedRanges<Token>> builder = 
ImmutableMap.builder();
+        boolean anyRemoved = false;
+
+        for (Map.Entry<TableId, NormalizedRanges<Token>> entry : 
pendingRangesPerTable.entrySet())
+        {
+            if (!tablesToRemove.contains(entry.getKey()))
+            {
+                builder.put(entry.getKey(), entry.getValue());
+            }
+            else
+            {
+                anyRemoved = true;
+            }
+        }
+
+        if (!anyRemoved)
+            return this;
+
+        Map<TableId, NormalizedRanges<Token>> newPending = builder.build();
+
+        if (newPending.isEmpty())
+            return null;
+
+        return new KeyspaceMigrationInfo(
+            keyspace,
+            newPending,
+            startedAtEpoch
+        );
+    }
+
+    /**
+     * Subtract repaired ranges from table's pending set.
+     * Automatically removes table if all ranges repaired.
+     */
+    public KeyspaceMigrationInfo withRangesRepairedForTable(@Nonnull TableId 
tableId,
+                                                            @Nonnull 
Collection<Range<Token>> repairedRanges)
+    {
+        NormalizedRanges<Token> currentPendingForTable = 
pendingRangesPerTable.get(tableId);
+        if (currentPendingForTable == null)
+        {
+            return this;
+        }
+
+        NormalizedRanges<Token> normalizedRepaired = 
NormalizedRanges.normalizedRanges(repairedRanges);
+        NormalizedRanges<Token> remainingForTable = 
currentPendingForTable.subtract(normalizedRepaired);
+
+        ImmutableMap.Builder<TableId, NormalizedRanges<Token>> builder = 
ImmutableMap.builder();
+        for (Map.Entry<TableId, NormalizedRanges<Token>> entry : 
pendingRangesPerTable.entrySet())
+        {
+            if (entry.getKey().equals(tableId))
+            {
+                if (!remainingForTable.isEmpty())
+                    builder.put(tableId, remainingForTable);
+            }
+            else
+            {
+                builder.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        return new KeyspaceMigrationInfo(keyspace, builder.build(), 
startedAtEpoch);
+    }
+
+    /**
+     * Check if migration is complete (no tables have pending ranges).
+     * Migration is complete when all tables have been fully repaired and 
removed from the map.
+     */
+    public boolean isComplete()
+    {
+        return pendingRangesPerTable.isEmpty();
+    }
+
+    public NormalizedRanges<Token> getPendingRangesForTable(@Nonnull TableId 
tableId)
+    {
+        NormalizedRanges<Token> ranges = pendingRangesPerTable.get(tableId);
+        return ranges != null ? ranges : NormalizedRanges.empty();
+    }
+
+    /**
+     * Check if token is in any pending range.
+     * Used for routing decisions during migration.
+     *
+     * @param token token to check
+     * @return true if token is in a pending range
+     */
+    public boolean isTokenInPendingRange(TableId tableId, Token token)
+    {
+        NormalizedRanges<Token> tableRanges = 
pendingRangesPerTable.get(tableId);
+        if (tableRanges == null)
+            return false;
+        return tableRanges.intersects(token);
+    }
+
+    /**
+     * Determine if read operations on a token should use tracked replication 
during migration.
+     *
+     * We only use tracked reads for ranges that have completed migrating _to_ 
tracked replication.
+     */
+    public boolean shouldUseTrackedForReads(boolean isTracked, TableId 
tableId, Token token)
+    {
+        return isTracked && !isTokenInPendingRange(tableId, token);
+    }
+
+    /**
+     * Determine if write operations on a token should use tracked replication 
during migration.
+     *
+     * The only time we don't use tracked reads is when a range has completed 
migration to untracked replication

Review Comment:
   ```suggestion
        * The only time we don't use tracked writes is when a range has 
completed migration to untracked replication
   ```



##########
src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedMapSize;
+
+/**
+ * TCM state tracking mutation tracking migration for a keyspace. Since repair 
advances the migration, and
+ * and repair sessions operate against tables, this class tracks repairs on 
every table that existed in the
+ * keyspace when the migration started.
+ * At the beginning of a migration, the full range is added to the 
pendingRanges, and as repairs are completed, the
+ * repaired ranges are subtracted from the pending ranges. When the pending 
range list is empty, the migration is finished.
+ */
+public class KeyspaceMigrationInfo
+{
+    @Nonnull public final String keyspace;
+    @Nonnull public final Map<TableId, NormalizedRanges<Token>> 
pendingRangesPerTable;
+    @Nonnull public final Epoch startedAtEpoch;
+
+    public KeyspaceMigrationInfo(@Nonnull String keyspace,
+                                 @Nonnull Map<TableId, 
NormalizedRanges<Token>> pendingRangesPerTable,
+                                 @Nonnull Epoch startedAtEpoch)
+    {
+        this.keyspace = Objects.requireNonNull(keyspace);
+        this.pendingRangesPerTable = 
ImmutableMap.copyOf(pendingRangesPerTable);
+        this.startedAtEpoch = Objects.requireNonNull(startedAtEpoch);
+    }
+
+    /**
+     * Reverse migration direction. Since unfinished migrations can be 
aborted, ranges that have not completed migrating
+     * in the previous direction are immediately rolled back. For ranges that 
did complete migration, or tables that were
+     * added since migration started, migration in the other direction is now 
required, so they're marked pending.
+     */
+    public KeyspaceMigrationInfo withDirectionReversed(@Nonnull 
Collection<TableId> allTableIds,
+                                                       @Nonnull Epoch epoch)
+    {
+        Token minimumToken = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+        // Reset all tables to full ring pending (includes tables currently 
migrating, added during migration, or already migrated)
+        ImmutableMap.Builder<TableId, NormalizedRanges<Token>> 
reversedPendingBuilder = ImmutableMap.builder();
+
+        for (TableId tableId : allTableIds)
+        {
+            Range<Token> fullRing = new Range<>(minimumToken, minimumToken);
+            NormalizedRanges<Token> reversedRanges = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+
+            NormalizedRanges<Token> existingPending = 
pendingRangesPerTable.get(tableId);
+            if (existingPending != null)
+            {
+                Set<Range<Token>> ranges = 
Range.subtract(Collections.singletonList(fullRing), existingPending);
+                reversedRanges = NormalizedRanges.normalizedRanges(ranges);
+            }
+
+            if (!reversedRanges.isEmpty())
+                reversedPendingBuilder.put(tableId, reversedRanges);
+        }
+
+        return new KeyspaceMigrationInfo(

Review Comment:
   I think this has omitted tables that weren't pending migration in that 
keyspace? If the goal is to migrate the keyspace the other way then it needs to 
include all tables right?



##########
src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.virtual.VirtualMutation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Routes read and write requests based on schema and migration state.
+ *
+ * During migration in either direction, reads are untracked and writes are 
tracked
+ */
+public class MigrationRouter
+{
+    public static boolean shouldUseTracked(SinglePartitionReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return false;
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace);
+
+        if (migrationInfo == null)
+            return command.metadata().replicationType().isTracked();
+
+        Token token = command.partitionKey().getToken();
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        return migrationInfo.shouldUseTrackedForReads(isTracked, 
command.metadata().id(), token);
+    }
+
+    /**
+     * Wrapper for a range read command paired with its routing decision.
+     */
+    public static class RangeReadWithReplication
+    {
+        public final PartitionRangeReadCommand read;
+        public final boolean useTracked;
+
+        public RangeReadWithReplication(PartitionRangeReadCommand read, 
boolean useTracked)
+        {
+            this.read = read;
+            this.useTracked = useTracked;
+        }
+    }
+
+    /**
+     * Helper to create and add a range split to the result list.
+     */
+    private static void addSplit(List<RangeReadWithReplication> result,
+                                PartitionRangeReadCommand command,
+                                AbstractBounds<PartitionPosition> range,
+                                boolean isTracked)
+    {
+        boolean isFirst = result.isEmpty();
+        result.add(new RangeReadWithReplication(command.forSubRange(range, 
isFirst), isTracked));
+    }
+
+    /**
+     * Adds a split for the non-pending region before pendingRange, if one 
exists.
+     *
+     * @param isTracked the target replication type (TO_TRACKED=true, 
TO_UNTRACKED=false)
+     * @return true if remainder ends before pendingRange (no intersection 
possible)
+     */
+    private static boolean 
addNonPendingGapIfExists(List<RangeReadWithReplication> result,
+                                                    PartitionRangeReadCommand 
command,
+                                                    
AbstractBounds<PartitionPosition> remainder,
+                                                    Range<Token> pendingRange,
+                                                    boolean isTracked)
+    {
+        Token pendingStart = pendingRange.left;
+        Token remainderStart = remainder.left.getToken();
+        Token remainderEnd = remainder.right.getToken();
+
+        if (remainderStart.compareTo(pendingStart) >= 0)
+            return false; // No gap before pending range
+
+        // Check if remainder ends before pending range starts
+        if (remainderEnd.compareTo(pendingStart) <= 0)
+        {
+            // Entire remainder is before this pending range - no intersection
+            // Non-pending regions use the new protocol (isTracked)
+            addSplit(result, command, remainder, isTracked);
+            return true;
+        }
+
+        // Add the non-pending gap before pending range
+        AbstractBounds<PartitionPosition> gap = 
remainder.withNewRight(pendingStart.maxKeyBound());
+
+        if (!gap.left.equals(gap.right))
+            addSplit(result, command, gap, isTracked);
+
+        return false;
+    }
+
+    /**
+     * Split a range by pending ranges, creating sub-ranges for each 
contiguous region.
+     *
+     * If we're migrating to tracked replication, pending ranges use untracked 
reads, non-pending uses tracked
+     *
+     * If we're migrating to untracked replication, pending uses tracked 
reads, and non-pending uses untracked
+     */
+    private static List<RangeReadWithReplication> 
splitRangeByPendingRanges(PartitionRangeReadCommand command,
+                                                                            
AbstractBounds<PartitionPosition> keyRange,
+                                                                            
NormalizedRanges<Token> pendingRanges,
+                                                                            
boolean isTracked)
+    {
+        List<RangeReadWithReplication> result = new ArrayList<>();
+        AbstractBounds<PartitionPosition> remainder = keyRange;
+
+        for (Range<Token> pendingRange : pendingRanges)
+        {
+            // Add non-pending gap before this pending range (if exists)
+            if (addNonPendingGapIfExists(result, command, remainder, 
pendingRange, isTracked))
+            {
+                remainder = null;
+                break; // No more remainder to process
+            }
+
+            // Add intersection with pending range
+            Pair<AbstractBounds<PartitionPosition>, 
AbstractBounds<PartitionPosition>> split =
+                Range.intersectionAndRemainder(remainder, pendingRange);
+
+            // Pending regions use the old protocol (!isTracked)
+            if (split.left != null)
+                addSplit(result, command, split.left, !isTracked);
+
+            remainder = split.right;
+            if (remainder == null)
+                break;
+        }
+
+        // Add final non-pending remainder
+        if (remainder != null)
+            addSplit(result, command, remainder, isTracked);
+
+        return result;
+    }
+
+    /**
+     * Validate that splits are contiguous, cover the entire original range, 
and alternate protocols.
+     */
+    private static void validateSplitContiguity(PartitionRangeReadCommand 
originalCommand,
+                                               List<RangeReadWithReplication> 
splits)
+    {
+        checkState(!splits.isEmpty(), "Shouldn't have empty result");
+
+        // Validate coverage
+        checkState(splits.get(0).read.dataRange().startKey()
+                        .equals(originalCommand.dataRange().startKey()),
+                   "Split reads should encompass entire range");
+        checkState(splits.get(splits.size() - 1).read.dataRange().stopKey()
+                        .equals(originalCommand.dataRange().stopKey()),
+                   "Split reads should encompass entire range");
+
+        // Validate contiguity and alternating protocols
+        if (splits.size() > 1)
+        {
+            for (int i = 0; i < splits.size() - 1; i++)
+            {
+                checkState(splits.get(i).read.dataRange().stopKey()
+                                .equals(splits.get(i + 
1).read.dataRange().startKey()),
+                           "Split reads should all be adjacent");
+                checkState(splits.get(i).useTracked != splits.get(i + 
1).useTracked,
+                           "Split reads should be for different replication 
protocols");
+            }
+        }
+    }
+
+    /**
+     * Split a range read command into sub-ranges based on migration state.
+     */
+    public static List<RangeReadWithReplication> 
splitRangeRead(ClusterMetadata metadata,
+                                                                 
PartitionRangeReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return ImmutableList.of(new RangeReadWithReplication(command, 
false));
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              
.getKeyspaceInfo(command.metadata().keyspace);
+
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        // During migration, reads use untracked replication except for ranges 
that have
+        // completed migration to tracked. Therefore, we only need to split 
ranges when
+        // migrating to tracked replication. For untracked migrations, all 
reads use untracked.
+        if (!isTracked || migrationInfo == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // Get pending ranges for this table
+        NormalizedRanges<Token> tablePendingRanges = 
migrationInfo.pendingRangesPerTable.get(command.metadata().id());
+
+        // No pending ranges for this table - entire range uses current 
protocol
+        if (tablePendingRanges == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // split into pending (untracked) and non-pending (tracked) ranges
+        List<RangeReadWithReplication> result = splitRangeByPendingRanges(
+            command,
+            command.dataRange().keyRange(),
+            tablePendingRanges,
+            isTracked);
+
+        // Validate the splits
+        validateSplitContiguity(command, result);
+
+        return result;
+    }
+
+    public static boolean shouldUseTrackedForWrites(ClusterMetadata metadata, 
String keyspace, TableId tableId, Token token)
+    {
+        if (SchemaConstants.isSystemKeyspace(keyspace))
+            return false;
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              .getKeyspaceInfo(keyspace);
+
+        if (migrationInfo == null)
+            return 
metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked();
+
+        boolean isTracked = 
metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked();

Review Comment:
   Can NPE if keyspace is dropped.



##########
src/java/org/apache/cassandra/db/Keyspace.java:
##########
@@ -403,7 +404,7 @@ public void initCf(TableMetadata metadata, boolean 
loadSSTables)
 
     public Future<?> applyFuture(Mutation mutation, boolean writeCommitLog, 
boolean updateIndexes)
     {
-        return getMetadata().useMutationTracking()
+        return MigrationRouter.isFullyTracked(mutation)

Review Comment:
   If it's partially tracked and migrating to mutation tracking and don't apply 
the partially tracked portion as tracked then it forces background 
reconciliation to reconcile all the mutations? I don't get how it makes sense 
to bypass the mutation tracking plumbing during migration.
   
   I thought that writes would flow through the mutation tracking system and 
background reconciliation would occur and it would be fine even if there it 
does extra/redundant work with read repair.
   
   Basically I think for writes there isn't a range based form of "is it 
tracked" and that is only relevant for reads which can't be correct without 
relying on range based repairs Just throw it into the mutation tracking system 
and call it good?



##########
src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.virtual.VirtualMutation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Routes read and write requests based on schema and migration state.
+ *
+ * During migration in either direction, reads are untracked and writes are 
tracked
+ */
+public class MigrationRouter
+{
+    public static boolean shouldUseTracked(SinglePartitionReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return false;
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace);
+
+        if (migrationInfo == null)
+            return command.metadata().replicationType().isTracked();
+
+        Token token = command.partitionKey().getToken();
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        return migrationInfo.shouldUseTrackedForReads(isTracked, 
command.metadata().id(), token);
+    }
+
+    /**
+     * Wrapper for a range read command paired with its routing decision.
+     */
+    public static class RangeReadWithReplication
+    {
+        public final PartitionRangeReadCommand read;
+        public final boolean useTracked;
+
+        public RangeReadWithReplication(PartitionRangeReadCommand read, 
boolean useTracked)
+        {
+            this.read = read;
+            this.useTracked = useTracked;
+        }
+    }
+
+    /**
+     * Helper to create and add a range split to the result list.
+     */
+    private static void addSplit(List<RangeReadWithReplication> result,
+                                PartitionRangeReadCommand command,
+                                AbstractBounds<PartitionPosition> range,
+                                boolean isTracked)
+    {
+        boolean isFirst = result.isEmpty();
+        result.add(new RangeReadWithReplication(command.forSubRange(range, 
isFirst), isTracked));
+    }
+
+    /**
+     * Adds a split for the non-pending region before pendingRange, if one 
exists.
+     *
+     * @param isTracked the target replication type (TO_TRACKED=true, 
TO_UNTRACKED=false)
+     * @return true if remainder ends before pendingRange (no intersection 
possible)
+     */
+    private static boolean 
addNonPendingGapIfExists(List<RangeReadWithReplication> result,
+                                                    PartitionRangeReadCommand 
command,
+                                                    
AbstractBounds<PartitionPosition> remainder,
+                                                    Range<Token> pendingRange,
+                                                    boolean isTracked)
+    {
+        Token pendingStart = pendingRange.left;
+        Token remainderStart = remainder.left.getToken();
+        Token remainderEnd = remainder.right.getToken();
+
+        if (remainderStart.compareTo(pendingStart) >= 0)
+            return false; // No gap before pending range
+
+        // Check if remainder ends before pending range starts
+        if (remainderEnd.compareTo(pendingStart) <= 0)
+        {
+            // Entire remainder is before this pending range - no intersection
+            // Non-pending regions use the new protocol (isTracked)
+            addSplit(result, command, remainder, isTracked);
+            return true;
+        }
+
+        // Add the non-pending gap before pending range
+        AbstractBounds<PartitionPosition> gap = 
remainder.withNewRight(pendingStart.maxKeyBound());
+
+        if (!gap.left.equals(gap.right))
+            addSplit(result, command, gap, isTracked);
+
+        return false;
+    }
+
+    /**
+     * Split a range by pending ranges, creating sub-ranges for each 
contiguous region.
+     *
+     * If we're migrating to tracked replication, pending ranges use untracked 
reads, non-pending uses tracked
+     *
+     * If we're migrating to untracked replication, pending uses tracked 
reads, and non-pending uses untracked
+     */
+    private static List<RangeReadWithReplication> 
splitRangeByPendingRanges(PartitionRangeReadCommand command,
+                                                                            
AbstractBounds<PartitionPosition> keyRange,
+                                                                            
NormalizedRanges<Token> pendingRanges,
+                                                                            
boolean isTracked)
+    {
+        List<RangeReadWithReplication> result = new ArrayList<>();
+        AbstractBounds<PartitionPosition> remainder = keyRange;
+
+        for (Range<Token> pendingRange : pendingRanges)
+        {
+            // Add non-pending gap before this pending range (if exists)
+            if (addNonPendingGapIfExists(result, command, remainder, 
pendingRange, isTracked))
+            {
+                remainder = null;
+                break; // No more remainder to process
+            }
+
+            // Add intersection with pending range
+            Pair<AbstractBounds<PartitionPosition>, 
AbstractBounds<PartitionPosition>> split =
+                Range.intersectionAndRemainder(remainder, pendingRange);
+
+            // Pending regions use the old protocol (!isTracked)
+            if (split.left != null)
+                addSplit(result, command, split.left, !isTracked);
+
+            remainder = split.right;
+            if (remainder == null)
+                break;
+        }
+
+        // Add final non-pending remainder
+        if (remainder != null)
+            addSplit(result, command, remainder, isTracked);
+
+        return result;
+    }
+
+    /**
+     * Validate that splits are contiguous, cover the entire original range, 
and alternate protocols.
+     */
+    private static void validateSplitContiguity(PartitionRangeReadCommand 
originalCommand,
+                                               List<RangeReadWithReplication> 
splits)
+    {
+        checkState(!splits.isEmpty(), "Shouldn't have empty result");
+
+        // Validate coverage
+        checkState(splits.get(0).read.dataRange().startKey()
+                        .equals(originalCommand.dataRange().startKey()),
+                   "Split reads should encompass entire range");
+        checkState(splits.get(splits.size() - 1).read.dataRange().stopKey()
+                        .equals(originalCommand.dataRange().stopKey()),
+                   "Split reads should encompass entire range");
+
+        // Validate contiguity and alternating protocols
+        if (splits.size() > 1)
+        {
+            for (int i = 0; i < splits.size() - 1; i++)
+            {
+                checkState(splits.get(i).read.dataRange().stopKey()
+                                .equals(splits.get(i + 
1).read.dataRange().startKey()),
+                           "Split reads should all be adjacent");
+                checkState(splits.get(i).useTracked != splits.get(i + 
1).useTracked,
+                           "Split reads should be for different replication 
protocols");
+            }
+        }
+    }
+
+    /**
+     * Split a range read command into sub-ranges based on migration state.
+     */
+    public static List<RangeReadWithReplication> 
splitRangeRead(ClusterMetadata metadata,
+                                                                 
PartitionRangeReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return ImmutableList.of(new RangeReadWithReplication(command, 
false));
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              
.getKeyspaceInfo(command.metadata().keyspace);
+
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        // During migration, reads use untracked replication except for ranges 
that have
+        // completed migration to tracked. Therefore, we only need to split 
ranges when
+        // migrating to tracked replication. For untracked migrations, all 
reads use untracked.
+        if (!isTracked || migrationInfo == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // Get pending ranges for this table
+        NormalizedRanges<Token> tablePendingRanges = 
migrationInfo.pendingRangesPerTable.get(command.metadata().id());
+
+        // No pending ranges for this table - entire range uses current 
protocol
+        if (tablePendingRanges == null)
+            return ImmutableList.of(new RangeReadWithReplication(command, 
isTracked));
+
+        // split into pending (untracked) and non-pending (tracked) ranges
+        List<RangeReadWithReplication> result = splitRangeByPendingRanges(
+            command,
+            command.dataRange().keyRange(),
+            tablePendingRanges,
+            isTracked);
+
+        // Validate the splits
+        validateSplitContiguity(command, result);
+
+        return result;
+    }
+
+    public static boolean shouldUseTrackedForWrites(ClusterMetadata metadata, 
String keyspace, TableId tableId, Token token)
+    {
+        if (SchemaConstants.isSystemKeyspace(keyspace))
+            return false;
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState
+                                              .getKeyspaceInfo(keyspace);
+
+        if (migrationInfo == null)
+            return 
metadata.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked();

Review Comment:
   Can NPE if keyspace is dropped



##########
src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java:
##########
@@ -332,6 +337,64 @@ public static Transformer 
maybeUpdateConsensusMigrationState(ConsensusMigrationS
         return next;
     }
 
+    /**
+     * Auto-start mutation tracking migration when keyspace replication type 
changes.
+     * Detects transitions between tracked and untracked replication and 
initializes
+     * migration state accordingly.
+     * Also handles removing dropped tables and keyspaces from migration state.
+     */
+    public static Transformer maybeUpdateMutationTrackingMigrationState(Epoch 
nextEpoch,
+                                                                        
MutationTrackingMigrationState prev,
+                                                                        
Transformer next,
+                                                                        
ImmutableList<KeyspaceDiff> altered,
+                                                                        
Keyspaces dropped)
+    {
+        MutationTrackingMigrationState migrationState = prev;
+
+        // Handle dropped keyspaces - remove their migration state entirely
+        if (!dropped.isEmpty())
+        {
+            Set<String> droppedKeyspaceNames = dropped.stream()
+                .map(ks -> ks.name)
+                .collect(Collectors.toSet());
+            migrationState = migrationState.dropKeyspaces(nextEpoch, 
droppedKeyspaceNames);
+        }
+
+        // Handle dropped tables from altered keyspaces
+        Set<TableId> droppedTableIds = altered.stream()
+            .flatMap(diff -> 
diff.tables.dropped.stream().map(TableMetadata::id))
+            .collect(Collectors.toSet());
+
+        if (!droppedTableIds.isEmpty())
+            migrationState = migrationState.dropTables(droppedTableIds, 
nextEpoch);
+
+        // Handle keyspace replication type changes (new migrations or 
reversals)
+        for (KeyspaceDiff diff : altered)
+        {
+            ReplicationType beforeType = diff.before.params.replicationType;
+            ReplicationType afterType = diff.after.params.replicationType;
+
+            // Check if replication type changed
+            if (beforeType != afterType)
+            {
+                // Auto-start migration for this keyspace
+                logger.info("Auto-starting mutation tracking migration for 
keyspace {} (replication_type={})",
+                          diff.after.name, afterType);
+
+                Collection<TableId> tableIds = diff.after.tables.stream()
+                    .map(table -> table.id)
+                    .collect(Collectors.toList());
+
+                migrationState = 
migrationState.withKeyspaceMigrating(diff.after.name, tableIds, nextEpoch);

Review Comment:
   A little sneaky that this is a reversal on whatever is already there.



##########
src/java/org/apache/cassandra/service/replication/migration/MigrationRouter.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.virtual.VirtualMutation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Routes read and write requests based on schema and migration state.
+ *
+ * During migration in either direction, reads are untracked and writes are 
tracked
+ */
+public class MigrationRouter
+{
+    public static boolean shouldUseTracked(SinglePartitionReadCommand command)
+    {
+        // System keyspaces never use tracked replication
+        if (SchemaConstants.isSystemKeyspace(command.metadata().keyspace))
+            return false;
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+
+        KeyspaceMigrationInfo migrationInfo = 
metadata.mutationTrackingMigrationState.getKeyspaceInfo(command.metadata().keyspace);
+
+        if (migrationInfo == null)
+            return command.metadata().replicationType().isTracked();
+
+        Token token = command.partitionKey().getToken();
+        boolean isTracked = command.metadata().replicationType().isTracked();
+
+        return migrationInfo.shouldUseTrackedForReads(isTracked, 
command.metadata().id(), token);
+    }
+
+    /**
+     * Wrapper for a range read command paired with its routing decision.
+     */
+    public static class RangeReadWithReplication
+    {
+        public final PartitionRangeReadCommand read;
+        public final boolean useTracked;
+
+        public RangeReadWithReplication(PartitionRangeReadCommand read, 
boolean useTracked)
+        {
+            this.read = read;
+            this.useTracked = useTracked;
+        }
+    }
+
+    /**
+     * Helper to create and add a range split to the result list.
+     */
+    private static void addSplit(List<RangeReadWithReplication> result,
+                                PartitionRangeReadCommand command,
+                                AbstractBounds<PartitionPosition> range,
+                                boolean isTracked)
+    {
+        boolean isFirst = result.isEmpty();
+        result.add(new RangeReadWithReplication(command.forSubRange(range, 
isFirst), isTracked));
+    }
+
+    /**
+     * Adds a split for the non-pending region before pendingRange, if one 
exists.
+     *
+     * @param isTracked the target replication type (TO_TRACKED=true, 
TO_UNTRACKED=false)
+     * @return true if remainder ends before pendingRange (no intersection 
possible)
+     */
+    private static boolean 
addNonPendingGapIfExists(List<RangeReadWithReplication> result,
+                                                    PartitionRangeReadCommand 
command,
+                                                    
AbstractBounds<PartitionPosition> remainder,
+                                                    Range<Token> pendingRange,
+                                                    boolean isTracked)
+    {
+        Token pendingStart = pendingRange.left;
+        Token remainderStart = remainder.left.getToken();
+        Token remainderEnd = remainder.right.getToken();
+
+        if (remainderStart.compareTo(pendingStart) >= 0)
+            return false; // No gap before pending range
+
+        // Check if remainder ends before pending range starts
+        if (remainderEnd.compareTo(pendingStart) <= 0)
+        {
+            // Entire remainder is before this pending range - no intersection
+            // Non-pending regions use the new protocol (isTracked)
+            addSplit(result, command, remainder, isTracked);
+            return true;
+        }
+
+        // Add the non-pending gap before pending range
+        AbstractBounds<PartitionPosition> gap = 
remainder.withNewRight(pendingStart.maxKeyBound());
+
+        if (!gap.left.equals(gap.right))
+            addSplit(result, command, gap, isTracked);
+
+        return false;
+    }
+
+    /**
+     * Split a range by pending ranges, creating sub-ranges for each 
contiguous region.
+     *
+     * If we're migrating to tracked replication, pending ranges use untracked 
reads, non-pending uses tracked
+     *
+     * If we're migrating to untracked replication, pending uses tracked 
reads, and non-pending uses untracked
+     */
+    private static List<RangeReadWithReplication> 
splitRangeByPendingRanges(PartitionRangeReadCommand command,

Review Comment:
   If this doesn't handle and isn't tested for wrap around ranges then create a 
precondition that rejects wrap around ranges. It's fine if it ends up being 
needed it's just generally easier/safer to enforce that if you haven't tried to 
handle wrap around ranges that it errors out and we can fix it instead of 
getting a wrong answer.



##########
src/java/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationState.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataValue;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedMapSize;
+
+/**
+ * Cluster wide per keyspace mutation tracking migration state.
+ * Tracks ranges needing migration per keyspace
+ * Only schema changes and repair coordinators execute TCM transformations; 
replicas read for routing of reads/writes.
+ */
+public class MutationTrackingMigrationState implements 
MetadataValue<MutationTrackingMigrationState>
+{
+    public static final MutationTrackingMigrationState EMPTY =
+        new MutationTrackingMigrationState(Epoch.EMPTY, ImmutableMap.of());
+
+    @Nonnull
+    public final ImmutableMap<String, KeyspaceMigrationInfo> keyspaceInfo;
+
+    @Nonnull
+    public final Epoch lastModified;
+
+    public MutationTrackingMigrationState(@Nonnull Epoch lastModified,
+                                         @Nonnull Map<String, 
KeyspaceMigrationInfo> keyspaceInfo)
+    {
+        checkNotNull(lastModified);
+        checkNotNull(keyspaceInfo);
+        this.lastModified = lastModified;
+        this.keyspaceInfo = ImmutableMap.copyOf(keyspaceInfo);
+    }
+
+    @Override
+    public MutationTrackingMigrationState withLastModified(Epoch epoch)
+    {
+        return new MutationTrackingMigrationState(epoch, keyspaceInfo);
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+
+    private MutationTrackingMigrationState withUpdatedKeyspaceInfo(Epoch 
epoch, KeyspaceMigrationInfo info)
+    {
+        ImmutableMap.Builder<String, KeyspaceMigrationInfo> updated = 
ImmutableMap.builder();
+        for (Map.Entry<String, KeyspaceMigrationInfo> entry : 
keyspaceInfo.entrySet())
+        {
+            if (!entry.getKey().equals(info.keyspace))
+                updated.put(entry.getKey(), entry.getValue());
+        }
+
+        if (info != null && !info.isComplete())

Review Comment:
   I don't think this null check is needed



##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java:
##########
@@ -275,21 +270,27 @@ public SplitMutation(@Nullable T accordMutation, 
@Nullable T untrackedMutation,
 
     public <T extends IMutation> SplitMutation<T> splitMutation(T mutation, 
ClusterMetadata cm)
     {
-        boolean isTracked = isTrackedMutation(mutation);
+        Token token = mutation.key().getToken();
+        Predicate<TableId> isTrackedUpdate = tableId -> 
MigrationRouter.shouldUseTrackedForWrites(cm, mutation.getKeyspaceName(), 
tableId, token);
+        Predicate<TableId> isUntrackedUpdate = not(isTrackedUpdate);
         if (mutation.potentialTxnConflicts().allowed)
-            return new SplitMutation<>(null, isTracked ? null : mutation, 
isTracked ? mutation : null);
+        {
+            return new SplitMutation<>(null, (T) 
mutation.filter(isUntrackedUpdate), (T) mutation.filter(isTrackedUpdate));
+        }
 
-        Token token = mutation.key().getToken();
         Predicate<TableId> isAccordUpdate = tableId -> 
tokenShouldBeWrittenThroughAccord(cm, tableId, token, 
TransactionalMode::nonSerialWritesThroughAccord, 
TransactionalMigrationFromMode::nonSerialWritesThroughAccord);
 
         T accordMutation = (T)mutation.filter(isAccordUpdate);
-        T normalMutation = (T)mutation.filter(not(isAccordUpdate));
+        T untrackedMutation = (T)mutation.filter(tid -> 
!isAccordUpdate.test(tid) && isUntrackedUpdate.test(tid));
+        T trackedMutation = (T)mutation.filter(tid -> 
!isAccordUpdate.test(tid) && isTrackedUpdate.test(tid));
+
         for (PartitionUpdate pu : mutation.getPartitionUpdates())
             checkState((accordMutation == null ? false : 
accordMutation.hasUpdateForTable(pu.metadata().id))
-                       || (normalMutation == null ? false : 
normalMutation.hasUpdateForTable(pu.metadata().id)),
+                       || (untrackedMutation == null ? false : 
untrackedMutation.hasUpdateForTable(pu.metadata().id))
+                       || (trackedMutation == null ? false : 
trackedMutation.hasUpdateForTable(pu.metadata().id)),
                        "All partition updates should still be present after 
splitting");
 
-        return new SplitMutation(accordMutation, isTracked ? null : 
normalMutation, isTracked ? normalMutation : null);
+        return new SplitMutation(accordMutation, untrackedMutation, 
trackedMutation);

Review Comment:
   This code was for splitting into Accord and Paxos both of which should then 
handle whether the mutation is tracked or not and how to apply it.
   
   This makes it seem like a three way split when it really isn't. Maybe add 
assertions that reflect the current reality where if there are any Accord 
updates there should be 0 tracked updates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to