blambov commented on code in PR #3688:
URL: https://github.com/apache/cassandra/pull/3688#discussion_r1872852784


##########
src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java:
##########
@@ -52,6 +52,13 @@ public interface ColumnFamilyStoreMBean
      */
     public void forceMajorCompaction(boolean splitOutput) throws ExecutionException, InterruptedException;
 
+    /**
+     * force a major compaction of this column family
+     *
+     * @param permittedParallelism the maximum number of compaction threads that can be used by the operation

Review Comment:
   Good call. Verifying that it is indeed applied as described uncovered problems.
   
   In fact, the strategy itself is the wrong place to apply this limit, because we may have other tasks (e.g. from other repair state or other disks).
   
   Moved all the limit application to `CompactionStrategyManager`.



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -331,56 +309,6 @@ public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable)
     {
     }
 
-    public static class ScannerList implements AutoCloseable
-    {
-        public final List<ISSTableScanner> scanners;
-        public ScannerList(List<ISSTableScanner> scanners)
-        {
-            this.scanners = scanners;
-        }
-
-        public long getTotalBytesScanned()
-        {
-            long bytesScanned = 0L;
-            for (int i=0, isize=scanners.size(); i<isize; i++)
-                bytesScanned += scanners.get(i).getBytesScanned();
-
-            return bytesScanned;
-        }
-
-        public long getTotalCompressedSize()
-        {
-            long compressedSize = 0;
-            for (int i=0, isize=scanners.size(); i<isize; i++)
-                compressedSize += scanners.get(i).getCompressedLengthInBytes();
-
-            return compressedSize;
-        }
-
-        public double getCompressionRatio()
-        {
-            double compressed = 0.0;
-            double uncompressed = 0.0;
-
-            for (int i=0, isize=scanners.size(); i<isize; i++)
-            {
-                ISSTableScanner scanner = scanners.get(i);
-                compressed += scanner.getCompressedLengthInBytes();
-                uncompressed += scanner.getLengthInBytes();
-            }
-
-            if (compressed == uncompressed || uncompressed == 0)
-                return MetadataCollector.NO_COMPRESSION_RATIO;
-
-            return compressed / uncompressed;
-        }
-
-        public void close()
-        {
-            ISSTableScanner.closeAllAndPropagate(scanners, null);
-        }
-    }
-
     public ScannerList getScanners(Collection<SSTableReader> toCompact)

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -62,7 +60,7 @@
  *    i/o done by compaction, and merging done at read time.
  *  - perform a full (maximum possible) compaction if requested by the user
  */
-public abstract class AbstractCompactionStrategy
+public abstract class AbstractCompactionStrategy implements ScannerFactory

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit the parallelism of some compaction tasks
+/// that split into a large number of parallelizable tasks but should not be allowed to take all compaction executor
+/// threads.
+public class CompositeCompactionTask extends AbstractCompactionTask
+{
+    @VisibleForTesting
+    final ArrayList<AbstractCompactionTask> tasks;
+
+    public CompositeCompactionTask(AbstractCompactionTask first)
+    {
+        super(first.cfs, first.cfs.getTracker().tryModify(Collections.emptyList(), OperationType.COMPACTION));
+        tasks = new ArrayList<>();
+        addTask(first);
+    }
+
+    /// Add a task to the composition.
+    public CompositeCompactionTask addTask(AbstractCompactionTask task)
+    {
+        tasks.add(task);
+        return this;
+    }
+
+    @Override
+    protected int executeInternal(ActiveCompactionsTracker tracker)

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/compaction/unified/UnifiedCompactionTask.java:
##########
@@ -36,26 +41,76 @@ public class UnifiedCompactionTask extends CompactionTask
 {
     private final ShardManager shardManager;
     private final Controller controller;
+    private final Range<Token> operationRange;
+    private final Set<SSTableReader> actuallyCompact;
 
     public UnifiedCompactionTask(ColumnFamilyStore cfs,
                                  UnifiedCompactionStrategy strategy,
-                                 LifecycleTransaction txn,
+                                 ILifecycleTransaction txn,
                                  long gcBefore,
                                  ShardManager shardManager)
     {
-        super(cfs, txn, gcBefore);
+        this(cfs, strategy, txn, gcBefore, shardManager, null, null);
+    }
+
+    public UnifiedCompactionTask(ColumnFamilyStore cfs,
+                                 UnifiedCompactionStrategy strategy,
+                                 ILifecycleTransaction txn,
+                                 long gcBefore,
+                                 ShardManager shardManager,
+                                 Range<Token> operationRange,
+                                 Collection<SSTableReader> actuallyCompact)
+    {
+        super(cfs, strategy, txn, gcBefore);
         this.controller = strategy.getController();
         this.shardManager = shardManager;
+
+        if (operationRange != null)
+        {
+            assert actuallyCompact != null : "Ranged tasks should use a set of sstables to compact";
+        }
+        this.operationRange = operationRange;
+        // To make sure actuallyCompact tracks any removals from txn.originals(), we intersect the given set with it.
+        // This should not be entirely necessary (as shouldReduceScopeForSpace() is false for ranged tasks), but it
+        // is cleaner to enforce inputSSTables()'s requirements.
+        this.actuallyCompact = actuallyCompact != null ? Sets.intersection(ImmutableSet.copyOf(actuallyCompact), txn.originals())
+                                                       : txn.originals();
     }
 
     @Override
     public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
                                                           Directories directories,
-                                                          LifecycleTransaction txn,
+                                                          ILifecycleTransaction txn,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
         double density = shardManager.calculateCombinedDensity(nonExpiredSSTables);
         int numShards = controller.getNumShards(density * shardManager.shardSetCoverage());
-        return new ShardedCompactionWriter(cfs, directories, txn, nonExpiredSSTables, keepOriginals, shardManager.boundaries(numShards));
+        boolean earlyOpenAllowed = tokenRange() == null;
+        return new ShardedCompactionWriter(cfs,
+                                           directories,
+                                           txn,
+                                           nonExpiredSSTables,
+                                           keepOriginals,
+                                           earlyOpenAllowed,
+                                           shardManager.boundaries(numShards));
+    }
+
+    @Override
+    protected Range<Token> tokenRange()
+    {
+        return operationRange;
+    }
+
+    @Override

Review Comment:
   Added



##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept, e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+    protected static final Logger logger = LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+    final LifecycleTransaction mainTransaction;
+    private final AtomicInteger partsToCommitOrAbort;
+    private volatile boolean obsoleteOriginalsRequested;
+    private volatile boolean wasAborted;
+    private volatile boolean initializationComplete;
+    private volatile int partsCount = 0;
+
+    public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+    {
+        this.mainTransaction = mainTransaction;
+        this.partsToCommitOrAbort = new AtomicInteger(0);
+        this.wasAborted = false;
+        this.obsoleteOriginalsRequested = false;
+    }
+
+    /// Register one part of the composite transaction. Every part must register itself before the composite transaction
+    /// is initialized and the parts are allowed to proceed.
+    /// @param part the part to register
+    public TimeUUID register(PartialLifecycleTransaction part)
+    {
+        int index = partsToCommitOrAbort.incrementAndGet();
+        return mainTransaction.opId().withSequence(index);

Review Comment:
   Added comment to the constructor.



##########
src/java/org/apache/cassandra/utils/TimeUUID.java:
##########
@@ -293,6 +293,22 @@ public static String toString(TimeUUID ballot, String kind)
         return ballot == null ? "null" : String.format("%s(%d:%s)", kind, ballot.uuidTimestamp(), ballot);
     }
 
+    public int sequence()
+    {
+        return (int) ((lsb >> 48) & 0x0000000000003FFFL);
+    }
+
+    /**
+     * Returns a new TimeUUID with the same timestamp as this one, but with the provided sequence value.

Review Comment:
   Added
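   As a quick illustration of the bit layout that `sequence()` and `withSequence` manipulate, here is a standalone sketch (class and helper names are hypothetical, not Cassandra's): in a time-based UUID, the 14-bit clock-sequence field sits at bits 48..61 of the least significant long.

```java
// Hypothetical standalone demo of the 14-bit clock-sequence field:
// bits 48..61 of the UUID's least significant long.
public class SequenceBitsDemo
{
    static final long SEQUENCE_MASK = 0x3FFFL; // 14 bits

    // Read the sequence field, mirroring the shift-and-mask in the diff above.
    static int sequence(long lsb)
    {
        return (int) ((lsb >> 48) & SEQUENCE_MASK);
    }

    // Produce an lsb with the same other bits but the given sequence value.
    static long withSequence(long lsb, int sequence)
    {
        // clear the old field, then set the new value
        return (lsb & ~(SEQUENCE_MASK << 48)) | (((long) sequence & SEQUENCE_MASK) << 48);
    }

    public static void main(String[] args)
    {
        long lsb = withSequence(-1L, 5);
        System.out.println(sequence(lsb)); // prints 5
    }
}
```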



##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit the parallelism of some compaction tasks

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -376,24 +386,129 @@ public void run()
                 }
 
                CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
-                AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
-                if (task == null)
+                tasks = strategy.getNextBackgroundTasks(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
+                if (tasks == null || tasks.isEmpty())
                 {
                     if (DatabaseDescriptor.automaticSSTableUpgrade())
                         ranCompaction = maybeRunUpgradeTask(strategy);
                 }
-                else
+                else if (tasks.size() == 1)
                 {
-                    task.execute(active);
+                    // If just one task, run it directly on this thread
+                    for (AbstractCompactionTask task : tasks)
+                        task.execute(active);
                     ranCompaction = true;
                 }
+                else

Review Comment:
   Changed
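   The hunk above runs a lone task inline and only fans out when there are several; that dispatch shape can be sketched generically (the task and helper names below are illustrative, not the Cassandra classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Generic sketch: a single task runs on the calling thread (no hand-off cost),
// while several tasks are fanned out to an executor and awaited.
public class TaskDispatchDemo
{
    static void execute(List<Runnable> tasks, ExecutorService executor) throws Exception
    {
        if (tasks.size() == 1)
        {
            tasks.get(0).run();
        }
        else
        {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable t : tasks)
                futures.add(executor.submit(t));
            for (Future<?> f : futures)
                f.get(); // wait for completion and surface any failure
        }
    }
}
```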



##########
src/java/org/apache/cassandra/db/compaction/CompactionController.java:
##########
@@ -145,20 +151,20 @@ public Set<SSTableReader> getFullyExpiredSSTables()
      *
      * @param cfStore
     * @param compacting we take the drop-candidates from this set, it is usually the sstables included in the compaction
-     * @param overlapping the sstables that overlap the ones in compacting.
+     * @param overlappingSupplier function used to get the sstables that overlap the ones in compacting.

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -1805,13 +1930,12 @@ public void obsoleteOriginals() {}
             public void close() {}
         }
 
-        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         try (SharedTxn sharedTxn = new SharedTxn(txn);
              SSTableRewriter fullWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
              SSTableRewriter transWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
              SSTableRewriter unrepairedWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);

-             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
+             ScannerList scanners = ScannerFactory.DEFAULT.getScanners(txn.originals());

Review Comment:
   Yes, it does. Taking advantage of it in places like this is the reason for the complexity I was trying to get rid of (the sstable set must be split for the individual strategy instances, then combined back; this adds cost to normal compactions).
   
   Note that since CASSANDRA-8915, the cost of not using the LCS-specific scanner fell dramatically to just one key comparison per partition (i.e. one long comparison in most cases).



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -62,15 +64,17 @@ public class CompactionTask extends AbstractCompactionTask
     protected final boolean keepOriginals;
     protected static long totalBytesCompacted = 0;
     private ActiveCompactionsTracker activeCompactions;
+    protected final ScannerFactory scannerFactory;
 
-    public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, long gcBefore)
+    public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy strategy, ILifecycleTransaction txn, long gcBefore)
     {
-        this(cfs, txn, gcBefore, false);
+        this(cfs, strategy, txn, gcBefore, false);
     }
 
-    public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, long gcBefore, boolean keepOriginals)
+    public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy strategy, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals)

Review Comment:
   Done; it was a little annoying, though: quite a few calls needed to be adjusted.
   
   I am tempted to completely remove the LCS-specific scanner.



##########
src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java:
##########
@@ -100,6 +100,7 @@ public boolean reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables,
                             largestL0SSTable.onDiskLength(),
                             transaction.opId());
                 transaction.cancel(largestL0SSTable);
+                nonExpiredSSTables.remove(largestL0SSTable);

Review Comment:
   Yes, done



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -84,68 +88,107 @@ protected int executeInternal(ActiveCompactionsTracker activeCompactions)
     {
         this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
         run();
-        return transaction.originals().size();
+        return inputSSTables().size();
     }
 
     public boolean reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables, long expectedSize)
     {
-        if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
+        if (partialCompactionsAcceptable() && nonExpiredSSTables.size() > 1)
         {
             // Try again w/o the largest one.
             SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables);
             logger.warn("insufficient space to compact all requested files. {}MiB required, {} for compaction {} - removing largest SSTable: {}",
                         (float) expectedSize / 1024 / 1024,
-                        StringUtils.join(transaction.originals(), ", "),
-                        transaction.opId(),
+                        StringUtils.join(nonExpiredSSTables, ", "),
+                        transaction.opIdString(),
                         removedSSTable);
             // Note that we have removed files that are still marked as compacting.
             // This is suboptimal but ok since the caller will unmark all the sstables at the end.
             transaction.cancel(removedSSTable);
+            nonExpiredSSTables.remove(removedSSTable);
             return true;
         }
         return false;
     }
 
+    /**
+     * @return The token range that the operation should compact. This is usually null, but if we have a parallelizable
+     * multi-task operation (see {@link UnifiedCompactionStrategy#createCompactionTasks}), it will specify a subrange.
+     */
+    protected Range<Token> tokenRange()
+    {
+        return null;
+    }
+
+    /**
+     * @return The set of input sstables for this compaction. This must be a subset of the transaction originals and
+     * must reflect any removal of sstables from the originals set for correct overlap tracking.
+     * See {@link UnifiedCompactionTask} for an example.
+     */
+    protected Set<SSTableReader> inputSSTables()
+    {
+        return transaction.originals();
+    }
+
+    /**
+     * @return True if the task should try to limit the operation size to the available space by removing sstables from
+     * the compacting set. This cannot be done if this is part of a multi-task operation with a shared transaction.
+     */
+    protected boolean shouldReduceScopeForSpace()
+    {
+        return true;
+    }
+    
     /**
      * For internal use and testing only.  The rest of the system should go through the submit* methods,
      * which are properly serialized.
      * Caller is in charge of marking/unmarking the sstables as compacting.
      */
+    @Override
     protected void runMayThrow() throws Exception
     {
         // The collection of sstables passed may be empty (but not null); even if
         // it is not empty, it may compact down to nothing if all rows are deleted.
         assert transaction != null;
 
-        if (transaction.originals().isEmpty())
+        if (inputSSTables().isEmpty())
             return;
 
-        // Note that the current compaction strategy, is not necessarily the one this task was created under.
-        // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
-        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
-
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
         {
             Instant creationTime = now();
             cfs.snapshotWithoutMemtable(creationTime.toEpochMilli() + "-compact-" + cfs.name, creationTime);
         }
 
-        try (CompactionController controller = getCompactionController(transaction.originals()))
+        try (CompactionController controller = getCompactionController(inputSSTables()))
         {
+            // Note: the controller set-up above relies on using the transaction-provided sstable list, from which
+            // fully-expired sstables should not be removed (so that the overlap tracker does not include them), but
+            // sstables excluded for scope reduction should be removed.
+            Set<SSTableReader> actuallyCompact = new HashSet<>(inputSSTables());
 
             final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
+            if (!fullyExpiredSSTables.isEmpty())
+            {
+                logger.debug("Compaction {} dropping expired sstables: {}", transaction.opIdString(), fullyExpiredSSTables);
+                actuallyCompact.removeAll(fullyExpiredSSTables);
+            }
 
             TimeUUID taskId = transaction.opId();
             // select SSTables to compact based on available disk space.
-            if (!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId))
+            final boolean hasExpirations = !fullyExpiredSSTables.isEmpty();
+            if (shouldReduceScopeForSpace() && !buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact, hasExpirations, taskId)

Review Comment:
   Added



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -408,12 +450,8 @@ protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTab
                 // usually means we've run out of disk space
 
                 // but we can still compact expired SSTables
-                if(partialCompactionsAcceptable() && fullyExpiredSSTables.size() > 0 )
-                {
-                    // sanity check to make sure we compact only fully expired SSTables.
-                    assert transaction.originals().equals(fullyExpiredSSTables);
+                if (partialCompactionsAcceptable() && containsExpired)

Review Comment:
   You are right, the new code would unnecessarily compact one sstable, while the old one would remove some of the expired ones.
   
   Changed this part to remove any remaining non-expired sstables from the transaction, which should result in leaving only the expired ones (an improvement over the old behaviour). The effect of having nothing to compact is to complete the transaction after doing nothing, which in turn will delete the expired sstables.
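   The reduction described here can be sketched generically (the names below are illustrative, not the actual Cassandra API): cancel everything that is not fully expired, so only the expired sources remain and the empty compaction's commit deletes them.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: when space runs out, drop every non-expired candidate
// from the compacting set so only the fully-expired ones remain; compacting
// "nothing" then completes the transaction and deletes the expired sources.
public class ExpiredOnlyReductionDemo
{
    static <T> Set<T> reduceToExpired(Set<T> compacting, Set<T> fullyExpired)
    {
        Set<T> remaining = new HashSet<>(compacting);
        remaining.retainAll(fullyExpired); // cancel all non-expired sources
        return remaining;
    }
}
```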



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -180,17 +178,17 @@ public void shutdown()
      *
      * Is responsible for marking its sstables as compaction-pending.
      */
-    public abstract AbstractCompactionTask getNextBackgroundTask(final long gcBefore);
+    public abstract Collection<AbstractCompactionTask> getNextBackgroundTasks(final long gcBefore);

Review Comment:
   Changed, also in the next method.



##########
src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java:
##########
@@ -352,22 +346,25 @@ synchronized int getNumPendingRepairFinishedTasks()
         return count;
     }
 
-    synchronized AbstractCompactionTask getNextRepairFinishedTask()
+    synchronized Collection<AbstractCompactionTask> getNextRepairFinishedTasks()
     {
+        List<AbstractCompactionTask> tasks = null;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept, e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+    protected static final Logger logger = LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+    final LifecycleTransaction mainTransaction;
+    private final AtomicInteger partsToCommitOrAbort;
+    private volatile boolean obsoleteOriginalsRequested;
+    private volatile boolean wasAborted;
+    private volatile boolean initializationComplete;
+    private volatile int partsCount = 0;
+
+    public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+    {
+        this.mainTransaction = mainTransaction;
+        this.partsToCommitOrAbort = new AtomicInteger(0);
+        this.wasAborted = false;
+        this.obsoleteOriginalsRequested = false;
+    }
+
+    /// Register one part of the composite transaction. Every part must register itself before the composite transaction
+    /// is initialized and the parts are allowed to proceed.
+    /// @param part the part to register
+    public TimeUUID register(PartialLifecycleTransaction part)
+    {
+        int index = partsToCommitOrAbort.incrementAndGet();
+        return mainTransaction.opId().withSequence(index);
+    }
+
+    /// Complete the initialization of the composite transaction. This must be called before any of the parts are
+    /// executed.
+    public void completeInitialization()
+    {
+        partsCount = partsToCommitOrAbort.get();
+        initializationComplete = true;
+        // TODO: Switch to trace before merging.

Review Comment:
   Changed
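   The part-counting scheme the class doc describes reduces to a small generic pattern; the class below is a simplified illustration (not the actual implementation): each part registers before processing starts, and the shared commit fires only when the last part commits or aborts.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of the composite-commit countdown: parts register
// up front, and the whole transaction is applied (or aborted) only when the
// last part finishes.
public class CompositeCommitDemo
{
    private final AtomicInteger remaining = new AtomicInteger(0);
    private volatile boolean aborted = false;
    private volatile boolean finished = false;

    public int register()
    {
        return remaining.incrementAndGet(); // also serves as the part's index
    }

    public void commitPart()
    {
        finishOne();
    }

    public void abortPart()
    {
        aborted = true; // any aborted part aborts the whole
        finishOne();
    }

    private void finishOne()
    {
        if (remaining.decrementAndGet() == 0)
            finished = true; // last part: apply or abort the main transaction here
    }

    public boolean wholeCommitted()
    {
        return finished && !aborted;
    }
}
```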



##########
src/java/org/apache/cassandra/db/compaction/ShardManagerTrivial.java:
##########
@@ -54,14 +57,21 @@ public double rangeSpanned(SSTableReader rdr)
     }
 
     @Override
-    public double calculateCombinedDensity(Set<? extends SSTableReader> sstables)
+    public double calculateCombinedDensity(Collection<SSTableReader> sstables)
     {
         double totalSize = 0;
         for (SSTableReader sstable : sstables)
             totalSize += sstable.onDiskLength();
         return totalSize;
     }
 
+    public <T> List<T> splitSSTablesInShards(Collection<SSTableReader> sstables,

Review Comment:
   Done
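As context for the hunk above: in the trivial shard manager there is a single shard covering the whole token range, so "combined density" degenerates to the summed on-disk size. A minimal standalone sketch (the `SSTable` record here is a hypothetical stand-in for `SSTableReader.onDiskLength()`):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Sketch of the trivial density computation: with one shard spanning the full
// range, the combined density is just the total on-disk size of the inputs.
class TrivialDensitySketch
{
    // Hypothetical stand-in for SSTableReader's onDiskLength()
    record SSTable(long onDiskLength) {}

    static double calculateCombinedDensity(Collection<SSTable> sstables)
    {
        double totalSize = 0;
        for (SSTable sstable : sstables)
            totalSize += sstable.onDiskLength();
        return totalSize;
    }

    public static void main(String[] args)
    {
        List<SSTable> tables = Arrays.asList(new SSTable(100), new SSTable(250));
        double density = calculateCombinedDensity(tables);
        if (density != 350.0) throw new AssertionError();
        System.out.println(density);   // prints 350.0
    }
}
```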



##########
src/java/org/apache/cassandra/tools/nodetool/Compact.java:
##########
@@ -50,6 +50,12 @@ public class Compact extends NodeToolCmd
     @Option(title = "partition_key", name = {"--partition"}, description = "String representation of the partition key")
     private String partitionKey = EMPTY;
 
+    @Option(title = "jobs",
+            name = {"-j", "--jobs"},
+            description = "Use -j to specify the maximum number of threads to use for parallel compaction. " +
+                          "If not set, up to half the compaction threads will be used. " +
+                          "If set to 0, the major compaction will use all threads and will not permit other compactions to run until it completes (use with caution).")

Review Comment:
   This is now fixed.
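For reference, the option described in the hunk above would be invoked like this (keyspace and table names are placeholders):

```shell
# Major compaction of ks.tbl using at most 2 compaction threads
nodetool compact -j 2 ks tbl

# -j 0: use all compaction threads; no other compactions can run until
# the major compaction completes (use with caution)
nodetool compact -j 0 ks tbl
```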



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
     public static String printScalingParameter(int w)
     {
         if (w < 0)
-            return "L" + Integer.toString(2 - w);
+            return 'L' + Integer.toString(2 - w);
         else if (w > 0)
-            return "T" + Integer.toString(w + 2);
+            return 'T' + Integer.toString(w + 2);
         else
             return "N";
     }
 
+    private TimeUUID nextTimeUUID()
+    {
+        return TimeUUID.Generator.nextTimeUUID().withSequence(0);
+    }
+
     @Override
-    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long gcBefore, boolean splitOutput)
+    public synchronized Collection<AbstractCompactionTask> getMaximalTasks(long gcBefore, boolean splitOutput, int permittedParallelism)
     {
         maybeUpdateShardManager();
        // The tasks are split by repair status and disk, as well as in non-overlapping sections to enable some
        // parallelism (to the amount that L0 sstables are split, i.e. at least base_shard_count). The result will be
        // split across shards according to its density. Depending on the parallelism, the operation may require up to
         // 100% extra space to complete.
         List<AbstractCompactionTask> tasks = new ArrayList<>();
-        List<Set<SSTableReader>> nonOverlapping = splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
-        for (Set<SSTableReader> set : nonOverlapping)
+        if (permittedParallelism <= 0)

Review Comment:
   No, it was not (apart from a test), and has been removed in the current code.
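One way to picture a parallelism cap over independent task groups — when there are more non-overlapping sections than permitted threads, fold the surplus groups together so at most `permitted` tasks are produced — is the standalone sketch below. This is not the PR's actual code (per the discussion, the limit is applied in `CompactionStrategyManager`); the round-robin folding strategy and all names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch (not the actual Cassandra code) of applying a parallelism cap:
// surplus non-overlapping task groups are folded together round-robin so
// that at most `permitted` parallel tasks result.
class ParallelismCapSketch
{
    static <T> List<List<T>> capParallelism(List<List<T>> groups, int permitted)
    {
        // Treat a non-positive cap as "no limit", matching the nodetool
        // description where 0 means all threads may be used.
        if (permitted <= 0 || permitted >= groups.size())
            return groups;
        List<List<T>> capped = new ArrayList<>();
        for (int i = 0; i < permitted; i++)
            capped.add(new ArrayList<>());
        for (int i = 0; i < groups.size(); i++)
            capped.get(i % permitted).addAll(groups.get(i));   // round-robin fold
        return capped;
    }

    public static void main(String[] args)
    {
        List<List<String>> groups = Arrays.asList(
            List.of("a"), List.of("b"), List.of("c"), List.of("d"), List.of("e"));
        List<List<String>> capped = capParallelism(groups, 2);
        if (capped.size() != 2) throw new AssertionError();
        // round-robin: slot 0 gets a,c,e; slot 1 gets b,d
        if (!capped.get(0).equals(List.of("a", "c", "e"))) throw new AssertionError();
        if (!capped.get(1).equals(List.of("b", "d"))) throw new AssertionError();
        System.out.println("ok");
    }
}
```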



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
     public static String printScalingParameter(int w)
     {
         if (w < 0)
-            return "L" + Integer.toString(2 - w);
+            return 'L' + Integer.toString(2 - w);
         else if (w > 0)
-            return "T" + Integer.toString(w + 2);
+            return 'T' + Integer.toString(w + 2);
         else
             return "N";
     }
 
+    private TimeUUID nextTimeUUID()
+    {
+        return TimeUUID.Generator.nextTimeUUID().withSequence(0);

Review Comment:
   Added



##########
src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java:
##########
@@ -221,7 +222,7 @@ public CassandraValidationIterator(ColumnFamilyStore cfs, SharedContext ctx, Col
 
         long gcBefore = dontPurgeTombstones ? Long.MIN_VALUE : getDefaultGcBefore(cfs, nowInSec);
         controller = new ValidationCompactionController(cfs, gcBefore);
-        scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges);
+        scanners = ScannerFactory.DEFAULT.getScanners(sstables, ranges);

Review Comment:
   This is correct; that optimization will no longer apply here.
   
   We have three options on this:
   - Leave as is (special scanner applies only to compactions, no cost to get list)
   - Roll back https://github.com/apache/cassandra/pull/3688/commits/b24959861d38b139720b36791714047261efcf1d and associated follow-ups (special scanner applies everywhere, all operations pay a collection cost when getting the scanner list)
   - Drop the special LCS scanner altogether (LCS pays a cost of up to 1 extra key comparison per partition in exchange for lower call polymorphism).
   
   If you are not happy with the first option, I can run some benchmarks to see if the special scanner actually saves anything to choose between the other two.
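For context, the scanner-list aggregation these options revolve around boils down to summing per-scanner counters (as in the `ScannerList` helper removed elsewhere in this PR). A standalone sketch, where `Scanner` is a hypothetical stand-in for `ISSTableScanner`:

```java
import java.util.List;

// Standalone sketch of aggregating scanner statistics, modeled on the
// ScannerList helper; the Scanner interface is a hypothetical stand-in.
class ScannerStatsSketch
{
    interface Scanner
    {
        long getBytesScanned();
        long getCompressedLengthInBytes();
        long getLengthInBytes();
    }

    static long totalBytesScanned(List<Scanner> scanners)
    {
        long bytes = 0;
        for (Scanner s : scanners)
            bytes += s.getBytesScanned();
        return bytes;
    }

    // Ratio of compressed to uncompressed bytes across all scanners;
    // 1.0 when there is nothing to scan.
    static double compressionRatio(List<Scanner> scanners)
    {
        double compressed = 0.0, uncompressed = 0.0;
        for (Scanner s : scanners)
        {
            compressed += s.getCompressedLengthInBytes();
            uncompressed += s.getLengthInBytes();
        }
        return uncompressed == 0 ? 1.0 : compressed / uncompressed;
    }

    public static void main(String[] args)
    {
        Scanner s = new Scanner()
        {
            public long getBytesScanned() { return 10; }
            public long getCompressedLengthInBytes() { return 50; }
            public long getLengthInBytes() { return 100; }
        };
        double ratio = compressionRatio(List.of(s));
        if (ratio != 0.5) throw new AssertionError();
        System.out.println(ratio);   // prints 0.5
    }
}
```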



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

