blambov commented on code in PR #2267: URL: https://github.com/apache/cassandra/pull/2267#discussion_r1195176307
########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java: ########## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.IVerifier; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableReadsListener; +import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason; +import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason; +import org.apache.cassandra.io.sstable.format.SSTableReaderWithFilter; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.OutputHandler; + +import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.EQ; +import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GE; +import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GT; +import static org.apache.cassandra.utils.concurrent.SharedCloseable.sharedCopyOrNull; + +/** + * Reader of SSTable files in BTI format (see {@link BtiFormat}), written by {@link BtiTableWriter}. + * + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen. + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead. + */ +public class BtiTableReader extends SSTableReaderWithFilter +{ + private final FileHandle rowIndexFile; + private final PartitionIndex partitionIndex; + + public BtiTableReader(Builder builder, SSTable.Owner owner) + { + super(builder, owner); + this.rowIndexFile = builder.getRowIndexFile(); + this.partitionIndex = builder.getPartitionIndex(); + } + + protected final Builder unbuildTo(Builder builder, boolean sharedCopy) + { + Builder b = super.unbuildTo(builder, sharedCopy); + if (builder.getPartitionIndex() == null) + b.setPartitionIndex(sharedCopy ? sharedCopyOrNull(partitionIndex) : partitionIndex); + if (builder.getRowIndexFile() == null) + b.setRowIndexFile(sharedCopy ? sharedCopyOrNull(rowIndexFile) : rowIndexFile); + + return b; + } + + @Override + protected List<AutoCloseable> setupInstance(boolean trackHotness) + { + ArrayList<AutoCloseable> closeables = Lists.newArrayList(rowIndexFile, partitionIndex); + closeables.addAll(super.setupInstance(trackHotness)); + return closeables; + } + + /** + * Whether to filter out data before {@code first}. Needed for sources of data in a compaction, where the relevant + * output is opened early -- in this case the sstable's start is changed, but the data can still be found in the + * file. Range and point queries must filter it out. + */ + protected boolean filterFirst() + { + return openReason == OpenReason.MOVED_START; + } + + /** + * Whether to filter out data after {@code last}. Early-open sstables may contain data beyond the switch point + * (because an early-opened sstable is not ready until buffers have been flushed), and leaving that data visible + * will give a redundant copy with all associated overheads. + */ + protected boolean filterLast() + { + return openReason == OpenReason.EARLY && partitionIndex instanceof PartitionIndexEarly; + } + + public long estimatedKeys() + { + return partitionIndex == null ? 0 : partitionIndex.size(); + } + + @Override + protected TrieIndexEntry getRowIndexEntry(PartitionPosition key, + Operator operator, + boolean updateStats, + boolean permitMatchPastLast, + SSTableReadsListener listener) + { + assert !permitMatchPastLast; + + PartitionPosition searchKey; + Operator searchOp; + + if (operator == EQ) + return getExactPosition((DecoratedKey) key, listener, updateStats); + + if (operator == GT || operator == GE) + { + if (filterLast() && last.compareTo(key) < 0) + { + notifySkipped(SkippingReason.MIN_MAX_KEYS, listener, operator, updateStats); + return null; + } + boolean filteredLeft = (filterFirst() && first.compareTo(key) > 0); + searchKey = filteredLeft ? first : key; + searchOp = filteredLeft ? GE : operator; + + try (PartitionIndex.Reader reader = partitionIndex.openReader()) + { + TrieIndexEntry rie = reader.ceiling(searchKey, (pos, assumeNoMatch, compareKey) -> retrieveEntryIfAcceptable(searchOp, compareKey, pos, assumeNoMatch)); + if (rie != null) + notifySelected(SelectionReason.INDEX_ENTRY_FOUND, listener, operator, updateStats, rie); + else + notifySkipped(SkippingReason.INDEX_ENTRY_NOT_FOUND, listener, operator, updateStats); + return rie; + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, rowIndexFile.path()); + } + } + + throw new IllegalArgumentException("Invalid op: " + operator); + } + + /** + * Called by getPosition above (via Reader.ceiling/floor) to check if the position satisfies the full key constraint. + * This is called once if there is a prefix match (which can be in any relationship with the sought key, thus + * assumeNoMatch: false), and if it returns null it is called again for the closest greater position + * (with assumeNoMatch: true). + * Returns the index entry at this position, or null if the search op rejects it. + */ + private TrieIndexEntry retrieveEntryIfAcceptable(Operator searchOp, PartitionPosition searchKey, long pos, boolean assumeNoMatch) throws IOException + { + if (pos >= 0) + { + try (FileDataInput in = rowIndexFile.createReader(pos)) + { + if (assumeNoMatch) + ByteBufferUtil.skipShortLength(in); + else + { + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + DecoratedKey decorated = decorateKey(indexKey); + if (searchOp.apply(decorated.compareTo(searchKey)) != 0) + return null; + } + return TrieIndexEntry.deserialize(in, in.getFilePointer()); + } + } + else + { + pos = ~pos; + if (!assumeNoMatch) + { + try (FileDataInput in = dfile.createReader(pos)) + { + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + DecoratedKey decorated = decorateKey(indexKey); + if (searchOp.apply(decorated.compareTo(searchKey)) != 0) + return null; + } + } + return new TrieIndexEntry(pos); + } + } + + @Override + public DecoratedKey keyAtPositionFromSecondaryIndex(long keyPositionFromSecondaryIndex) throws IOException + { + try (RandomAccessReader reader = openDataReader()) + { + reader.seek(keyPositionFromSecondaryIndex); + if (reader.isEOF()) + return null; + return decorateKey(ByteBufferUtil.readWithShortLength(reader)); + } + } + + public TrieIndexEntry getExactPosition(DecoratedKey dk, + SSTableReadsListener listener, + boolean updateStats) + { + if ((filterFirst() && first.compareTo(dk) > 0) || (filterLast() && last.compareTo(dk) < 0)) + { + notifySkipped(SkippingReason.MIN_MAX_KEYS, listener, EQ, updateStats); Review Comment: Added in `SSTableReaderTest.testGetPositionsBloomFilterStats` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

