pcmanus commented on code in PR #3688:
URL: https://github.com/apache/cassandra/pull/3688#discussion_r1871182901
##########
src/java/org/apache/cassandra/db/compaction/CompactionController.java:
##########
@@ -145,20 +151,20 @@ public Set<SSTableReader> getFullyExpiredSSTables()
*
* @param cfStore
* @param compacting we take the drop-candidates from this set, it is
usually the sstables included in the compaction
- * @param overlapping the sstables that overlap the ones in compacting.
+ * @param overlappingSupplier function used to get the sstables that
overlap the ones in compacting.
Review Comment:
Nit: I'd rephrase to something like "called on the `compacting` sstables to
compute the set of sstables that overlap with them if needed" (both to specify
what the function argument is, and to suggest that the reason this is a method
is to avoid the computation when it's not needed — which I assume is the reason?).
##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -331,56 +309,6 @@ public void metadataChanged(StatsMetadata oldMetadata,
SSTableReader sstable)
{
}
- public static class ScannerList implements AutoCloseable
- {
- public final List<ISSTableScanner> scanners;
- public ScannerList(List<ISSTableScanner> scanners)
- {
- this.scanners = scanners;
- }
-
- public long getTotalBytesScanned()
- {
- long bytesScanned = 0L;
- for (int i=0, isize=scanners.size(); i<isize; i++)
- bytesScanned += scanners.get(i).getBytesScanned();
-
- return bytesScanned;
- }
-
- public long getTotalCompressedSize()
- {
- long compressedSize = 0;
- for (int i=0, isize=scanners.size(); i<isize; i++)
- compressedSize += scanners.get(i).getCompressedLengthInBytes();
-
- return compressedSize;
- }
-
- public double getCompressionRatio()
- {
- double compressed = 0.0;
- double uncompressed = 0.0;
-
- for (int i=0, isize=scanners.size(); i<isize; i++)
- {
- ISSTableScanner scanner = scanners.get(i);
- compressed += scanner.getCompressedLengthInBytes();
- uncompressed += scanner.getLengthInBytes();
- }
-
- if (compressed == uncompressed || uncompressed == 0)
- return MetadataCollector.NO_COMPRESSION_RATIO;
-
- return compressed / uncompressed;
- }
-
- public void close()
- {
- ISSTableScanner.closeAllAndPropagate(scanners, null);
- }
- }
-
public ScannerList getScanners(Collection<SSTableReader> toCompact)
Review Comment:
This method can go away; it does the same thing as the default
implementation of the interface.
##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -62,15 +64,17 @@ public class CompactionTask extends AbstractCompactionTask
protected final boolean keepOriginals;
protected static long totalBytesCompacted = 0;
private ActiveCompactionsTracker activeCompactions;
+ protected final ScannerFactory scannerFactory;
- public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn,
long gcBefore)
+ public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy
strategy, ILifecycleTransaction txn, long gcBefore)
{
- this(cfs, txn, gcBefore, false);
+ this(cfs, strategy, txn, gcBefore, false);
}
- public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn,
long gcBefore, boolean keepOriginals)
+ public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy
strategy, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals)
Review Comment:
Nit: I'd have passed a `ScannerFactory` here instead of the strategy.
That's the only part used, so it makes the intent imo clearer. At which point
I'd refuse `null` and manually pass `ScannerFactory.DEFAULT` when appropriate:
that's one less thing that requires readers to dig for what a `null` means in
this context.
##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -84,68 +88,107 @@ protected int executeInternal(ActiveCompactionsTracker
activeCompactions)
{
this.activeCompactions = activeCompactions == null ?
ActiveCompactionsTracker.NOOP : activeCompactions;
run();
- return transaction.originals().size();
+ return inputSSTables().size();
}
public boolean reduceScopeForLimitedSpace(Set<SSTableReader>
nonExpiredSSTables, long expectedSize)
{
- if (partialCompactionsAcceptable() && transaction.originals().size() >
1)
+ if (partialCompactionsAcceptable() && nonExpiredSSTables.size() > 1)
{
// Try again w/o the largest one.
SSTableReader removedSSTable =
cfs.getMaxSizeFile(nonExpiredSSTables);
logger.warn("insufficient space to compact all requested files.
{}MiB required, {} for compaction {} - removing largest SSTable: {}",
(float) expectedSize / 1024 / 1024,
- StringUtils.join(transaction.originals(), ", "),
- transaction.opId(),
+ StringUtils.join(nonExpiredSSTables, ", "),
+ transaction.opIdString(),
removedSSTable);
// Note that we have removed files that are still marked as
compacting.
// This suboptimal but ok since the caller will unmark all the
sstables at the end.
transaction.cancel(removedSSTable);
+ nonExpiredSSTables.remove(removedSSTable);
return true;
}
return false;
}
+ /**
+ * @return The token range that the operation should compact. This is
usually null, but if we have a parallelizable
+ * multi-task operation (see {@link
UnifiedCompactionStrategy#createCompactionTasks}), it will specify a subrange.
+ */
+ protected Range<Token> tokenRange()
+ {
+ return null;
+ }
+
+ /**
+ * @return The set of input sstables for this compaction. This must be a
subset of the transaction originals and
+ * must reflect any removal of sstables from the originals set for correct
overlap tracking.
+ * See {@link UnifiedCompactionTask} for an example.
+ */
+ protected Set<SSTableReader> inputSSTables()
+ {
+ return transaction.originals();
+ }
+
+ /**
+ * @return True if the task should try to limit the operation size to the
available space by removing sstables from
+ * the compacting set. This cannot be done if this is part of a multi-task
operation with a shared transaction.
+ */
+ protected boolean shouldReduceScopeForSpace()
+ {
+ return true;
+ }
+
/**
* For internal use and testing only. The rest of the system should go
through the submit* methods,
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
*/
+ @Override
protected void runMayThrow() throws Exception
{
// The collection of sstables passed may be empty (but not null); even
if
// it is not empty, it may compact down to nothing if all rows are
deleted.
assert transaction != null;
- if (transaction.originals().isEmpty())
+ if (inputSSTables().isEmpty())
return;
- // Note that the current compaction strategy, is not necessarily the
one this task was created under.
- // This should be harmless; see comments to
CFS.maybeReloadCompactionStrategy.
- CompactionStrategyManager strategy =
cfs.getCompactionStrategyManager();
-
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
{
Instant creationTime = now();
cfs.snapshotWithoutMemtable(creationTime.toEpochMilli() +
"-compact-" + cfs.name, creationTime);
}
- try (CompactionController controller =
getCompactionController(transaction.originals()))
+ try (CompactionController controller =
getCompactionController(inputSSTables()))
{
+ // Note: the controller set-up above relies on using the
transaction-provided sstable list, from which
+ // fully-expired sstables should not be removed (so that the
overlap tracker does not include them), but
+ // sstables excluded for scope reduction should be removed.
+ Set<SSTableReader> actuallyCompact = new
HashSet<>(inputSSTables());
final Set<SSTableReader> fullyExpiredSSTables =
controller.getFullyExpiredSSTables();
+ if (!fullyExpiredSSTables.isEmpty())
+ {
+ logger.debug("Compaction {} dropping expired sstables: {}",
transaction.opIdString(), fullyExpiredSSTables);
+ actuallyCompact.removeAll(fullyExpiredSSTables);
+ }
TimeUUID taskId = transaction.opId();
// select SSTables to compact based on available disk space.
- if
(!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId))
+ final boolean hasExpirations = !fullyExpiredSSTables.isEmpty();
+ if (shouldReduceScopeForSpace() &&
!buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact,
hasExpirations, taskId)
Review Comment:
I'd add parentheses to clarify the operator precedence we want (I suspect
I'm not the only one who doesn't know operator precedence by heart; I reckon the
line break here was probably meant to show the precedence, but I can't quite
be sure it's not just your editor wrapping the line automatically :)).
##########
src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java:
##########
@@ -100,6 +100,7 @@ public boolean
reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables,
largestL0SSTable.onDiskLength(),
transaction.opId());
transaction.cancel(largestL0SSTable);
+ nonExpiredSSTables.remove(largestL0SSTable);
Review Comment:
Should the test at the beginning of this method be updated to check
`nonExpiredSSTables` like the generic version of this in `CompactionTask`?
##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -180,17 +178,17 @@ public void shutdown()
*
* Is responsible for marking its sstables as compaction-pending.
*/
- public abstract AbstractCompactionTask getNextBackgroundTask(final long
gcBefore);
+ public abstract Collection<AbstractCompactionTask>
getNextBackgroundTasks(final long gcBefore);
Review Comment:
Nit: the javadoc needs edits (`null` is never returned anymore in
particular).
##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle
transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to
parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the
operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will
be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction
before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the
processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed
or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also
throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by
performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This
may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect
if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept,
e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we
don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the
obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+ protected static final Logger logger =
LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+ final LifecycleTransaction mainTransaction;
+ private final AtomicInteger partsToCommitOrAbort;
+ private volatile boolean obsoleteOriginalsRequested;
+ private volatile boolean wasAborted;
+ private volatile boolean initializationComplete;
+ private volatile int partsCount = 0;
+
+ public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+ {
+ this.mainTransaction = mainTransaction;
+ this.partsToCommitOrAbort = new AtomicInteger(0);
+ this.wasAborted = false;
+ this.obsoleteOriginalsRequested = false;
+ }
+
+ /// Register one part of the composite transaction. Every part must
register itself before the composite transaction
+ /// is initialized and the parts are allowed to proceed.
+ /// @param part the part to register
+ public TimeUUID register(PartialLifecycleTransaction part)
+ {
+ int index = partsToCommitOrAbort.incrementAndGet();
+ return mainTransaction.opId().withSequence(index);
+ }
+
+ /// Complete the initialization of the composite transaction. This must be
called before any of the parts are
+ /// executed.
+ public void completeInitialization()
+ {
+ partsCount = partsToCommitOrAbort.get();
+ initializationComplete = true;
+ // TODO: Switch to trace before merging.
Review Comment:
Flagging this so it doesn't get forgotten.
##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
public static String printScalingParameter(int w)
{
if (w < 0)
- return "L" + Integer.toString(2 - w);
+ return 'L' + Integer.toString(2 - w);
else if (w > 0)
- return "T" + Integer.toString(w + 2);
+ return 'T' + Integer.toString(w + 2);
else
return "N";
}
+ private TimeUUID nextTimeUUID()
+ {
+ return TimeUUID.Generator.nextTimeUUID().withSequence(0);
Review Comment:
Reading further, I realize this is due to how
`CompositeLifecycleTransaction` names its subtasks. It's fine, but again, worth
documenting since the underlying reason is a bit "distant".
##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -62,7 +60,7 @@
* i/o done by compaction, and merging done at read time.
* - perform a full (maximum possible) compaction if requested by the user
*/
-public abstract class AbstractCompactionStrategy
+public abstract class AbstractCompactionStrategy implements ScannerFactory
Review Comment:
Nit: I find it slightly convoluted to implement `ScannerFactory`, and only a
handful of calls to `getScanner` on the strategy remains. I'd have added a
protected method instead (for LCS to override)
```java
protected ScannerFactory scannerFactory()
{
return ScannerFactory.DEFAULT;
}
```
##########
src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java:
##########
@@ -52,6 +52,13 @@ public interface ColumnFamilyStoreMBean
*/
public void forceMajorCompaction(boolean splitOutput) throws
ExecutionException, InterruptedException;
+ /**
+ * force a major compaction of this column family
+ *
+ * @param permittedParallelism the maximum number of compaction threads
that can be used by the operation
Review Comment:
Nit: wouldn't hurt to say that `permittedParallelism <= 0` is supported and
treated specially.
##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -408,12 +450,8 @@ protected boolean
buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTab
// usually means we've run out of disk space
// but we can still compact expired SSTables
- if(partialCompactionsAcceptable() &&
fullyExpiredSSTables.size() > 0 )
- {
- // sanity check to make sure we compact only fully expired
SSTables.
- assert
transaction.originals().equals(fullyExpiredSSTables);
+ if (partialCompactionsAcceptable() && containsExpired)
Review Comment:
I'm not quite sure about this, but possibly because I don't fully understand
this part.
My reading of the code before is that `reduceScope() == false` and
`partialCompactionsAcceptable` implies that `transaction.originals() <= 1`, and
so if we also have `fullyExpiredSSTable > 0`, then it did imply the one
remaining sstable was expired, which the code asserted (it asserted we only had
fully expired remaining, but I'm not there could have been more than one given
the condition in `reduceScopeForLimitedSpace`).
With the change, `reduceScope() == false` and `partialCompactionsAcceptable`
implies that `nonExpiredSSTables <= 1`. But afaict, even if we have expired
sstables, that does not exclude having 1 remaining non expired sstable, which
seems to contradict the "we can still compact expired SSTables".
Will all that said, I don't really understand what this "we can still
compact expired SSTables" branch does in the first place, even before the
changes. Because even if we exit through this branch, I don't see
`CompactionTask#runMayThrow` doing anything with those "fully expired sstables"
in the first place.
I'll lastly remark that `reduceCopeForLimitedSpace` is also overwritten by
LCS, and I'm not sure how to reason about that one (neither before or after
this patch).
##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit
the parallelism of some compaction tasks
Review Comment:
Nit: maybe specify here that the composition executes its tasks sequentially?
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -1805,13 +1930,12 @@ public void obsoleteOriginals() {}
public void close() {}
}
- CompactionStrategyManager strategy =
cfs.getCompactionStrategyManager();
try (SharedTxn sharedTxn = new SharedTxn(txn);
SSTableRewriter fullWriter =
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
SSTableRewriter transWriter =
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
SSTableRewriter unrepairedWriter =
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
- AbstractCompactionStrategy.ScannerList scanners =
strategy.getScanners(txn.originals());
+ ScannerList scanners =
ScannerFactory.DEFAULT.getScanners(txn.originals());
Review Comment:
This appears to disable the LCS "optimisation" with scanners. Am I
misreading it? Is it intentional?
##########
src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java:
##########
@@ -352,22 +346,25 @@ synchronized int getNumPendingRepairFinishedTasks()
return count;
}
- synchronized AbstractCompactionTask getNextRepairFinishedTask()
+ synchronized Collection<AbstractCompactionTask>
getNextRepairFinishedTasks()
{
+ List<AbstractCompactionTask> tasks = null;
Review Comment:
Any specific reason to use `null` here instead of returning an empty
collection? Using an empty collection would seem more consistent with the
other methods that are passed to `TaskSupplier` (and would even allow us to
expect that `TaskSupplier.get()` never returns `null`).
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -376,24 +386,129 @@ public void run()
}
CompactionStrategyManager strategy =
cfs.getCompactionStrategyManager();
- AbstractCompactionTask task =
strategy.getNextBackgroundTask(getDefaultGcBefore(cfs,
FBUtilities.nowInSeconds()));
- if (task == null)
+ tasks =
strategy.getNextBackgroundTasks(getDefaultGcBefore(cfs,
FBUtilities.nowInSeconds()));
+ if (tasks == null || tasks.isEmpty())
{
if (DatabaseDescriptor.automaticSSTableUpgrade())
ranCompaction = maybeRunUpgradeTask(strategy);
}
- else
+ else if (tasks.size() == 1)
{
- task.execute(active);
+ // If just one task, run it directly on this thread
+ for (AbstractCompactionTask task : tasks)
+ task.execute(active);
ranCompaction = true;
}
+ else
Review Comment:
Nit: I personally find `if` statements that mix branches with and without
brackets distracting/bad (and I've not seen others use them, though admittedly
my personal experience with this particular code base is outdated of late). I
could have sworn this was explicitly prohibited by the coding style, but it's
admittedly not clearly stated in the current form, and so be it. But this one
trips me up every time, so I figured I'd at least voice my opinion once, just so
I feel better having voiced it. It's quite a personal preference though, so feel
free to ignore it (and I won't bring it up again).
##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit
the parallelism of some compaction tasks
+/// that split into a large number of parallelizable tasks but should not be
allowed to take all compaction executor
+/// threads.
+public class CompositeCompactionTask extends AbstractCompactionTask
+{
+ @VisibleForTesting
+ final ArrayList<AbstractCompactionTask> tasks;
+
+ public CompositeCompactionTask(AbstractCompactionTask first)
+ {
+ super(first.cfs,
first.cfs.getTracker().tryModify(Collections.emptyList(),
OperationType.COMPACTION));
+ tasks = new ArrayList<>();
+ addTask(first);
+ }
+
+ /// Add a task to the composition.
+ public CompositeCompactionTask addTask(AbstractCompactionTask task)
+ {
+ tasks.add(task);
+ return this;
+ }
+
+ @Override
+ protected int executeInternal(ActiveCompactionsTracker tracker)
Review Comment:
Nit: I couldn't find a single call of `execute` or `executeInternal` that
was using the returned value. Maybe remove it and simplify instead of papering
over it here?
##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
public static String printScalingParameter(int w)
{
if (w < 0)
- return "L" + Integer.toString(2 - w);
+ return 'L' + Integer.toString(2 - w);
else if (w > 0)
- return "T" + Integer.toString(w + 2);
+ return 'T' + Integer.toString(w + 2);
else
return "N";
}
+ private TimeUUID nextTimeUUID()
+ {
+ return TimeUUID.Generator.nextTimeUUID().withSequence(0);
Review Comment:
It's not quite obvious to me why we're doing this. Can you explain/document?
##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
public static String printScalingParameter(int w)
{
if (w < 0)
- return "L" + Integer.toString(2 - w);
+ return 'L' + Integer.toString(2 - w);
else if (w > 0)
- return "T" + Integer.toString(w + 2);
+ return 'T' + Integer.toString(w + 2);
else
return "N";
}
+ private TimeUUID nextTimeUUID()
+ {
+ return TimeUUID.Generator.nextTimeUUID().withSequence(0);
+ }
+
@Override
- public synchronized Collection<AbstractCompactionTask> getMaximalTask(long
gcBefore, boolean splitOutput)
+ public synchronized Collection<AbstractCompactionTask>
getMaximalTasks(long gcBefore, boolean splitOutput, int permittedParallelism)
{
maybeUpdateShardManager();
// The tasks are split by repair status and disk, as well as in
non-overlapping sections to enable some
// parallelism (to the amount that L0 sstables are split, i.e. at
least base_shard_count). The result will be
// split across shards according to its density. Depending on the
parallelism, the operation may require up to
// 100% extra space to complete.
List<AbstractCompactionTask> tasks = new ArrayList<>();
- List<Set<SSTableReader>> nonOverlapping =
splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
- for (Set<SSTableReader> set : nonOverlapping)
+ if (permittedParallelism <= 0)
Review Comment:
Is this ever used? `CompactionManager.submitMaximal`, which I believe calls
this, already defaults it to `#compactors / 2` (it's also a tad confusing that
the two places don't agree on what the default means).
##########
src/java/org/apache/cassandra/db/compaction/ShardManagerTrivial.java:
##########
@@ -54,14 +57,21 @@ public double rangeSpanned(SSTableReader rdr)
}
@Override
- public double calculateCombinedDensity(Set<? extends SSTableReader>
sstables)
+ public double calculateCombinedDensity(Collection<SSTableReader> sstables)
{
double totalSize = 0;
for (SSTableReader sstable : sstables)
totalSize += sstable.onDiskLength();
return totalSize;
}
+ public <T> List<T> splitSSTablesInShards(Collection<SSTableReader>
sstables,
Review Comment:
Nit: add `@Override`?
##########
src/java/org/apache/cassandra/tools/nodetool/Compact.java:
##########
@@ -50,6 +50,12 @@ public class Compact extends NodeToolCmd
@Option(title = "partition_key", name = {"--partition"}, description =
"String representation of the partition key")
private String partitionKey = EMPTY;
+ @Option(title = "jobs",
+ name = {"-j", "--jobs"},
+ description = "Use -j to specify the maximum number of threads to
use for parallel compaction. " +
+ "If not set, up to half the compaction threads will
be used. " +
+ "If set to 0, the major compaction will use all
threads and will not permit other compactions to run until it completes (use
with caution).")
Review Comment:
I don't think this comment is up-to-date. Afaict, unset and 0 behave the
same way.
##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle
transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to
parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the
operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will
be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction
before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the
processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed
or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also
throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by
performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This
may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect
if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept,
e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we
don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the
obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+ protected static final Logger logger =
LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+ final LifecycleTransaction mainTransaction;
+ private final AtomicInteger partsToCommitOrAbort;
+ private volatile boolean obsoleteOriginalsRequested;
+ private volatile boolean wasAborted;
+ private volatile boolean initializationComplete;
+ private volatile int partsCount = 0;
+
+ public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+ {
+ this.mainTransaction = mainTransaction;
+ this.partsToCommitOrAbort = new AtomicInteger(0);
+ this.wasAborted = false;
+ this.obsoleteOriginalsRequested = false;
+ }
+
+ /// Register one part of the composite transaction. Every part must
register itself before the composite transaction
+ /// is initialized and the parts are allowed to proceed.
+ /// @param part the part to register
+ public TimeUUID register(PartialLifecycleTransaction part)
+ {
+ int index = partsToCommitOrAbort.incrementAndGet();
+ return mainTransaction.opId().withSequence(index);
Review Comment:
Nit: in practice, the idea of having linked operation IDs between the parent
and child tasks makes sense. But this does kind of assume that the
`mainTransaction` ID was generated with a 0 sequence (nothing would strongly
break if that's not the case, but it's still somewhat assumed here), and that's
not documented and easy to miss.
##########
src/java/org/apache/cassandra/db/compaction/unified/UnifiedCompactionTask.java:
##########
@@ -36,26 +41,76 @@ public class UnifiedCompactionTask extends CompactionTask
{
private final ShardManager shardManager;
private final Controller controller;
+ private final Range<Token> operationRange;
+ private final Set<SSTableReader> actuallyCompact;
/// Construct a whole-token-space compaction task (no operation range restriction):
/// delegates to the ranged constructor with a null range, in which case all of
/// the transaction's originals are compacted.
public UnifiedCompactionTask(ColumnFamilyStore cfs,
                             UnifiedCompactionStrategy strategy,
                             ILifecycleTransaction txn,
                             long gcBefore,
                             ShardManager shardManager)
{
    this(cfs, strategy, txn, gcBefore, shardManager, null, null);
}
+
/// Construct a compaction task, optionally restricted to a token range.
/// @param operationRange the token range this task covers, or null for the whole space
/// @param actuallyCompact the sstables to compact; must be non-null when a range is given
public UnifiedCompactionTask(ColumnFamilyStore cfs,
                             UnifiedCompactionStrategy strategy,
                             ILifecycleTransaction txn,
                             long gcBefore,
                             ShardManager shardManager,
                             Range<Token> operationRange,
                             Collection<SSTableReader> actuallyCompact)
{
    super(cfs, strategy, txn, gcBefore);
    this.controller = strategy.getController();
    this.shardManager = shardManager;

    if (operationRange != null)
    {
        assert actuallyCompact != null : "Ranged tasks should use a set of sstables to compact";
    }
    this.operationRange = operationRange;

    if (actuallyCompact == null)
    {
        this.actuallyCompact = txn.originals();
    }
    else
    {
        // Intersect with txn.originals() so that actuallyCompact tracks any removals
        // from the transaction. Not strictly necessary (shouldReduceScopeForSpace()
        // is false for ranged tasks), but it cleanly enforces inputSSTables()'s
        // requirements.
        this.actuallyCompact = Sets.intersection(ImmutableSet.copyOf(actuallyCompact),
                                                 txn.originals());
    }
}
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore
cfs,
Directories
directories,
- LifecycleTransaction
txn,
+
ILifecycleTransaction txn,
Set<SSTableReader>
nonExpiredSSTables)
{
double density =
shardManager.calculateCombinedDensity(nonExpiredSSTables);
int numShards = controller.getNumShards(density *
shardManager.shardSetCoverage());
- return new ShardedCompactionWriter(cfs, directories, txn,
nonExpiredSSTables, keepOriginals, shardManager.boundaries(numShards));
+ boolean earlyOpenAllowed = tokenRange() == null;
+ return new ShardedCompactionWriter(cfs,
+ directories,
+ txn,
+ nonExpiredSSTables,
+ keepOriginals,
+ earlyOpenAllowed,
+ shardManager.boundaries(numShards));
+ }
+
+ @Override
+ protected Range<Token> tokenRange()
+ {
+ return operationRange;
+ }
+
+ @Override
Review Comment:
Can you document why we should not reduce scope when `tokenRange() == null`?
##########
src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java:
##########
@@ -221,7 +222,7 @@ public CassandraValidationIterator(ColumnFamilyStore cfs,
SharedContext ctx, Col
long gcBefore = dontPurgeTombstones ? Long.MIN_VALUE :
getDefaultGcBefore(cfs, nowInSec);
controller = new ValidationCompactionController(cfs, gcBefore);
- scanners = cfs.getCompactionStrategyManager().getScanners(sstables,
ranges);
+ scanners = ScannerFactory.DEFAULT.getScanners(sstables, ranges);
Review Comment:
Not super fussed about it, but isn't it a slight regression that this is not
allowed to rely on the compaction strategy to optimize the scanners?
##########
src/java/org/apache/cassandra/utils/TimeUUID.java:
##########
@@ -293,6 +293,22 @@ public static String toString(TimeUUID ballot, String kind)
return ballot == null ? "null" : String.format("%s(%d:%s)", kind,
ballot.uuidTimestamp(), ballot);
}
+ public int sequence()
+ {
+ return (int) ((lsb >> 48) & 0x0000000000003FFFL);
+ }
+
+ /**
+ * Returns a new TimeUUID with the same timestamp as this one, but with
the provided sequence value.
Review Comment:
Let's call out that this method should be used with care as it removes
"uniqueness" guarantees of the returned `TimeUUID` (same as we warn on
`minAtUnixMillis` for instance).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]