Better handling of failures that occur in the middle of a compaction; patch by yukim, reviewed by 
slebresne for CASSANDRA-5137


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cc8656f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cc8656f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cc8656f

Branch: refs/heads/cassandra-1.1
Commit: 3cc8656f8fbb67c7e665fe27642076ae0109c2b5
Parents: 1cbbba0
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jan 11 12:32:59 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Jan 11 12:32:59 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   35 ++++++++++-----
 .../cassandra/db/compaction/CompactionTask.java    |   28 +++++++-----
 3 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 82f503c..6c76151 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * fix user defined compaction to run against 1.1 data directory 
(CASSANDRA-5118)
  * Fix CQL3 BATCH authorization caching (CASSANDRA-5145)
  * fix get_count returns incorrect value with TTL (CASSANDRA-5099)
+ * better handling for amid compaction failure (CASSANDRA-5137)
 
 
 1.1.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8284d38..2781800 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -244,20 +244,33 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             Directories.SSTableLister sstableFiles = 
directories.sstableLister().skipCompacted(true).skipTemporary(true);
             Collection<SSTableReader> sstables = 
SSTableReader.batchOpen(sstableFiles.list().entrySet(), savedKeys, data, 
metadata, this.partitioner);
 
-            // Filter non-compacted sstables, remove compacted ones
-            Set<Integer> compactedSSTables = new HashSet<Integer>();
-            for (SSTableReader sstable : sstables)
-                compactedSSTables.addAll(sstable.getAncestors());
+            if (metadata.getDefaultValidator().isCommutative())
+            {
+                // Filter non-compacted sstables, remove compacted ones
+                Set<Integer> compactedSSTables = new HashSet<Integer>();
+                for (SSTableReader sstable : sstables)
+                    compactedSSTables.addAll(sstable.getAncestors());
 
-            Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
-            for (SSTableReader sstable : sstables)
+                Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
+                for (SSTableReader sstable : sstables)
+                {
+                    if 
(compactedSSTables.contains(sstable.descriptor.generation))
+                    {
+                        logger.info("{} is already compacted and will be 
removed.", sstable);
+                        sstable.markCompacted(); // we need to mark as 
compacted to be deleted
+                        sstable.releaseReference(); // this amount to deleting 
the sstable
+                    }
+                    else
+                    {
+                        liveSSTables.add(sstable);
+                    }
+                }
+                data.addInitialSSTables(liveSSTables);
+            }
+            else
             {
-                if (compactedSSTables.contains(sstable.descriptor.generation))
-                    sstable.releaseReference(); // this amount to deleting the 
sstable
-                else
-                    liveSSTables.add(sstable);
+                data.addInitialSSTables(sstables);
             }
-            data.addInitialSSTables(liveSSTables);
         }
 
         // compaction strategy should be created after the CFS has been 
prepared

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b252bc5..714e308 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -32,9 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import 
org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -127,7 +125,7 @@ public class CompactionTask extends AbstractCompactionTask
 
         // we can't preheat until the tracker has been set. This doesn't 
happen until we tell the cfs to
         // replace the old entries.  Track entries to preheat here until then.
-        Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap =  new 
HashMap<SSTableReader, Map<DecoratedKey, Long>>();
+        Map<Descriptor, Map<DecoratedKey, Long>> cachedKeyMap =  new 
HashMap<Descriptor, Map<DecoratedKey, Long>>();
 
         Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
         Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
@@ -175,9 +173,8 @@ public class CompactionTask extends AbstractCompactionTask
                 }
                 if (!nni.hasNext() || 
newSSTableSegmentThresholdReached(writer))
                 {
-                    SSTableReader toIndex = 
writer.closeAndOpenReader(getMaxDataAge(toCompact));
-                    cachedKeyMap.put(toIndex, cachedKeys);
-                    sstables.add(toIndex);
+                    // tmp = false because later we want to query it with 
descriptor from SSTableReader
+                    cachedKeyMap.put(writer.descriptor.asTemporary(false), 
cachedKeys);
                     if (nni.hasNext())
                     {
                         writer = cfs.createCompactionWriter(keysPerSSTable, 
compactionFileLocation, toCompact);
@@ -186,11 +183,21 @@ public class CompactionTask extends AbstractCompactionTask
                     }
                 }
             }
+
+            long maxAge = getMaxDataAge(toCompact);
+            for (SSTableWriter completedWriter : writers)
+                sstables.add(completedWriter.closeAndOpenReader(maxAge));
         }
         catch (Exception e)
         {
             for (SSTableWriter writer : writers)
                 writer.abort();
+            // also remove already completed SSTables
+            for (SSTableReader sstable : sstables)
+            {
+                sstable.markCompacted();
+                sstable.releaseReference();
+            }
             throw FBUtilities.unchecked(e);
         }
         finally
@@ -202,11 +209,10 @@ public class CompactionTask extends AbstractCompactionTask
 
         cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
         // TODO: this doesn't belong here, it should be part of the reader to 
load when the tracker is wired up
-        for (Map.Entry<SSTableReader, Map<DecoratedKey, Long>> 
ssTableReaderMapEntry : cachedKeyMap.entrySet())
+        for (SSTableReader sstable : sstables)
         {
-            SSTableReader key = ssTableReaderMapEntry.getKey();
-            for (Map.Entry<DecoratedKey, Long> entry : 
ssTableReaderMapEntry.getValue().entrySet())
-               key.cacheKey(entry.getKey(), entry.getValue());
+            for (Map.Entry<DecoratedKey, Long> entry : 
cachedKeyMap.get(sstable.descriptor).entrySet())
+               sstable.cacheKey(entry.getKey(), entry.getValue());
         }
 
         long dTime = System.currentTimeMillis() - startTime;

Reply via email to