This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 377e114cb1 Introduce compaction priorities to prevent upgrade compaction inability to finish 377e114cb1 is described below commit 377e114cb1459895423c292cb0bf7f921fd30e43 Author: Josh McKenzie <jmcken...@apache.org> AuthorDate: Thu Aug 25 15:27:24 2022 -0400 Introduce compaction priorities to prevent upgrade compaction inability to finish Patch by Alex Petrov; reviewed by Josh McKenzie and Marcus Eriksson for CASSANDRA-17851 Co-authored-by: Alex Petrov <oleksandr.pet...@gmail.com> Co-authored-by: Josh McKenzie <jmcken...@apache.org> --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 42 +++- .../cassandra/db/compaction/CompactionManager.java | 131 +++++++----- .../db/compaction/CompactionStrategyManager.java | 4 +- .../cassandra/db/compaction/OperationType.java | 60 ++++-- .../cassandra/db/repair/PendingAntiCompaction.java | 7 +- .../distributed/test/PreviewRepairTest.java | 31 +-- .../distributed/test/UpgradeSSTablesTest.java | 223 ++++++++++++++++++++- .../LongLeveledCompactionStrategyTest.java | 4 +- .../db/compaction/CancelCompactionsTest.java | 19 +- .../db/repair/PendingAntiCompactionTest.java | 27 ++- 11 files changed, 421 insertions(+), 128 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6d6c736e1e..a3548e313d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Introduce compaction priorities to prevent upgrade compaction inability to finish (CASSANDRA-17851) * Prevent a user from manually removing ephemeral snapshots (CASSANDRA-17757) * Remove dependency on Maven Ant Tasks (CASSANDRA-17750) * Update ASM(9.1 to 9.3), Mockito(1.10.10 to 1.12.13) and ByteBuddy(3.2.4 to 4.7.0) (CASSANDRA-17835) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index de1033ae0c..e4b9d781e7 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -83,6 +83,7 @@ import org.apache.cassandra.config.DurationSpec; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionStrategyManager; import org.apache.cassandra.db.compaction.OperationType; @@ -1800,7 +1801,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return session != null && sessions.contains(session); }; return runWithCompactionsDisabled(() -> compactionStrategyManager.releaseRepairData(sessions), - predicate, false, true, true); + predicate, OperationType.STREAM, false, true, true); } else { @@ -2539,7 +2540,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner cfs.runWithCompactionsDisabled((Callable<Void>) () -> { cfs.data.reset(memtableFactory.create(new AtomicReference<>(CommitLogPosition.NONE), cfs.metadata, cfs)); return null; - }, true, false); + }, OperationType.P0, true, false); } } @@ -2628,7 +2629,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner } }; - runWithCompactionsDisabled(FutureTask.callable(truncateRunnable), true, true); + runWithCompactionsDisabled(FutureTask.callable(truncateRunnable), OperationType.P0, true, true); viewManager.build(); @@ -2659,9 +2660,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner FBUtilities.waitOnFuture(dumpMemtable()); } - public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews) + public <V> V runWithCompactionsDisabled(Callable<V> callable, OperationType operationType, boolean interruptValidation, boolean interruptViews) { - return runWithCompactionsDisabled(callable, (sstable) -> true, interruptValidation, interruptViews, true); + return runWithCompactionsDisabled(callable, (sstable) -> true, operationType, interruptValidation, interruptViews, true); } /** @@ -2674,13 +2675,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner * @param interruptIndexes if we should interrupt compactions on indexes. NOTE: if you set this to true your sstablePredicate * must be able to handle LocalPartitioner sstables! */ - public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes) + public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, OperationType operationType, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes) { // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly, // and so we only run one major compaction at a time synchronized (this) { - logger.trace("Cancelling in-progress compactions for {}", metadata.name); + logger.debug("Cancelling in-progress compactions for {}", metadata.name); Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes ? concatWithIndexes() : Collections.singleton(this); @@ -2689,9 +2690,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner ? Iterables.concat(toInterruptFor, viewManager.allViewsCfs()) : toInterruptFor; + Iterable<TableMetadata> toInterruptForMetadata = Iterables.transform(toInterruptFor, ColumnFamilyStore::metadata); + try (CompactionManager.CompactionPauser pause = CompactionManager.instance.pauseGlobalCompaction(); CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(toInterruptFor)) { + List<CompactionInfo.Holder> uninterruptibleTasks = CompactionManager.instance.getCompactionsMatching(toInterruptForMetadata, + (info) -> info.getTaskType().priority <= operationType.priority); + if (!uninterruptibleTasks.isEmpty()) + { + logger.info("Unable to cancel in-progress compactions, since they're running with higher or same priority: {}. You can abort these operations using `nodetool stop`.", + uninterruptibleTasks.stream().map((compaction) -> String.format("%s@%s (%s)", + compaction.getCompactionInfo().getTaskType(), + compaction.getCompactionInfo().getTable(), + compaction.getCompactionInfo().getTaskId())) + .collect(Collectors.joining(","))); + return null; + } + // interrupt in-progress compactions CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation); CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate); @@ -2701,7 +2717,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner { if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate)) { - logger.warn("Unable to cancel in-progress compactions for {}. Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name); + logger.warn("Unable to cancel in-progress compactions for {}. " + + "Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", + metadata.name); return null; } } @@ -2756,7 +2774,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return accumulate; } - public LifecycleTransaction markAllCompacting(final OperationType operationType) + public <T> T withAllSSTables(final OperationType operationType, Function<LifecycleTransaction, T> op) { Callable<LifecycleTransaction> callable = () -> { assert data.getCompacting().isEmpty() : data.getCompacting(); @@ -2767,10 +2785,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return modifier; }; - return runWithCompactionsDisabled(callable, false, false); + try (LifecycleTransaction compacting = runWithCompactionsDisabled(callable, operationType, false, false)) + { + return op.apply(compacting); + } } - @Override public String toString() { diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 5906ac294a..dc22b6712a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -369,68 +369,70 @@ public class CompactionManager implements CompactionManagerMBean * @throws InterruptedException */ @SuppressWarnings("resource") - private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) throws ExecutionException, InterruptedException + private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) { - logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName()); - List<LifecycleTransaction> transactions = new ArrayList<>(); - List<Future<?>> futures = new ArrayList<>(); - try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType)) - { - if (compacting == null) - return AllSSTableOpStatus.UNABLE_TO_CANCEL; - - Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting)); - if (Iterables.isEmpty(sstables)) + return cfs.withAllSSTables(operationType, (compacting) -> { + logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName()); + List<LifecycleTransaction> transactions = new ArrayList<>(); + List<Future<?>> futures = new ArrayList<>(); + try { - logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.SUCCESSFUL; - } + if (compacting == null) + return AllSSTableOpStatus.UNABLE_TO_CANCEL; - for (final SSTableReader sstable : sstables) - { - final LifecycleTransaction txn = compacting.split(singleton(sstable)); - transactions.add(txn); - Callable<Object> callable = new Callable<Object>() + Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting)); + if (Iterables.isEmpty(sstables)) { - @Override - public Object call() throws Exception - { - operation.execute(txn); - return this; - } - }; - Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation"); - if (!fut.isCancelled()) - futures.add(fut); - else - return AllSSTableOpStatus.ABORTED; + logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.SUCCESSFUL; + } - if (jobs > 0 && futures.size() == jobs) + for (final SSTableReader sstable : sstables) { - Future<?> f = FBUtilities.waitOnFirstFuture(futures); - futures.remove(f); + final LifecycleTransaction txn = compacting.split(singleton(sstable)); + transactions.add(txn); + Callable<Object> callable = new Callable<Object>() + { + @Override + public Object call() throws Exception + { + operation.execute(txn); + return this; + } + }; + Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation"); + if (!fut.isCancelled()) + futures.add(fut); + else + return AllSSTableOpStatus.ABORTED; + + if (jobs > 0 && futures.size() == jobs) + { + Future<?> f = FBUtilities.waitOnFirstFuture(futures); + futures.remove(f); + } } - } - FBUtilities.waitOnFutures(futures); - assert compacting.originals().isEmpty(); - logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName()); - return AllSSTableOpStatus.SUCCESSFUL; - } - finally - { - // wait on any unfinished futures to make sure we don't close an ongoing transaction - try - { FBUtilities.waitOnFutures(futures); + assert compacting.originals().isEmpty(); + logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName()); + return AllSSTableOpStatus.SUCCESSFUL; } - catch (Throwable t) + finally { - // these are handled/logged in CompactionExecutor#afterExecute + // wait on any unfinished futures to make sure we don't close an ongoing transaction + try + { + FBUtilities.waitOnFutures(futures); + } + catch (Throwable t) + { + // these are handled/logged in CompactionExecutor#afterExecute + } + Throwable fail = Throwables.close(null, transactions); + if (fail != null) + logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail); } - Throwable fail = Throwables.close(null, transactions); - if (fail != null) - logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail); - } + }); } private static interface OneSSTableOperation @@ -914,11 +916,17 @@ public class CompactionManager implements CompactionManagerMBean @SuppressWarnings("resource") // the tasks are executed in parallel on the executor, making sure that they get closed public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput) + { + return submitMaximal(cfStore, gcBefore, splitOutput, OperationType.MAJOR_COMPACTION); + } + + @SuppressWarnings("resource") + public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput, OperationType operationType) { // here we compute the task off the compaction executor, so having that present doesn't // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting // for ourselves to finish/acknowledge cancellation before continuing. - CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput); + CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput, operationType); if (tasks.isEmpty()) return Collections.emptyList(); @@ -963,6 +971,7 @@ public class CompactionManager implements CompactionManagerMBean try (CompactionTasks tasks = cfStore.runWithCompactionsDisabled(taskCreator, sstablesPredicate, + OperationType.MAJOR_COMPACTION, false, false, false)) @@ -2265,6 +2274,24 @@ public class CompactionManager implements CompactionManagerMBean } } + public List<Holder> getCompactionsMatching(Iterable<TableMetadata> columnFamilies, Predicate<CompactionInfo> predicate) + { + Preconditions.checkArgument(columnFamilies != null, "Attempted to getCompactionsMatching in CompactionManager with no columnFamilies specified."); + + List<Holder> matched = new ArrayList<>(); + // consider all in-progress compactions + for (Holder holder : active.getCompactions()) + { + CompactionInfo info = holder.getCompactionInfo(); + if (info.getTableMetadata() == null || Iterables.contains(columnFamilies, info.getTableMetadata())) + { + if (predicate.test(info)) + matched.add(holder); + } + } + return matched; + } + /** * Try to stop all of the compactions for given ColumnFamilies. * diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index ca67ddb0ea..808ea9ecd6 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -991,7 +991,7 @@ public class CompactionStrategyManager implements INotificationConsumer } } - public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput) + public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput, OperationType operationType) { maybeReloadDiskBoundaries(); // runWithCompactionsDisabled cancels active compactions and disables them, then we are able @@ -1012,7 +1012,7 @@ public class CompactionStrategyManager implements INotificationConsumer readLock.unlock(); } return CompactionTasks.create(tasks); - }, false, false); + }, operationType, false, false); } /** diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index e957e42c9d..a15693fe83 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -20,35 +20,51 @@ package org.apache.cassandra.db.compaction; public enum OperationType { /** Each modification here should be also applied to {@link org.apache.cassandra.tools.nodetool.Stop#compactionType} */ - COMPACTION("Compaction"), - VALIDATION("Validation"), - KEY_CACHE_SAVE("Key cache save"), - ROW_CACHE_SAVE("Row cache save"), - COUNTER_CACHE_SAVE("Counter cache save"), - CLEANUP("Cleanup"), - SCRUB("Scrub"), - UPGRADE_SSTABLES("Upgrade sstables"), - INDEX_BUILD("Secondary index build"), - /** Compaction for tombstone removal */ - TOMBSTONE_COMPACTION("Tombstone Compaction"), - UNKNOWN("Unknown compaction type"), - ANTICOMPACTION("Anticompaction after repair"), - VERIFY("Verify"), - FLUSH("Flush"), - STREAM("Stream"), - WRITE("Write"), - VIEW_BUILD("View build"), - INDEX_SUMMARY("Index summary redistribution"), - RELOCATE("Relocate sstables to correct disk"), - GARBAGE_COLLECT("Remove deleted data"); + P0("Cancel all operations", 0), + + // Automation or operator-driven tasks + CLEANUP("Cleanup", 1), + SCRUB("Scrub", 1), + UPGRADE_SSTABLES("Upgrade sstables", 1), + VERIFY("Verify", 1), + MAJOR_COMPACTION("Major compaction", 1), + RELOCATE("Relocate sstables to correct disk", 1), + GARBAGE_COLLECT("Remove deleted data", 1), + + // Internal SSTable writing + FLUSH("Flush", 1), + WRITE("Write", 1), + + ANTICOMPACTION("Anticompaction after repair", 2), + VALIDATION("Validation", 3), + + INDEX_BUILD("Secondary index build", 4), + VIEW_BUILD("View build", 4), + + COMPACTION("Compaction", 5), + TOMBSTONE_COMPACTION("Tombstone Compaction", 5), // Compaction for tombstone removal + UNKNOWN("Unknown compaction type", 5), + + STREAM("Stream", 6), + KEY_CACHE_SAVE("Key cache save", 6), + ROW_CACHE_SAVE("Row cache save", 6), + COUNTER_CACHE_SAVE("Counter cache save", 6), + INDEX_SUMMARY("Index summary redistribution", 6); public final String type; public final String fileName; - OperationType(String type) + // As of now, priority takes part only for interrupting tasks to give way to operator-driven tasks. + // Operation types that have a smaller number will be allowed to cancel ones that have larger numbers. + // + // Submitted tasks may be prioritised differently when forming a queue, if/when CASSANDRA-11218 is implemented. + public final int priority; + + OperationType(String type, int priority) { this.type = type; this.fileName = type.toLowerCase().replace(" ", ""); + this.priority = priority; } public static OperationType fromFileName(String fileName) diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java index af9888a3f1..a993bac5ee 100644 --- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java +++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java @@ -214,6 +214,11 @@ public class PendingAntiCompaction return null; } + protected AcquireResult acquireSSTables() + { + return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, OperationType.ANTICOMPACTION, false, false, false); + } + public AcquireResult call() { logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID); @@ -231,7 +236,7 @@ public class PendingAntiCompaction { // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs). - return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false, false); + return acquireSSTables(); } catch (SSTableAcquisitionException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java index 90e29f2eea..a0b643f0d3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java @@ -73,9 +73,7 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher; import static org.apache.cassandra.distributed.impl.Instance.deserializeMessage; -import static org.apache.cassandra.distributed.test.PreviewRepairTest.DelayFirstRepairTypeMessageFilter.finalizePropose; import static org.apache.cassandra.distributed.test.PreviewRepairTest.DelayFirstRepairTypeMessageFilter.validationRequest; -import static org.apache.cassandra.net.Verb.FINALIZE_PROPOSE_MSG; import static org.apache.cassandra.net.Verb.VALIDATION_REQ; import static org.apache.cassandra.service.StorageService.instance; import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; @@ -187,10 +185,14 @@ public class PreviewRepairTest extends TestBaseImpl previewRepairStarted.await(); // this needs to finish before the preview repair is unpaused on node2 cluster.get(1).callOnInstance(repair(options(false, false))); + RepairResult irResult = cluster.get(1).callOnInstance(repair(options(false, false))); continuePreviewRepair.signalAll(); RepairResult rs = rsFuture.get(); - assertFalse(rs.success); // preview repair should have failed + assertFalse(rs.success); // preview repair was started before IR, but has lower priority, so its task will get cancelled assertFalse(rs.wasInconsistent); // and no mismatches should have been reported + + assertTrue(irResult.success); // IR was started after preview repair, but has a higher priority, so it'll be allowed to finish + assertFalse(irResult.wasInconsistent); } finally { @@ -226,34 +228,21 @@ public class PreviewRepairTest extends TestBaseImpl .messagesMatching(validationRequest(previewRepairStarted, continuePreviewRepair)) .drop(); - Condition irRepairStarted = newOneTimeCondition(); - Condition continueIrRepair = newOneTimeCondition(); - // this blocks the IR from committing, so we can reenable the preview - cluster.filters() - .outbound() - .verbs(FINALIZE_PROPOSE_MSG.id) - .from(1).to(2) - .messagesMatching(finalizePropose(irRepairStarted, continueIrRepair)) - .drop(); - Future<RepairResult> previewResult = cluster.get(1).asyncCallsOnInstance(repair(options(true, false))).call(); previewRepairStarted.await(); - // trigger IR and wait till its ready to commit + // trigger IR and wait till it's ready to commit Future<RepairResult> irResult = cluster.get(1).asyncCallsOnInstance(repair(options(false, false))).call(); - irRepairStarted.await(); + RepairResult ir = irResult.get(); + assertTrue(ir.success); // IR was submitted after preview repair has acquired sstables, but has higher priority + assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification // unblock preview repair and wait for it to complete continuePreviewRepair.signalAll(); RepairResult rs = previewResult.get(); - assertFalse(rs.success); // preview repair should have failed + assertFalse(rs.success); // preview repair was started earlier than IR session; but has smaller priority assertFalse(rs.wasInconsistent); // and no mismatches should have been reported - - continueIrRepair.signalAll(); - RepairResult ir = irResult.get(); - assertTrue(ir.success); - assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java index c599f17e87..445e349885 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java @@ -19,22 +19,209 @@ package org.apache.cassandra.distributed.test; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; -import org.apache.cassandra.config.DatabaseDescriptor; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.ActiveCompactions; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.LogAction; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.CountDownLatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch; public class UpgradeSSTablesTest extends TestBaseImpl { + @Test + public void upgradeSSTablesInterruptsOngoingCompaction() throws Throwable + { + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));"); + cluster.get(1).acceptsOnInstance((String ks) -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl"); + cfs.disableAutoCompaction(); + CompactionManager.instance.setMaximumCompactorThreads(1); + CompactionManager.instance.setCoreCompactorThreads(1); + }).accept(KEYSPACE); + + String blob = "blob"; + for (int i = 0; i < 6; i++) + blob += blob; + + for (int cnt = 0; cnt < 5; cnt++) + { + for (int i = 0; i < 100; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", + ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob); + } + cluster.get(1).nodetool("flush", KEYSPACE, "tbl"); + } + + LogAction logAction = cluster.get(1).logs(); + logAction.mark(); + Future<?> future = cluster.get(1).asyncAcceptsOnInstance((String ks) -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl"); + CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION); + }).apply(KEYSPACE); + Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl")); + future.get(); + Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty()); + } + } + + @Test + public void compactionDoesNotCancelUpgradeSSTables() throws Throwable + { + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));"); + cluster.get(1).acceptsOnInstance((String ks) -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl"); + cfs.disableAutoCompaction(); + CompactionManager.instance.setMaximumCompactorThreads(1); + CompactionManager.instance.setCoreCompactorThreads(1); + }).accept(KEYSPACE); + + String blob = "blob"; + for (int i = 0; i < 6; i++) + blob += blob; + + for (int cnt = 0; cnt < 5; cnt++) + { + for (int i = 0; i < 100; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", + ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob); + } + cluster.get(1).nodetool("flush", KEYSPACE, "tbl"); + } + + LogAction logAction = cluster.get(1).logs(); + logAction.mark(); + Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl")); + Assert.assertFalse(logAction.watchFor("Compacting").getResult().isEmpty()); + + cluster.get(1).acceptsOnInstance((String ks) -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl"); + FBUtilities.allOf(CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION)) + .awaitUninterruptibly(1, TimeUnit.MINUTES); + + }).accept(KEYSPACE); + Assert.assertTrue(logAction.grep("Compaction interrupted").getResult().isEmpty()); + Assert.assertFalse(logAction.grep("Finished Upgrade sstables").getResult().isEmpty()); + Assert.assertFalse(logAction.grep("Compacted (.*) 5 sstables to").getResult().isEmpty()); + } + } + + @Test + public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable + { + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(BB::install).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));"); + + cluster.get(1).acceptsOnInstance((String ks) -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl"); + cfs.disableAutoCompaction(); + }).accept(KEYSPACE); + + String blob = "blob"; + for (int i = 0; i < 6; i++) + blob += blob; + + for (int i = 0; i < 10000; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", + ConsistencyLevel.QUORUM, i, i, blob); + } + + cluster.get(1).nodetool("flush", KEYSPACE, "tbl"); + + LogAction logAction = cluster.get(1).logs(); + logAction.mark(); + + // Start upgradingsstables - use BB to pause once inside ActiveCompactions.beginCompaction + Thread upgradeThread = new Thread(() -> { + cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"); + }); + upgradeThread.start(); + Assert.assertTrue(cluster.get(1).callOnInstance(() -> BB.starting.awaitUninterruptibly(1, TimeUnit.MINUTES))); + + // Start a scrub and make sure that it fails, log check later to make sure it was + // because it cannot cancel the active upgrade sstables + Assert.assertNotEquals(0, cluster.get(1).nodetool("scrub", KEYSPACE, "tbl")); + + // Now resume the upgrade sstables so test can shut down + cluster.get(1).runOnInstance(() -> { + BB.start.decrement(); + }); + upgradeThread.join(); + + Assert.assertFalse(logAction.grep("Unable to cancel in-progress compactions, since they're running with higher or same priority: Upgrade sstables").getResult().isEmpty()); + Assert.assertFalse(logAction.grep("Starting Scrub for ").getResult().isEmpty()); + Assert.assertFalse(logAction.grep("Finished Upgrade sstables for distributed_test_keyspace.tbl successfully").getResult().isEmpty()); + } + } + + @Test + public void truncateWhileUpgrading() throws Throwable + { + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) ")); + cluster.get(1).acceptsOnInstance((String ks) -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl"); + cfs.disableAutoCompaction(); + CompactionManager.instance.setMaximumCompactorThreads(1); + CompactionManager.instance.setCoreCompactorThreads(1); + }).accept(KEYSPACE); + + String blob = "blob"; + for (int i = 0; i < 10; i++) + blob += blob; + + for (int i = 0; i < 500; i++) + { + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"), + ConsistencyLevel.QUORUM, i, i, blob); + if (i > 0 && i % 100 == 0) + cluster.get(1).nodetool("flush", KEYSPACE, "tbl"); + } + + LogAction logAction = cluster.get(1).logs(); + logAction.mark(); + + Future<?> upgrade = CompletableFuture.runAsync(() -> { + cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"); + }); + + cluster.schemaChange(withKeyspace("TRUNCATE %s.tbl")); + upgrade.get(); + Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty()); + } + } + @Test public void rewriteSSTablesTest() throws Throwable { @@ -116,4 +303,38 @@ public class UpgradeSSTablesTest extends TestBaseImpl } } } + + public static class BB + { + // Will be initialized in the context of the instance class loader + static CountDownLatch starting = newCountDownLatch(1); + static CountDownLatch start = newCountDownLatch(1); + + public static void install(ClassLoader classLoader, Integer num) + { + new ByteBuddy().rebase(ActiveCompactions.class) + .method(named("beginCompaction")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + } + + @SuppressWarnings("unused") + public static void beginCompaction(CompactionInfo.Holder ci, @SuperCall Callable<Void> zuperCall) + { + try + { + zuperCall.call(); + if (ci.getCompactionInfo().getTaskType() == OperationType.UPGRADE_SSTABLES) + { + starting.decrement(); + Assert.assertTrue(start.awaitUninterruptibly(1, TimeUnit.MINUTES)); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } } diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index 733e46fd8d..a780cf1e26 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -198,9 +198,7 @@ public class LongLeveledCompactionStrategyTest } return null; } - }, true, true); - - + }, OperationType.COMPACTION, true, true); } @Test diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java index 67421ba6d2..51da0c4431 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java @@ -86,14 +86,16 @@ public class CancelCompactionsTest extends CQLTester assertEquals(1, activeCompactions.size()); assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting); // predicate requires the non-compacting sstables, should not cancel the one currently compacting: - cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false, true); + cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), + OperationType.P0, false, false, true); assertEquals(1, activeCompactions.size()); assertFalse(activeCompactions.get(0).isStopRequested()); // predicate requires the compacting ones - make sure stop is requested and that when we abort that // compaction we actually run the callable (countdown the latch) CountDownLatch cdl = new CountDownLatch(1); - Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false, true)); + Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, + OperationType.P0, false, false, true)); t.start(); while (!activeCompactions.get(0).isStopRequested()) Thread.sleep(100); @@ -139,13 +141,16 @@ public class CancelCompactionsTest extends CQLTester expectedSSTables.add(new HashSet<>(sstables.subList(6, 9))); assertEquals(compactingSSTables, expectedSSTables); - cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false, true); + cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, + OperationType.P0, false, false, true); assertEquals(2, activeCompactions.size()); assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested)); CountDownLatch cdl = new CountDownLatch(1); // start a compaction which only needs the sstables where first token is > 50 - these are the sstables compacted by tcts.get(1) - Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false, true)); + Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, + (sstable) -> first(sstable) > 50, + OperationType.P0, false, false, true)); t.start(); activeCompactions = getActiveCompactionsForTable(cfs); assertEquals(2, activeCompactions.size()); @@ -333,7 +338,8 @@ public class CancelCompactionsTest extends CQLTester } } assertTrue(foundCompaction); - cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false, true); + cfs.runWithCompactionsDisabled(() -> { compactionsStopped.countDown(); return null; }, + (sstable) -> true, OperationType.P0, false, false, true); // wait for the runWithCompactionsDisabled callable compactionsStopped.await(); assertEquals(1, getActiveCompactionsForTable(cfs).size()); @@ -430,7 +436,8 @@ public class CancelCompactionsTest extends CQLTester Set<SSTableReader> sstables = new HashSet<>(); try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION)) { - getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false); + getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;}, + OperationType.P0, false, false, false); } // the predicate only gets compacting sstables, and we are only compacting the 2i sstables - with interruptIndexes = false we should see no sstables here assertTrue(sstables.isEmpty()); diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java index a559478b87..c95b2dbb0e 100644 --- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java @@ -41,17 +41,14 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.cassandra.Util; -import org.apache.cassandra.concurrent.ExecutorPlus; -import org.apache.cassandra.concurrent.FutureTask; -import org.apache.cassandra.concurrent.ImmediateExecutor; -import org.apache.cassandra.utils.TimeUUID; -import org.apache.cassandra.utils.concurrent.Future; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.concurrent.FutureTask; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ColumnFamilyStore; @@ -78,7 +75,10 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.Util; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.concurrent.Transactional; @@ -664,15 +664,24 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest { @Override public boolean apply(SSTableReader sstable) + { + return true; + } + }; + + CompactionManager.instance.active.beginCompaction(holder); + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, nextTimeUUID(), 10, 1, acp) + { + protected PendingAntiCompaction.AcquireResult acquireSSTables() { cdl.countDown(); if (cdl.getCount() > 0) throw new PendingAntiCompaction.SSTableAcquisitionException("blah"); - return true; + else + CompactionManager.instance.active.finishCompaction(holder); + return super.acquireSSTables(); } }; - CompactionManager.instance.active.beginCompaction(holder); - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, nextTimeUUID(), 10, 1, acp); Future f = es.submit(acquisitionCallable); cdl.await(); assertNotNull(f.get()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org