Updated Branches: refs/heads/trunk efbdee237 -> ebefb77c6
Add an official way to disable autocompaction. patch by marcuse, reviewed by pcmanus for CASSANDRA-5074 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebefb77c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebefb77c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebefb77c Branch: refs/heads/trunk Commit: ebefb77c6e8a5046a8c1b1bb0edd258aaf0ad8b7 Parents: efbdee2 Author: Marcus Eriksson <marc...@spotify.com> Authored: Tue Apr 9 18:28:11 2013 +0200 Committer: Marcus Eriksson <marc...@spotify.com> Committed: Tue Apr 9 18:28:11 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + src/java/org/apache/cassandra/cli/CliClient.java | 8 +++- .../org/apache/cassandra/config/CFMetaData.java | 6 ++- .../apache/cassandra/cql/AlterTableStatement.java | 8 ++- .../cassandra/cql/CreateColumnFamilyStatement.java | 10 +++- src/java/org/apache/cassandra/cql3/CFPropDefs.java | 8 ++- .../apache/cassandra/db/CollationController.java | 2 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 40 +++++++++++--- .../cassandra/db/ColumnFamilyStoreMBean.java | 5 +-- .../db/compaction/AbstractCompactionStrategy.java | 34 ++++++++++++ .../cassandra/db/compaction/CompactionManager.java | 6 ++ .../db/compaction/LeveledCompactionStrategy.java | 2 +- .../compaction/SizeTieredCompactionStrategy.java | 11 ++--- .../apache/cassandra/service/StorageService.java | 18 +++++++ .../cassandra/service/StorageServiceMBean.java | 4 ++ src/java/org/apache/cassandra/tools/NodeCmd.java | 11 ++++- src/java/org/apache/cassandra/tools/NodeProbe.java | 10 ++++ .../org/apache/cassandra/tools/NodeToolHelp.yaml | 6 ++ .../cassandra/db/compaction/TTLExpiryTest.java | 6 +-- 20 files changed, 163 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b322425..18c267a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -30,6 +30,7 @@ (CASSANDRA-4937) * Improve repair by deciding on a gcBefore before sending out TreeRequests (CASSANDRA-4932) + * Add an official way to disable compactions (CASSANDRA-5074) 1.2.4 * Ensure that PerRowSecondaryIndex updates see the most recent values http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 3c5e07b..1c68de6 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -30,6 +30,9 @@ Operations ---------- - Major compactions, cleanup, scrub, and upgradesstables will interrupt any in-progress compactions (but not repair validations) when invoked. + - Disabling autocompactions by setting min/max compaction threshold to 0 + has been deprecated, instead, use the nodetool commands 'disableautocompaction' + and 'enableautocompaction' or set the compaction strategy option enabled = false 1.2.4 http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/cli/CliClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java index 9347b59..7ff9048 100644 --- a/src/java/org/apache/cassandra/cli/CliClient.java +++ b/src/java/org/apache/cassandra/cli/CliClient.java @@ -1311,9 +1311,15 @@ public class CliClient cfDef.setDefault_validation_class(CliUtils.unescapeSQLString(mValue)); break; case MIN_COMPACTION_THRESHOLD: - cfDef.setMin_compaction_threshold(Integer.parseInt(mValue)); + int threshold = Integer.parseInt(mValue); + if (threshold <= 0) + throw new RuntimeException("Disabling compaction by setting min/max compaction thresholds to 0 has been deprecated, set compaction_strategy_options={'enabled':false} instead"); + cfDef.setMin_compaction_threshold(threshold); break; case MAX_COMPACTION_THRESHOLD: + threshold = Integer.parseInt(mValue); + if (threshold <= 0) + throw new RuntimeException("Disabling compaction by setting min/max compaction thresholds to 0 has been deprecated, set compaction_strategy_options={'enabled':false} instead"); cfDef.setMax_compaction_threshold(Integer.parseInt(mValue)); break; case REPLICATE_ON_WRITE: http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 3ccf641..79a75d5 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -168,7 +168,7 @@ public final class CFMetaData + "mutation blob," + "PRIMARY KEY (target_id, hint_id, message_version)" + ") WITH COMPACT STORAGE " - + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 0, 'max_threshold' : 0} " + + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'enabled' : false} " + "AND COMMENT='hints awaiting delivery'" + "AND gc_grace_seconds=0"); @@ -1335,7 +1335,11 @@ public final class CFMetaData private void validateCompactionThresholds() throws ConfigurationException { if (maxCompactionThreshold == 0) + { + logger.warn("Disabling compaction by setting max or min compaction has been deprecated, " + + "set the compaction strategy option 'enabled' to 'false' instead"); return; + } if (minCompactionThreshold <= 1) throw new ConfigurationException(String.format("Min compaction threshold cannot be less than 2 (got %d).", minCompactionThreshold)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/cql/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java index 1f48c1e..662d889 100644 --- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java @@ -174,8 +174,12 @@ public class AlterTableStatement cfm.dcLocalReadRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair())); cfm.gcGraceSeconds(cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfm.getGcGraceSeconds())); cfm.replicateOnWrite(cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfm.getReplicateOnWrite())); - cfm.minCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold())); - cfm.maxCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold())); + int minCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold()); + int maxCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold()); + if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0) + throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead."); + cfm.minCompactionThreshold(minCompactionThreshold); + cfm.maxCompactionThreshold(maxCompactionThreshold); cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString()))); cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive())); cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString()))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java index 2f04c38..2f0790c 100644 --- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java +++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java @@ -161,7 +161,7 @@ public class CreateColumnFamilyStatement * @return a CFMetaData instance corresponding to the values parsed from this statement * @throws InvalidRequestException on failure to validate parsed parameters */ - public CFMetaData getCFMetaData(String keyspace, List<ByteBuffer> variables) throws InvalidRequestException + public CFMetaData getCFMetaData(String keyspace, List<ByteBuffer> variables) throws InvalidRequestException, ConfigurationException { validate(variables); @@ -178,6 +178,10 @@ public class CreateColumnFamilyStatement if (CFMetaData.DEFAULT_COMPRESSOR != null && cfProps.compressionParameters.isEmpty()) cfProps.compressionParameters.put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR); + int maxCompactionThreshold = getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD); + int minCompactionThreshold = getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD); + if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0) + throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead."); newCFMD.comment(cfProps.getProperty(CFPropDefs.KW_COMMENT)) .readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE)) @@ -185,8 +189,8 @@ public class CreateColumnFamilyStatement .replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE)) .gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS)) .defaultValidator(cfProps.getValidator()) - .minCompactionThreshold(getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD)) - .maxCompactionThreshold(getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD)) + .minCompactionThreshold(minCompactionThreshold) + .maxCompactionThreshold(maxCompactionThreshold) .columnMetadata(getColumns(comparator)) .keyValidator(TypeParser.parse(CFPropDefs.comparators.get(getKeyType()))) .compactionStrategyClass(cfProps.compactionStrategyClass) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/cql3/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java index a4edd4f..cc4c457 100644 --- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java @@ -139,8 +139,12 @@ public class CFPropDefs extends PropertyDefinitions cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair())); cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds())); cfm.replicateOnWrite(getBoolean(KW_REPLICATEONWRITE, cfm.getReplicateOnWrite())); - cfm.minCompactionThreshold(toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold())); - cfm.maxCompactionThreshold(toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold())); + int minCompactionThreshold = toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold()); + int maxCompactionThreshold = toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold()); + if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0) + throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead."); + cfm.minCompactionThreshold(minCompactionThreshold); + cfm.maxCompactionThreshold(maxCompactionThreshold); cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString()))); cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive())); cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString()))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 2ea2d01..ea06118 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -182,7 +182,7 @@ public class CollationController // "hoist up" the requested data into a more recent sstable if (sstablesIterated > cfs.getMinimumCompactionThreshold() - && !cfs.isCompactionDisabled() + && !cfs.isAutoCompactionDisabled() && cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy) { Tracing.trace("Defragmenting requested data"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 516fcd6..3f8b7fc 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import javax.management.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableSet; @@ -268,6 +269,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // compaction strategy should be created after the CFS has been prepared this.compactionStrategy = metadata.createCompactionStrategyInstance(this); + if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0) + { + logger.warn("Disabling compaction strategy by setting compaction thresholds to 0 is deprecated, set the compaction option 'enabled' to 'false' instead."); + this.compactionStrategy.disable(); + } + // create the private ColumnFamilyStores for the secondary column indexes for (ColumnDefinition info : metadata.allColumns()) { @@ -1951,14 +1958,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { // we don't use CompactionStrategy.pause since we don't want users flipping that on and off // during runWithCompactionsDisabled - minCompactionThreshold.set(0); - maxCompactionThreshold.set(0); + this.compactionStrategy.disable(); } public void enableAutoCompaction() { - minCompactionThreshold.reset(); - maxCompactionThreshold.reset(); + enableAutoCompaction(false); + } + + /** + * used for tests - to be able to check things after a minor compaction + * @param waitForFutures if we should block until autocompaction is done + */ + @VisibleForTesting + public void enableAutoCompaction(boolean waitForFutures) + { + this.compactionStrategy.enable(); + List<Future<?>> futures = CompactionManager.instance.submitBackground(this); + if (waitForFutures) + FBUtilities.waitOnFutures(futures); + } + + public boolean isAutoCompactionDisabled() + { + return !this.compactionStrategy.isEnabled(); } /* @@ -2012,14 +2035,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private void validateCompactionThresholds(int minThreshold, int maxThreshold) { - if (minThreshold > maxThreshold && maxThreshold != 0) + if (minThreshold > maxThreshold) throw new RuntimeException(String.format("The min_compaction_threshold cannot be larger than the max_compaction_threshold. " + "Min is '%d', Max is '%d'.", minThreshold, maxThreshold)); - } - public boolean isCompactionDisabled() - { - return getMinimumCompactionThreshold() <= 0 || getMaximumCompactionThreshold() <= 0; + if (maxThreshold == 0 || minThreshold == 0) + throw new RuntimeException("Disabling compaction by setting min_compaction_threshold or max_compaction_threshold to 0 " + + "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'."); } // End JMX get/set. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index 554c204..e810dc6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -281,10 +281,7 @@ public interface ColumnFamilyStoreMBean */ public void setCrcCheckChance(double crcCheckChance); - /** - * Disable automatic compaction. - */ - public void disableAutoCompaction(); + public boolean isAutoCompactionDisabled(); public long estimateKeys(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 7066d41..18af985 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -48,6 +48,7 @@ public abstract class AbstractCompactionStrategy protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400; protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold"; protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval"; + protected static final String COMPACTION_ENABLED = "enabled"; public final Map<String, String> options; @@ -67,6 +68,8 @@ public abstract class AbstractCompactionStrategy */ protected boolean isActive = true; + protected volatile boolean enabled = true; + protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { assert cfs != null; @@ -82,6 +85,13 @@ public abstract class AbstractCompactionStrategy tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue); optionValue = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION); tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue); + optionValue = options.get(COMPACTION_ENABLED); + + if (optionValue != null) + { + if (optionValue.equalsIgnoreCase("false")) + this.enabled = false; + } } catch (ConfigurationException e) { @@ -157,6 +167,21 @@ public abstract class AbstractCompactionStrategy */ public abstract long getMaxSSTableSize(); + public boolean isEnabled() + { + return this.enabled && this.isActive; + } + + public void enable() + { + this.enabled = true; + } + + public void disable() + { + this.enabled = false; + } + /** * Filters SSTables that are to be blacklisted from the given collection * @@ -282,9 +307,18 @@ public abstract class AbstractCompactionStrategy } } + String compactionEnabled = options.get(COMPACTION_ENABLED); + if (compactionEnabled != null) + { + if (!compactionEnabled.equalsIgnoreCase("true") && !compactionEnabled.equalsIgnoreCase("false")) + { + throw new ConfigurationException(String.format("enabled should either be 'true' or 'false', not %s", compactionEnabled)); + } + } Map<String, String> uncheckedOptions = new HashMap<String, String>(options); uncheckedOptions.remove(TOMBSTONE_THRESHOLD_OPTION); uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION); + uncheckedOptions.remove(COMPACTION_ENABLED); return uncheckedOptions; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index b5aefee..f420efd 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -106,6 +106,12 @@ public class CompactionManager implements CompactionManagerMBean */ public List<Future<?>> submitBackground(final ColumnFamilyStore cfs) { + if (cfs.isAutoCompactionDisabled()) + { + logger.debug("Autocompaction is disabled"); + return Collections.emptyList(); + } + int count = compactingCF.count(cfs); if (count > 0 && executor.getActiveCount() >= executor.getMaximumPoolSize()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 6ccd551..f704518 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -89,7 +89,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem */ public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) { - if (!isActive || cfs.isCompactionDisabled()) + if (!isEnabled()) return null; return getMaximalTask(gcBefore); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 844720f..ae6f627 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -56,19 +56,16 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue); optionValue = options.get(BUCKET_HIGH_KEY); bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue); - cfs.setCompactionThresholds(cfs.metadata.getMinCompactionThreshold(), cfs.metadata.getMaxCompactionThreshold()); } private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) { + if (!isEnabled()) + return Collections.emptyList(); + // make local copies so they can't be changed out from under us mid-method int minThreshold = cfs.getMinimumCompactionThreshold(); int maxThreshold = cfs.getMaximumCompactionThreshold(); - if (minThreshold == 0 || maxThreshold == 0) - { - logger.debug("Compaction is currently disabled."); - return Collections.emptyList(); - } Set<SSTableReader> candidates = cfs.getUncompactingSSTables(); List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), bucketHigh, bucketLow, minSSTableSize); @@ -135,7 +132,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) { - if (!isActive) + if (!isEnabled()) return null; while (true) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 80e2c56..36724b9 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3842,4 +3842,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { rangeXferExecutor.tearDown(); } + + @Override + public void disableAutoCompaction(String ks, String... columnFamilies) throws IOException + { + for (ColumnFamilyStore cfs : getValidColumnFamilies(true, true, ks, columnFamilies)) + { + cfs.disableAutoCompaction(); + } + } + + @Override + public void enableAutoCompaction(String ks, String... columnFamilies) throws IOException + { + for (ColumnFamilyStore cfs : getValidColumnFamilies(true, true, ks, columnFamilies)) + { + cfs.enableAutoCompaction(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index b831d7d..84d0346 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -470,4 +470,8 @@ public interface StorageServiceMBean extends NotificationEmitter public void enableScheduledRangeXfers(); /** Disable processing of queued range transfers. */ public void disableScheduledRangeXfers(); + + void disableAutoCompaction(String ks, String ... columnFamilies) throws IOException; + void enableAutoCompaction(String ks, String ... columnFamilies) throws IOException; + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index ed82e32..2140358 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -118,6 +118,8 @@ public class NodeCmd ENABLETHRIFT, FLUSH, GETCOMPACTIONTHRESHOLD, + DISABLEAUTOCOMPACTION, + ENABLEAUTOCOMPACTION, GETENDPOINTS, GETSSTABLES, GOSSIPINFO, @@ -1168,6 +1170,8 @@ public class NodeCmd case FLUSH : case SCRUB : case UPGRADESSTABLES : + case DISABLEAUTOCOMPACTION: + case ENABLEAUTOCOMPACTION: optionalKSandCFs(command, cmd, arguments, probe); break; @@ -1195,7 +1199,6 @@ public class NodeCmd if (minthreshold < 2 && maxthreshold != 0) { badUse("Min threshold must be at least 2"); } probe.setCompactionThreshold(arguments[0], arguments[1], minthreshold, maxthreshold); break; - case GETENDPOINTS : if (arguments.length != 3) { badUse("getendpoints requires ks, cf and key args"); } nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out); @@ -1420,6 +1423,12 @@ public class NodeCmd try { probe.upgradeSSTables(keyspace, excludeCurrentVersion, columnFamilies); } catch (ExecutionException ee) { err(ee, "Error occurred while upgrading the sstables for keyspace " + keyspace); } break; + case ENABLEAUTOCOMPACTION: + probe.enableAutoCompaction(keyspace, columnFamilies); + break; + case DISABLEAUTOCOMPACTION: + probe.disableAutoCompaction(keyspace, columnFamilies); + break; default: throw new RuntimeException("Unreachable code."); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 085a12a..fb941f5 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -520,6 +520,16 @@ public class NodeProbe cfsProxy.setCompactionThresholds(minimumCompactionThreshold, maximumCompactionThreshold); } + public void disableAutoCompaction(String ks, String ... columnFamilies) throws IOException + { + ssProxy.disableAutoCompaction(ks, columnFamilies); + } + + public void enableAutoCompaction(String ks, String ... columnFamilies) throws IOException + { + ssProxy.enableAutoCompaction(ks, columnFamilies); + } + public void setCacheCapacities(int keyCacheCapacity, int rowCacheCapacity) { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml ---------------------------------------------------------------------- diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml index d3e8436..6924602 100644 --- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml +++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml @@ -155,6 +155,12 @@ commands: - name: getcompactionthreshold <keyspace> <cfname> help: | Print min and max compaction thresholds for a given column family + - name: disableautocompaction [keyspace] [cfnames] + help: | + Disable autocompaction for the given keyspace and column family + - name: enableautocompaction [keyspace] [cfnames] + help: | + Enable autocompaction - name: stop <compaction_type> help: | Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, INDEX_BUILD http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefb77c/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index b01b806..c9699b7 100644 --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@ -69,8 +69,7 @@ public class TTLExpiryTest extends SchemaLoader cfs.forceBlockingFlush(); Thread.sleep(2000); // wait for ttl to expire assertEquals(4, cfs.getSSTables().size()); - cfs.enableAutoCompaction(); - FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs)); + cfs.enableAutoCompaction(true); assertEquals(0, cfs.getSSTables().size()); } @@ -117,8 +116,7 @@ public class TTLExpiryTest extends SchemaLoader cfs.forceBlockingFlush(); Thread.sleep(2000); // wait for ttl to expire assertEquals(4, cfs.getSSTables().size()); - cfs.enableAutoCompaction(); - FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs)); + cfs.enableAutoCompaction(true); assertEquals(1, cfs.getSSTables().size()); SSTableReader sstable = cfs.getSSTables().iterator().next(); SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter()));