Author: slebresne
Date: Mon May 23 07:34:59 2011
New Revision: 1126356

URL: http://svn.apache.org/viewvc?rev=1126356&view=rev
Log:
Improve forceDeserialize/getCompactedRow encapsulation
patch by jbellis; reviewed by slebresne for CASSANDRA-2659

Added:
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon May 23 07:34:59 2011
@@ -14,6 +14,7 @@
    buffers again, especially on CL writes (CASSANDRA-2660)
  * add DROP INDEX support to CLI (CASSANDRA-2616)
  * don't perform HH to client-mode [storageproxy] nodes (CASSANDRA-2668)
+ * Improve forceDeserialize/getCompactedRow encapsulation (CASSANDRA-2659)
 
 
 0.8.0-final

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Mon May 23 07:34:59 2011
@@ -968,9 +968,9 @@ public class ColumnFamilyStore implement
         data.markCompacted(sstables);
     }
 
-    boolean isCompleteSSTables(Collection<SSTableReader> sstables)
+    public boolean isCompleteSSTables(Set<SSTableReader> sstables)
     {
-        return data.getSSTables().equals(new HashSet<SSTableReader>(sstables));
+        return data.getSSTables().equals(sstables);
     }
 
     void replaceCompactedSSTables(Collection<SSTableReader> sstables, 
Iterable<SSTableReader> replacements)

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java
 Mon May 23 07:34:59 2011
@@ -18,13 +18,11 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataOutput;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
@@ -128,7 +126,7 @@ public class CompactionManager implement
                     logger.debug("Checking to see if compaction of " + 
cfs.columnFamily + " would be useful");
                     Set<List<SSTableReader>> buckets = 
getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
                     updateEstimateFor(cfs, buckets);
-                    int gcBefore = cfs.isIndex() ? Integer.MAX_VALUE : 
getDefaultGcBefore(cfs);
+                    int gcBefore = getDefaultGcBefore(cfs);
                     
                     for (List<SSTableReader> sstables : buckets)
                     {
@@ -529,11 +527,15 @@ public class CompactionManager implement
         for (SSTableReader sstable : sstables)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
+        // compaction won't normally compact a single sstable, so if that's 
what we're doing
+        // it must have been requested manually by the user, which probably 
means he wants to force
+        // tombstone purge, which won't happen unless we force deserializing 
the rows.
+        boolean forceDeserialize = sstables.size() == 1;
+        CompactionController controller = new CompactionController(cfs, 
sstables, gcBefore, forceDeserialize);
         // new sstables from flush can be added during a compaction, but only 
the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of 
determining if we're compacting
         // all the sstables (that existed when we started)
-        boolean major = cfs.isCompleteSSTables(sstables);
-        CompactionType type = major
+        CompactionType type = controller.isMajor()
                             ? CompactionType.MAJOR
                             : CompactionType.MINOR;
         logger.info("Compacting {}: {}", type, sstables);
@@ -547,7 +549,6 @@ public class CompactionManager implement
           logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionController controller = new CompactionController(cfs, 
sstables, major, gcBefore, false);
         CompactionIterator ci = new CompactionIterator(type, sstables, 
controller); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -630,6 +631,7 @@ public class CompactionManager implement
         for (final SSTableReader sstable : sstables)
         {
             logger.info("Scrubbing " + sstable);
+            CompactionController controller = new CompactionController(cfs, 
Collections.singletonList(sstable), getDefaultGcBefore(cfs), true);
 
             // Calculate the expected compacted filesize
             String compactionFileLocation = 
cfs.table.getDataFileLocation(sstable.length());
@@ -708,7 +710,7 @@ public class CompactionManager implement
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size 
" + dataSize));
                     SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
-                    AbstractCompactedRow compactedRow = getCompactedRow(row, 
sstable.descriptor, true);
+                    AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
                     if (compactedRow.isEmpty())
                     {
                         emptyRows++;
@@ -736,7 +738,7 @@ public class CompactionManager implement
                         try
                         {
                             SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, 
dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = 
getCompactedRow(row, sstable.descriptor, true);
+                            AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
                             if (compactedRow.isEmpty())
                             {
                                 emptyRows++;
@@ -811,7 +813,9 @@ public class CompactionManager implement
 
         for (SSTableReader sstable : sstables)
         {
+            CompactionController controller = new CompactionController(cfs, 
Collections.singletonList(sstable), getDefaultGcBefore(cfs), false);
             long startTime = System.currentTimeMillis();
+
             long totalkeysWritten = 0;
 
             int expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(),
@@ -841,7 +845,7 @@ public class CompactionManager implement
                         if (Range.isTokenInRanges(row.getKey().token, ranges))
                         {
                             writer = maybeCreateWriter(cfs, 
compactionFileLocation, expectedBloomFilterSize, writer, 
Collections.singletonList(sstable));
-                            writer.append(getCompactedRow(row, 
sstable.descriptor, false));
+                            writer.append(controller.getCompactedRow(row));
                             totalkeysWritten++;
                         }
                         else
@@ -906,21 +910,6 @@ public class CompactionManager implement
         }
     }
 
-    /**
-     * @return an AbstractCompactedRow implementation to write the row in 
question.
-     * If the data is from a current-version sstable, write it unchanged.  
Otherwise,
-     * re-serialize it in the latest version. The returned 
AbstractCompactedRow will not purge data.
-     */
-    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, 
Descriptor descriptor, boolean forceDeserialize)
-    {
-        if (descriptor.isLatestVersion && !forceDeserialize)
-            return new EchoedRow(row);
-
-        return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
-               ? new 
LazilyCompactedRow(CompactionController.getBasicController(forceDeserialize), 
Arrays.asList(row))
-               : new 
PrecompactedRow(CompactionController.getBasicController(forceDeserialize), 
Arrays.asList(row));
-    }
-
     private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String 
compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, 
Collection<SSTableReader> sstables)
             throws IOException
     {
@@ -1146,7 +1135,9 @@ public class CompactionManager implement
 
     private static int getDefaultGcBefore(ColumnFamilyStore cfs)
     {
-        return (int) (System.currentTimeMillis() / 1000) - 
cfs.metadata.getGcGraceSeconds();
+        return cfs.isIndex()
+               ? Integer.MAX_VALUE
+               : (int) (System.currentTimeMillis() / 1000) - 
cfs.metadata.getGcGraceSeconds();
     }
 
     private static class ValidationCompactionIterator extends 
CompactionIterator
@@ -1155,7 +1146,7 @@ public class CompactionManager implement
         {
             super(CompactionType.VALIDATION,
                   getCollatingIterator(cfs.getSSTables(), range),
-                  new CompactionController(cfs, cfs.getSSTables(), true, 
getDefaultGcBefore(cfs), false));
+                  new CompactionController(cfs, cfs.getSSTables(), 
getDefaultGcBefore(cfs), true));
         }
 
         protected static CollatingIterator 
getCollatingIterator(Iterable<SSTableReader> sstables, Range range) throws 
IOException
@@ -1277,40 +1268,6 @@ public class CompactionManager implement
         }
     }
 
-    private static class EchoedRow extends AbstractCompactedRow
-    {
-        private final SSTableIdentityIterator row;
-
-        public EchoedRow(SSTableIdentityIterator row)
-        {
-            super(row.getKey());
-            this.row = row;
-        }
-
-        public void write(DataOutput out) throws IOException
-        {
-            assert row.dataSize > 0;
-            out.writeLong(row.dataSize);
-            row.echoData(out);
-        }
-
-        public void update(MessageDigest digest)
-        {
-            // EchoedRow is not used in anti-entropy validation
-            throw new UnsupportedOperationException();
-        }
-
-        public boolean isEmpty()
-        {
-            return !row.hasNext();
-        }
-
-        public int columnCount()
-        {
-            return row.columnCount;
-        }
-    }
-
     private static class CleanupInfo implements CompactionInfo.Holder
     {
         private final SSTableReader sstable;

Added: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1126356&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
 (added)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
 Mon May 23 07:34:59 2011
@@ -0,0 +1,46 @@
+package org.apache.cassandra.db;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.io.AbstractCompactedRow;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+
+/**
+ * A CompactedRow implementation that just echoes the original row bytes 
without deserializing.
+ * Currently only used by cleanup.
+ */
+public class EchoedRow extends AbstractCompactedRow
+{
+    private final SSTableIdentityIterator row;
+
+    public EchoedRow(SSTableIdentityIterator row)
+    {
+        super(row.getKey());
+        this.row = row;
+    }
+
+    public void write(DataOutput out) throws IOException
+    {
+        assert row.dataSize > 0;
+        out.writeLong(row.dataSize);
+        row.echoData(out);
+    }
+
+    public void update(MessageDigest digest)
+    {
+        // EchoedRow is not used in anti-entropy validation
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isEmpty()
+    {
+        return !row.hasNext();
+    }
+
+    public int columnCount()
+    {
+        return row.columnCount;
+    }
+}

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/marshal/AbstractType.java
 Mon May 23 07:34:59 2011
@@ -100,7 +100,7 @@ public abstract class AbstractType<T> im
 
     /** get a string representation of a particular type. */
     public abstract String toString(T t);
-    
+
     /** get a string representation of the bytes suitable for log messages */
     public abstract String getString(ByteBuffer bytes);
 

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionController.java
 Mon May 23 07:34:59 2011
@@ -19,21 +19,28 @@
  */
 package org.apache.cassandra.io;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EchoedRow;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Manage compaction options.
  */
 public class CompactionController
 {
+    private static Logger logger = 
LoggerFactory.getLogger(CompactionController.class);
+
     private final ColumnFamilyStore cfs;
     private final Set<SSTableReader> sstables;
     private final boolean forceDeserialize;
@@ -41,41 +48,31 @@ public class CompactionController
     public final boolean isMajor;
     public final int gcBefore;
 
-    private static final CompactionController basicController = new 
CompactionController(null, Collections.<SSTableReader>emptySet(), false, 
Integer.MAX_VALUE, false);
-    private static final CompactionController basicDeserializingController = 
new CompactionController(null, Collections.<SSTableReader>emptySet(), false, 
Integer.MAX_VALUE, true);
-
-    public CompactionController(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, boolean isMajor, int gcBefore, boolean 
forceDeserialize)
+    public CompactionController(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
     {
+        assert cfs != null;
         this.cfs = cfs;
-        this.isMajor = isMajor;
         this.sstables = new HashSet<SSTableReader>(sstables);
         this.gcBefore = gcBefore;
         this.forceDeserialize = forceDeserialize;
+        isMajor = cfs.isCompleteSSTables(this.sstables);
     }
 
-    /**
-     * Returns a controller that never purge
-     */
-    public static CompactionController getBasicController(boolean 
forceDeserialize)
-    {
-        return forceDeserialize ? basicDeserializingController : 
basicController;
-    }
-
-    /** @return The keyspace name: only valid if created with a non-null CFS. 
*/
+    /** @return the keyspace name */
     public String getKeyspace()
     {
-        return cfs != null ? cfs.table.name : "n/a";
+        return cfs.table.name;
     }
 
-    /** @return The column family name: only valid if created with a non-null 
CFS. */
+    /** @return the column family name */
     public String getColumnFamily()
     {
-        return cfs != null ? cfs.columnFamily : "n/a";
+        return cfs.columnFamily;
     }
 
     public boolean shouldPurge(DecoratedKey key)
     {
-        return isMajor || (cfs != null && !cfs.isKeyInRemainingSSTables(key, 
sstables));
+        return isMajor || !cfs.isKeyInRemainingSSTables(key, sstables);
     }
 
     public boolean needDeserialize()
@@ -92,18 +89,50 @@ public class CompactionController
 
     public void invalidateCachedRow(DecoratedKey key)
     {
-        if (cfs != null)
-            cfs.invalidateCachedRow(key);
+        cfs.invalidateCachedRow(key);
     }
 
     public void removeDeletedInCache(DecoratedKey key)
     {
-        if (cfs != null)
+        ColumnFamily cachedRow = cfs.getRawCachedRow(key);
+        if (cachedRow != null)
+            ColumnFamilyStore.removeDeleted(cachedRow, gcBefore);
+    }
+
+    public boolean isMajor()
+    {
+        return isMajor;
+    }
+
+    /**
+     * @return an AbstractCompactedRow implementation to write the merged rows 
in question.
+     *
+     * If there is a single source row, the data is from a current-version 
sstable,
+     * and we aren't forcing deserialization for scrub,
+     * write it unchanged.  Otherwise, we deserialize, purge tombstones, and
+     * reserialize in the latest version.
+     */
+    public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> 
rows)
+    {
+        if (rows.size() == 1 && !needDeserialize())
+            return new EchoedRow(rows.get(0));
+
+        long rowSize = 0;
+        for (SSTableIdentityIterator row : rows)
+            rowSize += row.dataSize;
+
+        if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
         {
-            ColumnFamily cachedRow = cfs.getRawCachedRow(key);
-            if (cachedRow != null)
-                ColumnFamilyStore.removeDeleted(cachedRow, gcBefore);
+            logger.info(String.format("Compacting large row %s (%d bytes) 
incrementally",
+                                      
ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize));
+            return new LazilyCompactedRow(this, rows);
         }
+        return new PrecompactedRow(this, rows);
     }
 
+    /** convenience method for single-sstable compactions */
+    public AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row)
+    {
+        return getCompactedRow(Collections.singletonList(row));
+    }
 }

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/CompactionIterator.java
 Mon May 23 07:34:59 2011
@@ -38,7 +38,6 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ReducingIterator;
 
@@ -122,7 +121,7 @@ implements Closeable, CompactionInfo.Hol
 
         try
         {
-            AbstractCompactedRow compactedRow = getCompactedRow();
+            AbstractCompactedRow compactedRow = 
controller.getCompactedRow(rows);
             if (compactedRow.isEmpty())
             {
                 controller.invalidateCachedRow(compactedRow.key);
@@ -151,23 +150,6 @@ implements Closeable, CompactionInfo.Hol
         }
     }
 
-    protected AbstractCompactedRow getCompactedRow()
-    {
-        long rowSize = 0;
-        for (SSTableIdentityIterator row : rows)
-        {
-            rowSize += row.dataSize;
-        }
-
-        if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-        {
-            logger.info(String.format("Compacting large row %s (%d bytes) 
incrementally",
-                                      
ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize));
-            return new LazilyCompactedRow(controller, rows);
-        }
-        return new PrecompactedRow(controller, rows);
-    }
-
     private void throttle()
     {
         if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || 
StorageService.instance.isBootstrapMode())

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
 Mon May 23 07:34:59 2011
@@ -93,15 +93,6 @@ public class LazilyCompactedRow extends 
 
     public void write(DataOutput out) throws IOException
     {
-        if (rows.size() == 1 && !shouldPurge && !controller.needDeserialize())
-        {
-            SSTableIdentityIterator row = rows.get(0);
-            assert row.dataSize > 0;
-            out.writeLong(row.dataSize);
-            row.echoData(out);
-            return;
-        }
-
         DataOutputBuffer clockOut = new DataOutputBuffer();
         ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
 

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
 Mon May 23 07:34:59 2011
@@ -494,9 +494,7 @@ public class SSTableWriter extends SSTab
             long rows = 0L;
             DecoratedKey key;
 
-            CompactionController controller = 
CompactionController.getBasicController(true);
-
-            long dfileLength = dfile.length();
+            CompactionController controller = new CompactionController(cfs, 
Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
             while (!dfile.isEOF())
             {
                 // read key
@@ -506,17 +504,7 @@ public class SSTableWriter extends SSTab
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
                 SSTableIdentityIterator iter = new 
SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), 
dataSize, true);
 
-                AbstractCompactedRow row;
-                if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-                {
-                    logger.info(String.format("Rebuilding post-streaming large 
counter row %s (%d bytes) incrementally", ByteBufferUtil.bytesToHex(key.key), 
dataSize));
-                    row = new LazilyCompactedRow(controller, 
Collections.singletonList(iter));
-                }
-                else
-                {
-                    row = new PrecompactedRow(controller, 
Collections.singletonList(iter));
-                }
-
+                AbstractCompactedRow row = controller.getCompactedRow(iter);
                 updateCache(key, dataSize, row);
 
                 rowSizes.add(dataSize);

Modified: 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1126356&r1=1126355&r2=1126356&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
 Mon May 23 07:34:59 2011
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.CleanupHelper;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.RowMutati
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.MappedFileDataInput;
@@ -53,12 +55,11 @@ import org.junit.Test;
 
 public class LazilyCompactedRowTest extends CleanupHelper
 {
-    private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean 
major) throws IOException
+    private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws 
IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionController controller = new CompactionController(cfs, 
sstables, major, gcBefore, false);
-        CompactionIterator ci1 = new PreCompactingIterator(sstables, 
controller);
-        CompactionIterator ci2 = new LazyCompactionIterator(sstables, 
controller);
+        CompactionIterator ci1 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
PreCompactingController(cfs, sstables, gcBefore, false));
+        CompactionIterator ci2 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
LazilyCompactingController(cfs, sstables, gcBefore, false));
 
         while (true)
         {
@@ -129,12 +130,11 @@ public class LazilyCompactedRowTest exte
         }
     }
     
-    private void assertDigest(ColumnFamilyStore cfs, int gcBefore, boolean 
major) throws IOException, NoSuchAlgorithmException
+    private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws 
IOException, NoSuchAlgorithmException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionController controller = new CompactionController(cfs, 
sstables, major, gcBefore, false);
-        CompactionIterator ci1 = new PreCompactingIterator(sstables, 
controller);
-        CompactionIterator ci2 = new LazyCompactionIterator(sstables, 
controller);
+        CompactionIterator ci1 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
PreCompactingController(cfs, sstables, gcBefore, false));
+        CompactionIterator ci2 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
LazilyCompactingController(cfs, sstables, gcBefore, false));
 
         while (true)
         {
@@ -170,8 +170,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -189,8 +189,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -211,8 +211,8 @@ public class LazilyCompactedRowTest exte
         assert out.getLength() > DatabaseDescriptor.getColumnIndexSize();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -232,8 +232,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -254,8 +254,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -277,8 +277,8 @@ public class LazilyCompactedRowTest exte
             cfs.forceBlockingFlush();
         }
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -299,35 +299,35 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
     }
 
 
-    private static class LazyCompactionIterator extends CompactionIterator
+    private static class LazilyCompactingController extends 
CompactionController
     {
-        public LazyCompactionIterator(Iterable<SSTableReader> sstables, 
CompactionController controller) throws IOException
+        public LazilyCompactingController(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
         {
-            super(CompactionType.UNKNOWN, sstables, controller);
+            super(cfs, sstables, gcBefore, forceDeserialize);
         }
 
         @Override
-        protected AbstractCompactedRow getCompactedRow()
+        public AbstractCompactedRow 
getCompactedRow(List<SSTableIdentityIterator> rows)
         {
-            return new LazilyCompactedRow(controller, rows);
+            return new LazilyCompactedRow(this, rows);
         }
     }
 
-    private static class PreCompactingIterator extends CompactionIterator
+    private static class PreCompactingController extends CompactionController
     {
-        public PreCompactingIterator(Iterable<SSTableReader> sstables, 
CompactionController controller) throws IOException
+        public PreCompactingController(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
         {
-            super(CompactionType.UNKNOWN, sstables, controller);
+            super(cfs, sstables, gcBefore, forceDeserialize);
         }
 
         @Override
-        protected AbstractCompactedRow getCompactedRow()
+        public AbstractCompactedRow 
getCompactedRow(List<SSTableIdentityIterator> rows)
         {
-            return new PrecompactedRow(controller, rows);
+            return new PrecompactedRow(this, rows);
         }
     }
 }


Reply via email to