Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 6ae1b420a -> 35f173a0e
Make LCS split compaction results over many directories

Patch by marcuse; reviewed by yukim for CASSANDRA-8329


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ce1ad8e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ce1ad8e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ce1ad8e

Branch: refs/heads/cassandra-2.1
Commit: 2ce1ad8e6f5d3c5cf781e1ff87cda4f61c89d9ee
Parents: 025b406
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Nov 18 11:01:17 2014 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Nov 24 09:43:47 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  | 20 ++++--
 .../cassandra/db/compaction/CompactionTask.java | 74 ++++++++++++--------
 .../cassandra/io/util/DiskAwareRunnable.java    | 37 +++-------
 4 files changed, 69 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01ea887..6a5ac0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Make LCS split compaction results over all data directories (CASSANDRA-8329)
  * Fix some failing queries that use multi-column relations
    on COMPACT STORAGE tables (CASSANDRA-8264)
  * Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 0b186dc..425b352 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ 
b/src/java/org/apache/cassandra/db/Memtable.java @@ -336,13 +336,23 @@ public class Memtable return estimatedSize; } - protected void runWith(File sstableDirectory) throws Exception + protected void runMayThrow() throws Exception { + long writeSize = getExpectedWriteSize(); + Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize); + File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory); assert sstableDirectory != null : "Flush task is not bound to any disk"; - - SSTableReader sstable = writeSortedContents(context, sstableDirectory); - cfs.replaceFlushed(Memtable.this, sstable); - latch.countDown(); + try + { + SSTableReader sstable = writeSortedContents(context, sstableDirectory); + cfs.replaceFlushed(Memtable.this, sstable); + latch.countDown(); + } + finally + { + if (dataDirectory != null) + returnWriteDirectory(dataDirectory, writeSize); + } } protected Directories getDirectories() http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 5ef4aad..08fe81a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -87,11 +87,11 @@ public class CompactionTask extends AbstractCompactionTask * which are properly serialized. * Caller is in charge of marking/unmarking the sstables as compacting. */ - protected void runWith(File sstableDirectory) throws Exception + protected void runMayThrow() throws Exception { // The collection of sstables passed may be empty (but not null); even if // it is not empty, it may compact down to nothing if all rows are deleted. 
- assert sstables != null && sstableDirectory != null; + assert sstables != null; // Note that the current compaction strategy, is not necessarily the one this task was created under. // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy. @@ -149,45 +149,60 @@ public class CompactionTask extends AbstractCompactionTask return; } - SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable); + long writeSize = getExpectedWriteSize() / estimatedSSTables; + Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize); + SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable); writers.add(writer); - while (iter.hasNext()) + try { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - - AbstractCompactedRow row = iter.next(); - RowIndexEntry indexEntry = writer.append(row); - if (indexEntry == null) + while (iter.hasNext()) { - controller.invalidateCachedRow(row.key); - row.close(); - continue; - } + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); - totalkeysWritten++; + AbstractCompactedRow row = iter.next(); + RowIndexEntry indexEntry = writer.append(row); + if (indexEntry == null) + { + controller.invalidateCachedRow(row.key); + row.close(); + continue; + } - if (DatabaseDescriptor.getPreheatKeyCache()) - { - for (SSTableReader sstable : actuallyCompact) + totalkeysWritten++; + + if (DatabaseDescriptor.getPreheatKeyCache()) { - if (sstable.getCachedPosition(row.key, false) != null) + for (SSTableReader sstable : actuallyCompact) { - cachedKeys.put(row.key, indexEntry); - break; + if (sstable.getCachedPosition(row.key, false) != null) + { + cachedKeys.put(row.key, indexEntry); + break; + } } } - } - if (newSSTableSegmentThresholdReached(writer)) - { - // tmp = false because later we want to query it with descriptor from SSTableReader - 
cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys); - writer = createCompactionWriter(sstableDirectory, keysPerSSTable); - writers.add(writer); - cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>(); + if (newSSTableSegmentThresholdReached(writer)) + { + // tmp = false because later we want to query it with descriptor from SSTableReader + cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys); + returnWriteDirectory(dataDirectory, writeSize); + // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below: + dataDirectory = null; + writeSize = getExpectedWriteSize() / estimatedSSTables; + dataDirectory = getWriteDirectory(writeSize); + writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable); + writers.add(writer); + cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>(); + } } } + finally + { + if (dataDirectory != null) + returnWriteDirectory(dataDirectory, writeSize); + } if (writer.getFilePointer() > 0) { @@ -291,6 +306,7 @@ public class CompactionTask extends AbstractCompactionTask private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable) { + assert sstableDirectory != null; return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory), keysPerSSTable, cfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java index 198a88d..93b06ab 100644 --- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java +++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java @@ -17,23 +17,16 @@ */ package org.apache.cassandra.io.util; -import java.io.File; - import org.apache.cassandra.db.Directories; import 
org.apache.cassandra.utils.WrappedRunnable; public abstract class DiskAwareRunnable extends WrappedRunnable { - /** - * Run this task after selecting the optimal disk for it - */ - protected void runMayThrow() throws Exception + protected Directories.DataDirectory getWriteDirectory(long writeSize) { - long writeSize; Directories.DataDirectory directory; while (true) { - writeSize = getExpectedWriteSize(); directory = getDirectories().getWriteableLocation(); if (directory != null || !reduceScopeForLimitedSpace()) break; @@ -43,15 +36,13 @@ public abstract class DiskAwareRunnable extends WrappedRunnable directory.currentTasks.incrementAndGet(); directory.estimatedWorkingSize.addAndGet(writeSize); - try - { - runWith(getDirectories().getLocationForDisk(directory)); - } - finally - { - directory.estimatedWorkingSize.addAndGet(-1 * writeSize); - directory.currentTasks.decrementAndGet(); - } + return directory; + } + + protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize) + { + directory.estimatedWorkingSize.addAndGet(-1 * writeSize); + directory.currentTasks.decrementAndGet(); } /** @@ -61,18 +52,6 @@ public abstract class DiskAwareRunnable extends WrappedRunnable protected abstract Directories getDirectories(); /** - * Executes this task on given {@code sstableDirectory}. - * @param sstableDirectory sstable directory to work on - */ - protected abstract void runWith(File sstableDirectory) throws Exception; - - /** - * Get expected write size to determine which disk to use for this task. - * @return expected size in bytes this task will write to disk. - */ - public abstract long getExpectedWriteSize(); - - /** * Called if no disk is available with free space for the full write size. * @return true if the scope of the task was successfully reduced. */