blambov commented on code in PR #2287:
URL: https://github.com/apache/cassandra/pull/2287#discussion_r1242183162


##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -725,6 +731,56 @@ public long getLong(long overrideDefaultValue)
         return LONG_CONVERTER.convert(value);
     }
 
+    /**
+     * Gets the value of a system property as a double.
+     * @return System property value if it exists, defaultValue otherwise. Throws an exception if no default value is set.
+     */
+    public double getDouble()
+    {
+        String value = System.getProperty(key);
+        if (value == null && defaultVal == null)
+            throw new ConfigurationException("Missing property value or default value is not set: " + key);
+        return DOUBLE_CONVERTER.convert(value == null ? defaultVal : value);
+    }
+
+    /**
+     * Gets the value of a system property as a double.
+     * @return system property long value if it exists, defaultValue otherwise.
+     */
+    public double getLong(double overrideDefaultValue)

Review Comment:
   Yes, changed.
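
   For reference, the corrected overload presumably mirrors the existing `getLong(long)` pattern quoted above; a sketch, not the committed code:
   ```java
   /**
    * Gets the value of a system property as a double.
    * @return system property double value if it exists, overrideDefaultValue otherwise.
    */
   public double getDouble(double overrideDefaultValue)
   {
       String value = System.getProperty(key);
       if (value == null)
           return overrideDefaultValue;
       return DOUBLE_CONVERTER.convert(value);
   }
   ```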



##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -516,6 +517,11 @@ public enum CassandraRelevantProperties
     TRIGGERS_DIR("cassandra.triggers_dir"),
     TRUNCATE_BALLOT_METADATA("cassandra.truncate_ballot_metadata"),
     TYPE_UDT_CONFLICT_BEHAVIOR("cassandra.type.udt.conflict_behavior"),
+    UCS_BASE_SHARD_COUNT("unified_compaction.base_shard_count", "4"),

Review Comment:
   Added a reference to `Controller`, where they are defined and described.
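
   As a usage note, the property defined above is read through the accessor used by `Controller` (see `DEFAULT_BASE_SHARD_COUNT` there) and can be overridden at startup; a minimal sketch:
   ```java
   // Defaults to "4" per the definition above; can be overridden by starting
   // the node with -Dunified_compaction.base_shard_count=<n>.
   int baseShardCount = CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
   ```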



##########
src/java/org/apache/cassandra/db/compaction/unified/Controller.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done



##########
src/java/org/apache/cassandra/config/Config.java:
##########
@@ -1103,6 +1103,11 @@ public enum PaxosOnLinearizabilityViolation
     public volatile long min_tracked_partition_tombstone_count = 5000;
     public volatile boolean top_partitions_enabled = true;
 
+    /**
+     * Default compaction configuration, used if a table does not specify any.
+     */
+    public ParameterizedClass default_compaction = null;

Review Comment:
   Added an entry in `cassandra.yaml`.
   
   The flag specifies a default value for the compaction parameter. Could you elaborate on why it should be renamed?
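
   For illustration, the `cassandra.yaml` entry follows the usual `ParameterizedClass` shape; a hypothetical example (the option value is a placeholder, not a recommendation):
   ```yaml
   default_compaction:
     class_name: UnifiedCompactionStrategy
     parameters:
       scaling_parameters: T4
   ```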



##########
src/java/org/apache/cassandra/db/compaction/unified/Controller.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction.unified;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.UnifiedCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MonotonicClock;
+
+/**
+* The controller provides compaction parameters to the unified compaction strategy
+*/
+public class Controller
+{
+    protected static final Logger logger = LoggerFactory.getLogger(Controller.class);
+
+    /**
+     * The scaling parameters W, one per bucket index and separated by a comma.
+     * Higher indexes will use the value of the last index with a W specified.
+     */
+    final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
+    private final static String DEFAULT_SCALING_PARAMETERS = CassandraRelevantProperties.UCS_SCALING_PARAMETER.getString();
+
+    /**
+     * Override for the flush size in MB. The database should be able to calculate this from executing flushes, this
+     * should only be necessary in rare cases.
+     */
+    static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
+
+    static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
+    /**
+     * Default base shard count, used when a base count is not explicitly supplied. This value applies as long as the
+     * table is not a system one, and directories are not defined.
+     *
+     * For others a base count of 1 is used as system tables are usually small and do not need as much compaction
+     * parallelism, while having directories defined provides for parallelism in a different way.
+     */
+    public static final int DEFAULT_BASE_SHARD_COUNT = CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
+
+    static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
+    public static final long DEFAULT_TARGET_SSTABLE_SIZE = CassandraRelevantProperties.UCS_TARGET_SSTABLE_SIZE.getSizeInBytes();
+    static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
+
+    /**
+     * This parameter is intended to modify the shape of the LSM by taking into account the survival ratio of data, for now it is fixed to one.
+     */
+    static final double DEFAULT_SURVIVAL_FACTOR = CassandraRelevantProperties.UCS_SURVIVAL_FACTOR.getDouble();
+    static final double[] DEFAULT_SURVIVAL_FACTORS = new double[] { DEFAULT_SURVIVAL_FACTOR };
+
+    /**
+     * The maximum number of sstables to compact in one operation.
+     *
+     * This is expected to be large and never be reached, but compaction going very very late may cause the accumulation
+     * of thousands and even tens of thousands of sstables which may cause problems if compacted in one long operation.
+     * The default is chosen to be half of the maximum permitted space overhead when the source sstables are of the
+     * minimum sstable size.
+     *
+     * If the fanout factor is larger than the maximum number of sstables, the strategy will ignore the latter.
+     */
+    static final String MAX_SSTABLES_TO_COMPACT_OPTION = "max_sstables_to_compact";
+
+    static final String ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION = "unsafe_aggressive_sstable_expiration";
+    static final boolean ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.getBoolean();
+    static final boolean DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = false;
+
+    static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+    static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION = "expired_sstable_check_frequency_seconds";
+
+    /** The maximum splitting factor for shards. The maximum number of shards is this number multiplied by the base count. */
+    static final double MAX_SHARD_SPLIT = 1048576;
+
+    /**
+     * Overlap inclusion method. NONE for participating sstables only (not recommended), SINGLE to only include sstables
+     * that overlap with participating (LCS-like, higher concurrency during upgrades but some double compaction),
+     * TRANSITIVE to include overlaps of overlaps (likely to trigger whole level compactions, safest).
+     */
+    static final String OVERLAP_INCLUSION_METHOD_OPTION = "overlap_inclusion_method";
+    static final Overlaps.InclusionMethod DEFAULT_OVERLAP_INCLUSION_METHOD =
+        CassandraRelevantProperties.UCS_OVERLAP_INCLUSION_METHOD.getEnum(Overlaps.InclusionMethod.TRANSITIVE);
+
+    protected final ColumnFamilyStore cfs;
+    protected final MonotonicClock clock;
+    private final int[] scalingParameters;
+    protected final double[] survivalFactors;
+    protected final long flushSizeOverride;
+    protected volatile long currentFlushSize;
+    protected final int maxSSTablesToCompact;
+    protected final long expiredSSTableCheckFrequency;
+    protected final boolean ignoreOverlapsInExpirationCheck;
+
+    protected final int baseShardCount;
+
+    protected final double targetSSTableSizeMin;
+
+    protected final Overlaps.InclusionMethod overlapInclusionMethod;
+
+    Controller(ColumnFamilyStore cfs,
+               MonotonicClock clock,
+               int[] scalingParameters,
+               double[] survivalFactors,
+               long flushSizeOverride,
+               int maxSSTablesToCompact,
+               long expiredSSTableCheckFrequency,
+               boolean ignoreOverlapsInExpirationCheck,
+               int baseShardCount,
+               double targetSStableSize,
+               Overlaps.InclusionMethod overlapInclusionMethod)
+    {
+        this.cfs = cfs;
+        this.clock = clock;
+        this.scalingParameters = scalingParameters;
+        this.survivalFactors = survivalFactors;
+        this.flushSizeOverride = flushSizeOverride;
+        this.currentFlushSize = flushSizeOverride;
+        this.expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(expiredSSTableCheckFrequency, TimeUnit.SECONDS);
+        this.baseShardCount = baseShardCount;
+        this.targetSSTableSizeMin = targetSStableSize * Math.sqrt(0.5);
+        this.overlapInclusionMethod = overlapInclusionMethod;
+
+        if (maxSSTablesToCompact <= 0)
+            maxSSTablesToCompact = Integer.MAX_VALUE;
+
+        this.maxSSTablesToCompact = maxSSTablesToCompact;
+
+        if (ignoreOverlapsInExpirationCheck && !ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION)
+        {
+            logger.warn("Not enabling aggressive SSTable expiration, as the system property '" + CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.name() + "' is set to 'false'. " +
+                    "Set it to 'true' to enable aggressive SSTable expiration.");
+        }
+        this.ignoreOverlapsInExpirationCheck = ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;
+    }
+
+    /**
+     * @return the scaling parameter W
+     * @param index
+     */
+    public int getScalingParameter(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + index);
+
+        return index < scalingParameters.length ? scalingParameters[index] : scalingParameters[scalingParameters.length - 1];
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Controller, m: %s, o: %s, Ws: %s",
+                             FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", ""),
+                             Arrays.toString(survivalFactors),
+                             printScalingParameters(scalingParameters));
+    }
+
+    public int getFanout(int index) {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.fanoutFromScalingParameter(W);
+    }
+
+    public int getThreshold(int index) {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.thresholdFromScalingParameter(W);
+    }
+
+    /**
+     * Calculate the number of shards to split the local token space in for the given sstable density.
+     * This is calculated as a power-of-two multiple of baseShardCount, so that the expected size of resulting sstables
+     * is between targetSSTableSizeMin and 2*targetSSTableSizeMin (in other words, sqrt(0.5) * targetSSTableSize and
+     * sqrt(2) * targetSSTableSize), with a minimum of baseShardCount shards for smaller sstables.
+     *
+     * Note that to get the sstables resulting from this splitting within the bounds, the density argument must be
+     * normalized to the span that is being split. In other words, if no disks are defined, the density should be
+     * scaled by the token coverage of the locally-owned ranges. If multiple data directories are defined, the density
+     * should be scaled by the token coverage of the respective data directory. That is localDensity = size / span,
+     * where the span is normalized so that span = 1 when the data covers the range that is being split.
+     */
+    public int getNumShards(double localDensity)
+    {
+        // How many we would have to aim for the target size. Divided by the base shard count, so that we can ensure
+        // the result is a multiple of it by multiplying back below.
+        double count = localDensity / (targetSSTableSizeMin * baseShardCount);
+        if (count > MAX_SHARD_SPLIT)
+            count = MAX_SHARD_SPLIT;
+        assert !(count < 0);    // Must be positive, 0 or NaN, which should translate to baseShardCount
+
+        // Make it a power of two multiple of the base count so that split points for lower levels remain split points for higher.
+        // The conversion to int and highestOneBit round down, for which we compensate by using the sqrt(0.5) multiplier
+        // already applied in targetSSTableSizeMin.
+        // Setting the bottom bit to 1 ensures the result is at least baseShardCount.
+        int shards = baseShardCount * Integer.highestOneBit((int) count | 1);
+        logger.debug("Shard count {} for density {}, {} times target {}",
+                     shards,
+                     FBUtilities.prettyPrintBinary(localDensity, "B", " "),
+                     localDensity / targetSSTableSizeMin,
+                     FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", " "));
+        return shards;
+    }
+
+    /**
+     * @return the survival factor o
+     * @param index
+     */
+    public double getSurvivalFactor(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + index);
+
+        return index < survivalFactors.length ? survivalFactors[index] : survivalFactors[survivalFactors.length - 1];
+    }
+
+    /**
+     * Return the flush sstable size in bytes.
+     *
+     * This is usually obtained from the observed sstable flush sizes, refreshed when it differs significantly
+     * from the current values.
+     * It can also be set by the user in the options.
+     *
+     * @return the flush size in bytes.
+     */
+    public long getFlushSizeBytes()
+    {
+        if (flushSizeOverride > 0)
+            return flushSizeOverride;
+
+        double envFlushSize = cfs.metric.flushSizeOnDisk.get();
+        if (currentFlushSize == 0 || Math.abs(1 - (currentFlushSize / envFlushSize)) > 0.5)
+        {
+            // The current size is not initialized, or it differs by over 50% from the observed.
+            // Use the observed size rounded up to a whole megabyte.
+            currentFlushSize = ((long) (Math.ceil(Math.scalb(envFlushSize, -20)))) << 20;
+        }
+        return currentFlushSize;
+    }
+
+    /**
+     * @return whether is allowed to drop expired SSTables without checking if partition keys appear in other SSTables.
+     * Same behavior as in TWCS.
+     */
+    public boolean getIgnoreOverlapsInExpirationCheck()
+    {
+        return ignoreOverlapsInExpirationCheck;
+    }
+
+    public long getExpiredSSTableCheckFrequency()
+    {
+        return expiredSSTableCheckFrequency;
+    }
+
+    /**
+     * The strategy will call this method each time {@link UnifiedCompactionStrategy#getNextBackgroundTask} is called.
+     */
+    public void onStrategyBackgroundTaskRequest()
+    {

Review Comment:
   This method is not needed yet. Removed.
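
   As an aside, the power-of-two rounding in `getNumShards` above is easy to check in isolation; a sketch with hypothetical inputs (4 base shards and a 1 GiB target, neither taken from this PR's actual defaults):
   ```java
   int baseShardCount = 4;                                                 // hypothetical
   double targetSSTableSizeMin = (1L << 30) * Math.sqrt(0.5);              // target * sqrt(0.5), as in the constructor
   double localDensity = 8L << 30;                                         // 8 GiB of density
   double count = localDensity / (targetSSTableSizeMin * baseShardCount);  // ~2.83
   // highestOneBit rounds down to a power of two; "| 1" keeps at least baseShardCount.
   int shards = baseShardCount * Integer.highestOneBit((int) count | 1);   // 4 * 2 = 8
   ```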



##########
test/data/legacy-sstables/da/legacy_tables/legacy_da_clust/da-1-bti-Data.db:
##########


Review Comment:
   As above, this test only matters when "da" becomes a legacy sstable for one of the next versions.



##########
test/unit/org/apache/cassandra/db/compaction/unified/ControllerTest.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done



##########
test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java:
##########
@@ -0,0 +1,913 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.compaction.unified.Controller;
+import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.Tracker;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Splitter;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.Pair;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.RETURNS_SMART_NULLS;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+/**
+ * The unified compaction strategy is described in this design document:
+ *
+ * See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
+ */
+public class UnifiedCompactionStrategyTest
+{
+    private final static long ONE_MB = 1 << 20;
+
+    // Multiple disks can be used both with and without disk boundaries. We want to test both cases.
+
+    final String keyspace = "ks";
+    final String table = "tbl";
+
+    @Mock(answer = Answers.RETURNS_SMART_NULLS)
+    ColumnFamilyStore cfs;
+
+    @Mock(answer = Answers.RETURNS_SMART_NULLS)
+    CompactionStrategyManager csm;
+
+    ColumnFamilyStore.VersionedLocalRanges localRanges;
+
+    Tracker dataTracker;
+
+    long repairedAt;
+
+    IPartitioner partitioner;
+
+    Splitter splitter;
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        long seed = System.currentTimeMillis();
+        random.setSeed(seed);
+        System.out.println("Random seed: " + seed);
+
+        DatabaseDescriptor.daemonInitialization(); // because of all the static initialization in CFS
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+    }
+
+
+    static final JDKRandomGenerator random = new JDKRandomGenerator();
+
+    @Before
+    public void setUp()
+    {
+        setUp(1);
+    }
+
+    protected void setUp(int numShards)
+    {
+        MockitoAnnotations.initMocks(this);
+
+        TableMetadata metadata = TableMetadata.builder(keyspace, table)
+                                              .addPartitionKeyColumn("pk", AsciiType.instance)
+                                              .build();
+
+        dataTracker = Tracker.newDummyTracker();
+        repairedAt = System.currentTimeMillis();
+        partitioner = DatabaseDescriptor.getPartitioner();
+        splitter = partitioner.splitter().orElse(null);
+        if (numShards > 1)
+            assertNotNull("Splitter is required with multiple compaction shards", splitter);
+
+        when(cfs.getPartitioner()).thenReturn(partitioner);
+        localRanges = cfs.fullWeightedRange(0, partitioner);
+
+        when(cfs.metadata()).thenReturn(metadata);
+        when(cfs.getTableName()).thenReturn(table);
+        when(cfs.localRangesWeighted()).thenReturn(localRanges);
+        when(cfs.getTracker()).thenReturn(dataTracker);
+        when(cfs.getLiveSSTables()).thenAnswer(request -> dataTracker.getView().select(SSTableSet.LIVE));
+        when(cfs.getSSTables(any())).thenAnswer(request -> dataTracker.getView().select(request.getArgument(0)));
+        when(cfs.getCompactionStrategyManager()).thenReturn(csm);
+
+        DiskBoundaries db = new DiskBoundaries(cfs, new Directories.DataDirectory[0], 0);
+        when(cfs.getDiskBoundaries()).thenReturn(db);
+
+        when(csm.onlyPurgeRepairedTombstones()).thenReturn(false);
+    }
+
+    @Test
+    public void testNoSSTables()
+    {
+        Controller controller = Mockito.mock(Controller.class);
+        long minimalSizeBytes = 2 << 20;
+        when(controller.getScalingParameter(anyInt())).thenReturn(4);
+        when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
+        when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
+        when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
+        when(controller.getNumShards(anyDouble())).thenReturn(1);
+        when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
+        when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
+        when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
+        when(controller.maxSSTablesToCompact()).thenReturn(1000);
+        when(controller.random()).thenCallRealMethod();
+
+        UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
+
+        assertNull(strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()));
+        assertEquals(0, strategy.getEstimatedRemainingTasks());
+    }
+
+    @Test
+    public void testGetBucketsSameWUniqueArena()
+    {
+        final int m = 2; // minimal sorted run size in MB m
+        final Map<Integer, Integer> sstables = new TreeMap<>();
+
+        for (int i = 0; i < 20; i++)
+        {
+            int numSSTables = 2 + random.nextInt(18);
+            sstables.put(m * i, numSSTables);
+        }
+
+        // W = 3, o = 1 => F = 5, T = 5 => expected T sstables and 2 buckets: 0-10m, 10-50m
+        testGetBucketsOneArena(sstables, new int[] { 3 }, m, new int[] { 5, 5});
+
+        // W = 2, o = 1 => F = 4, T = 4 => expected T sstables and 3 buckets: 0-8m, 8-32m, 32-128m
+        testGetBucketsOneArena(sstables, new int[] { 2 }, m, new int[] { 4, 4, 4});
+
+        // W = 0, o = 1 => F = 2, T = 2 => expected 2 sstables and 5 buckets: 0-4m, 4-8m, 8-16m, 16-32m, 32-64m
+        testGetBucketsOneArena(sstables, new int[] { 0 }, m, new int[] { 2, 2, 2, 2, 2});
+
+        // W = -2, o = 1 => F = 4, T = 2 => expected 2 sstables and 3 buckets: 0-8mb, 8-32m, 32-128m
+        testGetBucketsOneArena(sstables, new int[] { -2 }, m, new int[] { 2, 2, 2});
+
+        // W = -3, o = 1 => F = 5, T = 2 => expected 2 sstables and 2 buckets: 0-10m, 10-50m
+        testGetBucketsOneArena(sstables, new int[] { -3 }, m, new int[] { 2, 2});
+
+        // remove sstables from 4m to 8m to create an empty bucket in the next call
+        sstables.remove(4); // 4m
+        sstables.remove(6); // 6m
+        sstables.remove(8); // 8m
+
+        // W = 0, o = 1 => F = 2, T = 2 => expected 2 sstables and 5 buckets: 0-4m, 4-8m, 8-16m, 16-32m, 32-64m
+        testGetBucketsOneArena(sstables, new int[] { 0 }, m, new int[] { 2, 2, 2, 2, 2});
+    }
+
+    @Test
+    public void testGetBucketsDifferentWsUniqueArena()
+    {
+        final int m = 2; // minimal sorted run size in MB m
+        final Map<Integer, Integer> sstables = new TreeMap<>();
+
+        for (int i : new int[] { 50, 100, 200, 400, 600, 800, 1000})
+        {
+            int numSSTables = 2 + random.nextInt(18);
+            sstables.put(i, numSSTables);
+        }
+
+        // W = [30, 2, -6], o = 1 => F = [32, 4, 8] , T = [32, 4, 2]  => expected 3 buckets: 0-64m, 64-256m 256-2048m
+        testGetBucketsOneArena(sstables, new int[]{ 30, 2, -6 }, m, new int[] { 32, 4, 2});
+
+        // W = [30, 6, -8], o = 1 => F = [32, 8, 10] , T = [32, 8, 2]  => expected 3 buckets: 0-64m, 64-544m 544-5440m
+        testGetBucketsOneArena(sstables, new int[]{ 30, 6, -8 }, m, new int[] { 32, 8, 2});
+
+        // W = [0, 0, 0, -2, -2], o = 1 => F = [2, 2, 2, 4, 4] , T = [2, 2, 2, 2, 2]  => expected 6 buckets: 0-4m, 4-8m, 8-16m, 16-64m, 64-256m, 256-1024m
+        testGetBucketsOneArena(sstables, new int[]{ 0, 0, 0, -2, -2 }, m, new int[] { 2, 2, 2, 2, 2, 2});
+    }
+
+    private void testGetBucketsOneArena(Map<Integer, Integer> sstableMap, int[] Ws, int m, int[] expectedTs)
+    {
+        long minimalSizeBytes = m << 20;
+
+        Controller controller = Mockito.mock(Controller.class);
+        when(controller.getNumShards(anyDouble())).thenReturn(1);
+        when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
+        when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
+        when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
+        when(controller.maxSSTablesToCompact()).thenReturn(1000);
+
+        when(controller.getScalingParameter(anyInt())).thenAnswer(answer -> {
+            int index = answer.getArgument(0);
+            return Ws[index < Ws.length ? index : Ws.length - 1];
+        });
+        when(controller.getFanout(anyInt())).thenCallRealMethod();
+        when(controller.getThreshold(anyInt())).thenCallRealMethod();
+        when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
+
+        when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
+        when(controller.random()).thenCallRealMethod();
+
+        UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
+
+        IPartitioner partitioner = cfs.getPartitioner();
+        DecoratedKey first = new BufferDecoratedKey(partitioner.getMinimumToken(), ByteBuffer.allocate(0));
+        DecoratedKey last = new BufferDecoratedKey(partitioner.getMaximumToken(), ByteBuffer.allocate(0));
+
+        List<SSTableReader> sstables = new ArrayList<>();
+        long dataSetSizeBytes = 0;
+        for (Map.Entry<Integer, Integer> entry : sstableMap.entrySet())
+        {
+            for (int i = 0; i < entry.getValue(); i++)
+            {
+                // we want a number > 0 and < 1 so that the sstable has always some size and never crosses the boundary to the next bucket
+                // so we leave a 1% margin, picking a number from 0.01 to 0.99
+                double rand = 0.01 + 0.98 * random.nextDouble();
+                long sizeOnDiskBytes = (entry.getKey() << 20) + (long) (minimalSizeBytes * rand);
+                dataSetSizeBytes += sizeOnDiskBytes;
+                sstables.add(mockSSTable(sizeOnDiskBytes, System.currentTimeMillis(), first, last));
+            }
+        }
+        strategy.addSSTables(sstables);
+        dataTracker.addInitialSSTables(sstables);
+
+        List<UnifiedCompactionStrategy.Level> levels = strategy.getLevels();
+        assertEquals(expectedTs.length, levels.size());
+
+        for (int i = 0; i < expectedTs.length; i++)
+        {
+            UnifiedCompactionStrategy.Level level = levels.get(i);
+            assertEquals(i, level.getIndex());
+            UnifiedCompactionStrategy.SelectionContext context = new UnifiedCompactionStrategy.SelectionContext(strategy.getController());
+            UnifiedCompactionStrategy.CompactionPick pick = level.getCompactionPick(context);
+
+            assertEquals(level.getSSTables().size() >= expectedTs[i], pick != null);
+        }
+    }
+
+    @Test
+    public void testPreserveLayout_W2_947()
+    {
+        testPreserveLayout(2, 947);
+    }
+
+    @Test
+    public void testPreserveLayout_WM2_947()
+    {
+        testPreserveLayout(-2, 947);
+    }
+
+    @Test
+    public void testPreserveLayout_W2_251()
+    {
+        testPreserveLayout(2, 251);
+    }
+
+    @Test
+    public void testPreserveLayout_WM2_251()
+    {
+        testPreserveLayout(-2, 251);
+    }
+
+    @Test
+    public void testPreserveLayout_W2_320()
+    {
+        testPreserveLayout(2, 320);
+    }
+
+    @Test
+    public void testPreserveLayout_WM2_320()
+    {
+        testPreserveLayout(-2, 320);
+    }
+
+    @Test
+    public void testPreserveLayout_WM2_947_128()
+    {
+        testLayout(-2, 947, 128);
+    }
+
+    @Test
+    public void testPreserveLayout_WM2_947_64()
+    {
+        testLayout(-2, 947, 64);
+    }
+
+    public void testPreserveLayout(int W, int numSSTables)
+    {
+        testLayout(W, numSSTables, 10000);
+    }
+
+    @Test
+    public void testMaxSSTablesToCompact()
+    {
+        testLayout(2, 944,  60);
+        testLayout(2, 944, 1000);
+        testLayout(2, 944,  100);
+        testLayout(2, 803,  200);
+    }
+
+    public void testLayout(int W, int numSSTables, int maxSSTablesToCompact)
+    {
+        int F = 2 + Math.abs(W);
+        int T = W < 0 ? 2 : F;
+        final long minSstableSizeBytes = 2L << 20; // 2 MB
+        final int numShards = 1;
+        final int levels = (int) Math.floor(Math.log(numSSTables) / Math.log(F)) + 1;
+
+        Controller controller = Mockito.mock(Controller.class);
+        when(controller.getScalingParameter(anyInt())).thenReturn(W);
+        when(controller.getFanout(anyInt())).thenCallRealMethod();
+        when(controller.getThreshold(anyInt())).thenCallRealMethod();
+        when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
+        when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
+        when(controller.getNumShards(anyDouble())).thenReturn(numShards);
+        when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minSstableSizeBytes);
+
+        if (maxSSTablesToCompact >= numSSTables)
+            when(controller.maxConcurrentCompactions()).thenReturn(levels * (W < 0 ? 1 : F)); // make sure the work is assigned to different levels
+        else
+            when(controller.maxConcurrentCompactions()).thenReturn(1000); // make sure the work is assigned to different levels
+
+        when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
+        when(controller.maxSSTablesToCompact()).thenReturn(maxSSTablesToCompact);
+        Random random = Mockito.mock(Random.class);
+        when(random.nextInt(anyInt())).thenReturn(0);
+        when(controller.random()).thenReturn(random);
+
+        UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
+        List<SSTableReader> allSstables = new ArrayList<>();
+
+        List<SSTableReader> sstables = mockSSTables(numSSTables,
+//                                                    minSstableSizeBytes,

Review Comment:
   Fixed
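
   For what it's worth, the `levels` computation in `testLayout` above is easy to sanity-check by hand for one of the cases exercised here:
   ```java
   // testLayout(2, 947, ...): W = 2 => F = 2 + |W| = 4, T = F = 4.
   int W = 2, numSSTables = 947;
   int F = 2 + Math.abs(W);
   int levels = (int) Math.floor(Math.log(numSSTables) / Math.log(F)) + 1;
   // log(947) / log(4) ~= 4.94, so levels == 5.
   ```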



##########
src/java/org/apache/cassandra/db/compaction/ShardManager.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public interface ShardManager
+{
+    /**
+     * Single-partition, and generally sstables with very few partitions, can cover very small sections of the token
+     * space, resulting in very high densities.
+     * Additionally, sstables that have completely fallen outside of the local token ranges will end up with a zero
+     * coverage.
+     * To avoid problems with both we check if coverage is below the minimum, and replace it with 1.
+     */
+    static final double MINIMUM_TOKEN_COVERAGE = Math.scalb(1.0, -48);
+
+    static ShardManager create(ColumnFamilyStore cfs)
+    {
+        final ImmutableList<PartitionPosition> diskPositions = cfs.getDiskBoundaries().positions;
+        ColumnFamilyStore.VersionedLocalRanges localRanges = cfs.localRangesWeighted();
+        IPartitioner partitioner = cfs.getPartitioner();
+
+        if (diskPositions != null && diskPositions.size() > 1)
+            return new ShardManagerDiskAware(localRanges, diskPositions.stream()
+                                                                       .map(PartitionPosition::getToken)
+                                                                       .collect(Collectors.toList()));
+        else if (partitioner.splitter().isPresent())
+            return new ShardManagerNoDisks(localRanges);
+        else
+            return new ShardManagerTrivial(partitioner);
+    }
+
+    boolean isOutOfDate(long ringVersion);
+
+    /**
+     * The token range fraction spanned by the given range, adjusted for the local range ownership.
+     */
+    double rangeSpanned(Range<Token> tableRange);
+
+    /**
+     * The total fraction of the token space covered by the local ranges.
+     */
+    double localSpaceCoverage();
+
+    /**
+     * The fraction of the token space covered by a shard set, i.e. the space that is split in the requested number of
+     * shards.
+     * If no disks are defined, this is the same as localSpaceCoverage(). Otherwise, it is the token coverage of a disk.
+     */
+    double shardSetCoverage();
+
+    /**
+     * Construct a boundary/shard iterator for the given number of shards.
+     *
+     * Note: This does not offer a method of listing the shard boundaries it generates, just to advance to the
+     * corresponding one for a given token.  The only usage for listing is currently in tests. Should a need for this
+     * arise, see {@link CompactionSimulationTest} for a possible implementation.
+     */
+    ShardTracker boundaries(int shardCount);
+
+    static Range<Token> coveringRange(SSTableReader sstable)
+    {
+        return coveringRange(sstable.getFirst(), sstable.getLast());
+    }
+
+    static Range<Token> coveringRange(PartitionPosition first, PartitionPosition last)
+    {
+        // To include the token of last, the range's upper bound must be increased.
+        return new Range<>(first.getToken(), last.getToken().nextValidToken());
+    }
+
+
+    /**
+     * Return the token space share that the given SSTable spans, excluding any non-locally owned space.
+     * Returns a positive floating-point number between 0 and 1.
+     */
+    default double rangeSpanned(SSTableReader rdr)
+    {
+        double reported = rdr.tokenSpaceCoverage();

Review Comment:
   The full form is slightly easier to read.
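
   For readers of this hunk: the quote cuts off after the first line of the method. The full form presumably falls back to the covering range and applies the `MINIMUM_TOKEN_COVERAGE` guard described above; a sketch of that shape, not the committed code:
   ```java
   default double rangeSpanned(SSTableReader rdr)
   {
       double reported = rdr.tokenSpaceCoverage();
       // Fall back to the sstable's covering range if no coverage was recorded (assumed).
       double span = reported > 0 ? reported : rangeSpanned(coveringRange(rdr));
       // Guard against single-partition sstables and sstables entirely outside
       // the local ranges, per the MINIMUM_TOKEN_COVERAGE comment above.
       return span >= MINIMUM_TOKEN_COVERAGE ? span : 1.0;
   }
   ```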



##########
src/java/org/apache/cassandra/db/compaction/unified/Controller.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction.unified;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.UnifiedCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MonotonicClock;
+
+/**
+* The controller provides compaction parameters to the unified compaction strategy
+*/
+public class Controller
+{
+    protected static final Logger logger = LoggerFactory.getLogger(Controller.class);
+
+    /**
+     * The scaling parameters W, one per bucket index and separated by a comma.
+     * Higher indexes will use the value of the last index with a W specified.
+     */
+    final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
+    private final static String DEFAULT_SCALING_PARAMETERS = CassandraRelevantProperties.UCS_SCALING_PARAMETER.getString();
+
+    /**
+     * Override for the flush size in MB. The database should be able to calculate this from executing flushes, this
+     * should only be necessary in rare cases.
+     */
+    static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
+
+    static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
+    /**
+     * Default base shard count, used when a base count is not explicitly supplied. This value applies as long as the
+     * table is not a system one, and directories are not defined.
+     *
+     * For others a base count of 1 is used as system tables are usually small and do not need as much compaction
+     * parallelism, while having directories defined provides for parallelism in a different way.
+     */
+    public static final int DEFAULT_BASE_SHARD_COUNT = CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
+
+    static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
+    public static final long DEFAULT_TARGET_SSTABLE_SIZE = CassandraRelevantProperties.UCS_TARGET_SSTABLE_SIZE.getSizeInBytes();
+    static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
+
+    /**
+     * This parameter is intended to modify the shape of the LSM by taking into account the survival ratio of data, for now it is fixed to one.
+     */
+    static final double DEFAULT_SURVIVAL_FACTOR = CassandraRelevantProperties.UCS_SURVIVAL_FACTOR.getDouble();
+    static final double[] DEFAULT_SURVIVAL_FACTORS = new double[] { DEFAULT_SURVIVAL_FACTOR };
+
+    /**
+     * The maximum number of sstables to compact in one operation.
+     *
+     * This is expected to be large and never be reached, but compaction going very very late may cause the accumulation
+     * of thousands and even tens of thousands of sstables which may cause problems if compacted in one long operation.
+     * The default is chosen to be half of the maximum permitted space overhead when the source sstables are of the
+     * minimum sstable size.
+     *
+     * If the fanout factor is larger than the maximum number of sstables, the strategy will ignore the latter.
+     */
+    static final String MAX_SSTABLES_TO_COMPACT_OPTION = "max_sstables_to_compact";
+
+    static final String ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION = "unsafe_aggressive_sstable_expiration";
+    static final boolean ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.getBoolean();
+    static final boolean DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = false;
+
+    static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+    static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION = "expired_sstable_check_frequency_seconds";
+
+    /** The maximum splitting factor for shards. The maximum number of shards is this number multiplied by the base count. */
+    static final double MAX_SHARD_SPLIT = 1048576;
+
+    /**
+     * Overlap inclusion method. NONE for participating sstables only (not recommended), SINGLE to only include sstables
+     * that overlap with participating (LCS-like, higher concurrency during upgrades but some double compaction),
+     * TRANSITIVE to include overlaps of overlaps (likely to trigger whole level compactions, safest).
+     */
+    static final String OVERLAP_INCLUSION_METHOD_OPTION = "overlap_inclusion_method";
+    static final Overlaps.InclusionMethod DEFAULT_OVERLAP_INCLUSION_METHOD =
+        CassandraRelevantProperties.UCS_OVERLAP_INCLUSION_METHOD.getEnum(Overlaps.InclusionMethod.TRANSITIVE);
+
+    protected final ColumnFamilyStore cfs;
+    protected final MonotonicClock clock;
+    private final int[] scalingParameters;
+    protected final double[] survivalFactors;
+    protected final long flushSizeOverride;
+    protected volatile long currentFlushSize;
+    protected final int maxSSTablesToCompact;
+    protected final long expiredSSTableCheckFrequency;
+    protected final boolean ignoreOverlapsInExpirationCheck;
+
+    protected final int baseShardCount;
+
+    protected final double targetSSTableSizeMin;
+
+    protected final Overlaps.InclusionMethod overlapInclusionMethod;
+
+    Controller(ColumnFamilyStore cfs,
+               MonotonicClock clock,
+               int[] scalingParameters,
+               double[] survivalFactors,
+               long flushSizeOverride,
+               int maxSSTablesToCompact,
+               long expiredSSTableCheckFrequency,
+               boolean ignoreOverlapsInExpirationCheck,
+               int baseShardCount,
+               double targetSStableSize,
+               Overlaps.InclusionMethod overlapInclusionMethod)
+    {
+        this.cfs = cfs;
+        this.clock = clock;
+        this.scalingParameters = scalingParameters;
+        this.survivalFactors = survivalFactors;
+        this.flushSizeOverride = flushSizeOverride;
+        this.currentFlushSize = flushSizeOverride;
+        this.expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(expiredSSTableCheckFrequency, TimeUnit.SECONDS);
+        this.baseShardCount = baseShardCount;
+        this.targetSSTableSizeMin = targetSStableSize * Math.sqrt(0.5);
+        this.overlapInclusionMethod = overlapInclusionMethod;
+
+        if (maxSSTablesToCompact <= 0)
+            maxSSTablesToCompact = Integer.MAX_VALUE;
+
+        this.maxSSTablesToCompact = maxSSTablesToCompact;
+
+        if (ignoreOverlapsInExpirationCheck && !ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION)
+        {
+            logger.warn("Not enabling aggressive SSTable expiration, as the system property '" + CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.name() + "' is set to 'false'. " +
+                    "Set it to 'true' to enable aggressive SSTable expiration.");
+        }
+        this.ignoreOverlapsInExpirationCheck = ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;
+    }
+
+    /**
+     * @return the scaling parameter W
+     * @param index
+     */
+    public int getScalingParameter(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + index);
+
+        return index < scalingParameters.length ? scalingParameters[index] : scalingParameters[scalingParameters.length - 1];
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Controller, m: %s, o: %s, Ws: %s",
+                             FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", ""),
+                             Arrays.toString(survivalFactors),
+                             printScalingParameters(scalingParameters));
+    }
+
+    public int getFanout(int index) {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.fanoutFromScalingParameter(W);
+    }
+
+    public int getThreshold(int index) {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.thresholdFromScalingParameter(W);
+    }
+
+    /**
+     * Calculate the number of shards to split the local token space in for the given sstable density.
+     * This is calculated as a power-of-two multiple of baseShardCount, so that the expected size of resulting sstables
+     * is between targetSSTableSizeMin and 2*targetSSTableSizeMin (in other words, sqrt(0.5) * targetSSTableSize and
+     * sqrt(2) * targetSSTableSize), with a minimum of baseShardCount shards for smaller sstables.
+     *
+     * Note that to get the sstables resulting from this splitting within the bounds, the density argument must be
+     * normalized to the span that is being split. In other words, if no disks are defined, the density should be
+     * scaled by the token coverage of the locally-owned ranges. If multiple data directories are defined, the density
+     * should be scaled by the token coverage of the respective data directory. That is localDensity = size / span,
+     * where the span is normalized so that span = 1 when the data covers the range that is being split.
+     */
+    public int getNumShards(double localDensity)
+    {
+        // How many we would have to aim for the target size. Divided by the base shard count, so that we can ensure
+        // the result is a multiple of it by multiplying back below.
+        double count = localDensity / (targetSSTableSizeMin * baseShardCount);
+        if (count > MAX_SHARD_SPLIT)
+            count = MAX_SHARD_SPLIT;
+        assert !(count < 0);    // Must be positive, 0 or NaN, which should translate to baseShardCount
+
+        // Make it a power of two multiple of the base count so that split points for lower levels remain split points for higher.
+        // The conversion to int and highestOneBit round down, for which we compensate by using the sqrt(0.5) multiplier
+        // already applied in targetSSTableSizeMin.
+        // Setting the bottom bit to 1 ensures the result is at least baseShardCount.
+        int shards = baseShardCount * Integer.highestOneBit((int) count | 1);
+        logger.debug("Shard count {} for density {}, {} times target {}",
+                     shards,
+                     FBUtilities.prettyPrintBinary(localDensity, "B", " "),
+                     localDensity / targetSSTableSizeMin,
+                     FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", " "));
+        return shards;
+    }
+
+    /**
+     * @return the survival factor o
+     * @param index
+     */
+    public double getSurvivalFactor(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + index);
+
+        return index < survivalFactors.length ? survivalFactors[index] : survivalFactors[survivalFactors.length - 1];
+    }
+
+    /**
+     * Return the flush sstable size in bytes.
+     *
+     * This is usually obtained from the observed sstable flush sizes, refreshed when it differs significantly
+     * from the current values.
+     * It can also be set by the user in the options.
+     *
+     * @return the flush size in bytes.
+     */
+    public long getFlushSizeBytes()
+    {
+        if (flushSizeOverride > 0)
+            return flushSizeOverride;
+
+        double envFlushSize = cfs.metric.flushSizeOnDisk.get();
+        if (currentFlushSize == 0 || Math.abs(1 - (currentFlushSize / envFlushSize)) > 0.5)
+        {
+            // The current size is not initialized, or it differs by over 50% from the observed.
+            // Use the observed size rounded up to a whole megabyte.
+            currentFlushSize = ((long) (Math.ceil(Math.scalb(envFlushSize, -20)))) << 20;
+        }
+        return currentFlushSize;
+    }
+
+    /**
+     * @return whether is allowed to drop expired SSTables without checking if partition keys appear in other SSTables.
+     * Same behavior as in TWCS.
+     */
+    public boolean getIgnoreOverlapsInExpirationCheck()
+    {
+        return ignoreOverlapsInExpirationCheck;
+    }
+
+    public long getExpiredSSTableCheckFrequency()
+    {
+        return expiredSSTableCheckFrequency;
+    }
+
+    /**
+     * The strategy will call this method each time {@link UnifiedCompactionStrategy#getNextBackgroundTask} is called.
+     */
+    public void onStrategyBackgroundTaskRequest()
+    {
+    }
+
+    /**
+     * Returns a maximum bucket index for the given data size and fanout.
+     */
+    private int maxBucketIndex(long totalLength, int fanout)
+    {
+        double o = getSurvivalFactor(0);
+        long m = getFlushSizeBytes();
+        return Math.max(0, (int) Math.floor((Math.log(totalLength) - Math.log(m)) / (Math.log(fanout) - Math.log(o))));
+    }
+
+    public static Controller fromOptions(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        int[] Ws = parseScalingParameters(options.getOrDefault(SCALING_PARAMETERS_OPTION, DEFAULT_SCALING_PARAMETERS));
+
+        long flushSizeOverride = FBUtilities.parseHumanReadableBytes(options.getOrDefault(FLUSH_SIZE_OVERRIDE_OPTION, "0MiB"));
+        int maxSSTablesToCompact = Integer.parseInt(options.getOrDefault(MAX_SSTABLES_TO_COMPACT_OPTION, "0"));
+        long expiredSSTableCheckFrequency = options.containsKey(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION)

Review Comment:
   I prefer to err on the side of keeping the code within the max line width. Moved the other properties so they are aligned at the same position.
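
   Incidentally, the `scaling_parameters` option parsed here accepts the N/L/T notation handled by `parseScalingParameter` in `UnifiedCompactionStrategy` (quoted below); a few worked conversions:
   ```java
   int w = UnifiedCompactionStrategy.parseScalingParameter("T4");      // 4 - 2 = 2
   int f = UnifiedCompactionStrategy.fanoutFromScalingParameter(w);    // 2 + 2 = 4
   int t = UnifiedCompactionStrategy.thresholdFromScalingParameter(w); // 2 + 2 = 4
   // "L10" => w = 2 - 10 = -8 (fanout 10, threshold 2); "N" => w = 0; "-3" passes through.
   ```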



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -0,0 +1,866 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.db.compaction.unified.Controller;
+import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
+import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.TimeUUID;
+
+/**
+ * The design of the unified compaction strategy is described in the accompanying UnifiedCompactionStrategy.md.
+ *
+ * See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
+ */
+public class UnifiedCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedCompactionStrategy.class);
+
+    static final int MAX_LEVELS = 32;   // This is enough for a few petabytes of data (with the worst case fan factor
+    // at W=0 this leaves room for 2^32 sstables, presumably of at least 1MB each).
+
+    private static final Pattern SCALING_PARAMETER_PATTERN = Pattern.compile("(N)|L(\\d+)|T(\\d+)|([+-]?\\d+)");
+    private static final String SCALING_PARAMETER_PATTERN_SIMPLIFIED = SCALING_PARAMETER_PATTERN.pattern()
+                                                                                                .replaceAll("[()]", "")
+                                                                                                .replace("\\d", "[0-9]");
+
+    private final Controller controller;
+
+    private volatile ShardManager shardManager;
+
+    private long lastExpiredCheck;
+
+    protected volatile int estimatedRemainingTasks;
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        this(cfs, options, Controller.fromOptions(cfs, options));
+    }
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options, Controller controller)
+    {
+        super(cfs, options);
+        this.controller = controller;
+        estimatedRemainingTasks = 0;
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        return Controller.validateOptions(AbstractCompactionStrategy.validateOptions(options));
+    }
+
+    public static int fanoutFromScalingParameter(int w)
+    {
+        return w < 0 ? 2 - w : 2 + w; // see formula in design doc
+    }
+
+    public static int thresholdFromScalingParameter(int w)
+    {
+        return w <= 0 ? 2 : 2 + w; // see formula in design doc
+    }
+
+    public static int parseScalingParameter(String value)
+    {
+        Matcher m = SCALING_PARAMETER_PATTERN.matcher(value);
+        if (!m.matches())
+            throw new ConfigurationException("Scaling parameter " + value + " 
must match " + SCALING_PARAMETER_PATTERN_SIMPLIFIED);
+
+        if (m.group(1) != null)
+            return 0;
+        else if (m.group(2) != null)
+            return 2 - atLeast2(Integer.parseInt(m.group(2)), value);
+        else if (m.group(3) != null)
+            return atLeast2(Integer.parseInt(m.group(3)), value) - 2;
+        else
+            return Integer.parseInt(m.group(4));
+    }
+
+    private static int atLeast2(int value, String str)
+    {
+        if (value < 2)
+            throw new ConfigurationException("Fan factor cannot be lower than 
2 in " + str);
+        return value;
+    }
+
+    public static String printScalingParameter(int w)
+    {
+        if (w < 0)
+            return "L" + Integer.toString(2 - w);
+        else if (w > 0)
+            return "T" + Integer.toString(w + 2);
+        else
+            return "N";
+    }
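
For illustration, a minimal sketch (not part of the patch) of how the scaling-parameter
notation round-trips through the methods above:

    int wL = UnifiedCompactionStrategy.parseScalingParameter("L4"); // 2 - 4 = -2 (levelled)
    int wT = UnifiedCompactionStrategy.parseScalingParameter("T4"); // 4 - 2 = +2 (tiered)
    int wN = UnifiedCompactionStrategy.parseScalingParameter("N");  // 0 (neutral)

    UnifiedCompactionStrategy.fanoutFromScalingParameter(wL);    // 2 - (-2) = 4
    UnifiedCompactionStrategy.thresholdFromScalingParameter(wL); // 2: levelled compacts as soon as 2 sstables overlap
    UnifiedCompactionStrategy.fanoutFromScalingParameter(wT);    // 2 + 2 = 4
    UnifiedCompactionStrategy.thresholdFromScalingParameter(wT); // 4: tiered compacts at 4 sstables
    UnifiedCompactionStrategy.printScalingParameter(wL);         // "L4"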
+
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long 
gcBefore, boolean splitOutput)
+    {
+        maybeUpdateShardManager();
+        // The tasks are split by repair status and disk, as well as into non-overlapping sections to enable some
+        // parallelism (to the extent that L0 sstables are split, i.e. at least base_shard_count). The result will be
+        // split across shards according to its density. Depending on the parallelism, the operation may require up to
+        // 100% extra space to complete.
+        List<AbstractCompactionTask> tasks = new ArrayList<>();
+        List<Set<SSTableReader>> nonOverlapping = 
splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
+        for (Set<SSTableReader> set : nonOverlapping)
+        {
+            @SuppressWarnings("resource")   // closed by the returned task
+            LifecycleTransaction txn = cfs.getTracker().tryModify(set, 
OperationType.COMPACTION);
+            if (txn != null)
+                tasks.add(createCompactionTask(txn, gcBefore));
+        }
+        return tasks;
+    }
+
+    private static List<Set<SSTableReader>> 
splitInNonOverlappingSets(Collection<SSTableReader> sstables)
+    {
+        List<Set<SSTableReader>> overlapSets = Overlaps.constructOverlapSets(new ArrayList<>(sstables),
+                                                                             UnifiedCompactionStrategy::startsAfter,
+                                                                             SSTableReader.firstKeyComparator,
+                                                                             SSTableReader.lastKeyComparator);
+        if (overlapSets.isEmpty())
+            return overlapSets;
+
+        Set<SSTableReader> group = overlapSets.get(0);
+        List<Set<SSTableReader>> groups = new ArrayList<>();
+        for (int i = 1; i < overlapSets.size(); ++i)
+        {
+            Set<SSTableReader> current = overlapSets.get(i);
+            if (Sets.intersection(current, group).isEmpty())
+            {
+                groups.add(group);
+                group = current;
+            }
+            else
+            {
+                group.addAll(current);
+            }
+        }
+        groups.add(group);
+        return groups;
+    }
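
A self-contained sketch of the merging loop above (assuming java.util.* imports and
plain string ids in place of sstables; the ranges are hypothetical: s1=[0,5], s2=[4,10],
s3=[9,15] overlap transitively, s4=[20,25] stands alone):

    List<Set<String>> overlapSets = new ArrayList<>();           // as constructOverlapSets would produce
    overlapSets.add(new HashSet<>(Arrays.asList("s1", "s2")));   // s1 and s2 overlap
    overlapSets.add(new HashSet<>(Arrays.asList("s2", "s3")));   // s2 and s3 overlap
    overlapSets.add(new HashSet<>(Collections.singleton("s4"))); // s4 overlaps nothing

    Set<String> group = overlapSets.get(0);
    List<Set<String>> groups = new ArrayList<>();
    for (int i = 1; i < overlapSets.size(); ++i)
    {
        Set<String> current = overlapSets.get(i);
        if (Collections.disjoint(current, group))                // JDK stand-in for Sets.intersection(...).isEmpty()
        {
            groups.add(group);
            group = current;
        }
        else
            group.addAll(current);
    }
    groups.add(group);
    // groups == [{s1, s2, s3}, {s4}]: consecutive sets sharing an sstable merge transitively.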
+
+    @Override
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> 
sstables, final long gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction transaction = 
cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
+        {
+            logger.trace("Unable to mark {} for compaction; probably a 
background compaction got to it first.  You can disable background compactions 
temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return createCompactionTask(transaction, 
gcBefore).setUserDefined(true);
+    }
+
+    /**
+     * Returns a compaction task to run next.
+     *
+     * This method is synchronized because task creation is significantly more 
expensive in UCS; the strategy is
+     * stateless, therefore it has to compute the shard/bucket structure on 
each call.
+     *
+     * @param gcBefore throw away tombstones older than this
+     */
+    @Override
+    public synchronized UnifiedCompactionTask getNextBackgroundTask(long 
gcBefore)
+    {
+        controller.onStrategyBackgroundTaskRequest();
+
+        while (true)
+        {
+            CompactionPick pick = getNextCompactionPick(gcBefore);
+            if (pick == null)
+                return null;
+            UnifiedCompactionTask task = createCompactionTask(pick, gcBefore);
+            if (task != null)
+                return task;
+        }
+    }
+
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    private UnifiedCompactionTask createCompactionTask(CompactionPick pick, 
long gcBefore)
+    {
+        Preconditions.checkNotNull(pick);
+        Preconditions.checkArgument(!pick.isEmpty());
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(pick,
+                                                                      OperationType.COMPACTION);
+        if (transaction != null)
+        {
+            return createCompactionTask(transaction, gcBefore);
+        }
+        else
+        {
+            // This can happen e.g. due to a race with upgrade tasks
+            logger.error("Failed to submit compaction {} because a transaction 
could not be created. If this happens frequently, it should be reported", pick);

Review Comment:
   Changed to a warning and added a reference to one problem this can indicate.



##########
src/java/org/apache/cassandra/db/compaction/unified/Controller.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction.unified;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.UnifiedCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MonotonicClock;
+
+/**
+ * The controller provides compaction parameters to the unified compaction strategy.
+ */
+public class Controller
+{
+    protected static final Logger logger = 
LoggerFactory.getLogger(Controller.class);
+
+    /**
+     * The scaling parameters W, one per bucket index and separated by a comma.
+     * Higher indexes will use the value of the last index with a W specified.
+     */
+    final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
+    private final static String DEFAULT_SCALING_PARAMETERS = 
CassandraRelevantProperties.UCS_SCALING_PARAMETER.getString();
+
+    /**
+     * Override for the flush size in MB. The database should be able to calculate this from executing flushes;
+     * this should only be necessary in rare cases.
+     */
+    static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
+
+    static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
+    /**
+     * Default base shard count, used when a base count is not explicitly supplied. This value applies as long as the
+     * table is not a system one and multiple data directories are not defined.
+     *
+     * For those, a base count of 1 is used, as system tables are usually small and do not need as much compaction
+     * parallelism, while having multiple directories defined provides for parallelism in a different way.
+     */
+    public static final int DEFAULT_BASE_SHARD_COUNT = 
CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
+
+    static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
+    public static final long DEFAULT_TARGET_SSTABLE_SIZE = 
CassandraRelevantProperties.UCS_TARGET_SSTABLE_SIZE.getSizeInBytes();
+    static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
+
+    /**
+     * This parameter is intended to modify the shape of the LSM by taking into account the survival ratio of data;
+     * for now it is fixed to one.
+     */
+    static final double DEFAULT_SURVIVAL_FACTOR = 
CassandraRelevantProperties.UCS_SURVIVAL_FACTOR.getDouble();
+    static final double[] DEFAULT_SURVIVAL_FACTORS = new double[] { 
DEFAULT_SURVIVAL_FACTOR };
+
+    /**
+     * The maximum number of sstables to compact in one operation.
+     *
+     * This is expected to be large and never be reached, but compaction falling very far behind may cause the
+     * accumulation of thousands or even tens of thousands of sstables, which may cause problems if compacted in one
+     * long operation.
+     * The default is chosen to be half of the maximum permitted space 
overhead when the source sstables are of the
+     * minimum sstable size.
+     *
+     * If the fanout factor is larger than the maximum number of sstables, the 
strategy will ignore the latter.
+     */
+    static final String MAX_SSTABLES_TO_COMPACT_OPTION = 
"max_sstables_to_compact";
+
+    static final String ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION = 
"unsafe_aggressive_sstable_expiration";
+    static final boolean ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.getBoolean();
+    static final boolean DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
false;
+
+    static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+    static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION = 
"expired_sstable_check_frequency_seconds";
+
+    /** The maximum splitting factor for shards. The maximum number of shards 
is this number multiplied by the base count. */
+    static final double MAX_SHARD_SPLIT = 1048576;
+
+    /**
+     * Overlap inclusion method. NONE for participating sstables only (not 
recommended), SINGLE to only include sstables
+     * that overlap with participating (LCS-like, higher concurrency during 
upgrades but some double compaction),
+     * TRANSITIVE to include overlaps of overlaps (likely to trigger whole 
level compactions, safest).
+     */
+    static final String OVERLAP_INCLUSION_METHOD_OPTION = 
"overlap_inclusion_method";
+    static final Overlaps.InclusionMethod DEFAULT_OVERLAP_INCLUSION_METHOD =
+        CassandraRelevantProperties.UCS_OVERLAP_INCLUSION_METHOD.getEnum(Overlaps.InclusionMethod.TRANSITIVE);
+
+    protected final ColumnFamilyStore cfs;
+    protected final MonotonicClock clock;
+    private final int[] scalingParameters;
+    protected final double[] survivalFactors;
+    protected final long flushSizeOverride;
+    protected volatile long currentFlushSize;
+    protected final int maxSSTablesToCompact;
+    protected final long expiredSSTableCheckFrequency;
+    protected final boolean ignoreOverlapsInExpirationCheck;
+
+    protected final int baseShardCount;
+
+    protected final double targetSSTableSizeMin;
+
+    protected final Overlaps.InclusionMethod overlapInclusionMethod;
+
+    Controller(ColumnFamilyStore cfs,
+               MonotonicClock clock,
+               int[] scalingParameters,
+               double[] survivalFactors,
+               long flushSizeOverride,
+               int maxSSTablesToCompact,
+               long expiredSSTableCheckFrequency,
+               boolean ignoreOverlapsInExpirationCheck,
+               int baseShardCount,
+               double targetSStableSize,
+               Overlaps.InclusionMethod overlapInclusionMethod)
+    {
+        this.cfs = cfs;
+        this.clock = clock;
+        this.scalingParameters = scalingParameters;
+        this.survivalFactors = survivalFactors;
+        this.flushSizeOverride = flushSizeOverride;
+        this.currentFlushSize = flushSizeOverride;
+        this.expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(expiredSSTableCheckFrequency, TimeUnit.SECONDS);
+        this.baseShardCount = baseShardCount;
+        this.targetSSTableSizeMin = targetSStableSize * Math.sqrt(0.5);
+        this.overlapInclusionMethod = overlapInclusionMethod;
+
+        if (maxSSTablesToCompact <= 0)
+            maxSSTablesToCompact = Integer.MAX_VALUE;
+
+        this.maxSSTablesToCompact = maxSSTablesToCompact;
+
+        if (ignoreOverlapsInExpirationCheck && 
!ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION)
+        {
+            logger.warn("Not enabling aggressive SSTable expiration, as the 
system property '" + 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.name() + 
"' is set to 'false'. " +
+                    "Set it to 'true' to enable aggressive SSTable 
expiration.");
+        }
+        this.ignoreOverlapsInExpirationCheck = 
ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;
+    }
+
+    /**
+     * @param index the bucket index
+     * @return the scaling parameter W for that bucket
+     */
+    public int getScalingParameter(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < scalingParameters.length ? scalingParameters[index] : 
scalingParameters[scalingParameters.length - 1];
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Controller, m: %s, o: %s, Ws: %s",
+                             FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", ""),
+                             Arrays.toString(survivalFactors),
+                             printScalingParameters(scalingParameters));
+    }
+
+    public int getFanout(int index)
+    {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.fanoutFromScalingParameter(W);
+    }
+
+    public int getThreshold(int index)
+    {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.thresholdFromScalingParameter(W);
+    }
+
+    /**
+     * Calculate the number of shards to split the local token space in for 
the given sstable density.
+     * This is calculated as a power-of-two multiple of baseShardCount, so 
that the expected size of resulting sstables
+     * is between targetSSTableSizeMin and 2*targetSSTableSizeMin (in other 
words, sqrt(0.5) * targetSSTableSize and
+     * sqrt(2) * targetSSTableSize), with a minimum of baseShardCount shards 
for smaller sstables.
+     *
+     * Note that to get the sstables resulting from this splitting within the 
bounds, the density argument must be
+     * normalized to the span that is being split. In other words, if no disks 
are defined, the density should be
+     * scaled by the token coverage of the locally-owned ranges. If multiple 
data directories are defined, the density
+     * should be scaled by the token coverage of the respective data 
directory. That is localDensity = size / span,
+     * where the span is normalized so that span = 1 when the data covers the 
range that is being split.
+     */
+    public int getNumShards(double localDensity)
+    {
+        // How many shards we would need to hit the target size. Divided by the base shard count, so that we can
+        // ensure the result is a multiple of it by multiplying back below.
+        double count = localDensity / (targetSSTableSizeMin * baseShardCount);
+        if (count > MAX_SHARD_SPLIT)
+            count = MAX_SHARD_SPLIT;
+        assert !(count < 0);    // Must be positive, 0 or NaN, which should 
translate to baseShardCount
+
+        // Make it a power of two multiple of the base count so that split 
points for lower levels remain split points for higher.
+        // The conversion to int and highestOneBit round down, for which we 
compensate by using the sqrt(0.5) multiplier
+        // already applied in targetSSTableSizeMin.
+        // Setting the bottom bit to 1 ensures the result is at least 
baseShardCount.
+        int shards = baseShardCount * Integer.highestOneBit((int) count | 1);
+        logger.debug("Shard count {} for density {}, {} times target {}",
+                     shards,
+                     FBUtilities.prettyPrintBinary(localDensity, "B", " "),
+                     localDensity / targetSSTableSizeMin,
+                     FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", 
" "));
+        return shards;
+    }
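
A worked example, under assumed settings (base_shard_count = 4, target_sstable_size = 1 GiB):

    int baseShardCount = 4;
    double targetSSTableSizeMin = (1L << 30) * Math.sqrt(0.5);             // ~724 MiB

    double localDensity = 10L << 30;                                       // 10 GiB of local data
    double count = localDensity / (targetSSTableSizeMin * baseShardCount); // ~3.54
    int shards = baseShardCount * Integer.highestOneBit((int) count | 1);  // 4 * highestOneBit(3) = 4 * 2 = 8

    // 10 GiB over 8 shards gives ~1.25 GiB per sstable, inside the promised
    // [sqrt(0.5), sqrt(2)] * 1 GiB window.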
+
+    /**
+     * @param index the bucket index
+     * @return the survival factor o for that bucket
+     */
+    public double getSurvivalFactor(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < survivalFactors.length ? survivalFactors[index] : 
survivalFactors[survivalFactors.length - 1];
+    }
+
+    /**
+     * Return the flush sstable size in bytes.
+     *
+     * This is usually obtained from the observed sstable flush sizes, 
refreshed when it differs significantly
+     * from the current values.
+     * It can also be set by the user in the options.
+     *
+     * @return the flush size in bytes.
+     */
+    public long getFlushSizeBytes()
+    {
+        if (flushSizeOverride > 0)
+            return flushSizeOverride;
+
+        double envFlushSize = cfs.metric.flushSizeOnDisk.get();
+        if (currentFlushSize == 0 || Math.abs(1 - (currentFlushSize / 
envFlushSize)) > 0.5)
+        {
+            // The current size is not initialized, or it differs by over 50% 
from the observed.
+            // Use the observed size rounded up to a whole megabyte.
+            currentFlushSize = ((long) (Math.ceil(Math.scalb(envFlushSize, 
-20)))) << 20;
+        }
+        return currentFlushSize;
+    }
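
A numeric sketch of the refresh rule, assuming a hypothetical observed flush size of 53 MB:

    double envFlushSize = 53_000_000;   // observed flush size on disk
    long currentFlushSize = 0;          // not yet initialized, so the check below refreshes it

    if (currentFlushSize == 0 || Math.abs(1 - (currentFlushSize / envFlushSize)) > 0.5)
        currentFlushSize = ((long) Math.ceil(Math.scalb(envFlushSize, -20))) << 20;

    // currentFlushSize == 51 MiB (53,477,376 bytes); it is only recomputed once the
    // observed size drifts by more than 50% from this value.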
+
+    /**
+     * @return whether it is allowed to drop expired SSTables without checking if partition keys appear in other SSTables.
+     * Same behavior as in TWCS.
+     */
+    public boolean getIgnoreOverlapsInExpirationCheck()
+    {
+        return ignoreOverlapsInExpirationCheck;
+    }
+
+    public long getExpiredSSTableCheckFrequency()
+    {
+        return expiredSSTableCheckFrequency;
+    }
+
+    /**
+     * The strategy will call this method each time {@link 
UnifiedCompactionStrategy#getNextBackgroundTask} is called.
+     */
+    public void onStrategyBackgroundTaskRequest()
+    {
+    }
+
+    /**
+     * Returns a maximum bucket index for the given data size and fanout.
+     */
+    private int maxBucketIndex(long totalLength, int fanout)
+    {
+        double o = getSurvivalFactor(0);
+        long m = getFlushSizeBytes();
+        return Math.max(0, (int) Math.floor((Math.log(totalLength) - 
Math.log(m)) / (Math.log(fanout) - Math.log(o))));
+    }
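
For example, with the default survival factor o = 1 (so log(o) = 0 and the denominator
reduces to log(fanout)), a 64 MiB flush size and fanout 4:

    long m = 64L << 20;            // flush size: 64 MiB
    long totalLength = 64L << 30;  // data size: 64 GiB
    int maxIndex = (int) Math.floor((Math.log(totalLength) - Math.log(m)) / Math.log(4));
    // log4(64 GiB / 64 MiB) = log4(1024) = 5 (modulo floating-point rounding),
    // so bucket indexes 0..5 are possible at this data size.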
+
+    public static Controller fromOptions(ColumnFamilyStore cfs, Map<String, 
String> options)
+    {
+        int[] Ws = 
parseScalingParameters(options.getOrDefault(SCALING_PARAMETERS_OPTION, 
DEFAULT_SCALING_PARAMETERS));
+
+        long flushSizeOverride = 
FBUtilities.parseHumanReadableBytes(options.getOrDefault(FLUSH_SIZE_OVERRIDE_OPTION,
 "0MiB"));
+        int maxSSTablesToCompact = 
Integer.parseInt(options.getOrDefault(MAX_SSTABLES_TO_COMPACT_OPTION, "0"));
+        long expiredSSTableCheckFrequency = 
options.containsKey(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION)
+                ? 
Long.parseLong(options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION))
+                : DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS;
+        boolean ignoreOverlapsInExpirationCheck = 
options.containsKey(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION)
+                ? 
Boolean.parseBoolean(options.get(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION))
+                : DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION;
+
+        int baseShardCount;
+        if (options.containsKey(BASE_SHARD_COUNT_OPTION))
+        {
+            baseShardCount = 
Integer.parseInt(options.get(BASE_SHARD_COUNT_OPTION));
+        }
+        else
+        {
+            if (SchemaConstants.isSystemKeyspace(cfs.getKeyspaceName()) || 
(cfs.getDiskBoundaries().positions != null && 
cfs.getDiskBoundaries().positions.size() > 1))
+                baseShardCount = 1;
+            else
+                baseShardCount = DEFAULT_BASE_SHARD_COUNT;
+        }
+
+        long targetSStableSize = 
options.containsKey(TARGET_SSTABLE_SIZE_OPTION)
+                                 ? 
FBUtilities.parseHumanReadableBytes(options.get(TARGET_SSTABLE_SIZE_OPTION))
+                                 : DEFAULT_TARGET_SSTABLE_SIZE;
+
+        Overlaps.InclusionMethod inclusionMethod = 
options.containsKey(OVERLAP_INCLUSION_METHOD_OPTION)
+                                                   ? 
Overlaps.InclusionMethod.valueOf(options.get(OVERLAP_INCLUSION_METHOD_OPTION).toUpperCase())
+                                                   : 
DEFAULT_OVERLAP_INCLUSION_METHOD;
+
+        return new Controller(cfs,
+                              MonotonicClock.Global.preciseTime,
+                              Ws,
+                              DEFAULT_SURVIVAL_FACTORS,
+                              flushSizeOverride,
+                              maxSSTablesToCompact,
+                              expiredSSTableCheckFrequency,
+                              ignoreOverlapsInExpirationCheck,
+                              baseShardCount,
+                              targetSStableSize,
+                              inclusionMethod);
+    }
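
As a hedged usage sketch (the keys are the option constants defined above; the scaling
parameter list and sizes are illustrative values, and the ColumnFamilyStore setup is elided):

    Map<String, String> options = new HashMap<>();
    options.put("scaling_parameters", "T4,T4,N,L4");  // tiered at the lowest levels, levelled higher up
    options.put("target_sstable_size", "1GiB");
    options.put("base_shard_count", "4");
    options.put("expired_sstable_check_frequency_seconds", "600");

    Controller.validateOptions(options);              // throws ConfigurationException on invalid values
    // Controller controller = Controller.fromOptions(cfs, options);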
+
+    public static Map<String, String> validateOptions(Map<String, String> 
options) throws ConfigurationException
+    {
+        String nonPositiveErr = "Invalid configuration, %s should be positive: 
%d";

Review Comment:
   Inlined all.



##########
src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/compaction/unified/Controller.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction.unified;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.UnifiedCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MonotonicClock;
+
+/**
+ * The controller provides compaction parameters to the unified compaction strategy.
+ */
+public class Controller
+{
+    protected static final Logger logger = 
LoggerFactory.getLogger(Controller.class);
+
+    /**
+     * The scaling parameters W, one per bucket index and separated by a comma.
+     * Higher indexes will use the value of the last index with a W specified.
+     */
+    final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
+    private final static String DEFAULT_SCALING_PARAMETERS = 
CassandraRelevantProperties.UCS_SCALING_PARAMETER.getString();
+
+    /**
+     * Override for the flush size in MB. The database should be able to calculate this from executing flushes;
+     * this should only be necessary in rare cases.
+     */
+    static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
+
+    static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
+    /**
+     * Default base shard count, used when a base count is not explicitly supplied. This value applies as long as the
+     * table is not a system one and multiple data directories are not defined.
+     *
+     * For those, a base count of 1 is used, as system tables are usually small and do not need as much compaction
+     * parallelism, while having multiple directories defined provides for parallelism in a different way.
+     */
+    public static final int DEFAULT_BASE_SHARD_COUNT = 
CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
+
+    static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
+    public static final long DEFAULT_TARGET_SSTABLE_SIZE = 
CassandraRelevantProperties.UCS_TARGET_SSTABLE_SIZE.getSizeInBytes();
+    static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
+
+    /**
+     * This parameter is intended to modify the shape of the LSM by taking into account the survival ratio of data;
+     * for now it is fixed to one.
+     */
+    static final double DEFAULT_SURVIVAL_FACTOR = 
CassandraRelevantProperties.UCS_SURVIVAL_FACTOR.getDouble();
+    static final double[] DEFAULT_SURVIVAL_FACTORS = new double[] { 
DEFAULT_SURVIVAL_FACTOR };
+
+    /**
+     * The maximum number of sstables to compact in one operation.
+     *
+     * This is expected to be large and never be reached, but compaction falling very far behind may cause the
+     * accumulation of thousands or even tens of thousands of sstables, which may cause problems if compacted in one
+     * long operation.
+     * The default is chosen to be half of the maximum permitted space 
overhead when the source sstables are of the
+     * minimum sstable size.
+     *
+     * If the fanout factor is larger than the maximum number of sstables, the 
strategy will ignore the latter.
+     */
+    static final String MAX_SSTABLES_TO_COMPACT_OPTION = 
"max_sstables_to_compact";
+
+    static final String ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION = 
"unsafe_aggressive_sstable_expiration";
+    static final boolean ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.getBoolean();
+    static final boolean DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
false;
+
+    static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+    static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION = 
"expired_sstable_check_frequency_seconds";
+
+    /** The maximum splitting factor for shards. The maximum number of shards 
is this number multiplied by the base count. */
+    static final double MAX_SHARD_SPLIT = 1048576;
+
+    /**
+     * Overlap inclusion method. NONE for participating sstables only (not 
recommended), SINGLE to only include sstables
+     * that overlap with participating (LCS-like, higher concurrency during 
upgrades but some double compaction),
+     * TRANSITIVE to include overlaps of overlaps (likely to trigger whole 
level compactions, safest).
+     */
+    static final String OVERLAP_INCLUSION_METHOD_OPTION = 
"overlap_inclusion_method";
+    static final Overlaps.InclusionMethod DEFAULT_OVERLAP_INCLUSION_METHOD =
+        CassandraRelevantProperties.UCS_OVERLAP_INCLUSION_METHOD.getEnum(Overlaps.InclusionMethod.TRANSITIVE);
+
+    protected final ColumnFamilyStore cfs;
+    protected final MonotonicClock clock;
+    private final int[] scalingParameters;
+    protected final double[] survivalFactors;
+    protected final long flushSizeOverride;
+    protected volatile long currentFlushSize;
+    protected final int maxSSTablesToCompact;
+    protected final long expiredSSTableCheckFrequency;
+    protected final boolean ignoreOverlapsInExpirationCheck;
+
+    protected final int baseShardCount;
+
+    protected final double targetSSTableSizeMin;
+
+    protected final Overlaps.InclusionMethod overlapInclusionMethod;
+
+    Controller(ColumnFamilyStore cfs,
+               MonotonicClock clock,
+               int[] scalingParameters,
+               double[] survivalFactors,
+               long flushSizeOverride,
+               int maxSSTablesToCompact,
+               long expiredSSTableCheckFrequency,
+               boolean ignoreOverlapsInExpirationCheck,
+               int baseShardCount,
+               double targetSStableSize,
+               Overlaps.InclusionMethod overlapInclusionMethod)
+    {
+        this.cfs = cfs;
+        this.clock = clock;
+        this.scalingParameters = scalingParameters;
+        this.survivalFactors = survivalFactors;
+        this.flushSizeOverride = flushSizeOverride;
+        this.currentFlushSize = flushSizeOverride;
+        this.expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(expiredSSTableCheckFrequency, TimeUnit.SECONDS);
+        this.baseShardCount = baseShardCount;
+        this.targetSSTableSizeMin = targetSStableSize * Math.sqrt(0.5);
+        this.overlapInclusionMethod = overlapInclusionMethod;
+
+        if (maxSSTablesToCompact <= 0)
+            maxSSTablesToCompact = Integer.MAX_VALUE;
+
+        this.maxSSTablesToCompact = maxSSTablesToCompact;
+
+        if (ignoreOverlapsInExpirationCheck && 
!ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION)
+        {
+            logger.warn("Not enabling aggressive SSTable expiration, as the 
system property '" + 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.name() + 
"' is set to 'false'. " +
+                    "Set it to 'true' to enable aggressive SSTable 
expiration.");
+        }
+        this.ignoreOverlapsInExpirationCheck = 
ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;
+    }
+
+    /**
+     * @param index the bucket index
+     * @return the scaling parameter W for that bucket
+     */
+    public int getScalingParameter(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < scalingParameters.length ? scalingParameters[index] : 
scalingParameters[scalingParameters.length - 1];
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Controller, m: %s, o: %s, Ws: %s",
+                             FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", ""),
+                             Arrays.toString(survivalFactors),
+                             printScalingParameters(scalingParameters));
+    }
+
+    public int getFanout(int index)
+    {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.fanoutFromScalingParameter(W);
+    }
+
+    public int getThreshold(int index)
+    {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.thresholdFromScalingParameter(W);
+    }
+
+    /**
+     * Calculate the number of shards to split the local token space in for 
the given sstable density.
+     * This is calculated as a power-of-two multiple of baseShardCount, so 
that the expected size of resulting sstables
+     * is between targetSSTableSizeMin and 2*targetSSTableSizeMin (in other 
words, sqrt(0.5) * targetSSTableSize and
+     * sqrt(2) * targetSSTableSize), with a minimum of baseShardCount shards 
for smaller sstables.
+     *
+     * Note that to get the sstables resulting from this splitting within the 
bounds, the density argument must be
+     * normalized to the span that is being split. In other words, if no disks 
are defined, the density should be
+     * scaled by the token coverage of the locally-owned ranges. If multiple 
data directories are defined, the density
+     * should be scaled by the token coverage of the respective data 
directory. That is localDensity = size / span,
+     * where the span is normalized so that span = 1 when the data covers the 
range that is being split.
+     */
+    public int getNumShards(double localDensity)
+    {
+        // How many shards we would need to hit the target size. Divided by the base shard count, so that we can
+        // ensure the result is a multiple of it by multiplying back below.
+        double count = localDensity / (targetSSTableSizeMin * baseShardCount);
+        if (count > MAX_SHARD_SPLIT)
+            count = MAX_SHARD_SPLIT;
+        assert !(count < 0);    // Must be positive, 0 or NaN, which should 
translate to baseShardCount
+
+        // Make it a power of two multiple of the base count so that split 
points for lower levels remain split points for higher.
+        // The conversion to int and highestOneBit round down, for which we 
compensate by using the sqrt(0.5) multiplier
+        // already applied in targetSSTableSizeMin.
+        // Setting the bottom bit to 1 ensures the result is at least 
baseShardCount.
+        int shards = baseShardCount * Integer.highestOneBit((int) count | 1);
+        logger.debug("Shard count {} for density {}, {} times target {}",
+                     shards,
+                     FBUtilities.prettyPrintBinary(localDensity, "B", " "),
+                     localDensity / targetSSTableSizeMin,
+                     FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", 
" "));
+        return shards;
+    }
+
+    /**
+     * @param index the bucket index
+     * @return the survival factor o for that bucket
+     */
+    public double getSurvivalFactor(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < survivalFactors.length ? survivalFactors[index] : 
survivalFactors[survivalFactors.length - 1];
+    }
+
+    /**
+     * Return the flush sstable size in bytes.
+     *
+     * This is usually obtained from the observed sstable flush sizes, 
refreshed when it differs significantly
+     * from the current values.
+     * It can also be set by the user in the options.
+     *
+     * @return the flush size in bytes.
+     */
+    public long getFlushSizeBytes()
+    {
+        if (flushSizeOverride > 0)
+            return flushSizeOverride;
+
+        double envFlushSize = cfs.metric.flushSizeOnDisk.get();
+        if (currentFlushSize == 0 || Math.abs(1 - (currentFlushSize / 
envFlushSize)) > 0.5)
+        {
+            // The current size is not initialized, or it differs by over 50% 
from the observed.
+            // Use the observed size rounded up to a whole megabyte.
+            currentFlushSize = ((long) (Math.ceil(Math.scalb(envFlushSize, 
-20)))) << 20;
+        }
+        return currentFlushSize;
+    }
+
+    /**
+     * @return whether it is allowed to drop expired SSTables without checking if partition keys appear in other SSTables.
+     * Same behavior as in TWCS.
+     */
+    public boolean getIgnoreOverlapsInExpirationCheck()
+    {
+        return ignoreOverlapsInExpirationCheck;
+    }
+
+    public long getExpiredSSTableCheckFrequency()
+    {
+        return expiredSSTableCheckFrequency;
+    }
+
+    /**
+     * The strategy will call this method each time {@link 
UnifiedCompactionStrategy#getNextBackgroundTask} is called.
+     */
+    public void onStrategyBackgroundTaskRequest()
+    {
+    }
+
+    /**
+     * Returns a maximum bucket index for the given data size and fanout.
+     */
+    private int maxBucketIndex(long totalLength, int fanout)
+    {
+        double o = getSurvivalFactor(0);
+        long m = getFlushSizeBytes();
+        return Math.max(0, (int) Math.floor((Math.log(totalLength) - 
Math.log(m)) / (Math.log(fanout) - Math.log(o))));
+    }
+
+    public static Controller fromOptions(ColumnFamilyStore cfs, Map<String, 
String> options)
+    {
+        int[] Ws = 
parseScalingParameters(options.getOrDefault(SCALING_PARAMETERS_OPTION, 
DEFAULT_SCALING_PARAMETERS));
+
+        long flushSizeOverride = 
FBUtilities.parseHumanReadableBytes(options.getOrDefault(FLUSH_SIZE_OVERRIDE_OPTION,
 "0MiB"));
+        int maxSSTablesToCompact = 
Integer.parseInt(options.getOrDefault(MAX_SSTABLES_TO_COMPACT_OPTION, "0"));
+        long expiredSSTableCheckFrequency = 
options.containsKey(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION)
+                ? 
Long.parseLong(options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION))
+                : DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS;
+        boolean ignoreOverlapsInExpirationCheck = 
options.containsKey(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION)
+                ? 
Boolean.parseBoolean(options.get(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION))
+                : DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION;
+
+        int baseShardCount;
+        if (options.containsKey(BASE_SHARD_COUNT_OPTION))
+        {
+            baseShardCount = 
Integer.parseInt(options.get(BASE_SHARD_COUNT_OPTION));
+        }
+        else
+        {
+            if (SchemaConstants.isSystemKeyspace(cfs.getKeyspaceName()) || 
(cfs.getDiskBoundaries().positions != null && 
cfs.getDiskBoundaries().positions.size() > 1))
+                baseShardCount = 1;
+            else
+                baseShardCount = DEFAULT_BASE_SHARD_COUNT;
+        }
+
+        long targetSStableSize = 
options.containsKey(TARGET_SSTABLE_SIZE_OPTION)
+                                 ? 
FBUtilities.parseHumanReadableBytes(options.get(TARGET_SSTABLE_SIZE_OPTION))
+                                 : DEFAULT_TARGET_SSTABLE_SIZE;
+
+        Overlaps.InclusionMethod inclusionMethod = 
options.containsKey(OVERLAP_INCLUSION_METHOD_OPTION)
+                                                   ? 
Overlaps.InclusionMethod.valueOf(options.get(OVERLAP_INCLUSION_METHOD_OPTION).toUpperCase())
+                                                   : 
DEFAULT_OVERLAP_INCLUSION_METHOD;
+
+        return new Controller(cfs,
+                              MonotonicClock.Global.preciseTime,
+                              Ws,
+                              DEFAULT_SURVIVAL_FACTORS,
+                              flushSizeOverride,
+                              maxSSTablesToCompact,
+                              expiredSSTableCheckFrequency,
+                              ignoreOverlapsInExpirationCheck,
+                              baseShardCount,
+                              targetSStableSize,
+                              inclusionMethod);
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> 
options) throws ConfigurationException
+    {
+        String nonPositiveErr = "Invalid configuration, %s should be positive: 
%d";
+        String booleanParseErr = "%s should either be 'true' or 'false', not 
%s";
+        String intParseErr = "%s is not a parsable int (base10) for %s";
+        String longParseErr = "%s is not a parsable long (base10) for %s";
+        String sizeUnacceptableErr = "%s %s is not acceptable, size must be at 
least %s";
+        String invalidSizeErr = "%s %s is not a valid size in bytes: %s";
+        options = new HashMap<>(options);
+        String s;

Review Comment:
   We can, but this will make the first removal violate the pattern of the rest 
(and make it slightly harder to insert anything in front). Keeping as is.



##########
src/java/org/apache/cassandra/db/compaction/unified/ShardedCompactionWriter.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done



##########
test/data/legacy-sstables/da/legacy_tables/legacy_da_clust/da-1-bti-CompressionInfo.db:
##########


Review Comment:
   The purpose of the legacy-tables directory is to make sure that new versions 
can read all supported format versions. It is not strictly necessary to include 
the current version among them, but by doing so we make sure that we have 
something that was actually created by the current release when we test a 
future one.



##########
src/java/org/apache/cassandra/db/compaction/unified/UnifiedCompactionTask.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/compaction/unified/Controller.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction.unified;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.UnifiedCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MonotonicClock;
+
+/**
+ * The controller provides compaction parameters to the unified compaction strategy.
+ */
+public class Controller
+{
+    protected static final Logger logger = 
LoggerFactory.getLogger(Controller.class);
+
+    /**
+     * The scaling parameters W, one per bucket index and separated by a comma.
+     * Higher indexes will use the value of the last index with a W specified.
+     */
+    final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
+    private final static String DEFAULT_SCALING_PARAMETERS = 
CassandraRelevantProperties.UCS_SCALING_PARAMETER.getString();
+
+    /**
+     * Override for the flush size in MB. The database should be able to calculate this from executing flushes;
+     * this should only be necessary in rare cases.
+     */
+    static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
+
+    static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
+    /**
+     * Default base shard count, used when a base count is not explicitly supplied. This value applies as long as the
+     * table is not a system one and multiple data directories are not defined.
+     *
+     * For those, a base count of 1 is used, as system tables are usually small and do not need as much compaction
+     * parallelism, while having multiple directories defined provides for parallelism in a different way.
+     */
+    public static final int DEFAULT_BASE_SHARD_COUNT = 
CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
+
+    static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
+    public static final long DEFAULT_TARGET_SSTABLE_SIZE = 
CassandraRelevantProperties.UCS_TARGET_SSTABLE_SIZE.getSizeInBytes();
+    static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
+
+    /**
+     * This parameter is intended to modify the shape of the LSM by taking into account the survival ratio of data;
+     * for now it is fixed to one.
+     */
+    static final double DEFAULT_SURVIVAL_FACTOR = 
CassandraRelevantProperties.UCS_SURVIVAL_FACTOR.getDouble();
+    static final double[] DEFAULT_SURVIVAL_FACTORS = new double[] { 
DEFAULT_SURVIVAL_FACTOR };
+
+    /**
+     * The maximum number of sstables to compact in one operation.
+     *
+     * This is expected to be large and never be reached, but compaction falling very far behind may cause the
+     * accumulation of thousands or even tens of thousands of sstables, which may cause problems if compacted in one
+     * long operation.
+     * The default is chosen to be half of the maximum permitted space 
overhead when the source sstables are of the
+     * minimum sstable size.
+     *
+     * If the fanout factor is larger than the maximum number of sstables, the 
strategy will ignore the latter.
+     */
+    static final String MAX_SSTABLES_TO_COMPACT_OPTION = 
"max_sstables_to_compact";
+
+    static final String ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION = 
"unsafe_aggressive_sstable_expiration";
+    static final boolean ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.getBoolean();
+    static final boolean DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = 
false;
+
+    static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+    static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION = 
"expired_sstable_check_frequency_seconds";
+
+    /** The maximum splitting factor for shards. The maximum number of shards 
is this number multiplied by the base count. */
+    static final double MAX_SHARD_SPLIT = 1048576;
+
+    /**
+     * Overlap inclusion method. NONE for participating sstables only (not 
recommended), SINGLE to only include sstables
+     * that overlap with participating (LCS-like, higher concurrency during 
upgrades but some double compaction),
+     * TRANSITIVE to include overlaps of overlaps (likely to trigger whole 
level compactions, safest).
+     */
+    static final String OVERLAP_INCLUSION_METHOD_OPTION = 
"overlap_inclusion_method";
+    static final Overlaps.InclusionMethod DEFAULT_OVERLAP_INCLUSION_METHOD =
+        CassandraRelevantProperties.UCS_OVERLAP_INCLUSION_METHOD.getEnum(Overlaps.InclusionMethod.TRANSITIVE);
+
+    protected final ColumnFamilyStore cfs;
+    protected final MonotonicClock clock;
+    private final int[] scalingParameters;
+    protected final double[] survivalFactors;
+    protected final long flushSizeOverride;
+    protected volatile long currentFlushSize;
+    protected final int maxSSTablesToCompact;
+    protected final long expiredSSTableCheckFrequency;
+    protected final boolean ignoreOverlapsInExpirationCheck;
+
+    protected final int baseShardCount;
+
+    protected final double targetSSTableSizeMin;
+
+    protected final Overlaps.InclusionMethod overlapInclusionMethod;
+
+    Controller(ColumnFamilyStore cfs,
+               MonotonicClock clock,
+               int[] scalingParameters,
+               double[] survivalFactors,
+               long flushSizeOverride,
+               int maxSSTablesToCompact,
+               long expiredSSTableCheckFrequency,
+               boolean ignoreOverlapsInExpirationCheck,
+               int baseShardCount,
+               double targetSStableSize,
+               Overlaps.InclusionMethod overlapInclusionMethod)
+    {
+        this.cfs = cfs;
+        this.clock = clock;
+        this.scalingParameters = scalingParameters;
+        this.survivalFactors = survivalFactors;
+        this.flushSizeOverride = flushSizeOverride;
+        this.currentFlushSize = flushSizeOverride;
+        this.expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(expiredSSTableCheckFrequency, TimeUnit.SECONDS);
+        this.baseShardCount = baseShardCount;
+        this.targetSSTableSizeMin = targetSStableSize * Math.sqrt(0.5);
+        this.overlapInclusionMethod = overlapInclusionMethod;
+
+        if (maxSSTablesToCompact <= 0)
+            maxSSTablesToCompact = Integer.MAX_VALUE;
+
+        this.maxSSTablesToCompact = maxSSTablesToCompact;
+
+        if (ignoreOverlapsInExpirationCheck && 
!ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION)
+        {
+            logger.warn("Not enabling aggressive SSTable expiration, as the 
system property '" + 
CassandraRelevantProperties.ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION.name() + 
"' is set to 'false'. " +
+                    "Set it to 'true' to enable aggressive SSTable 
expiration.");
+        }
+        this.ignoreOverlapsInExpirationCheck = 
ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;
+    }
+
+    /**
+     * @param index the bucket index
+     * @return the scaling parameter W for that bucket
+     */
+    public int getScalingParameter(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < scalingParameters.length ? scalingParameters[index] : 
scalingParameters[scalingParameters.length - 1];
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Controller, m: %s, o: %s, Ws: %s",
+                             FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", ""),
+                             Arrays.toString(survivalFactors),
+                             printScalingParameters(scalingParameters));
+    }
+
+    public int getFanout(int index)
+    {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.fanoutFromScalingParameter(W);
+    }
+
+    public int getThreshold(int index)
+    {
+        int W = getScalingParameter(index);
+        return UnifiedCompactionStrategy.thresholdFromScalingParameter(W);
+    }
+
+    /**
+     * Calculate the number of shards to split the local token space in for 
the given sstable density.
+     * This is calculated as a power-of-two multiple of baseShardCount, so 
that the expected size of resulting sstables
+     * is between targetSSTableSizeMin and 2*targetSSTableSizeMin (in other 
words, sqrt(0.5) * targetSSTableSize and
+     * sqrt(2) * targetSSTableSize), with a minimum of baseShardCount shards 
for smaller sstables.
+     *
+     * Note that to get the sstables resulting from this splitting within the 
bounds, the density argument must be
+     * normalized to the span that is being split. In other words, if no disks 
are defined, the density should be
+     * scaled by the token coverage of the locally-owned ranges. If multiple 
data directories are defined, the density
+     * should be scaled by the token coverage of the respective data 
directory. That is localDensity = size / span,
+     * where the span is normalized so that span = 1 when the data covers the 
range that is being split.
+     */
+    public int getNumShards(double localDensity)
+    {
+        // How many shards we would need to hit the target size. Divided by the base shard count, so that we can
+        // ensure the result is a multiple of it by multiplying back below.
+        double count = localDensity / (targetSSTableSizeMin * baseShardCount);
+        if (count > MAX_SHARD_SPLIT)
+            count = MAX_SHARD_SPLIT;
+        assert !(count < 0);    // Must be positive, 0 or NaN, which should 
translate to baseShardCount
+
+        // Make it a power of two multiple of the base count so that split 
points for lower levels remain split points for higher.
+        // The conversion to int and highestOneBit round down, for which we 
compensate by using the sqrt(0.5) multiplier
+        // already applied in targetSSTableSizeMin.
+        // Setting the bottom bit to 1 ensures the result is at least 
baseShardCount.
+        int shards = baseShardCount * Integer.highestOneBit((int) count | 1);
+        logger.debug("Shard count {} for density {}, {} times target {}",
+                     shards,
+                     FBUtilities.prettyPrintBinary(localDensity, "B", " "),
+                     localDensity / targetSSTableSizeMin,
+                     FBUtilities.prettyPrintBinary(targetSSTableSizeMin, "B", 
" "));
+        return shards;
+    }
+
+    /**
+     * @param index the bucket index
+     * @return the survival factor o for that bucket
+     */
+    public double getSurvivalFactor(int index)
+    {
+        if (index < 0)
+            throw new IllegalArgumentException("Index should be >= 0: " + 
index);
+
+        return index < survivalFactors.length ? survivalFactors[index] : 
survivalFactors[survivalFactors.length - 1];
+    }
+
+    /**
+     * Return the flush sstable size in bytes.
+     *
+     * This is usually obtained from the observed sstable flush sizes, 
refreshed when it differs significantly
+     * from the current values.
+     * It can also be set by the user in the options.
+     *
+     * @return the flush size in bytes.
+     */
+    public long getFlushSizeBytes()
+    {
+        if (flushSizeOverride > 0)
+            return flushSizeOverride;
+
+        double envFlushSize = cfs.metric.flushSizeOnDisk.get();
+        if (currentFlushSize == 0 || Math.abs(1 - (currentFlushSize / 
envFlushSize)) > 0.5)
+        {
+            // The current size is not initialized, or it differs by over 50% 
from the observed.
+            // Use the observed size rounded up to a whole megabyte.
+            currentFlushSize = ((long) (Math.ceil(Math.scalb(envFlushSize, 
-20)))) << 20;
+        }
+        return currentFlushSize;
+    }
+
+    /**
+     * @return whether it is allowed to drop expired SSTables without checking if partition keys appear in other SSTables.
+     * Same behavior as in TWCS.
+     */
+    public boolean getIgnoreOverlapsInExpirationCheck()
+    {
+        return ignoreOverlapsInExpirationCheck;
+    }
+
+    public long getExpiredSSTableCheckFrequency()
+    {
+        return expiredSSTableCheckFrequency;
+    }
+
+    /**
+     * The strategy will call this method each time {@link 
UnifiedCompactionStrategy#getNextBackgroundTask} is called.
+     */
+    public void onStrategyBackgroundTaskRequest()
+    {
+    }
+
+    /**
+     * Returns a maximum bucket index for the given data size and fanout.
+     */
+    private int maxBucketIndex(long totalLength, int fanout)
+    {
+        double o = getSurvivalFactor(0);
+        long m = getFlushSizeBytes();
+        return Math.max(0, (int) Math.floor((Math.log(totalLength) - 
Math.log(m)) / (Math.log(fanout) - Math.log(o))));
+    }
+
+    public static Controller fromOptions(ColumnFamilyStore cfs, Map<String, 
String> options)
+    {
+        int[] Ws = 
parseScalingParameters(options.getOrDefault(SCALING_PARAMETERS_OPTION, 
DEFAULT_SCALING_PARAMETERS));
+
+        long flushSizeOverride = 
FBUtilities.parseHumanReadableBytes(options.getOrDefault(FLUSH_SIZE_OVERRIDE_OPTION,
 "0MiB"));
+        int maxSSTablesToCompact = 
Integer.parseInt(options.getOrDefault(MAX_SSTABLES_TO_COMPACT_OPTION, "0"));
+        long expiredSSTableCheckFrequency = 
options.containsKey(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION)
+                ? 
Long.parseLong(options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION))
+                : DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS;
+        boolean ignoreOverlapsInExpirationCheck = 
options.containsKey(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION)
+                ? 
Boolean.parseBoolean(options.get(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION))
+                : DEFAULT_ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION;
+
+        int baseShardCount;
+        if (options.containsKey(BASE_SHARD_COUNT_OPTION))
+        {
+            baseShardCount = 
Integer.parseInt(options.get(BASE_SHARD_COUNT_OPTION));
+        }
+        else
+        {
+            if (SchemaConstants.isSystemKeyspace(cfs.getKeyspaceName()) || 
(cfs.getDiskBoundaries().positions != null && 
cfs.getDiskBoundaries().positions.size() > 1))
+                baseShardCount = 1;
+            else
+                baseShardCount = DEFAULT_BASE_SHARD_COUNT;
+        }
+
+        long targetSSTableSize = options.containsKey(TARGET_SSTABLE_SIZE_OPTION)
+                                 ? FBUtilities.parseHumanReadableBytes(options.get(TARGET_SSTABLE_SIZE_OPTION))
+                                 : DEFAULT_TARGET_SSTABLE_SIZE;
+
+        Overlaps.InclusionMethod inclusionMethod = options.containsKey(OVERLAP_INCLUSION_METHOD_OPTION)
+                                                   ? Overlaps.InclusionMethod.valueOf(options.get(OVERLAP_INCLUSION_METHOD_OPTION).toUpperCase())
+                                                   : DEFAULT_OVERLAP_INCLUSION_METHOD;
+
+        return new Controller(cfs,
+                              MonotonicClock.Global.preciseTime,
+                              Ws,
+                              DEFAULT_SURVIVAL_FACTORS,
+                              flushSizeOverride,
+                              maxSSTablesToCompact,
+                              expiredSSTableCheckFrequency,
+                              ignoreOverlapsInExpirationCheck,
+                              baseShardCount,
+                              targetSSTableSize,
+                              inclusionMethod);
+    }
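+    // Illustrative usage (hypothetical option values; assumes the *_OPTION constants
+    // hold the user-facing option names and that a ColumnFamilyStore cfs is at hand):
+    //   Map<String, String> opts = new HashMap<>();
+    //   opts.put(BASE_SHARD_COUNT_OPTION, "8");
+    //   opts.put(TARGET_SSTABLE_SIZE_OPTION, "1GiB");
+    //   Controller controller = Controller.fromOptions(cfs, opts);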
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        String nonPositiveErr = "Invalid configuration, %s should be positive: %d";
+        String booleanParseErr = "%s should either be 'true' or 'false', not %s";
+        String intParseErr = "%s is not a parsable int (base10) for %s";
+        String longParseErr = "%s is not a parsable long (base10) for %s";
+        String sizeUnacceptableErr = "%s %s is not acceptable, size must be at least %s";
+        String invalidSizeErr = "%s %s is not a valid size in bytes: %s";
+        options = new HashMap<>(options);
+        String s;
+
+        s = options.remove(SCALING_PARAMETERS_OPTION);
+        if (s != null)
+            parseScalingParameters(s);
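+        // Scaling parameters are assumed here to follow the UCS notation, e.g. "T4"
+        // (tiered, w = 2), "N" (w = 0), "L10" (leveled, w = -8), or plain integers;
+        // parseScalingParameters throws ConfigurationException on malformed input.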
+
+        s = options.remove(BASE_SHARD_COUNT_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                int numShards = Integer.parseInt(s);
+                if (numShards <= 0)
+                    throw new ConfigurationException(String.format(nonPositiveErr,
+                                                                   BASE_SHARD_COUNT_OPTION,
+                                                                   numShards));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(intParseErr, s, BASE_SHARD_COUNT_OPTION), e);
+            }
+        }
+
+        s = options.remove(TARGET_SSTABLE_SIZE_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                long targetSSTableSize = FBUtilities.parseHumanReadableBytes(s);
+                if (targetSSTableSize < MIN_TARGET_SSTABLE_SIZE)
+                {
+                    throw new ConfigurationException(String.format(sizeUnacceptableErr,
+                                                                   TARGET_SSTABLE_SIZE_OPTION,
+                                                                   s,
+                                                                   FBUtilities.prettyPrintBinary(MIN_TARGET_SSTABLE_SIZE, "B", "")));
+                }
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(invalidSizeErr,
+                                                               TARGET_SSTABLE_SIZE_OPTION,
+                                                               s,
+                                                               e.getMessage()),
+                                                 e);
+            }
+        }
+
+        s = options.remove(FLUSH_SIZE_OVERRIDE_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                long flushSize = FBUtilities.parseHumanReadableBytes(s);
+                if (flushSize < MIN_TARGET_SSTABLE_SIZE)
+                    throw new ConfigurationException(String.format(sizeUnacceptableErr,
+                                                                   FLUSH_SIZE_OVERRIDE_OPTION,
+                                                                   s,
+                                                                   FBUtilities.prettyPrintBinary(MIN_TARGET_SSTABLE_SIZE, "B", "")));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(invalidSizeErr,
+                                                               FLUSH_SIZE_OVERRIDE_OPTION,
+                                                               s,
+                                                               e.getMessage()),
+                                                 e);
+            }
+        }
+
+        s = options.remove(MAX_SSTABLES_TO_COMPACT_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                Integer.parseInt(s); // values less than or equal to 0 enable the default
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(intParseErr,
+                                                               s,
+                                                               MAX_SSTABLES_TO_COMPACT_OPTION),
+                                                 e);
+            }
+        }
+
+        s = options.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                long expiredSSTableCheckFrequency = Long.parseLong(s);
+                if (expiredSSTableCheckFrequency <= 0)
+                    throw new ConfigurationException(String.format(nonPositiveErr,
+                                                                   EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION,
+                                                                   expiredSSTableCheckFrequency));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format(longParseErr,
+                                                               s,
+                                                               EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_OPTION),
+                                                 e);
+            }
+        }
+
+        s = options.remove(ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION);
+        if (s != null && !s.equalsIgnoreCase("true") && !s.equalsIgnoreCase("false"))
+        {
+            throw new ConfigurationException(String.format(booleanParseErr,
+                                                           ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION, s));
+        }
+
+        s = options.remove(OVERLAP_INCLUSION_METHOD_OPTION);
+        if (s != null)
+        {
+            try
+            {
+                Overlaps.InclusionMethod.valueOf(s.toUpperCase());
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new ConfigurationException(String.format("Invalid 
overlap inclusion method %s. The valid options are %s.",
+                                                               s,
+                                                               
Arrays.toString(Overlaps.InclusionMethod.values())));
+            }
+        }
+
+        return options;
+    }
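+    // For example (illustrative): validateOptions(Map.of(BASE_SHARD_COUNT_OPTION, "-1"))
+    // throws a ConfigurationException, while unrecognized keys are returned in the
+    // remaining map for the caller to validate further.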
+
+    // The methods below are implemented here (rather than directly in UCS) to aid testability.
+

Review Comment:
   The line is necessary to denote that the comment does not apply only to the next function.



##########
test/unit/org/apache/cassandra/db/compaction/unified/ShardedMultiWriterTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done



##########
test/unit/org/apache/cassandra/db/compaction/unified/ShardedCompactionWriterTest.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done



##########
test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java:
##########
@@ -0,0 +1,913 @@
+/*
+ * Copyright DataStax, Inc.

Review Comment:
   Done


