CASSANDRA-13418 Allow to skip overlapings checks

 patch by Romain GÉRARD; reviewed by Mick Semb Wever for CASSANDRA-13418


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14d67d81
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14d67d81
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14d67d81

Branch: refs/heads/trunk
Commit: 14d67d81c57d6387c77bd85c57b342d285880835
Parents: 37d6730
Author: Romain GÉRARD <r.ger...@criteo.com>
Authored: Wed Aug 16 16:21:46 2017 +0200
Committer: Mick Semb Wever <m...@apache.org>
Committed: Tue Sep 5 08:33:25 2017 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionController.java     | 67 ++++++++++++++++--
 .../TimeWindowCompactionController.java         | 49 +++++++++++++
 .../TimeWindowCompactionStrategy.java           | 10 +--
 .../TimeWindowCompactionStrategyOptions.java    | 22 ++++++
 .../db/compaction/TimeWindowCompactionTask.java | 42 +++++++++++
 .../db/compaction/CompactionControllerTest.java |  5 ++
 .../TimeWindowCompactionStrategyTest.java       | 74 +++++++++++++++++++-
 8 files changed, 257 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1f63ced..9218d90 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.1
+ * Add a compaction option to TWCS to ignore sstables overlapping checks 
(CASSANDRA-13418)
  * BTree.Builder memory leak (CASSANDRA-13754)
  * Revert CASSANDRA-10368 of supporting non-pk column filtering due to 
correctness (CASSANDRA-13798)
  * Fix cassandra-stress hang issues when an error during cluster connection 
happens (CASSANDRA-12938)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionController.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index bf3647a..84aac09 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.function.Predicate;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
@@ -49,7 +50,8 @@ import static 
org.apache.cassandra.db.lifecycle.SSTableIntervalTree.buildInterva
 public class CompactionController implements AutoCloseable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CompactionController.class);
-    static final boolean NEVER_PURGE_TOMBSTONES = 
Boolean.getBoolean("cassandra.never_purge_tombstones");
+    private static final String NEVER_PURGE_TOMBSTONES_PROPERTY = 
Config.PROPERTY_PREFIX + "never_purge_tombstones";
+    static final boolean NEVER_PURGE_TOMBSTONES = 
Boolean.getBoolean(NEVER_PURGE_TOMBSTONES_PROPERTY);
 
     public final ColumnFamilyStore cfs;
     private final boolean compactingRepaired;
@@ -98,7 +100,14 @@ public class CompactionController implements AutoCloseable
     {
         if (NEVER_PURGE_TOMBSTONES)
         {
-            logger.debug("not refreshing overlaps - running with 
-Dcassandra.never_purge_tombstones=true");
+            logger.debug("not refreshing overlaps - running with -D{}=true",
+                    NEVER_PURGE_TOMBSTONES_PROPERTY);
+            return;
+        }
+
+        if (ignoreOverlaps())
+        {
+            logger.debug("not refreshing overlaps - running with 
ignoreOverlaps activated");
             return;
         }
 
@@ -120,7 +129,7 @@ public class CompactionController implements AutoCloseable
         if (this.overlappingSSTables != null)
             close();
 
-        if (compacting == null)
+        if (compacting == null || ignoreOverlaps())
             overlappingSSTables = 
Refs.tryRef(Collections.<SSTableReader>emptyList());
         else
             overlappingSSTables = 
cfs.getAndReferenceOverlappingLiveSSTables(compacting);
@@ -129,7 +138,7 @@ public class CompactionController implements AutoCloseable
 
     public Set<SSTableReader> getFullyExpiredSSTables()
     {
-        return getFullyExpiredSSTables(cfs, compacting, overlappingSSTables, 
gcBefore);
+        return getFullyExpiredSSTables(cfs, compacting, overlappingSSTables, 
gcBefore, ignoreOverlaps());
     }
 
     /**
@@ -146,20 +155,39 @@ public class CompactionController implements AutoCloseable
      * @param compacting we take the drop-candidates from this set, it is 
usually the sstables included in the compaction
      * @param overlapping the sstables that overlap the ones in compacting.
      * @param gcBefore
+     * @param ignoreOverlaps don't check if data shadows/overlaps any data in 
other sstables
      * @return
      */
-    public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore 
cfStore, Iterable<SSTableReader> compacting, Iterable<SSTableReader> 
overlapping, int gcBefore)
+    public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore 
cfStore,
+                                                             
Iterable<SSTableReader> compacting,
+                                                             
Iterable<SSTableReader> overlapping,
+                                                             int gcBefore,
+                                                             boolean 
ignoreOverlaps)
     {
         logger.trace("Checking droppable sstables in {}", cfStore);
 
         if (NEVER_PURGE_TOMBSTONES || compacting == null)
-            return Collections.<SSTableReader>emptySet();
+            return Collections.emptySet();
 
         if 
(cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones() && 
!Iterables.all(compacting, SSTableReader::isRepaired))
             return Collections.emptySet();
 
-        List<SSTableReader> candidates = new ArrayList<>();
+        if (ignoreOverlaps)
+        {
+            Set<SSTableReader> fullyExpired = new HashSet<>();
+            for (SSTableReader candidate : compacting)
+            {
+                if (candidate.getSSTableMetadata().maxLocalDeletionTime < 
gcBefore)
+                {
+                    fullyExpired.add(candidate);
+                    logger.trace("Dropping overlap ignored expired SSTable {} 
(maxLocalDeletionTime={}, gcBefore={})",
+                                 candidate, 
candidate.getSSTableMetadata().maxLocalDeletionTime, gcBefore);
+                }
+            }
+            return fullyExpired;
+        }
 
+        List<SSTableReader> candidates = new ArrayList<>();
         long minTimestamp = Long.MAX_VALUE;
 
         for (SSTableReader sstable : overlapping)
@@ -203,6 +231,14 @@ public class CompactionController implements AutoCloseable
         return new HashSet<>(candidates);
     }
 
+    public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore 
cfStore,
+                                                             
Iterable<SSTableReader> compacting,
+                                                             
Iterable<SSTableReader> overlapping,
+                                                             int gcBefore)
+    {
+        return getFullyExpiredSSTables(cfStore, compacting, overlapping, 
gcBefore, false);
+    }
+
     public String getKeyspace()
     {
         return cfs.keyspace.getName();
@@ -306,6 +342,23 @@ public class CompactionController implements AutoCloseable
         return reader.simpleIterator(dfile, key, position, tombstoneOnly);
     }
 
+    /**
+     * Is overlapped sstables ignored
+     *
+     * Control whether or not we are taking into account overlapping sstables 
when looking for fully expired sstables.
+     * In order to reduce the amount of work needed, we look for sstables that 
can be dropped instead of compacted.
+     * As a safeguard mechanism, for each time range of data in a sstable, we 
are checking globally to see if all data
+     * of this time range is fully expired before considering to drop the 
sstable.
+     * This strategy can retain for a long time a lot of sstables on disk (see 
CASSANDRA-13418) so this option
+     * control whether or not this check should be ignored.
+     *
+     * @return false by default
+     */
+    protected boolean ignoreOverlaps()
+    {
+        return false;
+    }
+
     private FileDataInput openDataFile(SSTableReader reader)
     {
         return limiter != null ? reader.openDataReader(limiter) : 
reader.openDataReader();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java
 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java
new file mode 100644
index 0000000..cf9e0e6
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public class TimeWindowCompactionController extends CompactionController
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TimeWindowCompactionController.class);
+
+    private final boolean ignoreOverlaps;
+
+    public TimeWindowCompactionController(ColumnFamilyStore cfs, 
Set<SSTableReader> compacting, int gcBefore, boolean ignoreOverlaps)
+    {
+        super(cfs, compacting, gcBefore);
+        this.ignoreOverlaps = ignoreOverlaps;
+        if (ignoreOverlaps)
+            logger.warn("You are running with sstables overlapping checks 
disabled, it can result in loss of data");
+    }
+
+    @Override
+    protected boolean ignoreOverlaps()
+    {
+        return ignoreOverlaps;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 595c46d..9532cc4 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -66,7 +66,6 @@ public class TimeWindowCompactionStrategy extends 
AbstractCompactionStrategy
         }
         else
             logger.debug("Enabling tombstone compactions for TWCS");
-
     }
 
     @Override
@@ -82,7 +81,7 @@ public class TimeWindowCompactionStrategy extends 
AbstractCompactionStrategy
 
             LifecycleTransaction modifier = 
cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
             if (modifier != null)
-                return new CompactionTask(cfs, modifier, gcBefore);
+                return new TimeWindowCompactionTask(cfs, modifier, gcBefore, 
options.ignoreOverlaps);
         }
     }
 
@@ -104,7 +103,8 @@ public class TimeWindowCompactionStrategy extends 
AbstractCompactionStrategy
         if (System.currentTimeMillis() - lastExpiredCheck > 
options.expiredSSTableCheckFrequency)
         {
             logger.debug("TWCS expired check sufficiently far in the past, 
checking for fully expired SSTables");
-            expired = CompactionController.getFullyExpiredSSTables(cfs, 
uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
+            expired = CompactionController.getFullyExpiredSSTables(cfs, 
uncompacting, options.ignoreOverlaps ? Collections.emptySet() : 
cfs.getOverlappingLiveSSTables(uncompacting),
+                                                                   gcBefore, 
options.ignoreOverlaps);
             lastExpiredCheck = System.currentTimeMillis();
         }
         else
@@ -330,7 +330,7 @@ public class TimeWindowCompactionStrategy extends 
AbstractCompactionStrategy
         LifecycleTransaction txn = 
cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
         if (txn == null)
             return null;
-        return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+        return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, 
gcBefore, options.ignoreOverlaps));
     }
 
     @Override
@@ -346,7 +346,7 @@ public class TimeWindowCompactionStrategy extends 
AbstractCompactionStrategy
             return null;
         }
 
-        return new CompactionTask(cfs, modifier, 
gcBefore).setUserDefined(true);
+        return new TimeWindowCompactionTask(cfs, modifier, gcBefore, 
options.ignoreOverlaps).setUserDefined(true);
     }
 
     public int getEstimatedRemainingTasks()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
index 07df606..24b4fe0 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 public final class TimeWindowCompactionStrategyOptions
@@ -36,16 +37,21 @@ public final class TimeWindowCompactionStrategyOptions
     protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = 
TimeUnit.DAYS;
     protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
     protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS 
= 60 * 10;
+    protected static final Boolean 
DEFAULT_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = false;
 
     protected static final String TIMESTAMP_RESOLUTION_KEY = 
"timestamp_resolution";
     protected static final String COMPACTION_WINDOW_UNIT_KEY = 
"compaction_window_unit";
     protected static final String COMPACTION_WINDOW_SIZE_KEY = 
"compaction_window_size";
     protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY 
= "expired_sstable_check_frequency_seconds";
+    protected static final String UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY = 
"unsafe_aggressive_sstable_expiration";
+
+    static final String UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY = 
Config.PROPERTY_PREFIX + "allow_unsafe_aggressive_sstable_expiration";
 
     protected final int sstableWindowSize;
     protected final TimeUnit sstableWindowUnit;
     protected final TimeUnit timestampResolution;
     protected final long expiredSSTableCheckFrequency;
+    protected final boolean ignoreOverlaps;
 
     SizeTieredCompactionStrategyOptions stcsOptions;
 
@@ -68,6 +74,9 @@ public final class TimeWindowCompactionStrategyOptions
         optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
         expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(optionValue == null ? 
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), 
TimeUnit.SECONDS);
 
+        optionValue = options.get(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY);
+        ignoreOverlaps = optionValue == null ? 
DEFAULT_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION : 
(Boolean.getBoolean(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY) && 
Boolean.parseBoolean(optionValue));
+
         stcsOptions = new SizeTieredCompactionStrategyOptions(options);
     }
 
@@ -77,6 +86,7 @@ public final class TimeWindowCompactionStrategyOptions
         timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
         sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE;
         expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, 
TimeUnit.SECONDS);
+        ignoreOverlaps = DEFAULT_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION;
         stcsOptions = new SizeTieredCompactionStrategyOptions();
     }
 
@@ -136,10 +146,22 @@ public final class TimeWindowCompactionStrategyOptions
             throw new ConfigurationException(String.format("%s is not a 
parsable int (base10) for %s", optionValue, 
EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e);
         }
 
+
+        optionValue = options.get(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY);
+        if (optionValue != null)
+        {
+            if (!(optionValue.equalsIgnoreCase("true") || 
optionValue.equalsIgnoreCase("false")))
+                throw new ConfigurationException(String.format("%s is not 
'true' or 'false' (%s)", UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, 
optionValue));
+
+            if(optionValue.equalsIgnoreCase("true") && 
!Boolean.getBoolean(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY))
+                throw new ConfigurationException(String.format("%s is 
requested but not allowed, restart cassandra with -D%s=true to allow it", 
UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, 
UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY));
+        }
+
         uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY);
         uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY);
         uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
         uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        uncheckedOptions.remove(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY);
 
         uncheckedOptions = 
SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java
new file mode 100644
index 0000000..4f1fe6a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Set;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public class TimeWindowCompactionTask extends CompactionTask
+{
+    private final boolean ignoreOverlaps;
+
+    public TimeWindowCompactionTask(ColumnFamilyStore cfs, 
LifecycleTransaction txn, int gcBefore, boolean ignoreOverlaps)
+    {
+        super(cfs, txn, gcBefore);
+        this.ignoreOverlaps = ignoreOverlaps;
+    }
+
+    @Override
+    public CompactionController getCompactionController(Set<SSTableReader> 
toCompact)
+    {
+        return new TimeWindowCompactionController(cfs, toCompact, gcBefore, 
ignoreOverlaps);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 1b400e8..052206e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -177,6 +177,11 @@ public class CompactionControllerTest extends SchemaLoader
         expired = CompactionController.getFullyExpiredSSTables(cfs, 
compacting, overlapping, gcBefore);
         assertNotNull(expired);
         assertEquals(0, expired.size());
+
+        // Now if we explicitly ask to ignore overlaped sstables, we should 
get back our expired sstable
+        expired = CompactionController.getFullyExpiredSSTables(cfs, 
compacting, overlapping, gcBefore, true);
+        assertNotNull(expired);
+        assertEquals(1, expired.size());
     }
 
     private void applyMutation(CFMetaData cfm, DecoratedKey key, long 
timestamp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
index 56d53bd..6fff279 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -60,6 +60,7 @@ public class TimeWindowCompactionStrategyTest extends 
SchemaLoader
     {
         // Disable tombstone histogram rounding for tests
         System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
+        
System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY,
 "true");
 
         SchemaLoader.prepareServer();
 
@@ -100,13 +101,24 @@ public class TimeWindowCompactionStrategyTest extends 
SchemaLoader
         {
             
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"MONTHS");
             validateOptions(options);
-            fail(String.format("Invalid time units should be rejected", 
TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY));
+            fail(String.format("Invalid %s should be rejected", 
TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY));
         }
         catch (ConfigurationException e)
         {
             
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"MINUTES");
         }
 
+        try
+        {
+            
options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY,
 "not-a-boolean");
+            validateOptions(options);
+            fail(String.format("Invalid %s should be rejected", 
TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            
options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY,
 "true");
+        }
+
         options.put("bad_option", "1.0");
         unvalidated = validateOptions(options);
         assertTrue(unvalidated.containsKey("bad_option"));
@@ -272,4 +284,64 @@ public class TimeWindowCompactionStrategyTest extends 
SchemaLoader
         t.transaction.abort();
     }
 
+    @Test
+    public void testDropOverlappingExpiredSSTables() throws 
InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 2 sstables
+        DecoratedKey key = Util.dk(String.valueOf("expired"));
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, 
key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
+        Thread.sleep(10);
+
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis() - 1000, 
key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+        key = Util.dk(String.valueOf("nonexpired"));
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 
key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        assertEquals(cfs.getLiveSSTables().size(), 2);
+
+        Map<String, String> options = new HashMap<>();
+
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"30");
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"SECONDS");
+        
options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, 
"MILLISECONDS");
+        
options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY,
 "0");
+        TimeWindowCompactionStrategy twcs = new 
TimeWindowCompactionStrategy(cfs, options);
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+            twcs.addSSTable(sstable);
+
+        twcs.startup();
+        assertNull(twcs.getNextBackgroundTask((int) 
(System.currentTimeMillis() / 1000)));
+        Thread.sleep(2000);
+        assertNull(twcs.getNextBackgroundTask((int) 
(System.currentTimeMillis()/1000)));
+
+        
options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY,
 "true");
+        twcs = new TimeWindowCompactionStrategy(cfs, options);
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+            twcs.addSSTable(sstable);
+
+        twcs.startup();
+        AbstractCompactionTask t = twcs.getNextBackgroundTask((int) 
(System.currentTimeMillis()/1000));
+        assertNotNull(t);
+        assertEquals(1, Iterables.size(t.transaction.originals()));
+        SSTableReader sstable = t.transaction.originals().iterator().next();
+        assertEquals(sstable, expiredSSTable);
+        twcs.shutdown();
+        t.transaction.abort();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to