Author: slebresne Date: Wed Jul 6 11:34:50 2011 New Revision: 1143352 URL: http://svn.apache.org/viewvc?rev=1143352&view=rev Log: Handle row tombstones correctly in EchoedRow; patch by slebresne; reviewed by jbellis for CASSANDRA-2786
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1143352&r1=1143351&r2=1143352&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jul 6 11:34:50 2011 @@ -15,6 +15,7 @@ * fix index-building status display (CASSANDRA-2853) * fix CLI perpetuating obsolete KsDef.replication_factor (CASSANDRA-2846) * improve cli treatment of multiline comments (CASSANDRA-2852) + * handle row tombstones correctly in EchoedRow (CASSANDRA-2786) 0.8.1 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1143352&r1=1143351&r2=1143352&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java Wed Jul 6 11:34:50 2011 @@ -26,6 +26,7 @@ import java.io.IOException; import java.security.MessageDigest; import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.compaction.CompactionController; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; /** @@ -35,11 +36,13 @@ import org.apache.cassandra.io.sstable.S public class 
EchoedRow extends AbstractCompactedRow { private final SSTableIdentityIterator row; + private final int gcBefore; - public EchoedRow(SSTableIdentityIterator row) + public EchoedRow(CompactionController controller, SSTableIdentityIterator row) { super(row.getKey()); this.row = row; + this.gcBefore = controller.gcBefore; // Reset SSTableIdentityIterator because we have not guarantee the filePointer hasn't moved since the Iterator was built row.reset(); } @@ -59,7 +62,7 @@ public class EchoedRow extends AbstractC public boolean isEmpty() { - return !row.hasNext(); + return !row.hasNext() && ColumnFamilyStore.removeDeletedCF(row.getColumnFamily(), gcBefore) == null; } public int columnCount() Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1143352&r1=1143351&r2=1143352&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java Wed Jul 6 11:34:50 2011 @@ -113,7 +113,7 @@ public class CompactionController public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows) { if (rows.size() == 1 && !needDeserialize() && !shouldPurge(rows.get(0).getKey())) - return new EchoedRow(rows.get(0)); + return new EchoedRow(this, rows.get(0)); long rowSize = 0; for (SSTableIdentityIterator row : rows) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1143352&r1=1143351&r2=1143352&view=diff 
============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Jul 6 11:34:50 2011 @@ -412,7 +412,8 @@ public class CompactionManager implement // success: perform the compaction try { - doCompactionWithoutSizeEstimation(cfs, sstables, gcBefore, location); + // Forcing deserialization because in case the user wants expired columns to be transformed to tombstones + doCompactionWithoutSizeEstimation(cfs, sstables, gcBefore, location, true); } finally { @@ -501,7 +502,7 @@ public class CompactionManager implement { String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables)); if (compactionFileLocation != null) - return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, gcBefore, compactionFileLocation); + return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, gcBefore, compactionFileLocation, false); logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", ")); smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables)); @@ -515,7 +516,7 @@ public class CompactionManager implement * For internal use and testing only. The rest of the system should go through the submit* methods, * which are properly serialized. */ - int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, String compactionFileLocation) throws IOException + int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, String compactionFileLocation, boolean forceDeserialize) throws IOException { // The collection of sstables passed may be empty (but not null); even if // it is not empty, it may compact down to nothing if all rows are deleted. 
@@ -529,10 +530,6 @@ public class CompactionManager implement for (SSTableReader sstable : sstables) assert sstable.descriptor.cfname.equals(cfs.columnFamily); - // compaction won't normally compact a single sstable, so if that's what we're doing - // it must have been requested manually by the user, which probably means he wants to force - // tombstone purge, which won't happen unless we force deserializing the rows. - boolean forceDeserialize = sstables.size() == 1; CompactionController controller = new CompactionController(cfs, sstables, gcBefore, forceDeserialize); // new sstables from flush can be added during a compaction, but only the compaction can remove them, // so in our single-threaded compaction world this is a valid way of determining if we're compacting Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1143352&r1=1143351&r2=1143352&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Wed Jul 6 11:34:50 2011 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.ArrayList; import java.util.Set; @@ -181,27 +182,54 @@ public class CompactionsTest extends Cle if (i % 2 == 0) store.forceBlockingFlush(); } + Collection<SSTableReader> toCompact = store.getSSTables(); - // Force compaction. Since each row is in only one sstable, we will be using EchoedRow. - CompactionManager.instance.performMajor(store); + // Reinserting the same keys. 
We will compact only the previous sstable, but we need those new ones + // to make sure we use EchoedRow, otherwise it won't be used because purge can be done. + for (int i=1; i < 5; i++) + { + DecoratedKey key = Util.dk(String.valueOf(i)); + RowMutation rm = new RowMutation(TABLE1, key.key); + rm.add(new QueryPath("Standard2", null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i); + rm.apply(); + } + store.forceBlockingFlush(); + SSTableReader tmpSSTable = null; + for (SSTableReader sstable : store.getSSTables()) + if (!toCompact.contains(sstable)) + tmpSSTable = sstable; + + // Force compaction on first sstables. Since each row is in only one sstable, we will be using EchoedRow. + CompactionManager.instance.doCompaction(store, toCompact, (int) (System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds()); + + // Now, we remove the sstable that was just created to force the use of EchoedRow (so that it doesn't hide the problem) + store.markCompacted(Collections.singleton(tmpSSTable)); - // Now assert we do have the two keys + // Now assert we do have the 4 keys assertEquals(4, Util.getRangeSlice(store).size()); } @Test public void testDontPurgeAccidentaly() throws IOException, ExecutionException, InterruptedException { + // Testing with and without forcing deserialization. Without deserialization, EchoedRow will be used. 
+ testDontPurgeAccidentaly("test1", false); + testDontPurgeAccidentaly("test2", true); + } + + private void testDontPurgeAccidentaly(String k, boolean forceDeserialize) throws IOException, ExecutionException, InterruptedException + { // This test catches the regression of CASSANDRA-2786 Table table = Table.open(TABLE1); String cfname = "Super5"; ColumnFamilyStore store = table.getColumnFamilyStore(cfname); // disable compaction while flushing + store.removeAllSSTables(); store.disableAutoCompaction(); // Add test row - DecoratedKey key = Util.dk("test"); + DecoratedKey key = Util.dk(k); RowMutation rm = new RowMutation(TABLE1, key.key); rm.add(new QueryPath(cfname, ByteBufferUtil.bytes("sc"), ByteBufferUtil.bytes("c")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0); rm.apply(); @@ -210,11 +238,10 @@ public class CompactionsTest extends Cle Collection<SSTableReader> sstablesBefore = store.getSSTables(); - QueryFilter filter = QueryFilter.getIdentityFilter(Util.dk("test"), new QueryPath(cfname, null, null)); + QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(cfname, null, null)); assert !store.getColumnFamily(filter).isEmpty(); // Remove key - key = Util.dk("test"); rm = new RowMutation(TABLE1, key.key); rm.delete(new QueryPath(cfname, null, null), 2); rm.apply(); @@ -225,12 +252,13 @@ public class CompactionsTest extends Cle store.forceBlockingFlush(); Collection<SSTableReader> sstablesAfter = store.getSSTables(); - Collection<Descriptor> toCompact = new ArrayList<Descriptor>(); + Collection<SSTableReader> toCompact = new ArrayList<SSTableReader>(); for (SSTableReader sstable : sstablesAfter) if (!sstablesBefore.contains(sstable)) - toCompact.add(sstable.descriptor); + toCompact.add(sstable); - CompactionManager.instance.submitUserDefined(store, toCompact, (int) (System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds()).get(); + String location = store.table.getDataFileLocation(1); + 
CompactionManager.instance.doCompactionWithoutSizeEstimation(store, toCompact, (int) (System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds(), location, forceDeserialize); cf = store.getColumnFamily(filter); assert cf.isEmpty() : "should be empty: " + cf;