tolbertam commented on code in PR #4455:
URL: https://github.com/apache/cassandra/pull/4455#discussion_r2646027042


##########
test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.repair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.repair.autorepair.AutoRepair;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+import org.apache.cassandra.service.AutoRepairService;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.cassandra.schema.SchemaConstants.DISTRIBUTED_KEYSPACE_NAME;
+import static org.apache.cassandra.schema.SystemDistributedKeyspace.AUTO_REPAIR_HISTORY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that verifies orphan nodes are cleaned up from auto_repair_history even when repairs

Review Comment:
   Would be nice to add a comment here indicating what causes a node to become an 'orphan' and why it is significant. From what I can discern, this is determined in `AutoRepairUtils.myTurnToRunRepair`. I think your description in [CASSANDRA-20995] defines this well, so maybe we include the first two paragraphs here? Either that or we could just add a link to the jira directly.
   
   > When a node in the ring goes down, the auto-repair schedulers of all other nodes in the cluster start voting for this node's auto-repair history to be removed. Once >50% of the cluster votes to delete said down node, its auto-repair history is deleted from the auto-repair system tables.
   >
   > This cleanup process is important to maintain continuous execution of repair across the entire cluster. If a node goes down, it will no longer perform repair and will not update its repair history in the system tables. However, as it is still present in the auto-repair history table, the down node will still be considered as a candidate to run repair. As a result, it will occupy space in the auto-repair queue and, in cases with low auto-repair parallelism, may even completely block auto-repair within the cluster.
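   
   For illustration, the class-level javadoc could then read something like the sketch below (just adapted from the jira text above, feel free to reword or trim):
   
   ```java
   /**
    * Test that verifies orphan nodes are cleaned up from auto_repair_history even when repairs
    * are skipped due to min_repair_interval constraints.
    *
    * A node becomes an 'orphan' when it goes down: the auto-repair schedulers on the remaining
    * nodes vote to remove its auto-repair history, and once >50% of the cluster has voted, the
    * down node's rows are deleted from the auto-repair system tables (determined in
    * AutoRepairUtils.myTurnToRunRepair; see CASSANDRA-20995). Without this cleanup the orphan
    * would still be treated as a repair candidate and, with low auto-repair parallelism, could
    * block auto-repair across the cluster.
    */
   ```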
   



##########
test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.repair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.repair.autorepair.AutoRepair;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+import org.apache.cassandra.service.AutoRepairService;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.cassandra.schema.SchemaConstants.DISTRIBUTED_KEYSPACE_NAME;
+import static org.apache.cassandra.schema.SystemDistributedKeyspace.AUTO_REPAIR_HISTORY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that verifies orphan nodes are cleaned up from auto_repair_history even when repairs
+ * are skipped due to min_repair_interval constraints.
+ */
+public class AutoRepairOrphanCleanupTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void init() throws IOException
+    {
+        // Configure a 3-node cluster with auto_repair enabled but with a very high min_repair_interval
+        // This ensures that when we test, repairs will be skipped due to "too soon to repair"
+        cluster = Cluster.build(3)
+                         .withTokenCount(4)
+                         .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3, 4))
+                         .withConfig(config -> config
+                                               .set("num_tokens", 4)
+                                               .set("auto_repair",
+                                                    ImmutableMap.of(
+                                                    "repair_check_interval", "1s",
+                                                    "repair_type_overrides",
+                                                    ImmutableMap.of(AutoRepairConfig.RepairType.FULL.getConfigName(),
+                                                                    ImmutableMap.builder()
+                                                                                .put("initial_scheduler_delay", "0s")
+                                                                                .put("enabled", "true")
+                                                                                // Set very high min_repair_interval
+                                                                                // to ensure repairs are skipped
+                                                                                .put("min_repair_interval", "24h")
+                                                                                .put("allow_parallel_replica_repair", "true")
+                                                                                .put("repair_by_keyspace", "true")
+                                                                                .build())))
+                                               .set("auto_repair.enabled", "true"))
+                         .start();
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        cluster.close();
+    }
+
+    @Test
+    public void testOrphanNodeCleanupWhenRepairSkipped()
+    {
+        // Insert 3 auto-repair records for each live node and 1 for the orphan node
+        List<UUID> liveHostIds = new ArrayList<>();
+        for (int i = 1; i <= 3; i++)
+        {
+            liveHostIds.add(
+            cluster.get(i).callOnInstance(() ->
+                                          StorageService.instance.getHostIdForEndpoint(
+                                          FBUtilities.getBroadcastAddressAndPort())));
+        }
+        UUID orphanHostId = UUID.randomUUID();
+
+        long currentTime = System.currentTimeMillis();
+        // Orphan node: oldest finish time, so it is next in line to run repair
+        long orphanStart = currentTime - TimeUnit.HOURS.toMillis(4);   // 4 hours ago
+        long orphanFinish = currentTime - TimeUnit.HOURS.toMillis(3);  // 3 hours ago
+
+        // Live nodes: more recent finish times
+        long[] liveStart = {
+        currentTime - TimeUnit.HOURS.toMillis(3), // 3 hours ago
+        currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago
+        currentTime - TimeUnit.HOURS.toMillis(1)  // 1 hour ago
+        };
+        long[] liveFinish = {
+        currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago
+        currentTime - TimeUnit.HOURS.toMillis(1), // 1 hour ago
+        currentTime                              // now
+        };
+
+        // Insert live node records
+        for (int i = 0; i < 3; i++)
+        {
+            cluster.coordinator(1).execute(String.format(
+            "INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, 
repair_finish_ts, repair_turn) " +
+            "VALUES ('%s', %s, %d, %d, '%s')",
+            DISTRIBUTED_KEYSPACE_NAME,
+            AUTO_REPAIR_HISTORY,
+            AutoRepairConfig.RepairType.FULL,
+            liveHostIds.get(i),
+            liveStart[i],
+            liveFinish[i],
+            "NOT_MY_TURN"
+            ), ConsistencyLevel.QUORUM);
+        }
+
+        // Insert orphan node record (should be next in line to run repair)
+        cluster.coordinator(1).execute(String.format(
+        "INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, 
repair_finish_ts, repair_turn) " +
+        "VALUES ('%s', %s, %d, %d, '%s')",
+        DISTRIBUTED_KEYSPACE_NAME,
+        AUTO_REPAIR_HISTORY,
+        AutoRepairConfig.RepairType.FULL,
+        orphanHostId,
+        orphanStart,
+        orphanFinish,
+        "NOT_MY_TURN"
+        ), ConsistencyLevel.QUORUM);
+
+        // Validate that all 4 records (3 live nodes + 1 orphan) are present before starting auto-repair
+        Object[][] initialRows = cluster.coordinator(1).execute(
+        String.format("SELECT host_id FROM %s.%s WHERE repair_type = '%s'",
+                      DISTRIBUTED_KEYSPACE_NAME,
+                      AUTO_REPAIR_HISTORY,
+                      AutoRepairConfig.RepairType.FULL),
+        ConsistencyLevel.QUORUM
+        );
+        assertEquals("Expected 4 records (3 live nodes + 1 orphan) before 
auto-repair starts", 4, initialRows.length);
+        boolean orphanFound = false;
+        int liveNodesFound = 0;
+        for (Object[] row : initialRows)
+        {
+            UUID hostId = (UUID) row[0];
+            if (hostId.equals(orphanHostId))
+                orphanFound = true;
+            else if (liveHostIds.contains(hostId))
+                liveNodesFound++;
+        }
+        assertEquals("All 3 live nodes should be present", 3, liveNodesFound);
+        assertEquals("Orphan node should be present", true, orphanFound);

Review Comment:
   small nit:
   
   could be `assertTrue` instead (with added import)
   
   ```suggestion
           assertTrue("Orphan node should be present", orphanFound);
   ```
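   
   with the corresponding static import being:
   
   ```java
   import static org.junit.Assert.assertTrue;
   ```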



##########
src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java:
##########
@@ -188,13 +188,15 @@ public void repair(AutoRepairConfig.RepairType repairType)
             //consistency level to use for local query
             UUID myId = StorageService.instance.getHostIdForEndpoint(FBUtilities.getBroadcastAddressAndPort());
 
-            // If it's too soon to run repair, don't bother checking if it's our turn.
+            // Calculate repair turn first - this also cleans up orphan nodes from auto-repair system tables

Review Comment:
   👍 good to make clear in case anyone does some refactoring later.



##########
test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.repair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.repair.autorepair.AutoRepair;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+import org.apache.cassandra.service.AutoRepairService;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.cassandra.schema.SchemaConstants.DISTRIBUTED_KEYSPACE_NAME;
+import static org.apache.cassandra.schema.SystemDistributedKeyspace.AUTO_REPAIR_HISTORY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that verifies orphan nodes are cleaned up from auto_repair_history even when repairs
+ * are skipped due to min_repair_interval constraints.
+ */
+public class AutoRepairOrphanCleanupTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void init() throws IOException
+    {
+        // Configure a 3-node cluster with auto_repair enabled but with a very high min_repair_interval
+        // This ensures that when we test, repairs will be skipped due to "too soon to repair"
+        cluster = Cluster.build(3)
+                         .withTokenCount(4)
+                         .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3, 4))
+                         .withConfig(config -> config
+                                               .set("num_tokens", 4)
+                                               .set("auto_repair",
+                                                    ImmutableMap.of(
+                                                    "repair_check_interval", "1s",
+                                                    "repair_type_overrides",
+                                                    ImmutableMap.of(AutoRepairConfig.RepairType.FULL.getConfigName(),
+                                                                    ImmutableMap.builder()
+                                                                                .put("initial_scheduler_delay", "0s")
+                                                                                .put("enabled", "true")
+                                                                                // Set very high min_repair_interval
+                                                                                // to ensure repairs are skipped
+                                                                                .put("min_repair_interval", "24h")
+                                                                                .put("allow_parallel_replica_repair", "true")
+                                                                                .put("repair_by_keyspace", "true")
+                                                                                .build())))
+                                               .set("auto_repair.enabled", "true"))
+                         .start();
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        cluster.close();
+    }
+
+    @Test
+    public void testOrphanNodeCleanupWhenRepairSkipped()
+    {
+        // Insert 3 auto-repair records for each live node and 1 for the orphan node
+        List<UUID> liveHostIds = new ArrayList<>();
+        for (int i = 1; i <= 3; i++)
+        {
+            liveHostIds.add(
+            cluster.get(i).callOnInstance(() ->
+                                          StorageService.instance.getHostIdForEndpoint(
+                                          FBUtilities.getBroadcastAddressAndPort())));
+        }
+        UUID orphanHostId = UUID.randomUUID();
+
+        long currentTime = System.currentTimeMillis();
+        // Orphan node: oldest finish time, so it is next in line to run repair
+        long orphanStart = currentTime - TimeUnit.HOURS.toMillis(4);   // 4 hours ago
+        long orphanFinish = currentTime - TimeUnit.HOURS.toMillis(3);  // 3 hours ago
+
+        // Live nodes: more recent finish times
+        long[] liveStart = {
+        currentTime - TimeUnit.HOURS.toMillis(3), // 3 hours ago
+        currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago
+        currentTime - TimeUnit.HOURS.toMillis(1)  // 1 hour ago
+        };
+        long[] liveFinish = {
+        currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago
+        currentTime - TimeUnit.HOURS.toMillis(1), // 1 hour ago
+        currentTime                              // now
+        };
+
+        // Insert live node records
+        for (int i = 0; i < 3; i++)
+        {
+            cluster.coordinator(1).execute(String.format(
+            "INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, 
repair_finish_ts, repair_turn) " +
+            "VALUES ('%s', %s, %d, %d, '%s')",
+            DISTRIBUTED_KEYSPACE_NAME,
+            AUTO_REPAIR_HISTORY,
+            AutoRepairConfig.RepairType.FULL,
+            liveHostIds.get(i),
+            liveStart[i],
+            liveFinish[i],
+            "NOT_MY_TURN"
+            ), ConsistencyLevel.QUORUM);
+        }
+
+        // Insert orphan node record (should be next in line to run repair)
+        cluster.coordinator(1).execute(String.format(
+        "INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, 
repair_finish_ts, repair_turn) " +
+        "VALUES ('%s', %s, %d, %d, '%s')",
+        DISTRIBUTED_KEYSPACE_NAME,
+        AUTO_REPAIR_HISTORY,
+        AutoRepairConfig.RepairType.FULL,
+        orphanHostId,
+        orphanStart,
+        orphanFinish,
+        "NOT_MY_TURN"
+        ), ConsistencyLevel.QUORUM);
+
+        // Validate that all 4 records (3 live nodes + 1 orphan) are present before starting auto-repair
+        Object[][] initialRows = cluster.coordinator(1).execute(
+        String.format("SELECT host_id FROM %s.%s WHERE repair_type = '%s'",
+                      DISTRIBUTED_KEYSPACE_NAME,
+                      AUTO_REPAIR_HISTORY,
+                      AutoRepairConfig.RepairType.FULL),
+        ConsistencyLevel.QUORUM
+        );
+        assertEquals("Expected 4 records (3 live nodes + 1 orphan) before 
auto-repair starts", 4, initialRows.length);
+        boolean orphanFound = false;
+        int liveNodesFound = 0;
+        for (Object[] row : initialRows)
+        {
+            UUID hostId = (UUID) row[0];
+            if (hostId.equals(orphanHostId))
+                orphanFound = true;
+            else if (liveHostIds.contains(hostId))
+                liveNodesFound++;
+        }
+        assertEquals("All 3 live nodes should be present", 3, liveNodesFound);
+        assertEquals("Orphan node should be present", true, orphanFound);
+
+        // Once the auto_repair_history table is prepared, initialize auto-repair service on all nodes
+        cluster.forEach(i -> i.runOnInstance(() -> {
+            try
+            {
+                AutoRepairService.setup();
+                AutoRepair.instance.setup();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }));
+
+        // Wait for at least one auto-repair cycle to allow orphan cleanup to run
+        // (auto_repair.repair_check_interval is set to 1s, so 10s is plenty)
+        Util.spinAssertEquals(true, () -> {
+            Object[][] rows = cluster.coordinator(1).execute(
+            String.format("SELECT host_id FROM %s.%s WHERE repair_type = '%s'",
+                          DISTRIBUTED_KEYSPACE_NAME,
+                          AUTO_REPAIR_HISTORY,
+                          AutoRepairConfig.RepairType.FULL),
+            ConsistencyLevel.QUORUM
+            );
+            // The orphanHostId should not be present in the results
+            for (Object[] row : rows)
+            {
+                UUID hostId = (UUID) row[0];
+                if (hostId.equals(orphanHostId))
+                    return false;
+            }
+            return true;
+        }, 10_000);
+
+        // Query all records for this repair type
+        Object[][] rows = cluster.coordinator(1).execute(
+        String.format("SELECT host_id, repair_start_ts, repair_finish_ts FROM 
%s.%s WHERE repair_type = '%s'",
+                      DISTRIBUTED_KEYSPACE_NAME,
+                      AUTO_REPAIR_HISTORY,
+                      AutoRepairConfig.RepairType.FULL),
+        ConsistencyLevel.QUORUM
+        );
+
+        // Check that the orphan node's record is gone, and live nodes' records remain with unchanged timestamps
+        Map<UUID, long[]> actualLiveTimestamps = new HashMap<>();
+        for (Object[] row : rows)
+        {
+            UUID hostId = (UUID) row[0];
+            long startTs = ((Date) row[1]).getTime();
+            long finishTs = ((Date) row[2]).getTime();
+            if (hostId.equals(orphanHostId))
+            {
+                throw new AssertionError("Orphan node record was not cleaned 
up");
+            }
+            actualLiveTimestamps.put(hostId, new long[]{startTs, finishTs});
+        }
+
+        // All live nodes should still be present with their original timestamps
+        assertEquals("Unexpected number of live node records", 3, actualLiveTimestamps.size());
+        for (int i = 0; i < 3; i++)
+        {
+            UUID hostId = liveHostIds.get(i);
+            long[] expected = new long[]{liveStart[i], liveFinish[i]};
+            long[] actual = actualLiveTimestamps.get(hostId);
+            if (actual == null)
+                throw new AssertionError("Live node record missing for hostId: 
" + hostId);
+            assertEquals("Live node repair_start_ts changed for hostId: " + 
hostId, expected[0], actual[0]);
+            assertEquals("Live node repair_finish_ts changed for hostId: " + 
hostId, expected[1], actual[1]);
+        }

Review Comment:
   I was thinking that, since the jira mentions the orphan node occupies a repair slot and with a parallelism of 1 this would prevent any repairs from occurring until it's cleaned up, we could add a sanity check to ensure a repair is run after the scheduler starts. But I think it's probably sufficient to ensure that the orphan record is gone; such a check would also extend the duration of the test from something quick (~1s) to something time bound (like 1 minute, if we decreased min_repair_interval or messed with the Clock implementation). So I think it's probably sufficient to just ensure the orphan row is purged and the remaining rows are not disturbed, like it's doing now.
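   
   (For reference, if we ever did want that sanity check, it could probably be built from helpers the test already uses: after lowering `min_repair_interval`, spin until some node records a newer `repair_finish_ts` than the ones we seeded. A rough sketch only, not something I think we need here:)
   
   ```java
   // Rough sketch: assumes min_repair_interval has been lowered so a repair can actually run.
   // Passes once any node records a repair_finish_ts newer than the seeded timestamps.
   Util.spinAssertEquals(true, () -> {
       Object[][] repairRows = cluster.coordinator(1).execute(
       String.format("SELECT host_id, repair_finish_ts FROM %s.%s WHERE repair_type = '%s'",
                     DISTRIBUTED_KEYSPACE_NAME,
                     AUTO_REPAIR_HISTORY,
                     AutoRepairConfig.RepairType.FULL),
       ConsistencyLevel.QUORUM
       );
       for (Object[] repairRow : repairRows)
       {
           if (((Date) repairRow[1]).getTime() > currentTime)
               return true;
       }
       return false;
   }, 60_000);
   ```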



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

