This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 730b898b74 Don't group TWCS sstables for anticompaction
730b898b74 is described below

commit 730b898b7415e568138c6fc1e961f69b41ba1fa0
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Tue Oct 18 12:57:07 2022 +0200

    Don't group TWCS sstables for anticompaction

    Patch by marcuse; reviewed by Aleksey Yeschenko for CASSANDRA-17970
---
 CHANGES.txt                                        |  1 +
 .../compaction/TimeWindowCompactionStrategy.java   | 14 +++++++++++++
 test/unit/org/apache/cassandra/MockSchema.java     | 23 +++++++++++++++++++---
 .../TimeWindowCompactionStrategyTest.java          | 23 ++++++++++++++++++++++
 4 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 98d1cb1e04..3900ab5b58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.29
+ * Avoid anticompaction mixing data from two different time windows with TWCS (CASSANDRA-17970)
  * Do not spam the logs with MigrationCoordinator not being able to pull schemas (CASSANDRA-18096)
  * Fix incorrect resource name in LIST PERMISSION output (CASSANDRA-17848)
  * Suppress CVE-2022-41854 and similar (CASSANDRA-18083)
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index c44d3aa980..5ae1cc784c 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -336,6 +336,20 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
         return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
     }
 
+    /**
+     * TWCS should not group sstables for anticompaction - this can mix new and old data
+     */
+    @Override
+    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+    {
+        Collection<Collection<SSTableReader>> groups = new ArrayList<>(sstablesToGroup.size());
+        for (SSTableReader sstable : sstablesToGroup)
+        {
+            groups.add(Collections.singleton(sstable));
+        }
+        return groups;
+    }
+
     @Override
     @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
     public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index 1b47fc22ff..5f3198dad0 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -50,6 +50,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
 public class MockSchema
 {
     static
@@ -84,7 +86,18 @@ public class MockSchema
         return sstable(generation, size, false, cfs);
     }
 
+    public static SSTableReader sstableWithTimestamp(int generation, long timestamp, ColumnFamilyStore cfs)
+    {
+        return sstable(generation, 0, false, timestamp, cfs);
+    }
+
     public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
+    {
+        return sstable(generation, size, keepRef, System.currentTimeMillis() * 1000, cfs);
+    }
+
+
+    public static SSTableReader sstable(int generation, int size, boolean keepRef, long timestamp, ColumnFamilyStore cfs)
     {
         Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                cfs.keyspace.getName(),
@@ -119,9 +132,13 @@ public class MockSchema
             }
         }
         SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
-        StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
-                                                 .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
-                                                 .get(MetadataType.STATS);
+        MetadataCollector collector = new MetadataCollector(cfs.metadata.comparator);
+        collector.update(new DeletionTime(timestamp, (int) (System.currentTimeMillis() / 1000)));
+        StatsMetadata metadata = (StatsMetadata) collector.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(),
+                                                                            0.01f,
+                                                                            -1,
+                                                                            header).get(MetadataType.STATS);
+
         SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
                                                           segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
                                                           new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
index 051e7c0432..9bed7c1c94 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -19,12 +19,14 @@ package org.apache.cassandra.db.compaction;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 
@@ -47,6 +49,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.MockSchema;
 
 import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
 import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
@@ -279,4 +282,24 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader
         t.transaction.abort();
     }
 
+    @Test
+    public void testGroupForAntiCompaction()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS("test_group_for_anticompaction");
+        cfs.setCompactionParameters(ImmutableMap.of("class", "TimeWindowCompactionStrategy",
+                                                    "timestamp_resolution", "MILLISECONDS",
+                                                    "compaction_window_size", "1",
+                                                    "compaction_window_unit", "MINUTES"));
+
+        List<SSTableReader> sstables = new ArrayList<>(10);
+        long curr = System.currentTimeMillis();
+        for (int i = 0; i < 10; i++)
+            sstables.add(MockSchema.sstableWithTimestamp(i, curr + TimeUnit.MILLISECONDS.convert(i, TimeUnit.MINUTES), cfs));
+
+        cfs.addSSTables(sstables);
+        Collection<Collection<SSTableReader>> groups = cfs.getCompactionStrategyManager().getStrategies().get(1).groupSSTablesForAntiCompaction(sstables);
+        assertTrue(groups.size() > 0);
+        for (Collection<SSTableReader> group : groups)
+            assertEquals(1, group.size());
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org