pcmanus commented on code in PR #3688:
URL: https://github.com/apache/cassandra/pull/3688#discussion_r1871182901


##########
src/java/org/apache/cassandra/db/compaction/CompactionController.java:
##########
@@ -145,20 +151,20 @@ public Set<SSTableReader> getFullyExpiredSSTables()
      *
      * @param cfStore
      * @param compacting we take the drop-candidates from this set, it is 
usually the sstables included in the compaction
-     * @param overlapping the sstables that overlap the ones in compacting.
+     * @param overlappingSupplier function used to get the sstables that 
overlap the ones in compacting.

Review Comment:
   Nit: I'd rephrase to something like "called on the `compacting` sstables to 
compute the set of sstables that overlap with them if needed", both to specify 
what the function argument is, and to suggest that the reason this is a function 
is to avoid the computation when it's not needed (which I assume is the reason?).
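
   For instance (just a sketch of the suggested wording, adjust as you see fit):

   ```java
   /**
    * @param overlappingSupplier called on the {@code compacting} sstables to compute the set of
    *                            sstables that overlap with them, if and when that set is needed
    */
   ```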



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -331,56 +309,6 @@ public void metadataChanged(StatsMetadata oldMetadata, 
SSTableReader sstable)
     {
     }
 
-    public static class ScannerList implements AutoCloseable
-    {
-        public final List<ISSTableScanner> scanners;
-        public ScannerList(List<ISSTableScanner> scanners)
-        {
-            this.scanners = scanners;
-        }
-
-        public long getTotalBytesScanned()
-        {
-            long bytesScanned = 0L;
-            for (int i=0, isize=scanners.size(); i<isize; i++)
-                bytesScanned += scanners.get(i).getBytesScanned();
-
-            return bytesScanned;
-        }
-
-        public long getTotalCompressedSize()
-        {
-            long compressedSize = 0;
-            for (int i=0, isize=scanners.size(); i<isize; i++)
-                compressedSize += scanners.get(i).getCompressedLengthInBytes();
-
-            return compressedSize;
-        }
-
-        public double getCompressionRatio()
-        {
-            double compressed = 0.0;
-            double uncompressed = 0.0;
-
-            for (int i=0, isize=scanners.size(); i<isize; i++)
-            {
-                ISSTableScanner scanner = scanners.get(i);
-                compressed += scanner.getCompressedLengthInBytes();
-                uncompressed += scanner.getLengthInBytes();
-            }
-
-            if (compressed == uncompressed || uncompressed == 0)
-                return MetadataCollector.NO_COMPRESSION_RATIO;
-
-            return compressed / uncompressed;
-        }
-
-        public void close()
-        {
-            ISSTableScanner.closeAllAndPropagate(scanners, null);
-        }
-    }
-
     public ScannerList getScanners(Collection<SSTableReader> toCompact)

Review Comment:
   This method can go away; it does the same thing as the default 
implementation of the interface.



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -62,15 +64,17 @@ public class CompactionTask extends AbstractCompactionTask
     protected final boolean keepOriginals;
     protected static long totalBytesCompacted = 0;
     private ActiveCompactionsTracker activeCompactions;
+    protected final ScannerFactory scannerFactory;
 
-    public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, 
long gcBefore)
+    public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy 
strategy, ILifecycleTransaction txn, long gcBefore)
     {
-        this(cfs, txn, gcBefore, false);
+        this(cfs, strategy, txn, gcBefore, false);
     }
 
-    public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, 
long gcBefore, boolean keepOriginals)
+    public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy 
strategy, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals)

Review Comment:
   Nit: I'd have passed a `ScannerFactory` here instead of the strategy. 
That's the only part used, so it makes the intent clearer imo. At which point 
I'd refuse `null` and manually pass `ScannerFactory.DEFAULT` when appropriate: 
that's one less thing that requires readers to dig into what a `null` means in 
this context.
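
   i.e., a sketch of what I mean (assuming the `ScannerFactory` type introduced 
in this patch):

   ```java
   public CompactionTask(ColumnFamilyStore cfs, ScannerFactory scannerFactory, ILifecycleTransaction txn, long gcBefore)
   {
       this(cfs, scannerFactory, txn, gcBefore, false);
   }
   ```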



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -84,68 +88,107 @@ protected int executeInternal(ActiveCompactionsTracker 
activeCompactions)
     {
         this.activeCompactions = activeCompactions == null ? 
ActiveCompactionsTracker.NOOP : activeCompactions;
         run();
-        return transaction.originals().size();
+        return inputSSTables().size();
     }
 
     public boolean reduceScopeForLimitedSpace(Set<SSTableReader> 
nonExpiredSSTables, long expectedSize)
     {
-        if (partialCompactionsAcceptable() && transaction.originals().size() > 
1)
+        if (partialCompactionsAcceptable() && nonExpiredSSTables.size() > 1)
         {
             // Try again w/o the largest one.
             SSTableReader removedSSTable = 
cfs.getMaxSizeFile(nonExpiredSSTables);
             logger.warn("insufficient space to compact all requested files. 
{}MiB required, {} for compaction {} - removing largest SSTable: {}",
                         (float) expectedSize / 1024 / 1024,
-                        StringUtils.join(transaction.originals(), ", "),
-                        transaction.opId(),
+                        StringUtils.join(nonExpiredSSTables, ", "),
+                        transaction.opIdString(),
                         removedSSTable);
             // Note that we have removed files that are still marked as 
compacting.
             // This suboptimal but ok since the caller will unmark all the 
sstables at the end.
             transaction.cancel(removedSSTable);
+            nonExpiredSSTables.remove(removedSSTable);
             return true;
         }
         return false;
     }
 
+    /**
+     * @return The token range that the operation should compact. This is 
usually null, but if we have a parallelizable
+     * multi-task operation (see {@link 
UnifiedCompactionStrategy#createCompactionTasks}), it will specify a subrange.
+     */
+    protected Range<Token> tokenRange()
+    {
+        return null;
+    }
+
+    /**
+     * @return The set of input sstables for this compaction. This must be a 
subset of the transaction originals and
+     * must reflect any removal of sstables from the originals set for correct 
overlap tracking.
+     * See {@link UnifiedCompactionTask} for an example.
+     */
+    protected Set<SSTableReader> inputSSTables()
+    {
+        return transaction.originals();
+    }
+
+    /**
+     * @return True if the task should try to limit the operation size to the 
available space by removing sstables from
+     * the compacting set. This cannot be done if this is part of a multi-task 
operation with a shared transaction.
+     */
+    protected boolean shouldReduceScopeForSpace()
+    {
+        return true;
+    }
+    
     /**
      * For internal use and testing only.  The rest of the system should go 
through the submit* methods,
      * which are properly serialized.
      * Caller is in charge of marking/unmarking the sstables as compacting.
      */
+    @Override
     protected void runMayThrow() throws Exception
     {
         // The collection of sstables passed may be empty (but not null); even 
if
         // it is not empty, it may compact down to nothing if all rows are 
deleted.
         assert transaction != null;
 
-        if (transaction.originals().isEmpty())
+        if (inputSSTables().isEmpty())
             return;
 
-        // Note that the current compaction strategy, is not necessarily the 
one this task was created under.
-        // This should be harmless; see comments to 
CFS.maybeReloadCompactionStrategy.
-        CompactionStrategyManager strategy = 
cfs.getCompactionStrategyManager();
-
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
         {
             Instant creationTime = now();
             cfs.snapshotWithoutMemtable(creationTime.toEpochMilli() + 
"-compact-" + cfs.name, creationTime);
         }
 
-        try (CompactionController controller = 
getCompactionController(transaction.originals()))
+        try (CompactionController controller = 
getCompactionController(inputSSTables()))
         {
+            // Note: the controller set-up above relies on using the 
transaction-provided sstable list, from which
+            // fully-expired sstables should not be removed (so that the 
overlap tracker does not include them), but
+            // sstables excluded for scope reduction should be removed.
+            Set<SSTableReader> actuallyCompact = new 
HashSet<>(inputSSTables());
 
             final Set<SSTableReader> fullyExpiredSSTables = 
controller.getFullyExpiredSSTables();
+            if (!fullyExpiredSSTables.isEmpty())
+            {
+                logger.debug("Compaction {} dropping expired sstables: {}", 
transaction.opIdString(), fullyExpiredSSTables);
+                actuallyCompact.removeAll(fullyExpiredSSTables);
+            }
 
             TimeUUID taskId = transaction.opId();
             // select SSTables to compact based on available disk space.
-            if 
(!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId))
+            final boolean hasExpirations = !fullyExpiredSSTables.isEmpty();
+            if (shouldReduceScopeForSpace() && 
!buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact, 
hasExpirations, taskId)

Review Comment:
   I'd add parentheses to clarify the operator precedence we want (I suspect 
I'm not the only one who doesn't know operator precedence by heart; I reckon the 
line break here was probably meant to show the precedence, but I can't quite 
be sure it's not just your editor wrapping lines automatically :)).
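
   To make the precedence point concrete, a tiny standalone example (names made 
up for the demo): `&&` binds tighter than `||`, so `a && b || c` parses as 
`(a && b) || c`.

   ```java
   public class PrecedenceDemo
   {
       public static void main(String[] args)
       {
           boolean a = false, b = true, c = true;

           // && binds tighter than ||, so the unparenthesized form groups as (a && b) || c
           boolean implicit = a && b || c;
           boolean grouped = (a && b) || c;
           boolean otherGrouping = a && (b || c);

           System.out.println(implicit == grouped);       // true: same grouping
           System.out.println(implicit == otherGrouping); // false: different grouping
       }
   }
   ```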



##########
src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java:
##########
@@ -100,6 +100,7 @@ public boolean 
reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables,
                             largestL0SSTable.onDiskLength(),
                             transaction.opId());
                 transaction.cancel(largestL0SSTable);
+                nonExpiredSSTables.remove(largestL0SSTable);

Review Comment:
   Should the test at the beginning of this method be updated to check 
`nonExpiredSSTables` like the generic version of this in `CompactionTask`?



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -180,17 +178,17 @@ public void shutdown()
      *
      * Is responsible for marking its sstables as compaction-pending.
      */
-    public abstract AbstractCompactionTask getNextBackgroundTask(final long 
gcBefore);
+    public abstract Collection<AbstractCompactionTask> 
getNextBackgroundTasks(final long gcBefore);

Review Comment:
   Nit: the javadoc needs edits (`null` is never returned anymore in 
particular).



##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle 
transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to 
parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the 
operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will 
be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction 
before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the 
processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed 
or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also 
throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by 
performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This 
may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect 
if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept, 
e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we 
don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the 
obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+    protected static final Logger logger = 
LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+    final LifecycleTransaction mainTransaction;
+    private final AtomicInteger partsToCommitOrAbort;
+    private volatile boolean obsoleteOriginalsRequested;
+    private volatile boolean wasAborted;
+    private volatile boolean initializationComplete;
+    private volatile int partsCount = 0;
+
+    public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+    {
+        this.mainTransaction = mainTransaction;
+        this.partsToCommitOrAbort = new AtomicInteger(0);
+        this.wasAborted = false;
+        this.obsoleteOriginalsRequested = false;
+    }
+
+    /// Register one part of the composite transaction. Every part must 
register itself before the composite transaction
+    /// is initialized and the parts are allowed to proceed.
+    /// @param part the part to register
+    public TimeUUID register(PartialLifecycleTransaction part)
+    {
+        int index = partsToCommitOrAbort.incrementAndGet();
+        return mainTransaction.opId().withSequence(index);
+    }
+
+    /// Complete the initialization of the composite transaction. This must be 
called before any of the parts are
+    /// executed.
+    public void completeInitialization()
+    {
+        partsCount = partsToCommitOrAbort.get();
+        initializationComplete = true;
+        // TODO: Switch to trace before merging.

Review Comment:
   Flagging this so it doesn't get forgotten.



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
     public static String printScalingParameter(int w)
     {
         if (w < 0)
-            return "L" + Integer.toString(2 - w);
+            return 'L' + Integer.toString(2 - w);
         else if (w > 0)
-            return "T" + Integer.toString(w + 2);
+            return 'T' + Integer.toString(w + 2);
         else
             return "N";
     }
 
+    private TimeUUID nextTimeUUID()
+    {
+        return TimeUUID.Generator.nextTimeUUID().withSequence(0);

Review Comment:
   Reading further, I realize this is due to how 
`CompositeLifecycleTransaction` names its subtasks. It's fine, but again, worth 
documenting since the underlying reason is a bit "distant".



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -62,7 +60,7 @@
  *    i/o done by compaction, and merging done at read time.
  *  - perform a full (maximum possible) compaction if requested by the user
  */
-public abstract class AbstractCompactionStrategy
+public abstract class AbstractCompactionStrategy implements ScannerFactory

Review Comment:
   Nit: I find it slightly convoluted to implement `ScannerFactory` when only a 
handful of calls to `getScanner` on the strategy remain. I'd have added a 
protected method instead (for LCS to override):
   ```java
   protected ScannerFactory scannerFactory()
   {
      return ScannerFactory.DEFAULT;
   }
   ```



##########
src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java:
##########
@@ -52,6 +52,13 @@ public interface ColumnFamilyStoreMBean
      */
     public void forceMajorCompaction(boolean splitOutput) throws 
ExecutionException, InterruptedException;
 
+    /**
+     * force a major compaction of this column family
+     *
+     * @param permittedParallelism the maximum number of compaction threads 
that can be used by the operation

Review Comment:
   Nit: it wouldn't hurt to say that `permittedParallelism <= 0` is supported 
and treated specially.



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -408,12 +450,8 @@ protected boolean 
buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTab
                 // usually means we've run out of disk space
 
                 // but we can still compact expired SSTables
-                if(partialCompactionsAcceptable() && 
fullyExpiredSSTables.size() > 0 )
-                {
-                    // sanity check to make sure we compact only fully expired 
SSTables.
-                    assert 
transaction.originals().equals(fullyExpiredSSTables);
+                if (partialCompactionsAcceptable() && containsExpired)

Review Comment:
   I'm not quite sure about this, but possibly because I don't fully understand 
this part.
   
   My reading of the code before is that `reduceScope() == false` and 
`partialCompactionsAcceptable` imply that `transaction.originals() <= 1`, and 
so if we also have `fullyExpiredSSTables > 0`, then it did imply the one 
remaining sstable was expired, which the code asserted (it asserted we only had 
fully expired sstables remaining, though I'm not sure there could have been more 
than one given the condition in `reduceScopeForLimitedSpace`).
   
   With the change, `reduceScope() == false` and `partialCompactionsAcceptable` 
imply that `nonExpiredSSTables <= 1`. But afaict, even if we have expired 
sstables, that does not exclude having 1 remaining non-expired sstable, which 
seems to contradict the "we can still compact expired SSTables".
   
   With all that said, I don't really understand what this "we can still 
compact expired SSTables" branch does in the first place, even before the 
changes. Because even if we exit through this branch, I don't see 
`CompactionTask#runMayThrow` doing anything with those "fully expired sstables" 
in the first place.
   
   I'll lastly remark that `reduceScopeForLimitedSpace` is also overridden by 
LCS, and I'm not sure how to reason about that one (either before or after 
this patch).
   
   



##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit 
the parallelism of some compaction tasks

Review Comment:
   Nit: maybe specify here that the composition executes tasks sequentially?



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -1805,13 +1930,12 @@ public void obsoleteOriginals() {}
             public void close() {}
         }
 
-        CompactionStrategyManager strategy = 
cfs.getCompactionStrategyManager();
         try (SharedTxn sharedTxn = new SharedTxn(txn);
              SSTableRewriter fullWriter = 
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
              SSTableRewriter transWriter = 
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
              SSTableRewriter unrepairedWriter = 
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
 
-             AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(txn.originals());
+             ScannerList scanners = 
ScannerFactory.DEFAULT.getScanners(txn.originals());

Review Comment:
   This appears to disable the LCS "optimisation" with scanners. Am I 
misreading it? Is it intentional?



##########
src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java:
##########
@@ -352,22 +346,25 @@ synchronized int getNumPendingRepairFinishedTasks()
         return count;
     }
 
-    synchronized AbstractCompactionTask getNextRepairFinishedTask()
+    synchronized Collection<AbstractCompactionTask> 
getNextRepairFinishedTasks()
     {
+        List<AbstractCompactionTask> tasks = null;

Review Comment:
   Any specific reason to use `null` here instead of an empty collection? Using 
an empty collection would seem more consistent with the other methods that are 
passed to `TaskSupplier` (and would even make it possible to expect 
`TaskSupplier.get()` to never return `null`).



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -376,24 +386,129 @@ public void run()
                 }
 
                 CompactionStrategyManager strategy = 
cfs.getCompactionStrategyManager();
-                AbstractCompactionTask task = 
strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, 
FBUtilities.nowInSeconds()));
-                if (task == null)
+                tasks = 
strategy.getNextBackgroundTasks(getDefaultGcBefore(cfs, 
FBUtilities.nowInSeconds()));
+                if (tasks == null || tasks.isEmpty())
                 {
                     if (DatabaseDescriptor.automaticSSTableUpgrade())
                         ranCompaction = maybeRunUpgradeTask(strategy);
                 }
-                else
+                else if (tasks.size() == 1)
                 {
-                    task.execute(active);
+                    // If just one task, run it directly on this thread
+                    for (AbstractCompactionTask task : tasks)
+                        task.execute(active);
                     ranCompaction = true;
                 }
+                else

Review Comment:
   Nit: I personally find `if` chains that mix branches with and without braces 
distracting/bad (and I've not seen others use them, though admittedly my 
personal experience with this particular code base is outdated of late). I 
could have sworn this was explicitly prohibited by the coding style, but it's 
admittedly not clearly stated in its current form, and so be it. But this one 
trips me up every time, so I figured I'd at least voice my opinion once, just 
so I feel better having voiced it; it's quite a personal preference though, so 
feel free to ignore it (and I won't bring it up again).



##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit 
the parallelism of some compaction tasks
+/// that split into a large number of parallelizable tasks but should not be 
allowed to take all compaction executor
+/// threads.
+public class CompositeCompactionTask extends AbstractCompactionTask
+{
+    @VisibleForTesting
+    final ArrayList<AbstractCompactionTask> tasks;
+
+    public CompositeCompactionTask(AbstractCompactionTask first)
+    {
+        super(first.cfs, 
first.cfs.getTracker().tryModify(Collections.emptyList(), 
OperationType.COMPACTION));
+        tasks = new ArrayList<>();
+        addTask(first);
+    }
+
+    /// Add a task to the composition.
+    public CompositeCompactionTask addTask(AbstractCompactionTask task)
+    {
+        tasks.add(task);
+        return this;
+    }
+
+    @Override
+    protected int executeInternal(ActiveCompactionsTracker tracker)

Review Comment:
   Nit: I couldn't find a single call to `execute` or `executeInternal` that 
uses the returned value. Maybe remove it and simplify, instead of papering 
over it here?



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
     public static String printScalingParameter(int w)
     {
         if (w < 0)
-            return "L" + Integer.toString(2 - w);
+            return 'L' + Integer.toString(2 - w);
         else if (w > 0)
-            return "T" + Integer.toString(w + 2);
+            return 'T' + Integer.toString(w + 2);
         else
             return "N";
     }
 
+    private TimeUUID nextTimeUUID()
+    {
+        return TimeUUID.Generator.nextTimeUUID().withSequence(0);

Review Comment:
   It's not quite obvious to me why we're doing this. Can you explain/document?



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
     public static String printScalingParameter(int w)
     {
         if (w < 0)
-            return "L" + Integer.toString(2 - w);
+            return 'L' + Integer.toString(2 - w);
         else if (w > 0)
-            return "T" + Integer.toString(w + 2);
+            return 'T' + Integer.toString(w + 2);
         else
             return "N";
     }
 
+    private TimeUUID nextTimeUUID()
+    {
+        return TimeUUID.Generator.nextTimeUUID().withSequence(0);
+    }
+
     @Override
-    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long 
gcBefore, boolean splitOutput)
+    public synchronized Collection<AbstractCompactionTask> 
getMaximalTasks(long gcBefore, boolean splitOutput, int permittedParallelism)
     {
         maybeUpdateShardManager();
         // The tasks are split by repair status and disk, as well as in 
non-overlapping sections to enable some
         // parallelism (to the amount that L0 sstables are split, i.e. at 
least base_shard_count). The result will be
         // split across shards according to its density. Depending on the 
parallelism, the operation may require up to
         // 100% extra space to complete.
         List<AbstractCompactionTask> tasks = new ArrayList<>();
-        List<Set<SSTableReader>> nonOverlapping = 
splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
-        for (Set<SSTableReader> set : nonOverlapping)
+        if (permittedParallelism <= 0)

Review Comment:
   Is this ever used? `CompactionManager.submitMaximal`, which I believe calls 
this, already defaults it to `#compactors / 2` (it's also a tad confusing that 
the two places don't agree on what the default means).



##########
src/java/org/apache/cassandra/db/compaction/ShardManagerTrivial.java:
##########
@@ -54,14 +57,21 @@ public double rangeSpanned(SSTableReader rdr)
     }
 
     @Override
-    public double calculateCombinedDensity(Set<? extends SSTableReader> 
sstables)
+    public double calculateCombinedDensity(Collection<SSTableReader> sstables)
     {
         double totalSize = 0;
         for (SSTableReader sstable : sstables)
             totalSize += sstable.onDiskLength();
         return totalSize;
     }
 
+    public <T> List<T> splitSSTablesInShards(Collection<SSTableReader> 
sstables,

Review Comment:
   Nit: add `@Override`?



##########
src/java/org/apache/cassandra/tools/nodetool/Compact.java:
##########
@@ -50,6 +50,12 @@ public class Compact extends NodeToolCmd
     @Option(title = "partition_key", name = {"--partition"}, description = 
"String representation of the partition key")
     private String partitionKey = EMPTY;
 
+    @Option(title = "jobs",
+            name = {"-j", "--jobs"},
+            description = "Use -j to specify the maximum number of threads to 
use for parallel compaction. " +
+                          "If not set, up to half the compaction threads will 
be used. " +
+                          "If set to 0, the major compaction will use all 
threads and will not permit other compactions to run until it completes (use 
with caution).")

Review Comment:
   I don't think this comment is up-to-date. Afaict, unset and 0 behave the 
same way.



##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept, e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+    protected static final Logger logger = LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+    final LifecycleTransaction mainTransaction;
+    private final AtomicInteger partsToCommitOrAbort;
+    private volatile boolean obsoleteOriginalsRequested;
+    private volatile boolean wasAborted;
+    private volatile boolean initializationComplete;
+    private volatile int partsCount = 0;
+
+    public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+    {
+        this.mainTransaction = mainTransaction;
+        this.partsToCommitOrAbort = new AtomicInteger(0);
+        this.wasAborted = false;
+        this.obsoleteOriginalsRequested = false;
+    }
+
+    /// Register one part of the composite transaction. Every part must register itself before the composite transaction
+    /// is initialized and the parts are allowed to proceed.
+    /// @param part the part to register
+    public TimeUUID register(PartialLifecycleTransaction part)
+    {
+        int index = partsToCommitOrAbort.incrementAndGet();
+        return mainTransaction.opId().withSequence(index);

Review Comment:
   Nit: in practice, the idea of having a linked operation ID between the parent and child tasks makes sense. But this does kind of assume that the `mainTransaction` ID was generated with a 0 sequence (nothing would strongly break if that's not the case, but it's still somewhat assumed here), and that's not documented/easy to miss.
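The part-accounting model the class javadoc describes (register every part up front; the last part to commit or abort triggers the final outcome; any abort poisons the whole) can be sketched in isolation. This is a simplified model for illustration only, not the PR's actual class; `PartCounter` and its method names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the composite transaction's part accounting:
// every registered part must either commit or abort; the last one to
// finish resolves the composite, and any abort poisons the whole.
class PartCounter
{
    private final AtomicInteger partsToCommitOrAbort = new AtomicInteger(0);
    private volatile boolean wasAborted = false;
    private volatile boolean finished = false;

    int register()
    {
        // returns a 1-based part index, analogous to the sequence given to each part
        return partsToCommitOrAbort.incrementAndGet();
    }

    void commitPart()
    {
        if (partsToCommitOrAbort.decrementAndGet() == 0)
            finished = true; // last part: this is where the main transaction would be applied
    }

    void abortPart()
    {
        wasAborted = true; // poison: the composite must abort as a whole
        if (partsToCommitOrAbort.decrementAndGet() == 0)
            finished = true;
    }

    boolean committed()
    {
        return finished && !wasAborted;
    }
}
```

Note how a part's "commit" has no effect on its own; only the composite outcome matters, which is exactly the transaction-rule relaxation the javadoc calls out.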



##########
src/java/org/apache/cassandra/db/compaction/unified/UnifiedCompactionTask.java:
##########
@@ -36,26 +41,76 @@ public class UnifiedCompactionTask extends CompactionTask
 {
     private final ShardManager shardManager;
     private final Controller controller;
+    private final Range<Token> operationRange;
+    private final Set<SSTableReader> actuallyCompact;
 
     public UnifiedCompactionTask(ColumnFamilyStore cfs,
                                  UnifiedCompactionStrategy strategy,
-                                 LifecycleTransaction txn,
+                                 ILifecycleTransaction txn,
                                  long gcBefore,
                                  ShardManager shardManager)
     {
-        super(cfs, txn, gcBefore);
+        this(cfs, strategy, txn, gcBefore, shardManager, null, null);
+    }
+
+    public UnifiedCompactionTask(ColumnFamilyStore cfs,
+                                 UnifiedCompactionStrategy strategy,
+                                 ILifecycleTransaction txn,
+                                 long gcBefore,
+                                 ShardManager shardManager,
+                                 Range<Token> operationRange,
+                                 Collection<SSTableReader> actuallyCompact)
+    {
+        super(cfs, strategy, txn, gcBefore);
         this.controller = strategy.getController();
         this.shardManager = shardManager;
+
+        if (operationRange != null)
+        {
+            assert actuallyCompact != null : "Ranged tasks should use a set of sstables to compact";
+        }
+        this.operationRange = operationRange;
+        // To make sure actuallyCompact tracks any removals from txn.originals(), we intersect the given set with it.
+        // This should not be entirely necessary (as shouldReduceScopeForSpace() is false for ranged tasks), but it
+        // is cleaner to enforce inputSSTables()'s requirements.
+        this.actuallyCompact = actuallyCompact != null ? Sets.intersection(ImmutableSet.copyOf(actuallyCompact), txn.originals())
+                                                       : txn.originals();
     }
 
     @Override
     public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
                                                           Directories directories,
-                                                          LifecycleTransaction txn,
+                                                          ILifecycleTransaction txn,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
         double density = shardManager.calculateCombinedDensity(nonExpiredSSTables);
         int numShards = controller.getNumShards(density * shardManager.shardSetCoverage());
-        return new ShardedCompactionWriter(cfs, directories, txn, nonExpiredSSTables, keepOriginals, shardManager.boundaries(numShards));
+        boolean earlyOpenAllowed = tokenRange() == null;
+        return new ShardedCompactionWriter(cfs,
+                                           directories,
+                                           txn,
+                                           nonExpiredSSTables,
+                                           keepOriginals,
+                                           earlyOpenAllowed,
+                                           shardManager.boundaries(numShards));
+    }
+
+    @Override
+    protected Range<Token> tokenRange()
+    {
+        return operationRange;
+    }
+
+    @Override

Review Comment:
   Can you document why we should not reduce scope when `tokenRange() != null`?
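The constructor comment above relies on `Sets.intersection` (Guava) returning a live view: removals from the backing `txn.originals()` set become visible through `actuallyCompact` without any extra bookkeeping. A JDK-only sketch of that live-view behaviour, with a hypothetical `LiveIntersection` class standing in for Guava's implementation:

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Set;

// Minimal live intersection view: membership is evaluated against the
// backing sets at read time, so later removals from either set are
// reflected in the view, mirroring Guava's Sets.intersection contract.
class LiveIntersection<T> extends AbstractSet<T>
{
    private final Set<T> a, b;

    LiveIntersection(Set<T> a, Set<T> b) { this.a = a; this.b = b; }

    @Override
    public boolean contains(Object o)
    {
        return a.contains(o) && b.contains(o);
    }

    @Override
    public Iterator<T> iterator()
    {
        // iterate the first set, keeping only elements present in the second
        return a.stream().filter(b::contains).iterator();
    }

    @Override
    public int size()
    {
        int n = 0;
        for (T t : a)
            if (b.contains(t))
                n++;
        return n;
    }
}
```

This is why intersecting with `txn.originals()` "tracks any removals" for free, as the inline comment in the constructor says.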



##########
src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java:
##########
@@ -221,7 +222,7 @@ public CassandraValidationIterator(ColumnFamilyStore cfs, SharedContext ctx, Col
 
         long gcBefore = dontPurgeTombstones ? Long.MIN_VALUE : getDefaultGcBefore(cfs, nowInSec);
         controller = new ValidationCompactionController(cfs, gcBefore);
-        scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges);
+        scanners = ScannerFactory.DEFAULT.getScanners(sstables, ranges);

Review Comment:
   Not super fussed about it, but isn't it a slight regression that this is not allowed to rely on the compaction strategy to optimize the scanners?



##########
src/java/org/apache/cassandra/utils/TimeUUID.java:
##########
@@ -293,6 +293,22 @@ public static String toString(TimeUUID ballot, String kind)
         return ballot == null ? "null" : String.format("%s(%d:%s)", kind, ballot.uuidTimestamp(), ballot);
     }
 
+    public int sequence()
+    {
+        return (int) ((lsb >> 48) & 0x0000000000003FFFL);
+    }
+
+    /**
+     * Returns a new TimeUUID with the same timestamp as this one, but with the provided sequence value.

Review Comment:
   Let's call out that this method should be used with care as it removes "uniqueness" guarantees of the returned `TimeUUID` (same as we warn on `minAtUnixMillis` for instance).
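The `sequence()` accessor in the hunk above reads the 14-bit clock-sequence field out of the UUID's least significant long (bits 48..61, just below the two variant bits), and `withSequence` presumably writes the same field. A standalone sketch of that bit manipulation; `UuidSequenceBits` and its helpers are illustrative names, not the PR's API:

```java
// The 14-bit clock-sequence field occupies bits 48..61 of a UUID's
// least significant long; the two variant bits sit above it at 62..63.
class UuidSequenceBits
{
    static final long SEQ_MASK = 0x3FFFL;
    static final int SEQ_SHIFT = 48;

    // mirrors the sequence() accessor: shift the field down and mask it off
    // (the mask also discards any sign-extension from the arithmetic shift)
    static int sequence(long lsb)
    {
        return (int) ((lsb >> SEQ_SHIFT) & SEQ_MASK);
    }

    // clear the old field, then OR in the new value; all other bits,
    // including the variant bits, are left untouched
    static long withSequence(long lsb, int sequence)
    {
        return (lsb & ~(SEQ_MASK << SEQ_SHIFT)) | (((long) sequence & SEQ_MASK) << SEQ_SHIFT);
    }
}
```

Because only these 14 bits change, two TimeUUIDs that differ only in sequence share the timestamp and node bits, which is exactly why the uniqueness caveat above matters.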



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

