This is an automated email from the ASF dual-hosted git repository.

blambov pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new 87c2af85c1 Fix delayed SSTable release with unsafe_aggressive_sstable_expiration
87c2af85c1 is described below

commit 87c2af85c1305c130af7d66f83dec03a1c4a8bb2
Author: Ethan Brown <ethan.br...@datastax.com>
AuthorDate: Fri Aug 18 13:02:15 2023 -0700

    Fix delayed SSTable release with unsafe_aggressive_sstable_expiration
    
    patch by Ethan Brown; reviewed by Branimir Lambov and Mick Semb Wever for CASSANDRA-18756
---
 CHANGES.txt                                        |   1 +
 .../db/compaction/CompactionController.java        |  12 +-
 .../db/compaction/CompactionControllerTest.java    | 140 +++++++++++++++++++++
 3 files changed, 146 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2d9e2059e1..74755be6e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.17
+ * Fix delayed SSTable release with unsafe_aggressive_sstable_expiration (CASSANDRA-18756)
  * Revert CASSANDRA-18543 (CASSANDRA-18854)
  * Fix NPE when using udfContext in UDF after a restart of a node (CASSANDRA-18739)
 Merged from 3.0:
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 19318ff1a9..06272a1075 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -81,6 +81,8 @@ public class CompactionController implements AutoCloseable
 
     public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> 
compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
     {
+        //When making changes to the method, be aware that some of the state of the controller may still be uninitialized
+        //(e.g. TWCS sets up the value of ignoreOverlaps() after this completes)
         assert cfs != null;
         this.cfs = cfs;
         this.gcBefore = gcBefore;
@@ -105,12 +107,6 @@ public class CompactionController implements AutoCloseable
             return;
         }
 
-        if (ignoreOverlaps())
-        {
-            logger.debug("not refreshing overlaps - running with 
ignoreOverlaps activated");
-            return;
-        }
-
         for (SSTableReader reader : overlappingSSTables)
         {
             if (reader.isMarkedCompacted())
@@ -129,7 +125,7 @@ public class CompactionController implements AutoCloseable
         if (this.overlappingSSTables != null)
             close();
 
-        if (compacting == null || ignoreOverlaps())
+        if (compacting == null)
             overlappingSSTables = 
Refs.tryRef(Collections.<SSTableReader>emptyList());
         else
             overlappingSSTables = 
cfs.getAndReferenceOverlappingLiveSSTables(compacting);
@@ -358,6 +354,8 @@ public class CompactionController implements AutoCloseable
      * This strategy can retain for a long time a lot of sstables on disk (see CASSANDRA-13418) so this option
      * control whether or not this check should be ignored.
      *
+     * Do NOT call this method in the CompactionController constructor
+     *
      * @return false by default
      */
     protected boolean ignoreOverlaps()
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 052206e685..aa95ba56fb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -19,12 +19,19 @@
 package org.apache.cassandra.db.compaction;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
@@ -41,17 +48,27 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
 
+@RunWith(BMUnitRunner.class)
 public class CompactionControllerTest extends SchemaLoader
 {
     private static final String KEYSPACE = "CompactionControllerTest";
     private static final String CF1 = "Standard1";
     private static final String CF2 = "Standard2";
+    private static final int TTL_SECONDS = 10;
+    private static CountDownLatch compaction2FinishLatch = new 
CountDownLatch(1);
+    private static CountDownLatch createCompactionControllerLatch = new 
CountDownLatch(1);
+    private static CountDownLatch compaction1RefreshLatch = new 
CountDownLatch(1);
+    private static CountDownLatch refreshCheckLatch = new CountDownLatch(1);
+    private static int overlapRefreshCounter = 0;
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
@@ -184,6 +201,124 @@ public class CompactionControllerTest extends SchemaLoader
         assertEquals(1, expired.size());
     }
 
+    @Test
+    @BMRules(rules = {
+    @BMRule(name = "Pause compaction",
+    targetClass = "CompactionTask",
+    targetMethod = "runMayThrow",
+    targetLocation = "INVOKE getCompactionAwareWriter",
+    condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+    action = 
"org.apache.cassandra.db.compaction.CompactionControllerTest.createCompactionControllerLatch.countDown();"
 +
+             
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+             
"(org.apache.cassandra.db.compaction.CompactionControllerTest.compaction2FinishLatch);"),
+    @BMRule(name = "Check overlaps",
+    targetClass = "CompactionTask",
+    targetMethod = "runMayThrow",
+    targetLocation = "INVOKE finish",
+    condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+    action = 
"org.apache.cassandra.db.compaction.CompactionControllerTest.compaction1RefreshLatch.countDown();"
 +
+             
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+             
"(org.apache.cassandra.db.compaction.CompactionControllerTest.refreshCheckLatch);"),
+    @BMRule(name = "Increment overlap refresh counter",
+    targetClass = "ColumnFamilyStore",
+    targetMethod = "getAndReferenceOverlappingLiveSSTables",
+    condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+    action = 
"org.apache.cassandra.db.compaction.CompactionControllerTest.incrementOverlapRefreshCounter();")
+    })
+    public void testIgnoreOverlaps() throws Exception
+    {
+        testOverlapIterator(true);
+        overlapRefreshCounter = 0;
+        compaction2FinishLatch = new CountDownLatch(1);
+        createCompactionControllerLatch = new CountDownLatch(1);
+        compaction1RefreshLatch = new CountDownLatch(1);
+        refreshCheckLatch = new CountDownLatch(1);
+        testOverlapIterator(false);
+    }
+
+    public void testOverlapIterator(boolean ignoreOverlaps) throws Exception
+    {
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        //create 2 overlapping sstables
+        DecoratedKey key = Util.dk("k1");
+        long timestamp1 = FBUtilities.timestampMicros();
+        long timestamp2 = timestamp1 - 5;
+        applyMutation(cfs.metadata, key, timestamp1);
+        cfs.forceBlockingFlush();
+        assertEquals(cfs.getLiveSSTables().size(), 1);
+        Set<SSTableReader> sstables = cfs.getLiveSSTables();
+
+        applyMutation(cfs.metadata, key, timestamp2);
+        cfs.forceBlockingFlush();
+        assertEquals(cfs.getLiveSSTables().size(), 2);
+        String sstable2 = 
cfs.getLiveSSTables().iterator().next().getFilename();
+
+        
System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY,
 "true");
+        Map<String, String> options = new HashMap<>();
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"30");
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"SECONDS");
+        
options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, 
"MILLISECONDS");
+        
options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY,
 "0");
+        
options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY,
 Boolean.toString(ignoreOverlaps));
+        TimeWindowCompactionStrategy twcs = new 
TimeWindowCompactionStrategy(cfs, options);
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+            twcs.addSSTable(sstable);
+
+        twcs.startup();
+
+        CompactionTask task = 
(CompactionTask)twcs.getUserDefinedTask(sstables, 0);
+
+        assertNotNull(task);
+        assertEquals(1, Iterables.size(task.transaction.originals()));
+
+        //start a compaction for the first sstable (compaction1)
+        //the overlap iterator should contain sstable2
+        //this compaction will be paused by the BMRule
+        Thread t = new Thread(() -> {
+            task.execute(null);
+        });
+
+        //start a compaction for the second sstable (compaction2)
+        //the overlap iterator should contain sstable1
+        //this compaction should complete as normal
+        Thread t2 = new Thread(() -> {
+            
Uninterruptibles.awaitUninterruptibly(createCompactionControllerLatch);
+            assertEquals(1, overlapRefreshCounter);
+            CompactionManager.instance.forceUserDefinedCompaction(sstable2);
+
+            //after compaction2 is finished, wait 1 minute and then resume compaction1 (this gives enough time for the overlapIterator to be refreshed)
+            //after resuming, the overlap iterator for compaction1 should be updated to include the new sstable created by compaction2,
+            //and it should not contain sstable2
+            try
+            {
+                TimeUnit.MINUTES.sleep(1);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            compaction2FinishLatch.countDown();
+        });
+
+        t.setName("compaction1");
+        t.start();
+        t2.start();
+
+        compaction1RefreshLatch.await();
+        //at this point, the overlap iterator for compaction1 should be refreshed
+
+        //verify that the overlap iterator for compaction1 is refreshed twice, (once during the constructor, and again after compaction2 finishes)
+        assertEquals(2, overlapRefreshCounter);
+
+        refreshCheckLatch.countDown();
+        t.join();
+    }
+
     private void applyMutation(CFMetaData cfm, DecoratedKey key, long 
timestamp)
     {
         ByteBuffer val = ByteBufferUtil.bytes(1L);
@@ -206,4 +341,9 @@ public class CompactionControllerTest extends SchemaLoader
         assertFalse(evaluator.test(boundary));
         assertTrue(evaluator.test(boundary - 1));
     }
+
+    public static void incrementOverlapRefreshCounter()
+    {
+        overlapRefreshCounter++;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to