Repository: cassandra
Updated Branches:
  refs/heads/trunk 2aeed037e -> bdb52801c
Abort compactions quicker

Patch by marcuse; reviewed by Alex Petrov for CASSANDRA-14397

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdb52801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdb52801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdb52801

Branch: refs/heads/trunk
Commit: bdb52801c7384ef07f7fc0b4f3b965bdf35d821d
Parents: 2aeed03
Author: Marcus Eriksson <marc...@apache.org>
Authored: Fri Apr 13 15:15:03 2018 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Jun 13 13:06:59 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionIterator.java       | 38 +++++++++++-
 .../db/compaction/CompactionManager.java        |  3 -
 .../cassandra/db/compaction/CompactionTask.java |  3 -
 .../db/repair/CassandraValidationIterator.java  |  8 ---
 .../db/compaction/CompactionIteratorTest.java   | 61 ++++++++++++++++++++
 6 files changed, 99 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 629df0c..49738cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Abort compactions quicker (CASSANDRA-14397)
  * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
  * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
  * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index dfbb6cc..c9d7e52 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -104,7 +104,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte ? EmptyIterators.unfilteredPartition(controller.cfs.metadata()) : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()); merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec)); - this.compacted = Transformation.apply(merged, new Purger(controller, nowInSec)); + merged = Transformation.apply(merged, new Purger(controller, nowInSec)); + compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this)); } public TableMetadata metadata() @@ -542,4 +543,39 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC); } } + + private static class AbortableUnfilteredPartitionTransformation extends Transformation<UnfilteredRowIterator> + { + private final AbortableUnfilteredRowTransformation abortableIter; + + private AbortableUnfilteredPartitionTransformation(CompactionIterator iter) + { + this.abortableIter = new AbortableUnfilteredRowTransformation(iter); + } + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + if (abortableIter.iter.isStopRequested()) + throw new CompactionInterruptedException(abortableIter.iter.getCompactionInfo()); + return Transformation.apply(partition, abortableIter); + } + } + + private static class AbortableUnfilteredRowTransformation extends Transformation + { + private final CompactionIterator iter; + + private AbortableUnfilteredRowTransformation(CompactionIterator iter) + { + this.iter = iter; + } + + public Row applyToRow(Row row) + 
{ + if (iter.isStopRequested()) + throw new CompactionInterruptedException(iter.getCompactionInfo()); + return row; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 5c61982..a872fea 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1155,9 +1155,6 @@ public class CompactionManager implements CompactionManagerMBean while (ci.hasNext()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - try (UnfilteredRowIterator partition = ci.next(); UnfilteredRowIterator notCleaned = cleanupStrategy.cleanup(partition)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 5697df2..662384c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -197,9 +197,6 @@ public class CompactionTask extends AbstractCompactionTask estimatedKeys = writer.estimatedKeys(); while (ci.hasNext()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - if (writer.append(ci.next())) totalKeysWritten++; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java index 6fa0be2..caf1b8e 100644 --- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java +++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java @@ -281,23 +281,15 @@ public class CassandraValidationIterator extends ValidationPartitionIterator return cfs.metadata.get(); } - private void throwIfStopRequested() - { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - } - @Override public boolean hasNext() { - throwIfStopRequested(); return ci.hasNext(); } @Override public UnfilteredRowIterator next() { - throwIfStopRequested(); return ci.next(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java index 99df52f..d5ea56c 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java @@ -310,6 +310,67 @@ public class CompactionIteratorTest } } + @Test + public void transformTest() + { + UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false); + List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 11[100] 12[100]"}, generator); + List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, generator); + List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(inputLists, list -> ImmutableList.of(listToIterator(list, kk)))); + Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>(); + 
transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> listToIterator(list, kk))); + try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE); + CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION, + Lists.transform(content, x -> new Scanner(x)), + controller, NOW, null)) + { + assertTrue(iter.hasNext()); + UnfilteredRowIterator rows = iter.next(); + assertTrue(rows.hasNext()); + assertNotNull(rows.next()); + + iter.stop(); + try + { + // Will call Transformation#applyToRow + rows.hasNext(); + fail("Should have thrown CompactionInterruptedException"); + } + catch (CompactionInterruptedException e) + { + // ignore + } + } + } + + @Test + public void transformPartitionTest() + { + UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false); + List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 11[100] 12[100]"}, generator); + List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, generator); + List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(inputLists, list -> ImmutableList.of(listToIterator(list, kk)))); + Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>(); + transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> listToIterator(list, kk))); + try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE); + CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION, + Lists.transform(content, x -> new Scanner(x)), + controller, NOW, null)) + { + iter.stop(); + try + { + // Will call Transformation#applyToPartition + iter.hasNext(); + fail("Should have thrown CompactionInterruptedException"); + } + catch (CompactionInterruptedException e) + { + // ignore + } + } + } + class Controller extends CompactionController { private final Map<DecoratedKey, 
Iterable<UnfilteredRowIterator>> tombstoneSources;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org