Author: slebresne
Date: Fri Jun 17 15:00:21 2011
New Revision: 1136904

URL: http://svn.apache.org/viewvc?rev=1136904&view=rev
Log:
Fix compaction of the same sstable by multiple threads

patch by slebresne; reviewed by jbellis for CASSANDRA-2769
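For context, the heart of this fix is lock-free "claiming" of sstables: a thread may only compact sstables it has atomically marked as compacting, and marking is done with a compare-and-set retry loop over an immutable view. The sketch below is a simplified illustration of that pattern only, not the actual Cassandra classes: Tracker, View, and the String keys are stand-ins, and the real DataTracker.markCompacting also enforces min/max threshold checks omitted here.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative stand-in for the lock-free marking DataTracker relies on.
    class Tracker
    {
        // Immutable snapshot: which tables are live, which are being compacted
        static final class View
        {
            final Set<String> sstables;
            final Set<String> compacting;

            View(Set<String> sstables, Set<String> compacting)
            {
                this.sstables = sstables;
                this.compacting = compacting;
            }

            View markCompacting(Set<String> toMark)
            {
                Set<String> newCompacting = new HashSet<String>(compacting);
                newCompacting.addAll(toMark);
                return new View(sstables, newCompacting);
            }
        }

        final AtomicReference<View> view =
            new AtomicReference<View>(new View(new HashSet<String>(), new HashSet<String>()));

        // Atomically claim the given sstables; returns the subset actually
        // claimed, or null if nothing could be claimed.
        Set<String> markCompacting(Set<String> candidates)
        {
            View current, next;
            Set<String> claimed;
            do
            {
                current = view.get();
                claimed = new HashSet<String>(candidates);
                claimed.removeAll(current.compacting); // skip tables another thread owns
                claimed.retainAll(current.sstables);   // skip tables no longer live
                if (claimed.isEmpty())
                    return null;
                next = current.markCompacting(claimed);
            }
            while (!view.compareAndSet(current, next)); // retry on concurrent update
            return claimed;
        }
    }

Because the View is never mutated in place, a successful compareAndSet proves no other thread claimed anything between the read and the write, which is what prevents two threads from compacting the same sstable.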
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Jun 17 15:00:21 2011
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.AbstractCompactionTask;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
@@ -163,6 +162,9 @@ public class DataTracker
         if (max < min || max < 1)
             return null;
 
+        if (tomark == null || tomark.isEmpty())
+            return null;
+
         View currentView, newView;
         Set<SSTableReader> subset = null;
         // order preserving set copy of the input
@@ -190,41 +192,6 @@ public class DataTracker
         return subset;
     }
 
-    public boolean markCompacting(AbstractCompactionTask task)
-    {
-        ColumnFamilyStore cfs = task.getColumnFamilyStore();
-        return markCompacting(task, cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
-    }
-
-    public boolean markCompacting(AbstractCompactionTask task, int min, int max)
-    {
-        Collection<SSTableReader> sstablesToMark = task.getSSTables();
-        if (sstablesToMark == null || sstablesToMark.isEmpty())
-            return false;
-
-        if (max < min || max < 1)
-            return false;
-
-        View currentView, newView;
-        // order preserving set copy of the input
-        Set<SSTableReader> remaining = new LinkedHashSet<SSTableReader>(sstablesToMark);
-        do
-        {
-            currentView = view.get();
-
-            // find the subset that is active and not already compacting
-            remaining.removeAll(currentView.compacting);
-            remaining.retainAll(currentView.sstables);
-            if (remaining.size() < min || remaining.size() > max)
-                // cannot meet the min and max threshold
-                return false;
-
-            newView = currentView.markCompacting(sstablesToMark);
-        }
-        while (!view.compareAndSet(currentView, newView));
-        return true;
-    }
-
     /**
      * Removes files from compacting status: this is different from 'markCompacted'
      * because it should be run regardless of whether a compaction succeeded.
@@ -240,11 +207,6 @@ public class DataTracker
         while (!view.compareAndSet(currentView, newView));
     }
 
-    public void unmarkCompacting(AbstractCompactionTask task)
-    {
-        unmarkCompacting(task.getSSTables());
-    }
-
     public void markCompacted(Collection<SSTableReader> sstables)
     {
         replace(sstables, Collections.<SSTableReader>emptyList());

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java Fri Jun 17 15:00:21 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
+import java.util.Set;
 import java.io.IOException;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -47,4 +48,33 @@ public abstract class AbstractCompaction
     {
         return sstables;
     }
+
+    /**
+     * Try to mark the sstables to compact as compacting.
+     * Returns true if some sstables have been marked for compaction, false
+     * otherwise.
+     * This *must* be called before calling execute(). Moreover,
+     * unmarkSSTables *must* always be called after execute() if this
+     * method returns true.
+     */
+    public boolean markSSTablesForCompaction()
+    {
+        return markSSTablesForCompaction(cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
+    }
+
+    public boolean markSSTablesForCompaction(int min, int max)
+    {
+        Set<SSTableReader> marked = cfs.getDataTracker().markCompacting(sstables, min, max);
+
+        if (marked == null || marked.isEmpty())
+            return false;
+
+        this.sstables = marked;
+        return true;
+    }
+
+    public void unmarkSSTables()
+    {
+        cfs.getDataTracker().unmarkCompacting(sstables);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Jun 17 15:00:21 2011
@@ -110,7 +110,7 @@ public class CompactionManager implement
                 AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                 for (AbstractCompactionTask task : strategy.getBackgroundTasks(getDefaultGcBefore(cfs)))
                 {
-                    if (!cfs.getDataTracker().markCompacting(task))
+                    if (!task.markSSTablesForCompaction())
                         continue;
 
                     try
@@ -119,7 +119,7 @@ public class CompactionManager implement
                     }
                     finally
                     {
-                        cfs.getDataTracker().unmarkCompacting(task);
+                        task.unmarkSSTables();
                    }
                }
            }
@@ -246,7 +246,7 @@ public class CompactionManager implement
                 AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
                 for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                 {
-                    if (!cfStore.getDataTracker().markCompacting(task, 0, Integer.MAX_VALUE))
+                    if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
                         return this;
                     try
                     {
@@ -264,7 +264,7 @@ public class CompactionManager implement
                     }
                     finally
                     {
-                        cfStore.getDataTracker().unmarkCompacting(task);
+                        task.unmarkSSTables();
                    }
                }
            }
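The CompactionManager changes above reduce to a simple caller contract: mark before execute, and always unmark in a finally block so a failed compaction cannot leave sstables stuck in "compacting" state forever. A minimal self-contained sketch of that contract follows; Task, runOne, and the simplified signatures are stand-ins for AbstractCompactionTask and its collaborators, not the real API.

    // Stand-in for AbstractCompactionTask, reduced to the marking contract.
    interface Task
    {
        boolean markSSTablesForCompaction();
        int execute() throws Exception;
        void unmarkSSTables();
    }

    class CompactionDriver
    {
        // Mark, execute, then unconditionally unmark: the try/finally is what
        // guarantees the claim is released even if execute() throws.
        static void runOne(Task task) throws Exception
        {
            if (!task.markSSTablesForCompaction())
                return; // another thread already claimed these sstables

            try
            {
                task.execute();
            }
            finally
            {
                task.unmarkSSTables();
            }
        }
    }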
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Jun 17 15:00:21 2011
@@ -60,12 +60,18 @@ public class CompactionTask extends Abst
     /**
      * For internal use and testing only. The rest of the system should go through the submit* methods,
      * which are properly serialized.
+     * Caller is in charge of marking/unmarking the sstables as compacting.
      */
     public int execute(CompactionExecutorStatsCollector collector) throws IOException
     {
+        // The collection of sstables passed may be empty (but not null); even if
+        // it is not empty, it may compact down to nothing if all rows are deleted.
+        assert sstables != null;
+
+        Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
         if (!isUserDefined)
         {
-            if (sstables.size() < 2)
+            if (toCompact.size() < 2)
             {
                 logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "." +
                             "Use forceUserDefinedCompaction if you wish to force compaction of single sstables " +
@@ -74,18 +80,19 @@ public class CompactionTask extends Abst
             }
 
             if (compactionFileLocation == null)
-                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
+                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
             // If the compaction file path is null that means we have no space left for this compaction.
             // Try again w/o the largest one.
             if (compactionFileLocation == null)
             {
-                Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables);
-                while (compactionFileLocation == null && smallerSSTables.size() > 1)
+                while (compactionFileLocation == null && toCompact.size() > 1)
                 {
-                    logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
-                    smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
-                    compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
+                    logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
+                    // Note that we have removed files that are still marked as compacting.
+                    // This is suboptimal but ok since the caller will unmark all
+                    // the sstables at the end.
+                    toCompact.remove(cfs.getMaxSizeFile(toCompact));
+                    compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
                 }
             }
 
@@ -96,36 +103,32 @@ public class CompactionTask extends Abst
             }
         }
 
-        // The collection of sstables passed may be empty (but not null); even if
-        // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null;
-
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
 
         // sanity check: all sstables must belong to the same cfs
-        for (SSTableReader sstable : sstables)
+        for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        CompactionController controller = new CompactionController(cfs, sstables, gcBefore, isUserDefined);
+        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
         CompactionType type = controller.isMajor() ? CompactionType.MAJOR : CompactionType.MINOR;
-        logger.info("Compacting {}: {}", type, sstables);
+        logger.info("Compacting {}: {}", type, toCompact);
 
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
         // TODO the int cast here is potentially buggy
-        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
+        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(toCompact));
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
+        CompactionIterator ci = new CompactionIterator(type, toCompact, controller); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
 
@@ -138,11 +141,11 @@ public class CompactionTask extends Abst
             // don't mark compacted in the finally block, since if there _is_ nondeleted data,
             // we need to sync it (via closeAndOpen) first, so there is no period during which
             // a crash could cause data loss.
-            cfs.markCompacted(sstables);
+            cfs.markCompacted(toCompact);
             return 0;
         }
 
-        writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables);
+        writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
         while (nni.hasNext())
         {
             AbstractCompactedRow row = nni.next();
@@ -151,7 +154,7 @@ public class CompactionTask extends Abst
 
             if (DatabaseDescriptor.getPreheatKeyCache())
             {
-                for (SSTableReader sstable : sstables)
+                for (SSTableReader sstable : toCompact)
                 {
                     if (sstable.getCachedPosition(row.key) != null)
                     {
@@ -169,19 +172,19 @@ public class CompactionTask extends Abst
                 collector.finishCompaction(ci);
         }
 
-        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
-        cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
+        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+        cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
         for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
             ssTable.cacheKey(entry.getKey(), entry.getValue());
         CompactionManager.instance.submitBackground(cfs);
 
         long dTime = System.currentTimeMillis() - startTime;
-        long startsize = SSTable.getTotalBytes(sstables);
+        long startsize = SSTable.getTotalBytes(toCompact);
         long endsize = ssTable.length();
         double ratio = (double)endsize / (double)startsize;
         logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.",
-                                  writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
-        return sstables.size();
+                                  ssTable.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+        return toCompact.size();
     }
 
     public static long getMaxDataAge(Collection<SSTableReader> sstables)
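The switch from the sstables field to a local toCompact copy in execute() is what lets the task shrink its working set (dropping the largest file when disk space runs out) without disturbing the set the caller marked and will later unmark wholesale. A toy illustration of that defensive-copy pattern, using strings as stand-ins for SSTableReader instances:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class DefensiveCopyExample
    {
        public static void main(String[] args)
        {
            // The set the caller marked as compacting (and will unmark later).
            Set<String> marked = new HashSet<String>(Arrays.asList("a-1", "a-2", "a-3"));

            // execute() works on a private copy of the marked set.
            Set<String> toCompact = new HashSet<String>(marked);
            toCompact.remove("a-3"); // e.g. not enough space for the largest file

            // The working set shrank, but the marked set is intact, so
            // unmarking still releases every sstable that was claimed.
            assert toCompact.size() == 2 && marked.size() == 3;
        }
    }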