Maxwell-Guo commented on code in PR #2287:
URL: https://github.com/apache/cassandra/pull/2287#discussion_r1243052938


##########
src/java/org/apache/cassandra/db/compaction/unified/Controller.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction.unified;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.UnifiedCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MonotonicClock;
+import org.apache.cassandra.utils.Overlaps;
+
+/**
+* The controller provides compaction parameters to the unified compaction 
strategy
+*/
+public class Controller
+{
+    protected static final Logger logger = 
LoggerFactory.getLogger(Controller.class);
+
+    /**
+     * The scaling parameters W, one per bucket index and separated by a comma.
+     * Higher indexes will use the value of the last index with a W specified.
+     */
+    final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
+    private final static String DEFAULT_SCALING_PARAMETERS = 
CassandraRelevantProperties.UCS_SCALING_PARAMETER.getString();
+
+    /**
+     * Override for the flush size in MB. The database should be able to 
calculate this from executing flushes, this
+     * should only be necessary in rare cases.
+     */
+    static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
+
+    static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
+    /**
+     * Default base shard count, used when a base count is not explicitly 
supplied. This value applies as long as the
+     * table is not a system one, and directories are not defined.
+     *
+     * For others a base count of 1 is used as system tables are usually small 
and do not need as much compaction
+     * parallelism, while having directories defined provides for parallelism 
in a different way.
+     */
+    public static final int DEFAULT_BASE_SHARD_COUNT = 
CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
+
+    static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
+    public static final long DEFAULT_TARGET_SSTABLE_SIZE = 
CassandraRelevantProperties.UCS_TARGET_SSTABLE_SIZE.getSizeInBytes();
+    static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
+
+    /**
+     * This parameter is intended to modify the shape of the LSM by taking 
into account the survival ratio of data, for now it is fixed to one.
+     */
+    static final double DEFAULT_SURVIVAL_FACTOR = 
CassandraRelevantProperties.UCS_SURVIVAL_FACTOR.getDouble();
+    static final double[] DEFAULT_SURVIVAL_FACTORS = new double[] { 
DEFAULT_SURVIVAL_FACTOR };
+
+    /**
+     * The maximum number of sstables to compact in one operation.
+     *
+     * This is expected to be large and never be reached, but compaction going 
very very late may cause the accumulation
+     * of thousands and even tens of thousands of sstables which may cause 
problems if compacted in one long operation.
+     * The default is chosen to be half of the maximum permitted space 
overhead when the source sstables are of the
+     * minimum sstable size.
+     *
+     * If the fanout factor is larger than the maximum number of sstables, the 
strategy will ignore the latter.
+     */
+    static final String MAX_SSTABLES_TO_COMPACT_OPTION = 
"max_sstables_to_compact";
+
+    static final String ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION = 
"unsafe_aggressive_sstable_expiration";
+    static final boolean ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.getBoolean();
+    static final boolean DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
false;
+
+    static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+    static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION = 
"expired_sstable_check_frequency_seconds";
+
+    /** The maximum splitting factor for shards. The maximum number of shards 
is this number multiplied by the base count. */
+    static final double MAX_SHARD_SPLIT = 1048576;
+
+    /**
+     * Overlap inclusion method. NONE for participating sstables only (not 
recommended), SINGLE to only include sstables
+     * that overlap with participating (LCS-like, higher concurrency during 
upgrades but some double compaction),
+     * TRANSITIVE to include overlaps of overlaps (likely to trigger whole 
level compactions, safest).
+     */
+    static final String OVERLAP_INCLUSION_METHOD_OPTION = 
"overlap_inclusion_method";
+    static final Overlaps.InclusionMethod DEFAULT_OVERLAP_INCLUSION_METHOD =
+        
CassandraRelevantProperties.UCS_OVERLAP_INCLUSION_METHOD.getEnum(Overlaps.InclusionMethod.TRANSITIVE);
+
+    protected final ColumnFamilyStore cfs;
+    protected final MonotonicClock clock;
+    private final int[] scalingParameters;
+    protected final double[] survivalFactors;
+    protected final long flushSizeOverride;
+    protected volatile long currentFlushSize;
+    protected final int maxSSTablesToCompact;
+    protected final long expiredSSTableCheckFrequency;
+    protected final boolean ignoreOverlapsInExpirationCheck;
+
+    protected final int baseShardCount;
+
+    protected final double targetSSTableSizeMin;
+
+    protected final Overlaps.InclusionMethod overlapInclusionMethod;
+
+    Controller(ColumnFamilyStore cfs,
+               MonotonicClock clock,
+               int[] scalingParameters,
+               double[] survivalFactors,
+               long flushSizeOverride,
+               int maxSSTablesToCompact,
+               long expiredSSTableCheckFrequency,
+               boolean ignoreOverlapsInExpirationCheck,
+               int baseShardCount,
+               double targetSStableSize,
+               Overlaps.InclusionMethod overlapInclusionMethod)
+    {
+        this.cfs = cfs;
+        this.clock = clock;
+        this.scalingParameters = scalingParameters;
+        this.survivalFactors = survivalFactors;
+        this.flushSizeOverride = flushSizeOverride;
+        this.currentFlushSize = flushSizeOverride;
+        this.expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(expiredSSTableCheckFrequency, TimeUnit.SECONDS);
+        this.baseShardCount = baseShardCount;
+        this.targetSSTableSizeMin = targetSStableSize * Math.sqrt(0.5);
+        this.overlapInclusionMethod = overlapInclusionMethod;
+
+        if (maxSSTablesToCompact <= 0)
+            maxSSTablesToCompact = Integer.MAX_VALUE;
+
+        this.maxSSTablesToCompact = maxSSTablesToCompact;
+
+        if (ignoreOverlapsInExpirationCheck && 
!ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION)
+        {
+            logger.warn("Not enabling aggressive SSTable expiration, as the 
system property '" + 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.name() + 
"' is set to 'false'. " +
+                    "Set it to 'true' to enable aggressive SSTable 
expiration.");
+        }
+        this.ignoreOverlapsInExpirationCheck = 
ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;
+    }
+
+    /**
+     * @return the scaling parameter W
+     * @param index
+     */
+    public int getScalingParameter(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < scalingParameters.length ? scalingParameters[index] : 
scalingParameters[scalingParameters.length - 1];
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Controller, m: %s, o: %s, Ws: %s",
+                             
FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", ""),
+                             Arrays.toString(survivalFactors),
+                             printScalingParameters(scalingParameters));
+    }
+
+    public int getFanout(int index) {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.fanoutFromScalingParameter(W);
+    }
+
+    public int getThreshold(int index) {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.thresholdFromScalingParameter(W);
+    }
+
+    /**
+     * Calculate the number of shards to split the local token space in for 
the given sstable density.
+     * This is calculated as a power-of-two multiple of baseShardCount, so 
that the expected size of resulting sstables
+     * is between targetSSTableSizeMin and 2*targetSSTableSizeMin (in other 
words, sqrt(0.5) * targetSSTableSize and
+     * sqrt(2) * targetSSTableSize), with a minimum of baseShardCount shards 
for smaller sstables.
+     *
+     * Note that to get the sstables resulting from this splitting within the 
bounds, the density argument must be
+     * normalized to the span that is being split. In other words, if no disks 
are defined, the density should be
+     * scaled by the token coverage of the locally-owned ranges. If multiple 
data directories are defined, the density
+     * should be scaled by the token coverage of the respective data 
directory. That is localDensity = size / span,
+     * where the span is normalized so that span = 1 when the data covers the 
range that is being split.
+     */
+    public int getNumShards(double localDensity)
+    {
+        // How many we would have to aim for the target size. Divided by the 
base shard count, so that we can ensure
+        // the result is a multiple of it by multiplying back below.
+        double count = localDensity / (targetSSTableSizeMin * baseShardCount);
+        if (count > MAX_SHARD_SPLIT)
+            count = MAX_SHARD_SPLIT;
+        assert !(count < 0);    // Must be positive, 0 or NaN, which should 
translate to baseShardCount
+
+        // Make it a power of two multiple of the base count so that split 
points for lower levels remain split points for higher.
+        // The conversion to int and highestOneBit round down, for which we 
compensate by using the sqrt(0.5) multiplier
+        // already applied in targetSSTableSizeMin.
+        // Setting the bottom bit to 1 ensures the result is at least 
baseShardCount.
+        int shards = baseShardCount * Integer.highestOneBit((int) count | 1);
+        logger.debug("Shard count {} for density {}, {} times target {}",
+                     shards,
+                     FBUtilities.prettyPrintBinary(localDensity, "B", " "),
+                     localDensity / targetSSTableSizeMin,
+                     FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", 
" "));
+        return shards;
+    }
+
+    /**
+     * @return the survival factor o
+     * @param index
+     */
+    public double getSurvivalFactor(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < survivalFactors.length ? survivalFactors[index] : 
survivalFactors[survivalFactors.length - 1];
+    }
+
+    /**
+     * Return the flush sstable size in bytes.
+     *
+     * This is usually obtained from the observed sstable flush sizes, 
refreshed when it differs significantly
+     * from the current values.
+     * It can also be set by the user in the options.
+     *
+     * @return the flush size in bytes.
+     */
+    public long getFlushSizeBytes()
+    {
+        if (flushSizeOverride > 0)
+            return flushSizeOverride;
+
+        double envFlushSize = cfs.metric.flushSizeOnDisk.get();
+        if (currentFlushSize == 0 || Math.abs(1 - (currentFlushSize / 
envFlushSize)) > 0.5)
+        {
+            // The current size is not initialized, or it differs by over 50% 
from the observed.
+            // Use the observed size rounded up to a whole megabyte.
+            currentFlushSize = ((long) (Math.ceil(Math.scalb(envFlushSize, 
-20)))) << 20;
+        }
+        return currentFlushSize;
+    }
+
+    /**
+     * @return whether is allowed to drop expired SSTables without checking if 
partition keys appear in other SSTables.
+     * Same behavior as in TWCS.
+     */
+    public boolean getIgnoreOverlapsInExpirationCheck()
+    {
+        return ignoreOverlapsInExpirationCheck;
+    }
+
+    public long getExpiredSSTableCheckFrequency()
+    {
+        return expiredSSTableCheckFrequency;
+    }
+
+    /**
+     * The strategy will call this method each time {@link 
UnifiedCompactionStrategy#getNextBackgroundTask} is called.
+     */
+    public void onStrategyBackgroundTaskRequest()
+    {
+    }
+
+    /**
+     * Returns a maximum bucket index for the given data size and fanout.
+     */
+    private int maxBucketIndex(long totalLength, int fanout)
+    {
+        double o = getSurvivalFactor(0);
+        long m = getFlushSizeBytes();
+        return Math.max(0, (int) Math.floor((Math.log(totalLength) - 
Math.log(m)) / (Math.log(fanout) - Math.log(o))));
+    }
+
+    public static Controller fromOptions(ColumnFamilyStore cfs, Map<String, 
String> options)
+    {
+        int[] Ws = 
parseScalingParameters(options.getOrDefault(SCALING_PARAMETERS_OPTION, 
DEFAULT_SCALING_PARAMETERS));
+
+        long flushSizeOverride = 
FBUtilities.parseHumanReadableBytes(options.getOrDefault(FLUSH_SIZE_OVERRIDE_OPTION,
 "0MiB"));
+        int maxSSTablesToCompact = 
Integer.parseInt(options.getOrDefault(MAX_SSTABLES_TO_COMPACT_OPTION, "0"));
+        long expiredSSTableCheckFrequency = 
options.containsKey(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION)
+                ? 
Long.parseLong(options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION))
+                : DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS;
+        boolean ignoreOverlapsInExpirationCheck = 
options.containsKey(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION)
+                ? 
Boolean.parseBoolean(options.get(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION))
+                : DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION;
+
+        int baseShardCount;
+        if (options.containsKey(BASE_SHARD_COUNT_OPTION))
+        {
+            baseShardCount = 
Integer.parseInt(options.get(BASE_SHARD_COUNT_OPTION));
+        }
+        else
+        {
+            if (SchemaConstants.isSystemKeyspace(cfs.getKeyspaceName()) || 
(cfs.getDiskBoundaries().positions != null && 
cfs.getDiskBoundaries().positions.size() > 1))
+                baseShardCount = 1;
+            else
+                baseShardCount = DEFAULT_BASE_SHARD_COUNT;
+        }
+
+        long targetSStableSize = 
options.containsKey(TARGET_SSTABLE_SIZE_OPTION)
+                                 ? 
FBUtilities.parseHumanReadableBytes(options.get(TARGET_SSTABLE_SIZE_OPTION))
+                                 : DEFAULT_TARGET_SSTABLE_SIZE;
+
+        Overlaps.InclusionMethod inclusionMethod = 
options.containsKey(OVERLAP_INCLUSION_METHOD_OPTION)
+                                                   ? 
Overlaps.InclusionMethod.valueOf(options.get(OVERLAP_INCLUSION_METHOD_OPTION).toUpperCase())
+                                                   : 
DEFAULT_OVERLAP_INCLUSION_METHOD;
+
+        return new Controller(cfs,
+                              MonotonicClock.Global.preciseTime,
+                              Ws,
+                              DEFAULT_SURVIVAL_FACTORS,
+                              flushSizeOverride,
+                              maxSSTablesToCompact,
+                              expiredSSTableCheckFrequency,
+                              ignoreOverlapsInExpirationCheck,
+                              baseShardCount,
+                              targetSStableSize,
+                              inclusionMethod);
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> 
options) throws ConfigurationException
+    {
+        String nonPositiveErr = "Invalid configuration, %s should be positive: 
%d";
+        String booleanParseErr = "%s should either be 'true' or 'false', not 
%s";
+        String intParseErr = "%s is not a parsable int (base10) for %s";
+        String longParseErr = "%s is not a parsable long (base10) for %s";
+        String sizeUnacceptableErr = "%s %s is not acceptable, size must be at 
least %s";
+        String invalidSizeErr = "%s %s is not a valid size in bytes: %s";
+        options = new HashMap<>(options);
+        String s;
+
+        s = options.remove(SCALING_PARAMETERS_OPTION);
+        if (s != null)
+            parseScalingParameters(s);
+
+        s = options.remove(BASE_SHARD_COUNT_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                int numShards = Integer.parseInt(s);
+                if (numShards <= 0)
+                    throw new 
ConfigurationException(String.format(nonPositiveErr,
+                                                                   
BASE_SHARD_COUNT_OPTION,
+                                                                   numShards));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(intParseErr, s, 
BASE_SHARD_COUNT_OPTION), e);
+            }
+        }
+
+        s = options.remove(TARGET_SSTABLE_SIZE_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                long targetSSTableSize = 
FBUtilities.parseHumanReadableBytes(s);
+                if (targetSSTableSize < MIN_TARGET_SSTABLE_SIZE)
+                {
+                    throw new 
ConfigurationException(String.format(sizeUnacceptableErr,
+                                                                   
TARGET_SSTABLE_SIZE_OPTION,
+                                                                   s,
+                                                                   
FBUtilities.prettyPrintBinary(MIN_TARGET_SSTABLE_SIZE, "B", "")));
+                }
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(invalidSizeErr,
+                                                               
TARGET_SSTABLE_SIZE_OPTION,
+                                                               s,
+                                                               e.getMessage()),
+                                                 e);
+            }
+        }
+
+        s = options.remove(FLUSH_SIZE_OVERRIDE_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                long flushSize = FBUtilities.parseHumanReadableBytes(s);
+                if (flushSize < MIN_TARGET_SSTABLE_SIZE)
+                    throw new 
ConfigurationException(String.format(sizeUnacceptableErr,
+                                                                   
FLUSH_SIZE_OVERRIDE_OPTION,
+                                                                   s,
+                                                                   
FBUtilities.prettyPrintBinary(MIN_TARGET_SSTABLE_SIZE, "B", "")));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(invalidSizeErr,
+                                                               
FLUSH_SIZE_OVERRIDE_OPTION,
+                                                               s,
+                                                               e.getMessage()),
+                                                 e);
+            }
+        }
+
+        s = options.remove(MAX_SSTABLES_TO_COMPACT_OPTION);
+        if (s != null)
+        {
+             try
+             {
+                 Integer.parseInt(s); // values less than or equal to 0 enable 
the default
+             }
+             catch (NumberFormatException e)
+             {
+                 throw new ConfigurationException(String.format(intParseErr,
+                                                                s,
+                                                                
MAX_SSTABLES_TO_COMPACT_OPTION),
+                                                  e);
+             }
+        }
+        s = options.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                long expiredSSTableCheckFrequency = Long.parseLong(s);
+                if (expiredSSTableCheckFrequency <= 0)
+                    throw new 
ConfigurationException(String.format(nonPositiveErr,
+                                                                   
EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION,
+                                                                   
expiredSSTableCheckFrequency));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(longParseErr,
+                                                               s,
+                                                               
EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION),
+                                                 e);
+            }
+        }
+
+        s = options.remove(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION);
+        if (s != null && !s.equalsIgnoreCase("true") && 
!s.equalsIgnoreCase("false"))
+        {
+            throw new ConfigurationException(String.format(booleanParseErr,
+                                                           
ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION, s));
+        }
+
+        s = options.remove(OVERLAP_INCLUSION_METHOD_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                Overlaps.InclusionMethod.valueOf(s.toUpperCase());
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new ConfigurationException(String.format("Invalid 
overlap inclusion method %s. The valid options are %s.",
+                                                               s,
+                                                               
Arrays.toString(Overlaps.InclusionMethod.values())));
+            }
+        }
+
+        return options;
+    }
+
+    // The methods below are implemented here (rather than directly in UCS) to 
aid testability.
+

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to