Repository: cassandra
Updated Branches:
  refs/heads/trunk ae063e806 -> ef5bbedd6


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 0c7ee59..ebacf34 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -20,23 +20,52 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.primitives.Ints;
+
 import org.apache.cassandra.Util;
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.columniterator.AbstractSSTableIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.IndexInfo;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.serializers.LongSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,68 +75,330 @@ import static junit.framework.Assert.assertTrue;
 
 public class RowIndexEntryTest extends CQLTester
 {
-    private static final List<AbstractType<?>> clusterTypes = Collections.<AbstractType<?>>singletonList(LongType.instance);
+    private static final List<AbstractType<?>> clusterTypes = Collections.singletonList(LongType.instance);
     private static final ClusteringComparator comp = new ClusteringComparator(clusterTypes);
-    private static ClusteringPrefix cn(long l)
+
+    private static final byte[] dummy_100k = new byte[100000];
+
+    private static Clustering cn(long l)
     {
         return Util.clustering(comp, l);
     }
 
     @Test
-    public void testArtificialIndexOf() throws IOException
+    public void testC11206AgainstPreviousArray() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(99999);
+        testC11206AgainstPrevious();
+    }
+
+    @Test
+    public void testC11206AgainstPreviousShallow() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testC11206AgainstPrevious();
+    }
+
+    private static void testC11206AgainstPrevious() throws Exception
+    {
+        // partition without IndexInfo
+        try (DoubleSerializer doubleSerializer = new DoubleSerializer())
+        {
+            doubleSerializer.build(null, partitionKey(42L),
+                                   Collections.singletonList(cn(42)),
+                                   0L);
+            assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
+        }
+
+        // partition with multiple IndexInfo
+        try (DoubleSerializer doubleSerializer = new DoubleSerializer())
+        {
+            doubleSerializer.build(null, partitionKey(42L),
+                                   Arrays.asList(cn(42), cn(43), cn(44)),
+                                   0L);
+            assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
+        }
+
+        // partition with multiple IndexInfo
+        try (DoubleSerializer doubleSerializer = new DoubleSerializer())
+        {
+            doubleSerializer.build(null, partitionKey(42L),
+                                   Arrays.asList(cn(42), cn(43), cn(44), cn(45), cn(46), cn(47), cn(48), cn(49), cn(50), cn(51)),
+                                   0L);
+            assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
+        }
+    }
+
+    private static DecoratedKey partitionKey(long l)
+    {
+        ByteBuffer key = LongSerializer.instance.serialize(l);
+        Token token = Murmur3Partitioner.instance.getToken(key);
+        return new BufferDecoratedKey(token, key);
+    }
+
+    private static class DoubleSerializer implements AutoCloseable
     {
         CFMetaData cfMeta = CFMetaData.compile("CREATE TABLE pipe.dev_null (pk bigint, ck bigint, val text, PRIMARY KEY(pk, ck))", "foo");
+        Version version = BigFormat.latestVersion;
 
         DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
+        LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+        Row.Deletion deletion = Row.Deletion.LIVE;
 
         SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
-        IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfMeta, BigFormat.latestVersion, header);
-
-        DataOutputBuffer dob = new DataOutputBuffer();
-        dob.writeUnsignedVInt(0);
-        DeletionTime.serializer.serialize(DeletionTime.LIVE, dob);
-        dob.writeUnsignedVInt(3);
-        int off0 = dob.getLength();
-        indexSerializer.serialize(new IndexHelper.IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo), dob);
-        int off1 = dob.getLength();
-        indexSerializer.serialize(new IndexHelper.IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo), dob);
-        int off2 = dob.getLength();
-        indexSerializer.serialize(new IndexHelper.IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo), dob);
-        dob.writeInt(off0);
-        dob.writeInt(off1);
-        dob.writeInt(off2);
-
-        @SuppressWarnings("resource") DataOutputBuffer dobRie = new 
DataOutputBuffer();
-        dobRie.writeUnsignedVInt(42L);
-        dobRie.writeUnsignedVInt(dob.getLength());
-        dobRie.write(dob.buffer());
-
-        ByteBuffer buf = dobRie.buffer();
-
-        RowIndexEntry<IndexHelper.IndexInfo> rie = new RowIndexEntry.Serializer(cfMeta, BigFormat.latestVersion, header).deserialize(new DataInputBuffer(buf, false));
-
-        Assert.assertEquals(42L, rie.position);
-
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(-1L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(5L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(12L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(17L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 0));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 1));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 2));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 3));
-
-        Assert.assertEquals(-1, IndexHelper.indexFor(cn(-1L), rie.columnsIndex(), comp, true, -1));
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(5L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(5L), rie.columnsIndex(), comp, true, 2));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(17L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 4));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(12L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(12L), rie.columnsIndex(), comp, true, 2));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 1));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 2));
+
+        // create C-11206 + old serializer instances
+        RowIndexEntry.IndexSerializer rieSerializer = new RowIndexEntry.Serializer(cfMeta, version, header);
+        Pre_C_11206_RowIndexEntry.Serializer oldSerializer = new Pre_C_11206_RowIndexEntry.Serializer(cfMeta, version, header);
+
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
+        final DataOutputBuffer rieOutput = new DataOutputBuffer(1024);
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
+        final DataOutputBuffer oldOutput = new DataOutputBuffer(1024);
+
+        final SequentialWriter dataWriterNew;
+        final SequentialWriter dataWriterOld;
+        final org.apache.cassandra.db.ColumnIndex columnIndex;
+
+        RowIndexEntry rieNew;
+        ByteBuffer rieNewSerialized;
+        Pre_C_11206_RowIndexEntry rieOld;
+        ByteBuffer rieOldSerialized;
+
+        DoubleSerializer() throws IOException
+        {
+            File f = File.createTempFile("RowIndexEntryTest-", "db");
+            dataWriterNew = new SequentialWriter(f, 1024, BufferType.ON_HEAP);
+            columnIndex = new org.apache.cassandra.db.ColumnIndex(header, dataWriterNew, version, Collections.emptyList(),
+                                                                  rieSerializer.indexInfoSerializer());
+
+            f = File.createTempFile("RowIndexEntryTest-", "db");
+            dataWriterOld = new SequentialWriter(f, 1024, BufferType.ON_HEAP);
+        }
+
+        public void close() throws Exception
+        {
+            dataWriterNew.close();
+            dataWriterOld.close();
+        }
+
+        void build(Row staticRow, DecoratedKey partitionKey,
+                   Collection<Clustering> clusterings, long startPosition) throws IOException
+        {
+
+            Iterator<Clustering> clusteringIter = clusterings.iterator();
+            columnIndex.buildRowIndex(makeRowIter(staticRow, partitionKey, clusteringIter, dataWriterNew));
+            rieNew = RowIndexEntry.create(startPosition, 0L,
+                                          deletionInfo, columnIndex.headerLength, columnIndex.columnIndexCount,
+                                          columnIndex.indexInfoSerializedSize(),
+                                          columnIndex.indexSamples, columnIndex.offsets(),
+                                          rieSerializer.indexInfoSerializer());
+            rieSerializer.serialize(rieNew, rieOutput, columnIndex.buffer());
+            rieNewSerialized = rieOutput.buffer().duplicate();
+
+            Iterator<Clustering> clusteringIter2 = clusterings.iterator();
+            ColumnIndex columnIndex = RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(makeRowIter(staticRow, partitionKey, clusteringIter2, dataWriterOld),
+                                                                                       dataWriterOld, header, Collections.emptySet(), BigFormat.latestVersion);
+            rieOld = Pre_C_11206_RowIndexEntry.create(startPosition, deletionInfo, columnIndex);
+            oldSerializer.serialize(rieOld, oldOutput);
+            rieOldSerialized = oldOutput.buffer().duplicate();
+        }
+
+        private AbstractUnfilteredRowIterator makeRowIter(Row staticRow, DecoratedKey partitionKey,
+                                                          Iterator<Clustering> clusteringIter, SequentialWriter dataWriter)
+        {
+            return new AbstractUnfilteredRowIterator(cfMeta, partitionKey, deletionInfo, cfMeta.partitionColumns(),
+                                                     staticRow, false, new EncodingStats(0, 0, 0))
+            {
+                protected Unfiltered computeNext()
+                {
+                    if (!clusteringIter.hasNext())
+                        return endOfData();
+                    try
+                    {
+                        // write some fake bytes to the data file to force writing the IndexInfo object
+                        dataWriter.write(dummy_100k);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    return buildRow(clusteringIter.next());
+                }
+            };
+        }
+
+        private Unfiltered buildRow(Clustering clustering)
+        {
+            BTree.Builder<ColumnData> builder = BTree.builder(ColumnData.comparator);
+            builder.add(BufferCell.live(cfMeta.allColumns().iterator().next(),
+                                        1L,
+                                        ByteBuffer.allocate(0)));
+            return BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, builder.build());
+        }
+    }
+
+    /**
+     * Pre C-11206 code.
+     */
+    static final class ColumnIndex
+    {
+        final long partitionHeaderLength;
+        final List<IndexInfo> columnsIndex;
+
+        private static final ColumnIndex EMPTY = new ColumnIndex(-1, Collections.emptyList());
+
+        private ColumnIndex(long partitionHeaderLength, List<IndexInfo> columnsIndex)
+        {
+            assert columnsIndex != null;
+
+            this.partitionHeaderLength = partitionHeaderLength;
+            this.columnsIndex = columnsIndex;
+        }
+
+        static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator,
+                                              SequentialWriter output,
+                                              SerializationHeader header,
+                                              Collection<SSTableFlushObserver> observers,
+                                              Version version) throws IOException
+        {
+            assert !iterator.isEmpty() && version.storeRows();
+
+            Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion());
+            return builder.build();
+        }
+
+        public static ColumnIndex nothing()
+        {
+            return EMPTY;
+        }
+
+        /**
+         * Help to create an index for a column family based on size of columns,
+         * and write said columns to disk.
+         */
+        private static class Builder
+        {
+            private final UnfilteredRowIterator iterator;
+            private final SequentialWriter writer;
+            private final SerializationHeader header;
+            private final int version;
+
+            private final List<IndexInfo> columnsIndex = new ArrayList<>();
+            private final long initialPosition;
+            private long headerLength = -1;
+
+            private long startPosition = -1;
+
+            private int written;
+            private long previousRowStart;
+
+            private ClusteringPrefix firstClustering;
+            private ClusteringPrefix lastClustering;
+
+            private DeletionTime openMarker;
+
+            private final Collection<SSTableFlushObserver> observers;
+
+            Builder(UnfilteredRowIterator iterator,
+                           SequentialWriter writer,
+                           SerializationHeader header,
+                           Collection<SSTableFlushObserver> observers,
+                           int version)
+            {
+                this.iterator = iterator;
+                this.writer = writer;
+                this.header = header;
+                this.version = version;
+                this.observers = observers == null ? Collections.emptyList() : observers;
+                this.initialPosition = writer.position();
+            }
+
+            private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
+            {
+                ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
+                DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
+                if (header.hasStatic())
+                    UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version);
+            }
+
+            public ColumnIndex build() throws IOException
+            {
+                writePartitionHeader(iterator);
+                this.headerLength = writer.position() - initialPosition;
+
+                while (iterator.hasNext())
+                    add(iterator.next());
+
+                return close();
+            }
+
+            private long currentPosition()
+            {
+                return writer.position() - initialPosition;
+            }
+
+            private void addIndexBlock()
+            {
+                IndexInfo cIndexInfo = new IndexInfo(firstClustering,
+                                                     lastClustering,
+                                                     startPosition,
+                                                     currentPosition() - startPosition,
+                                                     openMarker);
+                columnsIndex.add(cIndexInfo);
+                firstClustering = null;
+            }
+
+            private void add(Unfiltered unfiltered) throws IOException
+            {
+                long pos = currentPosition();
+
+                if (firstClustering == null)
+                {
+                    // Beginning of an index block. Remember the start and position
+                    firstClustering = unfiltered.clustering();
+                    startPosition = pos;
+                }
+
+                UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
+
+                // notify observers about each new row
+                if (!observers.isEmpty())
+                    observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
+
+                lastClustering = unfiltered.clustering();
+                previousRowStart = pos;
+                ++written;
+
+                if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker)unfiltered;
+                    openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
+                }
+                }
+
+                // if we hit the column index size that we have to index after, go ahead and index it.
+                if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
+                    addIndexBlock();
+
+            }
+
+            private ColumnIndex close() throws IOException
+            {
+                UnfilteredSerializer.serializer.writeEndOfPartition(writer);
+
+                // It's possible we add no rows, just a top level deletion
+                if (written == 0)
+                    return RowIndexEntryTest.ColumnIndex.EMPTY;
+
+                // the last column may have fallen on an index boundary already.  if not, index it explicitly.
+                if (firstClustering != null)
+                    addIndexBlock();
+
+                // we should always have at least one computed index block, but we only write it out if there is more than that.
+                assert !columnsIndex.isEmpty() && headerLength >= 0;
+                return new ColumnIndex(headerLength, columnsIndex);
+            }
+        }
     }
 
     @Test
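
A note on the DoubleSerializer harness added above: the technique is to push identical
logical content through the pre-CASSANDRA-11206 serializer and the new one, then assert
that the two byte streams match. Stripped of the Cassandra types, the pattern looks
roughly like the following JDK-only sketch (the Serializer interface and both lambdas
are illustrative stand-ins, not Cassandra APIs):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class DualSerializerCheck
    {
        @FunctionalInterface
        interface Serializer
        {
            void serialize(long position, int[] offsets, DataOutputStream out) throws IOException;
        }

        // Runs one serializer implementation and captures its output bytes.
        static byte[] run(Serializer s, long position, int[] offsets) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                s.serialize(position, offsets, out);
            }
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException
        {
            // two implementations that are supposed to stay wire-compatible
            Serializer oldImpl = (pos, offs, out) -> { out.writeLong(pos); for (int o : offs) out.writeInt(o); };
            Serializer newImpl = (pos, offs, out) -> { out.writeLong(pos); for (int o : offs) out.writeInt(o); };

            byte[] a = run(oldImpl, 42L, new int[]{ 0, 17, 99 });
            byte[] b = run(newImpl, 42L, new int[]{ 0, 17, 99 });
            if (!Arrays.equals(a, b))
                throw new AssertionError("serialized forms diverged");
        }
    }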
@@ -116,11 +407,11 @@ public class RowIndexEntryTest extends CQLTester
         String tableName = createTable("CREATE TABLE %s (a int, b text, c int, 
PRIMARY KEY(a, b))");
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName);
 
-        final RowIndexEntry simple = new RowIndexEntry(123);
+        Pre_C_11206_RowIndexEntry simple = new Pre_C_11206_RowIndexEntry(123);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
         SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
-        RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
+        Pre_C_11206_RowIndexEntry.Serializer serializer = new Pre_C_11206_RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
 
         serializer.serialize(simple, buffer);
 
@@ -128,16 +419,16 @@ public class RowIndexEntryTest extends CQLTester
 
         // write enough rows to ensure we get a few column index entries
         for (int i = 0; i <= DatabaseDescriptor.getColumnIndexSize() / 4; i++)
-            execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, "" + i, i);
+            execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, 
String.valueOf(i), i);
 
         ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs).build());
 
         File tempFile = File.createTempFile("row_index_entry_test", null);
         tempFile.deleteOnExit();
         SequentialWriter writer = SequentialWriter.open(tempFile);
-        ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion);
-        RowIndexEntry<IndexHelper.IndexInfo> withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
-        IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfs.metadata, BigFormat.latestVersion, header);
+        ColumnIndex columnIndex = RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion);
+        Pre_C_11206_RowIndexEntry withIndex = Pre_C_11206_RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
+        IndexInfo.Serializer indexSerializer = cfs.metadata.serializers().indexInfoSerializer(BigFormat.latestVersion, header);
 
         // sanity check
         assertTrue(columnIndex.columnsIndex.size() >= 3);
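
The promoted-index layout exercised here writes the variable-length IndexInfo entries
first and then appends one fixed-width int offset per entry, so a reader can jump to the
i-th entry without deserializing its predecessors. A JDK-only sketch of that write
pattern (OffsetTableWriter and its byte[]-per-entry simplification are assumptions for
illustration, not the Cassandra code):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    final class OffsetTableWriter
    {
        // Writes each entry, remembering where it started, then appends the offset table.
        static byte[] write(List<byte[]> entries) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);

            int[] offsets = new int[entries.size()];
            int written = 0;
            for (int i = 0; i < entries.size(); i++)
            {
                offsets[i] = written;           // offset of entry i, relative to the first entry
                out.write(entries.get(i));
                written += entries.get(i).length;
            }
            for (int off : offsets)             // fixed-width table goes last
                out.writeInt(off);

            out.flush();
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException
        {
            byte[] blob = write(Arrays.asList(new byte[3], new byte[7], new byte[2]));
            // 3 + 7 + 2 entry bytes plus 3 * 4 offset bytes
            System.out.println(blob.length == 24);
        }
    }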
@@ -174,11 +465,11 @@ public class RowIndexEntryTest extends CQLTester
 
         bb = buffer.buffer();
         input = new DataInputBuffer(bb, false);
-        RowIndexEntry.Serializer.skip(input, BigFormat.latestVersion);
+        Pre_C_11206_RowIndexEntry.Serializer.skip(input, BigFormat.latestVersion);
         Assert.assertEquals(0, bb.remaining());
     }
 
-    private void serializationCheck(RowIndexEntry<IndexHelper.IndexInfo> withIndex, IndexHelper.IndexInfo.Serializer indexSerializer, ByteBuffer bb, DataInputBuffer input) throws IOException
+    private static void serializationCheck(Pre_C_11206_RowIndexEntry withIndex, IndexInfo.Serializer indexSerializer, ByteBuffer bb, DataInputBuffer input) throws IOException
     {
         Assert.assertEquals(0xdeadbeef, input.readUnsignedVInt());
         Assert.assertEquals(withIndex.promotedSize(indexSerializer), input.readUnsignedVInt());
@@ -193,7 +484,7 @@ public class RowIndexEntryTest extends CQLTester
         {
             int pos = bb.position();
             offsets[i] = pos - offset;
-            IndexHelper.IndexInfo info = indexSerializer.deserialize(input);
+            IndexInfo info = indexSerializer.deserialize(input);
             int end = bb.position();
 
             Assert.assertEquals(indexSerializer.serializedSize(info), end - pos);
@@ -210,4 +501,361 @@ public class RowIndexEntryTest extends CQLTester
 
         Assert.assertEquals(0, bb.remaining());
     }
+
+    static class Pre_C_11206_RowIndexEntry implements IMeasurableMemory
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new Pre_C_11206_RowIndexEntry(0));
+
+        public final long position;
+
+        Pre_C_11206_RowIndexEntry(long position)
+        {
+            this.position = position;
+        }
+
+        protected int promotedSize(IndexInfo.Serializer idxSerializer)
+        {
+            return 0;
+        }
+
+        public static Pre_C_11206_RowIndexEntry create(long position, DeletionTime deletionTime, ColumnIndex index)
+        {
+            assert index != null;
+            assert deletionTime != null;
+
+            // we only consider the columns summary when determining whether to create an IndexedEntry,
+            // since if there are insufficient columns to be worth indexing we're going to seek to
+            // the beginning of the row anyway, so we might as well read the tombstone there as well.
+            if (index.columnsIndex.size() > 1)
+                return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, index.partitionHeaderLength, index.columnsIndex);
+            else
+                return new Pre_C_11206_RowIndexEntry(position);
+        }
+
+        /**
+         * @return true if this index entry contains the row-level tombstone and column summary.  Otherwise,
+         * caller should fetch these from the row header.
+         */
+        public boolean isIndexed()
+        {
+            return !columnsIndex().isEmpty();
+        }
+
+        public DeletionTime deletionTime()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * The length of the row header (partition key, partition deletion and static row).
+         * This value is only provided for indexed entries and this method will throw
+         * {@code UnsupportedOperationException} if {@code !isIndexed()}.
+         */
+        public long headerLength()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public List<IndexInfo> columnsIndex()
+        {
+            return Collections.emptyList();
+        }
+
+        public long unsharedHeapSize()
+        {
+            return EMPTY_SIZE;
+        }
+
+        public static class Serializer
+        {
+            private final IndexInfo.Serializer idxSerializer;
+            private final Version version;
+
+            Serializer(CFMetaData metadata, Version version, SerializationHeader header)
+            {
+                this.idxSerializer = metadata.serializers().indexInfoSerializer(version, header);
+                this.version = version;
+            }
+
+            public void serialize(Pre_C_11206_RowIndexEntry rie, DataOutputPlus out) throws IOException
+            {
+                assert version.storeRows() : "We read old index files but we 
should never write them";
+
+                out.writeUnsignedVInt(rie.position);
+                out.writeUnsignedVInt(rie.promotedSize(idxSerializer));
+
+                if (rie.isIndexed())
+                {
+                    out.writeUnsignedVInt(rie.headerLength());
+                    DeletionTime.serializer.serialize(rie.deletionTime(), out);
+                    out.writeUnsignedVInt(rie.columnsIndex().size());
+
+                    // Calculate and write the offsets to the IndexInfo objects.
+
+                    int[] offsets = new int[rie.columnsIndex().size()];
+
+                    if (out.hasPosition())
+                    {
+                        // Out is usually a SequentialWriter, so using the file-pointer is fine to generate the offsets.
+                        // A DataOutputBuffer also works.
+                        long start = out.position();
+                        int i = 0;
+                        for (IndexInfo info : rie.columnsIndex())
+                        {
+                            offsets[i] = i == 0 ? 0 : (int)(out.position() - start);
+                            i++;
+                            idxSerializer.serialize(info, out);
+                        }
+                    }
+                    else
+                    {
+                        // Not sure this branch will ever be needed, but if it is called, it has to calculate the
+                        // serialized sizes instead of simply using the file-pointer.
+                        int i = 0;
+                        int offset = 0;
+                        for (IndexInfo info : rie.columnsIndex())
+                        {
+                            offsets[i++] = offset;
+                            idxSerializer.serialize(info, out);
+                            offset += idxSerializer.serializedSize(info);
+                        }
+                    }
+
+                    for (int off : offsets)
+                        out.writeInt(off);
+                }
+            }
+
+            public Pre_C_11206_RowIndexEntry deserialize(DataInputPlus in) throws IOException
+            {
+                if (!version.storeRows())
+                {
+                    long position = in.readLong();
+
+                    int size = in.readInt();
+                    if (size > 0)
+                    {
+                        DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
+
+                        int entries = in.readInt();
+                        List<IndexInfo> columnsIndex = new ArrayList<>(entries);
+
+                        long headerLength = 0L;
+                        for (int i = 0; i < entries; i++)
+                        {
+                            IndexInfo info = idxSerializer.deserialize(in);
+                            columnsIndex.add(info);
+                            if (i == 0)
+                                headerLength = info.offset;
+                        }
+
+                        return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, headerLength, columnsIndex);
+                    }
+                    else
+                    {
+                        return new Pre_C_11206_RowIndexEntry(position);
+                    }
+                }
+
+                long position = in.readUnsignedVInt();
+
+                int size = (int)in.readUnsignedVInt();
+                if (size > 0)
+                {
+                    long headerLength = in.readUnsignedVInt();
+                    DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
+                    int entries = (int)in.readUnsignedVInt();
+                    List<IndexInfo> columnsIndex = new ArrayList<>(entries);
+                    for (int i = 0; i < entries; i++)
+                        columnsIndex.add(idxSerializer.deserialize(in));
+
+                    in.skipBytesFully(entries * TypeSizes.sizeof(0));
+
+                    return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, headerLength, columnsIndex);
+                }
+                else
+                {
+                    return new Pre_C_11206_RowIndexEntry(position);
+                }
+            }
+
+            // Reads only the data 'position' of the index entry and returns it. Note that this leaves 'in' in the
+            // middle of reading an entry, so it is only useful if you know what you are doing, and in most cases
+            // 'deserialize' should be used instead.
+            static long readPosition(DataInputPlus in, Version version) throws IOException
+            {
+                return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
+            }
+
+            public static void skip(DataInputPlus in, Version version) throws IOException
+            {
+                readPosition(in, version);
+                skipPromotedIndex(in, version);
+            }
+
+            private static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException
+            {
+                int size = version.storeRows() ? (int)in.readUnsignedVInt() : in.readInt();
+                if (size <= 0)
+                    return;
+
+                in.skipBytesFully(size);
+            }
+
+            public int serializedSize(Pre_C_11206_RowIndexEntry rie)
+            {
+                assert version.storeRows() : "We read old index files but we 
should never write them";
+
+                int indexedSize = 0;
+                if (rie.isIndexed())
+                {
+                    List<IndexInfo> index = rie.columnsIndex();
+
+                    indexedSize += TypeSizes.sizeofUnsignedVInt(rie.headerLength());
+                    indexedSize += DeletionTime.serializer.serializedSize(rie.deletionTime());
+                    indexedSize += TypeSizes.sizeofUnsignedVInt(index.size());
+
+                    for (IndexInfo info : index)
+                        indexedSize += idxSerializer.serializedSize(info);
+
+                    indexedSize += index.size() * TypeSizes.sizeof(0);
+                }
+
+                return TypeSizes.sizeofUnsignedVInt(rie.position) + TypeSizes.sizeofUnsignedVInt(indexedSize) + indexedSize;
+            }
+        }
+
+        /**
+         * An entry in the row index for a row whose columns are indexed.
+         */
+        private static final class IndexedEntry extends Pre_C_11206_RowIndexEntry
+        {
+            private final DeletionTime deletionTime;
+
+            // The offset in the file where the index entry ends
+            private final long headerLength;
+            private final List<IndexInfo> columnsIndex;
+            private static final long BASE_SIZE =
+            ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, Arrays.asList(null, null)))
+            + ObjectSizes.measure(new ArrayList<>(1));
+
+            private IndexedEntry(long position, DeletionTime deletionTime, long headerLength, List<IndexInfo> columnsIndex)
+            {
+                super(position);
+                assert deletionTime != null;
+                assert columnsIndex != null && columnsIndex.size() > 1;
+                this.deletionTime = deletionTime;
+                this.headerLength = headerLength;
+                this.columnsIndex = columnsIndex;
+            }
+
+            @Override
+            public DeletionTime deletionTime()
+            {
+                return deletionTime;
+            }
+
+            @Override
+            public long headerLength()
+            {
+                return headerLength;
+            }
+
+            @Override
+            public List<IndexInfo> columnsIndex()
+            {
+                return columnsIndex;
+            }
+
+            @Override
+            protected int promotedSize(IndexInfo.Serializer idxSerializer)
+            {
+                long size = TypeSizes.sizeofUnsignedVInt(headerLength)
+                            + DeletionTime.serializer.serializedSize(deletionTime)
+                            + TypeSizes.sizeofUnsignedVInt(columnsIndex.size()); // number of entries
+                for (IndexInfo info : columnsIndex)
+                    size += idxSerializer.serializedSize(info);
+
+                size += columnsIndex.size() * TypeSizes.sizeof(0);
+
+                return Ints.checkedCast(size);
+            }
+
+            @Override
+            public long unsharedHeapSize()
+            {
+                long entrySize = 0;
+                for (IndexInfo idx : columnsIndex)
+                    entrySize += idx.unsharedHeapSize();
+
+                return BASE_SIZE
+                       + entrySize
+                       + deletionTime.unsharedHeapSize()
+                       + ObjectSizes.sizeOfReferenceArray(columnsIndex.size());
+            }
+        }
+    }
+
+    @Test
+    public void testIndexFor() throws IOException
+    {
+        DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
+
+        List<IndexInfo> indexes = new ArrayList<>();
+        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo));
+
+        RowIndexEntry rie = new RowIndexEntry(0L)
+        {
+            public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+            {
+                return new IndexInfoRetriever()
+                {
+                    public IndexInfo columnsIndex(int index)
+                    {
+                        return indexes.get(index);
+                    }
+
+                    public void close()
+                    {
+                    }
+                };
+            }
+
+            public int columnsIndexCount()
+            {
+                return indexes.size();
+            }
+        };
+
+        AbstractSSTableIterator.IndexState indexState = new AbstractSSTableIterator.IndexState(null, comp, rie, false, null);
+
+        assertEquals(0, indexState.indexFor(cn(-1L), -1));
+        assertEquals(0, indexState.indexFor(cn(5L), -1));
+        assertEquals(1, indexState.indexFor(cn(12L), -1));
+        assertEquals(2, indexState.indexFor(cn(17L), -1));
+        assertEquals(3, indexState.indexFor(cn(100L), -1));
+        assertEquals(3, indexState.indexFor(cn(100L), 0));
+        assertEquals(3, indexState.indexFor(cn(100L), 1));
+        assertEquals(3, indexState.indexFor(cn(100L), 2));
+        assertEquals(3, indexState.indexFor(cn(100L), 3));
+
+        indexState = new AbstractSSTableIterator.IndexState(null, comp, rie, true, null);
+
+        assertEquals(-1, indexState.indexFor(cn(-1L), -1));
+        assertEquals(0, indexState.indexFor(cn(5L), 3));
+        assertEquals(0, indexState.indexFor(cn(5L), 2));
+        assertEquals(1, indexState.indexFor(cn(17L), 3));
+        assertEquals(2, indexState.indexFor(cn(100L), 3));
+        assertEquals(2, indexState.indexFor(cn(100L), 4));
+        assertEquals(1, indexState.indexFor(cn(12L), 3));
+        assertEquals(1, indexState.indexFor(cn(12L), 2));
+        assertEquals(1, indexState.indexFor(cn(100L), 1));
+        assertEquals(2, indexState.indexFor(cn(100L), 2));
+    }
 }
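
The testIndexFor assertions above pin down the lookup rule that replaced
IndexHelper.indexFor: scanning forward, the result is the index of the first block whose
last clustering is greater than or equal to the sought name, or the block count when the
name lies beyond the last block. A compact model of the forward rule, with plain longs
standing in for clusterings (a sketch of the semantics, not the Cassandra
implementation):

    import java.util.Arrays;
    import java.util.List;

    final class IndexForSketch
    {
        // lastNames holds the last clustering value covered by each index block, ascending.
        static int indexFor(long name, List<Long> lastNames, int fromIndex)
        {
            for (int i = Math.max(fromIndex, 0); i < lastNames.size(); i++)
                if (lastNames.get(i) >= name)
                    return i;                 // first block that can contain 'name'
            return lastNames.size();          // past the last block
        }

        public static void main(String[] args)
        {
            List<Long> lastNames = Arrays.asList(5L, 15L, 25L); // mirrors cn(5), cn(15), cn(25)
            check(0, indexFor(-1L, lastNames, -1));
            check(1, indexFor(12L, lastNames, -1));
            check(2, indexFor(17L, lastNames, -1));
            check(3, indexFor(100L, lastNames, -1));
            check(3, indexFor(100L, lastNames, 3));
        }

        private static void check(int expected, int actual)
        {
            if (expected != actual)
                throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }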

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
deleted file mode 100644
index e6328de..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.io.sstable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.Test;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.ClusteringPrefix;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-import static org.junit.Assert.assertEquals;
-
-public class IndexHelperTest
-{
-
-    private static ClusteringComparator comp = new ClusteringComparator(Collections.<AbstractType<?>>singletonList(LongType.instance));
-    private static ClusteringPrefix cn(long l)
-    {
-        return Util.clustering(comp, l);
-    }
-
-    @Test
-    public void testIndexHelper()
-    {
-        DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
-
-        List<IndexInfo> indexes = new ArrayList<>();
-        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo));
-
-        assertEquals(0, IndexHelper.indexFor(cn(-1L), indexes, comp, false, -1));
-        assertEquals(0, IndexHelper.indexFor(cn(5L), indexes, comp, false, -1));
-        assertEquals(1, IndexHelper.indexFor(cn(12L), indexes, comp, false, -1));
-        assertEquals(2, IndexHelper.indexFor(cn(17L), indexes, comp, false, -1));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, -1));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 0));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 1));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 2));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 3));
-
-        assertEquals(-1, IndexHelper.indexFor(cn(-1L), indexes, comp, true, -1));
-        assertEquals(0, IndexHelper.indexFor(cn(5L), indexes, comp, true, 3));
-        assertEquals(0, IndexHelper.indexFor(cn(5L), indexes, comp, true, 2));
-        assertEquals(1, IndexHelper.indexFor(cn(17L), indexes, comp, true, 3));
-        assertEquals(2, IndexHelper.indexFor(cn(100L), indexes, comp, true, 3));
-        assertEquals(2, IndexHelper.indexFor(cn(100L), indexes, comp, true, 4));
-        assertEquals(1, IndexHelper.indexFor(cn(12L), indexes, comp, true, 3));
-        assertEquals(1, IndexHelper.indexFor(cn(12L), indexes, comp, true, 2));
-        assertEquals(1, IndexHelper.indexFor(cn(100L), indexes, comp, true, 1));
-        assertEquals(2, IndexHelper.indexFor(cn(100L), indexes, comp, true, 2));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java b/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java
new file mode 100644
index 0000000..f97356a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.metrics.CacheMetrics;
+import org.apache.cassandra.service.CacheService;
+
+/**
+ * Test intended to manually measure GC pressure when writing and reading partitions of different sizes
+ * for CASSANDRA-11206.
+ */
+@RunWith(OrderedJUnit4ClassRunner.class)
+@Ignore // all these tests take very, very long - so only run them manually
+public class LargePartitionsTest extends CQLTester
+{
+
+    @FunctionalInterface
+    interface Measured
+    {
+        void measure() throws Throwable;
+    }
+
+    private static void measured(String name, Measured measured) throws Throwable
+    {
+        long t0 = System.currentTimeMillis();
+        measured.measure();
+        long t = System.currentTimeMillis() - t0;
+        System.out.println(name + " took " + t + " ms");
+    }
+
+    private static String randomText(int bytes)
+    {
+        char[] ch = new char[bytes];
+        ThreadLocalRandom r = ThreadLocalRandom.current();
+        for (int i = 0; i < bytes; i++)
+            ch[i] = (char) (32 + r.nextInt(95));
+        return new String(ch);
+    }
+
+    private static final int rowKBytes = 8;
+
+    private void withPartitionSize(long partitionKBytes, long totalMBytes) throws Throwable
+    {
+        long totalKBytes = totalMBytes * 1024L;
+
+        createTable("CREATE TABLE %s (pk text, ck text, val text, PRIMARY KEY 
(pk, ck))");
+
+        String name = "part=" + partitionKBytes + "k total=" + totalMBytes + 
'M';
+
+        measured("INSERTs for " + name, () -> {
+            for (long writtenKBytes = 0L; writtenKBytes < totalKBytes; writtenKBytes += partitionKBytes)
+            {
+                String pk = Long.toBinaryString(writtenKBytes);
+                for (long kbytes = 0L; kbytes < partitionKBytes; kbytes += rowKBytes)
+                {
+                    String ck = Long.toBinaryString(kbytes);
+                    execute("INSERT INTO %s (pk, ck, val) VALUES (?,?,?)", pk, 
ck, randomText(rowKBytes * 1024));
+                }
+            }
+        });
+
+        measured("flush for " + name, () -> flush(true));
+
+        CacheService.instance.keyCache.clear();
+
+        measured("compact for " + name, () -> {
+            keyCacheMetrics("before compaction");
+            compact();
+            keyCacheMetrics("after compaction");
+        });
+
+        measured("SELECTs 1 for " + name, () -> selects(partitionKBytes, 
totalKBytes));
+
+        measured("SELECTs 2 for " + name, () -> selects(partitionKBytes, 
totalKBytes));
+    }
+
+    private void selects(long partitionKBytes, long totalKBytes) throws Throwable
+    {
+        for (int i = 0; i < 50000; i++)
+        {
+            long pk = ThreadLocalRandom.current().nextLong(totalKBytes / partitionKBytes) * partitionKBytes;
+            long ck = ThreadLocalRandom.current().nextLong(partitionKBytes / rowKBytes) * rowKBytes;
+            execute("SELECT val FROM %s WHERE pk=? AND ck=?",
+                    Long.toBinaryString(pk),
+                    Long.toBinaryString(ck)).one();
+            if (i % 1000 == 0)
+                keyCacheMetrics("after " + i + " selects");
+        }
+        keyCacheMetrics("after all selects");
+    }
+
+    private static void keyCacheMetrics(String title)
+    {
+        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+        System.out.println("Key cache metrics " + title + ": capacity:" + 
metrics.capacity.getValue() +
+                           " size:"+metrics.size.getValue()+
+                           " entries:" + metrics.entries.getValue() +
+                           " hit-rate:"+metrics.hitRate.getValue() +
+                           " 
one-min-rate:"+metrics.oneMinuteHitRate.getValue());
+    }
+
+    @Test
+    public void prepare() throws Throwable
+    {
+        for (int i = 0; i < 4; i++)
+        {
+            withPartitionSize(8L, 32L);
+        }
+    }
+
+    @Test
+    public void test_01_16k() throws Throwable
+    {
+        withPartitionSize(16L, 1024L);
+    }
+
+    @Test
+    public void test_02_512k() throws Throwable
+    {
+        withPartitionSize(512L, 1024L);
+    }
+
+    @Test
+    public void test_03_1M() throws Throwable
+    {
+        withPartitionSize(1024L, 1024L);
+    }
+
+    @Test
+    public void test_04_4M() throws Throwable
+    {
+        withPartitionSize(4L * 1024L, 1024L);
+    }
+
+    @Test
+    public void test_05_8M() throws Throwable
+    {
+        withPartitionSize(8L * 1024L, 1024L);
+    }
+
+    @Test
+    public void test_06_16M() throws Throwable
+    {
+        withPartitionSize(16L * 1024L, 1024L);
+    }
+
+    @Test
+    public void test_07_32M() throws Throwable
+    {
+        withPartitionSize(32L * 1024L, 1024L);
+    }
+
+    @Test
+    public void test_08_64M() throws Throwable
+    {
+        withPartitionSize(64L * 1024L, 1024L);
+    }
+
+    @Test
+    public void test_09_256M() throws Throwable
+    {
+        withPartitionSize(256L * 1024L, 4 * 1024L);
+    }
+
+    @Test
+    public void test_10_512M() throws Throwable
+    {
+        withPartitionSize(512L * 1024L, 4 * 1024L);
+    }
+
+    @Test
+    public void test_11_1G() throws Throwable
+    {
+        withPartitionSize(1024L * 1024L, 8 * 1024L);
+    }
+
+    @Test
+    public void test_12_2G() throws Throwable
+    {
+        withPartitionSize(2L * 1024L * 1024L, 8 * 1024L);
+    }
+
+    @Test
+    public void test_13_4G() throws Throwable
+    {
+        withPartitionSize(4L * 1024L * 1024L, 16 * 1024L);
+    }
+
+    @Test
+    public void test_14_8G() throws Throwable
+    {
+        withPartitionSize(8L * 1024L * 1024L, 32 * 1024L);
+    }
+}
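
The measured() helper in LargePartitionsTest boils down to a functional interface plus
wall-clock timing, a pattern that is handy outside this test as well. A standalone
version, assuming nothing beyond the JDK:

    public final class Measure
    {
        @FunctionalInterface
        interface Measured
        {
            void measure() throws Throwable;
        }

        // Times a block of work and prints the elapsed wall-clock milliseconds.
        static void measured(String name, Measured work) throws Throwable
        {
            long t0 = System.currentTimeMillis();
            work.measure();
            System.out.println(name + " took " + (System.currentTimeMillis() - t0) + " ms");
        }

        public static void main(String[] args) throws Throwable
        {
            measured("busy loop", () -> {
                long sum = 0;
                for (int i = 0; i < 10_000_000; i++)
                    sum += i;
                System.out.println("sum = " + sum);
            });
        }
    }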
