Support large partitions on the 3.0 sstable format

patch by Robert Stupp; reviewed by T Jake Luciani for CASSANDRA-11206
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef5bbedd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef5bbedd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef5bbedd Branch: refs/heads/trunk Commit: ef5bbedd687d75923e9a20fde9d2f78b4535241d Parents: ae063e8 Author: Robert Stupp <sn...@snazy.de> Authored: Thu Apr 21 16:48:26 2016 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Thu Apr 21 16:48:26 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + conf/cassandra.yaml | 8 + .../apache/cassandra/cache/AutoSavingCache.java | 4 +- .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 11 + .../org/apache/cassandra/db/Clustering.java | 6 + .../cassandra/db/ClusteringComparator.java | 2 +- .../apache/cassandra/db/ClusteringPrefix.java | 29 + .../org/apache/cassandra/db/ColumnIndex.java | 296 +++--- .../org/apache/cassandra/db/RangeTombstone.java | 10 + .../org/apache/cassandra/db/RowIndexEntry.java | 1006 +++++++++++++++--- .../cassandra/db/SerializationHeader.java | 20 - .../org/apache/cassandra/db/Serializers.java | 95 +- .../columniterator/AbstractSSTableIterator.java | 111 +- .../db/columniterator/SSTableIterator.java | 15 +- .../columniterator/SSTableReversedIterator.java | 19 +- .../cassandra/db/compaction/Scrubber.java | 4 +- .../cassandra/db/compaction/Verifier.java | 4 +- .../UnfilteredRowIteratorWithLowerBound.java | 29 +- .../cassandra/index/sasi/SASIIndexBuilder.java | 2 +- .../org/apache/cassandra/io/ISerializer.java | 5 + .../cassandra/io/sstable/IndexHelper.java | 192 ---- .../apache/cassandra/io/sstable/IndexInfo.java | 178 ++++ .../io/sstable/format/SSTableFormat.java | 6 - .../io/sstable/format/SSTableReader.java | 10 +- .../io/sstable/format/big/BigTableReader.java | 8 +- .../io/sstable/format/big/BigTableScanner.java | 6 +- .../io/sstable/format/big/BigTableWriter.java | 32 +- .../apache/cassandra/service/CacheService.java | 12 +- .../cassandra/cache/AutoSavingCacheTest.java | 15 + .../apache/cassandra/cql3/KeyCacheCqlTest.java | 76 +- .../apache/cassandra/cql3/PagingQueryTest.java | 111 ++ .../cql3/TombstonesWithIndexedSSTableTest.java | 16 +- .../org/apache/cassandra/db/KeyCacheTest.java | 151 ++- .../org/apache/cassandra/db/KeyspaceTest.java | 2 +- .../apache/cassandra/db/RowIndexEntryTest.java | 768 +++++++++++-- .../cassandra/io/sstable/IndexHelperTest.java | 78 -- .../io/sstable/LargePartitionsTest.java | 219 ++++ 39 files changed, 2775 insertions(+), 786 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1b94b2d..a0c7df6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Support large partitions on the 3.0 sstable format (CASSANDRA-11206) * JSON datetime formatting needs timezone (CASSANDRA-11137) * Add support to rebuild from specific range (CASSANDRA-10409) * Optimize the overlapping lookup by calculating all the http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index e073592..a177d37 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,9 @@ using the provided 'sstableupgrade' tool. 
New features ------------ + - Key cache will only hold indexed entries up to the size configured by + column_index_cache_size_in_kb in cassandra.yaml in memory. Larger indexed entries + will never go into memory. See CASSANDRA-11206 for more details. - For tables having a default_time_to_live specifying a TTL of 0 will remove the TTL from the inserted or updated values. - Startup is now aborted if corrupted transaction log files are found. The details http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index f9be453..582859c 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -659,6 +659,14 @@ auto_snapshot: true # rows (as part of the key cache), so a larger granularity means # you can cache more hot rows column_index_size_in_kb: 64 +# Per sstable indexed key cache entries (the collation index in memory +# mentioned above) exceeding this size will not be held on heap. +# This means that only partition information is held on heap and the +# index entries are read from disk. +# +# Note that this size refers to the size of the +# serialized index information and not the size of the partition. +column_index_cache_size_in_kb: 2 # Number of simultaneous compactions to allow, NOT including # validation "compactions" for anti-entropy repair. Simultaneous http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index e39dcf1..2921818 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -77,8 +77,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K * a minor version letter. 
* * Sticking with "d" is fine for 3.0 since it has never been released or used by another version + * + * "e" introduced with CASSANDRA-11206, omits IndexInfo from key-cache, stores offset into index-file */ - private static final String CURRENT_VERSION = "d"; + private static final String CURRENT_VERSION = "e"; private static volatile IStreamFactory streamFactory = new IStreamFactory() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index b5dea06..809966d 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -162,6 +162,7 @@ public class Config /* if the size of columns or super-columns are more than this, indexing will kick in */ public Integer column_index_size_in_kb = 64; + public Integer column_index_cache_size_in_kb = 2; public volatile int batch_size_warn_threshold_in_kb = 5; public volatile int batch_size_fail_threshold_in_kb = 50; public Integer unlogged_batch_across_partitions_warn_threshold = 10; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 37b9200..b98d103 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -962,6 +962,17 @@ public class DatabaseDescriptor conf.column_index_size_in_kb = val; } + public static int getColumnIndexCacheSize() + { + return conf.column_index_cache_size_in_kb * 1024; + } + + @VisibleForTesting + public static void setColumnIndexCacheSize(int val) + { + conf.column_index_cache_size_in_kb = val; + } + public static int getBatchSizeWarnThreshold() { return conf.batch_size_warn_threshold_in_kb * 1024; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/Clustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java index f5ffae4..fa38ce1 100644 --- a/src/java/org/apache/cassandra/db/Clustering.java +++ b/src/java/org/apache/cassandra/db/Clustering.java @@ -149,6 +149,12 @@ public interface Clustering extends ClusteringPrefix return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types); } + public void skip(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException + { + if (!types.isEmpty()) + ClusteringPrefix.serializer.skipValuesWithoutSize(in, types.size(), version, types); + } + public Clustering deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException { if (types.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/ClusteringComparator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java index f5f6ae8..3030b5a 100644 --- a/src/java/org/apache/cassandra/db/ClusteringComparator.java +++ 
b/src/java/org/apache/cassandra/db/ClusteringComparator.java @@ -29,7 +29,7 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.serializers.MarshalException; -import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; +import org.apache.cassandra.io.sstable.IndexInfo; /** * A comparator of clustering prefixes (or more generally of {@link Clusterable}}. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/ClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 4854517..df0befc 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -263,6 +263,17 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable } } + public void skip(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException + { + Kind kind = Kind.values()[in.readByte()]; + // We shouldn't serialize static clusterings + assert kind != Kind.STATIC_CLUSTERING; + if (kind == Kind.CLUSTERING) + Clustering.serializer.skip(in, version, types); + else + RangeTombstone.Bound.serializer.skipValues(in, kind, version, types); + } + public ClusteringPrefix deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException { Kind kind = Kind.values()[in.readByte()]; @@ -350,6 +361,24 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable return values; } + void skipValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException + { + // Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway). + assert size > 0; + int offset = 0; + while (offset < size) + { + long header = in.readUnsignedVInt(); + int limit = Math.min(size, offset + 32); + while (offset < limit) + { + if (!isNull(header, offset) && !isEmpty(header, offset)) + types.get(offset).skipValue(in); + offset++; + } + } + } + /** * Whatever the type of a given clustering column is, its value can always be either empty or null. So we at least need to distinguish those * 2 values, and because we want to be able to store fixed width values without appending their (fixed) size first, we need a way to encode http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 1942052..4dcceff 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -15,184 +15,234 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.cassandra.db; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.sstable.IndexInfo; import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.utils.ByteBufferUtil; +/** + * Column index builder used by {@link org.apache.cassandra.io.sstable.format.big.BigTableWriter}. + * For index entries that exceed {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb}, + * this uses the serialization logic as in {@link RowIndexEntry}. + */ public class ColumnIndex { - public final long partitionHeaderLength; - public final List<IndexHelper.IndexInfo> columnsIndex; - private static final ColumnIndex EMPTY = new ColumnIndex(-1, Collections.<IndexHelper.IndexInfo>emptyList()); + // used, if the row-index-entry reaches config column_index_cache_size_in_kb + private DataOutputBuffer buffer; + // used to track the size of the serialized size of row-index-entry (unused for buffer) + private int indexSamplesSerializedSize; + // used, until the row-index-entry reaches config column_index_cache_size_in_kb + public List<IndexInfo> indexSamples = new ArrayList<>(); - private ColumnIndex(long partitionHeaderLength, List<IndexHelper.IndexInfo> columnsIndex) - { - assert columnsIndex != null; + public int columnIndexCount; + private int[] indexOffsets; - this.partitionHeaderLength = partitionHeaderLength; - this.columnsIndex = columnsIndex; - } + private final SerializationHeader header; + private final int version; + private final SequentialWriter writer; + private final long initialPosition; + private final ISerializer<IndexInfo> idxSerializer; + public long headerLength = -1; - public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, - SequentialWriter output, - SerializationHeader header, - Collection<SSTableFlushObserver> observers, - Version version) throws IOException - { - assert !iterator.isEmpty() && version.storeRows(); + private long startPosition = -1; - Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion()); - return builder.build(); - } + private int written; + private long previousRowStart; + + private ClusteringPrefix firstClustering; + private ClusteringPrefix lastClustering; - @VisibleForTesting - public static ColumnIndex nothing() + private DeletionTime openMarker; + + private final Collection<SSTableFlushObserver> observers; + + public ColumnIndex(SerializationHeader header, + SequentialWriter writer, + Version version, + Collection<SSTableFlushObserver> observers, + ISerializer<IndexInfo> indexInfoSerializer) { - return EMPTY; + this.header = header; + this.idxSerializer = indexInfoSerializer; + this.writer = writer; + this.version = version.correspondingMessagingVersion(); + this.observers = observers; + this.initialPosition = writer.position(); } - /** - * Help to create an index for a column family based on size of columns, - * and write said columns to disk. 
- */ - private static class Builder + public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException { - private final UnfilteredRowIterator iterator; - private final SequentialWriter writer; - private final SerializationHeader header; - private final int version; + writePartitionHeader(iterator); + this.headerLength = writer.position() - initialPosition; - private final List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(); - private final long initialPosition; - private long headerLength = -1; + while (iterator.hasNext()) + add(iterator.next()); - private long startPosition = -1; + close(); + } - private int written; - private long previousRowStart; + private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException + { + ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer); + DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer); + if (header.hasStatic()) + { + Row staticRow = iterator.staticRow(); - private ClusteringPrefix firstClustering; - private ClusteringPrefix lastClustering; + UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version); + if (!observers.isEmpty()) + observers.forEach((o) -> o.nextUnfilteredCluster(staticRow)); + } + } - private DeletionTime openMarker; + private long currentPosition() + { + return writer.position() - initialPosition; + } - private final Collection<SSTableFlushObserver> observers; + public ByteBuffer buffer() + { + return buffer != null ? buffer.buffer() : null; + } - public Builder(UnfilteredRowIterator iterator, - SequentialWriter writer, - SerializationHeader header, - Collection<SSTableFlushObserver> observers, - int version) + public int[] offsets() + { + return indexOffsets != null + ? Arrays.copyOf(indexOffsets, columnIndexCount) + : null; + } + + private void addIndexBlock() throws IOException + { + IndexInfo cIndexInfo = new IndexInfo(firstClustering, + lastClustering, + startPosition, + currentPosition() - startPosition, + openMarker); + + // indexOffsets is used for both shallow (ShallowIndexedEntry) and non-shallow IndexedEntry. + // For shallow ones, we need it to serialize the offsts in close(). + // For non-shallow ones, the offsts are passed into IndexedEntry, so we don't have to + // calculate the offsets again. + + // indexOffsets contains the offsets of the serialized IndexInfo objects. + // I.e. indexOffsets[0] is always 0 so we don't have to deal with a special handling + // for index #0 and always subtracting 1 for the index (which could be error-prone). + if (indexOffsets == null) + indexOffsets = new int[10]; + else { - this.iterator = iterator; - this.writer = writer; - this.header = header; - this.version = version; - this.observers = observers == null ? Collections.emptyList() : observers; - this.initialPosition = writer.position(); + if (columnIndexCount >= indexOffsets.length) + indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10); + indexOffsets[columnIndexCount] = + buffer != null + ? Ints.checkedCast(buffer.position()) + : indexSamplesSerializedSize; } + columnIndexCount++; - private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException + // First, we collect the IndexInfo objects until we reach Config.column_index_cache_size_in_kb in an ArrayList. + // When column_index_cache_size_in_kb is reached, we switch to byte-buffer mode. 
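+ // As a rough illustration (numbers are hypothetical, not part of the format):
+ // with the default column_index_cache_size_in_kb = 2 and IndexInfo objects that
+ // serialize to ~64 bytes each, about 30 samples (plus their 4-byte offsets) fit
+ // into the 2048-byte budget; the sample that overflows it causes all collected
+ // samples to be re-serialized into 'buffer', and later samples go there directly.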
+ if (buffer == null) { - ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer); - DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer); - if (header.hasStatic()) + indexSamplesSerializedSize += idxSerializer.serializedSize(cIndexInfo); + if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) > DatabaseDescriptor.getColumnIndexCacheSize()) { - Row staticRow = iterator.staticRow(); - - UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version); - if (!observers.isEmpty()) - observers.forEach((o) -> o.nextUnfilteredCluster(staticRow)); + buffer = new DataOutputBuffer(DatabaseDescriptor.getColumnIndexCacheSize() * 2); + for (IndexInfo indexSample : indexSamples) + { + idxSerializer.serialize(indexSample, buffer); + } + indexSamples = null; + } + else + { + indexSamples.add(cIndexInfo); } } - - public ColumnIndex build() throws IOException + // don't put an else here... + if (buffer != null) { - writePartitionHeader(iterator); - this.headerLength = writer.position() - initialPosition; - - while (iterator.hasNext()) - add(iterator.next()); - - return close(); + idxSerializer.serialize(cIndexInfo, buffer); } - private long currentPosition() - { - return writer.position() - initialPosition; - } + firstClustering = null; + } + + private void add(Unfiltered unfiltered) throws IOException + { + long pos = currentPosition(); - private void addIndexBlock() + if (firstClustering == null) { - IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstClustering, - lastClustering, - startPosition, - currentPosition() - startPosition, - openMarker); - columnsIndex.add(cIndexInfo); - firstClustering = null; + // Beginning of an index block. Remember the start and position + firstClustering = unfiltered.clustering(); + startPosition = pos; } - private void add(Unfiltered unfiltered) throws IOException - { - long pos = currentPosition(); + UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version); - if (firstClustering == null) - { - // Beginning of an index block. Remember the start and position - firstClustering = unfiltered.clustering(); - startPosition = pos; - } + // notify observers about each new row + if (!observers.isEmpty()) + observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered)); - UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version); + lastClustering = unfiltered.clustering(); + previousRowStart = pos; + ++written; - // notify observers about each new row - if (!observers.isEmpty()) - observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered)); - - lastClustering = unfiltered.clustering(); - previousRowStart = pos; - ++written; + if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered; + openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null; + } - if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) - { - RangeTombstoneMarker marker = (RangeTombstoneMarker)unfiltered; - openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null; - } + // if we hit the column index size that we have to index after, go ahead and index it. + if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize()) + addIndexBlock(); + } - // if we hit the column index size that we have to index after, go ahead and index it. 
- if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize()) - addIndexBlock(); + private void close() throws IOException + { + UnfilteredSerializer.serializer.writeEndOfPartition(writer); - } + // It's possible we add no rows, just a top level deletion + if (written == 0) + return; - private ColumnIndex close() throws IOException - { - UnfilteredSerializer.serializer.writeEndOfPartition(writer); + // the last column may have fallen on an index boundary already. if not, index it explicitly. + if (firstClustering != null) + addIndexBlock(); - // It's possible we add no rows, just a top level deletion - if (written == 0) - return ColumnIndex.EMPTY; + // If we serialize the IndexInfo objects directly in the code above into 'buffer', + // we have to write the offsts to these here. The offsets have already been are collected + // in indexOffsets[]. buffer is != null, if it exceeds Config.column_index_cache_size_in_kb. + // In the other case, when buffer==null, the offsets are serialized in RowIndexEntry.IndexedEntry.serialize(). + if (buffer != null) + RowIndexEntry.Serializer.serializeOffsets(buffer, indexOffsets, columnIndexCount); - // the last column may have fallen on an index boundary already. if not, index it explicitly. - if (firstClustering != null) - addIndexBlock(); + // we should always have at least one computed index block, but we only write it out if there is more than that. + assert columnIndexCount > 0 && headerLength >= 0; + } - // we should always have at least one computed index block, but we only write it out if there is more than that. - assert columnsIndex.size() > 0 && headerLength >= 0; - return new ColumnIndex(headerLength, columnsIndex); - } + public int indexInfoSerializedSize() + { + return buffer != null + ? 
buffer.buffer().limit() + : indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/RangeTombstone.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index 8af3b97..29feb46 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -202,6 +202,16 @@ public class RangeTombstone return deserializeValues(in, kind, version, types); } + public void skipValues(DataInputPlus in, Kind kind, int version, + List<AbstractType<?>> types) throws IOException + { + int size = in.readUnsignedShort(); + if (size == 0) + return; + + ClusteringPrefix.serializer.skipValuesWithoutSize(in, size, version, types); + } + public RangeTombstone.Bound deserializeValues(DataInputPlus in, Kind kind, int version, List<AbstractType<?>> types) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index 4e2f063..7fda245 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -18,50 +18,140 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.util.ArrayList; +import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import com.google.common.primitives.Ints; - +import com.codahale.metrics.Histogram; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cache.IMeasurableMemory; -import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.sstable.IndexInfo; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.io.util.TrackedDataInputPlus; +import org.apache.cassandra.metrics.DefaultNameFactory; +import org.apache.cassandra.metrics.MetricNameFactory; import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.vint.VIntCoding; +import org.github.jamm.Unmetered; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; +/** + * Binary format of {@code RowIndexEntry} is defined as follows: + * {@code + * (long) position (64 bit long, vint encoded) + * (int) serialized size of data that follows (32 bit int, vint encoded) + * -- following for indexed entries only (so serialized size > 0) + * (int) DeletionTime.localDeletionTime + * (long) DeletionTime.markedForDeletionAt + * (int) number of IndexInfo objects (32 bit int, vint encoded) + * (*) serialized IndexInfo objects, see below + * (*) offsets of serialized IndexInfo objects, since version "ma" (3.0) + * Each IndexInfo object's offset is relative to the first IndexInfo object. 
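+ *
+ * An illustrative (made-up) encoding of an entry with two index samples:
+ *   position=42, size=55, localDeletionTime, markedForDeletionAt,
+ *   columnsIndexCount=2, IndexInfo #0, IndexInfo #1, offsets 0 and 24
+ * and of a small, non-indexed partition: position=42, size=0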
+ * }
+ * <p>
+ * See {@link IndexInfo} for a description of the serialized format.
+ * </p>
+ *
+ * <p>
+ * For each partition, the layout of the index file looks like this:
+ * </p>
+ * <ol>
+ * <li>partition key - prefixed with {@code short} length</li>
+ * <li>serialized {@code RowIndexEntry} objects</li>
+ * </ol>
+ *
+ * <p>
+ * Generally, we distinguish between index entries that have <i>index
+ * samples</i> (list of {@link IndexInfo} objects) and those that don't.
+ * For each <i>portion</i> of data for a single partition in the data file,
+ * an index sample is created. The size of that <i>portion</i> is defined
+ * by {@link org.apache.cassandra.config.Config#column_index_size_in_kb}.
+ * </p>
+ * <p>
+ * Index entries with fewer than two index samples just store the
+ * position in the data file.
+ * </p>
+ * <p>
+ * Note: <i>legacy</i> sstables are those sstable formats that do
+ * <i>not</i> have an offsets table for the index samples ({@link IndexInfo}
+ * objects), i.e. sstables created on Cassandra versions
+ * earlier than 3.0.
+ * </p>
+ * <p>
+ * For index entries with index samples, we store the index samples
+ * ({@link IndexInfo} objects). The bigger the partition, the more
+ * index samples are created. Since a huge number of index samples
+ * would "pollute" the heap and cause high GC pressure, Cassandra 3.6
+ * (CASSANDRA-11206) distinguishes between index entries with an
+ * "acceptable" number of index samples per partition and those
+ * with an "enormous" number. The threshold
+ * is controlled by the configuration parameter
+ * {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb}.
+ * Index entries whose index samples have a total serialized size up to
+ * {@code column_index_cache_size_in_kb} will be held on heap in an array.
+ * Index entries exceeding that value will always be accessed from
+ * disk.
+ * </p>
+ * <p>
+ * This results in these classes:
+ * </p>
+ * <ul>
+ * <li>{@link RowIndexEntry} just stores the offset in the data file.</li>
+ * <li>{@link IndexedEntry} is for index entries whose index samples do not exceed
+ * {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb};
+ * it is used for both current and legacy sstables.</li>
+ * <li>{@link ShallowIndexedEntry} is for index entries whose index samples
+ * exceed {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb},
+ * for sstables that have an offsets table for the index samples.</li>
+ * <li>{@link LegacyShallowIndexedEntry} is for index entries whose index samples
+ * exceed {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb},
+ * but for legacy sstables.</li>
+ * </ul>
+ * <p>
+ * Since access to index samples on disk (obviously) requires a file
+ * reader, that functionality is encapsulated in implementations of
+ * {@link IndexInfoRetriever}: there is one implementation to access
+ * index samples of legacy sstables (without the offsets table) and
+ * another to access sstables that have an offsets table.
+ * </p>
+ * <p>
+ * As of Cassandra 3.x, we still support reading from <i>legacy</i> sstables -
+ * i.e. sstables created by Cassandra < 3.0 (see {@link org.apache.cassandra.io.sstable.format.big.BigFormat}).
+ * </p> + * + */ public class RowIndexEntry<T> implements IMeasurableMemory { private static final long EMPTY_SIZE = ObjectSizes.measure(new RowIndexEntry(0)); - public final long position; + // constants for type of row-index-entry as serialized for saved-cache + static final int CACHE_NOT_INDEXED = 0; + static final int CACHE_INDEXED = 1; + static final int CACHE_INDEXED_SHALLOW = 2; - public RowIndexEntry(long position) - { - this.position = position; + static final Histogram indexEntrySizeHistogram; + static final Histogram indexInfoCountHistogram; + static final Histogram indexInfoGetsHistogram; + static { + MetricNameFactory factory = new DefaultNameFactory("Index", "RowIndexEntry"); + indexEntrySizeHistogram = Metrics.histogram(factory.createMetricName("IndexedEntrySize"), false); + indexInfoCountHistogram = Metrics.histogram(factory.createMetricName("IndexInfoCount"), false); + indexInfoGetsHistogram = Metrics.histogram(factory.createMetricName("IndexInfoGets"), false); } - protected int promotedSize(IndexHelper.IndexInfo.Serializer idxSerializer) - { - return 0; - } + public final long position; - public static RowIndexEntry<IndexHelper.IndexInfo> create(long position, DeletionTime deletionTime, ColumnIndex index) + public RowIndexEntry(long position) { - assert index != null; - assert deletionTime != null; - - // we only consider the columns summary when determining whether to create an IndexedEntry, - // since if there are insufficient columns to be worth indexing we're going to seek to - // the beginning of the row anyway, so we might as well read the tombstone there as well. - if (index.columnsIndex.size() > 1) - return new IndexedEntry(position, deletionTime, index.partitionHeaderLength, index.columnsIndex); - else - return new RowIndexEntry<>(position); + this.position = position; } /** @@ -70,21 +160,17 @@ public class RowIndexEntry<T> implements IMeasurableMemory */ public boolean isIndexed() { - return !columnsIndex().isEmpty(); + return columnsIndexCount() > 1; } - public DeletionTime deletionTime() + public boolean indexOnHeap() { - throw new UnsupportedOperationException(); + return false; } - /** - * @return the offset to the start of the header information for this row. - * For some formats this may not be the start of the row. 
- */
- public long headerOffset()
+ public DeletionTime deletionTime()
 {
- return 0;
+ throw new UnsupportedOperationException();
 }
 /**
@@ -97,9 +183,9 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 throw new UnsupportedOperationException();
 }
- public List<T> columnsIndex()
+ public int columnsIndexCount()
 {
- return Collections.emptyList();
+ return 0;
 }
 public long unsharedHeapSize()
@@ -107,129 +193,178 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 return EMPTY_SIZE;
 }
+ /**
+ * @param dataFilePosition position of the partition in the {@link org.apache.cassandra.io.sstable.Component.Type#DATA} file
+ * @param indexFilePosition position in the {@link org.apache.cassandra.io.sstable.Component.Type#PRIMARY_INDEX} of the {@link RowIndexEntry}
+ * @param deletionTime deletion time of the {@link RowIndexEntry}
+ * @param headerLength length of the partition header (see {@link #headerLength()})
+ * @param columnIndexCount number of {@link IndexInfo} entries in the {@link RowIndexEntry}
+ * @param indexedPartSize serialized size of all serialized {@link IndexInfo} objects and their offsets
+ * @param indexSamples list of index samples ({@link IndexInfo} objects), if their total serialized size is less than {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb}
+ * @param offsets offsets of the serialized {@link IndexInfo} objects
+ * @param idxInfoSerializer the {@link IndexInfo} serializer
+ */
+ public static RowIndexEntry<IndexInfo> create(long dataFilePosition, long indexFilePosition,
+ DeletionTime deletionTime, long headerLength, int columnIndexCount,
+ int indexedPartSize,
+ List<IndexInfo> indexSamples, int[] offsets,
+ ISerializer<IndexInfo> idxInfoSerializer)
+ {
+ // If the "partition building code" in BigTableWriter.append() via ColumnIndex returns a list
+ // of IndexInfo objects, which is the case if the serialized size is less than
+ // Config.column_index_cache_size_in_kb, AND we have more than one IndexInfo object, we
+ // construct an IndexedEntry object. (note: indexSamples.size() and columnIndexCount have the same meaning)
+ if (indexSamples != null && indexSamples.size() > 1)
+ return new IndexedEntry(dataFilePosition, deletionTime, headerLength,
+ indexSamples.toArray(new IndexInfo[indexSamples.size()]), offsets,
+ indexedPartSize, idxInfoSerializer);
+ // Here we have to decide whether we have serialized IndexInfo objects that exceed
+ // Config.column_index_cache_size_in_kb (the not-exceeding case is covered above).
+ // Such a "big" indexed entry is represented as a shallow one.
+ if (columnIndexCount > 1)
+ return new ShallowIndexedEntry(dataFilePosition, indexFilePosition,
+ deletionTime, headerLength, columnIndexCount,
+ indexedPartSize, idxInfoSerializer);
+ // Last case is that there are no index samples.
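+ // (In other words, assuming the default 2kb column_index_cache_size_in_kb:
+ // <= 1 index block -> plain RowIndexEntry; samples serialized <= 2kb ->
+ // IndexedEntry, kept on heap; samples serialized > 2kb -> ShallowIndexedEntry.)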
+ return new RowIndexEntry<>(dataFilePosition); + } + + public IndexInfoRetriever openWithIndex(SegmentedFile indexFile) + { + return null; + } + public interface IndexSerializer<T> { - void serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException; - RowIndexEntry<T> deserialize(DataInputPlus in) throws IOException; - int serializedSize(RowIndexEntry<T> rie); + void serialize(RowIndexEntry<T> rie, DataOutputPlus out, ByteBuffer indexInfo) throws IOException; + RowIndexEntry<T> deserialize(DataInputPlus in, long indexFilePosition) throws IOException; + void serializeForCache(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException; + RowIndexEntry<T> deserializeForCache(DataInputPlus in) throws IOException; + + long deserializePositionAndSkip(DataInputPlus in) throws IOException; + + ISerializer<T> indexInfoSerializer(); } - public static class Serializer implements IndexSerializer<IndexHelper.IndexInfo> + public static final class Serializer implements IndexSerializer<IndexInfo> { - private final IndexHelper.IndexInfo.Serializer idxSerializer; + private final IndexInfo.Serializer idxInfoSerializer; private final Version version; public Serializer(CFMetaData metadata, Version version, SerializationHeader header) { - this.idxSerializer = new IndexHelper.IndexInfo.Serializer(metadata, version, header); + this.idxInfoSerializer = metadata.serializers().indexInfoSerializer(version, header); this.version = version; } - public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, DataOutputPlus out) throws IOException + public IndexInfo.Serializer indexInfoSerializer() + { + return idxInfoSerializer; + } + + public void serialize(RowIndexEntry<IndexInfo> rie, DataOutputPlus out, ByteBuffer indexInfo) throws IOException { assert version.storeRows() : "We read old index files but we should never write them"; - out.writeUnsignedVInt(rie.position); - out.writeUnsignedVInt(rie.promotedSize(idxSerializer)); + rie.serialize(out, idxInfoSerializer, indexInfo); + } - if (rie.isIndexed()) - { - out.writeUnsignedVInt(rie.headerLength()); - DeletionTime.serializer.serialize(rie.deletionTime(), out); - out.writeUnsignedVInt(rie.columnsIndex().size()); + public void serializeForCache(RowIndexEntry<IndexInfo> rie, DataOutputPlus out) throws IOException + { + assert version.storeRows(); - // Calculate and write the offsets to the IndexInfo objects. + rie.serializeForCache(out); + } - int[] offsets = new int[rie.columnsIndex().size()]; + public RowIndexEntry<IndexInfo> deserializeForCache(DataInputPlus in) throws IOException + { + assert version.storeRows(); - if (out.hasPosition()) - { - // Out is usually a SequentialWriter, so using the file-pointer is fine to generate the offsets. - // A DataOutputBuffer also works. - long start = out.position(); - int i = 0; - for (IndexHelper.IndexInfo info : rie.columnsIndex()) - { - offsets[i] = i == 0 ? 0 : (int)(out.position() - start); - i++; - idxSerializer.serialize(info, out); - } - } - else - { - // Not sure this branch will ever be needed, but if it is called, it has to calculate the - // serialized sizes instead of simply using the file-pointer. 
- int i = 0; - int offset = 0; - for (IndexHelper.IndexInfo info : rie.columnsIndex()) - { - offsets[i++] = offset; - idxSerializer.serialize(info, out); - offset += idxSerializer.serializedSize(info); - } - } + long position = in.readUnsignedVInt(); - for (int off : offsets) - out.writeInt(off); + switch (in.readByte()) + { + case CACHE_NOT_INDEXED: + return new RowIndexEntry<>(position); + case CACHE_INDEXED: + return new IndexedEntry(position, in, idxInfoSerializer, version, true); + case CACHE_INDEXED_SHALLOW: + return new ShallowIndexedEntry(position, in, idxInfoSerializer); + default: + throw new AssertionError(); } } - public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInputPlus in) throws IOException + public static void skipForCache(DataInputPlus in, Version version) throws IOException { - if (!version.storeRows()) - { - long position = in.readLong(); - - int size = in.readInt(); - if (size > 0) - { - DeletionTime deletionTime = DeletionTime.serializer.deserialize(in); - - int entries = in.readInt(); - List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries); + assert version.storeRows(); - long headerLength = 0L; - for (int i = 0; i < entries; i++) - { - IndexHelper.IndexInfo info = idxSerializer.deserialize(in); - columnsIndex.add(info); - if (i == 0) - headerLength = info.offset; - } - - return new IndexedEntry(position, deletionTime, headerLength, columnsIndex); - } - else - { - return new RowIndexEntry<>(position); - } + /* long position = */in.readUnsignedVInt(); + switch (in.readByte()) + { + case CACHE_NOT_INDEXED: + break; + case CACHE_INDEXED: + IndexedEntry.skipForCache(in); + break; + case CACHE_INDEXED_SHALLOW: + ShallowIndexedEntry.skipForCache(in); + break; + default: + assert false; } + } + + public RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition) throws IOException + { + if (!version.storeRows()) + return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer, version); long position = in.readUnsignedVInt(); int size = (int)in.readUnsignedVInt(); - if (size > 0) + if (size == 0) + { + return new RowIndexEntry<>(position); + } + else { long headerLength = in.readUnsignedVInt(); DeletionTime deletionTime = DeletionTime.serializer.deserialize(in); - int entries = (int)in.readUnsignedVInt(); - List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries); - for (int i = 0; i < entries; i++) - columnsIndex.add(idxSerializer.deserialize(in)); + int columnsIndexCount = (int) in.readUnsignedVInt(); - in.skipBytesFully(entries * TypeSizes.sizeof(0)); + int indexedPartSize = size - serializedSize(deletionTime, headerLength, columnsIndexCount); - return new IndexedEntry(position, deletionTime, headerLength, columnsIndex); - } - else - { - return new RowIndexEntry<>(position); + if (size <= DatabaseDescriptor.getColumnIndexCacheSize()) + { + return new IndexedEntry(position, in, deletionTime, headerLength, columnsIndexCount, + idxInfoSerializer, version, indexedPartSize); + } + else + { + in.skipBytes(indexedPartSize); + + return new ShallowIndexedEntry(position, + indexFilePosition, + deletionTime, headerLength, columnsIndexCount, + indexedPartSize, idxInfoSerializer); + } } } - // Reads only the data 'position' of the index entry and returns it. Note that this left 'in' in the middle - // of reading an entry, so this is only useful if you know what you are doing and in most case 'deserialize' - // should be used instead. 
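+ /**
+ * Reads only the data-file position of an index entry and skips over the rest
+ * of its serialized form, for callers that do not need the index samples.
+ */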
+ public long deserializePositionAndSkip(DataInputPlus in) throws IOException + { + if (!version.storeRows()) + return LegacyShallowIndexedEntry.deserializePositionAndSkip(in); + + return ShallowIndexedEntry.deserializePositionAndSkip(in); + } + + /** + * Reads only the data 'position' of the index entry and returns it. Note that this left 'in' in the middle + * of reading an entry, so this is only useful if you know what you are doing and in most case 'deserialize' + * should be used instead. + */ public static long readPosition(DataInputPlus in, Version version) throws IOException { return version.storeRows() ? in.readUnsignedVInt() : in.readLong(); @@ -250,51 +385,295 @@ public class RowIndexEntry<T> implements IMeasurableMemory in.skipBytesFully(size); } - public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie) + public static void serializeOffsets(DataOutputBuffer out, int[] indexOffsets, int columnIndexCount) throws IOException { - assert version.storeRows() : "We read old index files but we should never write them"; + for (int i = 0; i < columnIndexCount; i++) + out.writeInt(indexOffsets[i]); + } + } + + private static int serializedSize(DeletionTime deletionTime, long headerLength, int columnIndexCount) + { + return TypeSizes.sizeofUnsignedVInt(headerLength) + + (int) DeletionTime.serializer.serializedSize(deletionTime) + + TypeSizes.sizeofUnsignedVInt(columnIndexCount); + } + + public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) throws IOException + { + out.writeUnsignedVInt(position); + + out.writeUnsignedVInt(0); + } + + public void serializeForCache(DataOutputPlus out) throws IOException + { + out.writeUnsignedVInt(position); + + out.writeByte(CACHE_NOT_INDEXED); + } + + private static final class LegacyShallowIndexedEntry extends RowIndexEntry<IndexInfo> + { + private static final long BASE_SIZE; + static + { + BASE_SIZE = ObjectSizes.measure(new LegacyShallowIndexedEntry(0, 0, DeletionTime.LIVE, 0, new int[0], null, 0)); + } + + private final long indexFilePosition; + private final int[] offsets; + @Unmetered + private final IndexInfo.Serializer idxInfoSerializer; + private final DeletionTime deletionTime; + private final long headerLength; + private final int serializedSize; + + private LegacyShallowIndexedEntry(long dataFilePosition, long indexFilePosition, + DeletionTime deletionTime, long headerLength, + int[] offsets, IndexInfo.Serializer idxInfoSerializer, + int serializedSize) + { + super(dataFilePosition); + this.deletionTime = deletionTime; + this.headerLength = headerLength; + this.indexFilePosition = indexFilePosition; + this.offsets = offsets; + this.idxInfoSerializer = idxInfoSerializer; + this.serializedSize = serializedSize; + } + + @Override + public DeletionTime deletionTime() + { + return deletionTime; + } + + @Override + public long headerLength() + { + return headerLength; + } + + @Override + public long unsharedHeapSize() + { + return BASE_SIZE + offsets.length * TypeSizes.sizeof(0); + } + + @Override + public int columnsIndexCount() + { + return offsets.length; + } + + @Override + public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) + { + throw new UnsupportedOperationException("serializing legacy index entries is not supported"); + } + + @Override + public void serializeForCache(DataOutputPlus out) + { + throw new UnsupportedOperationException("serializing legacy index entries is not supported"); + } - int indexedSize = 0; - if 
(rie.isIndexed()) + @Override + public IndexInfoRetriever openWithIndex(SegmentedFile indexFile) + { + int fieldsSize = (int) DeletionTime.serializer.serializedSize(deletionTime) + + TypeSizes.sizeof(0); // columnIndexCount + indexEntrySizeHistogram.update(serializedSize); + indexInfoCountHistogram.update(offsets.length); + return new LegacyIndexInfoRetriever(indexFilePosition + + TypeSizes.sizeof(0L) + // position + TypeSizes.sizeof(0) + // indexInfoSize + fieldsSize, + offsets, indexFile.createReader(), idxInfoSerializer); + } + + public static RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition, + IndexInfo.Serializer idxInfoSerializer, Version version) throws IOException + { + long dataFilePosition = in.readLong(); + + int size = in.readInt(); + if (size == 0) { - List<IndexHelper.IndexInfo> index = rie.columnsIndex(); + return new RowIndexEntry<>(dataFilePosition); + } + else if (size <= DatabaseDescriptor.getColumnIndexCacheSize()) + { + return new IndexedEntry(dataFilePosition, in, idxInfoSerializer, version, false); + } + else + { + DeletionTime deletionTime = DeletionTime.serializer.deserialize(in); - indexedSize += TypeSizes.sizeofUnsignedVInt(rie.headerLength()); - indexedSize += DeletionTime.serializer.serializedSize(rie.deletionTime()); - indexedSize += TypeSizes.sizeofUnsignedVInt(index.size()); + // For legacy sstables (i.e. sstables pre-"ma", pre-3.0) we have to scan all serialized IndexInfo + // objects to calculate the offsets array. However, it might be possible to deserialize all + // IndexInfo objects here - but to just skip feels more gentle to the heap/GC. - for (IndexHelper.IndexInfo info : index) - indexedSize += idxSerializer.serializedSize(info); + int entries = in.readInt(); + int[] offsets = new int[entries]; + + TrackedDataInputPlus tracked = new TrackedDataInputPlus(in); + long start = tracked.getBytesRead(); + long headerLength = 0L; + for (int i = 0; i < entries; i++) + { + offsets[i] = (int) (tracked.getBytesRead() - start); + if (i == 0) + { + IndexInfo info = idxInfoSerializer.deserialize(tracked); + headerLength = info.offset; + } + else + idxInfoSerializer.skip(tracked); + } - indexedSize += index.size() * TypeSizes.sizeof(0); + return new LegacyShallowIndexedEntry(dataFilePosition, indexFilePosition, deletionTime, headerLength, offsets, idxInfoSerializer, size); } + } + + static long deserializePositionAndSkip(DataInputPlus in) throws IOException + { + long position = in.readLong(); + + int size = in.readInt(); + if (size > 0) + in.skipBytesFully(size); - return TypeSizes.sizeofUnsignedVInt(rie.position) + TypeSizes.sizeofUnsignedVInt(indexedSize) + indexedSize; + return position; + } + } + + private static final class LegacyIndexInfoRetriever extends FileIndexInfoRetriever + { + private final int[] offsets; + + private LegacyIndexInfoRetriever(long indexFilePosition, int[] offsets, FileDataInput reader, IndexInfo.Serializer idxInfoSerializer) + { + super(indexFilePosition, offsets.length, reader, idxInfoSerializer); + this.offsets = offsets; + } + + IndexInfo fetchIndex(int index) throws IOException + { + retrievals++; + + // seek to posision of IndexInfo + indexReader.seek(indexInfoFilePosition + offsets[index]); + + // deserialize IndexInfo + return idxInfoSerializer.deserialize(indexReader); } } /** - * An entry in the row index for a row whose columns are indexed. + * An entry in the row index for a row whose columns are indexed - used for both legacy and current formats. 
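+ * Assuming the default 2kb column_index_cache_size_in_kb, this is the variant
+ * chosen when the serialized index samples fit into that budget: the
+ * {@link IndexInfo} objects are kept on heap, so no further index-file access
+ * is needed to locate an index block.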
*/ - private static class IndexedEntry extends RowIndexEntry<IndexHelper.IndexInfo> + private static final class IndexedEntry extends RowIndexEntry<IndexInfo> { - private final DeletionTime deletionTime; + private static final long BASE_SIZE; - // The offset in the file when the index entry end + static + { + BASE_SIZE = ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, null, null, 0, null)); + } + + private final DeletionTime deletionTime; private final long headerLength; - private final List<IndexHelper.IndexInfo> columnsIndex; - private static final long BASE_SIZE = - ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, Arrays.<IndexHelper.IndexInfo>asList(null, null))) - + ObjectSizes.measure(new ArrayList<>(1)); - private IndexedEntry(long position, DeletionTime deletionTime, long headerLength, List<IndexHelper.IndexInfo> columnsIndex) + private final IndexInfo[] columnsIndex; + private final int[] offsets; + private final int indexedPartSize; + @Unmetered + private final ISerializer<IndexInfo> idxInfoSerializer; + + private IndexedEntry(long dataFilePosition, DeletionTime deletionTime, long headerLength, + IndexInfo[] columnsIndex, int[] offsets, + int indexedPartSize, ISerializer<IndexInfo> idxInfoSerializer) { - super(position); - assert deletionTime != null; - assert columnsIndex != null && columnsIndex.size() > 1; - this.deletionTime = deletionTime; + super(dataFilePosition); + this.headerLength = headerLength; + this.deletionTime = deletionTime; + this.columnsIndex = columnsIndex; + this.offsets = offsets; + this.indexedPartSize = indexedPartSize; + this.idxInfoSerializer = idxInfoSerializer; + } + + private IndexedEntry(long dataFilePosition, DataInputPlus in, + DeletionTime deletionTime, long headerLength, int columnIndexCount, + IndexInfo.Serializer idxInfoSerializer, + Version version, int indexedPartSize) throws IOException + { + super(dataFilePosition); + + this.headerLength = headerLength; + this.deletionTime = deletionTime; + int columnsIndexCount = columnIndexCount; + + this.columnsIndex = new IndexInfo[columnsIndexCount]; + for (int i = 0; i < columnsIndexCount; i++) + this.columnsIndex[i] = idxInfoSerializer.deserialize(in); + + int[] offsets = null; + if (version.storeRows()) + { + offsets = new int[this.columnsIndex.length]; + for (int i = 0; i < offsets.length; i++) + offsets[i] = in.readInt(); + } + this.offsets = offsets; + + this.indexedPartSize = indexedPartSize; + + this.idxInfoSerializer = idxInfoSerializer; + } + + private IndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer, Version version, boolean forCache) throws IOException + { + super(dataFilePosition); + + this.headerLength = in.readUnsignedVInt(); + this.deletionTime = DeletionTime.serializer.deserialize(in); + int columnsIndexCount = (int) in.readUnsignedVInt(); + + TrackedDataInputPlus trackedIn = new TrackedDataInputPlus(in); + + this.columnsIndex = new IndexInfo[columnsIndexCount]; + for (int i = 0; i < columnsIndexCount; i++) + this.columnsIndex[i] = idxInfoSerializer.deserialize(trackedIn); + + int[] offsets = null; + if (!forCache && version.storeRows()) + { + offsets = new int[this.columnsIndex.length]; + for (int i = 0; i < offsets.length; i++) + offsets[i] = trackedIn.readInt(); + } + this.offsets = offsets; + + this.indexedPartSize = (int) trackedIn.getBytesRead(); + + this.idxInfoSerializer = idxInfoSerializer; + } + + @Override + public boolean indexOnHeap() + { + return true; + } + + @Override + public int columnsIndexCount() 
+ { + return columnsIndex.length; } @Override @@ -310,36 +689,337 @@ public class RowIndexEntry<T> implements IMeasurableMemory } @Override - public List<IndexHelper.IndexInfo> columnsIndex() + public IndexInfoRetriever openWithIndex(SegmentedFile indexFile) + { + indexEntrySizeHistogram.update(serializedSize(deletionTime, headerLength, columnsIndex.length) + indexedPartSize); + indexInfoCountHistogram.update(columnsIndex.length); + return new IndexInfoRetriever() + { + private int retrievals; + + @Override + public IndexInfo columnsIndex(int index) + { + retrievals++; + return columnsIndex[index]; + } + + public void close() + { + indexInfoGetsHistogram.update(retrievals); + } + }; + } + + @Override + public long unsharedHeapSize() + { + long entrySize = 0; + for (IndexInfo idx : columnsIndex) + entrySize += idx.unsharedHeapSize(); + return BASE_SIZE + + entrySize + + ObjectSizes.sizeOfReferenceArray(columnsIndex.length); + } + + @Override + public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) throws IOException + { + assert indexedPartSize != Integer.MIN_VALUE; + + out.writeUnsignedVInt(position); + + out.writeUnsignedVInt(serializedSize(deletionTime, headerLength, columnsIndex.length) + indexedPartSize); + + out.writeUnsignedVInt(headerLength); + DeletionTime.serializer.serialize(deletionTime, out); + out.writeUnsignedVInt(columnsIndex.length); + for (IndexInfo info : columnsIndex) + idxInfoSerializer.serialize(info, out); + for (int offset : offsets) + out.writeInt(offset); + } + + @Override + public void serializeForCache(DataOutputPlus out) throws IOException + { + out.writeUnsignedVInt(position); + out.writeByte(CACHE_INDEXED); + + out.writeUnsignedVInt(headerLength); + DeletionTime.serializer.serialize(deletionTime, out); + out.writeUnsignedVInt(columnsIndexCount()); + + for (IndexInfo indexInfo : columnsIndex) + idxInfoSerializer.serialize(indexInfo, out); + } + + static void skipForCache(DataInputPlus in) throws IOException + { + /*long headerLength =*/in.readUnsignedVInt(); + /*DeletionTime deletionTime = */DeletionTime.serializer.skip(in); + /*int columnsIndexCount = (int)*/in.readUnsignedVInt(); + + /*int indexedPartSize = (int)*/in.readUnsignedVInt(); + } + } + + /** + * An entry in the row index for a row whose columns are indexed and the {@link IndexInfo} objects + * are not read into the key cache. + */ + private static final class ShallowIndexedEntry extends RowIndexEntry<IndexInfo> + { + private static final long BASE_SIZE; + + static + { + BASE_SIZE = ObjectSizes.measure(new ShallowIndexedEntry(0, 0, DeletionTime.LIVE, 0, 10, 0, null)); + } + + private final long indexFilePosition; + + private final DeletionTime deletionTime; + private final long headerLength; + private final int columnsIndexCount; + + private final int indexedPartSize; + private final int offsetsOffset; + @Unmetered + private final ISerializer<IndexInfo> idxInfoSerializer; + private final int fieldsSerializedSize; + + /** + * See {@link #create(long, long, DeletionTime, long, int, int, List, int[], ISerializer)} for a description + * of the parameters. 
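+ * <p>
+ * Worked example with made-up numbers: for columnsIndexCount = 3, an
+ * indexedPartSize of 100 bytes and a fieldsSerializedSize of 15 bytes, the
+ * 3 * 4 = 12 byte offsets table starts offsetsOffset = 100 + 15 - 12 = 103
+ * bytes into the serialized entry.
+ * </p>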
+ */ + private ShallowIndexedEntry(long dataFilePosition, long indexFilePosition, + DeletionTime deletionTime, long headerLength, int columnIndexCount, + int indexedPartSize, ISerializer<IndexInfo> idxInfoSerializer) + { + super(dataFilePosition); + + assert columnIndexCount > 1; + + this.indexFilePosition = indexFilePosition; + this.headerLength = headerLength; + this.deletionTime = deletionTime; + this.columnsIndexCount = columnIndexCount; + + this.indexedPartSize = indexedPartSize; + this.idxInfoSerializer = idxInfoSerializer; + + this.fieldsSerializedSize = serializedSize(deletionTime, headerLength, columnIndexCount); + this.offsetsOffset = indexedPartSize + fieldsSerializedSize - columnsIndexCount * TypeSizes.sizeof(0); + } + + /** + * Constructor for key-cache deserialization + */ + private ShallowIndexedEntry(long dataFilePosition, DataInputPlus in, IndexInfo.Serializer idxInfoSerializer) throws IOException + { + super(dataFilePosition); + + this.indexFilePosition = in.readUnsignedVInt(); + + this.headerLength = in.readUnsignedVInt(); + this.deletionTime = DeletionTime.serializer.deserialize(in); + this.columnsIndexCount = (int) in.readUnsignedVInt(); + + this.indexedPartSize = (int) in.readUnsignedVInt(); + + this.idxInfoSerializer = idxInfoSerializer; + + this.fieldsSerializedSize = serializedSize(deletionTime, headerLength, columnsIndexCount); + this.offsetsOffset = indexedPartSize + fieldsSerializedSize - columnsIndexCount * TypeSizes.sizeof(0); + } + + @Override + public int columnsIndexCount() { - return columnsIndex; + return columnsIndexCount; } @Override - protected int promotedSize(IndexHelper.IndexInfo.Serializer idxSerializer) + public DeletionTime deletionTime() { - long size = TypeSizes.sizeofUnsignedVInt(headerLength) - + DeletionTime.serializer.serializedSize(deletionTime) - + TypeSizes.sizeofUnsignedVInt(columnsIndex.size()); // number of entries - for (IndexHelper.IndexInfo info : columnsIndex) - size += idxSerializer.serializedSize(info); + return deletionTime; + } - size += columnsIndex.size() * TypeSizes.sizeof(0); + @Override + public long headerLength() + { + return headerLength; + } - return Ints.checkedCast(size); + @Override + public IndexInfoRetriever openWithIndex(SegmentedFile indexFile) + { + indexEntrySizeHistogram.update(indexedPartSize + fieldsSerializedSize); + indexInfoCountHistogram.update(columnsIndexCount); + return new ShallowInfoRetriever(indexFilePosition + + VIntCoding.computeUnsignedVIntSize(position) + + VIntCoding.computeUnsignedVIntSize(indexedPartSize + fieldsSerializedSize) + + fieldsSerializedSize, + offsetsOffset - fieldsSerializedSize, + columnsIndexCount, indexFile.createReader(), idxInfoSerializer); } @Override public long unsharedHeapSize() { - long entrySize = 0; - for (IndexHelper.IndexInfo idx : columnsIndex) - entrySize += idx.unsharedHeapSize(); + return BASE_SIZE; + } - return BASE_SIZE - + entrySize - + deletionTime.unsharedHeapSize() - + ObjectSizes.sizeOfReferenceArray(columnsIndex.size()); + @Override + public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) throws IOException + { + out.writeUnsignedVInt(position); + + out.writeUnsignedVInt(fieldsSerializedSize + indexInfo.limit()); + + out.writeUnsignedVInt(headerLength); + DeletionTime.serializer.serialize(deletionTime, out); + out.writeUnsignedVInt(columnsIndexCount); + + out.write(indexInfo); + } + + static long deserializePositionAndSkip(DataInputPlus in) throws IOException + { + long position = in.readUnsignedVInt(); 
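+ // the remainder of the entry (the promoted index, if any) is skipped wholesale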
+ + int size = (int) in.readUnsignedVInt(); + if (size > 0) + in.skipBytesFully(size); + + return position; + } + + @Override + public void serializeForCache(DataOutputPlus out) throws IOException + { + out.writeUnsignedVInt(position); + out.writeByte(CACHE_INDEXED_SHALLOW); + + out.writeUnsignedVInt(indexFilePosition); + + out.writeUnsignedVInt(headerLength); + DeletionTime.serializer.serialize(deletionTime, out); + out.writeUnsignedVInt(columnsIndexCount); + + out.writeUnsignedVInt(indexedPartSize); + } + + static void skipForCache(DataInputPlus in) throws IOException + { + /*long indexFilePosition =*/in.readUnsignedVInt(); + + /*long headerLength =*/in.readUnsignedVInt(); + /*DeletionTime deletionTime = */DeletionTime.serializer.skip(in); + /*int columnsIndexCount = (int)*/in.readUnsignedVInt(); + + /*int indexedPartSize = (int)*/in.readUnsignedVInt(); + } + } + + private static final class ShallowInfoRetriever extends FileIndexInfoRetriever + { + private final int offsetsOffset; + + private ShallowInfoRetriever(long indexInfoFilePosition, int offsetsOffset, int indexCount, + FileDataInput indexReader, ISerializer<IndexInfo> idxInfoSerializer) + { + super(indexInfoFilePosition, indexCount, indexReader, idxInfoSerializer); + this.offsetsOffset = offsetsOffset; + } + + IndexInfo fetchIndex(int index) throws IOException + { + assert index >= 0 && index < indexCount; + + retrievals++; + + // seek to position in "offsets to IndexInfo" table + indexReader.seek(indexInfoFilePosition + offsetsOffset + index * TypeSizes.sizeof(0)); + + // read offset of IndexInfo + int indexInfoPos = indexReader.readInt(); + + // seek to position of IndexInfo + indexReader.seek(indexInfoFilePosition + indexInfoPos); + + // finally, deserialize IndexInfo + return idxInfoSerializer.deserialize(indexReader); + } + } + + /** + * Interface to access {@link IndexInfo} objects. + */ + public interface IndexInfoRetriever extends AutoCloseable + { + IndexInfo columnsIndex(int index) throws IOException; + + void close() throws IOException; + } + + /** + * Base class to access {@link IndexInfo} objects on disk that keeps already + * read {@link IndexInfo} on heap.
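+ * The on-heap memo grows on demand in {@link #columnsIndex(int)}, so each object is deserialized at most once per retriever.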
+ */ + private abstract static class FileIndexInfoRetriever implements IndexInfoRetriever + { + final long indexInfoFilePosition; + final int indexCount; + final ISerializer<IndexInfo> idxInfoSerializer; + final FileDataInput indexReader; + int retrievals; + + private IndexInfo[] lastIndexes; + + /** + * + * @param indexInfoFilePosition offset of first serialized {@link IndexInfo} object + * @param indexCount number of {@link IndexInfo} objects + * @param indexReader file data input to access the index file, closed by this instance + * @param idxInfoSerializer the index serializer to deserialize {@link IndexInfo} objects + */ + FileIndexInfoRetriever(long indexInfoFilePosition, int indexCount, FileDataInput indexReader, ISerializer<IndexInfo> idxInfoSerializer) + { + this.indexInfoFilePosition = indexInfoFilePosition; + this.indexCount = indexCount; + this.idxInfoSerializer = idxInfoSerializer; + this.indexReader = indexReader; + } + + public final IndexInfo columnsIndex(int index) throws IOException + { + if (lastIndexes != null + && lastIndexes.length > index && lastIndexes[index] != null) + { + // return a previously read/deserialized IndexInfo + return lastIndexes[index]; + } + + if (lastIndexes == null) + lastIndexes = new IndexInfo[index + 1]; + else if (lastIndexes.length <= index) + lastIndexes = Arrays.copyOf(lastIndexes, index + 1); + + IndexInfo indexInfo = fetchIndex(index); + lastIndexes[index] = indexInfo; + + return indexInfo; + } + + abstract IndexInfo fetchIndex(int index) throws IOException; + + public void close() throws IOException + { + indexReader.close(); + + indexInfoGetsHistogram.update(retrievals); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 0fd1281..af2d434 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -29,7 +29,6 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -75,25 +74,6 @@ public class SerializationHeader return new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS); } - public static SerializationHeader forKeyCache(CFMetaData metadata) - { - // We don't save type information in the key cache (we could change - // that but it's easier right now), so instead we simply use BytesType - // for both serialization and deserialization. Note that we also only - // serializer clustering prefixes in the key cache, so only the clusteringTypes - // really matter. 
- int size = metadata.clusteringColumns().size(); - List<AbstractType<?>> clusteringTypes = new ArrayList<>(size); - for (int i = 0; i < size; i++) - clusteringTypes.add(BytesType.instance); - return new SerializationHeader(false, - BytesType.instance, - clusteringTypes, - PartitionColumns.NONE, - EncodingStats.NO_STATS, - Collections.<ByteBuffer, AbstractType<?>>emptyMap()); - } - - public static SerializationHeader make(CFMetaData metadata, Collection<SSTableReader> sstables) { // The serialization header has to be computed before the start of compaction (since it's used to write) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/src/java/org/apache/cassandra/db/Serializers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java index cef06a3..a0503e0 100644 --- a/src/java/org/apache/cassandra/db/Serializers.java +++ b/src/java/org/apache/cassandra/db/Serializers.java @@ -20,10 +20,15 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.sstable.IndexInfo; +import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.Version; @@ -36,36 +41,66 @@ public class Serializers { private final CFMetaData metadata; + private Map<Version, IndexInfo.Serializer> otherVersionClusteringSerializers; + + private final IndexInfo.Serializer latestVersionIndexSerializer; + public Serializers(CFMetaData metadata) { this.metadata = metadata; + this.latestVersionIndexSerializer = new IndexInfo.Serializer(BigFormat.latestVersion, + indexEntryClusteringPrefixSerializer(BigFormat.latestVersion, SerializationHeader.makeWithoutStats(metadata))); + } + + IndexInfo.Serializer indexInfoSerializer(Version version, SerializationHeader header) + { + // null header indicates streaming from pre-3.0 sstables + if (version.equals(BigFormat.latestVersion) && header != null) + return latestVersionIndexSerializer; + + if (otherVersionClusteringSerializers == null) + otherVersionClusteringSerializers = new ConcurrentHashMap<>(); + IndexInfo.Serializer serializer = otherVersionClusteringSerializers.get(version); + if (serializer == null) + { + serializer = new IndexInfo.Serializer(version, + indexEntryClusteringPrefixSerializer(version, header)); + otherVersionClusteringSerializers.put(version, serializer); + } + return serializer; } // TODO: Once we drop support for old (pre-3.0) sstables, we can drop this method and inline the calls to - // ClusteringPrefix.serializer in IndexHelper directly. At which point this whole class probably becomes + // ClusteringPrefix.serializer directly. At which point this whole class probably becomes // unnecessary (since IndexInfo.Serializer won't depend on the metadata either).
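The indexInfoSerializer method above lazily memoizes one IndexInfo.Serializer per sstable Version. Its check-then-put sequence can construct a serializer twice when two threads race, which is benign because the instances are interchangeable. A minimal standalone sketch of the same memoization using ConcurrentHashMap.computeIfAbsent; the class and method names here are illustrative, not part of the patch:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative stand-in: in the patch, K would be Version and V would be IndexInfo.Serializer.
final class MemoizingFactory<K, V>
{
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> factory;

    MemoizingFactory(Function<K, V> factory)
    {
        this.factory = factory;
    }

    V get(K key)
    {
        // computeIfAbsent invokes the factory at most once per key,
        // closing the duplicate-construction window of a separate get/put
        return cache.computeIfAbsent(key, factory);
    }
}

Typical use would be something like new MemoizingFactory<Version, IndexInfo.Serializer>(v -> buildSerializer(v)).get(version), where buildSerializer is a hypothetical factory method.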
- public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header) + private ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(Version version, SerializationHeader header) { if (!version.storeRows() || header == null) //null header indicates streaming from pre-3.0 sstables { return oldFormatSerializer(version); } - return newFormatSerializer(version, header); + return new NewFormatSerializer(version, header.clusteringTypes()); } - private ISerializer<ClusteringPrefix> oldFormatSerializer(final Version version) + private ISerializer<ClusteringPrefix> oldFormatSerializer(Version version) { return new ISerializer<ClusteringPrefix>() { - SerializationHeader newHeader = SerializationHeader.makeWithoutStats(metadata); + List<AbstractType<?>> clusteringTypes = SerializationHeader.makeWithoutStats(metadata).clusteringTypes(); public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException { //we deserialize in the old format and serialize in the new format ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), - newHeader.clusteringTypes()); + clusteringTypes); + } + + @Override + public void skip(DataInputPlus in) throws IOException + { + ByteBufferUtil.skipShortLength(in); } public ClusteringPrefix deserialize(DataInputPlus in) throws IOException @@ -108,31 +143,41 @@ public class Serializers public long serializedSize(ClusteringPrefix clustering) { return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), - newHeader.clusteringTypes()); + clusteringTypes); } }; } - - private ISerializer<ClusteringPrefix> newFormatSerializer(final Version version, final SerializationHeader header) + private static class NewFormatSerializer implements ISerializer<ClusteringPrefix> { - return new ISerializer<ClusteringPrefix>() //Reading and writing from/to the new sstable format + private final Version version; + private final List<AbstractType<?>> clusteringTypes; + + NewFormatSerializer(Version version, List<AbstractType<?>> clusteringTypes) { - public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException - { - ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), header.clusteringTypes()); - } + this.version = version; + this.clusteringTypes = clusteringTypes; + } - public ClusteringPrefix deserialize(DataInputPlus in) throws IOException - { - return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), header.clusteringTypes()); - } + public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException + { + ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), clusteringTypes); + } - public long serializedSize(ClusteringPrefix clustering) - { - return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes()); - } - }; - } + @Override + public void skip(DataInputPlus in) throws IOException + { + ClusteringPrefix.serializer.skip(in, version.correspondingMessagingVersion(), clusteringTypes); + } + + public ClusteringPrefix deserialize(DataInputPlus in) throws IOException + { + return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), clusteringTypes); + } -} \ No newline at end of file + public long serializedSize(ClusteringPrefix clustering) + { + return 
ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), clusteringTypes); + } + } +}
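The crux of the shallow RowIndexEntry introduced by this patch is the fixed-width offsets table consulted by ShallowInfoRetriever.fetchIndex above: every slot is a 4-byte int (TypeSizes.sizeof(0)), so the i-th variable-length IndexInfo can be located with two seeks, one into the table and one to the record itself, without retaining anything on heap. The following self-contained sketch reproduces that layout over a ByteBuffer; the record format (length-prefixed strings) and all names are illustrative, not the real sstable encoding:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetsTableDemo
{
    // Layout, mirroring the promoted index: [record 0]..[record n-1][offset 0]..[offset n-1]
    // where each offset slot is a fixed 4 bytes, relative to the start of the buffer.
    static ByteBuffer build(String... records)
    {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        int[] offsets = new int[records.length];
        for (int i = 0; i < records.length; i++)
        {
            offsets[i] = buf.position();
            byte[] bytes = records[i].getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) bytes.length).put(bytes);
        }
        for (int offset : offsets)
            buf.putInt(offset);
        buf.flip();
        return buf;
    }

    // Two jumps per lookup, like ShallowInfoRetriever.fetchIndex:
    // read the fixed-width slot, then the variable-length record it points at.
    static String fetch(ByteBuffer buf, int count, int index)
    {
        int offsetsTable = buf.limit() - count * Integer.BYTES;
        int recordOffset = buf.getInt(offsetsTable + index * Integer.BYTES);
        int length = buf.getShort(recordOffset);
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++)
            bytes[i] = buf.get(recordOffset + Short.BYTES + i);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args)
    {
        ByteBuffer buf = build("alpha", "bravo", "charlie");
        System.out.println(fetch(buf, 3, 2)); // prints "charlie"
    }
}

In the patch both the records and the offsets table live in the index file, so a ShallowIndexedEntry's heap footprint stays at the constant BASE_SIZE no matter how many index blocks a partition accumulates, which is what makes very large partitions practical.

The skip(DataInputPlus) implementations added to ISerializer above serve the same economy: they advance past a serialized clustering prefix without materializing it. A sketch of the length-prefixed variant, in the spirit of ByteBufferUtil.skipShortLength (SkipUtil is a hypothetical helper; note the loop, since DataInput.skipBytes may skip fewer bytes than requested):

import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;

final class SkipUtil
{
    // Skip a 16-bit-length-prefixed blob without allocating a buffer for it.
    static void skipShortLength(DataInput in) throws IOException
    {
        int length = in.readUnsignedShort();
        while (length > 0)
        {
            int skipped = in.skipBytes(length); // may skip fewer bytes than requested
            if (skipped <= 0)
                throw new EOFException();
            length -= skipped;
        }
    }
}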