http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index 296d142..5ee46da 100644 --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -18,22 +18,23 @@ package org.apache.cassandra.db.columniterator; import java.io.IOException; +import java.util.Comparator; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.IndexInfo; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.DataPosition; +import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.utils.ByteBufferUtil; -abstract class AbstractSSTableIterator implements UnfilteredRowIterator +public abstract class AbstractSSTableIterator implements UnfilteredRowIterator { protected final SSTableReader sstable; protected final DecoratedKey key; @@ -46,6 +47,8 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator private final boolean isForThrift; + protected final SegmentedFile ifile; + private boolean isClosed; protected final Slices slices; @@ -59,9 +62,11 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator RowIndexEntry indexEntry, Slices slices, ColumnFilter 
columnFilter, - boolean isForThrift) + boolean isForThrift, + SegmentedFile ifile) { this.sstable = sstable; + this.ifile = ifile; this.key = key; this.columns = columnFilter; this.slices = slices; @@ -434,13 +439,13 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator } // Used by indexed readers to store where they are of the index. - protected static class IndexState + public static class IndexState implements AutoCloseable { private final Reader reader; private final ClusteringComparator comparator; private final RowIndexEntry indexEntry; - private final List<IndexHelper.IndexInfo> indexes; + private final RowIndexEntry.IndexInfoRetriever indexInfoRetriever; private final boolean reversed; private int currentIndexIdx; @@ -448,43 +453,43 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator // Marks the beginning of the block corresponding to currentIndexIdx. private DataPosition mark; - public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed) + public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed, SegmentedFile indexFile) { this.reader = reader; this.comparator = comparator; this.indexEntry = indexEntry; - this.indexes = indexEntry.columnsIndex(); + this.indexInfoRetriever = indexEntry.openWithIndex(indexFile); this.reversed = reversed; - this.currentIndexIdx = reversed ? indexEntry.columnsIndex().size() : -1; + this.currentIndexIdx = reversed ? indexEntry.columnsIndexCount() : -1; } public boolean isDone() { - return reversed ? currentIndexIdx < 0 : currentIndexIdx >= indexes.size(); + return reversed ? currentIndexIdx < 0 : currentIndexIdx >= indexEntry.columnsIndexCount(); } // Sets the reader to the beginning of blockIdx. 
public void setToBlock(int blockIdx) throws IOException { - if (blockIdx >= 0 && blockIdx < indexes.size()) + if (blockIdx >= 0 && blockIdx < indexEntry.columnsIndexCount()) { reader.seekToPosition(columnOffset(blockIdx)); reader.deserializer.clearState(); } currentIndexIdx = blockIdx; - reader.openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null; + reader.openMarker = blockIdx > 0 ? index(blockIdx - 1).endOpenMarker : null; mark = reader.file.mark(); } - private long columnOffset(int i) + private long columnOffset(int i) throws IOException { - return indexEntry.position + indexes.get(i).offset; + return indexEntry.position + index(i).offset; } public int blocksCount() { - return indexes.size(); + return indexEntry.columnsIndexCount(); } // Update the block idx based on the current reader position if we're past the current block. @@ -503,7 +508,7 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator return; } - while (currentIndexIdx + 1 < indexes.size() && isPastCurrentBlock()) + while (currentIndexIdx + 1 < indexEntry.columnsIndexCount() && isPastCurrentBlock()) { reader.openMarker = currentIndex().endOpenMarker; ++currentIndexIdx; @@ -526,7 +531,7 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator } // Check if we've crossed an index boundary (based on the mark on the beginning of the index block). 
- public boolean isPastCurrentBlock() + public boolean isPastCurrentBlock() throws IOException { assert reader.deserializer != null; long correction = reader.deserializer.bytesReadForUnconsumedData(); @@ -538,32 +543,92 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator return currentIndexIdx; } - public IndexHelper.IndexInfo currentIndex() + public IndexInfo currentIndex() throws IOException { return index(currentIndexIdx); } - public IndexHelper.IndexInfo index(int i) + public IndexInfo index(int i) throws IOException { - return indexes.get(i); + return indexInfoRetriever.columnsIndex(i); } // Finds the index of the first block containing the provided bound, starting at the provided index. // Will be -1 if the bound is before any block, and blocksCount() if it is after every block. - public int findBlockIndex(Slice.Bound bound, int fromIdx) + public int findBlockIndex(Slice.Bound bound, int fromIdx) throws IOException { if (bound == Slice.Bound.BOTTOM) return -1; if (bound == Slice.Bound.TOP) return blocksCount(); - return IndexHelper.indexFor(bound, indexes, comparator, reversed, fromIdx); + return indexFor(bound, fromIdx); + } + + public int indexFor(ClusteringPrefix name, int lastIndex) throws IOException + { + IndexInfo target = new IndexInfo(name, name, 0, 0, null); + /* + Take the example from the unit test, and say your index looks like this: + [0..5][10..15][20..25] + and you look for the slice [13..17]. + + When doing forward slice, we are doing a binary search comparing 13 (the start of the query) + to the lastName part of the index slot. You'll end up with the "first" slot, going from left to right, + that may contain the start. + + When doing a reverse slice, we do the same thing, only using as a start column the end of the query, + i.e. 17 in this example, compared to the firstName part of the index slots. 
bsearch will give us the + first slot where firstName > start ([20..25] here), so we subtract an extra one to get the slot just before. + */ + int startIdx = 0; + int endIdx = indexEntry.columnsIndexCount() - 1; + + if (reversed) + { + if (lastIndex < endIdx) + { + endIdx = lastIndex; + } + } + else + { + if (lastIndex > 0) + { + startIdx = lastIndex; + } + } + + int index = binarySearch(target, comparator.indexComparator(reversed), startIdx, endIdx); + return (index < 0 ? -index - (reversed ? 2 : 1) : index); + } + + private int binarySearch(IndexInfo key, Comparator<IndexInfo> c, int low, int high) throws IOException { + while (low <= high) { + int mid = (low + high) >>> 1; + IndexInfo midVal = index(mid); + int cmp = c.compare(midVal, key); + + if (cmp < 0) + low = mid + 1; + else if (cmp > 0) + high = mid - 1; + else + return mid; + } + return -(low + 1); } @Override public String toString() { - return String.format("IndexState(indexSize=%d, currentBlock=%d, reversed=%b)", indexes.size(), currentIndexIdx, reversed); + return String.format("IndexState(indexSize=%d, currentBlock=%d, reversed=%b)", indexEntry.columnsIndexCount(), currentIndexIdx, reversed); + } + + @Override + public void close() throws IOException + { + indexInfoRetriever.close(); } } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java index 354564a..6b8f83f 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java @@ -25,6 +25,7 @@ import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.SegmentedFile; /** * A Cell Iterator over SSTable @@ -37,9 +38,10 @@ public class SSTableIterator extends AbstractSSTableIterator RowIndexEntry indexEntry, Slices slices, ColumnFilter columns, - boolean isForThrift) + boolean isForThrift, + SegmentedFile ifile) { - super(sstable, file, key, indexEntry, slices, columns, isForThrift); + super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile); } protected Reader createReaderInternal(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) @@ -181,11 +183,18 @@ public class SSTableIterator extends AbstractSSTableIterator private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) { super(file, shouldCloseFile); - this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false); + this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false, ifile); this.lastBlockIdx = indexState.blocksCount(); // if we never call setForSlice, that's where we want to stop } @Override + public void close() throws IOException + { + super.close(); + this.indexState.close(); + } + + @Override public void setForSlice(Slice slice) throws 
IOException { super.setForSlice(slice); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index d8b41b4..7594cbd 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.utils.btree.BTree; /** @@ -40,9 +41,10 @@ public class SSTableReversedIterator extends AbstractSSTableIterator RowIndexEntry indexEntry, Slices slices, ColumnFilter columns, - boolean isForThrift) + boolean isForThrift, + SegmentedFile ifile) { - super(sstable, file, key, indexEntry, slices, columns, isForThrift); + super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile); } protected Reader createReaderInternal(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) @@ -132,7 +134,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator return iterator.next(); } - protected boolean stopReadingDisk() + protected boolean stopReadingDisk() throws IOException { return false; } @@ -204,7 +206,14 @@ public class SSTableReversedIterator extends AbstractSSTableIterator private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) { super(file, shouldCloseFile); - this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, true); + this.indexState = new 
IndexState(this, sstable.metadata.comparator, indexEntry, true, ifile); + } + + @Override + public void close() throws IOException + { + super.close(); + this.indexState.close(); } @Override @@ -304,7 +313,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator } @Override - protected boolean stopReadingDisk() + protected boolean stopReadingDisk() throws IOException { return indexState.isPastCurrentBlock(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 187caa3..67d351a 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -151,7 +151,7 @@ public class Scrubber implements Closeable if (indexAvailable()) { // throw away variable so we don't have a side effect in the assert - long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile).position; + long firstRowPositionFromIndex = rowIndexEntrySerializer.deserializePositionAndSkip(indexFile); assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex; } @@ -338,7 +338,7 @@ public class Scrubber implements Closeable nextRowPositionFromIndex = !indexAvailable() ? 
dataFile.length() - : rowIndexEntrySerializer.deserialize(indexFile).position; + : rowIndexEntrySerializer.deserializePositionAndSkip(indexFile); } catch (Throwable th) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 227b209..bb8bcdb 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -128,7 +128,7 @@ public class Verifier implements Closeable { ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); { - long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile).position; + long firstRowPositionFromIndex = rowIndexEntrySerializer.deserializePositionAndSkip(indexFile); if (firstRowPositionFromIndex != 0) markAndThrow(); } @@ -162,7 +162,7 @@ public class Verifier implements Closeable nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); nextRowPositionFromIndex = indexFile.isEOF() ? 
dataFile.length() - : rowIndexEntrySerializer.deserialize(indexFile).position; + : rowIndexEntrySerializer.deserializePositionAndSkip(indexFile); } catch (Throwable th) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java index 4f55677..5c04966 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java @@ -1,5 +1,6 @@ package org.apache.cassandra.db.rows; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; import java.util.List; @@ -8,7 +9,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.IndexInfo; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.thrift.ThriftResultsMerger; @@ -60,6 +61,7 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt // The partition index lower bound is more accurate than the sstable metadata lower bound but it is only // present if the iterator has already been initialized, which we only do when there are tombstones since in // this case we cannot use the sstable metadata clustering values + RangeTombstone.Bound ret = getPartitionIndexLowerBound(); return ret != null ? 
makeBound(ret) : makeBound(getMetadataLowerBound()); } @@ -158,27 +160,34 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt */ private RangeTombstone.Bound getPartitionIndexLowerBound() { + // NOTE: CASSANDRA-11206 removed the lookup against the key-cache because IndexInfo objects that are + // not heap-backed are no longer held in memory (they are only available on disk). + // CASSANDRA-11369 tracks fixing this afterwards. + // Creating the iterator ensures that rowIndexEntry is loaded if available (partitions bigger than // DatabaseDescriptor.column_index_size_in_kb) if (!canUseMetadataLowerBound()) maybeInit(); RowIndexEntry rowIndexEntry = sstable.getCachedPosition(partitionKey(), false); - if (rowIndexEntry == null) - return null; - - List<IndexHelper.IndexInfo> columns = rowIndexEntry.columnsIndex(); - if (columns.size() == 0) + if (rowIndexEntry == null || !rowIndexEntry.indexOnHeap()) return null; - IndexHelper.IndexInfo column = columns.get(filter.isReversed() ? columns.size() - 1 : 0); - ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? column.lastName : column.firstName; - assert lowerBoundPrefix.getRawValues().length <= sstable.metadata.comparator.size() : + try (RowIndexEntry.IndexInfoRetriever onHeapRetriever = rowIndexEntry.openWithIndex(null)) + { + IndexInfo column = onHeapRetriever.columnsIndex(filter.isReversed() ? rowIndexEntry.columnsIndexCount() - 1 : 0); + ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? 
column.lastName : column.firstName; + assert lowerBoundPrefix.getRawValues().length <= sstable.metadata.comparator.size() : String.format("Unexpected number of clustering values %d, expected %d or fewer for %s", lowerBoundPrefix.getRawValues().length, sstable.metadata.comparator.size(), sstable.getFilename()); - return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), lowerBoundPrefix.getRawValues()); + return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), lowerBoundPrefix.getRawValues()); + } + catch (IOException e) + { + throw new RuntimeException("should never occur", e); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index 205bf7e..6eb8b0a 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@ -73,7 +73,7 @@ class SASIIndexBuilder extends SecondaryIndexBuilder try { RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); - dataFile.seek(indexEntry.position + indexEntry.headerOffset()); + dataFile.seek(indexEntry.position); ByteBufferUtil.readWithShortLength(dataFile); // key try (SSTableIdentityIterator partition = new SSTableIdentityIterator(sstable, dataFile, key)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/ISerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/ISerializer.java b/src/java/org/apache/cassandra/io/ISerializer.java index 562d226..637a1c7 100644 --- a/src/java/org/apache/cassandra/io/ISerializer.java +++ b/src/java/org/apache/cassandra/io/ISerializer.java @@ -43,4 +43,9 @@ public interface 
ISerializer<T> public T deserialize(DataInputPlus in) throws IOException; public long serializedSize(T t); + + public default void skip(DataInputPlus in) throws IOException + { + deserialize(in); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/IndexHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java deleted file mode 100644 index 74a0fc5..0000000 --- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.io.sstable; - -import java.io.*; -import java.util.Collections; -import java.util.List; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.*; - -/** - * Provides helper to serialize, deserialize and use column indexes. - */ -public final class IndexHelper -{ - private IndexHelper() - { - } - - /** - * The index of the IndexInfo in which a scan starting with @name should begin. - * - * @param name name to search for - * @param indexList list of the indexInfo objects - * @param comparator the comparator to use - * @param reversed whether or not the search is reversed, i.e. we scan forward or backward from name - * @param lastIndex where to start the search from in indexList - * - * @return int index - */ - public static int indexFor(ClusteringPrefix name, List<IndexInfo> indexList, ClusteringComparator comparator, boolean reversed, int lastIndex) - { - IndexInfo target = new IndexInfo(name, name, 0, 0, null); - /* - Take the example from the unit test, and say your index looks like this: - [0..5][10..15][20..25] - and you look for the slice [13..17]. - - When doing forward slice, we are doing a binary search comparing 13 (the start of the query) - to the lastName part of the index slot. You'll end up with the "first" slot, going from left to right, - that may contain the start. - - When doing a reverse slice, we do the same thing, only using as a start column the end of the query, - i.e. 17 in this example, compared to the firstName part of the index slots. bsearch will give us the - first slot where firstName > start ([20..25] here), so we subtract an extra one to get the slot just before. 
- */ - int startIdx = 0; - List<IndexInfo> toSearch = indexList; - if (reversed) - { - if (lastIndex < indexList.size() - 1) - { - toSearch = indexList.subList(0, lastIndex + 1); - } - } - else - { - if (lastIndex > 0) - { - startIdx = lastIndex; - toSearch = indexList.subList(lastIndex, indexList.size()); - } - } - int index = Collections.binarySearch(toSearch, target, comparator.indexComparator(reversed)); - return startIdx + (index < 0 ? -index - (reversed ? 2 : 1) : index); - } - - public static class IndexInfo - { - private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null)); - - public final long offset; - public final long width; - public final ClusteringPrefix firstName; - public final ClusteringPrefix lastName; - - // If at the end of the index block there is an open range tombstone marker, this marker - // deletion infos. null otherwise. - public final DeletionTime endOpenMarker; - - public IndexInfo(ClusteringPrefix firstName, - ClusteringPrefix lastName, - long offset, - long width, - DeletionTime endOpenMarker) - { - this.firstName = firstName; - this.lastName = lastName; - this.offset = offset; - this.width = width; - this.endOpenMarker = endOpenMarker; - } - - public static class Serializer - { - // This is the default index size that we use to delta-encode width when serializing so we get better vint-encoding. - // This is imperfect as user can change the index size and ideally we would save the index size used with each index file - // to use as base. However, that's a bit more involved a change that we want for now and very seldom do use change the index - // size so using the default is almost surely better than using no base at all. 
- public static final long WIDTH_BASE = 64 * 1024; - - private final ISerializer<ClusteringPrefix> clusteringSerializer; - private final Version version; - - public Serializer(CFMetaData metadata, Version version, SerializationHeader header) - { - this.clusteringSerializer = metadata.serializers().indexEntryClusteringPrefixSerializer(version, header); - this.version = version; - } - - public void serialize(IndexInfo info, DataOutputPlus out) throws IOException - { - assert version.storeRows() : "We read old index files but we should never write them"; - - clusteringSerializer.serialize(info.firstName, out); - clusteringSerializer.serialize(info.lastName, out); - out.writeUnsignedVInt(info.offset); - out.writeVInt(info.width - WIDTH_BASE); - - out.writeBoolean(info.endOpenMarker != null); - if (info.endOpenMarker != null) - DeletionTime.serializer.serialize(info.endOpenMarker, out); - } - - public IndexInfo deserialize(DataInputPlus in) throws IOException - { - ClusteringPrefix firstName = clusteringSerializer.deserialize(in); - ClusteringPrefix lastName = clusteringSerializer.deserialize(in); - long offset; - long width; - DeletionTime endOpenMarker = null; - if (version.storeRows()) - { - offset = in.readUnsignedVInt(); - width = in.readVInt() + WIDTH_BASE; - if (in.readBoolean()) - endOpenMarker = DeletionTime.serializer.deserialize(in); - } - else - { - offset = in.readLong(); - width = in.readLong(); - } - return new IndexInfo(firstName, lastName, offset, width, endOpenMarker); - } - - public long serializedSize(IndexInfo info) - { - assert version.storeRows() : "We read old index files but we should never write them"; - - long size = clusteringSerializer.serializedSize(info.firstName) - + clusteringSerializer.serializedSize(info.lastName) - + TypeSizes.sizeofUnsignedVInt(info.offset) - + TypeSizes.sizeofVInt(info.width - WIDTH_BASE) - + TypeSizes.sizeof(info.endOpenMarker != null); - - if (info.endOpenMarker != null) - size += 
DeletionTime.serializer.serializedSize(info.endOpenMarker); - return size; - } - } - - public long unsharedHeapSize() - { - return EMPTY_SIZE - + firstName.unsharedHeapSize() - + lastName.unsharedHeapSize() - + (endOpenMarker == null ? 0 : endOpenMarker.unsharedHeapSize()); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/IndexInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java new file mode 100644 index 0000000..b07ce4a --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable; + +import java.io.IOException; + +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * {@code IndexInfo} is embedded in the indexed version of {@link RowIndexEntry}. + * Each instance roughly covers a range of {@link org.apache.cassandra.config.Config#column_index_size_in_kb column_index_size_in_kb} kB + * and contains the first and last clustering value (or slice bound), its offset in the data file and width in the data file. + * <p> + * Each {@code IndexInfo} object is serialized as follows. + * </p> + * <p> + * Serialization format changed in 3.0. First, the {@code endOpenMarker} has been introduced. + * Second, the <i>order</i> of the fields in serialized representation changed to allow future + * optimizations to access {@code offset} and {@code width} fields directly without skipping + * {@code firstName}/{@code lastName}. 
+ * </p> + * <p> + * {@code + * (*) IndexInfo.firstName (ClusteringPrefix serializer, either Clustering.serializer.serialize or Slice.Bound.serializer.serialize) + * (*) IndexInfo.lastName (ClusteringPrefix serializer, either Clustering.serializer.serialize or Slice.Bound.serializer.serialize) + * (long) IndexInfo.offset + * (long) IndexInfo.width + * (bool) IndexInfo.endOpenMarker != null (if 3.0) + * (int) IndexInfo.endOpenMarker.localDeletionTime (if 3.0 && IndexInfo.endOpenMarker != null) + * (long) IndexInfo.endOpenMarker.markedForDeletionAt (if 3.0 && IndexInfo.endOpenMarker != null) + * } + * </p> + */ +public class IndexInfo +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null)); + + public final long offset; + public final long width; + public final ClusteringPrefix firstName; + public final ClusteringPrefix lastName; + + // If there is an open range tombstone marker at the end of the index block, this holds that marker's + // deletion info; null otherwise. + public final DeletionTime endOpenMarker; + + public IndexInfo(ClusteringPrefix firstName, + ClusteringPrefix lastName, + long offset, + long width, + DeletionTime endOpenMarker) + { + this.firstName = firstName; + this.lastName = lastName; + this.offset = offset; + this.width = width; + this.endOpenMarker = endOpenMarker; + } + + public static class Serializer implements ISerializer<IndexInfo> + { + // This is the default index size that we use to delta-encode width when serializing so we get better vint-encoding. + // This is imperfect as users can change the index size and ideally we would save the index size used with each index file + // to use as base. However, that's a more involved change than we want for now, and users very seldom change the index + // size, so using the default is almost surely better than using no base at all. 
+ public static final long WIDTH_BASE = 64 * 1024; + + private final ISerializer<ClusteringPrefix> clusteringSerializer; + private final Version version; + + public Serializer(Version version, ISerializer<ClusteringPrefix> clusteringSerializer) + { + this.clusteringSerializer = clusteringSerializer; + this.version = version; + } + + public void serialize(IndexInfo info, DataOutputPlus out) throws IOException + { + assert version.storeRows() : "We read old index files but we should never write them"; + + clusteringSerializer.serialize(info.firstName, out); + clusteringSerializer.serialize(info.lastName, out); + out.writeUnsignedVInt(info.offset); + out.writeVInt(info.width - WIDTH_BASE); + + out.writeBoolean(info.endOpenMarker != null); + if (info.endOpenMarker != null) + DeletionTime.serializer.serialize(info.endOpenMarker, out); + } + + public void skip(DataInputPlus in) throws IOException + { + clusteringSerializer.skip(in); + clusteringSerializer.skip(in); + if (version.storeRows()) + { + in.readUnsignedVInt(); + in.readVInt(); + if (in.readBoolean()) + DeletionTime.serializer.skip(in); + } + else + { + in.skipBytes(TypeSizes.sizeof(0L)); + in.skipBytes(TypeSizes.sizeof(0L)); + } + } + + public IndexInfo deserialize(DataInputPlus in) throws IOException + { + ClusteringPrefix firstName = clusteringSerializer.deserialize(in); + ClusteringPrefix lastName = clusteringSerializer.deserialize(in); + long offset; + long width; + DeletionTime endOpenMarker = null; + if (version.storeRows()) + { + offset = in.readUnsignedVInt(); + width = in.readVInt() + WIDTH_BASE; + if (in.readBoolean()) + endOpenMarker = DeletionTime.serializer.deserialize(in); + } + else + { + offset = in.readLong(); + width = in.readLong(); + } + return new IndexInfo(firstName, lastName, offset, width, endOpenMarker); + } + + public long serializedSize(IndexInfo info) + { + assert version.storeRows() : "We read old index files but we should never write them"; + + long size = 
clusteringSerializer.serializedSize(info.firstName) + + clusteringSerializer.serializedSize(info.lastName) + + TypeSizes.sizeofUnsignedVInt(info.offset) + + TypeSizes.sizeofVInt(info.width - WIDTH_BASE) + + TypeSizes.sizeof(info.endOpenMarker != null); + + if (info.endOpenMarker != null) + size += DeletionTime.serializer.serializedSize(info.endOpenMarker); + return size; + } + } + + public long unsharedHeapSize() + { + return EMPTY_SIZE + + firstName.unsharedHeapSize() + + lastName.unsharedHeapSize() + + (endOpenMarker == null ? 0 : endOpenMarker.unsharedHeapSize()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java index 1286f16..e68ca2a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java @@ -18,16 +18,10 @@ package org.apache.cassandra.io.sstable.format; import com.google.common.base.CharMatcher; -import com.google.common.collect.ImmutableList; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.LegacyLayout; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.compaction.CompactionController; import org.apache.cassandra.io.sstable.format.big.BigFormat; -import org.apache.cassandra.io.util.FileDataInput; - -import java.util.Iterator; /** * Provides the accessors to data on disk. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 99adb8d..3181a55 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -823,12 +823,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, samplingLevel)) { long indexPosition; - RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata, descriptor.version, header); while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) { ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); - /*RowIndexEntry indexEntry = */rowIndexSerializer.deserialize(primaryIndex); + RowIndexEntry.Serializer.skip(primaryIndex, descriptor.version); DecoratedKey decoratedKey = decorateKey(key); if (first == null) first = decoratedKey; @@ -1822,7 +1821,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // this saves index summary lookup and index file iteration which whould be pretty costly // especially in presence of promoted column indexes if (isKeyCacheSetup()) - cacheKey(key, rowIndexEntrySerializer.deserialize(in)); + cacheKey(key, rowIndexEntrySerializer.deserialize(in, in.getFilePointer())); } return key; @@ -2021,6 +2020,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return ifile.channel; } + public SegmentedFile getIndexFile() + { + return ifile; + } + /** * @param component component to get timestamp. 
* @return last modified time for given component. 0 if given component does not exist or IO error occurs. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index 0112e55..7a7ce8c 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -70,8 +70,8 @@ public class BigTableReader extends SSTableReader if (indexEntry == null) return UnfilteredRowIterators.noRowsIterator(metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed); return reversed - ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift) - : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift); + ? 
new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile) + : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile); } /** @@ -230,7 +230,7 @@ public class BigTableReader extends SSTableReader if (opSatisfied) { // read data position from index entry - RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in); + RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, in.getFilePointer()); if (exactMatch && updateCacheAndStats) { assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key @@ -252,7 +252,7 @@ public class BigTableReader extends SSTableReader } if (op == Operator.EQ && updateCacheAndStats) bloomFilterTracker.addTruePositive(); - Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation); + Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndexCount(), descriptor.generation); return indexEntry; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index a14056b..16a0aed 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -288,7 +288,7 @@ public class BigTableScanner implements ISSTableScanner return endOfData(); currentKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); - currentEntry = rowIndexEntrySerializer.deserialize(ifile); + currentEntry = rowIndexEntrySerializer.deserialize(ifile, ifile.getFilePointer()); } while (!currentRange.contains(currentKey)); } else 
@@ -307,7 +307,7 @@ public class BigTableScanner implements ISSTableScanner { // we need the position of the start of the next key, regardless of whether it falls in the current range nextKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); - nextEntry = rowIndexEntrySerializer.deserialize(ifile); + nextEntry = rowIndexEntrySerializer.deserialize(ifile, ifile.getFilePointer()); if (!currentRange.contains(nextKey)) { @@ -329,7 +329,7 @@ public class BigTableScanner implements ISSTableScanner { if (dataRange == null) { - dfile.seek(currentEntry.position + currentEntry.headerOffset()); + dfile.seek(currentEntry.position); ByteBufferUtil.readWithShortLength(dfile); // key return new SSTableIdentityIterator(sstable, dfile, partitionKey()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 42f923a..bbb22d4 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -18,6 +18,7 @@ package org.apache.cassandra.io.sstable.format.big; import java.io.*; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; @@ -110,7 +111,7 @@ public class BigTableWriter extends SSTableWriter return (lastWrittenKey == null) ? 
0 : dataFile.position(); } - private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException + private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index, ByteBuffer indexInfo) throws IOException { metadataCollector.addKey(decoratedKey.getKey()); lastWrittenKey = decoratedKey; @@ -120,7 +121,7 @@ public class BigTableWriter extends SSTableWriter if (logger.isTraceEnabled()) logger.trace("wrote {} at {}", decoratedKey, dataEnd); - iwriter.append(decoratedKey, index, dataEnd); + iwriter.append(decoratedKey, index, dataEnd, indexInfo); } /** @@ -150,15 +151,27 @@ public class BigTableWriter extends SSTableWriter try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector))) { - ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, observers, descriptor.version); + ColumnIndex columnIndex = new ColumnIndex(header, dataFile, descriptor.version, observers, + getRowIndexEntrySerializer().indexInfoSerializer()); - RowIndexEntry entry = RowIndexEntry.create(startPosition, collecting.partitionLevelDeletion(), index); + columnIndex.buildRowIndex(collecting); + + // afterAppend() writes the partition key before the first RowIndexEntry - so we have to add its + // serialized size to the index-writer position + long indexFilePosition = ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) + iwriter.indexFile.position(); + + RowIndexEntry entry = RowIndexEntry.create(startPosition, indexFilePosition, + collecting.partitionLevelDeletion(), columnIndex.headerLength, columnIndex.columnIndexCount, + columnIndex.indexInfoSerializedSize(), + columnIndex.indexSamples, + columnIndex.offsets(), + getRowIndexEntrySerializer().indexInfoSerializer()); long endPosition = dataFile.position(); long rowSize = endPosition - startPosition; maybeLogLargePartitionWarning(key, rowSize); metadataCollector.addPartitionSizeInBytes(rowSize); -
afterAppend(key, endPosition, entry); + afterAppend(key, endPosition, entry, columnIndex.buffer()); return entry; } catch (IOException e) @@ -167,6 +180,11 @@ public class BigTableWriter extends SSTableWriter } } + private RowIndexEntry.IndexSerializer<IndexInfo> getRowIndexEntrySerializer() + { + return (RowIndexEntry.IndexSerializer<IndexInfo>) rowIndexEntrySerializer; + } + private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize) { if (rowSize > DatabaseDescriptor.getCompactionLargePartitionWarningThreshold()) @@ -403,14 +421,14 @@ public class BigTableWriter extends SSTableWriter return summary.getLastReadableBoundary(); } - public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException + public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd, ByteBuffer indexInfo) throws IOException { bf.add(key); long indexStart = indexFile.position(); try { ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile); - rowIndexEntrySerializer.serialize(indexEntry, indexFile); + rowIndexEntrySerializer.serialize(indexEntry, indexFile, indexInfo); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 9bda3a0..552642c 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -465,7 +465,9 @@ public class CacheService implements CacheServiceMBean ByteBufferUtil.writeWithLength(key.key, out); out.writeInt(key.desc.generation); out.writeBoolean(true); - key.desc.getFormat().getIndexSerializer(cfs.metadata, key.desc.version, SerializationHeader.forKeyCache(cfs.metadata)).serialize(entry, out); + + SerializationHeader header = new 
SerializationHeader(false, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS); + key.desc.getFormat().getIndexSerializer(cfs.metadata, key.desc.version, header).serializeForCache(entry, out); } public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException @@ -481,20 +483,20 @@ public class CacheService implements CacheServiceMBean ByteBuffer key = ByteBufferUtil.read(input, keyLength); int generation = input.readInt(); input.readBoolean(); // backwards compatibility for "promoted indexes" boolean - SSTableReader reader = null; + SSTableReader reader; if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null) { // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume its the current version. The only case where we'll be // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed). 
- RowIndexEntry.Serializer.skip(input, BigFormat.instance.getLatestVersion()); + RowIndexEntry.Serializer.skipForCache(input, BigFormat.instance.getLatestVersion()); return null; } RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata, reader.descriptor.version, - SerializationHeader.forKeyCache(cfs.metadata)); - RowIndexEntry entry = indexSerializer.deserialize(input); + reader.header); + RowIndexEntry<?> entry = indexSerializer.deserializeForCache(input); return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java index 0c7e8a5..c952470 100644 --- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java +++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.cache; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -51,9 +52,23 @@ public class AutoSavingCacheTest } @Test + public void testSerializeAndLoadKeyCache0kB() throws Exception + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + doTestSerializeAndLoadKeyCache(); + } + + @Test public void testSerializeAndLoadKeyCache() throws Exception { + DatabaseDescriptor.setColumnIndexCacheSize(8); + doTestSerializeAndLoadKeyCache(); + } + + private static void doTestSerializeAndLoadKeyCache() throws Exception + { ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); for (int i = 0; i < 2; i++) { ColumnDefinition colDef = ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("col1"), AsciiType.instance); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java index 2529de1..21a17fa 100644 --- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java +++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java @@ -28,6 +28,7 @@ import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.index.Index; @@ -81,7 +82,20 @@ public class KeyCacheCqlTest extends CQLTester "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; @Test - public void testSliceQueries() throws Throwable + public void testSliceQueriesShallowIndexEntry() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + testSliceQueries(); + } + + @Test + public void testSliceQueriesIndexInfoOnHeap() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + testSliceQueries(); + } + + private void testSliceQueries() throws Throwable { createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, val text, vpk text, vck1 int, vck2 int, PRIMARY KEY (pk, ck1, ck2))"); @@ -165,7 +179,20 @@ public class KeyCacheCqlTest extends CQLTester } @Test - public void test2iKeyCachePaths() throws Throwable + public void test2iKeyCachePathsShallowIndexEntry() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + test2iKeyCachePaths(); + } + + 
@Test + public void test2iKeyCachePathsIndexInfoOnHeap() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + test2iKeyCachePaths(); + } + + private void test2iKeyCachePaths() throws Throwable { String table = createTable("CREATE TABLE %s (" + commonColumnsDef @@ -242,7 +269,20 @@ public class KeyCacheCqlTest extends CQLTester } @Test - public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable + public void test2iKeyCachePathsSaveKeysForDroppedTableShallowIndexEntry() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + test2iKeyCachePathsSaveKeysForDroppedTable(); + } + + @Test + public void test2iKeyCachePathsSaveKeysForDroppedTableIndexInfoOnHeap() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + test2iKeyCachePathsSaveKeysForDroppedTable(); + } + + private void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable { String table = createTable("CREATE TABLE %s (" + commonColumnsDef @@ -302,7 +342,20 @@ public class KeyCacheCqlTest extends CQLTester } @Test - public void testKeyCacheNonClustered() throws Throwable + public void testKeyCacheNonClusteredShallowIndexEntry() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + testKeyCacheNonClustered(); + } + + @Test + public void testKeyCacheNonClusteredIndexInfoOnHeap() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + testKeyCacheNonClustered(); + } + + private void testKeyCacheNonClustered() throws Throwable { String table = createTable("CREATE TABLE %s (" + commonColumnsDef @@ -335,7 +388,20 @@ public class KeyCacheCqlTest extends CQLTester } @Test - public void testKeyCacheClustered() throws Throwable + public void testKeyCacheClusteredShallowIndexEntry() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + testKeyCacheClustered(); + } + + @Test + public void testKeyCacheClusteredIndexInfoOnHeap() throws Throwable + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + 
testKeyCacheClustered(); + } + + private void testKeyCacheClustered() throws Throwable { String table = createTable("CREATE TABLE %s (" + commonColumnsDef http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java b/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java new file mode 100644 index 0000000..8f5f282 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/PagingQueryTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.cql3; + +import java.util.Iterator; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.Test; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.ResultSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class PagingQueryTest extends CQLTester +{ + @Test + public void pagingOnRegularColumn() throws Throwable + { + createTable("CREATE TABLE %s (" + + " k1 int," + + " c1 int," + + " c2 int," + + " v1 text," + + " v2 text," + + " v3 text," + + " v4 text," + + "PRIMARY KEY (k1, c1, c2))"); + + for (int c1 = 0; c1 < 100; c1++) + { + for (int c2 = 0; c2 < 100; c2++) + { + execute("INSERT INTO %s (k1, c1, c2, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?, ?)", 1, c1, c2, + Integer.toString(c1), Integer.toString(c2), someText(), someText()); + } + + if (c1 % 30 == 0) + flush(); + } + + flush(); + + try (Session session = sessionNet()) + { + SimpleStatement stmt = new SimpleStatement("SELECT c1, c2, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE k1 = 1"); + stmt.setFetchSize(3); + ResultSet rs = session.execute(stmt); + Iterator<Row> iter = rs.iterator(); + for (int c1 = 0; c1 < 100; c1++) + { + for (int c2 = 0; c2 < 100; c2++) + { + assertTrue(iter.hasNext()); + Row row = iter.next(); + String msg = "On " + c1 + ',' + c2; + assertEquals(msg, c1, row.getInt(0)); + assertEquals(msg, c2, row.getInt(1)); + assertEquals(msg, Integer.toString(c1), row.getString(2)); + assertEquals(msg, Integer.toString(c2), row.getString(3)); + } + } + assertFalse(iter.hasNext()); + + for (int c1 = 0; c1 < 100; c1++) + { + stmt = new SimpleStatement("SELECT c1, c2, v1, v2 FROM " + KEYSPACE + '.' 
+ currentTable() + " WHERE k1 = 1 AND c1 = ?", c1); + stmt.setFetchSize(3); + rs = session.execute(stmt); + iter = rs.iterator(); + for (int c2 = 0; c2 < 100; c2++) + { + assertTrue(iter.hasNext()); + Row row = iter.next(); + String msg = "Within " + c1 + " on " + c2; + assertEquals(msg, c1, row.getInt(0)); + assertEquals(msg, c2, row.getInt(1)); + assertEquals(msg, Integer.toString(c1), row.getString(2)); + assertEquals(msg, Integer.toString(c2), row.getString(3)); + } + assertFalse(iter.hasNext()); + } + } + } + + private static String someText() + { + char[] arr = new char[1024]; + for (int i = 0; i < arr.length; i++) + arr[i] = (char)(32 + ThreadLocalRandom.current().nextInt(95)); + return new String(arr); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java b/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java index 3042acd..41a7d91 100644 --- a/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java +++ b/test/unit/org/apache/cassandra/cql3/TombstonesWithIndexedSSTableTest.java @@ -24,8 +24,8 @@ import org.junit.Test; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.Int32Type; -import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.utils.ByteBufferUtil; public class TombstonesWithIndexedSSTableTest extends CQLTester @@ -76,13 +76,17 @@ public class TombstonesWithIndexedSSTableTest extends CQLTester { // The line below failed with key caching off (CASSANDRA-11158) @SuppressWarnings("unchecked") - RowIndexEntry<IndexHelper.IndexInfo> indexEntry = sstable.getPosition(dk, 
SSTableReader.Operator.EQ); + RowIndexEntry indexEntry = sstable.getPosition(dk, SSTableReader.Operator.EQ); if (indexEntry != null && indexEntry.isIndexed()) { - ClusteringPrefix firstName = indexEntry.columnsIndex().get(1).firstName; - if (firstName.kind().isBoundary()) - break deletionLoop; - indexedRow = Int32Type.instance.compose(firstName.get(0)); + try (FileDataInput reader = sstable.openIndexReader()) + { + RowIndexEntry.IndexInfoRetriever infoRetriever = indexEntry.openWithIndex(sstable.getIndexFile()); + ClusteringPrefix firstName = infoRetriever.columnsIndex(1).firstName; + if (firstName.kind().isBoundary()) + break deletionLoop; + indexedRow = Int32Type.instance.compose(firstName.get(0)); + } } } assert indexedRow >= 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index 515d30e..ada6b5b 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -17,16 +17,15 @@ */ package org.apache.cassandra.db; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Uninterruptibles; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -34,9 +33,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.cache.KeyCacheKey; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; import 
org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -54,6 +51,9 @@ public class KeyCacheTest private static final String COLUMN_FAMILY1 = "Standard1"; private static final String COLUMN_FAMILY2 = "Standard2"; private static final String COLUMN_FAMILY3 = "Standard3"; + private static final String COLUMN_FAMILY4 = "Standard4"; + private static final String COLUMN_FAMILY5 = "Standard5"; + private static final String COLUMN_FAMILY6 = "Standard6"; @BeforeClass @@ -64,7 +64,10 @@ public class KeyCacheTest KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY1), SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY2), - SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY3)); + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY3), + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY4), + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY5), + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY6)); } @AfterClass @@ -74,42 +77,61 @@ public class KeyCacheTest } @Test - public void testKeyCacheLoad() throws Exception + public void testKeyCacheLoadShallowIndexEntry() throws Exception + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + testKeyCacheLoad(COLUMN_FAMILY2); + } + + @Test + public void testKeyCacheLoadIndexInfoOnHeap() throws Exception + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + testKeyCacheLoad(COLUMN_FAMILY5); + } + + private void testKeyCacheLoad(String cf) throws Exception { CompactionManager.instance.disableAutoCompaction(); - ColumnFamilyStore store = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COLUMN_FAMILY2); + ColumnFamilyStore store = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf); // empty the cache CacheService.instance.invalidateKeyCache(); - assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY2); + assertKeyCacheSize(0, KEYSPACE1, cf); // insert data and force to disk - 
SchemaLoader.insertData(KEYSPACE1, COLUMN_FAMILY2, 0, 100); + SchemaLoader.insertData(KEYSPACE1, cf, 0, 100); store.forceBlockingFlush(); // populate the cache - readData(KEYSPACE1, COLUMN_FAMILY2, 0, 100); - assertKeyCacheSize(100, KEYSPACE1, COLUMN_FAMILY2); + readData(KEYSPACE1, cf, 0, 100); + assertKeyCacheSize(100, KEYSPACE1, cf); // really? our caches don't implement the map interface? (hence no .addAll) - Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<KeyCacheKey, RowIndexEntry>(); + Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<>(); + Map<KeyCacheKey, RowIndexEntry.IndexInfoRetriever> savedInfoMap = new HashMap<>(); for (Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator(); iter.hasNext();) { KeyCacheKey k = iter.next(); - if (k.desc.ksname.equals(KEYSPACE1) && k.desc.cfname.equals(COLUMN_FAMILY2)) - savedMap.put(k, CacheService.instance.keyCache.get(k)); + if (k.desc.ksname.equals(KEYSPACE1) && k.desc.cfname.equals(cf)) + { + RowIndexEntry rie = CacheService.instance.keyCache.get(k); + savedMap.put(k, rie); + SSTableReader sstr = readerForKey(k); + savedInfoMap.put(k, rie.openWithIndex(sstr.getIndexFile())); + } } // force the cache to disk CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get(); CacheService.instance.invalidateKeyCache(); - assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY2); + assertKeyCacheSize(0, KEYSPACE1, cf); CacheService.instance.keyCache.loadSaved(); - assertKeyCacheSize(savedMap.size(), KEYSPACE1, COLUMN_FAMILY2); + assertKeyCacheSize(savedMap.size(), KEYSPACE1, cf); // probably it's better to add equals/hashCode to RowIndexEntry... 
for (Map.Entry<KeyCacheKey, RowIndexEntry> entry : savedMap.entrySet()) @@ -117,77 +139,132 @@ public class KeyCacheTest RowIndexEntry expected = entry.getValue(); RowIndexEntry actual = CacheService.instance.keyCache.get(entry.getKey()); assertEquals(expected.position, actual.position); - assertEquals(expected.columnsIndex(), actual.columnsIndex()); + assertEquals(expected.columnsIndexCount(), actual.columnsIndexCount()); + for (int i = 0; i < expected.columnsIndexCount(); i++) + { + SSTableReader actualSstr = readerForKey(entry.getKey()); + try (RowIndexEntry.IndexInfoRetriever actualIir = actual.openWithIndex(actualSstr.getIndexFile())) + { + RowIndexEntry.IndexInfoRetriever expectedIir = savedInfoMap.get(entry.getKey()); + assertEquals(expectedIir.columnsIndex(i), actualIir.columnsIndex(i)); + } + } if (expected.isIndexed()) { assertEquals(expected.deletionTime(), actual.deletionTime()); } } + + savedInfoMap.values().forEach(iir -> { + try + { + if (iir != null) + iir.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + + private static SSTableReader readerForKey(KeyCacheKey k) + { + return ColumnFamilyStore.getIfExists(k.desc.ksname, k.desc.cfname).getLiveSSTables() + .stream() + .filter(sstreader -> sstreader.descriptor.generation == k.desc.generation) + .findFirst().get(); + } + + @Test + public void testKeyCacheLoadWithLostTableShallowIndexEntry() throws Exception + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + testKeyCacheLoadWithLostTable(COLUMN_FAMILY3); } @Test - public void testKeyCacheLoadWithLostTable() throws Exception + public void testKeyCacheLoadWithLostTableIndexInfoOnHeap() throws Exception + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + testKeyCacheLoadWithLostTable(COLUMN_FAMILY6); + } + + private void testKeyCacheLoadWithLostTable(String cf) throws Exception { CompactionManager.instance.disableAutoCompaction(); - ColumnFamilyStore store = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(COLUMN_FAMILY3); + ColumnFamilyStore store = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf); // empty the cache CacheService.instance.invalidateKeyCache(); - assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY3); + assertKeyCacheSize(0, KEYSPACE1, cf); // insert data and force to disk - SchemaLoader.insertData(KEYSPACE1, COLUMN_FAMILY3, 0, 100); + SchemaLoader.insertData(KEYSPACE1, cf, 0, 100); store.forceBlockingFlush(); Collection<SSTableReader> firstFlushTables = ImmutableList.copyOf(store.getLiveSSTables()); // populate the cache - readData(KEYSPACE1, COLUMN_FAMILY3, 0, 100); - assertKeyCacheSize(100, KEYSPACE1, COLUMN_FAMILY3); + readData(KEYSPACE1, cf, 0, 100); + assertKeyCacheSize(100, KEYSPACE1, cf); // insert some new data and force to disk - SchemaLoader.insertData(KEYSPACE1, COLUMN_FAMILY3, 100, 50); + SchemaLoader.insertData(KEYSPACE1, cf, 100, 50); store.forceBlockingFlush(); // check that it's fine - readData(KEYSPACE1, COLUMN_FAMILY3, 100, 50); - assertKeyCacheSize(150, KEYSPACE1, COLUMN_FAMILY3); + readData(KEYSPACE1, cf, 100, 50); + assertKeyCacheSize(150, KEYSPACE1, cf); // force the cache to disk CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get(); CacheService.instance.invalidateKeyCache(); - assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY3); + assertKeyCacheSize(0, KEYSPACE1, cf); // check that the content is written correctly CacheService.instance.keyCache.loadSaved(); - assertKeyCacheSize(150, KEYSPACE1, COLUMN_FAMILY3); + assertKeyCacheSize(150, KEYSPACE1, cf); CacheService.instance.invalidateKeyCache(); - assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY3); + assertKeyCacheSize(0, KEYSPACE1, cf); // now remove the first sstable from the store to simulate losing the file store.markObsolete(firstFlushTables, OperationType.UNKNOWN); // check that reading now correctly skips over lost table and reads the rest (CASSANDRA-10219) CacheService.instance.keyCache.loadSaved(); - 
assertKeyCacheSize(50, KEYSPACE1, COLUMN_FAMILY3); + assertKeyCacheSize(50, KEYSPACE1, cf); + } + + @Test + public void testKeyCacheShallowIndexEntry() throws ExecutionException, InterruptedException + { + DatabaseDescriptor.setColumnIndexCacheSize(0); + testKeyCache(COLUMN_FAMILY1); } @Test - public void testKeyCache() throws ExecutionException, InterruptedException + public void testKeyCacheIndexInfoOnHeap() throws ExecutionException, InterruptedException + { + DatabaseDescriptor.setColumnIndexCacheSize(8); + testKeyCache(COLUMN_FAMILY4); + } + + private void testKeyCache(String cf) throws ExecutionException, InterruptedException { CompactionManager.instance.disableAutoCompaction(); Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COLUMN_FAMILY1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf); // just to make sure that everything is clean CacheService.instance.invalidateKeyCache(); // KeyCache should start at size 0 if we're caching X% of zero data. - assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY1); + assertKeyCacheSize(0, KEYSPACE1, cf); Mutation rm; @@ -202,7 +279,7 @@ public class KeyCacheTest Util.getAll(Util.cmd(cfs, "key1").build()); Util.getAll(Util.cmd(cfs, "key2").build()); - assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1); + assertKeyCacheSize(2, KEYSPACE1, cf); Set<SSTableReader> readers = cfs.getLiveSSTables(); Refs<SSTableReader> refs = Refs.tryRef(readers); @@ -215,20 +292,20 @@ public class KeyCacheTest // after compaction cache should have entries for new SSTables, // but since we have kept a reference to the old sstables, // if we had 2 keys in cache previously it should become 4 - assertKeyCacheSize(noEarlyOpen ? 2 : 4, KEYSPACE1, COLUMN_FAMILY1); + assertKeyCacheSize(noEarlyOpen ? 
2 : 4, KEYSPACE1, cf); refs.release(); LifecycleTransaction.waitForDeletions(); // after releasing the reference this should drop to 2 - assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1); + assertKeyCacheSize(2, KEYSPACE1, cf); // re-read same keys to verify that key cache didn't grow further Util.getAll(Util.cmd(cfs, "key1").build()); Util.getAll(Util.cmd(cfs, "key2").build()); - assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, COLUMN_FAMILY1); + assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, cf); } private static void readData(String keyspace, String columnFamily, int startRow, int numberOfRows) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/db/KeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java index 6536285..b0820ec 100644 --- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java @@ -390,7 +390,7 @@ public class KeyspaceTest extends CQLTester // verify that we do indeed have multiple index entries SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); RowIndexEntry indexEntry = sstable.getPosition(Util.dk("0"), SSTableReader.Operator.EQ); - assert indexEntry.columnsIndex().size() > 2; + assert indexEntry.columnsIndexCount() > 2; validateSliceLarge(cfs); }