Author: slebresne Date: Mon May 23 07:34:59 2011 New Revision: 1126356 URL: http://svn.apache.org/viewvc?rev=1126356&view=rev Log: Improve forceDeserialize/getCompactedRow encapsulation patch by jbellis; reviewed by slebresne for CASSANDRA-2659
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon May 23 07:34:59 2011 @@ -14,6 +14,7 @@ buffers again, especially on CL writes (CASSANDRA-2660) * add DROP INDEX support to CLI (CASSANDRA-2616) * don't perform HH to client-mode [storageproxy] nodes (CASSANDRA-2668) + * Improve forceDeserialize/getCompactedRow encapsulation (CASSANDRA-2659) 0.8.0-final Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 23 07:34:59 2011 @@ -968,9 +968,9 @@ public class ColumnFamilyStore implement data.markCompacted(sstables); } - boolean isCompleteSSTables(Collection<SSTableReader> sstables) + public boolean isCompleteSSTables(Set<SSTableReader> sstables) { - return data.getSSTables().equals(new HashSet<SSTableReader>(sstables)); + return data.getSSTables().equals(sstables); } void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java Mon May 23 07:34:59 2011 @@ -18,13 +18,11 @@ package org.apache.cassandra.db; -import java.io.DataOutput; import java.io.File; import java.io.IOError; import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; -import java.security.MessageDigest; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.*; @@ -128,7 +126,7 @@ public class CompactionManager implement logger.debug("Checking to see if compaction of " + cfs.columnFamily + " would be useful"); Set<List<SSTableReader>> buckets = getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L); updateEstimateFor(cfs, buckets); - int gcBefore = cfs.isIndex() ? 
Integer.MAX_VALUE : getDefaultGcBefore(cfs); + int gcBefore = getDefaultGcBefore(cfs); for (List<SSTableReader> sstables : buckets) { @@ -529,11 +527,15 @@ public class CompactionManager implement for (SSTableReader sstable : sstables) assert sstable.descriptor.cfname.equals(cfs.columnFamily); + // compaction won't normally compact a single sstable, so if that's what we're doing + // it must have been requested manually by the user, which probably means he wants to force + // tombstone purge, which won't happen unless we force deserializing the rows. + boolean forceDeserialize = sstables.size() == 1; + CompactionController controller = new CompactionController(cfs, sstables, gcBefore, forceDeserialize); // new sstables from flush can be added during a compaction, but only the compaction can remove them, // so in our single-threaded compaction world this is a valid way of determining if we're compacting // all the sstables (that existed when we started) - boolean major = cfs.isCompleteSSTables(sstables); - CompactionType type = major + CompactionType type = controller.isMajor() ? 
CompactionType.MAJOR : CompactionType.MINOR; logger.info("Compacting {}: {}", type, sstables); @@ -547,7 +549,6 @@ public class CompactionManager implement logger.debug("Expected bloom filter size : " + expectedBloomFilterSize); SSTableWriter writer; - CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false); CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close() Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate()); Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); @@ -630,6 +631,7 @@ public class CompactionManager implement for (final SSTableReader sstable : sstables) { logger.info("Scrubbing " + sstable); + CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), true); // Calculate the expected compacted filesize String compactionFileLocation = cfs.table.getDataFileLocation(sstable.length()); @@ -708,7 +710,7 @@ public class CompactionManager implement if (dataSize > dataFile.length()) throw new IOError(new IOException("Impossible row size " + dataSize)); SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true); - AbstractCompactedRow compactedRow = getCompactedRow(row, sstable.descriptor, true); + AbstractCompactedRow compactedRow = controller.getCompactedRow(row); if (compactedRow.isEmpty()) { emptyRows++; @@ -736,7 +738,7 @@ public class CompactionManager implement try { SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true); - AbstractCompactedRow compactedRow = getCompactedRow(row, sstable.descriptor, true); + AbstractCompactedRow compactedRow = controller.getCompactedRow(row); if (compactedRow.isEmpty()) { emptyRows++; @@ -811,7 +813,9 @@ public class CompactionManager implement for (SSTableReader 
sstable : sstables) { + CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), false); long startTime = System.currentTimeMillis(); + long totalkeysWritten = 0; int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), @@ -841,7 +845,7 @@ public class CompactionManager implement if (Range.isTokenInRanges(row.getKey().token, ranges)) { writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable)); - writer.append(getCompactedRow(row, sstable.descriptor, false)); + writer.append(controller.getCompactedRow(row)); totalkeysWritten++; } else @@ -906,21 +910,6 @@ public class CompactionManager implement } } - /** - * @return an AbstractCompactedRow implementation to write the row in question. - * If the data is from a current-version sstable, write it unchanged. Otherwise, - * re-serialize it in the latest version. The returned AbstractCompactedRow will not purge data. - */ - private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, Descriptor descriptor, boolean forceDeserialize) - { - if (descriptor.isLatestVersion && !forceDeserialize) - return new EchoedRow(row); - - return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit() - ? new LazilyCompactedRow(CompactionController.getBasicController(forceDeserialize), Arrays.asList(row)) - : new PrecompactedRow(CompactionController.getBasicController(forceDeserialize), Arrays.asList(row)); - } - private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, Collection<SSTableReader> sstables) throws IOException { @@ -1146,7 +1135,9 @@ public class CompactionManager implement private static int getDefaultGcBefore(ColumnFamilyStore cfs) { - return (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(); + return cfs.isIndex() + ? 
Integer.MAX_VALUE + : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(); } private static class ValidationCompactionIterator extends CompactionIterator @@ -1155,7 +1146,7 @@ public class CompactionManager implement { super(CompactionType.VALIDATION, getCollatingIterator(cfs.getSSTables(), range), - new CompactionController(cfs, cfs.getSSTables(), true, getDefaultGcBefore(cfs), false)); + new CompactionController(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true)); } protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader> sstables, Range range) throws IOException @@ -1277,40 +1268,6 @@ public class CompactionManager implement } } - private static class EchoedRow extends AbstractCompactedRow - { - private final SSTableIdentityIterator row; - - public EchoedRow(SSTableIdentityIterator row) - { - super(row.getKey()); - this.row = row; - } - - public void write(DataOutput out) throws IOException - { - assert row.dataSize > 0; - out.writeLong(row.dataSize); - row.echoData(out); - } - - public void update(MessageDigest digest) - { - // EchoedRow is not used in anti-entropy validation - throw new UnsupportedOperationException(); - } - - public boolean isEmpty() - { - return !row.hasNext(); - } - - public int columnCount() - { - return row.columnCount; - } - } - private static class CleanupInfo implements CompactionInfo.Holder { private final SSTableReader sstable; Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1126356&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java Mon May 23 07:34:59 2011 @@ -0,0 +1,46 @@ +package org.apache.cassandra.db; + 
+import java.io.DataOutput; +import java.io.IOException; +import java.security.MessageDigest; + +import org.apache.cassandra.io.AbstractCompactedRow; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; + +/** + * A CompactedRow implementation that just echos the original row bytes without deserializing. + * Currently only used by cleanup. + */ +public class EchoedRow extends AbstractCompactedRow +{ + private final SSTableIdentityIterator row; + + public EchoedRow(SSTableIdentityIterator row) + { + super(row.getKey()); + this.row = row; + } + + public void write(DataOutput out) throws IOException + { + assert row.dataSize > 0; + out.writeLong(row.dataSize); + row.echoData(out); + } + + public void update(MessageDigest digest) + { + // EchoedRow is not used in anti-entropy validation + throw new UnsupportedOperationException(); + } + + public boolean isEmpty() + { + return !row.hasNext(); + } + + public int columnCount() + { + return row.columnCount; + } +} Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java Mon May 23 07:34:59 2011 @@ -100,7 +100,7 @@ public abstract class AbstractType<T> im /** get a string representation of a particular type. 
*/ public abstract String toString(T t); - + /** get a string representation of the bytes suitable for log messages */ public abstract String getString(ByteBuffer bytes); Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java Mon May 23 07:34:59 2011 @@ -19,21 +19,28 @@ */ package org.apache.cassandra.io; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.EchoedRow; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.utils.ByteBufferUtil; /** * Manage compaction options. 
*/ public class CompactionController { + private static Logger logger = LoggerFactory.getLogger(CompactionController.class); + private final ColumnFamilyStore cfs; private final Set<SSTableReader> sstables; private final boolean forceDeserialize; @@ -41,41 +48,31 @@ public class CompactionController public final boolean isMajor; public final int gcBefore; - private static final CompactionController basicController = new CompactionController(null, Collections.<SSTableReader>emptySet(), false, Integer.MAX_VALUE, false); - private static final CompactionController basicDeserializingController = new CompactionController(null, Collections.<SSTableReader>emptySet(), false, Integer.MAX_VALUE, true); - - public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, boolean isMajor, int gcBefore, boolean forceDeserialize) + public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize) { + assert cfs != null; this.cfs = cfs; - this.isMajor = isMajor; this.sstables = new HashSet<SSTableReader>(sstables); this.gcBefore = gcBefore; this.forceDeserialize = forceDeserialize; + isMajor = cfs.isCompleteSSTables(this.sstables); } - /** - * Returns a controller that never purge - */ - public static CompactionController getBasicController(boolean forceDeserialize) - { - return forceDeserialize ? basicDeserializingController : basicController; - } - - /** @return The keyspace name: only valid if created with a non-null CFS. */ + /** @return the keyspace name */ public String getKeyspace() { - return cfs != null ? cfs.table.name : "n/a"; + return cfs.table.name; } - /** @return The column family name: only valid if created with a non-null CFS. */ + /** @return the column family name */ public String getColumnFamily() { - return cfs != null ? 
cfs.columnFamily : "n/a"; + return cfs.columnFamily; } public boolean shouldPurge(DecoratedKey key) { - return isMajor || (cfs != null && !cfs.isKeyInRemainingSSTables(key, sstables)); + return isMajor || !cfs.isKeyInRemainingSSTables(key, sstables); } public boolean needDeserialize() @@ -92,18 +89,50 @@ public class CompactionController public void invalidateCachedRow(DecoratedKey key) { - if (cfs != null) - cfs.invalidateCachedRow(key); + cfs.invalidateCachedRow(key); } public void removeDeletedInCache(DecoratedKey key) { - if (cfs != null) + ColumnFamily cachedRow = cfs.getRawCachedRow(key); + if (cachedRow != null) + ColumnFamilyStore.removeDeleted(cachedRow, gcBefore); + } + + public boolean isMajor() + { + return isMajor; + } + + /** + * @return an AbstractCompactedRow implementation to write the merged rows in question. + * + * If there is a single source row, the data is from a current-version sstable, + * and we aren't forcing deserialization for scrub, + * write it unchanged. Otherwise, we deserialize, purge tombstones, and + * reserialize in the latest version. 
+ */ + public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows) + { + if (rows.size() == 1 && !needDeserialize()) + return new EchoedRow(rows.get(0)); + + long rowSize = 0; + for (SSTableIdentityIterator row : rows) + rowSize += row.dataSize; + + if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit()) { - ColumnFamily cachedRow = cfs.getRawCachedRow(key); - if (cachedRow != null) - ColumnFamilyStore.removeDeleted(cachedRow, gcBefore); + logger.info(String.format("Compacting large row %s (%d bytes) incrementally", + ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize)); + return new LazilyCompactedRow(this, rows); } + return new PrecompactedRow(this, rows); } + /** convenience method for single-sstable compactions */ + public AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row) + { + return getCompactedRow(Collections.singletonList(row)); + } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java Mon May 23 07:34:59 2011 @@ -38,7 +38,6 @@ import org.apache.cassandra.io.sstable.S import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ReducingIterator; @@ -122,7 +121,7 @@ implements Closeable, CompactionInfo.Hol try { - AbstractCompactedRow compactedRow = getCompactedRow(); + AbstractCompactedRow 
compactedRow = controller.getCompactedRow(rows); if (compactedRow.isEmpty()) { controller.invalidateCachedRow(compactedRow.key); @@ -151,23 +150,6 @@ implements Closeable, CompactionInfo.Hol } } - protected AbstractCompactedRow getCompactedRow() - { - long rowSize = 0; - for (SSTableIdentityIterator row : rows) - { - rowSize += row.dataSize; - } - - if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit()) - { - logger.info(String.format("Compacting large row %s (%d bytes) incrementally", - ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize)); - return new LazilyCompactedRow(controller, rows); - } - return new PrecompactedRow(controller, rows); - } - private void throttle() { if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode()) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Mon May 23 07:34:59 2011 @@ -93,15 +93,6 @@ public class LazilyCompactedRow extends public void write(DataOutput out) throws IOException { - if (rows.size() == 1 && !shouldPurge && !controller.needDeserialize()) - { - SSTableIdentityIterator row = rows.get(0); - assert row.dataSize > 0; - out.writeLong(row.dataSize); - row.echoData(out); - return; - } - DataOutputBuffer clockOut = new DataOutputBuffer(); ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut); Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1126356&r1=1126355&r2=1126356&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon May 23 07:34:59 2011 @@ -494,9 +494,7 @@ public class SSTableWriter extends SSTab long rows = 0L; DecoratedKey key; - CompactionController controller = CompactionController.getBasicController(true); - - long dfileLength = dfile.length(); + CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true); while (!dfile.isEOF()) { // read key @@ -506,17 +504,7 @@ public class SSTableWriter extends SSTab long dataSize = SSTableReader.readRowSize(dfile, desc); SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true); - AbstractCompactedRow row; - if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) - { - logger.info(String.format("Rebuilding post-streaming large counter row %s (%d bytes) incrementally", ByteBufferUtil.bytesToHex(key.key), dataSize)); - row = new LazilyCompactedRow(controller, Collections.singletonList(iter)); - } - else - { - row = new PrecompactedRow(controller, Collections.singletonList(iter)); - } - + AbstractCompactedRow row = controller.getCompactedRow(iter); updateCache(key, dataSize, row); rowSizes.add(dataSize); Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1126356&r1=1126355&r2=1126356&view=diff 
============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Mon May 23 07:34:59 2011 @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Collection; +import java.util.List; import java.util.concurrent.ExecutionException; import org.apache.cassandra.CleanupHelper; @@ -40,6 +41,7 @@ import org.apache.cassandra.db.RowMutati import org.apache.cassandra.db.Table; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.MappedFileDataInput; @@ -53,12 +55,11 @@ import org.junit.Test; public class LazilyCompactedRowTest extends CleanupHelper { - private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException + private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException { Collection<SSTableReader> sstables = cfs.getSSTables(); - CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false); - CompactionIterator ci1 = new PreCompactingIterator(sstables, controller); - CompactionIterator ci2 = new LazyCompactionIterator(sstables, controller); + CompactionIterator ci1 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)); + CompactionIterator ci2 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)); while (true) { @@ -129,12 +130,11 @@ public class LazilyCompactedRowTest exte } } - private 
void assertDigest(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException, NoSuchAlgorithmException + private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException { Collection<SSTableReader> sstables = cfs.getSSTables(); - CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false); - CompactionIterator ci1 = new PreCompactingIterator(sstables, controller); - CompactionIterator ci2 = new LazyCompactionIterator(sstables, controller); + CompactionIterator ci1 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)); + CompactionIterator ci2 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)); while (true) { @@ -170,8 +170,8 @@ public class LazilyCompactedRowTest exte rm.apply(); cfs.forceBlockingFlush(); - assertBytes(cfs, Integer.MAX_VALUE, true); - assertDigest(cfs, Integer.MAX_VALUE, true); + assertBytes(cfs, Integer.MAX_VALUE); + assertDigest(cfs, Integer.MAX_VALUE); } @Test @@ -189,8 +189,8 @@ public class LazilyCompactedRowTest exte rm.apply(); cfs.forceBlockingFlush(); - assertBytes(cfs, Integer.MAX_VALUE, true); - assertDigest(cfs, Integer.MAX_VALUE, true); + assertBytes(cfs, Integer.MAX_VALUE); + assertDigest(cfs, Integer.MAX_VALUE); } @Test @@ -211,8 +211,8 @@ public class LazilyCompactedRowTest exte assert out.getLength() > DatabaseDescriptor.getColumnIndexSize(); cfs.forceBlockingFlush(); - assertBytes(cfs, Integer.MAX_VALUE, true); - assertDigest(cfs, Integer.MAX_VALUE, true); + assertBytes(cfs, Integer.MAX_VALUE); + assertDigest(cfs, Integer.MAX_VALUE); } @Test @@ -232,8 +232,8 @@ public class LazilyCompactedRowTest exte rm.apply(); cfs.forceBlockingFlush(); - assertBytes(cfs, Integer.MAX_VALUE, true); - assertDigest(cfs, Integer.MAX_VALUE, true); + assertBytes(cfs, Integer.MAX_VALUE); + assertDigest(cfs, 
Integer.MAX_VALUE); } @Test @@ -254,8 +254,8 @@ public class LazilyCompactedRowTest exte rm.apply(); cfs.forceBlockingFlush(); - assertBytes(cfs, Integer.MAX_VALUE, true); - assertDigest(cfs, Integer.MAX_VALUE, true); + assertBytes(cfs, Integer.MAX_VALUE); + assertDigest(cfs, Integer.MAX_VALUE); } @Test @@ -277,8 +277,8 @@ public class LazilyCompactedRowTest exte cfs.forceBlockingFlush(); } - assertBytes(cfs, Integer.MAX_VALUE, true); - assertDigest(cfs, Integer.MAX_VALUE, true); + assertBytes(cfs, Integer.MAX_VALUE); + assertDigest(cfs, Integer.MAX_VALUE); } @Test @@ -299,35 +299,35 @@ public class LazilyCompactedRowTest exte rm.apply(); cfs.forceBlockingFlush(); - assertBytes(cfs, Integer.MAX_VALUE, true); + assertBytes(cfs, Integer.MAX_VALUE); } - private static class LazyCompactionIterator extends CompactionIterator + private static class LazilyCompactingController extends CompactionController { - public LazyCompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException + public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize) { - super(CompactionType.UNKNOWN, sstables, controller); + super(cfs, sstables, gcBefore, forceDeserialize); } @Override - protected AbstractCompactedRow getCompactedRow() + public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows) { - return new LazilyCompactedRow(controller, rows); + return new LazilyCompactedRow(this, rows); } } - private static class PreCompactingIterator extends CompactionIterator + private static class PreCompactingController extends CompactionController { - public PreCompactingIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException + public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize) { - super(CompactionType.UNKNOWN, sstables, controller); + super(cfs, 
sstables, gcBefore, forceDeserialize); } @Override - protected AbstractCompactedRow getCompactedRow() + public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows) { - return new PrecompactedRow(controller, rows); + return new PrecompactedRow(this, rows); } } }