tolbertam commented on code in PR #3598:
URL: https://github.com/apache/cassandra/pull/3598#discussion_r1949911742


##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -51,13 +51,25 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.concurrent.ExecutorPlus;

Review Comment:
   nit: Imports should go with the other org.apache ones.



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -1075,6 +1097,16 @@ public void setRepairPendingCompactionRejectThreshold(int value)
         DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(value);
     }
 
+    public double getIncrementalRepairDiskHeadroomRejectRatio()
+    {
+        return DatabaseDescriptor.getIncrementalRepairDiskHeadroomRejectRatio();
+    }
+
+    public void setIncrementalRepairDiskHeadroomRejectRatio(double value)
+    {
+        DatabaseDescriptor.setIncrementalRepairDiskHeadroomRejectRatio(value);
+    }
+

Review Comment:
   Can we add these to ActiveRepairServiceMBean so we can configure this from JMX? I was just thinking that `RepairPendingCompactionRejectThreshold` is something I've had to tweak through JMX a bunch in the past.
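   Something like this might work (a sketch, assuming the MBean simply mirrors the existing `RepairPendingCompactionRejectThreshold` accessors; exact placement is hypothetical):
   ```java
   // In ActiveRepairServiceMBean (hypothetical placement, mirroring the
   // existing get/setRepairPendingCompactionRejectThreshold declarations):
   public double getIncrementalRepairDiskHeadroomRejectRatio();

   public void setIncrementalRepairDiskHeadroomRejectRatio(double value);
   ```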



##########
src/java/org/apache/cassandra/config/Config.java:
##########
@@ -354,6 +356,10 @@ public MemtableOptions()
     // The number of executors to use for building secondary indexes
     public volatile int concurrent_index_builders = 2;
 
+    // at least 20% of disk must be unused to run incremental repair
+    // to disable this check (not recommended), set the ratio to 0.0
+    public volatile double incremental_repair_disk_headroom_reject_ratio = 0.2;

Review Comment:
   Just noticed this prop is missing from cassandra.yaml, can you add it?
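   Something along these lines, perhaps (a sketch; the default mirrors `Config.java`, the comment wording is assumed):
   ```yaml
   # At least this fraction of disk must be unused to run incremental repair.
   # Set to 0.0 to disable this check (not recommended).
   incremental_repair_disk_headroom_reject_ratio: 0.2
   ```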



##########
src/java/org/apache/cassandra/config/DurationSpec.java:
##########
@@ -41,7 +42,7 @@
  * users the opportunity to be able to provide config with a unit of their choice in cassandra.yaml as per the available
  * options. (CASSANDRA-15234)
  */
-public abstract class DurationSpec
+public abstract class DurationSpec implements Serializable

Review Comment:
   With CASSANDRA-20185 we don't need to make this Serializable anymore I think, so it would be good to change this back.
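   i.e.:
   ```suggestion
   public abstract class DurationSpec
   ```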



##########
test/unit/org/apache/cassandra/service/AutoRepairServiceBasicTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+
+import static org.junit.Assert.assertEquals;
+
+public class AutoRepairServiceBasicTest extends CQLTester {
+    private static AutoRepairService autoRepairService;
+    private static AutoRepairConfig config;
+
+    @Before

Review Comment:
   nit: paren should go on the next line (applies to methods in this file)
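   e.g., with a hypothetical method (the flagged lines are truncated here), the expected style would be:
   ```java
   @Before
   public void setup()
   {
       // method body unchanged
   }
   ```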



##########
test/unit/org/apache/cassandra/repair/autorepair/RepairTokenRangeSplitterTest.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.autorepair;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec.LongMebibytesBound;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig.RepairType;
+import org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.FilteredRepairAssignments;
+import org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.SizeEstimate;
+import org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.SizedRepairAssignment;
+import org.apache.cassandra.service.AutoRepairService;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+import static org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.MAX_BYTES_PER_SCHEDULE;
+import static org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.BYTES_PER_ASSIGNMENT;
+import static org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.MAX_TABLES_PER_ASSIGNMENT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RepairTokenRangeSplitterTest extends CQLTester
+{
+    private RepairTokenRangeSplitter repairRangeSplitter;
+    private String tableName;
+    private static Range<Token> FULL_RANGE;
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+        AutoRepairService.setup();
+        FULL_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), DatabaseDescriptor.getPartitioner().getMaximumToken());
+        DatabaseDescriptor.setSelectedSSTableFormat(DatabaseDescriptor.getSSTableFormats().get(BigFormat.NAME));
+    }
+
+    @Before
+    public void setUp()
+    {
+        repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.emptyMap());
+        tableName = createTable("CREATE TABLE %s (k INT PRIMARY KEY, v INT)");
+        assertTrue(BigFormat.isSelected());
+    }
+
+    @Test
+    public void testSizePartitionCount() throws Throwable
+    {
+        insertAndFlushTable(tableName, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        Refs<SSTableReader> sstables = RepairTokenRangeSplitter.getSSTableReaderRefs(RepairType.FULL, KEYSPACE, tableName, FULL_RANGE);
+        assertEquals(10, sstables.iterator().next().getEstimatedPartitionSize().count());
+        SizeEstimate sizes = RepairTokenRangeSplitter.getSizesForRangeOfSSTables(RepairType.FULL, KEYSPACE, tableName, FULL_RANGE, sstables);
+        assertEquals(10, sizes.partitions);
+    }
+
+    @Test
+    public void testSizePartitionCountSplit() throws Throwable
+    {
+        int[] values = new int[10000];
+        for (int i = 0; i < values.length; i++)
+            values[i] = i + 1;
+        insertAndFlushTable(tableName, values);
+        Iterator<Range<Token>> range = AutoRepairUtils.split(FULL_RANGE, 2).iterator();
+        Range<Token> tokenRange1 = range.next();
+        Range<Token> tokenRange2 = range.next();
+        Assert.assertFalse(range.hasNext());
+
+        Refs<SSTableReader> sstables1 = RepairTokenRangeSplitter.getSSTableReaderRefs(RepairType.FULL, KEYSPACE, tableName, tokenRange1);
+        Refs<SSTableReader> sstables2 = RepairTokenRangeSplitter.getSSTableReaderRefs(RepairType.FULL, KEYSPACE, tableName, tokenRange2);
+        SizeEstimate sizes1 = RepairTokenRangeSplitter.getSizesForRangeOfSSTables(RepairType.FULL, KEYSPACE, tableName, tokenRange1, sstables1);
+        SizeEstimate sizes2 = RepairTokenRangeSplitter.getSizesForRangeOfSSTables(RepairType.FULL, KEYSPACE, tableName, tokenRange2, sstables2);
+        // +-5% because the HLL merge and the application of the range size approximation ratio cause estimation errors
+        assertTrue(Math.abs(10000 - (sizes1.partitions + sizes2.partitions)) <= 60);
+    }
+
+    @Test
+    public void testGetRepairAssignmentsForTable_NoSSTables()
+    {
+        // Should return 1 assignment if there are no SSTables
+        Collection<Range<Token>> ranges = Collections.singleton(new Range<>(Murmur3Partitioner.instance.getMinimumToken(), Murmur3Partitioner.instance.getMaximumToken()));
+        List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForTable(CQLTester.KEYSPACE, tableName, ranges);
+        assertEquals(1, assignments.size());
+    }
+
+    @Test
+    public void testGetRepairAssignmentsForTable_Single() throws Throwable
+    {
+        Collection<Range<Token>> ranges = Collections.singleton(new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), DatabaseDescriptor.getPartitioner().getMaximumToken()));
+        insertAndFlushSingleTable();
+        List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForTable(CQLTester.KEYSPACE, tableName, ranges);
+        assertEquals(1, assignments.size());
+    }
+
+    @Test
+    public void testGetRepairAssignmentsForTable_BatchingTables() throws Throwable
+    {
+        repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "2"));
+        Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE);
+
+        List<String> tableNames = createAndInsertTables(3);
+        List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges);
+
+        // We expect two assignments, one with table1 and table2 batched, and one with table3
+        assertEquals(2, assignments.size());
+        assertEquals(2, assignments.get(0).getTableNames().size());
+        assertEquals(1, assignments.get(1).getTableNames().size());
+    }
+
+    @Test
+    public void testGetRepairAssignmentsForTable_BatchSize() throws Throwable
+    {
+        repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "2"));
+        Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE);
+
+        List<String> tableNames = createAndInsertTables(2);
+        List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges);
+
+        // We expect one assignment, with two tables batched
+        assertEquals(1, assignments.size());
+        assertEquals(2, assignments.get(0).getTableNames().size());
+    }
+
+    @Test
+    public void testGetRepairAssignmentsForTable_NoBatching() throws Throwable
+    {
+        repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "1"));
+        Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE);
+
+        List<String> tableNames = createAndInsertTables(3);
+        List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges);
+
+        assertEquals(3, assignments.size());
+    }
+
+    @Test
+    public void testGetRepairAssignmentsForTable_AllBatched() throws Throwable
+    {
+        repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "100"));
+        Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE);
+
+        List<String> tableNames = createAndInsertTables(5);
+        List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges);
+
+        assertEquals(1, assignments.size());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMergeEmptyAssignments()
+    {
+        // Test when the list of assignments is empty
+        List<SizedRepairAssignment> emptyAssignments = Collections.emptyList();
+        RepairTokenRangeSplitter.merge(emptyAssignments);
+    }
+
+    @Test
+    public void testMergeSingleAssignment()
+    {
+        // Test when there is only one assignment in the list
+        String keyspaceName = "testKeyspace";
+        List<String> tableNames = Arrays.asList("table1", "table2");
+
+        SizedRepairAssignment assignment = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames);
+        List<SizedRepairAssignment> assignments = Collections.singletonList(assignment);
+
+        SizedRepairAssignment result = RepairTokenRangeSplitter.merge(assignments);
+
+        assertEquals(FULL_RANGE, result.getTokenRange());
+        assertEquals(keyspaceName, result.getKeyspaceName());
+        assertEquals(new HashSet<>(tableNames), new HashSet<>(result.getTableNames()));
+    }
+
+    @Test
+    public void testMergeMultipleAssignmentsWithSameTokenRangeAndKeyspace()
+    {
+        // Test merging multiple assignments with the same token range and keyspace
+        String keyspaceName = "testKeyspace";
+        List<String> tableNames1 = Arrays.asList("table1", "table2");
+        List<String> tableNames2 = Arrays.asList("table2", "table3");
+
+        SizedRepairAssignment assignment1 = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames1);
+        SizedRepairAssignment assignment2 = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames2);
+        List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2);
+
+        SizedRepairAssignment result = RepairTokenRangeSplitter.merge(assignments);
+
+        assertEquals(FULL_RANGE, result.getTokenRange());
+        assertEquals(keyspaceName, result.getKeyspaceName());
+        assertEquals(new HashSet<>(Arrays.asList("table1", "table2", "table3")), new HashSet<>(result.getTableNames()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMergeDifferentTokenRange()
+    {
+        // Test merging assignments with different token ranges
+        Iterator<Range<Token>> range = AutoRepairUtils.split(FULL_RANGE, 2).iterator(); // Split the full range into two ranges, i.e. (0-100, 100-200)
+        Range<Token> tokenRange1 = range.next();
+        Range<Token> tokenRange2 = range.next();
+        Assert.assertFalse(range.hasNext());
+
+        String keyspaceName = "testKeyspace";
+        List<String> tableNames = Arrays.asList("table1", "table2");
+
+        SizedRepairAssignment assignment1 = new SizedRepairAssignment(tokenRange1, keyspaceName, tableNames);
+        SizedRepairAssignment assignment2 = new SizedRepairAssignment(tokenRange2, keyspaceName, tableNames);
+        List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2);
+
+        RepairTokenRangeSplitter.merge(assignments); // Should throw IllegalStateException
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMergeDifferentKeyspaceName()
+    {
+        // Test merging assignments with different keyspace names
+        List<String> tableNames = Arrays.asList("table1", "table2");
+
+        SizedRepairAssignment assignment1 = new SizedRepairAssignment(FULL_RANGE, "keyspace1", tableNames);
+        SizedRepairAssignment assignment2 = new SizedRepairAssignment(FULL_RANGE, "keyspace2", tableNames);
+        List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2);
+
+        RepairTokenRangeSplitter.merge(assignments); // Should throw IllegalStateException
+    }
+
+    @Test
+    public void testMergeWithDuplicateTables()
+    {
+        // Test merging assignments with duplicate table names
+        String keyspaceName = "testKeyspace";
+        List<String> tableNames1 = Arrays.asList("table1", "table2");
+        List<String> tableNames2 = Arrays.asList("table2", "table3");
+
+        SizedRepairAssignment assignment1 = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames1);
+        SizedRepairAssignment assignment2 = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames2);
+        List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2);
+
+        RepairAssignment result = RepairTokenRangeSplitter.merge(assignments);
+
+        // The merged result should contain all unique table names
+        assertEquals(new HashSet<>(Arrays.asList("table1", "table2", "table3")), new HashSet<>(result.getTableNames()));
+    }
+
+    @Test
+    public void testGetRepairAssignmentsSplitsBySubrangeSizeAndFilterLimitsByMaxBytesPerSchedule()
+    {
+        // Ensures that getRepairAssignments splits by BYTES_PER_ASSIGNMENT and filterRepairAssignments limits by MAX_BYTES_PER_SCHEDULE.
+        repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.INCREMENTAL, Collections.emptyMap());
+        repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "50GiB");
+        repairRangeSplitter.setParameter(MAX_BYTES_PER_SCHEDULE, "100GiB");
+
+        // Given a size estimate of 1024GiB, we should expect 21 splits (ceil(1024GiB / 50GiB) = 21, since 50GiB*20 = 1000GiB < 1024GiB)
+        SizeEstimate sizeEstimate = sizeEstimateByBytes(new LongMebibytesBound("1024GiB"));
+
+        List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignments(Collections.singletonList(sizeEstimate));
+
+        // Should be 21 assignments, each being ~48.76 GiB (1024GiB / 21)
+        assertEquals(21, assignments.size());
+        long expectedBytes = 52357696560L;
+        for (int i = 0; i < assignments.size(); i++)
+        {
+            SizedRepairAssignment assignment = assignments.get(i);
+            assertEquals("Did not get expected value for assignment " + i, expectedBytes, assignment.getEstimatedBytes());
+        }
+
+        // When filtering we should only get 2 assignments back (48.76GiB * 2 = 97.52GiB < 100GiB)
+        FilteredRepairAssignments filteredRepairAssignments = repairRangeSplitter.filterRepairAssignments(0, KEYSPACE, assignments, 0);
+        List<RepairAssignment> finalRepairAssignments = filteredRepairAssignments.repairAssignments;
+        assertEquals(2, finalRepairAssignments.size());
+        assertEquals(expectedBytes * 2, filteredRepairAssignments.newBytesSoFar);
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testSetParameterShouldNotAllowUnknownParameter()
+    {
+        repairRangeSplitter.setParameter("unknown", "x");
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testSetParameterShouldNotAllowSettingBytesPerAssignmentGreaterThanMaxBytesPerSchedule()
+    {
+        repairRangeSplitter.setParameter(MAX_BYTES_PER_SCHEDULE, "500GiB");
+        repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "600GiB");
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testSetParameterShouldNotAllowSettingMaxBytesPerScheduleLessThanBytesPerAssignment()
+    {
+        repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "100MiB");
+        repairRangeSplitter.setParameter(MAX_BYTES_PER_SCHEDULE, "50MiB");
+    }
+
+    @Test
+    public void testGetParameters()
+    {
+        repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "100MiB");
+        repairRangeSplitter.setParameter(MAX_TABLES_PER_ASSIGNMENT, "5");
+
+        Map<String, String> parameters = repairRangeSplitter.getParameters();
+        // Each parameter should be present.
+        assertEquals(RepairTokenRangeSplitter.PARAMETERS.size(), parameters.size());
+        // The parameters we explicitly set should be set exactly as we set them.
+        assertEquals("100MiB", parameters.get(BYTES_PER_ASSIGNMENT));
+        assertEquals("5", parameters.get(MAX_TABLES_PER_ASSIGNMENT));
+    }
+
+    private SizeEstimate sizeEstimateByBytes(LongMebibytesBound totalSize)
+    {
+        return sizeEstimateByBytes(totalSize, totalSize);
+    }
+
+    private SizeEstimate sizeEstimateByBytes(LongMebibytesBound sizeInRange, LongMebibytesBound totalSize)
+    {
+        return new SizeEstimate(RepairType.INCREMENTAL, KEYSPACE, "table1", FULL_RANGE, 1, sizeInRange.toBytes(), totalSize.toBytes());
+    }
+
+    private void insertAndFlushSingleTable() throws Throwable
+    {
+        execute("INSERT INTO %s (k, v) values (?, ?)", 1, 1);
+        flush();
+    }
+
+    private List<String> createAndInsertTables(int count) throws Throwable
+    {
+        List<String> tableNames = new ArrayList<>();
+        for (int i = 0; i < count; i++) {

Review Comment:
   nit: paren on next line
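   e.g.:
   ```suggestion
           for (int i = 0; i < count; i++)
           {
   ```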



##########
src/java/org/apache/cassandra/utils/FBUtilities.java:
##########
@@ -85,7 +87,6 @@
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
-import org.apache.cassandra.io.util.File;

Review Comment:
   nit: accidental import move, can this be reverted?
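   i.e. restoring:
   ```suggestion
   import org.apache.cassandra.io.util.File;
   ```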



##########
src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java:
##########
@@ -25,16 +25,19 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;

Review Comment:
   nit: move imports



##########
src/java/org/apache/cassandra/config/Config.java:
##########
@@ -32,6 +32,8 @@
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;

Review Comment:
   nit: move import below the `AuditLogOptions` import



##########
src/java/org/apache/cassandra/config/ParameterizedClass.java:
##########
@@ -30,7 +31,7 @@
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 @Shared(scope = SIMULATION)
-public class ParameterizedClass
+public class ParameterizedClass implements Serializable

Review Comment:
   Same as `DurationSpec`, this shouldn't need to be `Serializable` anymore.
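   i.e.:
   ```suggestion
   public class ParameterizedClass
   ```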



##########
src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java:
##########
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import javax.annotation.Nullable;

Review Comment:
   I'll revert this with my change to use `onDiskSizeForPartitionPositions`



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java:
##########
@@ -83,6 +85,14 @@ protected SSTableScanner(S sstable,
         this.listener = listener;
     }
 
+    public static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
+    {
+        List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size());
+        for (Range<Token> range : Range.normalize(tokenRanges))
+            addRange(sstable, Range.makeRowRange(range), boundsList);
+        return boundsList;
+    }
+

Review Comment:
   It looks like this code exists on 4.1, but no longer exists on trunk. I also realized that `RepairTokenRangeScanner` needs to account for `Bti` instead of using `BigTableScanner` directly. After looking at the code, I think we can make this work without changing this code at all, and we can make it implementation agnostic. I'll propose something.



##########
test/unit/org/apache/cassandra/tools/nodetool/SSTableRepairedSetTest.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.Output;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SSTableRepairedSetTest
+{
+    @Mock
+    private NodeProbe probe;
+
+    private SSTableRepairedSet cmd;
+
+    @Before
+    public void setUp() {

Review Comment:
   nit: paren on next line (applies to all methods in this file)
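   e.g.:
   ```suggestion
       public void setUp()
       {
   ```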



##########
src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java:
##########
@@ -190,11 +192,11 @@ private boolean cdcRequiresWriteCommitLog(ColumnFamilyStore cfs)
      * For CDC-enabled tables where the write path for CDC is enabled, we want to ensure that the mutations are
      * run through the CommitLog, so they can be archived by the CDC process on discard.
      */
-    private boolean requiresWritePath(ColumnFamilyStore cfs)
+    public boolean requiresWritePath(ColumnFamilyStore cfs)

Review Comment:
   Looks like this is only used externally by `CassandraStreamReceiverTest`, so 
suggest changing to default visibility and use `@VisibleForTesting` (which 
would need to be imported as well).
   ```suggestion
       @VisibleForTesting
       boolean requiresWritePath(ColumnFamilyStore cfs)
   ```



##########
src/java/org/apache/cassandra/repair/autorepair/RepairTokenRangeSplitter.java:
##########
@@ -0,0 +1,1001 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.autorepair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.io.sstable.format.big.BigTableReader;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.big.BigTableScanner;
+import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AutoRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+import static org.apache.cassandra.repair.autorepair.AutoRepairUtils.split;
+
+/**
+ * In Apache Cassandra, tuning repair ranges has four main goals:
+ * <ol>
+ * <li>
+ *     <b>Create smaller, consistent repair times</b>: Long repairs, such as those lasting 15 hours, can be problematic.
+ *     If a node fails 14 hours into the repair, the entire process must be restarted. The goal is to reduce the impact
+ *     of disturbances or failures. However, making the repairs too short can lead to overhead from repair orchestration
+ *     becoming the main bottleneck.
+ * </li>
+ * <li>
+ *     <b>Minimize the impact on hosts</b>: Repairs should not heavily affect the host systems. For incremental repairs,
+ *     this might involve anti-compaction work. In full repairs, streaming large amounts of data—especially with wide
+ *     partitions—can lead to issues with disk usage and higher compaction costs.
+ * </li>
+ * <li>
+ *     <b>Reduce overstreaming</b>: The Merkle tree, which represents data within each partition and range,
+ *     has a maximum size. If a repair covers too many partitions, the tree’s leaves represent larger data ranges.
+ *     Even a small change in a leaf can trigger excessive data streaming, making the process inefficient.
+ * </li>
+ * <li>
+ *     <b>Reduce the number of repairs</b>: If there are many small tables, it's beneficial to batch these tables together
+ *     under a single parent repair. This prevents the repair overhead from becoming a bottleneck, especially when
+ *     dealing with hundreds of tables. Running individual repairs for each table can significantly impact performance
+ *     and efficiency.
+ * </li>
+ * </ol>
+ * To manage these issues, the strategy involves estimating the size and number of partitions within a range and
+ * splitting it accordingly to bound the size of the range splits. This is established by iterating over SSTable
+ * index files to estimate the number of bytes and partitions involved in the ranges being repaired, depending on
+ * which repair type is being invoked.
+ * <p/>
+ * While this splitter has a lot of tuning parameters, the expectation is that the established default configuration
+ * shall be sensible for all {@link org.apache.cassandra.repair.autorepair.AutoRepairConfig.RepairType}s. The following
+ * configuration parameters are offered.
+ * <ul>
+ *     <li>
+ *         <b>bytes_per_assignment</b>: The target and maximum amount of bytes that should be included in a repair
+ *         assignment. This is meant to scope the amount of work involved in a repair. For incremental repair, this
+ *         involves the total number of bytes in all SSTables containing unrepaired data involving the ranges being
+ *         repaired, including data that doesn't cover the range. This is to account for the amount of anticompaction
+ *         that is expected. For all other repair types, this involves the amount of data covering the range being
+ *         repaired.
+ *     </li>
+ *     <li>
+ *         <b>partitions_per_assignment</b>: The maximum number of partitions that should be included in a repair
+ *         assignment. This configuration exists to reduce excessive overstreaming by attempting to limit the number
+ *         of partitions present in a merkle tree leaf node.
+ *     </li>
+ *     <li>
+ *         <b>max_tables_per_assignment</b>: The maximum number of tables that can be included in a repair assignment.
+ *         This aims to reduce the number of repairs, especially in cases where a large number of tables exists for
+ *         a keyspace. Note that the splitter will avoid batching tables together if they exceed the other
+ *         configuration parameters such as <code>bytes_per_assignment</code> and <code>partitions_per_assignment</code>.
+ *     </li>
+ *     <li>
+ *         <b>max_bytes_per_schedule</b>: The maximum number of bytes to cover in an individual schedule. This serves
+ *         as a mechanism for throttling the amount of work that can be done on each repair cycle. One may opt to
+ *         reduce this value if the impact of repairs is causing too much load on the cluster, or increase it if
+ *         writes outpace the amount of data being repaired. Alternatively, one may want to tune
+ *         <code>min_repair_interval</code> down or up.
+ *     </li>
+ * </ul>
+ * Given the impact of what each repair type accomplishes, different defaults are established per repair type.
+ * <ul>
+ *     <li>
+ *         <b>full</b>: Configured in a way that attempts to accomplish repairing all data in a schedule, with
+ *         individual repairs targeting at most 200GiB of data and 1048576 partitions.
+ *         <b>max_bytes_per_schedule</b> is set to a large value for full repair to attempt to repair all data per
+ *         repair schedule.
+ *         <ul>
+ *             <li><b>bytes_per_assignment</b>: 200GiB</li>
+ *             <li><b>partitions_per_assignment</b>: 1048576</li>
+ *             <li><b>max_tables_per_assignment</b>: 64</li>
+ *             <li><b>max_bytes_per_schedule</b>: 100TiB</li>
+ *         </ul>
+ *     </li>
+ *     <li>
+ *         <b>incremental</b>: Configured in a way that attempts to repair 50GiB of data per repair, and 100GiB per
+ *         schedule. This attempts to throttle the amount of IR and anticompaction done per schedule after turning
+ *         incremental repair on for the first time. You may want to consider increasing <code>max_bytes_per_schedule</code>
+ *         if more than this much data is written per <code>min_repair_interval</code>.
+ *         <ul>
+ *             <li><b>bytes_per_assignment</b>: 50GiB</li>
+ *             <li><b>partitions_per_assignment</b>: 1048576</li>
+ *             <li><b>max_tables_per_assignment</b>: 64</li>
+ *             <li><b>max_bytes_per_schedule</b>: 100GiB</li>
+ *         </ul>
+ *     </li>
+ *     <li>
+ *         <b>preview_repaired</b>: Configured in a way that attempts to accomplish previewing all data in a schedule,
+ *         with previews targeting at most 200GiB of data and 1048576 partitions.
+ *         <ul>
+ *             <li><b>bytes_per_assignment</b>: 200GiB</li>
+ *             <li><b>partitions_per_assignment</b>: 1048576</li>
+ *             <li><b>max_tables_per_assignment</b>: 64</li>
+ *             <li><b>max_bytes_per_schedule</b>: 100TiB</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ */
+public class RepairTokenRangeSplitter implements IAutoRepairTokenRangeSplitter
+{
+    private static final Logger logger = LoggerFactory.getLogger(RepairTokenRangeSplitter.class);
+
+    // Default max bytes to 100TiB, which is much more readable than Long.MAX_VALUE
+    private static final DataStorageSpec.LongBytesBound MAX_BYTES = new DataStorageSpec.LongBytesBound(100_000, DataStorageSpec.DataStorageUnit.GIBIBYTES);
+
+    static final String BYTES_PER_ASSIGNMENT = "bytes_per_assignment";
+    static final String PARTITIONS_PER_ASSIGNMENT = "partitions_per_assignment";
+    static final String MAX_TABLES_PER_ASSIGNMENT = "max_tables_per_assignment";
+    static final String MAX_BYTES_PER_SCHEDULE = "max_bytes_per_schedule";
+
+    static final List<String> PARAMETERS = Arrays.asList(BYTES_PER_ASSIGNMENT, PARTITIONS_PER_ASSIGNMENT, MAX_TABLES_PER_ASSIGNMENT, MAX_BYTES_PER_SCHEDULE);
+
+    private final AutoRepairConfig.RepairType repairType;
+
+    private final Map<String, String> givenParameters = new HashMap<>();
+
+    private DataStorageSpec.LongBytesBound bytesPerAssignment;
+    private long partitionsPerAssignment;
+    private int maxTablesPerAssignment;
+    private DataStorageSpec.LongBytesBound maxBytesPerSchedule;
+
+    /**
+     * Established defaults for each {@link org.apache.cassandra.repair.autorepair.AutoRepairConfig.RepairType}, meant to
+     * choose sensible defaults for each.
+     * <p>
+     * Defaults if not specified for the given repair type:
+     * <ul>
+     *     <li><b>bytes_per_assignment</b>: 200GiB</li>
+     *     <li><b>partitions_per_assignment</b>: 2^repair_session_max_tree_depth</li>
+     *     <li><b>max_tables_per_assignment</b>: 64</li>
+     *     <li><b>max_bytes_per_schedule</b>: 1000GiB</li>
+     * </ul>
+     * It's expected that these defaults should work well for everything except incremental, so we confine
+     * bytes_per_assignment to 50GiB and max_bytes_per_schedule to 100GiB. This should strike a good balance
+     * between the amount of data that will be repaired during an initial migration to incremental repair and should
+     * move the entire repaired set from unrepaired to repaired at steady state, assuming no more than 100GiB of
+     * data is written to a node per min_repair_interval.
+     */
+    private static final Map<AutoRepairConfig.RepairType, RepairTypeDefaults> DEFAULTS_BY_REPAIR_TYPE = new EnumMap<>(AutoRepairConfig.RepairType.class) {{
+        put(AutoRepairConfig.RepairType.FULL, RepairTypeDefaults.builder(AutoRepairConfig.RepairType.FULL)
+                                                                .build());
+        // Restrict incremental repair to 50GiB per assignment to confine the amount of possible anticompaction.
+        put(AutoRepairConfig.RepairType.INCREMENTAL, RepairTypeDefaults.builder(AutoRepairConfig.RepairType.INCREMENTAL)
+                                                                       .withBytesPerAssignment(new DataStorageSpec.LongBytesBound("50GiB"))
+                                                                       .withMaxBytesPerSchedule(new DataStorageSpec.LongBytesBound("100GiB"))
+                                                                       .build());
+        put(AutoRepairConfig.RepairType.PREVIEW_REPAIRED, RepairTypeDefaults.builder(AutoRepairConfig.RepairType.PREVIEW_REPAIRED)
+                                                                            .build());
+    }};
+
+    public RepairTokenRangeSplitter(AutoRepairConfig.RepairType repairType, Map<String, String> parameters)
+    {
+        this.repairType = repairType;
+        this.givenParameters.putAll(parameters);
+
+        reinitParameters();
+    }
+
+    private void reinitParameters()
+    {
+        RepairTypeDefaults defaults = DEFAULTS_BY_REPAIR_TYPE.get(repairType);
+
+        DataStorageSpec.LongBytesBound bytesPerAssignmentTmp = getPropertyOrDefault(BYTES_PER_ASSIGNMENT, DataStorageSpec.LongBytesBound::new, defaults.bytesPerAssignment);
+        DataStorageSpec.LongBytesBound maxBytesPerScheduleTmp = getPropertyOrDefault(MAX_BYTES_PER_SCHEDULE, DataStorageSpec.LongBytesBound::new, defaults.maxBytesPerSchedule);
+
+        // Validate that bytesPerAssignment <= maxBytesPerSchedule
+        if (bytesPerAssignmentTmp.toBytes() > maxBytesPerScheduleTmp.toBytes())
+        {
+            throw new IllegalArgumentException(String.format("%s='%s' cannot be greater than %s='%s' for %s",
+                                                             BYTES_PER_ASSIGNMENT,
+                                                             bytesPerAssignmentTmp,
+                                                             MAX_BYTES_PER_SCHEDULE,
+                                                             maxBytesPerScheduleTmp,
+                                                             repairType.getConfigName()));
+        }
+
+        bytesPerAssignment = bytesPerAssignmentTmp;
+        maxBytesPerSchedule = maxBytesPerScheduleTmp;
+
+        partitionsPerAssignment = getPropertyOrDefault(PARTITIONS_PER_ASSIGNMENT, Long::parseLong, defaults.partitionsPerAssignment);
+        maxTablesPerAssignment = getPropertyOrDefault(MAX_TABLES_PER_ASSIGNMENT, Integer::parseInt, defaults.maxTablesPerAssignment);
+
+        logger.info("Configured {}[{}] with {}={}, {}={}, {}={}, {}={}", 
RepairTokenRangeSplitter.class.getName(),
+                    repairType.getConfigName(),
+                    BYTES_PER_ASSIGNMENT, bytesPerAssignment,
+                    PARTITIONS_PER_ASSIGNMENT, partitionsPerAssignment,
+                    MAX_TABLES_PER_ASSIGNMENT, maxTablesPerAssignment,
+                    MAX_BYTES_PER_SCHEDULE, maxBytesPerSchedule);
+    }
+
+    private <T> T getPropertyOrDefault(String propertyName, Function<String, T> mapper, T defaultValue)
+    {
+        return Optional.ofNullable(this.givenParameters.get(propertyName)).map(mapper).orElse(defaultValue);
+    }
+
+    @Override
+    public Iterator<KeyspaceRepairAssignments> getRepairAssignments(boolean primaryRangeOnly, List<PrioritizedRepairPlan> repairPlans)
+    {
+        return new BytesBasedRepairAssignmentIterator(primaryRangeOnly, repairPlans);
+    }
+
+    /**
+     * A custom {@link RepairAssignmentIterator} that confines the number of repair assignments to
+     * <code>max_bytes_per_schedule</code>.
+     */
+    private class BytesBasedRepairAssignmentIterator extends RepairAssignmentIterator
+    {
+        private final boolean primaryRangeOnly;
+        private long bytesSoFar = 0;
+
+        BytesBasedRepairAssignmentIterator(boolean primaryRangeOnly, List<PrioritizedRepairPlan> repairPlans)
+        {
+            super(repairPlans);
+            this.primaryRangeOnly = primaryRangeOnly;
+        }
+
+        @Override
+        protected KeyspaceRepairAssignments next(int priority, KeyspaceRepairPlan repairPlan)
+        {
+            // short circuit if we've accumulated too many bytes by returning a KeyspaceRepairAssignments with
+            // no assignments. We do this rather than returning false in hasNext() because we want to signal
+            // to AutoRepair that a keyspace generated no assignments.
+            if (bytesSoFar >= maxBytesPerSchedule.toBytes())
+            {
+                return new KeyspaceRepairAssignments(priority, repairPlan.getKeyspaceName(), Collections.emptyList());
+            }
+
+            Collection<Range<Token>> tokenRanges = getTokenRanges(primaryRangeOnly, repairPlan.getKeyspaceName());
+            List<SizedRepairAssignment> repairAssignments = getRepairAssignmentsForKeyspace(repairType, repairPlan.getKeyspaceName(), repairPlan.getTableNames(), tokenRanges);
+            FilteredRepairAssignments filteredRepairAssignments = filterRepairAssignments(priority, repairPlan.getKeyspaceName(), repairAssignments, bytesSoFar);
+            bytesSoFar = filteredRepairAssignments.newBytesSoFar;
+            return new KeyspaceRepairAssignments(priority, repairPlan.getKeyspaceName(), filteredRepairAssignments.repairAssignments);
+        }
+    }
+
+    @VisibleForTesting
+    List<SizedRepairAssignment> getRepairAssignmentsForKeyspace(AutoRepairConfig.RepairType repairType, String keyspaceName, List<String> tableNames, Collection<Range<Token>> tokenRanges)
+    {
+        List<SizedRepairAssignment> repairAssignments = new ArrayList<>();
+        // this is used for batching minimal single assignment tables together
+        List<SizedRepairAssignment> currentAssignments = new ArrayList<>();
+
+        AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig();
+
+        // If we can repair by keyspace, sort the tables by size so we can batch the smallest ones together
+        boolean repairByKeyspace = config.getRepairByKeyspace(repairType);
+        if (repairByKeyspace)
+        {
+            tableNames.sort((t1, t2) -> {
+                ColumnFamilyStore cfs1 = ColumnFamilyStore.getIfExists(keyspaceName, t1);
+                ColumnFamilyStore cfs2 = ColumnFamilyStore.getIfExists(keyspaceName, t2);
+                // If for whatever reason the CFS is not retrievable, we can assume it has been deleted, so give the
+                // other cfs precedence.
+                if (cfs1 == null)
+                {
+                    // cfs1 is lesser because it's null
+                    return -1;
+                }
+                else if (cfs2 == null)
+                {
+                    // cfs1 is greater because cfs2 is null
+                    return 1;
+                }
+                return Long.compare(cfs1.metric.totalDiskSpaceUsed.getCount(), cfs2.metric.totalDiskSpaceUsed.getCount());
+            });
+        }
+
+        for (String tableName : tableNames)
+        {
+            List<SizedRepairAssignment> tableAssignments = getRepairAssignmentsForTable(keyspaceName, tableName, tokenRanges);
+
+            if (tableAssignments.isEmpty())
+                continue;
+
+            // if not repairing by keyspace don't attempt to batch them with others.
+            if (!repairByKeyspace)
+            {
+                repairAssignments.addAll(tableAssignments);
+            }
+            // If the table assignments are for the same token range, and we have room to add more tables to the current assignment
+            else if (tableAssignments.size() == 1 &&
+                     currentAssignments.size() < maxTablesPerAssignment &&
+                     (currentAssignments.isEmpty() || currentAssignments.get(0).getTokenRange().equals(tableAssignments.get(0).getTokenRange())))
+            {
+                long currentAssignmentsBytes = getEstimatedBytes(currentAssignments);
+                long tableAssignmentsBytes = getEstimatedBytes(tableAssignments);
+                // only add assignments together if they don't exceed max bytes per schedule.
+                if (currentAssignmentsBytes + tableAssignmentsBytes < maxBytesPerSchedule.toBytes())
+                {
+                    currentAssignments.addAll(tableAssignments);
+                }
+                else
+                {
+                    // add table assignments by themselves
+                    repairAssignments.addAll(tableAssignments);
+                }
+            }
+            else
+            {
+                if (!currentAssignments.isEmpty())
+                {
+                    repairAssignments.add(merge(currentAssignments));
+                    currentAssignments.clear();
+                }
+                repairAssignments.addAll(tableAssignments);
+            }
+        }
+
+        if (!currentAssignments.isEmpty())
+            repairAssignments.add(merge(currentAssignments));
+
+        return repairAssignments;
+    }
+
+    /**
+     * Given a repair type and map of size-based repair assignments, confine them by <code>maxBytesPerSchedule</code>.
+     * @param repairAssignments the assignments to filter.
+     * @param bytesSoFar repair assignment bytes accumulated so far.
+     * @return A list of repair assignments confined by <code>maxBytesPerSchedule</code>.
+     */
+    @VisibleForTesting
+    FilteredRepairAssignments filterRepairAssignments(int priority, String keyspaceName, List<SizedRepairAssignment> repairAssignments, long bytesSoFar)
+    {
+        // Confine repair assignments by maxBytesPerSchedule.
+        long bytesSoFarThisIteration = 0L;
+        long bytesNotRepaired = 0L;
+        int assignmentsNotRepaired = 0;
+        int assignmentsToRepair = 0;
+        int totalAssignments = 0;
+
+        List<RepairAssignment> assignmentsToReturn = new ArrayList<>(repairAssignments.size());
+        for (SizedRepairAssignment repairAssignment : repairAssignments)
+        {
+            totalAssignments++;
+            // skip any repair assignments that would accumulate us past the maxBytesPerSchedule
+            if (bytesSoFar + repairAssignment.getEstimatedBytes() > maxBytesPerSchedule.toBytes())
+            {
+                // log that the repair assignment was skipped.
+                bytesNotRepaired += repairAssignment.getEstimatedBytes();
+                assignmentsNotRepaired++;
+                logger.warn("Skipping {} because it would increase total repair bytes to {}",
+                            repairAssignment,
+                            getBytesOfMaxBytesPerSchedule(bytesSoFar + repairAssignment.getEstimatedBytes()));
+            }
+            else
+            {
+                bytesSoFar += repairAssignment.getEstimatedBytes();
+                bytesSoFarThisIteration += repairAssignment.getEstimatedBytes();
+                assignmentsToRepair++;
+                logger.info("Adding {}, increasing repair bytes to {}",
+                            repairAssignment,
+                            getBytesOfMaxBytesPerSchedule(bytesSoFar));
+                assignmentsToReturn.add(repairAssignment);
+            }
+        }
+
+        String message = "Returning {} assignment(s) for priorityBucket {} and keyspace {}, totaling {} ({} overall)";
+        if (assignmentsNotRepaired != 0)
+        {
+            message += ". Skipping {} of {} assignment(s), totaling {}";
+            if (repairType != AutoRepairConfig.RepairType.INCREMENTAL)
+            {
+                message += ". The entire primary range will not be repaired this schedule. " +
+                           "Consider increasing maxBytesPerSchedule, reducing node density or monitoring to ensure " +
+                           "all ranges do get repaired within gc_grace_seconds";
+                logger.warn(message, assignmentsToRepair, priority, keyspaceName,
+                            FileUtils.stringifyFileSize(bytesSoFarThisIteration),
+                            getBytesOfMaxBytesPerSchedule(bytesSoFar),
+                            assignmentsNotRepaired, totalAssignments,
+                            FileUtils.stringifyFileSize(bytesNotRepaired));
+            }
+            else
+            {
+                logger.info(message, assignmentsToRepair, priority, keyspaceName,
+                            FileUtils.stringifyFileSize(bytesSoFarThisIteration),
+                            getBytesOfMaxBytesPerSchedule(bytesSoFar),
+                            assignmentsNotRepaired, totalAssignments,
+                            FileUtils.stringifyFileSize(bytesNotRepaired));
+            }
+            }
+        }
+        else
+        {
+            logger.info(message, assignmentsToRepair, priority, keyspaceName,
+                        FileUtils.stringifyFileSize(bytesSoFarThisIteration),
+                        getBytesOfMaxBytesPerSchedule(bytesSoFar));
+        }
+
+        return new FilteredRepairAssignments(assignmentsToReturn, bytesSoFar);
+    }
+
+    @VisibleForTesting
+    static class FilteredRepairAssignments
+    {
+        final List<RepairAssignment> repairAssignments;
+        final long newBytesSoFar;
+
+        private FilteredRepairAssignments(List<RepairAssignment> repairAssignments, long newBytesSoFar)
+        {
+            this.repairAssignments = repairAssignments;
+            this.newBytesSoFar = newBytesSoFar;
+        }
+    }
+
+    private String getBytesOfMaxBytesPerSchedule(long bytes)
+    {
+        if (maxBytesPerSchedule.equals(MAX_BYTES))
+            return FileUtils.stringifyFileSize(bytes);
+        else
+            return String.format("%s of %s", FileUtils.stringifyFileSize(bytes), maxBytesPerSchedule);
+    }
+
+    /**
+     * @param repairAssignments The assignments to sum
+     * @return The sum of {@link SizedRepairAssignment#getEstimatedBytes()} of all given repairAssignments.
+     */
+    @VisibleForTesting
+    protected static long getEstimatedBytes(List<SizedRepairAssignment> repairAssignments)
+    {
+        return repairAssignments
+               .stream()
+               .mapToLong(SizedRepairAssignment::getEstimatedBytes)
+               .sum();
+    }
+
+    @VisibleForTesting
+    static SizedRepairAssignment merge(List<SizedRepairAssignment> assignments)
+    {
+        if (assignments.isEmpty())
+            throw new IllegalStateException("Cannot merge empty assignments");
+
+        Set<String> mergedTableNames = new HashSet<>();
+        Range<Token> referenceTokenRange = assignments.get(0).getTokenRange();
+        String referenceKeyspaceName = assignments.get(0).getKeyspaceName();
+
+        for (SizedRepairAssignment assignment : assignments)
+        {
+            // These checks _should_ be unnecessary but are here to ensure that the assignments are consistent
+            if (!assignment.getTokenRange().equals(referenceTokenRange))
+                throw new IllegalStateException("All assignments must have the same token range");
+            if (!assignment.getKeyspaceName().equals(referenceKeyspaceName))
+                throw new IllegalStateException("All assignments must have the same keyspace name");
+
+            mergedTableNames.addAll(assignment.getTableNames());
+        }
+
+        long sizeForAssignment = getEstimatedBytes(assignments);
+        return new SizedRepairAssignment(referenceTokenRange, referenceKeyspaceName, new ArrayList<>(mergedTableNames),
+                                         "full primary range for " + mergedTableNames.size() + " tables", sizeForAssignment);
+    }
+
+    @VisibleForTesting
+    protected List<SizedRepairAssignment> getRepairAssignmentsForTable(String keyspaceName, String tableName, Collection<Range<Token>> tokenRanges)
+    {
+        List<SizeEstimate> sizeEstimates = getRangeSizeEstimate(keyspaceName, tableName, tokenRanges);
+        return getRepairAssignments(sizeEstimates);
+    }
+
+    private static void logSkippingTable(String keyspaceName, String tableName)
+    {
+        logger.warn("Could not resolve table data for {}.{} assuming it has 
since been deleted, skipping", keyspaceName, tableName);
+    }
+
+    @VisibleForTesting
+    protected List<SizedRepairAssignment> getRepairAssignments(List<SizeEstimate> sizeEstimates)
+    {
+        List<SizedRepairAssignment> repairAssignments = new ArrayList<>();
+
+        // since it's possible for us to hit maxBytesPerSchedule before seeing all ranges, shuffle so there is at
+        // least a chance of hitting all the ranges _eventually_ in the worst case scenarios
+        Collections.shuffle(sizeEstimates);
+        int totalExpectedSubRanges = 0;
+        for (SizeEstimate estimate : sizeEstimates)
+        {
+            if (estimate.sizeForRepair != 0)
+            {
+                boolean needsSplitting = estimate.sizeForRepair > bytesPerAssignment.toBytes() || estimate.partitions > partitionsPerAssignment;
+                if (needsSplitting)
+                {
+                    totalExpectedSubRanges += calculateNumberOfSplits(estimate);
+                }
+            }
+        }
+        for (SizeEstimate estimate : sizeEstimates)
+        {
+            if (estimate.sizeForRepair == 0)
+            {
+                ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(estimate.keyspace, estimate.table);
+
+                if (cfs == null)
+                {
+                    logSkippingTable(estimate.keyspace, estimate.table);
+                    continue;
+                }
+
+                long memtableSize = cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize();
+                if (memtableSize > 0L)
+                {
+                    logger.debug("Included {}.{} range {}, had no unrepaired 
SSTables, but memtableSize={}, adding single repair assignment", 
estimate.keyspace, estimate.table, estimate.tokenRange, memtableSize);
+                    SizedRepairAssignment assignment = new 
SizedRepairAssignment(estimate.tokenRange, estimate.keyspace, 
Collections.singletonList(estimate.table), "full primary rangee for table with 
memtable only detected", memtableSize);
+                    repairAssignments.add(assignment);
+                }
+                else
+                {
+                    logger.debug("Included {}.{} range {}, has no SSTables or 
memtable data, but adding single repair assignment for entire range in case 
writes were missed", estimate.keyspace, estimate.table, estimate.tokenRange);
+                    SizedRepairAssignment assignment = new 
SizedRepairAssignment(estimate.tokenRange, estimate.keyspace, 
Collections.singletonList(estimate.table), "full primary range for table with 
no data detected", 0L);
+                    repairAssignments.add(assignment);
+                }
+            }
+            else
+            {
+                // Check if the estimate needs splitting based on the criteria
+                boolean needsSplitting = estimate.sizeForRepair > bytesPerAssignment.toBytes() || estimate.partitions > partitionsPerAssignment;
+                if (needsSplitting)
+                {
+                    int numberOfSplits = calculateNumberOfSplits(estimate);
+                    long approximateBytesPerSplit = estimate.sizeForRepair / numberOfSplits;
+                    Collection<Range<Token>> subranges = split(estimate.tokenRange, numberOfSplits);
+                    for (Range<Token> subrange : subranges)
+                    {
+                        SizedRepairAssignment assignment = new SizedRepairAssignment(subrange, estimate.keyspace, Collections.singletonList(estimate.table),
+                                                                                     String.format("subrange %d of %d", repairAssignments.size() + 1, totalExpectedSubRanges),
+                                                                                     approximateBytesPerSplit);
+                        repairAssignments.add(assignment);
+                    }
+                }
+                else
+                {
+                    // No splitting needed, repair the entire range as-is
+                    SizedRepairAssignment assignment = new SizedRepairAssignment(estimate.tokenRange, estimate.keyspace,
+                                                                                 Collections.singletonList(estimate.table),
+                                                                                 "full primary range for table", estimate.sizeForRepair);
+                    repairAssignments.add(assignment);
+                }
+            }
+        }
+        return repairAssignments;
+    }
+
+    private int calculateNumberOfSplits(SizeEstimate estimate)
+    {
+        // Calculate the number of splits needed for size and partitions
+        int splitsForSize = (int) Math.ceil((double) estimate.sizeForRepair / bytesPerAssignment.toBytes());
+        int splitsForPartitions = (int) Math.ceil((double) estimate.partitions / partitionsPerAssignment);
+
+        // Split the token range into subranges based on whichever criterion (partitions or bytes) would generate the most splits.
+        boolean splitBySize = splitsForSize > splitsForPartitions;
+        int splits = splitBySize ? splitsForSize : splitsForPartitions;
+
+        // calculate approximation for logging purposes
+        long approximateBytesPerSplit = estimate.sizeForRepair / splits;
+        long approximatePartitionsPerSplit = estimate.partitions / splits;
+
+        logger.info("Splitting {}.{} for range {} into {} sub ranges by {} 
(splitsForSize={}, splitsForPartitions={}, " +
+                    "approximateBytesInRange={}, 
approximatePartitionsInRange={}, " +
+                    "approximateBytesPerSplit={}, 
approximatePartitionsPerSplit={})",
+                    estimate.keyspace, estimate.table, estimate.tokenRange,
+                    splits, splitBySize ? "size" : "partitions",
+                    splitsForSize, splitsForPartitions,
+                    FileUtils.stringifyFileSize(estimate.sizeForRepair), 
estimate.partitions,
+                    FileUtils.stringifyFileSize(approximateBytesPerSplit), 
approximatePartitionsPerSplit
+        );
+        return splits;
+    }
+
+    private Collection<Range<Token>> getTokenRanges(boolean primaryRangeOnly, String keyspaceName)
+    {
+        // Collect all applicable token ranges
+        Collection<Range<Token>> wrappedRanges;
+        if (primaryRangeOnly)
+        {
+            wrappedRanges = TokenRingUtils.getPrimaryRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
+        }
+        else
+        {
+            wrappedRanges = StorageService.instance.getLocalRanges(keyspaceName);
+        }
+
+        // Unwrap each range as we need to account for ranges that wrap around the ring
+        List<Range<Token>> ranges = new ArrayList<>();
+        for (Range<Token> wrappedRange : wrappedRanges)
+        {
+            ranges.addAll(wrappedRange.unwrap());
+        }
+        return ranges;
+    }
+
+    private List<SizeEstimate> getRangeSizeEstimate(String keyspace, String table, Collection<Range<Token>> tokenRanges)
+    {
+        List<SizeEstimate> sizeEstimates = new ArrayList<>();
+        for (Range<Token> tokenRange : tokenRanges)
+        {
+            logger.debug("Calculating size estimate for {}.{} for range {}", 
keyspace, table, tokenRange);
+            try (Refs<SSTableReader> refs = getSSTableReaderRefs(repairType, 
keyspace, table, tokenRange))
+            {
+                SizeEstimate estimate = getSizesForRangeOfSSTables(repairType, 
keyspace, table, tokenRange, refs);
+                logger.debug("Generated size estimate {}", estimate);
+                sizeEstimates.add(estimate);
+            }
+        }
+        return sizeEstimates;
+    }
+
+    @VisibleForTesting
+    static SizeEstimate getSizesForRangeOfSSTables(AutoRepairConfig.RepairType repairType, String keyspace, String table, Range<Token> tokenRange, Refs<SSTableReader> refs)
+    {
+        ICardinality cardinality = new HyperLogLogPlus(13, 25);
+        long approxBytesInRange = 0L;
+        long totalBytes = 0L;
+
+        for (SSTableReader reader : refs)
+        {
+            try
+            {
+                if (reader.openReason == SSTableReader.OpenReason.EARLY)
+                    continue;
+                CompactionMetadata metadata = (CompactionMetadata) reader.descriptor.getMetadataSerializer().deserialize(reader.descriptor, MetadataType.COMPACTION);
+                if (metadata != null)
+                    cardinality = cardinality.merge(metadata.cardinalityEstimator);
+
+                long sstableSize = reader.bytesOnDisk();
+                totalBytes += sstableSize;
+                // get the bounds of the sstable for this range using the index file but do not actually read it.
+                List<AbstractBounds<PartitionPosition>> bounds = BigTableScanner.makeBounds(reader, Collections.singleton(tokenRange));

Review Comment:
   A couple of issues here I realized just now.
   
   1. `makeBounds` with a `TokenRange` was removed between 4.1 and trunk.  We had also changed its visibility to public, which in hindsight we probably shouldn't have done.
   2. `BtiTableScanner` (trie-based index) is now a thing, and we need to account for that.
   3. We probably don't need to use `makeBounds` directly; I believe the only two things it was used for were checking whether the SSTable encompassed the range (which it should, since we pre-filter) and getting the approximate bytes in the range.  In fact, it looks like there's an API method on `SSTable` called `onDiskSizeForPartitionPositions` that will do this for us in an implementation-agnostic way.
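   
   For illustration, a rough sketch of the format-agnostic approach (assuming `getPositionsForRanges` and `onDiskSizeForPartitionPositions` are available on `SSTableReader` in trunk with these shapes):
   ```java
   // Sketch only, not the actual patch: estimate the on-disk bytes for tokenRange
   // without the BigTableScanner-specific makeBounds call.
   List<PartitionPositionBounds> positions = reader.getPositionsForRanges(Collections.singleton(tokenRange));
   // sums the on-disk size covered by those positions, regardless of sstable format
   approxBytesInRange += reader.onDiskSizeForPartitionPositions(positions);
   ```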



##########
src/java/org/apache/cassandra/service/StorageService.java:
##########
@@ -1113,6 +1117,18 @@ public void doAuthSetup(boolean async)
         }
     }
 
+    public void doAutoRepairSetup()
+    {
+        AutoRepairService.setup();
+        if (DatabaseDescriptor.getAutoRepairConfig().isAutoRepairSchedulingEnabled())
+        {
+            logger.info("Enable auto-repair scheduling");

Review Comment:
   ```suggestion
               logger.info("Enabling auto-repair scheduling");
   ```



##########
test/unit/org/apache/cassandra/service/AutoRepairServiceRepairTypeTest.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+import org.apache.cassandra.repair.autorepair.AutoRepairUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.cassandra.Util.setAutoRepairEnabled;
+import static org.apache.cassandra.config.CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class AutoRepairServiceRepairTypeTest extends CQLTester {
+    @Parameterized.Parameter()
+    public AutoRepairConfig.RepairType repairType;
+
+    private final UUID host1 = UUID.fromString("00000000-0000-0000-0000-000000000001");
+    private final UUID host2 = UUID.fromString("00000000-0000-0000-0000-000000000002");
+
+    private AutoRepairService instance;
+
+    @Parameterized.Parameters(name = "repairType={0}")
+    public static Collection<AutoRepairConfig.RepairType> repairTypes() {

Review Comment:
   nit: paren should go on next line in this file



##########
src/java/org/apache/cassandra/repair/autorepair/FixedSplitTokenRangeSplitter.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.autorepair;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.service.AutoRepairService;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.repair.autorepair.AutoRepairUtils.split;
+
+public class FixedSplitTokenRangeSplitter implements IAutoRepairTokenRangeSplitter
+{
+    private static final Logger logger = LoggerFactory.getLogger(FixedSplitTokenRangeSplitter.class);
+
+    /**
+     * The number of subranges to split each to-be-repaired token range into. Defaults to 1.
+     * <p>
+     * The higher this number, the smaller the repair sessions will be.
+     * <p>
+     * If you are using vnodes, say 256, then the repair will always go one vnode range at a time.  This parameter
+     * additionally lets us further subdivide a given vnode range into subranges.
+     * <p>
+     * With the value "1" and 256 vnodes, a given table on a node will undergo repair 256 times. With the value "2",
+     * the same table on a node will undergo repair 512 times, because every vnode range is further divided in two.
+     * <p>
+     * If you do not use vnodes, or the number of vnodes is pretty small, say 8, setting this value to a higher
+     * number, such as 16, is useful: each repair session then covers a smaller range, so its chance of succeeding
+     * is higher.
+     */
+    static final String NUMBER_OF_SUBRANGES = "number_of_subranges";
+
+    private final AutoRepairConfig.RepairType repairType;
+    private int numberOfSubranges;
+
+    public FixedSplitTokenRangeSplitter(AutoRepairConfig.RepairType repairType, Map<String, String> parameters)
+    {
+        this.repairType = repairType;
+
+        numberOfSubranges = Integer.parseInt(parameters.getOrDefault(NUMBER_OF_SUBRANGES, "1"));
+    }
+
+    @Override
+    public Iterator<KeyspaceRepairAssignments> getRepairAssignments(boolean primaryRangeOnly, List<PrioritizedRepairPlan> repairPlans)
+    {
+        return new RepairAssignmentIterator(repairPlans) {

Review Comment:
   nit: paren should go on next line
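   
   Separate from the nit, a minimal usage sketch of how this splitter is parameterized (direct construction and the `FULL` repair type constant are assumptions for illustration):
   ```java
   // Hypothetical wiring: 2 subranges per vnode range, so with 256 vnodes a table
   // would be repaired over 256 * 2 = 512 sessions, matching the javadoc above.
   Map<String, String> params = new HashMap<>();
   params.put("number_of_subranges", "2"); // FixedSplitTokenRangeSplitter.NUMBER_OF_SUBRANGES
   IAutoRepairTokenRangeSplitter splitter =
       new FixedSplitTokenRangeSplitter(AutoRepairConfig.RepairType.FULL, params);
   ```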



##########
src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java:
##########
@@ -217,6 +218,8 @@ public SequenceState executeNext()
                                         "For more, see `nodetool help 
bootstrap`. {}", SystemKeyspace.getBootstrapState());
                             return halted();
                         }
+                        // this node might have just bootstrapped; check if we should run repair immediately
+                        AutoRepairUtils.runRepairOnNewlyBootstrappedNodeIfEnabled();

Review Comment:
   Maybe paranoid on my part, but could we move this towards the end, maybe to the end of the `if (finishJoiningRing)` block?  That way, if something unexpected happens in this code path, it doesn't skip marking the bootstrap as complete/updating CMS.
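   
   Roughly this shape (surrounding structure assumed; only the names from the diff are real):
   ```java
   if (finishJoiningRing)
   {
       // ... existing join-completion steps: mark bootstrap complete, update CMS ...
   
       // run the optional post-bootstrap repair last, so an unexpected failure here
       // cannot prevent the join from being recorded as finished
       AutoRepairUtils.runRepairOnNewlyBootstrappedNodeIfEnabled();
   }
   ```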



##########
test/unit/org/apache/cassandra/service/AutoRepairServiceSetterTest.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+import org.apache.cassandra.repair.autorepair.AutoRepairUtils;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.cassandra.Util.setAutoRepairEnabled;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(Parameterized.class)
+public class AutoRepairServiceSetterTest<T> extends CQLTester {
+    private static final AutoRepairConfig config = new AutoRepairConfig(true);
+
+    @Parameterized.Parameter
+    public AutoRepairConfig.RepairType repairTypeStr;
+
+    @Parameterized.Parameter(1)
+    public T arg;
+
+    @Parameterized.Parameter(2)
+    public BiConsumer<String, T> setter;
+
+    @Parameterized.Parameter(3)
+    public Function<AutoRepairConfig.RepairType, T> getter;
+
+    @Parameterized.Parameters(name = "{index}: repairType={0}, arg={1}")
+    public static Collection<Object[]> testCases() {

Review Comment:
   nit: paren should go on next line in this file



##########
test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java:
##########
@@ -42,6 +42,8 @@
 import static org.apache.cassandra.config.YamlConfigurationLoader.SYSTEM_PROPERTY_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;

Review Comment:
   nit: import order



##########
src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java:
##########
@@ -217,6 +218,8 @@ public SequenceState executeNext()
                                         "For more, see `nodetool help 
bootstrap`. {}", SystemKeyspace.getBootstrapState());
                             return halted();
                         }
+                        // this node might have just bootstrapped; check if we should run repair immediately
+                        AutoRepairUtils.runRepairOnNewlyBootstrappedNodeIfEnabled();

Review Comment:
   Same applies for `BootstrapAndReplace`/`ReplaceSameAddress`


