jaydeepkumar1984 commented on code in PR #3598:
URL: https://github.com/apache/cassandra/pull/3598#discussion_r2030299160


##########
src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java:
##########
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.autorepair;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.utils.LocalizeString;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Defines configurations for AutoRepair.
+ */
+public class AutoRepairConfig implements Serializable
+{
+    // Enable/Disable the auto-repair scheduler.
+    // If set to false, the scheduler thread will not be started.
+    // If set to true, the repair scheduler thread will be created. The thread will
+    // check for secondary configuration available for each repair type (full, incremental,
+    // and preview_repaired), and based on that, it will schedule repairs.
+    public volatile Boolean enabled;
+    // Time interval between successive checks to see if ongoing repairs are complete or if it is time to schedule
+    // repairs.
+    public final DurationSpec.IntSecondsBound repair_check_interval = new 
DurationSpec.IntSecondsBound("5m");
+    // The scheduler needs to adjust its order when nodes leave the ring. Deleted hosts are tracked in metadata
+    // for a specified duration to ensure they are indeed removed before adjustments are made to the schedule.
+    public volatile DurationSpec.IntSecondsBound 
history_clear_delete_hosts_buffer_interval = new 
DurationSpec.IntSecondsBound("2h");
+    // Minimum duration for the execution of a single repair task. This prevents the scheduler from overwhelming
+    // the node by scheduling too many repair tasks in a short period of time.
+    public volatile DurationSpec.LongSecondsBound repair_task_min_duration = 
new DurationSpec.LongSecondsBound("5s");
+
+    // global_settings overrides Options.defaultOptions for all repair types
+    public volatile Options global_settings;
+
+    public static final Class<? extends IAutoRepairTokenRangeSplitter> 
DEFAULT_SPLITTER = RepairTokenRangeSplitter.class;
+
+    // make transient so it gets constructed in the implementation.
+    private final transient Map<RepairType, IAutoRepairTokenRangeSplitter> 
tokenRangeSplitters = new EnumMap<>(RepairType.class);
+
+    public enum RepairType implements Serializable
+    {
+        FULL,
+        INCREMENTAL,
+        PREVIEW_REPAIRED;
+
+        private final String configName;
+
+        RepairType()
+        {
+            this.configName = LocalizeString.toLowerCaseLocalized(name());
+        }
+
+        /**
+         * @return Format of the repair type as it should be represented in configuration.
+         * Canonically this is the enum name in lower case.
+         */
+        public String getConfigName()
+        {
+            return configName;
+        }
+
+        public static AutoRepairState getAutoRepairState(RepairType repairType)
+        {
+            switch (repairType)
+            {
+                case FULL:
+                    return new FullRepairState();
+                case INCREMENTAL:
+                    return new IncrementalRepairState();
+                case PREVIEW_REPAIRED:
+                    return new PreviewRepairedState();
+            }
+
+            throw new IllegalArgumentException("Invalid repair type: " + 
repairType);
+        }
+
+        /**
+         * Case-insensitive parsing of the repair type string into {@link RepairType}
+         *
+         * @param repairTypeStr the repair type string
+         * @return the {@link RepairType} represented by the {@code repairTypeStr} string
+         * @throws IllegalArgumentException when the repair type string does not match any repair type
+         */
+        public static RepairType parse(String repairTypeStr)
+        {
+            return 
RepairType.valueOf(LocalizeString.toUpperCaseLocalized(Objects.requireNonNull(repairTypeStr,
 "repairTypeStr cannot be null")));
+        }
+    }
+
+    // repair_type_overrides overrides the global_settings for a specific repair type.  String used as key instead
+    // of enum to allow lower case key in yaml.
+    public volatile ConcurrentMap<String, Options> repair_type_overrides = 
Maps.newConcurrentMap();
+
+    public AutoRepairConfig()
+    {
+        this(false);
+    }
+
+    public AutoRepairConfig(boolean enabled)
+    {
+        this.enabled = enabled;
+        global_settings = Options.getDefaultOptions();
+    }
+
+    public DurationSpec.IntSecondsBound getRepairCheckInterval()
+    {
+        return repair_check_interval;
+    }
+
+    public boolean isAutoRepairSchedulingEnabled()
+    {
+        return enabled;
+    }
+
+    @VisibleForTesting
+    public void setAutoRepairSchedulingEnabled(boolean enabled)
+    {
+        this.enabled = enabled;
+    }
+
+    public DurationSpec.IntSecondsBound 
getAutoRepairHistoryClearDeleteHostsBufferInterval()
+    {
+        return history_clear_delete_hosts_buffer_interval;
+    }
+
+    public void startScheduler()
+    {
+        enabled = true;
+        AutoRepair.instance.setup();
+    }
+
+    public void setAutoRepairHistoryClearDeleteHostsBufferInterval(String 
duration)
+    {
+        history_clear_delete_hosts_buffer_interval = new 
DurationSpec.IntSecondsBound(duration);
+    }
+
+    public DurationSpec.LongSecondsBound getRepairTaskMinDuration()
+    {
+        return repair_task_min_duration;
+    }
+
+    public void setRepairTaskMinDuration(String duration)
+    {
+        repair_task_min_duration = new DurationSpec.LongSecondsBound(duration);
+    }
+
+    public boolean isAutoRepairEnabled(RepairType repairType)
+    {
+        return enabled && applyOverrides(repairType, opt -> opt.enabled);
+    }
+
+    public void setAutoRepairEnabled(RepairType repairType, boolean enabled)
+    {
+        getOptions(repairType).enabled = enabled;
+    }
+
+    public void setRepairByKeyspace(RepairType repairType, boolean 
repairByKeyspace)
+    {
+        getOptions(repairType).repair_by_keyspace = repairByKeyspace;
+    }
+
+    public boolean getRepairByKeyspace(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.repair_by_keyspace);
+    }
+
+    public int getRepairThreads(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.number_of_repair_threads);
+    }
+
+    public void setRepairThreads(RepairType repairType, int repairThreads)
+    {
+        getOptions(repairType).number_of_repair_threads = repairThreads;
+    }
+
+    public DurationSpec.IntSecondsBound getRepairMinInterval(RepairType 
repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.min_repair_interval);
+    }
+
+    public void setRepairMinInterval(RepairType repairType, String 
minRepairInterval)
+    {
+        getOptions(repairType).min_repair_interval = new 
DurationSpec.IntSecondsBound(minRepairInterval);
+    }
+
+    public int getRepairSSTableCountHigherThreshold(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.sstable_upper_threshold);
+    }
+
+    public void setRepairSSTableCountHigherThreshold(RepairType repairType, 
int sstableHigherThreshold)
+    {
+        getOptions(repairType).sstable_upper_threshold = 
sstableHigherThreshold;
+    }
+
+    public DurationSpec.IntSecondsBound 
getAutoRepairTableMaxRepairTime(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.table_max_repair_time);
+    }
+
+    public void setAutoRepairTableMaxRepairTime(RepairType repairType, String 
autoRepairTableMaxRepairTime)
+    {
+        getOptions(repairType).table_max_repair_time = new 
DurationSpec.IntSecondsBound(autoRepairTableMaxRepairTime);
+    }
+
+    public Set<String> getIgnoreDCs(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.ignore_dcs);
+    }
+
+    public void setIgnoreDCs(RepairType repairType, Set<String> ignoreDCs)
+    {
+        getOptions(repairType).ignore_dcs = ignoreDCs;
+    }
+
+    public boolean getRepairPrimaryTokenRangeOnly(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> 
opt.repair_primary_token_range_only);
+    }
+
+    public void setRepairPrimaryTokenRangeOnly(RepairType repairType, boolean 
primaryTokenRangeOnly)
+    {
+        getOptions(repairType).repair_primary_token_range_only = 
primaryTokenRangeOnly;
+    }
+
+    public int getParallelRepairPercentage(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> 
opt.parallel_repair_percentage);
+    }
+
+    public void setParallelRepairPercentage(RepairType repairType, int 
percentage)
+    {
+        getOptions(repairType).parallel_repair_percentage = percentage;
+    }
+
+    public int getParallelRepairCount(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.parallel_repair_count);
+    }
+
+    public void setParallelRepairCount(RepairType repairType, int count)
+    {
+        getOptions(repairType).parallel_repair_count = count;
+    }
+
+    public boolean getAllowParallelReplicaRepair(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> 
opt.allow_parallel_replica_repair);
+    }
+
+    public void setAllowParallelReplicaRepair(RepairType repairType, boolean 
enabled)
+    {
+        getOptions(repairType).allow_parallel_replica_repair = enabled;
+    }
+
+    public boolean getAllowParallelReplicaRepairAcrossSchedules(RepairType 
repairType)
+    {
+        return applyOverrides(repairType, opt -> 
opt.allow_parallel_replica_repair_across_schedules);
+    }
+
+    public void setAllowParallelReplicaRepairAcrossSchedules(RepairType 
repairType, boolean enabled)
+    {
+        getOptions(repairType).allow_parallel_replica_repair_across_schedules 
= enabled;
+    }
+
+    public boolean getMaterializedViewRepairEnabled(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> 
opt.materialized_view_repair_enabled);
+    }
+
+    public void setMaterializedViewRepairEnabled(RepairType repairType, 
boolean enabled)
+    {
+        getOptions(repairType).materialized_view_repair_enabled = enabled;
+    }
+
+    public void setForceRepairNewNode(RepairType repairType, boolean 
forceRepairNewNode)
+    {
+        getOptions(repairType).force_repair_new_node = forceRepairNewNode;
+    }
+
+    public boolean getForceRepairNewNode(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.force_repair_new_node);
+    }
+
+    public ParameterizedClass getTokenRangeSplitter(RepairType repairType)
+    {
+        return applyOverrides(repairType, opt -> opt.token_range_splitter);
+    }
+
+    /**
+     * Set a new token range splitter; this is not meant to be used other than for testing.
+     */
+    @VisibleForTesting
+    void setTokenRangeSplitter(RepairType repairType, ParameterizedClass 
tokenRangeSplitter)
+    {
+        getOptions(repairType).token_range_splitter = tokenRangeSplitter;
+        tokenRangeSplitters.remove(repairType);
+    }
+

Review Comment:
   Removed



##########
test/unit/org/apache/cassandra/repair/autorepair/FixedSplitTokenRangeSplitterHelper.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.autorepair;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.sai.disk.format.Version;
+import org.apache.cassandra.service.AutoRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF;
+import static org.apache.cassandra.cql3.CQLTester.Fuzzed.setupSeed;
+import static org.apache.cassandra.cql3.CQLTester.Fuzzed.updateConfigs;
+import static 
org.apache.cassandra.repair.autorepair.FixedSplitTokenRangeSplitter.DEFAULT_NUMBER_OF_SUBRANGES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Helper class for {@link FixedSplitTokenRangeSplitterNoVNodesTest} and {@link FixedSplitTokenRangeSplitterVNodesTest}
+ */
+public class FixedSplitTokenRangeSplitterHelper
+{
+    private static final String TABLE1 = "tbl1";
+    private static final String TABLE2 = "tbl2";
+    private static final String TABLE3 = "tbl3";
+    public static final String KEYSPACE = "ks";
+
+    public static void setupClass(int numTokens) throws Exception
+    {
+        setupSeed();
+        updateConfigs();
+        
DatabaseDescriptor.setPartitioner("org.apache.cassandra.dht.Murmur3Partitioner");
+        ServerTestUtils.prepareServerNoRegister();
+
+        Set<Token> tokens = 
BootStrapper.getRandomTokens(ClusterMetadata.current(), numTokens);
+        ServerTestUtils.registerLocal(tokens);
+        // Ensure that the on-disk format statics are loaded before the test run
+        Version.LATEST.onDiskFormat();
+        StorageService.instance.doAutoRepairSetup();
+
+        SYSTEM_DISTRIBUTED_DEFAULT_RF.setInt(1);
+        QueryProcessor.executeInternal(String.format("CREATE KEYSPACE %s WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", 
FixedSplitTokenRangeSplitterHelper.KEYSPACE));
+    }
+
+    public static void testTokenRangesSplitByTable(int numTokens, int 
numberOfSubRanges, AutoRepairConfig.RepairType repairType)
+    {
+        int numberOfSplits = calcSplits(numTokens, numberOfSubRanges);
+        
AutoRepairService.instance.getAutoRepairConfig().setRepairByKeyspace(repairType,
 false);
+        Collection<Range<Token>> tokens = 
StorageService.instance.getPrimaryRanges(KEYSPACE);

Review Comment:
   Incorporated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to