http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 296d142..5ee46da 100644
--- 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -18,22 +18,23 @@
 package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-abstract class AbstractSSTableIterator implements UnfilteredRowIterator
+public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
 {
     protected final SSTableReader sstable;
     protected final DecoratedKey key;
@@ -46,6 +47,8 @@ abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
 
     private final boolean isForThrift;
 
+    protected final SegmentedFile ifile;
+
     private boolean isClosed;
 
     protected final Slices slices;
@@ -59,9 +62,11 @@ abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
                                       RowIndexEntry indexEntry,
                                       Slices slices,
                                       ColumnFilter columnFilter,
-                                      boolean isForThrift)
+                                      boolean isForThrift,
+                                      SegmentedFile ifile)
     {
         this.sstable = sstable;
+        this.ifile = ifile;
         this.key = key;
         this.columns = columnFilter;
         this.slices = slices;
@@ -434,13 +439,13 @@ abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
     }
 
     // Used by indexed readers to store where they are of the index.
-    protected static class IndexState
+    public static class IndexState implements AutoCloseable
     {
         private final Reader reader;
         private final ClusteringComparator comparator;
 
         private final RowIndexEntry indexEntry;
-        private final List<IndexHelper.IndexInfo> indexes;
+        private final RowIndexEntry.IndexInfoRetriever indexInfoRetriever;
         private final boolean reversed;
 
         private int currentIndexIdx;
@@ -448,43 +453,43 @@ abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
         // Marks the beginning of the block corresponding to currentIndexIdx.
         private DataPosition mark;
 
-        public IndexState(Reader reader, ClusteringComparator comparator, 
RowIndexEntry indexEntry, boolean reversed)
+        public IndexState(Reader reader, ClusteringComparator comparator, 
RowIndexEntry indexEntry, boolean reversed, SegmentedFile indexFile)
         {
             this.reader = reader;
             this.comparator = comparator;
             this.indexEntry = indexEntry;
-            this.indexes = indexEntry.columnsIndex();
+            this.indexInfoRetriever = indexEntry.openWithIndex(indexFile);
             this.reversed = reversed;
-            this.currentIndexIdx = reversed ? indexEntry.columnsIndex().size() 
: -1;
+            this.currentIndexIdx = reversed ? indexEntry.columnsIndexCount() : 
-1;
         }
 
         public boolean isDone()
         {
-            return reversed ? currentIndexIdx < 0 : currentIndexIdx >= 
indexes.size();
+            return reversed ? currentIndexIdx < 0 : currentIndexIdx >= 
indexEntry.columnsIndexCount();
         }
 
         // Sets the reader to the beginning of blockIdx.
         public void setToBlock(int blockIdx) throws IOException
         {
-            if (blockIdx >= 0 && blockIdx < indexes.size())
+            if (blockIdx >= 0 && blockIdx < indexEntry.columnsIndexCount())
             {
                 reader.seekToPosition(columnOffset(blockIdx));
                 reader.deserializer.clearState();
             }
 
             currentIndexIdx = blockIdx;
-            reader.openMarker = blockIdx > 0 ? indexes.get(blockIdx - 
1).endOpenMarker : null;
+            reader.openMarker = blockIdx > 0 ? index(blockIdx - 
1).endOpenMarker : null;
             mark = reader.file.mark();
         }
 
-        private long columnOffset(int i)
+        private long columnOffset(int i) throws IOException
         {
-            return indexEntry.position + indexes.get(i).offset;
+            return indexEntry.position + index(i).offset;
         }
 
         public int blocksCount()
         {
-            return indexes.size();
+            return indexEntry.columnsIndexCount();
         }
 
         // Update the block idx based on the current reader position if we're 
past the current block.
@@ -503,7 +508,7 @@ abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
                 return;
             }
 
-            while (currentIndexIdx + 1 < indexes.size() && 
isPastCurrentBlock())
+            while (currentIndexIdx + 1 < indexEntry.columnsIndexCount() && 
isPastCurrentBlock())
             {
                 reader.openMarker = currentIndex().endOpenMarker;
                 ++currentIndexIdx;
@@ -526,7 +531,7 @@ abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
         }
 
         // Check if we've crossed an index boundary (based on the mark on the 
beginning of the index block).
-        public boolean isPastCurrentBlock()
+        public boolean isPastCurrentBlock() throws IOException
         {
             assert reader.deserializer != null;
             long correction = reader.deserializer.bytesReadForUnconsumedData();
@@ -538,32 +543,92 @@ abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
             return currentIndexIdx;
         }
 
-        public IndexHelper.IndexInfo currentIndex()
+        public IndexInfo currentIndex() throws IOException
         {
             return index(currentIndexIdx);
         }
 
-        public IndexHelper.IndexInfo index(int i)
+        public IndexInfo index(int i) throws IOException
         {
-            return indexes.get(i);
+            return indexInfoRetriever.columnsIndex(i);
         }
 
         // Finds the index of the first block containing the provided bound, 
starting at the provided index.
         // Will be -1 if the bound is before any block, and blocksCount() if 
it is after every block.
-        public int findBlockIndex(Slice.Bound bound, int fromIdx)
+        public int findBlockIndex(Slice.Bound bound, int fromIdx) throws 
IOException
         {
             if (bound == Slice.Bound.BOTTOM)
                 return -1;
             if (bound == Slice.Bound.TOP)
                 return blocksCount();
 
-            return IndexHelper.indexFor(bound, indexes, comparator, reversed, 
fromIdx);
+            return indexFor(bound, fromIdx);
+        }
+
+        public int indexFor(ClusteringPrefix name, int lastIndex) throws 
IOException
+        {
+            IndexInfo target = new IndexInfo(name, name, 0, 0, null);
+            /*
+            Take the example from the unit test, and say your index looks like 
this:
+            [0..5][10..15][20..25]
+            and you look for the slice [13..17].
+
+            When doing a forward slice, we are doing a binary search comparing 
13 (the start of the query)
+            to the lastName part of the index slot. You'll end up with the 
"first" slot, going from left to right,
+            that may contain the start.
+
+            When doing a reverse slice, we do the same thing, only using as a 
start column the end of the query,
+            i.e. 17 in this example, compared to the firstName part of the 
index slots.  bsearch will give us the
+            first slot where firstName > start ([20..25] here), so we subtract 
an extra one to get the slot just before.
+            */
+            int startIdx = 0;
+            int endIdx = indexEntry.columnsIndexCount() - 1;
+
+            if (reversed)
+            {
+                if (lastIndex < endIdx)
+                {
+                    endIdx = lastIndex;
+                }
+            }
+            else
+            {
+                if (lastIndex > 0)
+                {
+                    startIdx = lastIndex;
+                }
+            }
+
+            int index = binarySearch(target, 
comparator.indexComparator(reversed), startIdx, endIdx);
+            return (index < 0 ? -index - (reversed ? 2 : 1) : index);
+        }
+
+        private int binarySearch(IndexInfo key, Comparator<IndexInfo> c, int 
low, int high) throws IOException {
+            while (low <= high) {
+                int mid = (low + high) >>> 1;
+                IndexInfo midVal = index(mid);
+                int cmp = c.compare(midVal, key);
+
+                if (cmp < 0)
+                    low = mid + 1;
+                else if (cmp > 0)
+                    high = mid - 1;
+                else
+                    return mid;
+            }
+            return -(low + 1);
         }
 
         @Override
         public String toString()
         {
-            return String.format("IndexState(indexSize=%d, currentBlock=%d, 
reversed=%b)", indexes.size(), currentIndexIdx, reversed);
+            return String.format("IndexState(indexSize=%d, currentBlock=%d, 
reversed=%b)", indexEntry.columnsIndexCount(), currentIndexIdx, reversed);
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            indexInfoRetriever.close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index 354564a..6b8f83f 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.SegmentedFile;
 
 /**
  *  A Cell Iterator over SSTable
@@ -37,9 +38,10 @@ public class SSTableIterator extends AbstractSSTableIterator
                            RowIndexEntry indexEntry,
                            Slices slices,
                            ColumnFilter columns,
-                           boolean isForThrift)
+                           boolean isForThrift,
+                           SegmentedFile ifile)
     {
-        super(sstable, file, key, indexEntry, slices, columns, isForThrift);
+        super(sstable, file, key, indexEntry, slices, columns, isForThrift, 
ifile);
     }
 
     protected Reader createReaderInternal(RowIndexEntry indexEntry, 
FileDataInput file, boolean shouldCloseFile)
@@ -181,11 +183,18 @@ public class SSTableIterator extends 
AbstractSSTableIterator
         private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput 
file, boolean shouldCloseFile)
         {
             super(file, shouldCloseFile);
-            this.indexState = new IndexState(this, 
sstable.metadata.comparator, indexEntry, false);
+            this.indexState = new IndexState(this, 
sstable.metadata.comparator, indexEntry, false, ifile);
             this.lastBlockIdx = indexState.blocksCount(); // if we never call 
setForSlice, that's where we want to stop
         }
 
         @Override
+        public void close() throws IOException
+        {
+            super.close();
+            this.indexState.close();
+        }
+
+        @Override
         public void setForSlice(Slice slice) throws IOException
         {
             super.setForSlice(slice);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index d8b41b4..7594cbd 100644
--- 
a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ 
b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -27,6 +27,7 @@ import 
org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.utils.btree.BTree;
 
 /**
@@ -40,9 +41,10 @@ public class SSTableReversedIterator extends 
AbstractSSTableIterator
                                    RowIndexEntry indexEntry,
                                    Slices slices,
                                    ColumnFilter columns,
-                                   boolean isForThrift)
+                                   boolean isForThrift,
+                                   SegmentedFile ifile)
     {
-        super(sstable, file, key, indexEntry, slices, columns, isForThrift);
+        super(sstable, file, key, indexEntry, slices, columns, isForThrift, 
ifile);
     }
 
     protected Reader createReaderInternal(RowIndexEntry indexEntry, 
FileDataInput file, boolean shouldCloseFile)
@@ -132,7 +134,7 @@ public class SSTableReversedIterator extends 
AbstractSSTableIterator
             return iterator.next();
         }
 
-        protected boolean stopReadingDisk()
+        protected boolean stopReadingDisk() throws IOException
         {
             return false;
         }
@@ -204,7 +206,14 @@ public class SSTableReversedIterator extends 
AbstractSSTableIterator
         private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput 
file, boolean shouldCloseFile)
         {
             super(file, shouldCloseFile);
-            this.indexState = new IndexState(this, 
sstable.metadata.comparator, indexEntry, true);
+            this.indexState = new IndexState(this, 
sstable.metadata.comparator, indexEntry, true, ifile);
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            super.close();
+            this.indexState.close();
         }
 
         @Override
@@ -304,7 +313,7 @@ public class SSTableReversedIterator extends 
AbstractSSTableIterator
         }
 
         @Override
-        protected boolean stopReadingDisk()
+        protected boolean stopReadingDisk() throws IOException
         {
             return indexState.isPastCurrentBlock();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 187caa3..67d351a 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -151,7 +151,7 @@ public class Scrubber implements Closeable
             if (indexAvailable())
             {
                 // throw away variable so we don't have a side effect in the 
assert
-                long firstRowPositionFromIndex = 
rowIndexEntrySerializer.deserialize(indexFile).position;
+                long firstRowPositionFromIndex = 
rowIndexEntrySerializer.deserializePositionAndSkip(indexFile);
                 assert firstRowPositionFromIndex == 0 : 
firstRowPositionFromIndex;
             }
 
@@ -338,7 +338,7 @@ public class Scrubber implements Closeable
 
             nextRowPositionFromIndex = !indexAvailable()
                     ? dataFile.length()
-                    : rowIndexEntrySerializer.deserialize(indexFile).position;
+                    : 
rowIndexEntrySerializer.deserializePositionAndSkip(indexFile);
         }
         catch (Throwable th)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java 
b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 227b209..bb8bcdb 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -128,7 +128,7 @@ public class Verifier implements Closeable
         {
             ByteBuffer nextIndexKey = 
ByteBufferUtil.readWithShortLength(indexFile);
             {
-                long firstRowPositionFromIndex = 
rowIndexEntrySerializer.deserialize(indexFile).position;
+                long firstRowPositionFromIndex = 
rowIndexEntrySerializer.deserializePositionAndSkip(indexFile);
                 if (firstRowPositionFromIndex != 0)
                     markAndThrow();
             }
@@ -162,7 +162,7 @@ public class Verifier implements Closeable
                     nextIndexKey = indexFile.isEOF() ? null : 
ByteBufferUtil.readWithShortLength(indexFile);
                     nextRowPositionFromIndex = indexFile.isEOF()
                                              ? dataFile.length()
-                                             : 
rowIndexEntrySerializer.deserialize(indexFile).position;
+                                             : 
rowIndexEntrySerializer.deserializePositionAndSkip(indexFile);
                 }
                 catch (Throwable th)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index 4f55677..5c04966 100644
--- 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -1,5 +1,6 @@
 package org.apache.cassandra.db.rows;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.List;
@@ -8,7 +9,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.thrift.ThriftResultsMerger;
@@ -60,6 +61,7 @@ public class UnfilteredRowIteratorWithLowerBound extends 
LazilyInitializedUnfilt
         // The partition index lower bound is more accurate than the sstable 
metadata lower bound but it is only
         // present if the iterator has already been initialized, which we only 
do when there are tombstones since in
         // this case we cannot use the sstable metadata clustering values
+
         RangeTombstone.Bound ret = getPartitionIndexLowerBound();
         return ret != null ? makeBound(ret) : 
makeBound(getMetadataLowerBound());
     }
@@ -158,27 +160,34 @@ public class UnfilteredRowIteratorWithLowerBound extends 
LazilyInitializedUnfilt
      */
     private RangeTombstone.Bound getPartitionIndexLowerBound()
     {
+        // NOTE: CASSANDRA-11206 removed the lookup against the key-cache for 
index entries whose IndexInfo objects
+        // are not heap-backed, as those objects are no longer in memory (they 
are on disk).
+        // CASSANDRA-11369 is there to fix this afterwards.
+
         // Creating the iterator ensures that rowIndexEntry is loaded if 
available (partitions bigger than
         // DatabaseDescriptor.column_index_size_in_kb)
         if (!canUseMetadataLowerBound())
             maybeInit();
 
         RowIndexEntry rowIndexEntry = 
sstable.getCachedPosition(partitionKey(), false);
-        if (rowIndexEntry == null)
-            return null;
-
-        List<IndexHelper.IndexInfo> columns = rowIndexEntry.columnsIndex();
-        if (columns.size() == 0)
+        if (rowIndexEntry == null || !rowIndexEntry.indexOnHeap())
             return null;
 
-        IndexHelper.IndexInfo column = columns.get(filter.isReversed() ? 
columns.size() - 1 : 0);
-        ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? 
column.lastName : column.firstName;
-        assert lowerBoundPrefix.getRawValues().length <= 
sstable.metadata.comparator.size() :
+        try (RowIndexEntry.IndexInfoRetriever onHeapRetriever = 
rowIndexEntry.openWithIndex(null))
+        {
+            IndexInfo column = 
onHeapRetriever.columnsIndex(filter.isReversed() ? 
rowIndexEntry.columnsIndexCount() - 1 : 0);
+            ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? 
column.lastName : column.firstName;
+            assert lowerBoundPrefix.getRawValues().length <= 
sstable.metadata.comparator.size() :
             String.format("Unexpected number of clustering values %d, expected 
%d or fewer for %s",
                           lowerBoundPrefix.getRawValues().length,
                           sstable.metadata.comparator.size(),
                           sstable.getFilename());
-        return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), 
lowerBoundPrefix.getRawValues());
+            return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), 
lowerBoundPrefix.getRawValues());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("should never occur", e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java 
b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index 205bf7e..6eb8b0a 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -73,7 +73,7 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
                         try
                         {
                             RowIndexEntry indexEntry = 
sstable.getPosition(key, SSTableReader.Operator.EQ);
-                            dataFile.seek(indexEntry.position + 
indexEntry.headerOffset());
+                            dataFile.seek(indexEntry.position);
                             ByteBufferUtil.readWithShortLength(dataFile); // 
key
 
                             try (SSTableIdentityIterator partition = new 
SSTableIdentityIterator(sstable, dataFile, key))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/ISerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ISerializer.java 
b/src/java/org/apache/cassandra/io/ISerializer.java
index 562d226..637a1c7 100644
--- a/src/java/org/apache/cassandra/io/ISerializer.java
+++ b/src/java/org/apache/cassandra/io/ISerializer.java
@@ -43,4 +43,9 @@ public interface ISerializer<T>
     public T deserialize(DataInputPlus in) throws IOException;
 
     public long serializedSize(T t);
+
+    public default void skip(DataInputPlus in) throws IOException
+    {
+        deserialize(in);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java 
b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
deleted file mode 100644
index 74a0fc5..0000000
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.*;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.*;
-
-/**
- * Provides helper to serialize, deserialize and use column indexes.
- */
-public final class IndexHelper
-{
-    private IndexHelper()
-    {
-    }
-
-    /**
-     * The index of the IndexInfo in which a scan starting with @name should 
begin.
-     *
-     * @param name name to search for
-     * @param indexList list of the indexInfo objects
-     * @param comparator the comparator to use
-     * @param reversed whether or not the search is reversed, i.e. we scan 
forward or backward from name
-     * @param lastIndex where to start the search from in indexList
-     *
-     * @return int index
-     */
-    public static int indexFor(ClusteringPrefix name, List<IndexInfo> 
indexList, ClusteringComparator comparator, boolean reversed, int lastIndex)
-    {
-        IndexInfo target = new IndexInfo(name, name, 0, 0, null);
-        /*
-        Take the example from the unit test, and say your index looks like 
this:
-        [0..5][10..15][20..25]
-        and you look for the slice [13..17].
-
-        When doing forward slice, we are doing a binary search comparing 13 
(the start of the query)
-        to the lastName part of the index slot. You'll end up with the "first" 
slot, going from left to right,
-        that may contain the start.
-
-        When doing a reverse slice, we do the same thing, only using as a 
start column the end of the query,
-        i.e. 17 in this example, compared to the firstName part of the index 
slots.  bsearch will give us the
-        first slot where firstName > start ([20..25] here), so we subtract an 
extra one to get the slot just before.
-        */
-        int startIdx = 0;
-        List<IndexInfo> toSearch = indexList;
-        if (reversed)
-        {
-            if (lastIndex < indexList.size() - 1)
-            {
-                toSearch = indexList.subList(0, lastIndex + 1);
-            }
-        }
-        else
-        {
-            if (lastIndex > 0)
-            {
-                startIdx = lastIndex;
-                toSearch = indexList.subList(lastIndex, indexList.size());
-            }
-        }
-        int index = Collections.binarySearch(toSearch, target, 
comparator.indexComparator(reversed));
-        return startIdx + (index < 0 ? -index - (reversed ? 2 : 1) : index);
-    }
-
-    public static class IndexInfo
-    {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new 
IndexInfo(null, null, 0, 0, null));
-
-        public final long offset;
-        public final long width;
-        public final ClusteringPrefix firstName;
-        public final ClusteringPrefix lastName;
-
-        // If at the end of the index block there is an open range tombstone 
marker, this marker
-        // deletion infos. null otherwise.
-        public final DeletionTime endOpenMarker;
-
-        public IndexInfo(ClusteringPrefix firstName,
-                         ClusteringPrefix lastName,
-                         long offset,
-                         long width,
-                         DeletionTime endOpenMarker)
-        {
-            this.firstName = firstName;
-            this.lastName = lastName;
-            this.offset = offset;
-            this.width = width;
-            this.endOpenMarker = endOpenMarker;
-        }
-
-        public static class Serializer
-        {
-            // This is the default index size that we use to delta-encode 
width when serializing so we get better vint-encoding.
-            // This is imperfect as user can change the index size and ideally 
we would save the index size used with each index file
-            // to use as base. However, that's a bit more involved a change 
that we want for now and very seldom do use change the index
-            // size so using the default is almost surely better than using no 
base at all.
-            public static final long WIDTH_BASE = 64 * 1024;
-
-            private final ISerializer<ClusteringPrefix> clusteringSerializer;
-            private final Version version;
-
-            public Serializer(CFMetaData metadata, Version version, 
SerializationHeader header)
-            {
-                this.clusteringSerializer = 
metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
-                this.version = version;
-            }
-
-            public void serialize(IndexInfo info, DataOutputPlus out) throws 
IOException
-            {
-                assert version.storeRows() : "We read old index files but we 
should never write them";
-
-                clusteringSerializer.serialize(info.firstName, out);
-                clusteringSerializer.serialize(info.lastName, out);
-                out.writeUnsignedVInt(info.offset);
-                out.writeVInt(info.width - WIDTH_BASE);
-
-                out.writeBoolean(info.endOpenMarker != null);
-                if (info.endOpenMarker != null)
-                    DeletionTime.serializer.serialize(info.endOpenMarker, out);
-            }
-
-            public IndexInfo deserialize(DataInputPlus in) throws IOException
-            {
-                ClusteringPrefix firstName = 
clusteringSerializer.deserialize(in);
-                ClusteringPrefix lastName = 
clusteringSerializer.deserialize(in);
-                long offset;
-                long width;
-                DeletionTime endOpenMarker = null;
-                if (version.storeRows())
-                {
-                    offset = in.readUnsignedVInt();
-                    width = in.readVInt() + WIDTH_BASE;
-                    if (in.readBoolean())
-                        endOpenMarker = 
DeletionTime.serializer.deserialize(in);
-                }
-                else
-                {
-                    offset = in.readLong();
-                    width = in.readLong();
-                }
-                return new IndexInfo(firstName, lastName, offset, width, 
endOpenMarker);
-            }
-
-            public long serializedSize(IndexInfo info)
-            {
-                assert version.storeRows() : "We read old index files but we 
should never write them";
-
-                long size = clusteringSerializer.serializedSize(info.firstName)
-                          + clusteringSerializer.serializedSize(info.lastName)
-                          + TypeSizes.sizeofUnsignedVInt(info.offset)
-                          + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
-                          + TypeSizes.sizeof(info.endOpenMarker != null);
-
-                if (info.endOpenMarker != null)
-                    size += 
DeletionTime.serializer.serializedSize(info.endOpenMarker);
-                return size;
-            }
-        }
-
-        public long unsharedHeapSize()
-        {
-            return EMPTY_SIZE
-                 + firstName.unsharedHeapSize()
-                 + lastName.unsharedHeapSize()
-                 + (endOpenMarker == null ? 0 : 
endOpenMarker.unsharedHeapSize());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java 
b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
new file mode 100644
index 0000000..b07ce4a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * {@code IndexInfo} is embedded in the indexed version of {@link 
RowIndexEntry}.
+ * Each instance roughly covers a range of {@link 
org.apache.cassandra.config.Config#column_index_size_in_kb 
column_index_size_in_kb} kB
+ * and contains the first and last clustering value (or slice bound), its 
offset in the data file and width in the data file.
+ * <p>
+ * Each {@code IndexInfo} object is serialized as follows.
+ * </p>
+ * <p>
+ * Serialization format changed in 3.0. First, the {@code endOpenMarker} has 
been introduced.
+ * Second, the <i>order</i> of the fields in serialized representation changed 
to allow future
+ * optimizations to access {@code offset} and {@code width} fields directly 
without skipping
+ * {@code firstName}/{@code lastName}.
+ * </p>
+ * <p>
+ * {@code
+ *    (*) IndexInfo.firstName (ClusteringPrefix serializer, either 
Clustering.serializer.serialize or Slice.Bound.serializer.serialize)
+ *    (*) IndexInfo.lastName (ClusteringPrefix serializer, either 
Clustering.serializer.serialize or Slice.Bound.serializer.serialize)
+ * (long) IndexInfo.offset                             (unsigned vint if 3.0)
+ * (long) IndexInfo.width                              (vint of width - WIDTH_BASE if 3.0)
+ * (bool) IndexInfo.endOpenMarker != null              (if 3.0)
+ *  (int) IndexInfo.endOpenMarker.localDeletionTime    (if 3.0 && 
IndexInfo.endOpenMarker != null)
+ * (long) IndexInfo.endOpenMarker.markedForDeletionAt  (if 3.0 && 
IndexInfo.endOpenMarker != null)
+ * }
+ * </p>
+ */
+public class IndexInfo
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
IndexInfo(null, null, 0, 0, null));
+
+    public final long offset;
+    public final long width;
+    public final ClusteringPrefix firstName;
+    public final ClusteringPrefix lastName;
+
+    // If at the end of the index block there is an open range tombstone 
marker, this is that marker's
+    // deletion info; null otherwise.
+    public final DeletionTime endOpenMarker;
+
+    public IndexInfo(ClusteringPrefix firstName,
+                     ClusteringPrefix lastName,
+                     long offset,
+                     long width,
+                     DeletionTime endOpenMarker)
+    {
+        this.firstName = firstName;
+        this.lastName = lastName;
+        this.offset = offset;
+        this.width = width;
+        this.endOpenMarker = endOpenMarker;
+    }
+
+    public static class Serializer implements ISerializer<IndexInfo>
+    {
+        // This is the default index size that we use to delta-encode width 
when serializing so we get better vint-encoding.
+        // This is imperfect as users can change the index size and ideally we 
would save the index size used with each index file
+        // to use as base. However, that's a bit more involved a change than 
we want for now, and users very seldom change the index
+        // size, so using the default is almost surely better than using no 
base at all.
+        public static final long WIDTH_BASE = 64 * 1024;
+
+        private final ISerializer<ClusteringPrefix> clusteringSerializer;
+        private final Version version;
+
+        public Serializer(Version version, ISerializer<ClusteringPrefix> 
clusteringSerializer)
+        {
+            this.clusteringSerializer = clusteringSerializer;
+            this.version = version;
+        }
+
+        public void serialize(IndexInfo info, DataOutputPlus out) throws 
IOException
+        {
+            assert version.storeRows() : "We read old index files but we 
should never write them";
+
+            clusteringSerializer.serialize(info.firstName, out);
+            clusteringSerializer.serialize(info.lastName, out);
+            out.writeUnsignedVInt(info.offset);
+            out.writeVInt(info.width - WIDTH_BASE);
+
+            out.writeBoolean(info.endOpenMarker != null);
+            if (info.endOpenMarker != null)
+                DeletionTime.serializer.serialize(info.endOpenMarker, out);
+        }
+
+        public void skip(DataInputPlus in) throws IOException
+        {
+            clusteringSerializer.skip(in);
+            clusteringSerializer.skip(in);
+            if (version.storeRows())
+            {
+                in.readUnsignedVInt();
+                in.readVInt();
+                if (in.readBoolean())
+                    DeletionTime.serializer.skip(in);
+            }
+            else
+            {
+                in.skipBytes(TypeSizes.sizeof(0L));
+                in.skipBytes(TypeSizes.sizeof(0L));
+            }
+        }
+
+        public IndexInfo deserialize(DataInputPlus in) throws IOException
+        {
+            ClusteringPrefix firstName = clusteringSerializer.deserialize(in);
+            ClusteringPrefix lastName = clusteringSerializer.deserialize(in);
+            long offset;
+            long width;
+            DeletionTime endOpenMarker = null;
+            if (version.storeRows())
+            {
+                offset = in.readUnsignedVInt();
+                width = in.readVInt() + WIDTH_BASE;
+                if (in.readBoolean())
+                    endOpenMarker = DeletionTime.serializer.deserialize(in);
+            }
+            else
+            {
+                offset = in.readLong();
+                width = in.readLong();
+            }
+            return new IndexInfo(firstName, lastName, offset, width, 
endOpenMarker);
+        }
+
+        public long serializedSize(IndexInfo info)
+        {
+            assert version.storeRows() : "We read old index files but we 
should never write them";
+
+            long size = clusteringSerializer.serializedSize(info.firstName)
+                        + clusteringSerializer.serializedSize(info.lastName)
+                        + TypeSizes.sizeofUnsignedVInt(info.offset)
+                        + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
+                        + TypeSizes.sizeof(info.endOpenMarker != null);
+
+            if (info.endOpenMarker != null)
+                size += 
DeletionTime.serializer.serializedSize(info.endOpenMarker);
+            return size;
+        }
+    }
+
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE
+             + firstName.unsharedHeapSize()
+             + lastName.unsharedHeapSize()
+             + (endOpenMarker == null ? 0 : endOpenMarker.unsharedHeapSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 1286f16..e68ca2a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -18,16 +18,10 @@
 package org.apache.cassandra.io.sstable.format;
 
 import com.google.common.base.CharMatcher;
-import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.LegacyLayout;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.io.util.FileDataInput;
-
-import java.util.Iterator;
 
 /**
  * Provides the accessors to data on disk.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 99adb8d..3181a55 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -823,12 +823,11 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
             try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : 
new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, 
samplingLevel))
             {
                 long indexPosition;
-                RowIndexEntry.IndexSerializer rowIndexSerializer = 
descriptor.getFormat().getIndexSerializer(metadata, descriptor.version, header);
 
                 while ((indexPosition = primaryIndex.getFilePointer()) != 
indexSize)
                 {
                     ByteBuffer key = 
ByteBufferUtil.readWithShortLength(primaryIndex);
-                    /*RowIndexEntry indexEntry = 
*/rowIndexSerializer.deserialize(primaryIndex);
+                    RowIndexEntry.Serializer.skip(primaryIndex, 
descriptor.version);
                     DecoratedKey decoratedKey = decorateKey(key);
                     if (first == null)
                         first = decoratedKey;
@@ -1822,7 +1821,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
             // this saves index summary lookup and index file iteration which 
whould be pretty costly
             // especially in presence of promoted column indexes
             if (isKeyCacheSetup())
-                cacheKey(key, rowIndexEntrySerializer.deserialize(in));
+                cacheKey(key, rowIndexEntrySerializer.deserialize(in, 
in.getFilePointer()));
         }
 
         return key;
@@ -2021,6 +2020,11 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         return ifile.channel;
     }
 
+    public SegmentedFile getIndexFile()
+    {
+        return ifile;
+    }
+
     /**
      * @param component component to get timestamp.
      * @return last modified time for given component. 0 if given component 
does not exist or IO error occurs.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 0112e55..7a7ce8c 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -70,8 +70,8 @@ public class BigTableReader extends SSTableReader
         if (indexEntry == null)
             return UnfilteredRowIterators.noRowsIterator(metadata, key, 
Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
         return reversed
-             ? new SSTableReversedIterator(this, file, key, indexEntry, 
slices, selectedColumns, isForThrift)
-             : new SSTableIterator(this, file, key, indexEntry, slices, 
selectedColumns, isForThrift);
+             ? new SSTableReversedIterator(this, file, key, indexEntry, 
slices, selectedColumns, isForThrift, ifile)
+             : new SSTableIterator(this, file, key, indexEntry, slices, 
selectedColumns, isForThrift, ifile);
     }
 
     /**
@@ -230,7 +230,7 @@ public class BigTableReader extends SSTableReader
                 if (opSatisfied)
                 {
                     // read data position from index entry
-                    RowIndexEntry indexEntry = 
rowIndexEntrySerializer.deserialize(in);
+                    RowIndexEntry indexEntry = 
rowIndexEntrySerializer.deserialize(in, in.getFilePointer());
                     if (exactMatch && updateCacheAndStats)
                     {
                         assert key instanceof DecoratedKey; // key can be == 
to the index key only if it's a true row key
@@ -252,7 +252,7 @@ public class BigTableReader extends SSTableReader
                     }
                     if (op == Operator.EQ && updateCacheAndStats)
                         bloomFilterTracker.addTruePositive();
-                    Tracing.trace("Partition index with {} entries found for 
sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
+                    Tracing.trace("Partition index with {} entries found for 
sstable {}", indexEntry.columnsIndexCount(), descriptor.generation);
                     return indexEntry;
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index a14056b..16a0aed 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -288,7 +288,7 @@ public class BigTableScanner implements ISSTableScanner
                             return endOfData();
 
                         currentKey = 
sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                        currentEntry = 
rowIndexEntrySerializer.deserialize(ifile);
+                        currentEntry = 
rowIndexEntrySerializer.deserialize(ifile, ifile.getFilePointer());
                     } while (!currentRange.contains(currentKey));
                 }
                 else
@@ -307,7 +307,7 @@ public class BigTableScanner implements ISSTableScanner
                 {
                     // we need the position of the start of the next key, 
regardless of whether it falls in the current range
                     nextKey = 
sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                    nextEntry = rowIndexEntrySerializer.deserialize(ifile);
+                    nextEntry = rowIndexEntrySerializer.deserialize(ifile, 
ifile.getFilePointer());
 
                     if (!currentRange.contains(nextKey))
                     {
@@ -329,7 +329,7 @@ public class BigTableScanner implements ISSTableScanner
                         {
                             if (dataRange == null)
                             {
-                                dfile.seek(currentEntry.position + 
currentEntry.headerOffset());
+                                dfile.seek(currentEntry.position);
                                 ByteBufferUtil.readWithShortLength(dfile); // 
key
                                 return new SSTableIdentityIterator(sstable, 
dfile, partitionKey());
                             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 42f923a..bbb22d4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.sstable.format.big;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 
@@ -110,7 +111,7 @@ public class BigTableWriter extends SSTableWriter
         return (lastWrittenKey == null) ? 0 : dataFile.position();
     }
 
-    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, 
RowIndexEntry index) throws IOException
+    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, 
RowIndexEntry index, ByteBuffer indexInfo) throws IOException
     {
         metadataCollector.addKey(decoratedKey.getKey());
         lastWrittenKey = decoratedKey;
@@ -120,7 +121,7 @@ public class BigTableWriter extends SSTableWriter
 
         if (logger.isTraceEnabled())
             logger.trace("wrote {} at {}", decoratedKey, dataEnd);
-        iwriter.append(decoratedKey, index, dataEnd);
+        iwriter.append(decoratedKey, index, dataEnd, indexInfo);
     }
 
     /**
@@ -150,15 +151,27 @@ public class BigTableWriter extends SSTableWriter
 
         try (UnfilteredRowIterator collecting = Transformation.apply(iterator, 
new StatsCollector(metadataCollector)))
         {
-            ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, 
dataFile, header, observers, descriptor.version);
+            ColumnIndex columnIndex = new ColumnIndex(header, dataFile, 
descriptor.version, observers,
+                                                      
getRowIndexEntrySerializer().indexInfoSerializer());
 
-            RowIndexEntry entry = RowIndexEntry.create(startPosition, 
collecting.partitionLevelDeletion(), index);
+            columnIndex.buildRowIndex(collecting);
+
+            // afterAppend() writes the partition key before the first 
RowIndexEntry - so we have to add its
+            // serialized size to the index-writer position
+            long indexFilePosition = 
ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) + 
iwriter.indexFile.position();
+
+            RowIndexEntry entry = RowIndexEntry.create(startPosition, 
indexFilePosition,
+                                                       
collecting.partitionLevelDeletion(), columnIndex.headerLength, 
columnIndex.columnIndexCount,
+                                                       
columnIndex.indexInfoSerializedSize(),
+                                                       
columnIndex.indexSamples,
+                                                       columnIndex.offsets(),
+                                                       
getRowIndexEntrySerializer().indexInfoSerializer());
 
             long endPosition = dataFile.position();
             long rowSize = endPosition - startPosition;
             maybeLogLargePartitionWarning(key, rowSize);
             metadataCollector.addPartitionSizeInBytes(rowSize);
-            afterAppend(key, endPosition, entry);
+            afterAppend(key, endPosition, entry, columnIndex.buffer());
             return entry;
         }
         catch (IOException e)
@@ -167,6 +180,11 @@ public class BigTableWriter extends SSTableWriter
         }
     }
 
+    private RowIndexEntry.IndexSerializer<IndexInfo> 
getRowIndexEntrySerializer()
+    {
+        return (RowIndexEntry.IndexSerializer<IndexInfo>) 
rowIndexEntrySerializer;
+    }
+
     private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
     {
         if (rowSize > 
DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
@@ -403,14 +421,14 @@ public class BigTableWriter extends SSTableWriter
             return summary.getLastReadableBoundary();
         }
 
-        public void append(DecoratedKey key, RowIndexEntry indexEntry, long 
dataEnd) throws IOException
+        public void append(DecoratedKey key, RowIndexEntry indexEntry, long 
dataEnd, ByteBuffer indexInfo) throws IOException
         {
             bf.add(key);
             long indexStart = indexFile.position();
             try
             {
                 ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile);
-                rowIndexEntrySerializer.serialize(indexEntry, indexFile);
+                rowIndexEntrySerializer.serialize(indexEntry, indexFile, 
indexInfo);
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java 
b/src/java/org/apache/cassandra/service/CacheService.java
index 9bda3a0..552642c 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -465,7 +465,9 @@ public class CacheService implements CacheServiceMBean
             ByteBufferUtil.writeWithLength(key.key, out);
             out.writeInt(key.desc.generation);
             out.writeBoolean(true);
-            key.desc.getFormat().getIndexSerializer(cfs.metadata, 
key.desc.version, 
SerializationHeader.forKeyCache(cfs.metadata)).serialize(entry, out);
+
+            SerializationHeader header = new SerializationHeader(false, 
cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
+            key.desc.getFormat().getIndexSerializer(cfs.metadata, 
key.desc.version, header).serializeForCache(entry, out);
         }
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> 
deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
@@ -481,20 +483,20 @@ public class CacheService implements CacheServiceMBean
             ByteBuffer key = ByteBufferUtil.read(input, keyLength);
             int generation = input.readInt();
             input.readBoolean(); // backwards compatibility for "promoted 
indexes" boolean
-            SSTableReader reader = null;
+            SSTableReader reader;
             if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = 
findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
             {
                 // The sstable doesn't exist anymore, so we can't be sure of 
the exact version and assume its the current version. The only case where we'll 
be
                 // wrong is during upgrade, in which case we fail at 
deserialization. This is not a huge deal however since 1) this is unlikely 
enough that
                 // this won't affect many users (if any) and only once, 2) 
this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that 
this
                 // part of the code has been broken for a while without anyone 
noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
-                RowIndexEntry.Serializer.skip(input, 
BigFormat.instance.getLatestVersion());
+                RowIndexEntry.Serializer.skipForCache(input, 
BigFormat.instance.getLatestVersion());
                 return null;
             }
             RowIndexEntry.IndexSerializer<?> indexSerializer = 
reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
                                                                                
                                 reader.descriptor.version,
-                                                                               
                                 SerializationHeader.forKeyCache(cfs.metadata));
-            RowIndexEntry entry = indexSerializer.deserialize(input);
+                                                                               
                                 reader.header);
+            RowIndexEntry<?> entry = 
indexSerializer.deserializeForCache(input);
             return Futures.immediateFuture(Pair.create(new 
KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java 
b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index 0c7e8a5..c952470 100644
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cache;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -51,9 +52,23 @@ public class AutoSavingCacheTest
     }
 
     @Test
+    public void testSerializeAndLoadKeyCache0kB() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        doTestSerializeAndLoadKeyCache();
+    }
+
+    @Test
     public void testSerializeAndLoadKeyCache() throws Exception
     {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        doTestSerializeAndLoadKeyCache();
+    }
+
+    private static void doTestSerializeAndLoadKeyCache() throws Exception
+    {
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
         for (int i = 0; i < 2; i++)
         {
             ColumnDefinition colDef = 
ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("col1"), 
AsciiType.instance);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java 
b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index 2529de1..21a17fa 100644
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.index.Index;
@@ -81,7 +82,20 @@ public class KeyCacheCqlTest extends CQLTester
                                      
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
 
     @Test
-    public void testSliceQueries() throws Throwable
+    public void testSliceQueriesShallowIndexEntry() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testSliceQueries();
+    }
+
+    @Test
+    public void testSliceQueriesIndexInfoOnHeap() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        testSliceQueries();
+    }
+
+    private void testSliceQueries() throws Throwable
     {
         createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, val text, vpk 
text, vck1 int, vck2 int, PRIMARY KEY (pk, ck1, ck2))");
 
@@ -165,7 +179,20 @@ public class KeyCacheCqlTest extends CQLTester
     }
 
     @Test
-    public void test2iKeyCachePaths() throws Throwable
+    public void test2iKeyCachePathsShallowIndexEntry() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        test2iKeyCachePaths();
+    }
+
+    @Test
+    public void test2iKeyCachePathsIndexInfoOnHeap() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        test2iKeyCachePaths();
+    }
+
+    private void test2iKeyCachePaths() throws Throwable
     {
         String table = createTable("CREATE TABLE %s ("
                                    + commonColumnsDef
@@ -242,7 +269,20 @@ public class KeyCacheCqlTest extends CQLTester
     }
 
     @Test
-    public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+    public void test2iKeyCachePathsSaveKeysForDroppedTableShallowIndexEntry() 
throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        test2iKeyCachePathsSaveKeysForDroppedTable();
+    }
+
+    @Test
+    public void test2iKeyCachePathsSaveKeysForDroppedTableIndexInfoOnHeap() 
throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        test2iKeyCachePathsSaveKeysForDroppedTable();
+    }
+
+    private void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
     {
         String table = createTable("CREATE TABLE %s ("
                                    + commonColumnsDef
@@ -302,7 +342,20 @@ public class KeyCacheCqlTest extends CQLTester
     }
 
     @Test
-    public void testKeyCacheNonClustered() throws Throwable
+    public void testKeyCacheNonClusteredShallowIndexEntry() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testKeyCacheNonClustered();
+    }
+
+    @Test
+    public void testKeyCacheNonClusteredIndexInfoOnHeap() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        testKeyCacheNonClustered();
+    }
+
+    private void testKeyCacheNonClustered() throws Throwable
     {
         String table = createTable("CREATE TABLE %s ("
                                    + commonColumnsDef
@@ -335,7 +388,20 @@ public class KeyCacheCqlTest extends CQLTester
     }
 
     @Test
-    public void testKeyCacheClustered() throws Throwable
+    public void testKeyCacheClusteredShallowIndexEntry() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testKeyCacheClustered();
+    }
+
+    @Test
+    public void testKeyCacheClusteredIndexInfoOnHeap() throws Throwable
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        testKeyCacheClustered();
+    }
+
+    private void testKeyCacheClustered() throws Throwable
     {
         String table = createTable("CREATE TABLE %s ("
                                    + commonColumnsDef

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java 
b/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java
new file mode 100644
index 0000000..8f5f282
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.Iterator;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.ResultSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PagingQueryTest extends CQLTester
+{
+    @Test
+    public void pagingOnRegularColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    " k1 int," +
+                    " c1 int," +
+                    " c2 int," +
+                    " v1 text," +
+                    " v2 text," +
+                    " v3 text," +
+                    " v4 text," +
+                    "PRIMARY KEY (k1, c1, c2))");
+
+        for (int c1 = 0; c1 < 100; c1++)
+        {
+            for (int c2 = 0; c2 < 100; c2++)
+            {
+                execute("INSERT INTO %s (k1, c1, c2, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?, ?)", 1, c1, c2,
+                        Integer.toString(c1), Integer.toString(c2), someText(), someText());
+            }
+
+            if (c1 % 30 == 0)
+                flush();
+        }
+
+        flush();
+
+        try (Session session = sessionNet())
+        {
+            SimpleStatement stmt = new SimpleStatement("SELECT c1, c2, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE k1 = 1");
+            stmt.setFetchSize(3);
+            ResultSet rs = session.execute(stmt);
+            Iterator<Row> iter = rs.iterator();
+            for (int c1 = 0; c1 < 100; c1++)
+            {
+                for (int c2 = 0; c2 < 100; c2++)
+                {
+                    assertTrue(iter.hasNext());
+                    Row row = iter.next();
+                    String msg = "On " + c1 + ',' + c2;
+                    assertEquals(msg, c1, row.getInt(0));
+                    assertEquals(msg, c2, row.getInt(1));
+                    assertEquals(msg, Integer.toString(c1), row.getString(2));
+                    assertEquals(msg, Integer.toString(c2), row.getString(3));
+                }
+            }
+            assertFalse(iter.hasNext());
+
+            for (int c1 = 0; c1 < 100; c1++)
+            {
+                stmt = new SimpleStatement("SELECT c1, c2, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE k1 = 1 AND c1 = ?", c1);
+                stmt.setFetchSize(3);
+                rs = session.execute(stmt);
+                iter = rs.iterator();
+                for (int c2 = 0; c2 < 100; c2++)
+                {
+                    assertTrue(iter.hasNext());
+                    Row row = iter.next();
+                    String msg = "Within " + c1 + " on " + c2;
+                    assertEquals(msg, c1, row.getInt(0));
+                    assertEquals(msg, c2, row.getInt(1));
+                    assertEquals(msg, Integer.toString(c1), row.getString(2));
+                    assertEquals(msg, Integer.toString(c2), row.getString(3));
+                }
+                assertFalse(iter.hasNext());
+            }
+        }
+    }
+
+    private static String someText()
+    {
+        char[] arr = new char[1024];
+        for (int i = 0; i < arr.length; i++)
+            arr[i] = (char)(32 + ThreadLocalRandom.current().nextInt(95));
+        return new String(arr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java b/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java
index 3042acd..41a7d91 100644
--- a/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java
+++ b/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java
@@ -24,8 +24,8 @@ import org.junit.Test;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class TombstonesWithIndexedSSTableTest extends CQLTester
@@ -76,13 +76,17 @@ public class TombstonesWithIndexedSSTableTest extends CQLTester
             {
                 // The line below failed with key caching off (CASSANDRA-11158)
                 @SuppressWarnings("unchecked")
-                RowIndexEntry<IndexHelper.IndexInfo> indexEntry = sstable.getPosition(dk, SSTableReader.Operator.EQ);
+                RowIndexEntry indexEntry = sstable.getPosition(dk, SSTableReader.Operator.EQ);
                 if (indexEntry != null && indexEntry.isIndexed())
                 {
-                    ClusteringPrefix firstName = indexEntry.columnsIndex().get(1).firstName;
-                    if (firstName.kind().isBoundary())
-                        break deletionLoop;
-                    indexedRow = Int32Type.instance.compose(firstName.get(0));
+                    try (FileDataInput reader = sstable.openIndexReader())
+                    {
+                        RowIndexEntry.IndexInfoRetriever infoRetriever = indexEntry.openWithIndex(sstable.getIndexFile());
+                        ClusteringPrefix firstName = infoRetriever.columnsIndex(1).firstName;
+                        if (firstName.kind().isBoundary())
+                            break deletionLoop;
+                        indexedRow = Int32Type.instance.compose(firstName.get(0));
+                    }
                 }
             }
             assert indexedRow >= 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 515d30e..ada6b5b 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,16 +17,15 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,9 +33,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.KeyCacheKey;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -54,6 +51,9 @@ public class KeyCacheTest
     private static final String COLUMN_FAMILY1 = "Standard1";
     private static final String COLUMN_FAMILY2 = "Standard2";
     private static final String COLUMN_FAMILY3 = "Standard3";
+    private static final String COLUMN_FAMILY4 = "Standard4";
+    private static final String COLUMN_FAMILY5 = "Standard5";
+    private static final String COLUMN_FAMILY6 = "Standard6";
 
 
     @BeforeClass
@@ -64,7 +64,10 @@ public class KeyCacheTest
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY2),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY3));
+                                    SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY3),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY4),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY5),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY6));
     }
 
     @AfterClass
@@ -74,42 +77,61 @@ public class KeyCacheTest
     }
 
     @Test
-    public void testKeyCacheLoad() throws Exception
+    public void testKeyCacheLoadShallowIndexEntry() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testKeyCacheLoad(COLUMN_FAMILY2);
+    }
+
+    @Test
+    public void testKeyCacheLoadIndexInfoOnHeap() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        testKeyCacheLoad(COLUMN_FAMILY5);
+    }
+
+    private void testKeyCacheLoad(String cf) throws Exception
     {
         CompactionManager.instance.disableAutoCompaction();
 
-        ColumnFamilyStore store = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COLUMN_FAMILY2);
+        ColumnFamilyStore store = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf);
 
         // empty the cache
         CacheService.instance.invalidateKeyCache();
-        assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY2);
+        assertKeyCacheSize(0, KEYSPACE1, cf);
 
         // insert data and force to disk
-        SchemaLoader.insertData(KEYSPACE1, COLUMN_FAMILY2, 0, 100);
+        SchemaLoader.insertData(KEYSPACE1, cf, 0, 100);
         store.forceBlockingFlush();
 
         // populate the cache
-        readData(KEYSPACE1, COLUMN_FAMILY2, 0, 100);
-        assertKeyCacheSize(100, KEYSPACE1, COLUMN_FAMILY2);
+        readData(KEYSPACE1, cf, 0, 100);
+        assertKeyCacheSize(100, KEYSPACE1, cf);
 
         // really? our caches don't implement the map interface? (hence no .addAll)
-        Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<KeyCacheKey, RowIndexEntry>();
+        Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<>();
+        Map<KeyCacheKey, RowIndexEntry.IndexInfoRetriever> savedInfoMap = new HashMap<>();
         for (Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
              iter.hasNext();)
         {
             KeyCacheKey k = iter.next();
-            if (k.desc.ksname.equals(KEYSPACE1) && k.desc.cfname.equals(COLUMN_FAMILY2))
-                savedMap.put(k, CacheService.instance.keyCache.get(k));
+            if (k.desc.ksname.equals(KEYSPACE1) && k.desc.cfname.equals(cf))
+            {
+                RowIndexEntry rie = CacheService.instance.keyCache.get(k);
+                savedMap.put(k, rie);
+                SSTableReader sstr = readerForKey(k);
+                savedInfoMap.put(k, rie.openWithIndex(sstr.getIndexFile()));
+            }
         }
 
         // force the cache to disk
         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
 
         CacheService.instance.invalidateKeyCache();
-        assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY2);
+        assertKeyCacheSize(0, KEYSPACE1, cf);
 
         CacheService.instance.keyCache.loadSaved();
-        assertKeyCacheSize(savedMap.size(), KEYSPACE1, COLUMN_FAMILY2);
+        assertKeyCacheSize(savedMap.size(), KEYSPACE1, cf);
 
         // probably it's better to add equals/hashCode to RowIndexEntry...
         for (Map.Entry<KeyCacheKey, RowIndexEntry> entry : savedMap.entrySet())
@@ -117,77 +139,132 @@ public class KeyCacheTest
             RowIndexEntry expected = entry.getValue();
             RowIndexEntry actual = CacheService.instance.keyCache.get(entry.getKey());
             assertEquals(expected.position, actual.position);
-            assertEquals(expected.columnsIndex(), actual.columnsIndex());
+            assertEquals(expected.columnsIndexCount(), actual.columnsIndexCount());
+            for (int i = 0; i < expected.columnsIndexCount(); i++)
+            {
+                SSTableReader actualSstr = readerForKey(entry.getKey());
+                try (RowIndexEntry.IndexInfoRetriever actualIir = actual.openWithIndex(actualSstr.getIndexFile()))
+                {
+                    RowIndexEntry.IndexInfoRetriever expectedIir = savedInfoMap.get(entry.getKey());
+                    assertEquals(expectedIir.columnsIndex(i), actualIir.columnsIndex(i));
+                }
+            }
             if (expected.isIndexed())
             {
                 assertEquals(expected.deletionTime(), actual.deletionTime());
             }
         }
+
+        savedInfoMap.values().forEach(iir -> {
+            try
+            {
+                if (iir != null)
+                    iir.close();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    private static SSTableReader readerForKey(KeyCacheKey k)
+    {
+        return ColumnFamilyStore.getIfExists(k.desc.ksname, k.desc.cfname).getLiveSSTables()
+                                .stream()
+                                .filter(sstreader -> sstreader.descriptor.generation == k.desc.generation)
+                                .findFirst().get();
+    }
+
+    @Test
+    public void testKeyCacheLoadWithLostTableShallowIndexEntry() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testKeyCacheLoadWithLostTable(COLUMN_FAMILY3);
     }
 
     @Test
-    public void testKeyCacheLoadWithLostTable() throws Exception
+    public void testKeyCacheLoadWithLostTableIndexInfoOnHeap() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        testKeyCacheLoadWithLostTable(COLUMN_FAMILY6);
+    }
+
+    private void testKeyCacheLoadWithLostTable(String cf) throws Exception
     {
         CompactionManager.instance.disableAutoCompaction();
 
-        ColumnFamilyStore store = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COLUMN_FAMILY3);
+        ColumnFamilyStore store = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf);
 
         // empty the cache
         CacheService.instance.invalidateKeyCache();
-        assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY3);
+        assertKeyCacheSize(0, KEYSPACE1, cf);
 
         // insert data and force to disk
-        SchemaLoader.insertData(KEYSPACE1, COLUMN_FAMILY3, 0, 100);
+        SchemaLoader.insertData(KEYSPACE1, cf, 0, 100);
         store.forceBlockingFlush();
 
         Collection<SSTableReader> firstFlushTables = ImmutableList.copyOf(store.getLiveSSTables());
 
         // populate the cache
-        readData(KEYSPACE1, COLUMN_FAMILY3, 0, 100);
-        assertKeyCacheSize(100, KEYSPACE1, COLUMN_FAMILY3);
+        readData(KEYSPACE1, cf, 0, 100);
+        assertKeyCacheSize(100, KEYSPACE1, cf);
 
         // insert some new data and force to disk
-        SchemaLoader.insertData(KEYSPACE1, COLUMN_FAMILY3, 100, 50);
+        SchemaLoader.insertData(KEYSPACE1, cf, 100, 50);
         store.forceBlockingFlush();
 
         // check that it's fine
-        readData(KEYSPACE1, COLUMN_FAMILY3, 100, 50);
-        assertKeyCacheSize(150, KEYSPACE1, COLUMN_FAMILY3);
+        readData(KEYSPACE1, cf, 100, 50);
+        assertKeyCacheSize(150, KEYSPACE1, cf);
 
         // force the cache to disk
         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
 
         CacheService.instance.invalidateKeyCache();
-        assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY3);
+        assertKeyCacheSize(0, KEYSPACE1, cf);
 
         // check that the content is written correctly
         CacheService.instance.keyCache.loadSaved();
-        assertKeyCacheSize(150, KEYSPACE1, COLUMN_FAMILY3);
+        assertKeyCacheSize(150, KEYSPACE1, cf);
 
         CacheService.instance.invalidateKeyCache();
-        assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY3);
+        assertKeyCacheSize(0, KEYSPACE1, cf);
 
         // now remove the first sstable from the store to simulate losing the file
         store.markObsolete(firstFlushTables, OperationType.UNKNOWN);
 
         // check that reading now correctly skips over lost table and reads the rest (CASSANDRA-10219)
         CacheService.instance.keyCache.loadSaved();
-        assertKeyCacheSize(50, KEYSPACE1, COLUMN_FAMILY3);
+        assertKeyCacheSize(50, KEYSPACE1, cf);
+    }
+
+    @Test
+    public void testKeyCacheShallowIndexEntry() throws ExecutionException, InterruptedException
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testKeyCache(COLUMN_FAMILY1);
     }
 
     @Test
-    public void testKeyCache() throws ExecutionException, InterruptedException
+    public void testKeyCacheIndexInfoOnHeap() throws ExecutionException, InterruptedException
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(8);
+        testKeyCache(COLUMN_FAMILY4);
+    }
+
+    private void testKeyCache(String cf) throws ExecutionException, InterruptedException
     {
         CompactionManager.instance.disableAutoCompaction();
 
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COLUMN_FAMILY1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf);
 
         // just to make sure that everything is clean
         CacheService.instance.invalidateKeyCache();
 
         // KeyCache should start at size 0 if we're caching X% of zero data.
-        assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(0, KEYSPACE1, cf);
 
         Mutation rm;
 
@@ -202,7 +279,7 @@ public class KeyCacheTest
         Util.getAll(Util.cmd(cfs, "key1").build());
         Util.getAll(Util.cmd(cfs, "key2").build());
 
-        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(2, KEYSPACE1, cf);
 
         Set<SSTableReader> readers = cfs.getLiveSSTables();
         Refs<SSTableReader> refs = Refs.tryRef(readers);
@@ -215,20 +292,20 @@ public class KeyCacheTest
         // after compaction cache should have entries for new SSTables,
         // but since we have kept a reference to the old sstables,
         // if we had 2 keys in cache previously it should become 4
-        assertKeyCacheSize(noEarlyOpen ? 2 : 4, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(noEarlyOpen ? 2 : 4, KEYSPACE1, cf);
 
         refs.release();
 
         LifecycleTransaction.waitForDeletions();
 
         // after releasing the reference this should drop to 2
-        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(2, KEYSPACE1, cf);
 
         // re-read same keys to verify that key cache didn't grow further
         Util.getAll(Util.cmd(cfs, "key1").build());
         Util.getAll(Util.cmd(cfs, "key2").build());
 
-        assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, cf);
     }
 
     private static void readData(String keyspace, String columnFamily, int startRow, int numberOfRows)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/db/KeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index 6536285..b0820ec 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -390,7 +390,7 @@ public class KeyspaceTest extends CQLTester
         // verify that we do indeed have multiple index entries
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
         RowIndexEntry indexEntry = sstable.getPosition(Util.dk("0"), SSTableReader.Operator.EQ);
-        assert indexEntry.columnsIndex().size() > 2;
+        assert indexEntry.columnsIndexCount() > 2;
 
         validateSliceLarge(cfs);
     }

Reply via email to