blambov commented on code in PR #3688:
URL: https://github.com/apache/cassandra/pull/3688#discussion_r1872852784
##########
src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java:
##########
@@ -52,6 +52,13 @@ public interface ColumnFamilyStoreMBean
*/
public void forceMajorCompaction(boolean splitOutput) throws
ExecutionException, InterruptedException;
+ /**
+ * force a major compaction of this column family
+ *
+ * @param permittedParallelism the maximum number of compaction threads
that can be used by the operation
Review Comment:
Good call. Verifying that it is indeed applied as described uncovered
problems.
In fact, the strategy itself is the wrong place to apply this limit, because
we may have other tasks (e.g. from other repair state or other disks).
Moved all the limit application to `CompactionStrategyManager`
##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -331,56 +309,6 @@ public void metadataChanged(StatsMetadata oldMetadata,
SSTableReader sstable)
{
}
- public static class ScannerList implements AutoCloseable
- {
- public final List<ISSTableScanner> scanners;
- public ScannerList(List<ISSTableScanner> scanners)
- {
- this.scanners = scanners;
- }
-
- public long getTotalBytesScanned()
- {
- long bytesScanned = 0L;
- for (int i=0, isize=scanners.size(); i<isize; i++)
- bytesScanned += scanners.get(i).getBytesScanned();
-
- return bytesScanned;
- }
-
- public long getTotalCompressedSize()
- {
- long compressedSize = 0;
- for (int i=0, isize=scanners.size(); i<isize; i++)
- compressedSize += scanners.get(i).getCompressedLengthInBytes();
-
- return compressedSize;
- }
-
- public double getCompressionRatio()
- {
- double compressed = 0.0;
- double uncompressed = 0.0;
-
- for (int i=0, isize=scanners.size(); i<isize; i++)
- {
- ISSTableScanner scanner = scanners.get(i);
- compressed += scanner.getCompressedLengthInBytes();
- uncompressed += scanner.getLengthInBytes();
- }
-
- if (compressed == uncompressed || uncompressed == 0)
- return MetadataCollector.NO_COMPRESSION_RATIO;
-
- return compressed / uncompressed;
- }
-
- public void close()
- {
- ISSTableScanner.closeAllAndPropagate(scanners, null);
- }
- }
-
public ScannerList getScanners(Collection<SSTableReader> toCompact)
Review Comment:
Done
##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -62,7 +60,7 @@
* i/o done by compaction, and merging done at read time.
* - perform a full (maximum possible) compaction if requested by the user
*/
-public abstract class AbstractCompactionStrategy
+public abstract class AbstractCompactionStrategy implements ScannerFactory
Review Comment:
Done
##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit
the parallelism of some compaction tasks
+/// that split into a large number of parallelizable tasks but should not be
allowed to take all compaction executor
+/// threads.
+public class CompositeCompactionTask extends AbstractCompactionTask
+{
+ @VisibleForTesting
+ final ArrayList<AbstractCompactionTask> tasks;
+
+ public CompositeCompactionTask(AbstractCompactionTask first)
+ {
+ super(first.cfs,
first.cfs.getTracker().tryModify(Collections.emptyList(),
OperationType.COMPACTION));
+ tasks = new ArrayList<>();
+ addTask(first);
+ }
+
+ /// Add a task to the composition.
+ public CompositeCompactionTask addTask(AbstractCompactionTask task)
+ {
+ tasks.add(task);
+ return this;
+ }
+
+ @Override
+ protected int executeInternal(ActiveCompactionsTracker tracker)
Review Comment:
Done
##########
src/java/org/apache/cassandra/db/compaction/unified/UnifiedCompactionTask.java:
##########
@@ -36,26 +41,76 @@ public class UnifiedCompactionTask extends CompactionTask
{
private final ShardManager shardManager;
private final Controller controller;
+ private final Range<Token> operationRange;
+ private final Set<SSTableReader> actuallyCompact;
public UnifiedCompactionTask(ColumnFamilyStore cfs,
UnifiedCompactionStrategy strategy,
- LifecycleTransaction txn,
+ ILifecycleTransaction txn,
long gcBefore,
ShardManager shardManager)
{
- super(cfs, txn, gcBefore);
+ this(cfs, strategy, txn, gcBefore, shardManager, null, null);
+ }
+
+ public UnifiedCompactionTask(ColumnFamilyStore cfs,
+ UnifiedCompactionStrategy strategy,
+ ILifecycleTransaction txn,
+ long gcBefore,
+ ShardManager shardManager,
+ Range<Token> operationRange,
+ Collection<SSTableReader> actuallyCompact)
+ {
+ super(cfs, strategy, txn, gcBefore);
this.controller = strategy.getController();
this.shardManager = shardManager;
+
+ if (operationRange != null)
+ {
+ assert actuallyCompact != null : "Ranged tasks should use a set of
sstables to compact";
+ }
+ this.operationRange = operationRange;
+ // To make sure actuallyCompact tracks any removals from
txn.originals(), we intersect the given set with it.
+ // This should not be entirely necessary (as
shouldReduceScopeForSpace() is false for ranged tasks), but it
+ // is cleaner to enforce inputSSTables()'s requirements.
+ this.actuallyCompact = actuallyCompact != null ?
Sets.intersection(ImmutableSet.copyOf(actuallyCompact),
+
txn.originals())
+ : txn.originals();
}
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore
cfs,
Directories
directories,
- LifecycleTransaction
txn,
+
ILifecycleTransaction txn,
Set<SSTableReader>
nonExpiredSSTables)
{
double density =
shardManager.calculateCombinedDensity(nonExpiredSSTables);
int numShards = controller.getNumShards(density *
shardManager.shardSetCoverage());
- return new ShardedCompactionWriter(cfs, directories, txn,
nonExpiredSSTables, keepOriginals, shardManager.boundaries(numShards));
+ boolean earlyOpenAllowed = tokenRange() == null;
+ return new ShardedCompactionWriter(cfs,
+ directories,
+ txn,
+ nonExpiredSSTables,
+ keepOriginals,
+ earlyOpenAllowed,
+ shardManager.boundaries(numShards));
+ }
+
+ @Override
+ protected Range<Token> tokenRange()
+ {
+ return operationRange;
+ }
+
+ @Override
Review Comment:
Added
##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle
transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to
parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the
operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will
be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction
before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the
processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed
or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also
throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by
performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This
may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect
if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept,
e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we
don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the
obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+ protected static final Logger logger =
LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+ final LifecycleTransaction mainTransaction;
+ private final AtomicInteger partsToCommitOrAbort;
+ private volatile boolean obsoleteOriginalsRequested;
+ private volatile boolean wasAborted;
+ private volatile boolean initializationComplete;
+ private volatile int partsCount = 0;
+
+ public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+ {
+ this.mainTransaction = mainTransaction;
+ this.partsToCommitOrAbort = new AtomicInteger(0);
+ this.wasAborted = false;
+ this.obsoleteOriginalsRequested = false;
+ }
+
+ /// Register one part of the composite transaction. Every part must
register itself before the composite transaction
+ /// is initialized and the parts are allowed to proceed.
+ /// @param part the part to register
+ public TimeUUID register(PartialLifecycleTransaction part)
+ {
+ int index = partsToCommitOrAbort.incrementAndGet();
+ return mainTransaction.opId().withSequence(index);
Review Comment:
Added comment to the constructor.
##########
src/java/org/apache/cassandra/utils/TimeUUID.java:
##########
@@ -293,6 +293,22 @@ public static String toString(TimeUUID ballot, String kind)
return ballot == null ? "null" : String.format("%s(%d:%s)", kind,
ballot.uuidTimestamp(), ballot);
}
+ public int sequence()
+ {
+ return (int) ((lsb >> 48) & 0x0000000000003FFFL);
+ }
+
+ /**
+ * Returns a new TimeUUID with the same timestamp as this one, but with
the provided sequence value.
Review Comment:
Added
##########
src/java/org/apache/cassandra/db/compaction/CompositeCompactionTask.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/// A composition of several compaction tasks into one. This is used to limit
the parallelism of some compaction tasks
Review Comment:
Done
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -376,24 +386,129 @@ public void run()
}
CompactionStrategyManager strategy =
cfs.getCompactionStrategyManager();
- AbstractCompactionTask task =
strategy.getNextBackgroundTask(getDefaultGcBefore(cfs,
FBUtilities.nowInSeconds()));
- if (task == null)
+ tasks =
strategy.getNextBackgroundTasks(getDefaultGcBefore(cfs,
FBUtilities.nowInSeconds()));
+ if (tasks == null || tasks.isEmpty())
{
if (DatabaseDescriptor.automaticSSTableUpgrade())
ranCompaction = maybeRunUpgradeTask(strategy);
}
- else
+ else if (tasks.size() == 1)
{
- task.execute(active);
+ // If just one task, run it directly on this thread
+ for (AbstractCompactionTask task : tasks)
+ task.execute(active);
ranCompaction = true;
}
+ else
Review Comment:
Changed
##########
src/java/org/apache/cassandra/db/compaction/CompactionController.java:
##########
@@ -145,20 +151,20 @@ public Set<SSTableReader> getFullyExpiredSSTables()
*
* @param cfStore
* @param compacting we take the drop-candidates from this set, it is
usually the sstables included in the compaction
- * @param overlapping the sstables that overlap the ones in compacting.
+ * @param overlappingSupplier function used to get the sstables that
overlap the ones in compacting.
Review Comment:
Done
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -1805,13 +1930,12 @@ public void obsoleteOriginals() {}
public void close() {}
}
- CompactionStrategyManager strategy =
cfs.getCompactionStrategyManager();
try (SharedTxn sharedTxn = new SharedTxn(txn);
SSTableRewriter fullWriter =
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
SSTableRewriter transWriter =
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
SSTableRewriter unrepairedWriter =
SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
- AbstractCompactionStrategy.ScannerList scanners =
strategy.getScanners(txn.originals());
+ ScannerList scanners =
ScannerFactory.DEFAULT.getScanners(txn.originals());
Review Comment:
Yes, it does. Taking advantage of it in places like this is the reason for
the complexity I was trying to get rid of (the sstable set must be split for
the individual strategy instances, then combined back; this adds cost to normal
compactions).
Note that since CASSANDRA-8915, the cost of not using the LCS-specific
scanner fell dramatically to just one key comparison per partition (i.e. one
long comparison in most cases).
##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -62,15 +64,17 @@ public class CompactionTask extends AbstractCompactionTask
protected final boolean keepOriginals;
protected static long totalBytesCompacted = 0;
private ActiveCompactionsTracker activeCompactions;
+ protected final ScannerFactory scannerFactory;
- public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn,
long gcBefore)
+ public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy
strategy, ILifecycleTransaction txn, long gcBefore)
{
- this(cfs, txn, gcBefore, false);
+ this(cfs, strategy, txn, gcBefore, false);
}
- public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn,
long gcBefore, boolean keepOriginals)
+ public CompactionTask(ColumnFamilyStore cfs, AbstractCompactionStrategy
strategy, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals)
Review Comment:
Done; it was a little annoying, though — there were quite a few calls that
needed to be adjusted.
I am tempted to completely remove the LCS-specific scanner.
##########
src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java:
##########
@@ -100,6 +100,7 @@ public boolean
reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables,
largestL0SSTable.onDiskLength(),
transaction.opId());
transaction.cancel(largestL0SSTable);
+ nonExpiredSSTables.remove(largestL0SSTable);
Review Comment:
Yes, done
##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -84,68 +88,107 @@ protected int executeInternal(ActiveCompactionsTracker
activeCompactions)
{
this.activeCompactions = activeCompactions == null ?
ActiveCompactionsTracker.NOOP : activeCompactions;
run();
- return transaction.originals().size();
+ return inputSSTables().size();
}
public boolean reduceScopeForLimitedSpace(Set<SSTableReader>
nonExpiredSSTables, long expectedSize)
{
- if (partialCompactionsAcceptable() && transaction.originals().size() >
1)
+ if (partialCompactionsAcceptable() && nonExpiredSSTables.size() > 1)
{
// Try again w/o the largest one.
SSTableReader removedSSTable =
cfs.getMaxSizeFile(nonExpiredSSTables);
logger.warn("insufficient space to compact all requested files.
{}MiB required, {} for compaction {} - removing largest SSTable: {}",
(float) expectedSize / 1024 / 1024,
- StringUtils.join(transaction.originals(), ", "),
- transaction.opId(),
+ StringUtils.join(nonExpiredSSTables, ", "),
+ transaction.opIdString(),
removedSSTable);
// Note that we have removed files that are still marked as
compacting.
// This suboptimal but ok since the caller will unmark all the
sstables at the end.
transaction.cancel(removedSSTable);
+ nonExpiredSSTables.remove(removedSSTable);
return true;
}
return false;
}
+ /**
+ * @return The token range that the operation should compact. This is
usually null, but if we have a parallelizable
+ * multi-task operation (see {@link
UnifiedCompactionStrategy#createCompactionTasks}), it will specify a subrange.
+ */
+ protected Range<Token> tokenRange()
+ {
+ return null;
+ }
+
+ /**
+ * @return The set of input sstables for this compaction. This must be a
subset of the transaction originals and
+ * must reflect any removal of sstables from the originals set for correct
overlap tracking.
+ * See {@link UnifiedCompactionTask} for an example.
+ */
+ protected Set<SSTableReader> inputSSTables()
+ {
+ return transaction.originals();
+ }
+
+ /**
+ * @return True if the task should try to limit the operation size to the
available space by removing sstables from
+ * the compacting set. This cannot be done if this is part of a multi-task
operation with a shared transaction.
+ */
+ protected boolean shouldReduceScopeForSpace()
+ {
+ return true;
+ }
+
/**
* For internal use and testing only. The rest of the system should go
through the submit* methods,
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
*/
+ @Override
protected void runMayThrow() throws Exception
{
// The collection of sstables passed may be empty (but not null); even
if
// it is not empty, it may compact down to nothing if all rows are
deleted.
assert transaction != null;
- if (transaction.originals().isEmpty())
+ if (inputSSTables().isEmpty())
return;
- // Note that the current compaction strategy, is not necessarily the
one this task was created under.
- // This should be harmless; see comments to
CFS.maybeReloadCompactionStrategy.
- CompactionStrategyManager strategy =
cfs.getCompactionStrategyManager();
-
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
{
Instant creationTime = now();
cfs.snapshotWithoutMemtable(creationTime.toEpochMilli() +
"-compact-" + cfs.name, creationTime);
}
- try (CompactionController controller =
getCompactionController(transaction.originals()))
+ try (CompactionController controller =
getCompactionController(inputSSTables()))
{
+ // Note: the controller set-up above relies on using the
transaction-provided sstable list, from which
+ // fully-expired sstables should not be removed (so that the
overlap tracker does not include them), but
+ // sstables excluded for scope reduction should be removed.
+ Set<SSTableReader> actuallyCompact = new
HashSet<>(inputSSTables());
final Set<SSTableReader> fullyExpiredSSTables =
controller.getFullyExpiredSSTables();
+ if (!fullyExpiredSSTables.isEmpty())
+ {
+ logger.debug("Compaction {} dropping expired sstables: {}",
transaction.opIdString(), fullyExpiredSSTables);
+ actuallyCompact.removeAll(fullyExpiredSSTables);
+ }
TimeUUID taskId = transaction.opId();
// select SSTables to compact based on available disk space.
- if
(!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId))
+ final boolean hasExpirations = !fullyExpiredSSTables.isEmpty();
+ if (shouldReduceScopeForSpace() &&
!buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact,
hasExpirations, taskId)
Review Comment:
Added
##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -408,12 +450,8 @@ protected boolean
buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTab
// usually means we've run out of disk space
// but we can still compact expired SSTables
- if(partialCompactionsAcceptable() &&
fullyExpiredSSTables.size() > 0 )
- {
- // sanity check to make sure we compact only fully expired
SSTables.
- assert
transaction.originals().equals(fullyExpiredSSTables);
+ if (partialCompactionsAcceptable() && containsExpired)
Review Comment:
You are right, the new code would unnecessarily compact one sstable, while
the old one would remove some of the expired ones.
Changed this part to remove any remaining non-expired sstables from the
transaction, which should result in leaving only the expired ones (which is
an improvement over the old behaviour). The effect of not having anything to
compact is to complete the transaction after doing nothing, which in turn will
delete the expired sstables.
##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -180,17 +178,17 @@ public void shutdown()
*
* Is responsible for marking its sstables as compaction-pending.
*/
- public abstract AbstractCompactionTask getNextBackgroundTask(final long
gcBefore);
+ public abstract Collection<AbstractCompactionTask>
getNextBackgroundTasks(final long gcBefore);
Review Comment:
Changed, also in the next method.
##########
src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java:
##########
@@ -352,22 +346,25 @@ synchronized int getNumPendingRepairFinishedTasks()
return count;
}
- synchronized AbstractCompactionTask getNextRepairFinishedTask()
+ synchronized Collection<AbstractCompactionTask>
getNextRepairFinishedTasks()
{
+ List<AbstractCompactionTask> tasks = null;
Review Comment:
Done
##########
src/java/org/apache/cassandra/db/lifecycle/CompositeLifecycleTransaction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.TimeUUID;
+
+/// Composite lifecycle transaction. This is a wrapper around a lifecycle
transaction that allows for multiple partial
+/// operations that comprise the whole transaction. This is used to
parallelize compaction operations over individual
+/// output shards where the compaction sources are shared among the
operations; in this case we can only release the
+/// shared sources once all operations are complete.
+///
+/// A composite transaction is initialized with a main transaction that will
be used to commit the transaction. Each
+/// part of the composite transaction must be registered with the transaction
before it is used. The transaction must
+/// be initialized by calling [#completeInitialization()] before any of the
processing is allowed to proceed.
+///
+/// The transaction is considered complete when all parts have been committed
or aborted. If any part is aborted, the
+/// whole transaction is also aborted ([PartialLifecycleTransaction] will also
throw an exception on other parts when
+/// they access it if the composite is already aborted).
+///
+/// When all parts are committed, the full transaction is applied by
performing a checkpoint, obsoletion of the
+/// originals if any of the parts requested it, preparation and commit. This
may somewhat violate the rules of
+/// transactions as a part that has been committed may actually have no effect
if another part is aborted later.
+/// There are also restrictions on the operations that this model can accept,
e.g. replacement of sources and partial
+/// checkpointing are not supported (as they are parts of early open which we
don't aim to support at this time),
+/// and we consider that all parts will have the same opinion about the
obsoletion of the originals.
+public class CompositeLifecycleTransaction
+{
+ protected static final Logger logger =
LoggerFactory.getLogger(CompositeLifecycleTransaction.class);
+
+ final LifecycleTransaction mainTransaction;
+ private final AtomicInteger partsToCommitOrAbort;
+ private volatile boolean obsoleteOriginalsRequested;
+ private volatile boolean wasAborted;
+ private volatile boolean initializationComplete;
+ private volatile int partsCount = 0;
+
+ public CompositeLifecycleTransaction(LifecycleTransaction mainTransaction)
+ {
+ this.mainTransaction = mainTransaction;
+ this.partsToCommitOrAbort = new AtomicInteger(0);
+ this.wasAborted = false;
+ this.obsoleteOriginalsRequested = false;
+ }
+
+ /// Register one part of the composite transaction. Every part must
register itself before the composite transaction
+ /// is initialized and the parts are allowed to proceed.
+ /// @param part the part to register
+ public TimeUUID register(PartialLifecycleTransaction part)
+ {
+ int index = partsToCommitOrAbort.incrementAndGet();
+ return mainTransaction.opId().withSequence(index);
+ }
+
+ /// Complete the initialization of the composite transaction. This must be
called before any of the parts are
+ /// executed.
+ public void completeInitialization()
+ {
+ partsCount = partsToCommitOrAbort.get();
+ initializationComplete = true;
+ // TODO: Switch to trace before merging.
Review Comment:
Changed
##########
src/java/org/apache/cassandra/db/compaction/ShardManagerTrivial.java:
##########
@@ -54,14 +57,21 @@ public double rangeSpanned(SSTableReader rdr)
}
@Override
- public double calculateCombinedDensity(Set<? extends SSTableReader>
sstables)
+ public double calculateCombinedDensity(Collection<SSTableReader> sstables)
{
double totalSize = 0;
for (SSTableReader sstable : sstables)
totalSize += sstable.onDiskLength();
return totalSize;
}
+ public <T> List<T> splitSSTablesInShards(Collection<SSTableReader>
sstables,
Review Comment:
Done
##########
src/java/org/apache/cassandra/tools/nodetool/Compact.java:
##########
@@ -50,6 +50,12 @@ public class Compact extends NodeToolCmd
@Option(title = "partition_key", name = {"--partition"}, description =
"String representation of the partition key")
private String partitionKey = EMPTY;
+ @Option(title = "jobs",
+ name = {"-j", "--jobs"},
+ description = "Use -j to specify the maximum number of threads to
use for parallel compaction. " +
+ "If not set, up to half the compaction threads will
be used. " +
+ "If set to 0, the major compaction will use all
threads and will not permit other compactions to run until it completes (use
with caution).")
Review Comment:
This is now fixed.
##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
public static String printScalingParameter(int w)
{
if (w < 0)
- return "L" + Integer.toString(2 - w);
+ return 'L' + Integer.toString(2 - w);
else if (w > 0)
- return "T" + Integer.toString(w + 2);
+ return 'T' + Integer.toString(w + 2);
else
return "N";
}
+ private TimeUUID nextTimeUUID()
+ {
+ return TimeUUID.Generator.nextTimeUUID().withSequence(0);
+ }
+
@Override
- public synchronized Collection<AbstractCompactionTask> getMaximalTask(long
gcBefore, boolean splitOutput)
+ public synchronized Collection<AbstractCompactionTask>
getMaximalTasks(long gcBefore, boolean splitOutput, int permittedParallelism)
{
maybeUpdateShardManager();
// The tasks are split by repair status and disk, as well as in
non-overlapping sections to enable some
// parallelism (to the amount that L0 sstables are split, i.e. at
least base_shard_count). The result will be
// split across shards according to its density. Depending on the
parallelism, the operation may require up to
// 100% extra space to complete.
List<AbstractCompactionTask> tasks = new ArrayList<>();
- List<Set<SSTableReader>> nonOverlapping =
splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
- for (Set<SSTableReader> set : nonOverlapping)
+ if (permittedParallelism <= 0)
Review Comment:
No, it was not (apart from a test), and has been removed in the current code.
##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -139,47 +139,80 @@ private static int atLeast2(int value, String str)
public static String printScalingParameter(int w)
{
if (w < 0)
- return "L" + Integer.toString(2 - w);
+ return 'L' + Integer.toString(2 - w);
else if (w > 0)
- return "T" + Integer.toString(w + 2);
+ return 'T' + Integer.toString(w + 2);
else
return "N";
}
+ private TimeUUID nextTimeUUID()
+ {
+ return TimeUUID.Generator.nextTimeUUID().withSequence(0);
Review Comment:
Added
##########
src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java:
##########
@@ -221,7 +222,7 @@ public CassandraValidationIterator(ColumnFamilyStore cfs,
SharedContext ctx, Col
long gcBefore = dontPurgeTombstones ? Long.MIN_VALUE :
getDefaultGcBefore(cfs, nowInSec);
controller = new ValidationCompactionController(cfs, gcBefore);
- scanners = cfs.getCompactionStrategyManager().getScanners(sstables,
ranges);
+ scanners = ScannerFactory.DEFAULT.getScanners(sstables, ranges);
Review Comment:
This is correct, that optimization will no longer apply here.
We have three options on this:
- Leave as is (special scanner applies only to compactions, no cost to get
list)
- Roll back
https://github.com/apache/cassandra/pull/3688/commits/b24959861d38b139720b36791714047261efcf1d
and associated follow-ups (special scanner applies everywhere, all operations
pay a collection cost when getting the scanner list)
- Drop the special LCS scanner altogether (LCS pays a cost of up to 1 extra
key comparison per partition in exchange for lower call polymorphism).
If you are not happy with the first option, I can run some benchmarks to see
if the special scanner actually saves anything to choose between the other two.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]