Author: slebresne
Date: Wed Jul  6 11:34:50 2011
New Revision: 1143352

URL: http://svn.apache.org/viewvc?rev=1143352&view=rev
Log:
Handle row tombstones correctly in EchoedRow
patch by slebresne; reviewed by jbellis for CASSANDRA-2786

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jul  6 11:34:50 2011
@@ -15,6 +15,7 @@
  * fix index-building status display (CASSANDRA-2853)
  * fix CLI perpetuating obsolete KsDef.replication_factor (CASSANDRA-2846)
  * improve cli treatment of multiline comments (CASSANDRA-2852)
+ * handle row tombstones correctly in EchoedRow (CASSANDRA-2786)
 
 
 0.8.1

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
 Wed Jul  6 11:34:50 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 
 /**
@@ -35,11 +36,13 @@ import org.apache.cassandra.io.sstable.S
 public class EchoedRow extends AbstractCompactedRow
 {
     private final SSTableIdentityIterator row;
+    private final int gcBefore;
 
-    public EchoedRow(SSTableIdentityIterator row)
+    public EchoedRow(CompactionController controller, SSTableIdentityIterator 
row)
     {
         super(row.getKey());
         this.row = row;
+        this.gcBefore = controller.gcBefore;
         // Reset SSTableIdentityIterator because we have not guarantee the 
filePointer hasn't moved since the Iterator was built
         row.reset();
     }
@@ -59,7 +62,7 @@ public class EchoedRow extends AbstractC
 
     public boolean isEmpty()
     {
-        return !row.hasNext();
+        return !row.hasNext() && 
ColumnFamilyStore.removeDeletedCF(row.getColumnFamily(), gcBefore) == null;
     }
 
     public int columnCount()

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
 Wed Jul  6 11:34:50 2011
@@ -113,7 +113,7 @@ public class CompactionController
     public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> 
rows)
     {
         if (rows.size() == 1 && !needDeserialize() && 
!shouldPurge(rows.get(0).getKey()))
-            return new EchoedRow(rows.get(0));
+            return new EchoedRow(this, rows.get(0));
 
         long rowSize = 0;
         for (SSTableIdentityIterator row : rows)

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Wed Jul  6 11:34:50 2011
@@ -412,7 +412,8 @@ public class CompactionManager implement
                         // success: perform the compaction
                         try
                         {
-                            doCompactionWithoutSizeEstimation(cfs, sstables, 
gcBefore, location);
+                            // Forcing deserialization in case the 
user wants expired columns to be transformed to tombstones
+                            doCompactionWithoutSizeEstimation(cfs, sstables, 
gcBefore, location, true);
                         }
                         finally
                         {
@@ -501,7 +502,7 @@ public class CompactionManager implement
         {
             String compactionFileLocation = 
table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
             if (compactionFileLocation != null)
-                return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, 
gcBefore, compactionFileLocation);
+                return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, 
gcBefore, compactionFileLocation, false);
 
             logger.warn("insufficient space to compact all requested files " + 
StringUtils.join(smallerSSTables, ", "));
             smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
@@ -515,7 +516,7 @@ public class CompactionManager implement
      * For internal use and testing only.  The rest of the system should go 
through the submit* methods,
      * which are properly serialized.
      */
-    int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore, String 
compactionFileLocation) throws IOException
+    int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore, String 
compactionFileLocation, boolean forceDeserialize) throws IOException
     {
         // The collection of sstables passed may be empty (but not null); even 
if
         // it is not empty, it may compact down to nothing if all rows are 
deleted.
@@ -529,10 +530,6 @@ public class CompactionManager implement
         for (SSTableReader sstable : sstables)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        // compaction won't normally compact a single sstable, so if that's 
what we're doing
-        // it must have been requested manually by the user, which probably 
means he wants to force
-        // tombstone purge, which won't happen unless we force deserializing 
the rows.
-        boolean forceDeserialize = sstables.size() == 1;
         CompactionController controller = new CompactionController(cfs, 
sstables, gcBefore, forceDeserialize);
         // new sstables from flush can be added during a compaction, but only 
the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of 
determining if we're compacting

Modified: 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
 Wed Jul  6 11:34:50 2011
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Set;
@@ -181,27 +182,54 @@ public class CompactionsTest extends Cle
             if (i % 2 == 0)
                 store.forceBlockingFlush();
         }
+        Collection<SSTableReader> toCompact = store.getSSTables();
 
-        // Force compaction. Since each row is in only one sstable, we will be 
using EchoedRow.
-        CompactionManager.instance.performMajor(store);
+        // Reinserting the same keys. We will compact only the previous 
sstable, but we need those new ones
+        // to make sure we use EchoedRow; otherwise it won't be used because 
purging would be possible.
+        for (int i=1; i < 5; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            RowMutation rm = new RowMutation(TABLE1, key.key);
+            rm.add(new QueryPath("Standard2", null, 
ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        SSTableReader tmpSSTable = null;
+        for (SSTableReader sstable : store.getSSTables())
+            if (!toCompact.contains(sstable))
+                tmpSSTable = sstable;
+
+        // Force compaction on first sstables. Since each row is in only one 
sstable, we will be using EchoedRow.
+        CompactionManager.instance.doCompaction(store, toCompact, (int) 
(System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds());
+
+        // Now, we remove the sstable that was just created to force the use 
of EchoedRow (so that it doesn't hide the problem)
+        store.markCompacted(Collections.singleton(tmpSSTable));
 
-        // Now assert we do have the two keys
+        // Now assert we do have the 4 keys
         assertEquals(4, Util.getRangeSlice(store).size());
     }
 
     @Test
     public void testDontPurgeAccidentaly() throws IOException, 
ExecutionException, InterruptedException
     {
+        // Testing with and without forcing deserialization. Without 
deserialization, EchoedRow will be used.
+        testDontPurgeAccidentaly("test1", false);
+        testDontPurgeAccidentaly("test2", true);
+    }
+
+    private void testDontPurgeAccidentaly(String k, boolean forceDeserialize) 
throws IOException, ExecutionException, InterruptedException
+    {
         // This test catches the regression of CASSANDRA-2786
         Table table = Table.open(TABLE1);
         String cfname = "Super5";
         ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
 
         // disable compaction while flushing
+        store.removeAllSSTables();
         store.disableAutoCompaction();
 
         // Add test row
-        DecoratedKey key = Util.dk("test");
+        DecoratedKey key = Util.dk(k);
         RowMutation rm = new RowMutation(TABLE1, key.key);
         rm.add(new QueryPath(cfname, ByteBufferUtil.bytes("sc"), 
ByteBufferUtil.bytes("c")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
         rm.apply();
@@ -210,11 +238,10 @@ public class CompactionsTest extends Cle
 
         Collection<SSTableReader> sstablesBefore = store.getSSTables();
 
-        QueryFilter filter = QueryFilter.getIdentityFilter(Util.dk("test"), 
new QueryPath(cfname, null, null));
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, new 
QueryPath(cfname, null, null));
         assert !store.getColumnFamily(filter).isEmpty();
 
         // Remove key
-        key = Util.dk("test");
         rm = new RowMutation(TABLE1, key.key);
         rm.delete(new QueryPath(cfname, null, null), 2);
         rm.apply();
@@ -225,12 +252,13 @@ public class CompactionsTest extends Cle
         store.forceBlockingFlush();
 
         Collection<SSTableReader> sstablesAfter = store.getSSTables();
-        Collection<Descriptor> toCompact = new ArrayList<Descriptor>();
+        Collection<SSTableReader> toCompact = new ArrayList<SSTableReader>();
         for (SSTableReader sstable : sstablesAfter)
             if (!sstablesBefore.contains(sstable))
-                toCompact.add(sstable.descriptor);
+                toCompact.add(sstable);
 
-        CompactionManager.instance.submitUserDefined(store, toCompact, (int) 
(System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds()).get();
+        String location = store.table.getDataFileLocation(1);
+        CompactionManager.instance.doCompactionWithoutSizeEstimation(store, 
toCompact, (int) (System.currentTimeMillis() / 1000) - 
store.metadata.getGcGraceSeconds(), location, forceDeserialize);
 
         cf = store.getColumnFamily(filter);
         assert cf.isEmpty() : "should be empty: " + cf;


Reply via email to