Repository: cassandra
Updated Branches:
  refs/heads/trunk ae063e806 -> ef5bbedd6
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 0c7ee59..ebacf34 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -20,23 +20,52 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.primitives.Ints;
+
 import org.apache.cassandra.Util;
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.columniterator.AbstractSSTableIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.IndexInfo;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.serializers.LongSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,68 +75,330 @@ import static junit.framework.Assert.assertTrue;
 
 public class RowIndexEntryTest extends CQLTester
 {
-    private static final List<AbstractType<?>> clusterTypes = Collections.<AbstractType<?>>singletonList(LongType.instance);
+    private static final List<AbstractType<?>> clusterTypes = Collections.singletonList(LongType.instance);
     private static final ClusteringComparator comp = new ClusteringComparator(clusterTypes);
-    private static ClusteringPrefix cn(long l)
+
+    private static final byte[] dummy_100k = new byte[100000];
+
+    private static Clustering cn(long l)
     {
         return Util.clustering(comp, l);
     }
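+
+    // The two tests below compare the CASSANDRA-11206 serialization against the
+    // previous format in two configurations: with a column index cache large
+    // enough to keep every IndexInfo on heap (array-backed entries), and with the
+    // cache disabled so the new "shallow" (read-from-disk) entries are exercised.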
 
     @Test
-    public void testArtificialIndexOf() throws IOException
+    public void testC11206AgainstPreviousArray() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(99999);
+        testC11206AgainstPrevious();
+    }
+
+    @Test
+    public void testC11206AgainstPreviousShallow() throws Exception
+    {
+        DatabaseDescriptor.setColumnIndexCacheSize(0);
+        testC11206AgainstPrevious();
+    }
+
+    private static void testC11206AgainstPrevious() throws Exception
+    {
+        // partition without IndexInfo
+        try (DoubleSerializer doubleSerializer = new DoubleSerializer())
+        {
+            doubleSerializer.build(null, partitionKey(42L),
+                                   Collections.singletonList(cn(42)),
+                                   0L);
+            assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
+        }
+
+        // partition with multiple IndexInfo
+        try (DoubleSerializer doubleSerializer = new DoubleSerializer())
+        {
+            doubleSerializer.build(null, partitionKey(42L),
+                                   Arrays.asList(cn(42), cn(43), cn(44)),
+                                   0L);
+            assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
+        }
+
+        // partition with even more IndexInfo
+        try (DoubleSerializer doubleSerializer = new DoubleSerializer())
+        {
+            doubleSerializer.build(null, partitionKey(42L),
+                                   Arrays.asList(cn(42), cn(43), cn(44), cn(45), cn(46), cn(47), cn(48), cn(49), cn(50), cn(51)),
+                                   0L);
+            assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
+        }
+    }
+
+    private static DecoratedKey partitionKey(long l)
+    {
+        ByteBuffer key = LongSerializer.instance.serialize(l);
+        Token token = Murmur3Partitioner.instance.getToken(key);
+        return new BufferDecoratedKey(token, key);
+    }
+
+    private static class DoubleSerializer implements AutoCloseable
+    {
         CFMetaData cfMeta = CFMetaData.compile("CREATE TABLE pipe.dev_null (pk bigint, ck bigint, val text, PRIMARY KEY(pk, ck))", "foo");
+        Version version = BigFormat.latestVersion;
 
         DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
+        LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+        Row.Deletion deletion = Row.Deletion.LIVE;
 
         SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
-        IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfMeta, BigFormat.latestVersion, header);
-
-        DataOutputBuffer dob = new DataOutputBuffer();
-        dob.writeUnsignedVInt(0);
-        DeletionTime.serializer.serialize(DeletionTime.LIVE, dob);
-        dob.writeUnsignedVInt(3);
-        int off0 = dob.getLength();
-        indexSerializer.serialize(new IndexHelper.IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo), dob);
-        int off1 = dob.getLength();
-        indexSerializer.serialize(new IndexHelper.IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo), dob);
-        int off2 = dob.getLength();
-        indexSerializer.serialize(new IndexHelper.IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo), dob);
-        dob.writeInt(off0);
-        dob.writeInt(off1);
-        dob.writeInt(off2);
-
-        @SuppressWarnings("resource") DataOutputBuffer dobRie = new DataOutputBuffer();
-        dobRie.writeUnsignedVInt(42L);
-        dobRie.writeUnsignedVInt(dob.getLength());
-        dobRie.write(dob.buffer());
-
-        ByteBuffer buf = dobRie.buffer();
-
-        RowIndexEntry<IndexHelper.IndexInfo> rie = new RowIndexEntry.Serializer(cfMeta, BigFormat.latestVersion, header).deserialize(new DataInputBuffer(buf, false));
-
-        Assert.assertEquals(42L, rie.position);
-
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(-1L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(5L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(12L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(17L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, -1));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 0));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 1));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 2));
-        Assert.assertEquals(3, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, false, 3));
-
-        Assert.assertEquals(-1, IndexHelper.indexFor(cn(-1L), rie.columnsIndex(), comp, true, -1));
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(5L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(0, IndexHelper.indexFor(cn(5L), rie.columnsIndex(), comp, true, 2));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(17L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 4));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(12L), rie.columnsIndex(), comp, true, 3));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(12L), rie.columnsIndex(), comp, true, 2));
-        Assert.assertEquals(1, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 1));
-        Assert.assertEquals(2, IndexHelper.indexFor(cn(100L), rie.columnsIndex(), comp, true, 2));
+
+        // create C-11206 + old serializer instances
+        RowIndexEntry.IndexSerializer rieSerializer = new RowIndexEntry.Serializer(cfMeta, version, header);
+        Pre_C_11206_RowIndexEntry.Serializer oldSerializer = new Pre_C_11206_RowIndexEntry.Serializer(cfMeta, version, header);
+
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
+        final DataOutputBuffer rieOutput = new DataOutputBuffer(1024);
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
+        final DataOutputBuffer oldOutput = new DataOutputBuffer(1024);
+
+        final SequentialWriter dataWriterNew;
+        final SequentialWriter dataWriterOld;
+        final org.apache.cassandra.db.ColumnIndex columnIndex;
+
+        RowIndexEntry rieNew;
+        ByteBuffer rieNewSerialized;
+        Pre_C_11206_RowIndexEntry rieOld;
+        ByteBuffer rieOldSerialized;
+
+        DoubleSerializer() throws IOException
+        {
+            File f = File.createTempFile("RowIndexEntryTest-", "db");
+            dataWriterNew = new SequentialWriter(f, 1024, BufferType.ON_HEAP);
+            columnIndex = new org.apache.cassandra.db.ColumnIndex(header, dataWriterNew, version, Collections.emptyList(),
+                                                                  rieSerializer.indexInfoSerializer());
+
+            f = File.createTempFile("RowIndexEntryTest-", "db");
+            dataWriterOld = new SequentialWriter(f, 1024, BufferType.ON_HEAP);
+        }
+
+        public void close() throws Exception
+        {
+            dataWriterNew.close();
+            dataWriterOld.close();
+        }
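+
+        // Writes the same partition through both paths - the new (C-11206)
+        // ColumnIndex/RowIndexEntry code and the pre-C-11206 copy below - and
+        // captures both serialized forms so callers can compare them byte for byte.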
+        void build(Row staticRow, DecoratedKey partitionKey,
+                   Collection<Clustering> clusterings, long startPosition) throws IOException
+        {
+            Iterator<Clustering> clusteringIter = clusterings.iterator();
+            columnIndex.buildRowIndex(makeRowIter(staticRow, partitionKey, clusteringIter, dataWriterNew));
+            rieNew = RowIndexEntry.create(startPosition, 0L,
+                                          deletionInfo, columnIndex.headerLength, columnIndex.columnIndexCount,
+                                          columnIndex.indexInfoSerializedSize(),
+                                          columnIndex.indexSamples, columnIndex.offsets(),
+                                          rieSerializer.indexInfoSerializer());
+            rieSerializer.serialize(rieNew, rieOutput, columnIndex.buffer());
+            rieNewSerialized = rieOutput.buffer().duplicate();
+
+            Iterator<Clustering> clusteringIter2 = clusterings.iterator();
+            ColumnIndex columnIndex = RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(makeRowIter(staticRow, partitionKey, clusteringIter2, dataWriterOld),
+                                                                                       dataWriterOld, header, Collections.emptySet(), BigFormat.latestVersion);
+            rieOld = Pre_C_11206_RowIndexEntry.create(startPosition, deletionInfo, columnIndex);
+            oldSerializer.serialize(rieOld, oldOutput);
+            rieOldSerialized = oldOutput.buffer().duplicate();
+        }
+
+        private AbstractUnfilteredRowIterator makeRowIter(Row staticRow, DecoratedKey partitionKey,
+                                                          Iterator<Clustering> clusteringIter, SequentialWriter dataWriter)
+        {
+            return new AbstractUnfilteredRowIterator(cfMeta, partitionKey, deletionInfo, cfMeta.partitionColumns(),
+                                                     staticRow, false, new EncodingStats(0, 0, 0))
+            {
+                protected Unfiltered computeNext()
+                {
+                    if (!clusteringIter.hasNext())
+                        return endOfData();
+                    try
+                    {
+                        // write some fake bytes to the data file to force writing the IndexInfo object
+                        dataWriter.write(dummy_100k);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    return buildRow(clusteringIter.next());
+                }
+            };
+        }
+
+        private Unfiltered buildRow(Clustering clustering)
+        {
+            BTree.Builder<ColumnData> builder = BTree.builder(ColumnData.comparator);
+            builder.add(BufferCell.live(cfMeta.allColumns().iterator().next(),
+                                        1L,
+                                        ByteBuffer.allocate(0)));
+            return BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, builder.build());
+        }
+    }
+
+    /**
+     * Pre C-11206 code.
+     */
+    static final class ColumnIndex
+    {
+        final long partitionHeaderLength;
+        final List<IndexInfo> columnsIndex;
+
+        private static final ColumnIndex EMPTY = new ColumnIndex(-1, Collections.emptyList());
+
+        private ColumnIndex(long partitionHeaderLength, List<IndexInfo> columnsIndex)
+        {
+            assert columnsIndex != null;
+
+            this.partitionHeaderLength = partitionHeaderLength;
+            this.columnsIndex = columnsIndex;
+        }
+
+        static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator,
+                                              SequentialWriter output,
+                                              SerializationHeader header,
+                                              Collection<SSTableFlushObserver> observers,
+                                              Version version) throws IOException
+        {
+            assert !iterator.isEmpty() && version.storeRows();
+
+            Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion());
+            return builder.build();
+        }
+
+        public static ColumnIndex nothing()
+        {
+            return EMPTY;
+        }
+
+        /**
+         * Help to create an index for a column family based on size of columns,
+         * and write said columns to disk.
+         */
+        private static class Builder
+        {
+            private final UnfilteredRowIterator iterator;
+            private final SequentialWriter writer;
+            private final SerializationHeader header;
+            private final int version;
+
+            private final List<IndexInfo> columnsIndex = new ArrayList<>();
+            private final long initialPosition;
+            private long headerLength = -1;
+
+            private long startPosition = -1;
+
+            private int written;
+            private long previousRowStart;
+
+            private ClusteringPrefix firstClustering;
+            private ClusteringPrefix lastClustering;
+
+            private DeletionTime openMarker;
+
+            private final Collection<SSTableFlushObserver> observers;
+
+            Builder(UnfilteredRowIterator iterator,
+                    SequentialWriter writer,
+                    SerializationHeader header,
+                    Collection<SSTableFlushObserver> observers,
+                    int version)
+            {
+                this.iterator = iterator;
+                this.writer = writer;
+                this.header = header;
+                this.version = version;
+                this.observers = observers == null ? Collections.emptyList() : observers;
+                this.initialPosition = writer.position();
+            }
+
+            private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
+            {
+                ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
+                DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
+                if (header.hasStatic())
+                    UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version);
+            }
+
+            public ColumnIndex build() throws IOException
+            {
+                writePartitionHeader(iterator);
+                this.headerLength = writer.position() - initialPosition;
+
+                while (iterator.hasNext())
+                    add(iterator.next());
+
+                return close();
+            }
+
+            private long currentPosition()
+            {
+                return writer.position() - initialPosition;
+            }
+
+            private void addIndexBlock()
+            {
+                IndexInfo cIndexInfo = new IndexInfo(firstClustering,
+                                                     lastClustering,
+                                                     startPosition,
+                                                     currentPosition() - startPosition,
+                                                     openMarker);
+                columnsIndex.add(cIndexInfo);
+                firstClustering = null;
+            }
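+
+            // Serializes one Unfiltered and cuts a new index block once the current
+            // block has grown past the configured column index size
+            // (column_index_size_in_kb).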
+            private void add(Unfiltered unfiltered) throws IOException
+            {
+                long pos = currentPosition();
+
+                if (firstClustering == null)
+                {
+                    // Beginning of an index block. Remember the start and position
+                    firstClustering = unfiltered.clustering();
+                    startPosition = pos;
+                }
+
+                UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
+
+                // notify observers about each new row
+                if (!observers.isEmpty())
+                    observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
+
+                lastClustering = unfiltered.clustering();
+                previousRowStart = pos;
+                ++written;
+
+                if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+                    openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
+                }
+
+                // if we hit the column index size that we have to index after, go ahead and index it.
+                if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
+                    addIndexBlock();
+            }
+
+            private ColumnIndex close() throws IOException
+            {
+                UnfilteredSerializer.serializer.writeEndOfPartition(writer);
+
+                // It's possible we add no rows, just a top level deletion
+                if (written == 0)
+                    return RowIndexEntryTest.ColumnIndex.EMPTY;
+
+                // the last column may have fallen on an index boundary already. if not, index it explicitly.
+                if (firstClustering != null)
+                    addIndexBlock();
+
+                // we should always have at least one computed index block, but we only write it out if there is more than that.
+                assert !columnsIndex.isEmpty() && headerLength >= 0;
+
+                return new ColumnIndex(headerLength, columnsIndex);
+            }
+        }
+    }
 
     @Test
@@ -116,11 +407,11 @@ public class RowIndexEntryTest extends CQLTester
         String tableName = createTable("CREATE TABLE %s (a int, b text, c int, PRIMARY KEY(a, b))");
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName);
 
-        final RowIndexEntry simple = new RowIndexEntry(123);
+        Pre_C_11206_RowIndexEntry simple = new Pre_C_11206_RowIndexEntry(123);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
         SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
-        RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
+        Pre_C_11206_RowIndexEntry.Serializer serializer = new Pre_C_11206_RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
 
         serializer.serialize(simple, buffer);
 
@@ -128,16 +419,16 @@ public class RowIndexEntryTest extends CQLTester
 
         // write enough rows to ensure we get a few column index entries
         for (int i = 0; i <= DatabaseDescriptor.getColumnIndexSize() / 4; i++)
-            execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, "" + i, i);
+            execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, String.valueOf(i), i);
 
         ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs).build());
 
         File tempFile = File.createTempFile("row_index_entry_test", null);
         tempFile.deleteOnExit();
         SequentialWriter writer = SequentialWriter.open(tempFile);
-        ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion);
-        RowIndexEntry<IndexHelper.IndexInfo> withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
-        IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfs.metadata, BigFormat.latestVersion, header);
+        ColumnIndex columnIndex = RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion);
+        Pre_C_11206_RowIndexEntry withIndex = Pre_C_11206_RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
+        IndexInfo.Serializer indexSerializer = cfs.metadata.serializers().indexInfoSerializer(BigFormat.latestVersion, header);
 
         // sanity check
         assertTrue(columnIndex.columnsIndex.size() >= 3);
@@ -174,11 +465,11 @@ public class RowIndexEntryTest extends CQLTester
         bb = buffer.buffer();
         input = new DataInputBuffer(bb, false);
 
-        RowIndexEntry.Serializer.skip(input, BigFormat.latestVersion);
+        Pre_C_11206_RowIndexEntry.Serializer.skip(input, BigFormat.latestVersion);
 
         Assert.assertEquals(0, bb.remaining());
     }
 
-    private void serializationCheck(RowIndexEntry<IndexHelper.IndexInfo> withIndex, IndexHelper.IndexInfo.Serializer indexSerializer, ByteBuffer bb, DataInputBuffer input) throws IOException
+    private static void serializationCheck(Pre_C_11206_RowIndexEntry withIndex, IndexInfo.Serializer indexSerializer, ByteBuffer bb, DataInputBuffer input) throws IOException
     {
         Assert.assertEquals(0xdeadbeef, input.readUnsignedVInt());
         Assert.assertEquals(withIndex.promotedSize(indexSerializer), input.readUnsignedVInt());
@@ -193,7 +484,7 @@ public class RowIndexEntryTest extends CQLTester
         {
             int pos = bb.position();
             offsets[i] = pos - offset;
-            IndexHelper.IndexInfo info = indexSerializer.deserialize(input);
+            IndexInfo info = indexSerializer.deserialize(input);
             int end = bb.position();
 
             Assert.assertEquals(indexSerializer.serializedSize(info), end - pos);
@@ -210,4 +501,361 @@ public class RowIndexEntryTest extends CQLTester
 
         Assert.assertEquals(0, bb.remaining());
     }
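+
+    // Copy of the RowIndexEntry implementation as it was before CASSANDRA-11206,
+    // retained here so the tests above can compare the old serialization format
+    // with the new one.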
+    static class Pre_C_11206_RowIndexEntry implements IMeasurableMemory
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new Pre_C_11206_RowIndexEntry(0));
+
+        public final long position;
+
+        Pre_C_11206_RowIndexEntry(long position)
+        {
+            this.position = position;
+        }
+
+        protected int promotedSize(IndexInfo.Serializer idxSerializer)
+        {
+            return 0;
+        }
+
+        public static Pre_C_11206_RowIndexEntry create(long position, DeletionTime deletionTime, ColumnIndex index)
+        {
+            assert index != null;
+            assert deletionTime != null;
+
+            // we only consider the columns summary when determining whether to create an IndexedEntry,
+            // since if there are insufficient columns to be worth indexing we're going to seek to
+            // the beginning of the row anyway, so we might as well read the tombstone there as well.
+            if (index.columnsIndex.size() > 1)
+                return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, index.partitionHeaderLength, index.columnsIndex);
+            else
+                return new Pre_C_11206_RowIndexEntry(position);
+        }
+
+        /**
+         * @return true if this index entry contains the row-level tombstone and column summary. Otherwise,
+         * caller should fetch these from the row header.
+         */
+        public boolean isIndexed()
+        {
+            return !columnsIndex().isEmpty();
+        }
+
+        public DeletionTime deletionTime()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * The length of the row header (partition key, partition deletion and static row).
+         * This value is only provided for indexed entries and this method will throw
+         * {@code UnsupportedOperationException} if {@code !isIndexed()}.
+         */
+        public long headerLength()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public List<IndexInfo> columnsIndex()
+        {
+            return Collections.emptyList();
+        }
+
+        public long unsharedHeapSize()
+        {
+            return EMPTY_SIZE;
+        }
+
+        public static class Serializer
+        {
+            private final IndexInfo.Serializer idxSerializer;
+            private final Version version;
+
+            Serializer(CFMetaData metadata, Version version, SerializationHeader header)
+            {
+                this.idxSerializer = metadata.serializers().indexInfoSerializer(version, header);
+                this.version = version;
+            }
+
+            public void serialize(Pre_C_11206_RowIndexEntry rie, DataOutputPlus out) throws IOException
+            {
+                assert version.storeRows() : "We read old index files but we should never write them";
+
+                out.writeUnsignedVInt(rie.position);
+                out.writeUnsignedVInt(rie.promotedSize(idxSerializer));
+
+                if (rie.isIndexed())
+                {
+                    out.writeUnsignedVInt(rie.headerLength());
+                    DeletionTime.serializer.serialize(rie.deletionTime(), out);
+                    out.writeUnsignedVInt(rie.columnsIndex().size());
+
+                    // Calculate and write the offsets to the IndexInfo objects.
+
+                    int[] offsets = new int[rie.columnsIndex().size()];
+
+                    if (out.hasPosition())
+                    {
+                        // Out is usually a SequentialWriter, so using the file-pointer is fine to generate the offsets.
+                        // A DataOutputBuffer also works.
+                        long start = out.position();
+                        int i = 0;
+                        for (IndexInfo info : rie.columnsIndex())
+                        {
+                            offsets[i] = i == 0 ? 0 : (int) (out.position() - start);
+                            i++;
+                            idxSerializer.serialize(info, out);
+                        }
+                    }
+                    else
+                    {
+                        // Not sure this branch will ever be needed, but if it is called, it has to calculate the
+                        // serialized sizes instead of simply using the file-pointer.
+                        int i = 0;
+                        int offset = 0;
+                        for (IndexInfo info : rie.columnsIndex())
+                        {
+                            offsets[i++] = offset;
+                            idxSerializer.serialize(info, out);
+                            offset += idxSerializer.serializedSize(info);
+                        }
+                    }
+
+                    for (int off : offsets)
+                        out.writeInt(off);
+                }
+            }
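+
+            // Deserialization handles both the legacy pre-3.0 layout (plain long/int,
+            // !version.storeRows()) and the current vint-encoded layout.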
+            public Pre_C_11206_RowIndexEntry deserialize(DataInputPlus in) throws IOException
+            {
+                if (!version.storeRows())
+                {
+                    long position = in.readLong();
+
+                    int size = in.readInt();
+                    if (size > 0)
+                    {
+                        DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
+
+                        int entries = in.readInt();
+                        List<IndexInfo> columnsIndex = new ArrayList<>(entries);
+
+                        long headerLength = 0L;
+                        for (int i = 0; i < entries; i++)
+                        {
+                            IndexInfo info = idxSerializer.deserialize(in);
+                            columnsIndex.add(info);
+                            if (i == 0)
+                                headerLength = info.offset;
+                        }
+
+                        return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, headerLength, columnsIndex);
+                    }
+                    else
+                    {
+                        return new Pre_C_11206_RowIndexEntry(position);
+                    }
+                }
+
+                long position = in.readUnsignedVInt();
+
+                int size = (int) in.readUnsignedVInt();
+                if (size > 0)
+                {
+                    long headerLength = in.readUnsignedVInt();
+                    DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
+                    int entries = (int) in.readUnsignedVInt();
+                    List<IndexInfo> columnsIndex = new ArrayList<>(entries);
+                    for (int i = 0; i < entries; i++)
+                        columnsIndex.add(idxSerializer.deserialize(in));
+
+                    in.skipBytesFully(entries * TypeSizes.sizeof(0));
+
+                    return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, headerLength, columnsIndex);
+                }
+                else
+                {
+                    return new Pre_C_11206_RowIndexEntry(position);
+                }
+            }
+
+            // Reads only the data 'position' of the index entry and returns it. Note that this leaves 'in' in the middle
+            // of reading an entry, so this is only useful if you know what you are doing and in most cases 'deserialize'
+            // should be used instead.
+            static long readPosition(DataInputPlus in, Version version) throws IOException
+            {
+                return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
+            }
+
+            public static void skip(DataInputPlus in, Version version) throws IOException
+            {
+                readPosition(in, version);
+                skipPromotedIndex(in, version);
+            }
+
+            private static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException
+            {
+                int size = version.storeRows() ? (int) in.readUnsignedVInt() : in.readInt();
+                if (size <= 0)
+                    return;
+
+                in.skipBytesFully(size);
+            }
+
+            public int serializedSize(Pre_C_11206_RowIndexEntry rie)
+            {
+                assert version.storeRows() : "We read old index files but we should never write them";
+
+                int indexedSize = 0;
+                if (rie.isIndexed())
+                {
+                    List<IndexInfo> index = rie.columnsIndex();
+
+                    indexedSize += TypeSizes.sizeofUnsignedVInt(rie.headerLength());
+                    indexedSize += DeletionTime.serializer.serializedSize(rie.deletionTime());
+                    indexedSize += TypeSizes.sizeofUnsignedVInt(index.size());
+
+                    for (IndexInfo info : index)
+                        indexedSize += idxSerializer.serializedSize(info);
+
+                    indexedSize += index.size() * TypeSizes.sizeof(0);
+                }
+
+                return TypeSizes.sizeofUnsignedVInt(rie.position) + TypeSizes.sizeofUnsignedVInt(indexedSize) + indexedSize;
+            }
+        }
+
+        /**
+         * An entry in the row index for a row whose columns are indexed.
+         */
+        private static final class IndexedEntry extends Pre_C_11206_RowIndexEntry
+        {
+            private final DeletionTime deletionTime;
+
+            // The offset in the file at which the index entry ends
+            private final long headerLength;
+
+            private final List<IndexInfo> columnsIndex;
+            private static final long BASE_SIZE =
+                ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, Arrays.asList(null, null)))
+                + ObjectSizes.measure(new ArrayList<>(1));
+
+            private IndexedEntry(long position, DeletionTime deletionTime, long headerLength, List<IndexInfo> columnsIndex)
+            {
+                super(position);
+                assert deletionTime != null;
+                assert columnsIndex != null && columnsIndex.size() > 1;
+                this.deletionTime = deletionTime;
+                this.headerLength = headerLength;
+                this.columnsIndex = columnsIndex;
+            }
+
+            @Override
+            public DeletionTime deletionTime()
+            {
+                return deletionTime;
+            }
+
+            @Override
+            public long headerLength()
+            {
+                return headerLength;
+            }
+
+            @Override
+            public List<IndexInfo> columnsIndex()
+            {
+                return columnsIndex;
+            }
+
+            @Override
+            protected int promotedSize(IndexInfo.Serializer idxSerializer)
+            {
+                long size = TypeSizes.sizeofUnsignedVInt(headerLength)
+                          + DeletionTime.serializer.serializedSize(deletionTime)
+                          + TypeSizes.sizeofUnsignedVInt(columnsIndex.size()); // number of entries
+                for (IndexInfo info : columnsIndex)
+                    size += idxSerializer.serializedSize(info);
+
+                size += columnsIndex.size() * TypeSizes.sizeof(0);
+
+                return Ints.checkedCast(size);
+            }
+
+            @Override
+            public long unsharedHeapSize()
+            {
+                long entrySize = 0;
+                for (IndexInfo idx : columnsIndex)
+                    entrySize += idx.unsharedHeapSize();
+
+                return BASE_SIZE
+                       + entrySize
+                       + deletionTime.unsharedHeapSize()
+                       + ObjectSizes.sizeOfReferenceArray(columnsIndex.size());
+            }
+        }
+    }
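+
+    // Replacement for the removed IndexHelperTest below: the old
+    // IndexHelper.indexFor() logic now lives in
+    // AbstractSSTableIterator.IndexState.indexFor().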
+    @Test
+    public void testIndexFor() throws IOException
+    {
+        DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
+
+        List<IndexInfo> indexes = new ArrayList<>();
+        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo));
+
+        RowIndexEntry rie = new RowIndexEntry(0L)
+        {
+            public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+            {
+                return new IndexInfoRetriever()
+                {
+                    public IndexInfo columnsIndex(int index)
+                    {
+                        return indexes.get(index);
+                    }
+
+                    public void close()
+                    {
+                    }
+                };
+            }
+
+            public int columnsIndexCount()
+            {
+                return indexes.size();
+            }
+        };
+
+        AbstractSSTableIterator.IndexState indexState = new AbstractSSTableIterator.IndexState(
+            null, comp, rie, false, null
+        );
+
+        assertEquals(0, indexState.indexFor(cn(-1L), -1));
+        assertEquals(0, indexState.indexFor(cn(5L), -1));
+        assertEquals(1, indexState.indexFor(cn(12L), -1));
+        assertEquals(2, indexState.indexFor(cn(17L), -1));
+        assertEquals(3, indexState.indexFor(cn(100L), -1));
+        assertEquals(3, indexState.indexFor(cn(100L), 0));
+        assertEquals(3, indexState.indexFor(cn(100L), 1));
+        assertEquals(3, indexState.indexFor(cn(100L), 2));
+        assertEquals(3, indexState.indexFor(cn(100L), 3));
+
+        indexState = new AbstractSSTableIterator.IndexState(
+            null, comp, rie, true, null
+        );
+
+        assertEquals(-1, indexState.indexFor(cn(-1L), -1));
+        assertEquals(0, indexState.indexFor(cn(5L), 3));
+        assertEquals(0, indexState.indexFor(cn(5L), 2));
+        assertEquals(1, indexState.indexFor(cn(17L), 3));
+        assertEquals(2, indexState.indexFor(cn(100L), 3));
+        assertEquals(2, indexState.indexFor(cn(100L), 4));
+        assertEquals(1, indexState.indexFor(cn(12L), 3));
+        assertEquals(1, indexState.indexFor(cn(12L), 2));
+        assertEquals(1, indexState.indexFor(cn(100L), 1));
+        assertEquals(2, indexState.indexFor(cn(100L), 2));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
deleted file mode 100644
index e6328de..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.io.sstable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.Test;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.ClusteringPrefix;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-import static org.junit.Assert.assertEquals;
-
-public class IndexHelperTest
-{
-
-    private static ClusteringComparator comp = new ClusteringComparator(Collections.<AbstractType<?>>singletonList(LongType.instance));
-    private static ClusteringPrefix cn(long l)
-    {
-        return Util.clustering(comp, l);
-    }
-
-    @Test
-    public void testIndexHelper()
-    {
-        DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
-
-        List<IndexInfo> indexes = new ArrayList<>();
-        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo));
-
-        assertEquals(0, IndexHelper.indexFor(cn(-1L), indexes, comp, false, -1));
-        assertEquals(0, IndexHelper.indexFor(cn(5L), indexes, comp, false, -1));
-        assertEquals(1, IndexHelper.indexFor(cn(12L), indexes, comp, false, -1));
-        assertEquals(2, IndexHelper.indexFor(cn(17L), indexes, comp, false, -1));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, -1));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 0));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 1));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 2));
-        assertEquals(3, IndexHelper.indexFor(cn(100L), indexes, comp, false, 3));
-
-        assertEquals(-1, IndexHelper.indexFor(cn(-1L), indexes, comp, true, -1));
-        assertEquals(0, IndexHelper.indexFor(cn(5L), indexes, comp, true, 3));
-        assertEquals(0, IndexHelper.indexFor(cn(5L), indexes, comp, true, 2));
-        assertEquals(1, IndexHelper.indexFor(cn(17L), indexes, comp, true, 3));
-        assertEquals(2, IndexHelper.indexFor(cn(100L), indexes, comp, true, 3));
-        assertEquals(2, IndexHelper.indexFor(cn(100L), indexes, comp, true, 4));
-        assertEquals(1, IndexHelper.indexFor(cn(12L), indexes, comp, true, 3));
-        assertEquals(1, IndexHelper.indexFor(cn(12L), indexes, comp, true, 2));
-        assertEquals(1, IndexHelper.indexFor(cn(100L), indexes, comp, true, 1));
-        assertEquals(2, IndexHelper.indexFor(cn(100L), indexes, comp, true, 2));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5bbedd/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java b/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java
new file mode 100644
index 0000000..f97356a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/LargePartitionsTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.metrics.CacheMetrics;
+import org.apache.cassandra.service.CacheService;
+
+/**
+ * Test intended to manually measure the GC pressure caused by writing and
+ * reading partitions of different sizes, for CASSANDRA-11206.
+ */
+@RunWith(OrderedJUnit4ClassRunner.class)
+@Ignore // all these tests take very, very long - so only run them manually
+public class LargePartitionsTest extends CQLTester
+{
+
+    @FunctionalInterface
+    interface Measured
+    {
+        void measure() throws Throwable;
+    }
+
+    private static void measured(String name, Measured measured) throws Throwable
+    {
+        long t0 = System.currentTimeMillis();
+        measured.measure();
+        long t = System.currentTimeMillis() - t0;
+        System.out.println(name + " took " + t + " ms");
+    }
+
+    private static String randomText(int bytes)
+    {
+        char[] ch = new char[bytes];
+        ThreadLocalRandom r = ThreadLocalRandom.current();
+        for (int i = 0; i < bytes; i++)
+            ch[i] = (char) (32 + r.nextInt(95));
+        return new String(ch);
+    }
+
+    private static final int rowKBytes = 8;
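+
+    // Writes totalMBytes of data as partitions of partitionKBytes each, then
+    // measures flush, compaction and repeated point reads, dumping key cache
+    // metrics along the way.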
+    private void withPartitionSize(long partitionKBytes, long totalMBytes) throws Throwable
+    {
+        long totalKBytes = totalMBytes * 1024L;
+
+        createTable("CREATE TABLE %s (pk text, ck text, val text, PRIMARY KEY (pk, ck))");
+
+        String name = "part=" + partitionKBytes + "k total=" + totalMBytes + 'M';
+
+        measured("INSERTs for " + name, () -> {
+            for (long writtenKBytes = 0L; writtenKBytes < totalKBytes; writtenKBytes += partitionKBytes)
+            {
+                String pk = Long.toBinaryString(writtenKBytes);
+                for (long kbytes = 0L; kbytes < partitionKBytes; kbytes += rowKBytes)
+                {
+                    String ck = Long.toBinaryString(kbytes);
+                    execute("INSERT INTO %s (pk, ck, val) VALUES (?,?,?)", pk, ck, randomText(rowKBytes * 1024));
+                }
+            }
+        });
+
+        measured("flush for " + name, () -> flush(true));
+
+        CacheService.instance.keyCache.clear();
+
+        measured("compact for " + name, () -> {
+            keyCacheMetrics("before compaction");
+            compact();
+            keyCacheMetrics("after compaction");
+        });
+
+        measured("SELECTs 1 for " + name, () -> selects(partitionKBytes, totalKBytes));
+
+        measured("SELECTs 2 for " + name, () -> selects(partitionKBytes, totalKBytes));
+    }
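+
+    // Reads 50k random rows; pk and ck are aligned to the partition and row
+    // sizes produced by withPartitionSize().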
AND ck=?", + Long.toBinaryString(pk), + Long.toBinaryString(ck)).one(); + if (i % 1000 == 0) + keyCacheMetrics("after " + i + " selects"); + } + keyCacheMetrics("after all selects"); + } + + private static void keyCacheMetrics(String title) + { + CacheMetrics metrics = CacheService.instance.keyCache.getMetrics(); + System.out.println("Key cache metrics " + title + ": capacity:" + metrics.capacity.getValue() + + " size:"+metrics.size.getValue()+ + " entries:" + metrics.entries.getValue() + + " hit-rate:"+metrics.hitRate.getValue() + + " one-min-rate:"+metrics.oneMinuteHitRate.getValue()); + } + + @Test + public void prepare() throws Throwable + { + for (int i = 0; i < 4; i++) + { + withPartitionSize(8L, 32L); + } + } + + @Test + public void test_01_16k() throws Throwable + { + withPartitionSize(16L, 1024L); + } + + @Test + public void test_02_512k() throws Throwable + { + withPartitionSize(512L, 1024L); + } + + @Test + public void test_03_1M() throws Throwable + { + withPartitionSize(1024L, 1024L); + } + + @Test + public void test_04_4M() throws Throwable + { + withPartitionSize(4L * 1024L, 1024L); + } + + @Test + public void test_05_8M() throws Throwable + { + withPartitionSize(8L * 1024L, 1024L); + } + + @Test + public void test_06_16M() throws Throwable + { + withPartitionSize(16L * 1024L, 1024L); + } + + @Test + public void test_07_32M() throws Throwable + { + withPartitionSize(32L * 1024L, 1024L); + } + + @Test + public void test_08_64M() throws Throwable + { + withPartitionSize(64L * 1024L, 1024L); + } + + @Test + public void test_09_256M() throws Throwable + { + withPartitionSize(256L * 1024L, 4 * 1024L); + } + + @Test + public void test_10_512M() throws Throwable + { + withPartitionSize(512L * 1024L, 4 * 1024L); + } + + @Test + public void test_11_1G() throws Throwable + { + withPartitionSize(1024L * 1024L, 8 * 1024L); + } + + @Test + public void test_12_2G() throws Throwable + { + withPartitionSize(2L * 1024L * 1024L, 8 * 1024L); + } + + @Test + public void test_13_4G() throws Throwable + { + withPartitionSize(4L * 1024L * 1024L, 16 * 1024L); + } + + @Test + public void test_14_8G() throws Throwable + { + withPartitionSize(8L * 1024L * 1024L, 32 * 1024L); + } +}