aweisberg commented on code in PR #4508:
URL: https://github.com/apache/cassandra/pull/4508#discussion_r2590325213


##########
test/unit/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfoTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+
+import static org.junit.Assert.*;
+
+public class KeyspaceMigrationInfoTest
+{
+    private static IPartitioner partitioner;
+    private static TableId testTableId;
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        System.setProperty(CassandraRelevantProperties.PARTITIONER.getKey(), 
Murmur3Partitioner.class.getName());
+
+        ServerTestUtils.prepareServerNoRegister();
+        partitioner = DatabaseDescriptor.getPartitioner();
+        assertTrue(partitioner instanceof Murmur3Partitioner);
+        testTableId = TableId.generate();
+    }
+
+    @Test
+    public void testConstruction()
+    {
+        Epoch epoch = Epoch.create(1);
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.emptyMap();
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch
+        );
+
+        assertEquals("test_ks", info.keyspace);
+        assertEquals(epoch, info.startedAtEpoch);
+        assertTrue(info.isComplete());
+        assertTrue(info.pendingRangesPerTable.isEmpty());
+    }
+
+    @Test
+    public void testWithRangesRepairedForTable()
+    {
+        List<Range<Token>> ranges = createTestRanges();
+        Epoch epoch1 = Epoch.create(1);
+        Epoch epoch2 = Epoch.create(2);
+
+        // Start with full ring as pending for testTableId
+        Range<Token> fullRing = new Range<>(partitioner.getMinimumToken(), 
partitioner.getMinimumToken());
+        NormalizedRanges<Token> fullRingNormalized = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.singletonMap(testTableId, fullRingNormalized);
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch1
+        );
+
+        // Subtract first range - should leave remaining ranges as pending
+        KeyspaceMigrationInfo updated = 
info.withRangesRepairedForTable(testTableId, 
Collections.singleton(ranges.get(0)));
+
+        assertFalse(updated.isComplete());
+
+        // Subtract second range - should leave even fewer pending ranges
+        KeyspaceMigrationInfo updated2 = 
updated.withRangesRepairedForTable(testTableId, 
Collections.singleton(ranges.get(1)));
+
+        assertFalse(updated2.isComplete());

Review Comment:
   This doesn't assert much about what the outcome should be. Nothing could 
have changed and this test would still pass.



##########
test/unit/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfoTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+
+import static org.junit.Assert.*;
+
+public class KeyspaceMigrationInfoTest
+{
+    private static IPartitioner partitioner;
+    private static TableId testTableId;
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        System.setProperty(CassandraRelevantProperties.PARTITIONER.getKey(), 
Murmur3Partitioner.class.getName());
+
+        ServerTestUtils.prepareServerNoRegister();
+        partitioner = DatabaseDescriptor.getPartitioner();
+        assertTrue(partitioner instanceof Murmur3Partitioner);
+        testTableId = TableId.generate();
+    }
+
+    @Test
+    public void testConstruction()
+    {
+        Epoch epoch = Epoch.create(1);
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.emptyMap();
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch
+        );
+
+        assertEquals("test_ks", info.keyspace);
+        assertEquals(epoch, info.startedAtEpoch);
+        assertTrue(info.isComplete());
+        assertTrue(info.pendingRangesPerTable.isEmpty());
+    }
+
+    @Test
+    public void testWithRangesRepairedForTable()
+    {
+        List<Range<Token>> ranges = createTestRanges();
+        Epoch epoch1 = Epoch.create(1);
+        Epoch epoch2 = Epoch.create(2);

Review Comment:
   `epoch2` is unused.



##########
test/unit/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfoTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+
+import static org.junit.Assert.*;
+
+public class KeyspaceMigrationInfoTest
+{
+    private static IPartitioner partitioner;
+    private static TableId testTableId;
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        System.setProperty(CassandraRelevantProperties.PARTITIONER.getKey(), 
Murmur3Partitioner.class.getName());
+
+        ServerTestUtils.prepareServerNoRegister();
+        partitioner = DatabaseDescriptor.getPartitioner();
+        assertTrue(partitioner instanceof Murmur3Partitioner);
+        testTableId = TableId.generate();
+    }
+
+    @Test
+    public void testConstruction()
+    {
+        Epoch epoch = Epoch.create(1);
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.emptyMap();
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch
+        );
+
+        assertEquals("test_ks", info.keyspace);
+        assertEquals(epoch, info.startedAtEpoch);
+        assertTrue(info.isComplete());
+        assertTrue(info.pendingRangesPerTable.isEmpty());
+    }
+
+    @Test
+    public void testWithRangesRepairedForTable()
+    {
+        List<Range<Token>> ranges = createTestRanges();
+        Epoch epoch1 = Epoch.create(1);
+        Epoch epoch2 = Epoch.create(2);
+
+        // Start with full ring as pending for testTableId
+        Range<Token> fullRing = new Range<>(partitioner.getMinimumToken(), 
partitioner.getMinimumToken());
+        NormalizedRanges<Token> fullRingNormalized = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.singletonMap(testTableId, fullRingNormalized);
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch1
+        );
+
+        // Subtract first range - should leave remaining ranges as pending
+        KeyspaceMigrationInfo updated = 
info.withRangesRepairedForTable(testTableId, 
Collections.singleton(ranges.get(0)));
+
+        assertFalse(updated.isComplete());
+
+        // Subtract second range - should leave even fewer pending ranges
+        KeyspaceMigrationInfo updated2 = 
updated.withRangesRepairedForTable(testTableId, 
Collections.singleton(ranges.get(1)));
+
+        assertFalse(updated2.isComplete());
+    }
+
+    @Test
+    public void testSerialization() throws IOException
+    {
+        List<Range<Token>> ranges = createTestRanges();
+        Epoch epoch = Epoch.create(42);
+
+        // Create pending ranges for testTableId
+        NormalizedRanges<Token> normalizedRanges = 
NormalizedRanges.normalizedRanges(ranges.subList(0, 2));
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.singletonMap(testTableId, normalizedRanges);
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch
+        );
+
+        // Serialize
+        DataOutputBuffer out = new DataOutputBuffer();
+        KeyspaceMigrationInfo.serializer.serialize(info, out, 
NodeVersion.CURRENT.serializationVersion());
+
+        // Deserialize
+        DataInputBuffer in = new DataInputBuffer(out.toByteArray());
+        KeyspaceMigrationInfo deserialized = 
KeyspaceMigrationInfo.serializer.deserialize(in, 
NodeVersion.CURRENT.serializationVersion());
+
+        // Verify
+        assertEquals(info.keyspace, deserialized.keyspace);
+        assertEquals(info.pendingRangesPerTable.size(), 
deserialized.pendingRangesPerTable.size());

Review Comment:
   Why not also compare the contents of the `pendingRangesPerTable`?



##########
test/unit/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfoTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+
+import static org.junit.Assert.*;
+
+public class KeyspaceMigrationInfoTest
+{
+    private static IPartitioner partitioner;
+    private static TableId testTableId;
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        System.setProperty(CassandraRelevantProperties.PARTITIONER.getKey(), 
Murmur3Partitioner.class.getName());
+
+        ServerTestUtils.prepareServerNoRegister();
+        partitioner = DatabaseDescriptor.getPartitioner();
+        assertTrue(partitioner instanceof Murmur3Partitioner);
+        testTableId = TableId.generate();
+    }
+
+    @Test
+    public void testConstruction()
+    {
+        Epoch epoch = Epoch.create(1);
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.emptyMap();
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch
+        );
+
+        assertEquals("test_ks", info.keyspace);
+        assertEquals(epoch, info.startedAtEpoch);
+        assertTrue(info.isComplete());
+        assertTrue(info.pendingRangesPerTable.isEmpty());
+    }
+
+    @Test
+    public void testWithRangesRepairedForTable()
+    {
+        List<Range<Token>> ranges = createTestRanges();
+        Epoch epoch1 = Epoch.create(1);
+        Epoch epoch2 = Epoch.create(2);
+
+        // Start with full ring as pending for testTableId
+        Range<Token> fullRing = new Range<>(partitioner.getMinimumToken(), 
partitioner.getMinimumToken());
+        NormalizedRanges<Token> fullRingNormalized = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.singletonMap(testTableId, fullRingNormalized);
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch1
+        );
+
+        // Subtract first range - should leave remaining ranges as pending
+        KeyspaceMigrationInfo updated = 
info.withRangesRepairedForTable(testTableId, 
Collections.singleton(ranges.get(0)));
+
+        assertFalse(updated.isComplete());
+
+        // Subtract second range - should leave even fewer pending ranges
+        KeyspaceMigrationInfo updated2 = 
updated.withRangesRepairedForTable(testTableId, 
Collections.singleton(ranges.get(1)));
+
+        assertFalse(updated2.isComplete());
+    }
+
+    @Test
+    public void testSerialization() throws IOException
+    {
+        List<Range<Token>> ranges = createTestRanges();
+        Epoch epoch = Epoch.create(42);
+
+        // Create pending ranges for testTableId
+        NormalizedRanges<Token> normalizedRanges = 
NormalizedRanges.normalizedRanges(ranges.subList(0, 2));
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.singletonMap(testTableId, normalizedRanges);
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch
+        );
+
+        // Serialize
+        DataOutputBuffer out = new DataOutputBuffer();
+        KeyspaceMigrationInfo.serializer.serialize(info, out, 
NodeVersion.CURRENT.serializationVersion());
+
+        // Deserialize
+        DataInputBuffer in = new DataInputBuffer(out.toByteArray());
+        KeyspaceMigrationInfo deserialized = 
KeyspaceMigrationInfo.serializer.deserialize(in, 
NodeVersion.CURRENT.serializationVersion());
+
+        // Verify
+        assertEquals(info.keyspace, deserialized.keyspace);
+        assertEquals(info.pendingRangesPerTable.size(), 
deserialized.pendingRangesPerTable.size());
+        assertEquals(info.startedAtEpoch, deserialized.startedAtEpoch);
+    }
+
+    @Test
+    public void testImmutability()
+    {
+        List<Range<Token>> ranges = createTestRanges();
+        Epoch epoch = Epoch.create(1);
+
+        // Start with full ring as pending for testTableId
+        Range<Token> fullRing = new Range<>(partitioner.getMinimumToken(), 
partitioner.getMinimumToken());
+        NormalizedRanges<Token> fullRingNormalized = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+        Map<TableId, NormalizedRanges<Token>> pendingRangesPerTable = 
Collections.singletonMap(testTableId, fullRingNormalized);
+
+        KeyspaceMigrationInfo info = new KeyspaceMigrationInfo(
+            "test_ks",
+            pendingRangesPerTable,
+            epoch
+        );
+
+        NormalizedRanges<Token> originalPendingRanges = 
info.getPendingRangesForTable(testTableId);
+        int originalSize = originalPendingRanges.size();
+
+        // Subtract a range for testTableId
+        KeyspaceMigrationInfo updated = info.withRangesRepairedForTable(
+            testTableId,
+            Collections.singleton(ranges.get(1)));
+
+        // Original should be unchanged
+        assertEquals(originalSize, originalPendingRanges.size());
+        assertTrue(originalPendingRanges.contains(fullRing));
+
+        // Updated should have ranges subtracted (different from original)
+        assertNotEquals(originalPendingRanges, 
updated.getPendingRangesForTable(testTableId));
+    }
+
+    @Test
+    public void testWithDirectionReversed_PartialCompletion()

Review Comment:
   If I am correct that reversal does not mark tables that are completely 
migrated as needing migration, then this test needs to be updated to validate 
that those tables end up pending.



##########
src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.replication.migration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeMap;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedMapSize;
+
+/**
+ * TCM state tracking mutation tracking migration for a keyspace. Since repair 
advances the migration, and
+ * repair sessions operate against tables, this class tracks repairs on 
every table that existed in the
+ * keyspace when the migration started.
+ * At the beginning of a migration, the full range is added to the 
pendingRanges, and as repairs are completed, the
+ * repaired ranges are subtracted from the pending ranges. When the pending 
range list is empty, the migration is finished.
+ */
+public class KeyspaceMigrationInfo
+{
+    @Nonnull public final String keyspace;
+    @Nonnull public final Map<TableId, NormalizedRanges<Token>> 
pendingRangesPerTable;
+    @Nonnull public final Epoch startedAtEpoch;
+
+    public KeyspaceMigrationInfo(@Nonnull String keyspace,
+                                 @Nonnull Map<TableId, 
NormalizedRanges<Token>> pendingRangesPerTable,
+                                 @Nonnull Epoch startedAtEpoch)
+    {
+        this.keyspace = Objects.requireNonNull(keyspace);
+        this.pendingRangesPerTable = 
ImmutableMap.copyOf(pendingRangesPerTable);
+        this.startedAtEpoch = Objects.requireNonNull(startedAtEpoch);
+    }
+
+    /**
+     * Reverse migration direction. Since unfinished migrations can be 
aborted, ranges that have not completed migrating
+     * in the previous direction are immediately rolled back. For ranges that 
did complete migration, or tables that were
+     * added since migration started, migration in the other direction is now 
required, so they're marked pending.
+     */
+    public KeyspaceMigrationInfo withDirectionReversed(@Nonnull 
Collection<TableId> allTableIds,
+                                                       @Nonnull Epoch epoch)
+    {
+        Token minimumToken = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+        // Reset all tables to full ring pending (includes tables currently 
migrating, added during migration, or already migrated)
+        ImmutableMap.Builder<TableId, NormalizedRanges<Token>> 
reversedPendingBuilder = ImmutableMap.builder();
+
+        for (TableId tableId : allTableIds)
+        {
+            Range<Token> fullRing = new Range<>(minimumToken, minimumToken);
+            NormalizedRanges<Token> reversedRanges = 
NormalizedRanges.normalizedRanges(Collections.singleton(fullRing));
+
+            NormalizedRanges<Token> existingPending = 
pendingRangesPerTable.get(tableId);
+            if (existingPending != null)
+            {
+                Set<Range<Token>> ranges = 
Range.subtract(Collections.singletonList(fullRing), existingPending);
+                reversedRanges = NormalizedRanges.normalizedRanges(ranges);
+            }
+
+            if (!reversedRanges.isEmpty())
+                reversedPendingBuilder.put(tableId, reversedRanges);
+        }
+
+        return new KeyspaceMigrationInfo(
+            keyspace,
+            reversedPendingBuilder.build(),
+            epoch
+        );
+    }
+
+    /**
+     * Remove tables from migration state. Returns null if all tables removed.
+     */
+    public KeyspaceMigrationInfo withTablesRemoved(@Nonnull Set<TableId> 
tablesToRemove)
+    {
+        if (tablesToRemove.isEmpty())
+            return this;
+
+        ImmutableMap.Builder<TableId, NormalizedRanges<Token>> builder = 
ImmutableMap.builder();
+        boolean anyRemoved = false;
+
+        for (Map.Entry<TableId, NormalizedRanges<Token>> entry : 
pendingRangesPerTable.entrySet())
+        {
+            if (!tablesToRemove.contains(entry.getKey()))
+            {
+                builder.put(entry.getKey(), entry.getValue());
+            }
+            else
+            {
+                anyRemoved = true;
+            }
+        }
+
+        if (!anyRemoved)
+            return this;
+
+        Map<TableId, NormalizedRanges<Token>> newPending = builder.build();
+
+        if (newPending.isEmpty())
+            return null;
+
+        return new KeyspaceMigrationInfo(
+            keyspace,
+            newPending,
+            startedAtEpoch
+        );
+    }
+
+    /**
+     * Subtract repaired ranges from table's pending set.
+     * Automatically removes table if all ranges repaired.
+     */
+    public KeyspaceMigrationInfo withRangesRepairedForTable(@Nonnull TableId 
tableId,

Review Comment:
   Doesn't this omit the check that the epoch the repair occurred at is 
sufficient to actually effect the repair of the ranges?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to