tolbertam commented on code in PR #3598: URL: https://github.com/apache/cassandra/pull/3598#discussion_r1949911742
########## src/java/org/apache/cassandra/service/ActiveRepairService.java: ########## @@ -51,13 +51,25 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; +import org.apache.cassandra.concurrent.ExecutorPlus; Review Comment: nit: Imports should go with the other org.apache ones. ########## src/java/org/apache/cassandra/service/ActiveRepairService.java: ########## @@ -1075,6 +1097,16 @@ public void setRepairPendingCompactionRejectThreshold(int value) DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(value); } + public double getIncrementalRepairDiskHeadroomRejectRatio() + { + return DatabaseDescriptor.getIncrementalRepairDiskHeadroomRejectRatio(); + } + + public void setIncrementalRepairDiskHeadroomRejectRatio(double value) + { + DatabaseDescriptor.setIncrementalRepairDiskHeadroomRejectRatio(value); + } + Review Comment: Can we add these to ActiveRepairServiceMBean so we can configure this from JMX? I was just thinking that `RepairPendingCompactionRejectThreshold` is something I've had to tweak through JMX a bunch in the past. ########## src/java/org/apache/cassandra/config/Config.java: ########## @@ -354,6 +356,10 @@ public MemtableOptions() // The number of executors to use for building secondary indexes public volatile int concurrent_index_builders = 2; + // at least 20% of disk must be unused to run incremental repair + // if you want to disable this feature (the recommendation is not to) then set the ratio to 0.0 + public volatile double incremental_repair_disk_headroom_reject_ratio = 0.2; Review Comment: Just noticed this prop is missing from cassandra.yaml, can you add it? ########## src/java/org/apache/cassandra/config/DurationSpec.java: ########## @@ -41,7 +42,7 @@ * users the opportunity to be able to provide config with a unit of their choice in cassandra.yaml as per the available * options. (CASSANDRA-15234) */ -public abstract class DurationSpec +public abstract class DurationSpec implements Serializable Review Comment: With CASSANDRA-20185 I don't think we need to make this Serializable anymore, so it would be good to change this back. ########## test/unit/org/apache/cassandra/service/AutoRepairServiceBasicTest.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.service; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.repair.autorepair.AutoRepairConfig; + +import static org.junit.Assert.assertEquals; + +public class AutoRepairServiceBasicTest extends CQLTester { + private static AutoRepairService autoRepairService; + private static AutoRepairConfig config; + + @Before Review Comment: nit: paren should go on the next line (all methods in this file) ########## test/unit/org/apache/cassandra/repair/autorepair/RepairTokenRangeSplitterTest.java: ########## @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.autorepair; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DataStorageSpec.LongMebibytesBound; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.repair.autorepair.AutoRepairConfig.RepairType; +import org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.FilteredRepairAssignments; +import org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.SizeEstimate; +import org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.SizedRepairAssignment; +import org.apache.cassandra.service.AutoRepairService; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.MAX_BYTES_PER_SCHEDULE; +import static org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.BYTES_PER_ASSIGNMENT; +import static org.apache.cassandra.repair.autorepair.RepairTokenRangeSplitter.MAX_TABLES_PER_ASSIGNMENT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RepairTokenRangeSplitterTest extends CQLTester +{ + private RepairTokenRangeSplitter repairRangeSplitter; + private String tableName; + private static Range<Token> FULL_RANGE; + + @BeforeClass + public static void setUpClass() + { +
CQLTester.setUpClass(); + AutoRepairService.setup(); + FULL_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), DatabaseDescriptor.getPartitioner().getMaximumToken()); + DatabaseDescriptor.setSelectedSSTableFormat(DatabaseDescriptor.getSSTableFormats().get(BigFormat.NAME)); + } + + @Before + public void setUp() + { + repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.emptyMap()); + tableName = createTable("CREATE TABLE %s (k INT PRIMARY KEY, v INT)"); + assertTrue(BigFormat.isSelected()); + } + + @Test + public void testSizePartitionCount() throws Throwable + { + insertAndFlushTable(tableName, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + Refs<SSTableReader> sstables = RepairTokenRangeSplitter.getSSTableReaderRefs(RepairType.FULL, KEYSPACE, tableName, FULL_RANGE); + assertEquals(10, sstables.iterator().next().getEstimatedPartitionSize().count()); + SizeEstimate sizes = RepairTokenRangeSplitter.getSizesForRangeOfSSTables(RepairType.FULL, KEYSPACE, tableName, FULL_RANGE, sstables); + assertEquals(10, sizes.partitions); + } + + @Test + public void testSizePartitionCountSplit() throws Throwable + { + int[] values = new int[10000]; + for (int i = 0; i < values.length; i++) + values[i] = i + 1; + insertAndFlushTable(tableName, values); + Iterator<Range<Token>> range = AutoRepairUtils.split(FULL_RANGE, 2).iterator(); + Range<Token> tokenRange1 = range.next(); + Range<Token> tokenRange2 = range.next(); + Assert.assertFalse(range.hasNext()); + + Refs<SSTableReader> sstables1 = RepairTokenRangeSplitter.getSSTableReaderRefs(RepairType.FULL, KEYSPACE, tableName, tokenRange1); + Refs<SSTableReader> sstables2 = RepairTokenRangeSplitter.getSSTableReaderRefs(RepairType.FULL, KEYSPACE, tableName, tokenRange2); + SizeEstimate sizes1 = RepairTokenRangeSplitter.getSizesForRangeOfSSTables(RepairType.FULL, KEYSPACE, tableName, tokenRange1, sstables1); + SizeEstimate sizes2 = RepairTokenRangeSplitter.getSizesForRangeOfSSTables(RepairType.FULL, KEYSPACE, tableName, tokenRange2, sstables2); + // +-5% because HLL merge and the applying of range size approx ratio causes estimation errors + assertTrue(Math.abs(10000 - (sizes1.partitions + sizes2.partitions)) <= 60); + } + + @Test + public void testGetRepairAssignmentsForTable_NoSSTables() + { + // Should return 1 assignment if there are no SSTables + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(Murmur3Partitioner.instance.getMinimumToken(), Murmur3Partitioner.instance.getMaximumToken())); + List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForTable(CQLTester.KEYSPACE, tableName, ranges); + assertEquals(1, assignments.size()); + } + + @Test + public void testGetRepairAssignmentsForTable_Single() throws Throwable + { + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), DatabaseDescriptor.getPartitioner().getMaximumToken())); + insertAndFlushSingleTable(); + List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForTable(CQLTester.KEYSPACE, tableName, ranges); + assertEquals(1, assignments.size()); + } + + @Test + public void testGetRepairAssignmentsForTable_BatchingTables() throws Throwable + { + repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "2")); + Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE); + + List<String> tableNames = createAndInsertTables(3); + List<SizedRepairAssignment> 
assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges); + + // We expect two assignments, one with table1 and table2 batched, and one with table3 + assertEquals(2, assignments.size()); + assertEquals(2, assignments.get(0).getTableNames().size()); + assertEquals(1, assignments.get(1).getTableNames().size()); + } + + @Test + public void testGetRepairAssignmentsForTable_BatchSize() throws Throwable + { + repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "2")); + Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE); + + List<String> tableNames = createAndInsertTables(2); + List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges); + + // We expect one assignment, with two tables batched + assertEquals(1, assignments.size()); + assertEquals(2, assignments.get(0).getTableNames().size()); + } + + @Test + public void testGetRepairAssignmentsForTable_NoBatching() throws Throwable + { + repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "1")); + Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE); + + List<String> tableNames = createAndInsertTables(3); + List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges); + + assertEquals(3, assignments.size()); + } + + @Test + public void testGetRepairAssignmentsForTable_AllBatched() throws Throwable + { + repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.FULL, Collections.singletonMap(MAX_TABLES_PER_ASSIGNMENT, "100")); + Collection<Range<Token>> ranges = Collections.singleton(FULL_RANGE); + + List<String> tableNames = createAndInsertTables(5); + List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignmentsForKeyspace(RepairType.FULL, KEYSPACE, tableNames, ranges); + + assertEquals(1, assignments.size()); + } + + @Test(expected = IllegalStateException.class) + public void testMergeEmptyAssignments() + { + // Test when the list of assignments is empty + List<SizedRepairAssignment> emptyAssignments = Collections.emptyList(); + RepairTokenRangeSplitter.merge(emptyAssignments); + } + + @Test + public void testMergeSingleAssignment() + { + // Test when there is only one assignment in the list + String keyspaceName = "testKeyspace"; + List<String> tableNames = Arrays.asList("table1", "table2"); + + SizedRepairAssignment assignment = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames); + List<SizedRepairAssignment> assignments = Collections.singletonList(assignment); + + SizedRepairAssignment result = RepairTokenRangeSplitter.merge(assignments); + + assertEquals(FULL_RANGE, result.getTokenRange()); + assertEquals(keyspaceName, result.getKeyspaceName()); + assertEquals(new HashSet<>(tableNames), new HashSet<>(result.getTableNames())); + } + + @Test + public void testMergeMultipleAssignmentsWithSameTokenRangeAndKeyspace() + { + // Test merging multiple assignments with the same token range and keyspace + String keyspaceName = "testKeyspace"; + List<String> tableNames1 = Arrays.asList("table1", "table2"); + List<String> tableNames2 = Arrays.asList("table2", "table3"); + + SizedRepairAssignment assignment1 = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames1); + SizedRepairAssignment assignment2 = new 
SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames2); + List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2); + + SizedRepairAssignment result = RepairTokenRangeSplitter.merge(assignments); + + assertEquals(FULL_RANGE, result.getTokenRange()); + assertEquals(keyspaceName, result.getKeyspaceName()); + assertEquals(new HashSet<>(Arrays.asList("table1", "table2", "table3")), new HashSet<>(result.getTableNames())); + } + + @Test(expected = IllegalStateException.class) + public void testMergeDifferentTokenRange() + { + // Test merging assignments with different token ranges + Iterator<Range<Token>> range = AutoRepairUtils.split(FULL_RANGE, 2).iterator(); // Split the full range into two ranges, i.e. (0-100, 100-200) + Range<Token> tokenRange1 = range.next(); + Range<Token> tokenRange2 = range.next(); + Assert.assertFalse(range.hasNext()); + + String keyspaceName = "testKeyspace"; + List<String> tableNames = Arrays.asList("table1", "table2"); + + SizedRepairAssignment assignment1 = new SizedRepairAssignment(tokenRange1, keyspaceName, tableNames); + SizedRepairAssignment assignment2 = new SizedRepairAssignment(tokenRange2, keyspaceName, tableNames); + List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2); + + RepairTokenRangeSplitter.merge(assignments); // Should throw IllegalStateException + } + + @Test(expected = IllegalStateException.class) + public void testMergeDifferentKeyspaceName() + { + // Test merging assignments with different keyspace names + List<String> tableNames = Arrays.asList("table1", "table2"); + + SizedRepairAssignment assignment1 = new SizedRepairAssignment(FULL_RANGE, "keyspace1", tableNames); + SizedRepairAssignment assignment2 = new SizedRepairAssignment(FULL_RANGE, "keyspace2", tableNames); + List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2); + + RepairTokenRangeSplitter.merge(assignments); // Should throw IllegalStateException + } + + @Test + public void testMergeWithDuplicateTables() + { + // Test merging assignments with duplicate table names + String keyspaceName = "testKeyspace"; + List<String> tableNames1 = Arrays.asList("table1", "table2"); + List<String> tableNames2 = Arrays.asList("table2", "table3"); + + SizedRepairAssignment assignment1 = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames1); + SizedRepairAssignment assignment2 = new SizedRepairAssignment(FULL_RANGE, keyspaceName, tableNames2); + List<SizedRepairAssignment> assignments = Arrays.asList(assignment1, assignment2); + + RepairAssignment result = RepairTokenRangeSplitter.merge(assignments); + + // The merged result should contain all unique table names + assertEquals(new HashSet<>(Arrays.asList("table1", "table2", "table3")), new HashSet<>(result.getTableNames())); + } + + @Test + public void testGetRepairAssignmentsSplitsBySubrangeSizeAndFilterLimitsByMaxBytesPerSchedule() + { + // Ensures that getRepairAssignments splits by BYTES_PER_ASSIGNMENT and filterRepairAssignments limits by MAX_BYTES_PER_SCHEDULE.
+ repairRangeSplitter = new RepairTokenRangeSplitter(RepairType.INCREMENTAL, Collections.emptyMap()); + repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "50GiB"); + repairRangeSplitter.setParameter(MAX_BYTES_PER_SCHEDULE, "100GiB"); + + // Given a size estimate of 1024GiB, we should expect 21 splits (20 splits of 50GiB = 1000GiB < 1024GiB, so a 21st split is needed) + SizeEstimate sizeEstimate = sizeEstimateByBytes(new LongMebibytesBound("1024GiB")); + + List<SizedRepairAssignment> assignments = repairRangeSplitter.getRepairAssignments(Collections.singletonList(sizeEstimate)); + + // Should be 21 assignments, each being ~48.76 GiB + assertEquals(21, assignments.size()); + long expectedBytes = 52357696560L; + for (int i = 0; i < assignments.size(); i++) + { + SizedRepairAssignment assignment = assignments.get(i); + assertEquals("Did not get expected value for assignment " + i, expectedBytes, assignment.getEstimatedBytes()); + } + + // When filtering we should only get 2 assignments back (48.76 * 2 < 100GiB) + FilteredRepairAssignments filteredRepairAssignments = repairRangeSplitter.filterRepairAssignments(0, KEYSPACE, assignments, 0); + List<RepairAssignment> finalRepairAssignments = filteredRepairAssignments.repairAssignments; + assertEquals(2, finalRepairAssignments.size()); + assertEquals(expectedBytes*2, filteredRepairAssignments.newBytesSoFar); + } + + @Test(expected=IllegalArgumentException.class) + public void testSetParameterShouldNotAllowUnknownParameter() + { + repairRangeSplitter.setParameter("unknown", "x"); + } + + @Test(expected=IllegalArgumentException.class) + public void testSetParameterShouldNotAllowSettingBytesPerAssignmentGreaterThanMaxBytesPerSchedule() + { + repairRangeSplitter.setParameter(MAX_BYTES_PER_SCHEDULE, "500GiB"); + repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "600GiB"); + } + + @Test(expected=IllegalArgumentException.class) + public void testSetParameterShouldNotAllowSettingMaxBytesPerScheduleLessThanBytesPerAssignment() + { + repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "100MiB"); + repairRangeSplitter.setParameter(MAX_BYTES_PER_SCHEDULE, "50MiB"); + } + + @Test + public void testGetParameters() + { + repairRangeSplitter.setParameter(BYTES_PER_ASSIGNMENT, "100MiB"); + repairRangeSplitter.setParameter(MAX_TABLES_PER_ASSIGNMENT, "5"); + + Map<String, String> parameters = repairRangeSplitter.getParameters(); + // Each parameter should be present. + assertEquals(RepairTokenRangeSplitter.PARAMETERS.size(), parameters.size()); + // The parameters we explicitly set should be set exactly as we set them.
+ assertEquals("100MiB", parameters.get(BYTES_PER_ASSIGNMENT)); + assertEquals("5", parameters.get(MAX_TABLES_PER_ASSIGNMENT)); + } + + private SizeEstimate sizeEstimateByBytes(LongMebibytesBound totalSize) + { + return sizeEstimateByBytes(totalSize, totalSize); + } + + private SizeEstimate sizeEstimateByBytes(LongMebibytesBound sizeInRange, LongMebibytesBound totalSize) + { + return new SizeEstimate(RepairType.INCREMENTAL, KEYSPACE, "table1", FULL_RANGE, 1, sizeInRange.toBytes(), totalSize.toBytes()); + } + + + private void insertAndFlushSingleTable() throws Throwable + { + execute("INSERT INTO %s (k, v) values (?, ?)", 1, 1); + flush(); + } + + private List<String> createAndInsertTables(int count) throws Throwable + { + List<String> tableNames = new ArrayList<>(); + for (int i = 0; i < count; i++) { Review Comment: nit: paren on next line ########## src/java/org/apache/cassandra/utils/FBUtilities.java: ########## @@ -85,7 +87,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; -import org.apache.cassandra.io.util.File; Review Comment: nit: accidental import move, can this be reverted? ########## src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java: ########## @@ -25,16 +25,19 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.DatabaseDescriptor; Review Comment: nit: move imports ########## src/java/org/apache/cassandra/config/Config.java: ########## @@ -32,6 +32,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; + +import org.apache.cassandra.repair.autorepair.AutoRepairConfig; Review Comment: nit: move import below `AuditLogOptions` import ########## src/java/org/apache/cassandra/config/ParameterizedClass.java: ########## @@ -30,7 +31,7 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) -public class ParameterizedClass +public class ParameterizedClass implements Serializable Review Comment: same as `DurationSpec` shouldn't need to be `Serializable` anymore. ########## src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java: ########## @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Iterator; +import javax.annotation.Nullable; Review Comment: I'll revert this with my change to use `onDiskSizeForPartitionPositions` ########## src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java: ########## @@ -83,6 +85,14 @@ protected SSTableScanner(S sstable, this.listener = listener; } + public static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges) + { + List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size()); + for (Range<Token> range : Range.normalize(tokenRanges)) + addRange(sstable, Range.makeRowRange(range), boundsList); + return boundsList; + } + Review Comment: It looks like this code exists on 4.1, but no longer exists on trunk. I also realized that `RepairTokenRangeScanner` needs to account for `Bti` instead of using `BigTableScanner` directly. After looking at the code, I think we can make this work without changing this code at all and we can make it implementation agnostic. I'll propose something. 
########## test/unit/org/apache/cassandra/tools/nodetool/SSTableRepairedSetTest.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.Output; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SSTableRepairedSetTest +{ + @Mock + private NodeProbe probe; + + private SSTableRepairedSet cmd; + + @Before + public void setUp() { Review Comment: nit: paren should go on the next line (all methods in this file) ########## src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java: ########## @@ -190,11 +192,11 @@ private boolean cdcRequiresWriteCommitLog(ColumnFamilyStore cfs) * For CDC-enabled tables, when the write path for CDC is enabled, we want to ensure that the mutations are * run through the CommitLog, so they can be archived by the CDC process on discard. */ - private boolean requiresWritePath(ColumnFamilyStore cfs) + public boolean requiresWritePath(ColumnFamilyStore cfs) Review Comment: Looks like this is only used externally by `CassandraStreamReceiverTest`, so I suggest changing it to default visibility and using `@VisibleForTesting` (which would need to be imported as well). ```suggestion @VisibleForTesting boolean requiresWritePath(ColumnFamilyStore cfs) ``` ########## src/java/org/apache/cassandra/repair/autorepair/RepairTokenRangeSplitter.java: ########## @@ -0,0 +1,1001 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.autorepair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.io.sstable.SSTableReadsListener; +import org.apache.cassandra.io.sstable.format.big.BigTableReader; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; +import org.apache.cassandra.utils.FBUtilities; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import com.clearspring.analytics.stream.cardinality.ICardinality; +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.big.BigTableScanner; +import org.apache.cassandra.io.sstable.metadata.CompactionMetadata; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.service.AutoRepairService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.apache.cassandra.repair.autorepair.AutoRepairUtils.split; + +/** + * In Apache Cassandra, tuning repair ranges has four main goals: + * <ol> + * <li> + * <b>Create smaller, consistent repair times</b>: Long repairs, such as those lasting 15 hours, can be problematic. + * If a node fails 14 hours into the repair, the entire process must be restarted. The goal is to reduce the impact + * of disturbances or failures. However, making the repairs too short can lead to overhead from repair orchestration + * becoming the main bottleneck. + * </li> + * <li> + * <b>Minimize the impact on hosts</b>: Repairs should not heavily affect the host systems. For incremental repairs, + * this might involve anti-compaction work. In full repairs, streaming large amounts of data—especially with wide + * partitions—can lead to issues with disk usage and higher compaction costs. + * </li> + * <li> + * <b>Reduce overstreaming</b>: The Merkle tree, which represents data within each partition and range, + * has a maximum size. If a repair covers too many partitions, the tree’s leaves represent larger data ranges. + * Even a small change in a leaf can trigger excessive data streaming, making the process inefficient. 
+ * </li> + * <li> + * <b>Reduce number of repairs</b>: If there are many small tables, it's beneficial to batch these tables together + * under a single parent repair. This prevents the repair overhead from becoming a bottleneck, especially when + * dealing with hundreds of tables. Running individual repairs for each table can significantly impact performance + * and efficiency. + * </li> + * </ol> + * To manage these issues, the strategy involves estimating the size and number of partitions within a range and + * splitting it accordingly to bound the size of the range splits. This is established by iterating over SSTable + * index files to estimate the amount of bytes and partitions involved in the ranges being repaired, varying by + * which repair type is being invoked. + * <p/> + * While this splitter has a lot of tuning parameters, the expectation is that the established default configuration + * shall be sensible for all {@link org.apache.cassandra.repair.autorepair.AutoRepairConfig.RepairType}'s. The following + * configuration parameters are offered. + * <ul> + * <li> + * <b>bytes_per_assignment</b>: The target and maximum amount of bytes that should be included in a repair + * assignment. This is meant to scope the amount of work involved in a repair. For incremental repair, this + * involves the total number of bytes in all SSTables containing unrepaired data involving the ranges being + * repaired, including data that doesn't cover the range. This is to account for the amount of anticompaction + * that is expected. For all other repair types, this involves the amount of data covering the range being + * repaired. + * </li> + * <li> + * <b>partitions_per_assignment</b>: The maximum number of partitions that should be included in a repair + * assignment. This configuration exists to reduce excessive overstreaming by attempting to limit the number + * of partitions present in a merkle tree leaf node. + * </li> + * <li> + * <b>max_tables_per_assignment</b>: The maximum number of tables that can be included in a repair assignment. + * This aims to reduce the number of repairs, especially in cases where a large number of tables exist for + * a keyspace. Note that the splitter will avoid batching tables together if they exceed the other + * configuration parameters such as <code>bytes_per_assignment</code> and <code>partitions_per_assignment</code>. + * </li> + * <li> + * <b>max_bytes_per_schedule</b>: The maximum number of bytes to cover in an individual schedule. This serves + * as a mechanism for throttling the amount of work that can be done on each repair cycle. One may opt to + * reduce this value if the impact of repairs is causing too much load on the cluster, or increase it if + * writes outpace the amount of data being repaired. Alternatively, one may want to tune the + * <code>min_repair_interval</code> down or up. + * </li> + * </ul> + * Given the impact of what each repair type accomplishes, different defaults are established per repair type. + * <ul> + * <li> + * <b>full</b>: Configured in a way that attempts to accomplish repairing all data in a schedule, with + * individual repairs targeting at most 200GiB of data and 1048576 partitions. + * <b>max_bytes_per_schedule</b> is set to a large value for full repair to attempt to repair all data per + * repair schedule.
+ * <ul> + * <li><b>bytes_per_assignment</b>: 200GiB</li> + * <li><b>partitions_per_assignment</b>: 1048576</li> + * <li><b>max_tables_per_assignment</b>: 64</li> + * <li><b>max_bytes_per_schedule</b>: 100TiB</li> + * </ul> + * </li> + * <li> + * <b>incremental</b>: Configured in a way that attempts to repair 50GiB of data per repair, and 100GiB per + * schedule. This attempts to throttle the amount of IR and anticompaction done per schedule after turning + * incremental on for the first time. You may want to consider increasing <code>max_bytes_per_schedule</code> + * if more than this much data is written per <code>min_repair_interval</code>. + * <ul> + * <li><b>bytes_per_assignment</b>: 50GiB</li> + * <li><b>partitions_per_assignment</b>: 1048576</li> + * <li><b>max_tables_per_assignment</b>: 64</li> + * <li><b>max_bytes_per_schedule</b>: 100GiB</li> + * </ul> + * </li> + * <li> + * <b>preview_repaired</b>: Configured in a way that attempts to accomplish previewing all data in a schedule, + * with previews targeting at most 200GiB of data and 1048576 partitions. + * <ul> + * <li><b>bytes_per_assignment</b>: 200GiB</li> + * <li><b>partitions_per_assignment</b>: 1048576</li> + * <li><b>max_tables_per_assignment</b>: 64</li> + * <li><b>max_bytes_per_schedule</b>: 100TiB</li> + * </ul> + * </li> + * </ul> + */ +public class RepairTokenRangeSplitter implements IAutoRepairTokenRangeSplitter +{ + private static final Logger logger = LoggerFactory.getLogger(RepairTokenRangeSplitter.class); + + // Default max bytes to 100TiB, which is much more readable than Long.MAX_VALUE + private static final DataStorageSpec.LongBytesBound MAX_BYTES = new DataStorageSpec.LongBytesBound(100_000, DataStorageSpec.DataStorageUnit.GIBIBYTES); + + static final String BYTES_PER_ASSIGNMENT = "bytes_per_assignment"; + static final String PARTITIONS_PER_ASSIGNMENT = "partitions_per_assignment"; + static final String MAX_TABLES_PER_ASSIGNMENT = "max_tables_per_assignment"; + static final String MAX_BYTES_PER_SCHEDULE = "max_bytes_per_schedule"; + + static final List<String> PARAMETERS = Arrays.asList(BYTES_PER_ASSIGNMENT, PARTITIONS_PER_ASSIGNMENT, MAX_TABLES_PER_ASSIGNMENT, MAX_BYTES_PER_SCHEDULE); + + private final AutoRepairConfig.RepairType repairType; + + private final Map<String, String> givenParameters = new HashMap<>(); + + private DataStorageSpec.LongBytesBound bytesPerAssignment; + private long partitionsPerAssignment; + private int maxTablesPerAssignment; + private DataStorageSpec.LongBytesBound maxBytesPerSchedule; + + /** + * Established defaults for each {@link org.apache.cassandra.repair.autorepair.AutoRepairConfig.RepairType}, meant to + * be sensible for each type. + * <p> + * Defaults if not specified for the given repair type: + * <ul> + * <li><b>bytes_per_assignment</b>: 200GiB</li> + * <li><b>partitions_per_assignment</b>: 2^repair_session_max_tree_depth</li> + * <li><b>max_tables_per_assignment</b>: 64</li> + * <li><b>max_bytes_per_schedule</b>: 1000GiB</li> + * </ul> + * It's expected that these defaults should work well for everything except incremental, so we confine + * bytes_per_assignment to 50GiB and max_bytes_per_schedule to 100GiB. This should strike a good balance: it limits + * the amount of data repaired during an initial migration to incremental repair, while still moving the entire + * unrepaired set to repaired at steady state, assuming no more than 100GiB of + * data is written to a node per min_repair_interval.
+ */ + private static final Map<AutoRepairConfig.RepairType, RepairTypeDefaults> DEFAULTS_BY_REPAIR_TYPE = new EnumMap<>(AutoRepairConfig.RepairType.class) {{ + put(AutoRepairConfig.RepairType.FULL, RepairTypeDefaults.builder(AutoRepairConfig.RepairType.FULL) + .build()); + // Restrict incremental repair to 50GiB per assignment to confine the amount of possible anticompaction. + put(AutoRepairConfig.RepairType.INCREMENTAL, RepairTypeDefaults.builder(AutoRepairConfig.RepairType.INCREMENTAL) + .withBytesPerAssignment(new DataStorageSpec.LongBytesBound("50GiB")) + .withMaxBytesPerSchedule(new DataStorageSpec.LongBytesBound("100GiB")) + .build()); + put(AutoRepairConfig.RepairType.PREVIEW_REPAIRED, RepairTypeDefaults.builder(AutoRepairConfig.RepairType.PREVIEW_REPAIRED) + .build()); + }}; + + public RepairTokenRangeSplitter(AutoRepairConfig.RepairType repairType, Map<String, String> parameters) + { + this.repairType = repairType; + this.givenParameters.putAll(parameters); + + reinitParameters(); + } + + private void reinitParameters() + { + RepairTypeDefaults defaults = DEFAULTS_BY_REPAIR_TYPE.get(repairType); + + DataStorageSpec.LongBytesBound bytesPerAssignmentTmp = getPropertyOrDefault(BYTES_PER_ASSIGNMENT, DataStorageSpec.LongBytesBound::new, defaults.bytesPerAssignment); + DataStorageSpec.LongBytesBound maxBytesPerScheduleTmp = getPropertyOrDefault(MAX_BYTES_PER_SCHEDULE, DataStorageSpec.LongBytesBound::new, defaults.maxBytesPerSchedule); + + // Validate that bytesPerAssignment <= maxBytesPerSchedule + if (bytesPerAssignmentTmp.toBytes() > maxBytesPerScheduleTmp.toBytes()) + { + throw new IllegalArgumentException(String.format("%s='%s' cannot be greater than %s='%s' for %s", + BYTES_PER_ASSIGNMENT, + bytesPerAssignmentTmp, + MAX_BYTES_PER_SCHEDULE, + maxBytesPerScheduleTmp, + repairType.getConfigName())); + } + + bytesPerAssignment = bytesPerAssignmentTmp; + maxBytesPerSchedule = maxBytesPerScheduleTmp; + + partitionsPerAssignment = getPropertyOrDefault(PARTITIONS_PER_ASSIGNMENT, Long::parseLong, defaults.partitionsPerAssignment); + maxTablesPerAssignment = getPropertyOrDefault(MAX_TABLES_PER_ASSIGNMENT, Integer::parseInt, defaults.maxTablesPerAssignment); + + logger.info("Configured {}[{}] with {}={}, {}={}, {}={}, {}={}", RepairTokenRangeSplitter.class.getName(), + repairType.getConfigName(), + BYTES_PER_ASSIGNMENT, bytesPerAssignment, + PARTITIONS_PER_ASSIGNMENT, partitionsPerAssignment, + MAX_TABLES_PER_ASSIGNMENT, maxTablesPerAssignment, + MAX_BYTES_PER_SCHEDULE, maxBytesPerSchedule); + } + + private <T> T getPropertyOrDefault(String propertyName, Function<String, T> mapper, T defaultValue) + { + return Optional.ofNullable(this.givenParameters.get(propertyName)).map(mapper).orElse(defaultValue); + } + + @Override + public Iterator<KeyspaceRepairAssignments> getRepairAssignments(boolean primaryRangeOnly, List<PrioritizedRepairPlan> repairPlans) + { + return new BytesBasedRepairAssignmentIterator(primaryRangeOnly, repairPlans); + } + + /** + * A custom {@link RepairAssignmentIterator} that confines the total bytes of repair assignments to + * <code>max_bytes_per_schedule</code>.
+ */ + private class BytesBasedRepairAssignmentIterator extends RepairAssignmentIterator { + + private final boolean primaryRangeOnly; + private long bytesSoFar = 0; + + BytesBasedRepairAssignmentIterator(boolean primaryRangeOnly, List<PrioritizedRepairPlan> repairPlans) + { + super(repairPlans); + this.primaryRangeOnly = primaryRangeOnly; + } + + @Override + protected KeyspaceRepairAssignments next(int priority, KeyspaceRepairPlan repairPlan) + { + // short circuit if we've accumulated too many bytes by returning a KeyspaceRepairAssignments with + // no assignments. We do this rather than returning false in hasNext() because we want to signal + // to AutoRepair that a keyspace generated no assignments. + if (bytesSoFar >= maxBytesPerSchedule.toBytes()) + { + return new KeyspaceRepairAssignments(priority, repairPlan.getKeyspaceName(), Collections.emptyList()); + } + + Collection<Range<Token>> tokenRanges = getTokenRanges(primaryRangeOnly, repairPlan.getKeyspaceName()); + List<SizedRepairAssignment> repairAssignments = getRepairAssignmentsForKeyspace(repairType, repairPlan.getKeyspaceName(), repairPlan.getTableNames(), tokenRanges); + FilteredRepairAssignments filteredRepairAssignments = filterRepairAssignments(priority, repairPlan.getKeyspaceName(), repairAssignments, bytesSoFar); + bytesSoFar = filteredRepairAssignments.newBytesSoFar; + return new KeyspaceRepairAssignments(priority, repairPlan.getKeyspaceName(), filteredRepairAssignments.repairAssignments); + } + } + + @VisibleForTesting + List<SizedRepairAssignment> getRepairAssignmentsForKeyspace(AutoRepairConfig.RepairType repairType, String keyspaceName, List<String> tableNames, Collection<Range<Token>> tokenRanges) + { + List<SizedRepairAssignment> repairAssignments = new ArrayList<>(); + // this is used for batching minimal single assignment tables together + List<SizedRepairAssignment> currentAssignments = new ArrayList<>(); + + AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig(); + + // If we can repair by keyspace, sort the tables by size so we can batch the smallest ones together + boolean repairByKeyspace = config.getRepairByKeyspace(repairType); + if (repairByKeyspace) + { + tableNames.sort((t1, t2) -> { + ColumnFamilyStore cfs1 = ColumnFamilyStore.getIfExists(keyspaceName, t1); + ColumnFamilyStore cfs2 = ColumnFamilyStore.getIfExists(keyspaceName, t2); + // If for whatever reason the CFS is not retrievable, we can assume it has been deleted, so give the + // other cfs precedence. + if (cfs1 == null) + { + // cfs1 is less than cfs2 because it is null + return -1; + } + else if (cfs2 == null) + { + // cfs1 is greater than cfs2 because cfs2 is null + return 1; + } + return Long.compare(cfs1.metric.totalDiskSpaceUsed.getCount(), cfs2.metric.totalDiskSpaceUsed.getCount()); + }); + } + + for (String tableName : tableNames) + { + List<SizedRepairAssignment> tableAssignments = getRepairAssignmentsForTable(keyspaceName, tableName, tokenRanges); + + if (tableAssignments.isEmpty()) + continue; + + // if not repairing by keyspace, don't attempt to batch tables with others.
+ if (!repairByKeyspace) + { + repairAssignments.addAll(tableAssignments); + } + // If the table assignments are for the same token range, and we have room to add more tables to the current assignment + else if (tableAssignments.size() == 1 && + currentAssignments.size() < maxTablesPerAssignment && + (currentAssignments.isEmpty() || currentAssignments.get(0).getTokenRange().equals(tableAssignments.get(0).getTokenRange()))) + { + long currentAssignmentsBytes = getEstimatedBytes(currentAssignments); + long tableAssignmentsBytes = getEstimatedBytes(tableAssignments); + // only add assignments together if they don't exceed max bytes per schedule. + if (currentAssignmentsBytes + tableAssignmentsBytes < maxBytesPerSchedule.toBytes()) { + currentAssignments.addAll(tableAssignments); + } + else + { + // add table assignments by themselves + repairAssignments.addAll(tableAssignments); + } + } + else + { + if (!currentAssignments.isEmpty()) + { + repairAssignments.add(merge(currentAssignments)); + currentAssignments.clear(); + } + repairAssignments.addAll(tableAssignments); + } + } + + if (!currentAssignments.isEmpty()) + repairAssignments.add(merge(currentAssignments)); + + return repairAssignments; + } + + /** + * Given a list of size-based repair assignments, confine them by <code>maxBytesPerSchedule</code>. + * @param repairAssignments the assignments to filter. + * @param bytesSoFar repair assignment bytes accumulated so far. + * @return A list of repair assignments confined by <code>maxBytesPerSchedule</code>. + */ + @VisibleForTesting + FilteredRepairAssignments filterRepairAssignments(int priority, String keyspaceName, List<SizedRepairAssignment> repairAssignments, long bytesSoFar) + { + // Confine repair assignments by maxBytesPerSchedule. + long bytesSoFarThisIteration = 0L; + long bytesNotRepaired = 0L; + int assignmentsNotRepaired = 0; + int assignmentsToRepair = 0; + int totalAssignments = 0; + + List<RepairAssignment> assignmentsToReturn = new ArrayList<>(repairAssignments.size()); + for (SizedRepairAssignment repairAssignment : repairAssignments) + { + totalAssignments++; + // skip any repair assignments that would accumulate us past the maxBytesPerSchedule + if (bytesSoFar + repairAssignment.getEstimatedBytes() > maxBytesPerSchedule.toBytes()) + { + // log that repair assignment was skipped. + bytesNotRepaired += repairAssignment.getEstimatedBytes(); + assignmentsNotRepaired++; + logger.warn("Skipping {} because it would increase total repair bytes to {}", + repairAssignment, + getBytesOfMaxBytesPerSchedule(bytesSoFar + repairAssignment.getEstimatedBytes())); + } + else + { + bytesSoFar += repairAssignment.getEstimatedBytes(); + bytesSoFarThisIteration += repairAssignment.getEstimatedBytes(); + assignmentsToRepair++; + logger.info("Adding {}, increasing repair bytes to {}", + repairAssignment, + getBytesOfMaxBytesPerSchedule(bytesSoFar)); + assignmentsToReturn.add(repairAssignment); + } + } + + String message = "Returning {} assignment(s) for priorityBucket {} and keyspace {}, totaling {} ({} overall)"; + if (assignmentsNotRepaired != 0) + { + message += ". Skipping {} of {} assignment(s), totaling {}"; + if (repairType != AutoRepairConfig.RepairType.INCREMENTAL) + { + message += ". The entire primary range will not be repaired this schedule.
" + + "Consider increasing maxBytesPerSchedule, reducing node density or monitoring to ensure " + + "all ranges do get repaired within gc_grace_seconds"; + logger.warn(message, assignmentsToRepair, priority, keyspaceName, + FileUtils.stringifyFileSize(bytesSoFarThisIteration), + getBytesOfMaxBytesPerSchedule(bytesSoFar), + assignmentsNotRepaired, totalAssignments, + FileUtils.stringifyFileSize(bytesNotRepaired)); + } + else + { + logger.info(message, assignmentsToRepair, priority, keyspaceName, + FileUtils.stringifyFileSize(bytesSoFarThisIteration), + getBytesOfMaxBytesPerSchedule(bytesSoFar), + assignmentsNotRepaired, totalAssignments, + FileUtils.stringifyFileSize(bytesNotRepaired)); + } + } + else + { + logger.info(message, assignmentsToRepair, priority, keyspaceName, + FileUtils.stringifyFileSize(bytesSoFarThisIteration), + getBytesOfMaxBytesPerSchedule(bytesSoFar)); + } + + return new FilteredRepairAssignments(assignmentsToReturn, bytesSoFar); + } + + @VisibleForTesting + static class FilteredRepairAssignments + { + final List<RepairAssignment> repairAssignments; + final long newBytesSoFar; + + private FilteredRepairAssignments(List<RepairAssignment> repairAssignments, long newBytesSoFar) + { + this.repairAssignments = repairAssignments; + this.newBytesSoFar = newBytesSoFar; + } + } + + private String getBytesOfMaxBytesPerSchedule(long bytes) + { + if (maxBytesPerSchedule.equals(MAX_BYTES)) + return FileUtils.stringifyFileSize(bytes); + else + return String.format("%s of %s", FileUtils.stringifyFileSize(bytes), maxBytesPerSchedule); + } + + /** + * @return The sum of {@link SizedRepairAssignment#getEstimatedBytes()} of all given + * repairAssignments. + * @param repairAssignments The assignments to sum + */ + @VisibleForTesting + protected static long getEstimatedBytes(List<SizedRepairAssignment> repairAssignments) + { + return repairAssignments + .stream() + .mapToLong(SizedRepairAssignment::getEstimatedBytes) + .sum(); + } + + @VisibleForTesting + static SizedRepairAssignment merge(List<SizedRepairAssignment> assignments) + { + if (assignments.isEmpty()) + throw new IllegalStateException("Cannot merge empty assignments"); + + Set<String> mergedTableNames = new HashSet<>(); + Range<Token> referenceTokenRange = assignments.get(0).getTokenRange(); + String referenceKeyspaceName = assignments.get(0).getKeyspaceName(); + + for (SizedRepairAssignment assignment : assignments) + { + // These checks _should_ be unnecessary but are here to ensure that the assignments are consistent + if (!assignment.getTokenRange().equals(referenceTokenRange)) + throw new IllegalStateException("All assignments must have the same token range"); + if (!assignment.getKeyspaceName().equals(referenceKeyspaceName)) + throw new IllegalStateException("All assignments must have the same keyspace name"); + + mergedTableNames.addAll(assignment.getTableNames()); + } + + long sizeForAssignment = getEstimatedBytes(assignments); + return new SizedRepairAssignment(referenceTokenRange, referenceKeyspaceName, new ArrayList<>(mergedTableNames), + "full primary range for " + mergedTableNames.size() + " tables", sizeForAssignment); + } + + @VisibleForTesting + protected List<SizedRepairAssignment> getRepairAssignmentsForTable(String keyspaceName, String tableName, Collection<Range<Token>> tokenRanges) + { + List<SizeEstimate> sizeEstimates = getRangeSizeEstimate(keyspaceName, tableName, tokenRanges); + return getRepairAssignments(sizeEstimates); + } + + private static void logSkippingTable(String keyspaceName, String tableName) 
+ { + logger.warn("Could not resolve table data for {}.{}, assuming it has since been deleted; skipping", keyspaceName, tableName); + } + + @VisibleForTesting + protected List<SizedRepairAssignment> getRepairAssignments(List<SizeEstimate> sizeEstimates) + { + List<SizedRepairAssignment> repairAssignments = new ArrayList<>(); + + // since it's possible for us to hit maxBytesPerSchedule before seeing all ranges, shuffle so there is + // at least a chance of hitting all the ranges _eventually_ in the worst-case scenarios + Collections.shuffle(sizeEstimates); + int totalExpectedSubRanges = 0; + for (SizeEstimate estimate : sizeEstimates) + { + if (estimate.sizeForRepair != 0) + { + boolean needsSplitting = estimate.sizeForRepair > bytesPerAssignment.toBytes() || estimate.partitions > partitionsPerAssignment; + if (needsSplitting) + { + totalExpectedSubRanges += calculateNumberOfSplits(estimate); + } + } + } + for (SizeEstimate estimate : sizeEstimates) + { + if (estimate.sizeForRepair == 0) + { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(estimate.keyspace, estimate.table); + + if (cfs == null) + { + logSkippingTable(estimate.keyspace, estimate.table); + continue; + } + + long memtableSize = cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize(); + if (memtableSize > 0L) + { + logger.debug("Included {}.{} range {}, had no unrepaired SSTables, but memtableSize={}, adding single repair assignment", estimate.keyspace, estimate.table, estimate.tokenRange, memtableSize); + SizedRepairAssignment assignment = new SizedRepairAssignment(estimate.tokenRange, estimate.keyspace, Collections.singletonList(estimate.table), "full primary range for table with memtable only detected", memtableSize); + repairAssignments.add(assignment); + } + else + { + logger.debug("Included {}.{} range {}, has no SSTables or memtable data, but adding single repair assignment for entire range in case writes were missed", estimate.keyspace, estimate.table, estimate.tokenRange); + SizedRepairAssignment assignment = new SizedRepairAssignment(estimate.tokenRange, estimate.keyspace, Collections.singletonList(estimate.table), "full primary range for table with no data detected", 0L); + repairAssignments.add(assignment); + } + } + else + { + // Check if the estimate needs splitting based on the criteria + boolean needsSplitting = estimate.sizeForRepair > bytesPerAssignment.toBytes() || estimate.partitions > partitionsPerAssignment; + if (needsSplitting) + { + int numberOfSplits = calculateNumberOfSplits(estimate); + long approximateBytesPerSplit = estimate.sizeForRepair / numberOfSplits; + Collection<Range<Token>> subranges = split(estimate.tokenRange, numberOfSplits); + for (Range<Token> subrange : subranges) + { + SizedRepairAssignment assignment = new SizedRepairAssignment(subrange, estimate.keyspace, Collections.singletonList(estimate.table), + String.format("subrange %d of %d", repairAssignments.size()+1, totalExpectedSubRanges), + approximateBytesPerSplit); + repairAssignments.add(assignment); + } + } + else + { + // No splitting needed, repair the entire range as-is + SizedRepairAssignment assignment = new SizedRepairAssignment(estimate.tokenRange, estimate.keyspace, + Collections.singletonList(estimate.table), + "full primary range for table", estimate.sizeForRepair); + repairAssignments.add(assignment); + } + } + } + return repairAssignments; + } + + private int calculateNumberOfSplits(SizeEstimate estimate) + { + // Calculate the number of splits needed for size and partitions + int splitsForSize = (int)
Math.ceil((double) estimate.sizeForRepair / bytesPerAssignment.toBytes()); + int splitsForPartitions = (int) Math.ceil((double) estimate.partitions / partitionsPerAssignment); + + // Split the token range into subranges based on whichever (partitions, bytes) would generate the most splits. + boolean splitBySize = splitsForSize > splitsForPartitions; + int splits = splitBySize ? splitsForSize : splitsForPartitions; + + // calculate approximation for logging purposes + long approximateBytesPerSplit = estimate.sizeForRepair / splits; + long approximatePartitionsPerSplit = estimate.partitions / splits; + + logger.info("Splitting {}.{} for range {} into {} sub ranges by {} (splitsForSize={}, splitsForPartitions={}, " + + "approximateBytesInRange={}, approximatePartitionsInRange={}, " + + "approximateBytesPerSplit={}, approximatePartitionsPerSplit={})", + estimate.keyspace, estimate.table, estimate.tokenRange, + splits, splitBySize ? "size" : "partitions", + splitsForSize, splitsForPartitions, + FileUtils.stringifyFileSize(estimate.sizeForRepair), estimate.partitions, + FileUtils.stringifyFileSize(approximateBytesPerSplit), approximatePartitionsPerSplit + ); + return splits; + } + + private Collection<Range<Token>> getTokenRanges(boolean primaryRangeOnly, String keyspaceName) + { + // Collect all applicable token ranges + Collection<Range<Token>> wrappedRanges; + if (primaryRangeOnly) + { + wrappedRanges = TokenRingUtils.getPrimaryRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); + } + else + { + wrappedRanges = StorageService.instance.getLocalRanges(keyspaceName); + } + + // Unwrap each range as we need to account for ranges that overlap the ring + List<Range<Token>> ranges = new ArrayList<>(); + for (Range<Token> wrappedRange : wrappedRanges) + { + ranges.addAll(wrappedRange.unwrap()); + } + return ranges; + } + + private List<SizeEstimate> getRangeSizeEstimate(String keyspace, String table, Collection<Range<Token>> tokenRanges) + { + List<SizeEstimate> sizeEstimates = new ArrayList<>(); + for (Range<Token> tokenRange : tokenRanges) + { + logger.debug("Calculating size estimate for {}.{} for range {}", keyspace, table, tokenRange); + try (Refs<SSTableReader> refs = getSSTableReaderRefs(repairType, keyspace, table, tokenRange)) + { + SizeEstimate estimate = getSizesForRangeOfSSTables(repairType, keyspace, table, tokenRange, refs); + logger.debug("Generated size estimate {}", estimate); + sizeEstimates.add(estimate); + } + } + return sizeEstimates; + } + + @VisibleForTesting + static SizeEstimate getSizesForRangeOfSSTables(AutoRepairConfig.RepairType repairType, String keyspace, String table, Range<Token> tokenRange, Refs<SSTableReader> refs) + { + ICardinality cardinality = new HyperLogLogPlus(13, 25); + long approxBytesInRange = 0L; + long totalBytes = 0L; + + for (SSTableReader reader : refs) + { + try + { + if (reader.openReason == SSTableReader.OpenReason.EARLY) + continue; + CompactionMetadata metadata = (CompactionMetadata) reader.descriptor.getMetadataSerializer().deserialize(reader.descriptor, MetadataType.COMPACTION); + if (metadata != null) + cardinality = cardinality.merge(metadata.cardinalityEstimator); + + long sstableSize = reader.bytesOnDisk(); + totalBytes += sstableSize; + // get the bounds of the sstable for this range using the index file but do not actually read it. + List<AbstractBounds<PartitionPosition>> bounds = BigTableScanner.makeBounds(reader, Collections.singleton(tokenRange)); Review Comment: A couple issues here I realized just now. 1. 
##########
src/java/org/apache/cassandra/service/StorageService.java:
##########

@@ -1113,6 +1117,18 @@ public void doAuthSetup(boolean async)
         }
     }
 
+    public void doAutoRepairSetup()
+    {
+        AutoRepairService.setup();
+        if (DatabaseDescriptor.getAutoRepairConfig().isAutoRepairSchedulingEnabled())
+        {
+            logger.info("Enable auto-repair scheduling");

Review Comment:
```suggestion
            logger.info("Enabling auto-repair scheduling");
```

##########
test/unit/org/apache/cassandra/service/AutoRepairServiceRepairTypeTest.java:
##########

@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+import org.apache.cassandra.repair.autorepair.AutoRepairUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.cassandra.Util.setAutoRepairEnabled;
+import static org.apache.cassandra.config.CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class AutoRepairServiceRepairTypeTest extends CQLTester {
+    @Parameterized.Parameter()
+    public AutoRepairConfig.RepairType repairType;
+
+    private final UUID host1 = UUID.fromString("00000000-0000-0000-0000-000000000001");
+    private final UUID host2 = UUID.fromString("00000000-0000-0000-0000-000000000002");
+
+    private AutoRepairService instance;
+
+    @Parameterized.Parameters(name = "repairType={0}")
+    public static Collection<AutoRepairConfig.RepairType> repairTypes() {

Review Comment:
nit: paren should go on next line in this file

##########
src/java/org/apache/cassandra/repair/autorepair/FixedSplitTokenRangeSplitter.java:
##########

@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.autorepair;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.service.AutoRepairService;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.repair.autorepair.AutoRepairUtils.split;
+
+public class FixedSplitTokenRangeSplitter implements IAutoRepairTokenRangeSplitter
+{
+    private static final Logger logger = LoggerFactory.getLogger(FixedSplitTokenRangeSplitter.class);
+
+    /**
+     * The number of subranges to split each to-be-repaired token range into. Defaults to 1.
+     * <p>
+     * The higher this number, the smaller the repair sessions will be.
+     * <p>
+     * If you are using vnodes, say 256, the repair will always go one vnode range at a time. This parameter
+     * additionally lets us further subdivide a given vnode range into subranges.
+     * <p>
+     * With a value of "1" and 256 vnodes, a given table on a node will undergo repair 256 times. With a value of
+     * "2", the same table will undergo repair 512 times, because every vnode range is further divided in two.
+     * <p>
+     * If you do not use vnodes, or the number of vnodes is small (say 8), setting this value to a higher number,
+     * such as 16, is useful: each repair session covers a smaller range, so it has a higher chance of succeeding.
+     */
+    static final String NUMBER_OF_SUBRANGES = "number_of_subranges";
+
+    private final AutoRepairConfig.RepairType repairType;
+    private int numberOfSubranges;
+
+    public FixedSplitTokenRangeSplitter(AutoRepairConfig.RepairType repairType, Map<String, String> parameters)
+    {
+        this.repairType = repairType;
+
+        numberOfSubranges = Integer.parseInt(parameters.getOrDefault(NUMBER_OF_SUBRANGES, "1"));
+    }
+
+    @Override
+    public Iterator<KeyspaceRepairAssignments> getRepairAssignments(boolean primaryRangeOnly, List<PrioritizedRepairPlan> repairPlans)
+    {
+        return new RepairAssignmentIterator(repairPlans) {

Review Comment:
nit: paren should go on next line
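As an aside on the `number_of_subranges` arithmetic in the javadoc above, here is a toy sketch of what a value of 2 does to a single vnode range, using the `AutoRepairUtils.split` helper the class already imports (token values are invented for illustration; exact split points depend on the partitioner):

```java
// One vnode range owning tokens (0, 100]; number_of_subranges = 2 halves each
// vnode range, so a table is repaired in 2 * num_vnodes sessions instead of
// num_vnodes sessions.
Range<Token> vnodeRange = new Range<>(new Murmur3Partitioner.LongToken(0L),
                                      new Murmur3Partitioner.LongToken(100L));
Collection<Range<Token>> subranges = AutoRepairUtils.split(vnodeRange, 2);
// expected (roughly): [(0, 50], (50, 100]]
```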
##########
src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java:
##########

@@ -217,6 +218,8 @@ public SequenceState executeNext()
                             "For more, see `nodetool help bootstrap`. {}", SystemKeyspace.getBootstrapState());
                 return halted();
             }
+            // this node might have just bootstrapped; check if we should run repair immediately
+            AutoRepairUtils.runRepairOnNewlyBootstrappedNodeIfEnabled();

Review Comment:
Maybe paranoid on my part, but could we move this towards the end, maybe at the end of the `if (finishJoiningRing)` block? That way, if something unexpected happens in this code path, it doesn't skip marking the bootstrap as complete/updating the CMS. Roughly what I have in mind is sketched below.
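(Sketch only: the existing join logic is elided, and exactly where the call should land inside the block is open to discussion.)

```java
if (finishJoiningRing)
{
    // ... existing logic that finishes joining the ring and updates cluster metadata ...

    // Run the auto-repair hook last, so an unexpected failure here cannot
    // prevent the bootstrap from being marked complete or the CMS from
    // being updated.
    AutoRepairUtils.runRepairOnNewlyBootstrappedNodeIfEnabled();
}
```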
##########
test/unit/org/apache/cassandra/service/AutoRepairServiceSetterTest.java:
##########

@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
+import org.apache.cassandra.repair.autorepair.AutoRepairUtils;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.cassandra.Util.setAutoRepairEnabled;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(Parameterized.class)
+public class AutoRepairServiceSetterTest<T> extends CQLTester {
+    private static final AutoRepairConfig config = new AutoRepairConfig(true);
+
+    @Parameterized.Parameter
+    public AutoRepairConfig.RepairType repairTypeStr;
+
+    @Parameterized.Parameter(1)
+    public T arg;
+
+    @Parameterized.Parameter(2)
+    public BiConsumer<String, T> setter;
+
+    @Parameterized.Parameter(3)
+    public Function<AutoRepairConfig.RepairType, T> getter;
+
+    @Parameterized.Parameters(name = "{index}: repairType={0}, arg={1}")
+    public static Collection<Object[]> testCases() {

Review Comment:
nit: paren should go on next line in this file

##########
test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java:
##########

@@ -42,6 +42,8 @@
 import static org.apache.cassandra.config.YamlConfigurationLoader.SYSTEM_PROPERTY_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import org.apache.cassandra.repair.autorepair.AutoRepairConfig;

Review Comment:
nit: import order

##########
src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java:
##########

@@ -217,6 +218,8 @@ public SequenceState executeNext()
                             "For more, see `nodetool help bootstrap`. {}", SystemKeyspace.getBootstrapState());
                 return halted();
             }
+            // this node might have just bootstrapped; check if we should run repair immediately
+            AutoRepairUtils.runRepairOnNewlyBootstrappedNodeIfEnabled();

Review Comment:
Same applies for `BootstrapAndReplace`/`ReplaceSameAddress`.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org