Maxwell-Guo commented on code in PR #2287:
URL: https://github.com/apache/cassandra/pull/2287#discussion_r1244614771


##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -0,0 +1,865 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.db.compaction.unified.Controller;
+import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
+import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.TimeUUID;
+
+/**
+ * The design of the unified compaction strategy is described in the accompanying UnifiedCompactionStrategy.md.
+ *
+ * See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
+ */
+public class UnifiedCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedCompactionStrategy.class);
+
+    static final int MAX_LEVELS = 32;   // This is enough for a few petabytes of data (with the worst case fan factor
+                                        // at W=0 this leaves room for 2^32 sstables, presumably of at least 1MB each).
+
+    private static final Pattern SCALING_PARAMETER_PATTERN = Pattern.compile("(N)|L(\\d+)|T(\\d+)|([+-]?\\d+)");
+    private static final String SCALING_PARAMETER_PATTERN_SIMPLIFIED = SCALING_PARAMETER_PATTERN.pattern()
+                                                                                                .replaceAll("[()]", "")
+                                                                                                .replace("\\d", "[0-9]");
+
+    private final Controller controller;
+
+    private volatile ShardManager shardManager;
+
+    private long lastExpiredCheck;
+
+    protected volatile int estimatedRemainingTasks;
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        this(cfs, options, Controller.fromOptions(cfs, options));
+    }
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options, Controller controller)
+    {
+        super(cfs, options);
+        this.controller = controller;
+        estimatedRemainingTasks = 0;
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        return Controller.validateOptions(AbstractCompactionStrategy.validateOptions(options));
+    }
+
+    public static int fanoutFromScalingParameter(int w)
+    {
+        return w < 0 ? 2 - w : 2 + w; // see formula in design doc
+    }
+
+    public static int thresholdFromScalingParameter(int w)
+    {
+        return w <= 0 ? 2 : 2 + w; // see formula in design doc
+    }
+
+    public static int parseScalingParameter(String value)
+    {
+        Matcher m = SCALING_PARAMETER_PATTERN.matcher(value);
+        if (!m.matches())
+            throw new ConfigurationException("Scaling parameter " + value + " 
must match " + SCALING_PARAMETER_PATTERN_SIMPLIFIED);
+
+        if (m.group(1) != null)
+            return 0;
+        else if (m.group(2) != null)
+            return 2 - atLeast2(Integer.parseInt(m.group(2)), value);
+        else if (m.group(3) != null)
+            return atLeast2(Integer.parseInt(m.group(3)), value) - 2;
+        else
+            return Integer.parseInt(m.group(4));
+    }
+
+    private static int atLeast2(int value, String str)
+    {
+        if (value < 2)
+            throw new ConfigurationException("Fan factor cannot be lower than 
2 in " + str);
+        return value;
+    }
+
+    public static String printScalingParameter(int w)
+    {
+        if (w < 0)
+            return "L" + Integer.toString(2 - w);
+        else if (w > 0)
+            return "T" + Integer.toString(w + 2);
+        else
+            return "N";
+    }
+
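For reference, the scaling parameter encoding used by the methods above maps w < 0 to levelled behaviour (fan factor F = 2 - w, threshold 2) and w > 0 to tiered behaviour (threshold T = 2 + w). Below is a minimal standalone sketch of the round trip; the helper names mirror the strategy's methods but this is illustration only, not the Cassandra code:

    public class ScalingParameterDemo
    {
        // Same formulas as fanoutFromScalingParameter / thresholdFromScalingParameter.
        static int fanout(int w)    { return w < 0 ? 2 - w : 2 + w; }
        static int threshold(int w) { return w <= 0 ? 2 : 2 + w; }

        // Same encoding as printScalingParameter: L<f> levelled, T<t> tiered, N neutral.
        static String print(int w)
        {
            if (w < 0) return "L" + (2 - w);
            if (w > 0) return "T" + (w + 2);
            return "N";
        }

        public static void main(String[] args)
        {
            for (int w : new int[]{ -8, 0, 8 })
                System.out.printf("w=%3d -> %-3s fanout=%d threshold=%d%n",
                                  w, print(w), fanout(w), threshold(w));
            // w= -8 -> L10 fanout=10 threshold=2   (levelled)
            // w=  0 -> N   fanout=2  threshold=2   (neutral)
            // w=  8 -> T10 fanout=10 threshold=10  (tiered)
        }
    }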
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long gcBefore, boolean splitOutput)
+    {
+        maybeUpdateShardManager();
+        // The tasks are split by repair status and disk, as well as in non-overlapping sections to enable some
+        // parallelism (to the amount that L0 sstables are split, i.e. at least base_shard_count). The result will be
+        // split across shards according to its density. Depending on the parallelism, the operation may require up to
+        // 100% extra space to complete.
+        List<AbstractCompactionTask> tasks = new ArrayList<>();
+        List<Set<SSTableReader>> nonOverlapping = splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
+        for (Set<SSTableReader> set : nonOverlapping)
+        {
+            @SuppressWarnings("resource")   // closed by the returned task
+            LifecycleTransaction txn = cfs.getTracker().tryModify(set, OperationType.COMPACTION);
+            if (txn != null)
+                tasks.add(createCompactionTask(txn, gcBefore));
+        }
+        return tasks;
+    }
+
+    private static List<Set<SSTableReader>> splitInNonOverlappingSets(Collection<SSTableReader> sstables)
+    {
+        List<Set<SSTableReader>> overlapSets = Overlaps.constructOverlapSets(new ArrayList<>(sstables),
+                                                                             UnifiedCompactionStrategy::startsAfter,
+                                                                             SSTableReader.firstKeyComparator,
+                                                                             SSTableReader.lastKeyComparator);
+        if (overlapSets.isEmpty())
+            return overlapSets;
+
+        Set<SSTableReader> group = overlapSets.get(0);
+        List<Set<SSTableReader>> groups = new ArrayList<>();
+        for (int i = 1; i < overlapSets.size(); ++i)
+        {
+            Set<SSTableReader> current = overlapSets.get(i);
+            if (Sets.intersection(current, group).isEmpty())
+            {
+                groups.add(group);
+                group = current;
+            }
+            else
+            {
+                group.addAll(current);
+            }
+        }
+        groups.add(group);
+        return groups;
+    }
+
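The loop above works in a single left-to-right pass because it assumes the overlap sets arrive ordered by position, so members of the same transitive-overlap chain are adjacent. A simplified, self-contained sketch of the same merge over plain integer sets (hypothetical demo types, for illustration only):

    import java.util.*;

    public class OverlapGroupingDemo
    {
        // Single-pass merge of consecutive sets that share an element,
        // mirroring splitInNonOverlappingSets.
        static List<Set<Integer>> merge(List<Set<Integer>> overlapSets)
        {
            List<Set<Integer>> groups = new ArrayList<>();
            if (overlapSets.isEmpty())
                return groups;

            Set<Integer> group = new HashSet<>(overlapSets.get(0));
            for (int i = 1; i < overlapSets.size(); ++i)
            {
                Set<Integer> current = overlapSets.get(i);
                if (Collections.disjoint(current, group))
                {
                    groups.add(group);              // no shared element: close this group
                    group = new HashSet<>(current); // and start the next one
                }
                else
                {
                    group.addAll(current);          // shared element: same chain
                }
            }
            groups.add(group);
            return groups;
        }

        public static void main(String[] args)
        {
            // {1,2,3} and {3,4} share element 3 and merge; {7,8} stands alone.
            List<Set<Integer>> in = List.of(Set.of(1, 2, 3), Set.of(3, 4), Set.of(7, 8));
            System.out.println(merge(in)); // e.g. [[1, 2, 3, 4], [7, 8]]
        }
    }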
+    @Override
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final long gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
+        {
+            logger.trace("Unable to mark {} for compaction; probably a 
background compaction got to it first.  You can disable background compactions 
temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return createCompactionTask(transaction, gcBefore).setUserDefined(true);
+    }
+
+    /**
+     * Returns a compaction task to run next.
+     *
+     * This method is synchronized because task creation is significantly more expensive in UCS; the strategy is
+     * stateless, therefore it has to compute the shard/bucket structure on each call.
+     *
+     * @param gcBefore throw away tombstones older than this
+     */
+    @Override
+    public synchronized UnifiedCompactionTask getNextBackgroundTask(long gcBefore)
+    {
+        controller.onStrategyBackgroundTaskRequest();
+
+        while (true)
+        {
+            CompactionPick pick = getNextCompactionPick(gcBefore);
+            if (pick == null)
+                return null;
+            UnifiedCompactionTask task = createCompactionTask(pick, gcBefore);
+            if (task != null)
+                return task;
+        }
+    }
+
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    private UnifiedCompactionTask createCompactionTask(CompactionPick pick, long gcBefore)
+    {
+        Preconditions.checkNotNull(pick);
+        Preconditions.checkArgument(!pick.isEmpty());
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(pick,
+                                                                      OperationType.COMPACTION);
+        if (transaction != null)
+        {
+            return createCompactionTask(transaction, gcBefore);
+        }
+        else
+        {
+            // This can happen e.g. due to a race with upgrade tasks
+            logger.error("Failed to submit compaction {} because a transaction 
could not be created. If this happens frequently, it should be reported", pick);
+            // FIXME: Needs the sstable removal race fix
+            return null;
+        }
+    }
+
+    /**
+     * Create the sstable writer used for flushing.
+     *
+     * @return an sstable writer that will split sstables into a number of shards as calculated by the controller for
+     *         the expected flush density.
+     */
+    @Override
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       TimeUUID pendingRepair,
+                                                       boolean isTransient,
+                                                       IntervalSet<CommitLogPosition> commitLogPositions,
+                                                       int sstableLevel,
+                                                       SerializationHeader header,
+                                                       Collection<Index> indexes,
+                                                       LifecycleNewTracker lifecycleNewTracker)
+    {
+        // FIXME: needs the metadata collector fix
+        ShardManager shardManager = getShardManager();
+        double flushDensity = cfs.metric.flushSizeOnDisk.get() / shardManager.localSpaceCoverage();
+        ShardTracker boundaries = shardManager.boundaries(controller.getNumShards(flushDensity));
+        return new ShardedMultiWriter(cfs,
+                                      descriptor,
+                                      keyCount,
+                                      repairedAt,
+                                      pendingRepair,
+                                      isTransient,
+                                      commitLogPositions,
+                                      header,
+                                      indexes,
+                                      lifecycleNewTracker,
+                                      boundaries);
+    }
+
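To make the density computation above concrete: dividing the expected on-disk flush size by the fraction of the token space this node covers normalizes it to a full-ring density, which the controller then maps to a shard count. A worked example with made-up figures (illustration only; the real policy lives in Controller.getNumShards):

    public class FlushDensityDemo
    {
        public static void main(String[] args)
        {
            // Hypothetical numbers, for illustration only.
            double flushSizeOnDisk = 512L << 20; // expected flush size: 512 MiB
            double localSpaceCoverage = 0.25;    // node owns a quarter of the token space

            // 512 MiB over a quarter of the ring ~ 2048 MiB density over the full ring.
            double flushDensity = flushSizeOnDisk / localSpaceCoverage;
            System.out.printf("flush density: %.0f MiB%n", flushDensity / (1 << 20)); // 2048 MiB
        }
    }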
+    /**
+     * Create the task that in turn creates the sstable writer used for compaction.
+     *
+     * @return a sharded compaction task that in turn will create a sharded compaction writer.
+     */
+    private UnifiedCompactionTask createCompactionTask(LifecycleTransaction transaction, long gcBefore)
+    {
+        return new UnifiedCompactionTask(cfs, this, transaction, gcBefore, getShardManager());
+    }
+
+    private void maybeUpdateShardManager()
+    {
+        if (shardManager != null && !shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+            return; // the disk boundaries (and thus the local ranges too) have not changed since the last time we calculated
+
+        synchronized (this)
+        {
+            // Recheck after entering critical section, another thread may have beaten us to it.
+            while (shardManager == null || shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+                shardManager = ShardManager.create(cfs);
+            // Note: this can just as well be done without the synchronization (races would be benign, just doing some
+            // redundant work). For the current usages of this, blocking is fine and expected to perform no worse.
+        }
+    }
+
+    @VisibleForTesting
+    ShardManager getShardManager()
+    {
+        maybeUpdateShardManager();
+        return shardManager;
+    }
+
+    /**
+     * Selects a compaction to run next.
+     */
+    @VisibleForTesting
+    CompactionPick getNextCompactionPick(long gcBefore)
+    {
+        SelectionContext context = new SelectionContext(controller);
+        List<SSTableReader> suitable = getCompactableSSTables(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+        Set<SSTableReader> expired = maybeGetExpiredSSTables(gcBefore, suitable);
+        suitable.removeAll(expired);
+
+        CompactionPick selected = chooseCompactionPick(suitable, context);
+        estimatedRemainingTasks = context.estimatedRemainingTasks;
+        if (selected == null)
+        {
+            if (expired.isEmpty())
+                return null;
+            else
+                return new CompactionPick(-1, -1, expired);
+        }
+
+        selected.addAll(expired);
+        return selected;
+    }
+
+    private Set<SSTableReader> maybeGetExpiredSSTables(long gcBefore, List<SSTableReader> suitable)
+    {
+        Set<SSTableReader> expired;
+        long ts = Clock.Global.currentTimeMillis();
+        if (ts - lastExpiredCheck > controller.getExpiredSSTableCheckFrequency())
+        {
+            lastExpiredCheck = ts;
+            expired = CompactionController.getFullyExpiredSSTables(cfs,
+                                                                   suitable,
+                                                                   cfs.getOverlappingLiveSSTables(suitable),
+                                                                   gcBefore,
+                                                                   controller.getIgnoreOverlapsInExpirationCheck());
+            if (logger.isTraceEnabled() && !expired.isEmpty())
+                logger.trace("Expiration check for {}.{} found {} fully 
expired SSTables",
+                             cfs.getKeyspaceName(),
+                             cfs.getTableName(),
+                             expired.size());
+        }
+        else
+            expired = Collections.emptySet();
+        return expired;
+    }
+
+    private CompactionPick chooseCompactionPick(List<SSTableReader> suitable, SelectionContext context)
+    {
+        // Select the level with the highest overlap; when multiple levels have the same overlap, prefer the lower one
+        // (i.e. reduction of RA for bigger token coverage).
+        int maxOverlap = -1;
+        CompactionPick selected = null;
+        for (Level level : formLevels(suitable))
+        {
+            CompactionPick pick = level.getCompactionPick(context);
+            int levelOverlap = level.maxOverlap;
+            if (levelOverlap > maxOverlap)
+            {
+                maxOverlap = levelOverlap;
+                selected = pick;
+            }
+        }
+        if (logger.isDebugEnabled() && selected != null)
+            logger.debug("Selected compaction on level {} overlap {} sstables 
{}",
+                         selected.level, selected.overlap, selected.size());
+
+        return selected;
+    }
+
+    @Override
+    public int getEstimatedRemainingTasks()
+    {
+        return estimatedRemainingTasks;
+    }
+
+    @Override
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+    @VisibleForTesting
+    public Controller getController()
+    {
+        return controller;
+    }
+
+    public static boolean isSuitableForCompaction(SSTableReader rdr)
+    {
+        return !rdr.isMarkedSuspect() && rdr.openReason != SSTableReader.OpenReason.EARLY;
+    }
+
+    @Override
+    public synchronized void addSSTable(SSTableReader added)
+    {
+        sstables.add(added);
+    }
+
+    @Override
+    public synchronized void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
+    @Override
+    protected synchronized Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.copyOf(sstables);
+    }
+
+    /**
+     * @return a LinkedHashMap of arenas with buckets where the order of arenas is preserved
+     */
+    @VisibleForTesting
+    List<Level> getLevels()
+    {
+        return getLevels(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+    }
+
+    /**
+     * Groups the sstables passed in into arenas and buckets. This is used by the strategy to determine
+     * new compactions, and by external tools in CNDB to analyze the strategy decisions.

Review Comment:
   Is CNDB something developed by DataStax? I have seen this keyword in one of the SAI patches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

