Support large partitions on the 3.0 sstable format

patch by Robert Stupp; reviewed by T Jake Luciani for CASSANDRA-11206


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef5bbedd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef5bbedd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef5bbedd

Branch: refs/heads/trunk
Commit: ef5bbedd687d75923e9a20fde9d2f78b4535241d
Parents: ae063e8
Author: Robert Stupp <sn...@snazy.de>
Authored: Thu Apr 21 16:48:26 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Thu Apr 21 16:48:26 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 NEWS.txt                                        |    3 +
 conf/cassandra.yaml                             |    8 +
 .../apache/cassandra/cache/AutoSavingCache.java |    4 +-
 .../org/apache/cassandra/config/Config.java     |    1 +
 .../cassandra/config/DatabaseDescriptor.java    |   11 +
 .../org/apache/cassandra/db/Clustering.java     |    6 +
 .../cassandra/db/ClusteringComparator.java      |    2 +-
 .../apache/cassandra/db/ClusteringPrefix.java   |   29 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  296 +++---
 .../org/apache/cassandra/db/RangeTombstone.java |   10 +
 .../org/apache/cassandra/db/RowIndexEntry.java  | 1006 +++++++++++++++---
 .../cassandra/db/SerializationHeader.java       |   20 -
 .../org/apache/cassandra/db/Serializers.java    |   95 +-
 .../columniterator/AbstractSSTableIterator.java |  111 +-
 .../db/columniterator/SSTableIterator.java      |   15 +-
 .../columniterator/SSTableReversedIterator.java |   19 +-
 .../cassandra/db/compaction/Scrubber.java       |    4 +-
 .../cassandra/db/compaction/Verifier.java       |    4 +-
 .../UnfilteredRowIteratorWithLowerBound.java    |   29 +-
 .../cassandra/index/sasi/SASIIndexBuilder.java  |    2 +-
 .../org/apache/cassandra/io/ISerializer.java    |    5 +
 .../cassandra/io/sstable/IndexHelper.java       |  192 ----
 .../apache/cassandra/io/sstable/IndexInfo.java  |  178 ++++
 .../io/sstable/format/SSTableFormat.java        |    6 -
 .../io/sstable/format/SSTableReader.java        |   10 +-
 .../io/sstable/format/big/BigTableReader.java   |    8 +-
 .../io/sstable/format/big/BigTableScanner.java  |    6 +-
 .../io/sstable/format/big/BigTableWriter.java   |   32 +-
 .../apache/cassandra/service/CacheService.java  |   12 +-
 .../cassandra/cache/AutoSavingCacheTest.java    |   15 +
 .../apache/cassandra/cql3/KeyCacheCqlTest.java  |   76 +-
 .../apache/cassandra/cql3/PagingQueryTest.java  |  111 ++
 .../cql3/TombstonesWithIndexedSSTableTest.java  |   16 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  151 ++-
 .../org/apache/cassandra/db/KeyspaceTest.java   |    2 +-
 .../apache/cassandra/db/RowIndexEntryTest.java  |  768 +++++++++++--
 .../cassandra/io/sstable/IndexHelperTest.java   |   78 --
 .../io/sstable/LargePartitionsTest.java         |  219 ++++
 39 files changed, 2775 insertions(+), 786 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b94b2d..a0c7df6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Support large partitions on the 3.0 sstable format (CASSANDRA-11206)
  * JSON datetime formatting needs timezone (CASSANDRA-11137)
  * Add support to rebuild from specific range (CASSANDRA-10409)
  * Optimize the overlapping lookup by calculating all the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index e073592..a177d37 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,9 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - The key cache will only hold indexed entries in memory up to the size
+     configured by column_index_cache_size_in_kb in cassandra.yaml. Larger
+     indexed entries will never go into memory. See CASSANDRA-11206 for more details.
   - For tables having a default_time_to_live, specifying a TTL of 0 will remove the TTL
     from the inserted or updated values.
   - Startup is now aborted if corrupted transaction log files are found. The details

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index f9be453..582859c 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -659,6 +659,14 @@ auto_snapshot: true
 #      rows (as part of the key cache), so a larger granularity means
 #      you can cache more hot rows
 column_index_size_in_kb: 64
+# Per sstable indexed key cache entries (the collation index in memory
+# mentioned above) exceeding this size will not be held on heap.
+# This means that only partition information is held on heap and the
+# index entries are read from disk.
+#
+# Note that this size refers to the size of the
+# serialized index information and not the size of the partition.
+column_index_cache_size_in_kb: 2
 
 # Number of simultaneous compactions to allow, NOT including
 # validation "compactions" for anti-entropy repair.  Simultaneous
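
For a sense of what the new setting controls, here is a minimal, self-contained
sketch of the classification rule that RowIndexEntry.Serializer.deserialize()
applies further down in this commit. The constant mirrors the new default of 2
(kb); the class, method and enum names are invented for illustration only:

    // Sketch only - not patch code. Mirrors the rule: entries with at most one
    // index sample carry just the position; entries whose serialized samples fit
    // into column_index_cache_size_in_kb stay on heap; everything else is shallow.
    public class ColumnIndexCacheSketch
    {
        static final int COLUMN_INDEX_CACHE_SIZE = 2 * 1024; // column_index_cache_size_in_kb * 1024

        enum EntryKind { NOT_INDEXED, INDEXED_ON_HEAP, SHALLOW_ON_DISK }

        // indexedPartSize: serialized size of the IndexInfo samples plus their offsets
        static EntryKind classify(int indexSampleCount, int indexedPartSize)
        {
            if (indexSampleCount <= 1)
                return EntryKind.NOT_INDEXED;                 // plain RowIndexEntry: position only
            return indexedPartSize <= COLUMN_INDEX_CACHE_SIZE
                   ? EntryKind.INDEXED_ON_HEAP                // IndexedEntry: samples kept in the key cache
                   : EntryKind.SHALLOW_ON_DISK;               // ShallowIndexedEntry: samples re-read from the index file
        }

        public static void main(String[] args)
        {
            System.out.println(classify(1, 100));           // NOT_INDEXED
            System.out.println(classify(8, 1500));          // INDEXED_ON_HEAP
            System.out.println(classify(40_000, 5 << 20));  // SHALLOW_ON_DISK (huge partition)
        }
    }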

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index e39dcf1..2921818 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -77,8 +77,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
      * a minor version letter.
      *
      * Sticking with "d" is fine for 3.0 since it has never been released or used by another version
+     *
+     * "e" introduced with CASSANDRA-11206, omits IndexInfo from key-cache, stores offset into index-file
      */
-    private static final String CURRENT_VERSION = "d";
+    private static final String CURRENT_VERSION = "e";
 
     private static volatile IStreamFactory streamFactory = new IStreamFactory()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b5dea06..809966d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -162,6 +162,7 @@ public class Config
 
    /* if the size of columns or super-columns is more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
+    public Integer column_index_cache_size_in_kb = 2;
     public volatile int batch_size_warn_threshold_in_kb = 5;
     public volatile int batch_size_fail_threshold_in_kb = 50;
     public Integer unlogged_batch_across_partitions_warn_threshold = 10;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 37b9200..b98d103 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -962,6 +962,17 @@ public class DatabaseDescriptor
         conf.column_index_size_in_kb = val;
     }
 
+    public static int getColumnIndexCacheSize()
+    {
+        return conf.column_index_cache_size_in_kb * 1024;
+    }
+
+    @VisibleForTesting
+    public static void setColumnIndexCacheSize(int val)
+    {
+        conf.column_index_cache_size_in_kb = val;
+    }
+
     public static int getBatchSizeWarnThreshold()
     {
         return conf.batch_size_warn_threshold_in_kb * 1024;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index f5ffae4..fa38ce1 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -149,6 +149,12 @@ public interface Clustering extends ClusteringPrefix
             return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types);
         }
 
+        public void skip(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
+        {
+            if (!types.isEmpty())
+                ClusteringPrefix.serializer.skipValuesWithoutSize(in, types.size(), version, types);
+        }
+
         public Clustering deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
         {
             if (types.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index f5f6ae8..3030b5a 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.serializers.MarshalException;
 
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
+import org.apache.cassandra.io.sstable.IndexInfo;
 
 /**
  * A comparator of clustering prefixes (or more generally of {@link Clusterable}).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 4854517..df0befc 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -263,6 +263,17 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
             }
         }
 
+        public void skip(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
+        {
+            Kind kind = Kind.values()[in.readByte()];
+            // We shouldn't serialize static clusterings
+            assert kind != Kind.STATIC_CLUSTERING;
+            if (kind == Kind.CLUSTERING)
+                Clustering.serializer.skip(in, version, types);
+            else
+                RangeTombstone.Bound.serializer.skipValues(in, kind, version, types);
+        }
+
         public ClusteringPrefix deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
         {
             Kind kind = Kind.values()[in.readByte()];
@@ -350,6 +361,24 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
             return values;
         }
 
+        void skipValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
+        {
+            // Callers of this method should handle the case where size = 0 (in all cases we want to return a special value anyway).
+            assert size > 0;
+            int offset = 0;
+            while (offset < size)
+            {
+                long header = in.readUnsignedVInt();
+                int limit = Math.min(size, offset + 32);
+                while (offset < limit)
+                {
+                    if (!isNull(header, offset) && !isEmpty(header, offset))
+                        types.get(offset).skipValue(in);
+                    offset++;
+                }
+            }
+        }
+
        /**
         * Whatever the type of a given clustering column is, its value can always be either empty or null. So we at least need to distinguish those
         * 2 values, and because we want to be able to store fixed width values without appending their (fixed) size first, we need a way to encode
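
The skip methods above lean on the null/empty header encoding documented in the
comment at the end of this hunk. As an aside, a self-contained sketch of that
convention: the 32-values-per-header loop in skipValuesWithoutSize() implies two
flag bits per clustering value in each vint-encoded header word; the exact bit
positions chosen below are an assumption for illustration (the authoritative
encoding lives in ClusteringPrefix.Serializer), as is the int[] stand-in for
AbstractType.skipValue():

    import java.io.DataInput;
    import java.io.IOException;

    final class ClusteringHeaderSketch
    {
        private ClusteringHeaderSketch() {}

        // assumed layout: bit 2*i = "value i is empty", bit 2*i+1 = "value i is null"
        static boolean isEmpty(long header, int i) { return (header & (1L << ((i % 32) * 2))) != 0; }
        static boolean isNull(long header, int i)  { return (header & (1L << ((i % 32) * 2 + 1))) != 0; }

        // skip one block of up to 32 clustering values; only values that are
        // neither null nor empty occupy bytes in the stream
        static void skipBlock(DataInput in, long header, int[] valueLengths) throws IOException
        {
            for (int i = 0; i < Math.min(valueLengths.length, 32); i++)
                if (!isNull(header, i) && !isEmpty(header, i))
                    in.skipBytes(valueLengths[i]);
        }
    }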

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 1942052..4dcceff 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -15,184 +15,234 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+/**
+ * Column index builder used by {@link org.apache.cassandra.io.sstable.format.big.BigTableWriter}.
+ * For index entries that exceed {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb},
+ * this uses the serialization logic as in {@link RowIndexEntry}.
+ */
 public class ColumnIndex
 {
-    public final long partitionHeaderLength;
-    public final List<IndexHelper.IndexInfo> columnsIndex;
 
-    private static final ColumnIndex EMPTY = new ColumnIndex(-1, Collections.<IndexHelper.IndexInfo>emptyList());
+    // used if the row-index-entry reaches config column_index_cache_size_in_kb
+    private DataOutputBuffer buffer;
+    // used to track the serialized size of the row-index-entry (unused for buffer)
+    private int indexSamplesSerializedSize;
+    // used until the row-index-entry reaches config column_index_cache_size_in_kb
+    public List<IndexInfo> indexSamples = new ArrayList<>();
 
-    private ColumnIndex(long partitionHeaderLength, List<IndexHelper.IndexInfo> columnsIndex)
-    {
-        assert columnsIndex != null;
+    public int columnIndexCount;
+    private int[] indexOffsets;
 
-        this.partitionHeaderLength = partitionHeaderLength;
-        this.columnsIndex = columnsIndex;
-    }
+    private final SerializationHeader header;
+    private final int version;
+    private final SequentialWriter writer;
+    private final long initialPosition;
+    private final ISerializer<IndexInfo> idxSerializer;
+    public long headerLength = -1;
 
-    public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator,
-                                                 SequentialWriter output,
-                                                 SerializationHeader header,
-                                                 Collection<SSTableFlushObserver> observers,
-                                                 Version version) throws IOException
-    {
-        assert !iterator.isEmpty() && version.storeRows();
+    private long startPosition = -1;
 
-        Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion());
-        return builder.build();
-    }
+    private int written;
+    private long previousRowStart;
+
+    private ClusteringPrefix firstClustering;
+    private ClusteringPrefix lastClustering;
 
-    @VisibleForTesting
-    public static ColumnIndex nothing()
+    private DeletionTime openMarker;
+
+    private final Collection<SSTableFlushObserver> observers;
+
+    public ColumnIndex(SerializationHeader header,
+                       SequentialWriter writer,
+                       Version version,
+                       Collection<SSTableFlushObserver> observers,
+                       ISerializer<IndexInfo> indexInfoSerializer)
     {
-        return EMPTY;
+        this.header = header;
+        this.idxSerializer = indexInfoSerializer;
+        this.writer = writer;
+        this.version = version.correspondingMessagingVersion();
+        this.observers = observers;
+        this.initialPosition = writer.position();
     }
 
-    /**
-     * Help to create an index for a column family based on size of columns,
-     * and write said columns to disk.
-     */
-    private static class Builder
+    public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException
     {
-        private final UnfilteredRowIterator iterator;
-        private final SequentialWriter writer;
-        private final SerializationHeader header;
-        private final int version;
+        writePartitionHeader(iterator);
+        this.headerLength = writer.position() - initialPosition;
 
-        private final List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>();
-        private final long initialPosition;
-        private long headerLength = -1;
+        while (iterator.hasNext())
+            add(iterator.next());
 
-        private long startPosition = -1;
+        close();
+    }
 
-        private int written;
-        private long previousRowStart;
+    private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
+    {
+        ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
+        DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
+        if (header.hasStatic())
+        {
+            Row staticRow = iterator.staticRow();
 
-        private ClusteringPrefix firstClustering;
-        private ClusteringPrefix lastClustering;
+            UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
+            if (!observers.isEmpty())
+                observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
+        }
+    }
 
-        private DeletionTime openMarker;
+    private long currentPosition()
+    {
+        return writer.position() - initialPosition;
+    }
 
-        private final Collection<SSTableFlushObserver> observers;
+    public ByteBuffer buffer()
+    {
+        return buffer != null ? buffer.buffer() : null;
+    }
 
-        public Builder(UnfilteredRowIterator iterator,
-                       SequentialWriter writer,
-                       SerializationHeader header,
-                       Collection<SSTableFlushObserver> observers,
-                       int version)
+    public int[] offsets()
+    {
+        return indexOffsets != null
+               ? Arrays.copyOf(indexOffsets, columnIndexCount)
+               : null;
+    }
+
+    private void addIndexBlock() throws IOException
+    {
+        IndexInfo cIndexInfo = new IndexInfo(firstClustering,
+                                             lastClustering,
+                                             startPosition,
+                                             currentPosition() - startPosition,
+                                             openMarker);
+
+        // indexOffsets is used for both shallow (ShallowIndexedEntry) and non-shallow IndexedEntry.
+        // For shallow ones, we need it to serialize the offsets in close().
+        // For non-shallow ones, the offsets are passed into IndexedEntry, so we don't have to
+        // calculate the offsets again.
+
+        // indexOffsets contains the offsets of the serialized IndexInfo objects.
+        // I.e. indexOffsets[0] is always 0, so we don't have to deal with special handling
+        // for index #0 and always subtracting 1 from the index (which could be error-prone).
+        if (indexOffsets == null)
+            indexOffsets = new int[10];
+        else
         {
-            this.iterator = iterator;
-            this.writer = writer;
-            this.header = header;
-            this.version = version;
-            this.observers = observers == null ? Collections.emptyList() : observers;
-            this.initialPosition = writer.position();
+            if (columnIndexCount >= indexOffsets.length)
+                indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10);
+            indexOffsets[columnIndexCount] =
+                buffer != null
+                    ? Ints.checkedCast(buffer.position())
+                    : indexSamplesSerializedSize;
         }
+        columnIndexCount++;
 
-        private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
+        // First, we collect the IndexInfo objects until we reach Config.column_index_cache_size_in_kb in an ArrayList.
+        // When column_index_cache_size_in_kb is reached, we switch to byte-buffer mode.
+        if (buffer == null)
         {
-            ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
-            DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
-            if (header.hasStatic())
+            indexSamplesSerializedSize += idxSerializer.serializedSize(cIndexInfo);
+            if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) > DatabaseDescriptor.getColumnIndexCacheSize())
             {
-                Row staticRow = iterator.staticRow();
-
-                UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
-                if (!observers.isEmpty())
-                    observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
+                buffer = new DataOutputBuffer(DatabaseDescriptor.getColumnIndexCacheSize() * 2);
+                for (IndexInfo indexSample : indexSamples)
+                {
+                    idxSerializer.serialize(indexSample, buffer);
+                }
+                indexSamples = null;
+            }
+            else
+            {
+                indexSamples.add(cIndexInfo);
             }
         }
-
-        public ColumnIndex build() throws IOException
+        // don't put an else here...
+        if (buffer != null)
         {
-            writePartitionHeader(iterator);
-            this.headerLength = writer.position() - initialPosition;
-
-            while (iterator.hasNext())
-                add(iterator.next());
-
-            return close();
+            idxSerializer.serialize(cIndexInfo, buffer);
         }
 
-        private long currentPosition()
-        {
-            return writer.position() - initialPosition;
-        }
+        firstClustering = null;
+    }
+
+    private void add(Unfiltered unfiltered) throws IOException
+    {
+        long pos = currentPosition();
 
-        private void addIndexBlock()
+        if (firstClustering == null)
         {
-            IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstClustering,
-                                                                         lastClustering,
-                                                                         startPosition,
-                                                                         currentPosition() - startPosition,
-                                                                         openMarker);
-            columnsIndex.add(cIndexInfo);
-            firstClustering = null;
+            // Beginning of an index block. Remember the start and position
+            firstClustering = unfiltered.clustering();
+            startPosition = pos;
         }
 
-        private void add(Unfiltered unfiltered) throws IOException
-        {
-            long pos = currentPosition();
+        UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
 
-            if (firstClustering == null)
-            {
-                // Beginning of an index block. Remember the start and position
-                firstClustering = unfiltered.clustering();
-                startPosition = pos;
-            }
+        // notify observers about each new row
+        if (!observers.isEmpty())
+            observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
 
-            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
+        lastClustering = unfiltered.clustering();
+        previousRowStart = pos;
+        ++written;
 
-            // notify observers about each new row
-            if (!observers.isEmpty())
-                observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
-
-            lastClustering = unfiltered.clustering();
-            previousRowStart = pos;
-            ++written;
+        if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+        {
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+            openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
+        }
 
-            if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
-            {
-                RangeTombstoneMarker marker = (RangeTombstoneMarker)unfiltered;
-                openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
-            }
+        // if we hit the column index size that we have to index after, go ahead and index it.
+        if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
+            addIndexBlock();
+    }
 
-            // if we hit the column index size that we have to index after, go ahead and index it.
-            if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
-                addIndexBlock();
+    private void close() throws IOException
+    {
+        UnfilteredSerializer.serializer.writeEndOfPartition(writer);
 
-        }
+        // It's possible we add no rows, just a top level deletion
+        if (written == 0)
+            return;
 
-        private ColumnIndex close() throws IOException
-        {
-            UnfilteredSerializer.serializer.writeEndOfPartition(writer);
+        // the last column may have fallen on an index boundary already.  if not, index it explicitly.
+        if (firstClustering != null)
+            addIndexBlock();
 
-            // It's possible we add no rows, just a top level deletion
-            if (written == 0)
-                return ColumnIndex.EMPTY;
+        // If we serialized the IndexInfo objects directly into 'buffer' in the code above,
+        // we have to write the offsets to them here. The offsets have already been collected
+        // in indexOffsets[]. buffer is != null if it exceeds Config.column_index_cache_size_in_kb.
+        // In the other case, when buffer == null, the offsets are serialized in RowIndexEntry.IndexedEntry.serialize().
+        if (buffer != null)
+            RowIndexEntry.Serializer.serializeOffsets(buffer, indexOffsets, columnIndexCount);
 
-            // the last column may have fallen on an index boundary already.  if not, index it explicitly.
-            if (firstClustering != null)
-                addIndexBlock();
+        // we should always have at least one computed index block, but we only write it out if there is more than that.
+        assert columnIndexCount > 0 && headerLength >= 0;
+    }
 
-            // we should always have at least one computed index block, but we only write it out if there is more than that.
-            assert columnsIndex.size() > 0 && headerLength >= 0;
-            return new ColumnIndex(headerLength, columnsIndex);
-        }
+    public int indexInfoSerializedSize()
+    {
+        return buffer != null
+               ? buffer.buffer().limit()
+               : indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0);
     }
 }
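
Summing up the rewritten builder: it runs in two modes. IndexInfo samples
accumulate in indexSamples until their serialized size would cross
column_index_cache_size_in_kb; from then on everything collected so far is
flushed into 'buffer' and all further samples are serialized directly. A hedged
usage sketch - the caller below is invented, while buildRowIndex() and buffer()
are the methods added above (the real caller in this commit is
BigTableWriter.append()):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.ColumnIndex;
    import org.apache.cassandra.db.rows.UnfilteredRowIterator;

    final class ColumnIndexModeSketch
    {
        // Returns true when the partition's index samples outgrew the cache
        // limit, i.e. the resulting RowIndexEntry must be a "shallow" one.
        static boolean writePartition(ColumnIndex columnIndex, UnfilteredRowIterator partition) throws IOException
        {
            columnIndex.buildRowIndex(partition); // writes rows, collects/serializes IndexInfo samples
            ByteBuffer serializedSamples = columnIndex.buffer();
            return serializedSamples != null;     // non-null only in byte-buffer mode
        }
    }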

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 8af3b97..29feb46 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -202,6 +202,16 @@ public class RangeTombstone
                 return deserializeValues(in, kind, version, types);
             }
 
+            public void skipValues(DataInputPlus in, Kind kind, int version,
+                                   List<AbstractType<?>> types) throws IOException
+            {
+                int size = in.readUnsignedShort();
+                if (size == 0)
+                    return;
+
+                ClusteringPrefix.serializer.skipValuesWithoutSize(in, size, version, types);
+            }
+
             public RangeTombstone.Bound deserializeValues(DataInputPlus in, Kind kind, int version,
                     List<AbstractType<?>> types) throws IOException
             {
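
The RowIndexEntry.java rewrite below is the heart of this patch. One detail
worth previewing: since format version "ma" (3.0), an offsets table trails the
serialized IndexInfo objects, so a shallow entry can seek straight to the i-th
index sample instead of deserializing all of its predecessors. A minimal
sketch, with RandomAccessFile standing in - as an assumption - for the patch's
FileDataInput/SegmentedFile machinery:

    import java.io.IOException;
    import java.io.RandomAccessFile;

    final class ShallowLookupSketch
    {
        // firstIndexInfoPosition: index-file position of IndexInfo #0
        // offsets[i]: byte offset of IndexInfo #i relative to IndexInfo #0 (offsets[0] == 0)
        static void seekToSample(RandomAccessFile indexFile, long firstIndexInfoPosition,
                                 int[] offsets, int i) throws IOException
        {
            indexFile.seek(firstIndexInfoPosition + offsets[i]);
            // a single IndexInfo can now be deserialized from this position
        }
    }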

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 4e2f063..7fda245 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -18,50 +18,140 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
-import com.google.common.primitives.Ints;
-
+import com.codahale.metrics.Histogram;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.metrics.DefaultNameFactory;
+import org.apache.cassandra.metrics.MetricNameFactory;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.vint.VIntCoding;
+import org.github.jamm.Unmetered;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
+/**
+ * Binary format of {@code RowIndexEntry} is defined as follows:
+ * {@code
+ * (long) position (64 bit long, vint encoded)
+ *  (int) serialized size of data that follows (32 bit int, vint encoded)
+ * -- following for indexed entries only (so serialized size > 0)
+ *  (int) DeletionTime.localDeletionTime
+ * (long) DeletionTime.markedForDeletionAt
+ *  (int) number of IndexInfo objects (32 bit int, vint encoded)
+ *    (*) serialized IndexInfo objects, see below
+ *    (*) offsets of serialized IndexInfo objects, since version "ma" (3.0)
+ *        Each IndexInfo object's offset is relative to the first IndexInfo object.
+ * }
+ * <p>
+ * See {@link IndexInfo} for a description of the serialized format.
+ * </p>
+ *
+ * <p>
+ * For each partition, the layout of the index file looks like this:
+ * </p>
+ * <ol>
+ *     <li>partition key - prefixed with {@code short} length</li>
+ *     <li>serialized {@code RowIndexEntry} objects</li>
+ * </ol>
+ *
+ * <p>
+ *     Generally, we distinguish between index entries that have <i>index
+ *     samples</i> (list of {@link IndexInfo} objects) and those that don't.
+ *     For each <i>portion</i> of data for a single partition in the data file,
+ *     an index sample is created. The size of that <i>portion</i> is defined
+ *     by {@link org.apache.cassandra.config.Config#column_index_size_in_kb}.
+ * </p>
+ * <p>
+ *     Index entries with fewer than 2 index samples will just store the
+ *     position in the data file.
+ * </p>
+ * <p>
+ *     Note: legacy sstables, for the purpose of index entries, are those sstable
+ *     formats that do <i>not</i> have an offsets table to index samples
+ *     ({@link IndexInfo} objects). These are sstables created by Cassandra
+ *     versions earlier than 3.0.
+ * </p>
+ * <p>
+ *     For index entries with index samples we store the index samples
+ *     ({@link IndexInfo} objects). The bigger the partition, the more
+ *     index samples are created. Since a huge amount of index samples
+ *     will "pollute" the heap and cause huge GC pressure, Cassandra 3.6
+ *     (CASSANDRA-11206) distinguishes between index entries with an
+ *     "acceptable" amount of index samples per partition and those
+ *     with an "enormous" amount of index samples. The barrier
+ *     is controlled by the configuration parameter
+ *     {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb}.
+ *     Index entries with a total serialized size of index samples up to
+ *     {@code column_index_cache_size_in_kb} will be held in an array.
+ *     Index entries exceeding that value will always be accessed from
+ *     disk.
+ * </p>
+ * <p>
+ *     This results in these classes:
+ * </p>
+ * <ul>
+ *     <li>{@link RowIndexEntry} just stores the offset in the data file.</li>
+ *     <li>{@link IndexedEntry} is for index entries with index samples
+ *     and used for both current and legacy sstables, which do not exceed
+ *     {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb}.</li>
+ *     <li>{@link ShallowIndexedEntry} is for index entries with index samples
+ *     that exceed {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb}
+ *     for sstables with an offset table to the index samples.</li>
+ *     <li>{@link LegacyShallowIndexedEntry} is for index entries with index samples
+ *     that exceed {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb}
+ *     but for legacy sstables.</li>
+ * </ul>
+ * <p>
+ *     Since access to index samples on disk (obviously) requires some file
+ *     reader, that functionality is encapsulated in implementations of
+ *     {@link IndexInfoRetriever}. There is one implementation to access
+ *     index samples of legacy sstables (without the offsets table)
+ *     and one to access sstables with an offsets table.
+ * </p>
+ * <p>
+ *     As of now (Cassandra 3.x), we still support reading from <i>legacy</i> sstables -
+ *     i.e. sstables created by Cassandra &lt; 3.0 (see {@link org.apache.cassandra.io.sstable.format.big.BigFormat}).
+ * </p>
+ *
+ */
 public class RowIndexEntry<T> implements IMeasurableMemory
 {
     private static final long EMPTY_SIZE = ObjectSizes.measure(new RowIndexEntry(0));
 
-    public final long position;
+    // constants for type of row-index-entry as serialized for saved-cache
+    static final int CACHE_NOT_INDEXED = 0;
+    static final int CACHE_INDEXED = 1;
+    static final int CACHE_INDEXED_SHALLOW = 2;
 
-    public RowIndexEntry(long position)
-    {
-        this.position = position;
+    static final Histogram indexEntrySizeHistogram;
+    static final Histogram indexInfoCountHistogram;
+    static final Histogram indexInfoGetsHistogram;
+    static {
+        MetricNameFactory factory = new DefaultNameFactory("Index", "RowIndexEntry");
+        indexEntrySizeHistogram = Metrics.histogram(factory.createMetricName("IndexedEntrySize"), false);
+        indexInfoCountHistogram = Metrics.histogram(factory.createMetricName("IndexInfoCount"), false);
+        indexInfoGetsHistogram = Metrics.histogram(factory.createMetricName("IndexInfoGets"), false);
     }
 
-    protected int promotedSize(IndexHelper.IndexInfo.Serializer idxSerializer)
-    {
-        return 0;
-    }
+    public final long position;
 
-    public static RowIndexEntry<IndexHelper.IndexInfo> create(long position, DeletionTime deletionTime, ColumnIndex index)
+    public RowIndexEntry(long position)
     {
-        assert index != null;
-        assert deletionTime != null;
-
-        // we only consider the columns summary when determining whether to create an IndexedEntry,
-        // since if there are insufficient columns to be worth indexing we're going to seek to
-        // the beginning of the row anyway, so we might as well read the tombstone there as well.
-        if (index.columnsIndex.size() > 1)
-            return new IndexedEntry(position, deletionTime, index.partitionHeaderLength, index.columnsIndex);
-        else
-            return new RowIndexEntry<>(position);
+        this.position = position;
     }
 
     /**
@@ -70,21 +160,17 @@ public class RowIndexEntry<T> implements IMeasurableMemory
      */
     public boolean isIndexed()
     {
-        return !columnsIndex().isEmpty();
+        return columnsIndexCount() > 1;
     }
 
-    public DeletionTime deletionTime()
+    public boolean indexOnHeap()
     {
-        throw new UnsupportedOperationException();
+        return false;
     }
 
-    /**
-     * @return the offset to the start of the header information for this row.
-     * For some formats this may not be the start of the row.
-     */
-    public long headerOffset()
+    public DeletionTime deletionTime()
     {
-        return 0;
+        throw new UnsupportedOperationException();
     }
 
     /**
@@ -97,9 +183,9 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         throw new UnsupportedOperationException();
     }
 
-    public List<T> columnsIndex()
+    public int columnsIndexCount()
     {
-        return Collections.emptyList();
+        return 0;
     }
 
     public long unsharedHeapSize()
@@ -107,129 +193,178 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         return EMPTY_SIZE;
     }
 
+    /**
+     * @param dataFilePosition  position of the partition in the {@link org.apache.cassandra.io.sstable.Component.Type#DATA} file
+     * @param indexFilePosition position in the {@link org.apache.cassandra.io.sstable.Component.Type#PRIMARY_INDEX} of the {@link RowIndexEntry}
+     * @param deletionTime      deletion time of {@link RowIndexEntry}
+     * @param headerLength      partition header length of {@link RowIndexEntry}
+     * @param columnIndexCount  number of {@link IndexInfo} entries in the {@link RowIndexEntry}
+     * @param indexedPartSize   serialized size of all serialized {@link IndexInfo} objects and their offsets
+     * @param indexSamples      list of {@link IndexInfo} objects (present if the total serialized size is less than {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb})
+     * @param offsets           offsets of the serialized IndexInfo objects
+     * @param idxInfoSerializer the {@link IndexInfo} serializer
+     */
+    public static RowIndexEntry<IndexInfo> create(long dataFilePosition, long indexFilePosition,
+                                       DeletionTime deletionTime, long headerLength, int columnIndexCount,
+                                       int indexedPartSize,
+                                       List<IndexInfo> indexSamples, int[] offsets,
+                                       ISerializer<IndexInfo> idxInfoSerializer)
+    {
+        // If the "partition building code" in BigTableWriter.append() via ColumnIndex returns a list
+        // of IndexInfo objects, which is the case if the serialized size is less than
+        // Config.column_index_cache_size_in_kb, AND we have more than one IndexInfo object, we
+        // construct an IndexedEntry object. (note: indexSamples.size() and columnIndexCount have the same meaning)
+        if (indexSamples != null && indexSamples.size() > 1)
+            return new IndexedEntry(dataFilePosition, deletionTime, headerLength,
+                                    indexSamples.toArray(new IndexInfo[indexSamples.size()]), offsets,
+                                    indexedPartSize, idxInfoSerializer);
+        // Here we have to decide whether we have serialized IndexInfo objects that exceed
+        // Config.column_index_cache_size_in_kb (the not-exceeding case is covered above).
+        // Such a "big" indexed-entry is represented as a shallow one.
+        if (columnIndexCount > 1)
+            return new ShallowIndexedEntry(dataFilePosition, indexFilePosition,
+                                           deletionTime, headerLength, columnIndexCount,
+                                           indexedPartSize, idxInfoSerializer);
+        // Last case is that there are no index samples.
+        return new RowIndexEntry<>(dataFilePosition);
+    }
+
+    public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+    {
+        return null;
+    }
+
     public interface IndexSerializer<T>
     {
-        void serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException;
-        RowIndexEntry<T> deserialize(DataInputPlus in) throws IOException;
-        int serializedSize(RowIndexEntry<T> rie);
+        void serialize(RowIndexEntry<T> rie, DataOutputPlus out, ByteBuffer indexInfo) throws IOException;
+        RowIndexEntry<T> deserialize(DataInputPlus in, long indexFilePosition) throws IOException;
+        void serializeForCache(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException;
+        RowIndexEntry<T> deserializeForCache(DataInputPlus in) throws IOException;
+
+        long deserializePositionAndSkip(DataInputPlus in) throws IOException;
+
+        ISerializer<T> indexInfoSerializer();
     }
 
-    public static class Serializer implements IndexSerializer<IndexHelper.IndexInfo>
+    public static final class Serializer implements IndexSerializer<IndexInfo>
     {
-        private final IndexHelper.IndexInfo.Serializer idxSerializer;
+        private final IndexInfo.Serializer idxInfoSerializer;
         private final Version version;
 
         public Serializer(CFMetaData metadata, Version version, SerializationHeader header)
         {
-            this.idxSerializer = new IndexHelper.IndexInfo.Serializer(metadata, version, header);
+            this.idxInfoSerializer = metadata.serializers().indexInfoSerializer(version, header);
             this.version = version;
         }
 
-        public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, DataOutputPlus out) throws IOException
+        public IndexInfo.Serializer indexInfoSerializer()
+        {
+            return idxInfoSerializer;
+        }
+
+        public void serialize(RowIndexEntry<IndexInfo> rie, DataOutputPlus out, ByteBuffer indexInfo) throws IOException
         {
             assert version.storeRows() : "We read old index files but we should never write them";
 
-            out.writeUnsignedVInt(rie.position);
-            out.writeUnsignedVInt(rie.promotedSize(idxSerializer));
+            rie.serialize(out, idxInfoSerializer, indexInfo);
+        }
 
-            if (rie.isIndexed())
-            {
-                out.writeUnsignedVInt(rie.headerLength());
-                DeletionTime.serializer.serialize(rie.deletionTime(), out);
-                out.writeUnsignedVInt(rie.columnsIndex().size());
+        public void serializeForCache(RowIndexEntry<IndexInfo> rie, DataOutputPlus out) throws IOException
+        {
+            assert version.storeRows();
 
-                // Calculate and write the offsets to the IndexInfo objects.
+            rie.serializeForCache(out);
+        }
 
-                int[] offsets = new int[rie.columnsIndex().size()];
+        public RowIndexEntry<IndexInfo> deserializeForCache(DataInputPlus in) throws IOException
+        {
+            assert version.storeRows();
 
-                if (out.hasPosition())
-                {
-                    // Out is usually a SequentialWriter, so using the file-pointer is fine to generate the offsets.
-                    // A DataOutputBuffer also works.
-                    long start = out.position();
-                    int i = 0;
-                    for (IndexHelper.IndexInfo info : rie.columnsIndex())
-                    {
-                        offsets[i] = i == 0 ? 0 : (int)(out.position() - start);
-                        i++;
-                        idxSerializer.serialize(info, out);
-                    }
-                }
-                else
-                {
-                    // Not sure this branch will ever be needed, but if it is called, it has to calculate the
-                    // serialized sizes instead of simply using the file-pointer.
-                    int i = 0;
-                    int offset = 0;
-                    for (IndexHelper.IndexInfo info : rie.columnsIndex())
-                    {
-                        offsets[i++] = offset;
-                        idxSerializer.serialize(info, out);
-                        offset += idxSerializer.serializedSize(info);
-                    }
-                }
+            long position = in.readUnsignedVInt();
 
-                for (int off : offsets)
-                    out.writeInt(off);
+            switch (in.readByte())
+            {
+                case CACHE_NOT_INDEXED:
+                    return new RowIndexEntry<>(position);
+                case CACHE_INDEXED:
+                    return new IndexedEntry(position, in, idxInfoSerializer, version, true);
+                case CACHE_INDEXED_SHALLOW:
+                    return new ShallowIndexedEntry(position, in, idxInfoSerializer);
+                default:
+                    throw new AssertionError();
             }
         }
 
-        public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInputPlus in) throws IOException
+        public static void skipForCache(DataInputPlus in, Version version) throws IOException
         {
-            if (!version.storeRows())
-            {
-                long position = in.readLong();
-
-                int size = in.readInt();
-                if (size > 0)
-                {
-                    DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
-
-                    int entries = in.readInt();
-                    List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries);
+            assert version.storeRows();
 
-                    long headerLength = 0L;
-                    for (int i = 0; i < entries; i++)
-                    {
-                        IndexHelper.IndexInfo info = idxSerializer.deserialize(in);
-                        columnsIndex.add(info);
-                        if (i == 0)
-                            headerLength = info.offset;
-                    }
-
-                    return new IndexedEntry(position, deletionTime, headerLength, columnsIndex);
-                }
-                else
-                {
-                    return new RowIndexEntry<>(position);
-                }
+            /* long position = */in.readUnsignedVInt();
+            switch (in.readByte())
+            {
+                case CACHE_NOT_INDEXED:
+                    break;
+                case CACHE_INDEXED:
+                    IndexedEntry.skipForCache(in);
+                    break;
+                case CACHE_INDEXED_SHALLOW:
+                    ShallowIndexedEntry.skipForCache(in);
+                    break;
+                default:
+                    assert false;
             }
+        }
+
+        public RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition) throws IOException
+        {
+            if (!version.storeRows())
+                return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer, version);
 
             long position = in.readUnsignedVInt();
 
             int size = (int)in.readUnsignedVInt();
-            if (size > 0)
+            if (size == 0)
+            {
+                return new RowIndexEntry<>(position);
+            }
+            else
             {
                 long headerLength = in.readUnsignedVInt();
                 DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
-                int entries = (int)in.readUnsignedVInt();
-                List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries);
-                for (int i = 0; i < entries; i++)
-                    columnsIndex.add(idxSerializer.deserialize(in));
+                int columnsIndexCount = (int) in.readUnsignedVInt();
 
-                in.skipBytesFully(entries * TypeSizes.sizeof(0));
+                int indexedPartSize = size - serializedSize(deletionTime, headerLength, columnsIndexCount);
 
-                return new IndexedEntry(position, deletionTime, headerLength, columnsIndex);
-            }
-            else
-            {
-                return new RowIndexEntry<>(position);
+                if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
+                {
+                    return new IndexedEntry(position, in, deletionTime, headerLength, columnsIndexCount,
+                                            idxInfoSerializer, version, indexedPartSize);
+                }
+                else
+                {
+                    in.skipBytes(indexedPartSize);
+
+                    return new ShallowIndexedEntry(position,
+                                                   indexFilePosition,
+                                                   deletionTime, headerLength, columnsIndexCount,
+                                                   indexedPartSize, idxInfoSerializer);
+                }
             }
         }
 
-        // Reads only the data 'position' of the index entry and returns it. Note that this left 'in' in the middle
-        // of reading an entry, so this is only useful if you know what you are doing and in most case 'deserialize'
-        // should be used instead.
+        public long deserializePositionAndSkip(DataInputPlus in) throws IOException
+        {
+            if (!version.storeRows())
+                return LegacyShallowIndexedEntry.deserializePositionAndSkip(in);
+
+            return ShallowIndexedEntry.deserializePositionAndSkip(in);
+        }
+
+        /**
+         * Reads only the data 'position' of the index entry and returns it. Note that this leaves 'in' in the middle
+         * of reading an entry, so this is only useful if you know what you are doing and in most cases 'deserialize'
+         * should be used instead.
+         */
        public static long readPosition(DataInputPlus in, Version version) throws IOException
         {
             return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
@@ -250,51 +385,295 @@ public class RowIndexEntry<T> implements IMeasurableMemory
             in.skipBytesFully(size);
         }
 
-        public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie)
+        public static void serializeOffsets(DataOutputBuffer out, int[] indexOffsets, int columnIndexCount) throws IOException
         {
-            assert version.storeRows() : "We read old index files but we should never write them";
+            for (int i = 0; i < columnIndexCount; i++)
+                out.writeInt(indexOffsets[i]);
+        }
+    }
+
+    private static int serializedSize(DeletionTime deletionTime, long headerLength, int columnIndexCount)
+    {
+        return TypeSizes.sizeofUnsignedVInt(headerLength)
+               + (int) DeletionTime.serializer.serializedSize(deletionTime)
+               + TypeSizes.sizeofUnsignedVInt(columnIndexCount);
+    }
+
+    public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) throws IOException
+    {
+        out.writeUnsignedVInt(position);
+
+        out.writeUnsignedVInt(0);
+    }
+
+    public void serializeForCache(DataOutputPlus out) throws IOException
+    {
+        out.writeUnsignedVInt(position);
+
+        out.writeByte(CACHE_NOT_INDEXED);
+    }
+
+    private static final class LegacyShallowIndexedEntry extends RowIndexEntry<IndexInfo>
+    {
+        private static final long BASE_SIZE;
+        static
+        {
+            BASE_SIZE = ObjectSizes.measure(new LegacyShallowIndexedEntry(0, 0, DeletionTime.LIVE, 0, new int[0], null, 0));
+        }
+
+        private final long indexFilePosition;
+        private final int[] offsets;
+        @Unmetered
+        private final IndexInfo.Serializer idxInfoSerializer;
+        private final DeletionTime deletionTime;
+        private final long headerLength;
+        private final int serializedSize;
+
+        private LegacyShallowIndexedEntry(long dataFilePosition, long indexFilePosition,
+                                          DeletionTime deletionTime, long headerLength,
+                                          int[] offsets, IndexInfo.Serializer idxInfoSerializer,
+                                          int serializedSize)
+        {
+            super(dataFilePosition);
+            this.deletionTime = deletionTime;
+            this.headerLength = headerLength;
+            this.indexFilePosition = indexFilePosition;
+            this.offsets = offsets;
+            this.idxInfoSerializer = idxInfoSerializer;
+            this.serializedSize = serializedSize;
+        }
+
+        @Override
+        public DeletionTime deletionTime()
+        {
+            return deletionTime;
+        }
+
+        @Override
+        public long headerLength()
+        {
+            return headerLength;
+        }
+
+        @Override
+        public long unsharedHeapSize()
+        {
+            return BASE_SIZE + offsets.length * TypeSizes.sizeof(0);
+        }
+
+        @Override
+        public int columnsIndexCount()
+        {
+            return offsets.length;
+        }
+
+        @Override
+        public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo)
+        {
+            throw new UnsupportedOperationException("serializing legacy index entries is not supported");
+        }
+
+        @Override
+        public void serializeForCache(DataOutputPlus out)
+        {
+            throw new UnsupportedOperationException("serializing legacy index entries is not supported");
+        }
 
-            int indexedSize = 0;
-            if (rie.isIndexed())
+        @Override
+        public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+        {
+            int fieldsSize = (int) DeletionTime.serializer.serializedSize(deletionTime)
+                             + TypeSizes.sizeof(0); // columnIndexCount
+            indexEntrySizeHistogram.update(serializedSize);
+            indexInfoCountHistogram.update(offsets.length);
+            return new LegacyIndexInfoRetriever(indexFilePosition +
+                                                TypeSizes.sizeof(0L) + // position
+                                                TypeSizes.sizeof(0) + // indexInfoSize
+                                                fieldsSize,
+                                                offsets, indexFile.createReader(), idxInfoSerializer);
+        }
+
+        public static RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition,
+                                                IndexInfo.Serializer idxInfoSerializer, Version version) throws IOException
+        {
+            long dataFilePosition = in.readLong();
+
+            int size = in.readInt();
+            if (size == 0)
             {
-                List<IndexHelper.IndexInfo> index = rie.columnsIndex();
+                return new RowIndexEntry<>(dataFilePosition);
+            }
+            else if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
+            {
+                return new IndexedEntry(dataFilePosition, in, idxInfoSerializer, version, false);
+            }
+            else
+            {
+                DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
 
-                indexedSize += TypeSizes.sizeofUnsignedVInt(rie.headerLength());
-                indexedSize += DeletionTime.serializer.serializedSize(rie.deletionTime());
-                indexedSize += TypeSizes.sizeofUnsignedVInt(index.size());
+                // For legacy sstables (i.e. sstables pre-"ma", pre-3.0) we have to scan all serialized IndexInfo
+                // objects to calculate the offsets array. However, it might be possible to deserialize all
+                // IndexInfo objects here - but to just skip feels more gentle to the heap/GC.
 
-                for (IndexHelper.IndexInfo info : index)
-                    indexedSize += idxSerializer.serializedSize(info);
+                int entries = in.readInt();
+                int[] offsets = new int[entries];
+
+                TrackedDataInputPlus tracked = new TrackedDataInputPlus(in);
+                long start = tracked.getBytesRead();
+                long headerLength = 0L;
+                for (int i = 0; i < entries; i++)
+                {
+                    offsets[i] = (int) (tracked.getBytesRead() - start);
+                    if (i == 0)
+                    {
+                        IndexInfo info = idxInfoSerializer.deserialize(tracked);
+                        headerLength = info.offset;
+                    }
+                    else
+                        idxInfoSerializer.skip(tracked);
+                }
 
-                indexedSize += index.size() * TypeSizes.sizeof(0);
+                return new LegacyShallowIndexedEntry(dataFilePosition, indexFilePosition, deletionTime, headerLength, offsets, idxInfoSerializer, size);
             }
+        }
+
+        static long deserializePositionAndSkip(DataInputPlus in) throws IOException
+        {
+            long position = in.readLong();
+
+            int size = in.readInt();
+            if (size > 0)
+                in.skipBytesFully(size);
 
-            return TypeSizes.sizeofUnsignedVInt(rie.position) + TypeSizes.sizeofUnsignedVInt(indexedSize) + indexedSize;
+            return position;
+        }
+    }
+
+    private static final class LegacyIndexInfoRetriever extends FileIndexInfoRetriever
+    {
+        private final int[] offsets;
+
+        private LegacyIndexInfoRetriever(long indexFilePosition, int[] offsets, FileDataInput reader, IndexInfo.Serializer idxInfoSerializer)
+        {
+            super(indexFilePosition, offsets.length, reader, idxInfoSerializer);
+            this.offsets = offsets;
+        }
+
+        IndexInfo fetchIndex(int index) throws IOException
+        {
+            retrievals++;
+
+            // seek to position of IndexInfo
+            indexReader.seek(indexInfoFilePosition + offsets[index]);
+
+            // deserialize IndexInfo
+            return idxInfoSerializer.deserialize(indexReader);
         }
     }
 
     /**
-     * An entry in the row index for a row whose columns are indexed.
+     * An entry in the row index for a row whose columns are indexed - used for both legacy and current formats.
      */
-    private static class IndexedEntry extends RowIndexEntry<IndexHelper.IndexInfo>
+    private static final class IndexedEntry extends RowIndexEntry<IndexInfo>
     {
-        private final DeletionTime deletionTime;
+        private static final long BASE_SIZE;
 
-        // The offset in the file when the index entry end
+        static
+        {
+            BASE_SIZE = ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, null, null, 0, null));
+        }
+
+        private final DeletionTime deletionTime;
         private final long headerLength;
-        private final List<IndexHelper.IndexInfo> columnsIndex;
-        private static final long BASE_SIZE =
-                ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, Arrays.<IndexHelper.IndexInfo>asList(null, null)))
-              + ObjectSizes.measure(new ArrayList<>(1));
 
-        private IndexedEntry(long position, DeletionTime deletionTime, long headerLength, List<IndexHelper.IndexInfo> columnsIndex)
+        private final IndexInfo[] columnsIndex;
+        private final int[] offsets;
+        private final int indexedPartSize;
+        @Unmetered
+        private final ISerializer<IndexInfo> idxInfoSerializer;
+
+        private IndexedEntry(long dataFilePosition, DeletionTime deletionTime, long headerLength,
+                             IndexInfo[] columnsIndex, int[] offsets,
+                             int indexedPartSize, ISerializer<IndexInfo> idxInfoSerializer)
         {
-            super(position);
-            assert deletionTime != null;
-            assert columnsIndex != null && columnsIndex.size() > 1;
-            this.deletionTime = deletionTime;
+            super(dataFilePosition);
+
             this.headerLength = headerLength;
+            this.deletionTime = deletionTime;
+
             this.columnsIndex = columnsIndex;
+            this.offsets = offsets;
+            this.indexedPartSize = indexedPartSize;
+            this.idxInfoSerializer = idxInfoSerializer;
+        }
+
+        private IndexedEntry(long dataFilePosition, DataInputPlus in,
+                             DeletionTime deletionTime, long headerLength, int columnIndexCount,
+                             IndexInfo.Serializer idxInfoSerializer,
+                             Version version, int indexedPartSize) throws IOException
+        {
+            super(dataFilePosition);
+
+            this.headerLength = headerLength;
+            this.deletionTime = deletionTime;
+            int columnsIndexCount = columnIndexCount;
+
+            this.columnsIndex = new IndexInfo[columnsIndexCount];
+            for (int i = 0; i < columnsIndexCount; i++)
+                this.columnsIndex[i] = idxInfoSerializer.deserialize(in);
+
+            int[] offsets = null;
+            if (version.storeRows())
+            {
+                offsets = new int[this.columnsIndex.length];
+                for (int i = 0; i < offsets.length; i++)
+                    offsets[i] = in.readInt();
+            }
+            this.offsets = offsets;
+
+            this.indexedPartSize = indexedPartSize;
+
+            this.idxInfoSerializer = idxInfoSerializer;
+        }
+
+        private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer, Version version, boolean forCache) throws IOException
+        {
+            super(dataFilePosition);
+
+            this.headerLength = in.readUnsignedVInt();
+            this.deletionTime = DeletionTime.serializer.deserialize(in);
+            int columnsIndexCount = (int) in.readUnsignedVInt();
+
+            TrackedDataInputPlus trackedIn = new TrackedDataInputPlus(in);
+
+            this.columnsIndex = new IndexInfo[columnsIndexCount];
+            for (int i = 0; i < columnsIndexCount; i++)
+                this.columnsIndex[i] = idxInfoSerializer.deserialize(trackedIn);
+
+            int[] offsets = null;
+            if (!forCache && version.storeRows())
+            {
+                offsets = new int[this.columnsIndex.length];
+                for (int i = 0; i < offsets.length; i++)
+                    offsets[i] = trackedIn.readInt();
+            }
+            this.offsets = offsets;
+
+            this.indexedPartSize = (int) trackedIn.getBytesRead();
+
+            this.idxInfoSerializer = idxInfoSerializer;
+        }
+
+        @Override
+        public boolean indexOnHeap()
+        {
+            return true;
+        }
+
+        @Override
+        public int columnsIndexCount()
+        {
+            return columnsIndex.length;
         }
 
         @Override
@@ -310,36 +689,337 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         }
 
         @Override
-        public List<IndexHelper.IndexInfo> columnsIndex()
+        public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+        {
+            indexEntrySizeHistogram.update(serializedSize(deletionTime, headerLength, columnsIndex.length) + indexedPartSize);
+            indexInfoCountHistogram.update(columnsIndex.length);
+            return new IndexInfoRetriever()
+            {
+                private int retrievals;
+
+                @Override
+                public IndexInfo columnsIndex(int index)
+                {
+                    retrievals++;
+                    return columnsIndex[index];
+                }
+
+                public void close()
+                {
+                    indexInfoGetsHistogram.update(retrievals);
+                }
+            };
+        }
+
+        @Override
+        public long unsharedHeapSize()
+        {
+            long entrySize = 0;
+            for (IndexInfo idx : columnsIndex)
+                entrySize += idx.unsharedHeapSize();
+            return BASE_SIZE
+                + entrySize
+                + ObjectSizes.sizeOfReferenceArray(columnsIndex.length);
+        }
+
+        @Override
+        public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) throws IOException
+        {
+            assert indexedPartSize != Integer.MIN_VALUE;
+
+            out.writeUnsignedVInt(position);
+
+            out.writeUnsignedVInt(serializedSize(deletionTime, headerLength, columnsIndex.length) + indexedPartSize);
+
+            out.writeUnsignedVInt(headerLength);
+            DeletionTime.serializer.serialize(deletionTime, out);
+            out.writeUnsignedVInt(columnsIndex.length);
+            for (IndexInfo info : columnsIndex)
+                idxInfoSerializer.serialize(info, out);
+            for (int offset : offsets)
+                out.writeInt(offset);
+        }
+
+        @Override
+        public void serializeForCache(DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt(position);
+            out.writeByte(CACHE_INDEXED);
+
+            out.writeUnsignedVInt(headerLength);
+            DeletionTime.serializer.serialize(deletionTime, out);
+            out.writeUnsignedVInt(columnsIndexCount());
+
+            for (IndexInfo indexInfo : columnsIndex)
+                idxInfoSerializer.serialize(indexInfo, out);
+        }
+
+        static void skipForCache(DataInputPlus in) throws IOException
+        {
+            /*long headerLength =*/in.readUnsignedVInt();
+            /*DeletionTime deletionTime = */DeletionTime.serializer.skip(in);
+            /*int columnsIndexCount = (int)*/in.readUnsignedVInt();
+
+            /*int indexedPartSize = (int)*/in.readUnsignedVInt();
+        }
+    }
+
+    /**
+     * An entry in the row index for a row whose columns are indexed and the {@link IndexInfo} objects
+     * are not read into the key cache.
+     */
+    private static final class ShallowIndexedEntry extends RowIndexEntry<IndexInfo>
+    {
+        private static final long BASE_SIZE;
+
+        static
+        {
+            BASE_SIZE = ObjectSizes.measure(new ShallowIndexedEntry(0, 0, DeletionTime.LIVE, 0, 10, 0, null));
+        }
+
+        private final long indexFilePosition;
+
+        private final DeletionTime deletionTime;
+        private final long headerLength;
+        private final int columnsIndexCount;
+
+        private final int indexedPartSize;
+        private final int offsetsOffset;
+        @Unmetered
+        private final ISerializer<IndexInfo> idxInfoSerializer;
+        private final int fieldsSerializedSize;
+
+        /**
+         * See {@link #create(long, long, DeletionTime, long, int, int, List, int[], ISerializer)} for a description
+         * of the parameters.
+         */
+        private ShallowIndexedEntry(long dataFilePosition, long indexFilePosition,
+                                    DeletionTime deletionTime, long headerLength, int columnIndexCount,
+                                    int indexedPartSize, ISerializer<IndexInfo> idxInfoSerializer)
+        {
+            super(dataFilePosition);
+
+            assert columnIndexCount > 1;
+
+            this.indexFilePosition = indexFilePosition;
+            this.headerLength = headerLength;
+            this.deletionTime = deletionTime;
+            this.columnsIndexCount = columnIndexCount;
+
+            this.indexedPartSize = indexedPartSize;
+            this.idxInfoSerializer = idxInfoSerializer;
+
+            this.fieldsSerializedSize = serializedSize(deletionTime, headerLength, columnIndexCount);
+            this.offsetsOffset = indexedPartSize + fieldsSerializedSize - columnsIndexCount * TypeSizes.sizeof(0);
+        }
+
+        /**
+         * Constructor for key-cache deserialization
+         */
+        private ShallowIndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer) throws IOException
+        {
+            super(dataFilePosition);
+
+            this.indexFilePosition = in.readUnsignedVInt();
+
+            this.headerLength = in.readUnsignedVInt();
+            this.deletionTime = DeletionTime.serializer.deserialize(in);
+            this.columnsIndexCount = (int) in.readUnsignedVInt();
+
+            this.indexedPartSize = (int) in.readUnsignedVInt();
+
+            this.idxInfoSerializer = idxInfoSerializer;
+
+            this.fieldsSerializedSize = serializedSize(deletionTime, headerLength, columnsIndexCount);
+            this.offsetsOffset = indexedPartSize + fieldsSerializedSize - columnsIndexCount * TypeSizes.sizeof(0);
+        }
+
+        @Override
+        public int columnsIndexCount()
         {
-            return columnsIndex;
+            return columnsIndexCount;
         }
 
         @Override
-        protected int promotedSize(IndexHelper.IndexInfo.Serializer idxSerializer)
+        public DeletionTime deletionTime()
         {
-            long size = TypeSizes.sizeofUnsignedVInt(headerLength)
-                      + DeletionTime.serializer.serializedSize(deletionTime)
-                      + TypeSizes.sizeofUnsignedVInt(columnsIndex.size()); // number of entries
-            for (IndexHelper.IndexInfo info : columnsIndex)
-                size += idxSerializer.serializedSize(info);
+            return deletionTime;
+        }
 
-            size += columnsIndex.size() * TypeSizes.sizeof(0);
+        @Override
+        public long headerLength()
+        {
+            return headerLength;
+        }
 
-            return Ints.checkedCast(size);
+        @Override
+        public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+        {
+            indexEntrySizeHistogram.update(indexedPartSize + fieldsSerializedSize);
+            indexInfoCountHistogram.update(columnsIndexCount);
+            return new ShallowInfoRetriever(indexFilePosition +
+                                            VIntCoding.computeUnsignedVIntSize(position) +
+                                            VIntCoding.computeUnsignedVIntSize(indexedPartSize + fieldsSerializedSize) +
+                                            fieldsSerializedSize,
+                                            offsetsOffset - fieldsSerializedSize,
+                                            columnsIndexCount, indexFile.createReader(), idxInfoSerializer);
         }
 
         @Override
         public long unsharedHeapSize()
         {
-            long entrySize = 0;
-            for (IndexHelper.IndexInfo idx : columnsIndex)
-                entrySize += idx.unsharedHeapSize();
+            return BASE_SIZE;
+        }
 
-            return BASE_SIZE
-                   + entrySize
-                   + deletionTime.unsharedHeapSize()
-                   + ObjectSizes.sizeOfReferenceArray(columnsIndex.size());
+        @Override
+        public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) throws IOException
+        {
+            out.writeUnsignedVInt(position);
+
+            out.writeUnsignedVInt(fieldsSerializedSize + indexInfo.limit());
+
+            out.writeUnsignedVInt(headerLength);
+            DeletionTime.serializer.serialize(deletionTime, out);
+            out.writeUnsignedVInt(columnsIndexCount);
+
+            out.write(indexInfo);
+        }
+
+        static long deserializePositionAndSkip(DataInputPlus in) throws IOException
+        {
+            long position = in.readUnsignedVInt();
+
+            int size = (int) in.readUnsignedVInt();
+            if (size > 0)
+                in.skipBytesFully(size);
+
+            return position;
+        }
+
+        @Override
+        public void serializeForCache(DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt(position);
+            out.writeByte(CACHE_INDEXED_SHALLOW);
+
+            out.writeUnsignedVInt(indexFilePosition);
+
+            out.writeUnsignedVInt(headerLength);
+            DeletionTime.serializer.serialize(deletionTime, out);
+            out.writeUnsignedVInt(columnsIndexCount);
+
+            out.writeUnsignedVInt(indexedPartSize);
+        }
+
+        static void skipForCache(DataInputPlus in) throws IOException
+        {
+            /*long indexFilePosition =*/in.readUnsignedVInt();
+
+            /*long headerLength =*/in.readUnsignedVInt();
+            /*DeletionTime deletionTime = */DeletionTime.serializer.skip(in);
+            /*int columnsIndexCount = (int)*/in.readUnsignedVInt();
+
+            /*int indexedPartSize = (int)*/in.readUnsignedVInt();
+        }
+    }
+
+    private static final class ShallowInfoRetriever extends FileIndexInfoRetriever
+    {
+        private final int offsetsOffset;
+
+        private ShallowInfoRetriever(long indexInfoFilePosition, int offsetsOffset, int indexCount,
+                                     FileDataInput indexReader, ISerializer<IndexInfo> idxInfoSerializer)
+        {
+            super(indexInfoFilePosition, indexCount, indexReader, idxInfoSerializer);
+            this.offsetsOffset = offsetsOffset;
+        }
+
+        IndexInfo fetchIndex(int index) throws IOException
+        {
+            assert index >= 0 && index < indexCount;
+
+            retrievals++;
+
+            // seek to position in "offsets to IndexInfo" table
+            indexReader.seek(indexInfoFilePosition + offsetsOffset + index * TypeSizes.sizeof(0));
+
+            // read offset of IndexInfo
+            int indexInfoPos = indexReader.readInt();
+
+            // seek to position of IndexInfo
+            indexReader.seek(indexInfoFilePosition + indexInfoPos);
+
+            // finally, deserialize IndexInfo
+            return idxInfoSerializer.deserialize(indexReader);
+        }
+    }
+
+    /**
+     * Base class to access {@link IndexInfo} objects.
+     */
+    public interface IndexInfoRetriever extends AutoCloseable
+    {
+        IndexInfo columnsIndex(int index) throws IOException;
+
+        void close() throws IOException;
+    }
+
+    /**
+     * Base class to access {@link IndexInfo} objects on disk that keeps already
+     * read {@link IndexInfo} on heap.
+     */
+    private abstract static class FileIndexInfoRetriever implements IndexInfoRetriever
+    {
+        final long indexInfoFilePosition;
+        final int indexCount;
+        final ISerializer<IndexInfo> idxInfoSerializer;
+        final FileDataInput indexReader;
+        int retrievals;
+
+        private IndexInfo[] lastIndexes;
+
+        /**
+         *
+         * @param indexInfoFilePosition offset of first serialized {@link IndexInfo} object
+         * @param indexCount number of {@link IndexInfo} objects
+         * @param indexReader file data input to access the index file, closed by this instance
+         * @param idxInfoSerializer the index serializer to deserialize {@link IndexInfo} objects
+         */
+        FileIndexInfoRetriever(long indexInfoFilePosition, int indexCount, FileDataInput indexReader, ISerializer<IndexInfo> idxInfoSerializer)
+        {
+            this.indexInfoFilePosition = indexInfoFilePosition;
+            this.indexCount = indexCount;
+            this.idxInfoSerializer = idxInfoSerializer;
+            this.indexReader = indexReader;
+        }
+
+        public final IndexInfo columnsIndex(int index) throws IOException
+        {
+            if (lastIndexes != null
+                && lastIndexes.length > index && lastIndexes[index] != null)
+            {
+                // return a previously read/deserialized IndexInfo
+                return lastIndexes[index];
+            }
+
+            if (lastIndexes == null)
+                lastIndexes = new IndexInfo[index + 1];
+            else if (lastIndexes.length <= index)
+                lastIndexes = Arrays.copyOf(lastIndexes, index + 1);
+
+            IndexInfo indexInfo = fetchIndex(index);
+            lastIndexes[index] = indexInfo;
+
+            return indexInfo;
+        }
+
+        abstract IndexInfo fetchIndex(int index) throws IOException;
+
+        public void close() throws IOException
+        {
+            indexReader.close();
+
+            indexInfoGetsHistogram.update(retrievals);
         }
     }
 }

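A note on the RowIndexEntry.java changes above: the patch splits indexed entries into IndexedEntry, which keeps the whole column index on heap as before, and ShallowIndexedEntry, which keeps only fixed-size fields on heap and fetches IndexInfo objects from the index file on demand; the split is decided by comparing the serialized size of the index part against DatabaseDescriptor.getColumnIndexCacheSize(). What follows is a minimal, self-contained sketch of that shallow retrieval pattern, two seeks per lookup (one into the trailing offsets table, one to the serialized entry) plus memoization of entries already read. BlockInfo, LazyBlockReader and the fixed long/long payload are illustrative stand-ins, not Cassandra classes:

    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Illustrative stand-in for one deserialized IndexInfo entry.
    final class BlockInfo
    {
        final long offset;  // offset of the indexed block in the data file
        final long width;   // serialized width of that block

        BlockInfo(long offset, long width)
        {
            this.offset = offset;
            this.width = width;
        }
    }

    // Fetches one entry at a time from the index file instead of materializing
    // the whole column index on heap, remembering entries already read.
    final class LazyBlockReader implements AutoCloseable
    {
        private final RandomAccessFile index; // reader over the index component
        private final long entriesStart;      // file offset of the first serialized entry
        private final long offsetsStart;      // file offset of the trailing offsets table
        private final BlockInfo[] read;       // entries deserialized so far

        LazyBlockReader(RandomAccessFile index, long entriesStart, long offsetsStart, int count)
        {
            this.index = index;
            this.entriesStart = entriesStart;
            this.offsetsStart = offsetsStart;
            this.read = new BlockInfo[count];
        }

        BlockInfo get(int i) throws IOException
        {
            if (read[i] != null)
                return read[i];

            // first seek: the i-th slot of the fixed-width offsets table
            index.seek(offsetsStart + (long) i * Integer.BYTES);
            int relativeOffset = index.readInt();

            // second seek: jump to the serialized entry and deserialize only that one
            index.seek(entriesStart + relativeOffset);
            read[i] = new BlockInfo(index.readLong(), index.readLong());
            return read[i];
        }

        public void close() throws IOException
        {
            index.close();
        }
    }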
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 0fd1281..af2d434 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -75,25 +74,6 @@ public class SerializationHeader
         return new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
     }
 
-    public static SerializationHeader forKeyCache(CFMetaData metadata)
-    {
-        // We don't save type information in the key cache (we could change
-        // that but it's easier right now), so instead we simply use BytesType
-        // for both serialization and deserialization. Note that we also only
-        // serializer clustering prefixes in the key cache, so only the clusteringTypes
-        // really matter.
-        int size = metadata.clusteringColumns().size();
-        List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
-        for (int i = 0; i < size; i++)
-            clusteringTypes.add(BytesType.instance);
-        return new SerializationHeader(false,
-                                       BytesType.instance,
-                                       clusteringTypes,
-                                       PartitionColumns.NONE,
-                                       EncodingStats.NO_STATS,
-                                       Collections.<ByteBuffer, AbstractType<?>>emptyMap());
-    }
-
     public static SerializationHeader make(CFMetaData metadata, Collection<SSTableReader> sstables)
     {
         // The serialization header has to be computed before the start of compaction (since it's used to write)

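The removal of forKeyCache() above is a direct consequence of the new key-cache format: a shallow entry caches only positions, sizes and the partition-level deletion time, never clustering prefixes, so the BytesType placeholder header has no remaining caller. Below is a rough sketch of that cache record, using fixed-width writes in place of the patch's vints and reducing DeletionTime to its two primitives; ShallowCacheRecord and the parameter names are illustrative, not the Cassandra serializer:

    import java.io.DataOutput;
    import java.io.IOException;

    // Sketch of the shallow key-cache record (field order follows
    // ShallowIndexedEntry.serializeForCache in the diff above).
    final class ShallowCacheRecord
    {
        static void write(DataOutput out,
                          long dataPosition,    // position in the data file
                          long indexPosition,   // position of the entry in the index file
                          long headerLength,
                          int localDeletionTime, long markedForDeleteAt, // DeletionTime primitives
                          int indexInfoCount,   // number of IndexInfo blocks left on disk
                          int indexedPartSize) throws IOException
        {
            out.writeLong(dataPosition);
            out.writeByte(2);                   // entry-type discriminator (CACHE_INDEXED_SHALLOW in the patch)
            out.writeLong(indexPosition);
            out.writeLong(headerLength);
            out.writeInt(localDeletionTime);
            out.writeLong(markedForDeleteAt);
            out.writeInt(indexInfoCount);
            out.writeInt(indexedPartSize);
            // no clustering prefixes are written, hence no need for a
            // BytesType-based SerializationHeader when reading the cache back
        }
    }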
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index cef06a3..a0503e0 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -20,10 +20,15 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.IndexInfo;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -36,36 +41,66 @@ public class Serializers
 {
     private final CFMetaData metadata;
 
+    private Map<Version, IndexInfo.Serializer> otherVersionClusteringSerializers;
+
+    private final IndexInfo.Serializer latestVersionIndexSerializer;
+
     public Serializers(CFMetaData metadata)
     {
         this.metadata = metadata;
+        this.latestVersionIndexSerializer = new IndexInfo.Serializer(BigFormat.latestVersion,
+                                                                     indexEntryClusteringPrefixSerializer(BigFormat.latestVersion, SerializationHeader.makeWithoutStats(metadata)));
+    }
+
 IndexInfo.Serializer indexInfoSerializer(Version version, SerializationHeader header)
+    {
+        // null header indicates streaming from pre-3.0 sstables
+        if (version.equals(BigFormat.latestVersion) && header != null)
+            return latestVersionIndexSerializer;
+
+        if (otherVersionClusteringSerializers == null)
+            otherVersionClusteringSerializers = new ConcurrentHashMap<>();
+        IndexInfo.Serializer serializer = otherVersionClusteringSerializers.get(version);
+        if (serializer == null)
+        {
+            serializer = new IndexInfo.Serializer(version,
+                                                  indexEntryClusteringPrefixSerializer(version, header));
+            otherVersionClusteringSerializers.put(version, serializer);
+        }
+        return serializer;
     }
 
     // TODO: Once we drop support for old (pre-3.0) sstables, we can drop this method and inline the calls to
-    // ClusteringPrefix.serializer in IndexHelper directly. At which point this whole class probably becomes
+    // ClusteringPrefix.serializer directly. At which point this whole class probably becomes
     // unnecessary (since IndexInfo.Serializer won't depend on the metadata either).
-    public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header)
+    private ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(Version version, SerializationHeader header)
     {
         if (!version.storeRows() || header ==  null) //null header indicates streaming from pre-3.0 sstables
         {
             return oldFormatSerializer(version);
         }
 
-        return newFormatSerializer(version, header);
+        return new NewFormatSerializer(version, header.clusteringTypes());
     }
 
-    private ISerializer<ClusteringPrefix> oldFormatSerializer(final Version version)
+    private ISerializer<ClusteringPrefix> oldFormatSerializer(Version version)
     {
         return new ISerializer<ClusteringPrefix>()
         {
-            SerializationHeader newHeader = SerializationHeader.makeWithoutStats(metadata);
+            List<AbstractType<?>> clusteringTypes = SerializationHeader.makeWithoutStats(metadata).clusteringTypes();
 
             public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
             {
                 //we deserialize in the old format and serialize in the new format
                 ClusteringPrefix.serializer.serialize(clustering, out,
                                                       version.correspondingMessagingVersion(),
-                                                      newHeader.clusteringTypes());
+                                                      clusteringTypes);
+            }
+
+            @Override
+            public void skip(DataInputPlus in) throws IOException
+            {
+                ByteBufferUtil.skipShortLength(in);
             }
 
             public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
@@ -108,31 +143,41 @@ public class Serializers
             public long serializedSize(ClusteringPrefix clustering)
             {
                 return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(),
-                                                                  newHeader.clusteringTypes());
+                                                                  clusteringTypes);
             }
         };
     }
 
-
-    private ISerializer<ClusteringPrefix> newFormatSerializer(final Version version, final SerializationHeader header)
+    private static class NewFormatSerializer implements ISerializer<ClusteringPrefix>
     {
-        return new ISerializer<ClusteringPrefix>() //Reading and writing from/to the new sstable format
+        private final Version version;
+        private final List<AbstractType<?>> clusteringTypes;
+
+        NewFormatSerializer(Version version, List<AbstractType<?>> clusteringTypes)
         {
-            public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
-            {
-                ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), header.clusteringTypes());
-            }
+            this.version = version;
+            this.clusteringTypes = clusteringTypes;
+        }
 
-            public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
-            {
-                return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), header.clusteringTypes());
-            }
+        public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
+        {
+            ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), clusteringTypes);
+        }
 
-            public long serializedSize(ClusteringPrefix clustering)
-            {
-                return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes());
-            }
-        };
-    }
+        @Override
+        public void skip(DataInputPlus in) throws IOException
+        {
+            ClusteringPrefix.serializer.skip(in, version.correspondingMessagingVersion(), clusteringTypes);
+        }
+
+        public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+        {
+            return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), clusteringTypes);
+        }
 
-}
\ No newline at end of file
+        public long serializedSize(ClusteringPrefix clustering)
+        {
+            return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), clusteringTypes);
+        }
+    }
+}
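A note on the Serializers change above: the serializer for the current sstable version sits on the hot path and is kept in a plain final field, while serializers for other versions are created lazily and remembered per version. Below is a compact generic sketch of that memoization pattern; SerializerCache and forVersion() are illustrative names, and computeIfAbsent stands in for the patch's get/put sequence (same create-once-and-reuse effect, with the same benign race on first access):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    // Memoizes one serializer per version, keeping the common case allocation-free.
    final class SerializerCache<V, S>
    {
        private final V latestVersion;
        private final S latestSerializer;              // hot path: plain final field
        private final Map<V, S> others = new ConcurrentHashMap<>();
        private final Function<V, S> factory;

        SerializerCache(V latestVersion, Function<V, S> factory)
        {
            this.latestVersion = latestVersion;
            this.factory = factory;
            this.latestSerializer = factory.apply(latestVersion);
        }

        S forVersion(V version)
        {
            if (version.equals(latestVersion))
                return latestSerializer;
            // create on first use for older versions, then reuse
            return others.computeIfAbsent(version, factory);
        }
    }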
