blambov commented on code in PR #2267: URL: https://github.com/apache/cassandra/pull/2267#discussion_r1183602502
########## src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIterator.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.KeyReader; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Throwables; + +import static org.apache.cassandra.utils.FBUtilities.immutableListWithFilteredNulls; + +class PartitionIterator extends PartitionIndex.IndexPosIterator implements KeyReader +{ + private final PartitionIndex partitionIndex; + private final IPartitioner partitioner; + private final PartitionPosition limit; + private final int exclusiveLimit; + private final FileHandle dataFile; + private final FileHandle rowIndexFile; + + private FileDataInput dataInput; + private FileDataInput indexInput; + + private DecoratedKey currentKey; + private TrieIndexEntry currentEntry; + private DecoratedKey nextKey; + private TrieIndexEntry nextEntry; + + @SuppressWarnings({ "resource", "RedundantSuppression" }) + static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile, + PartitionPosition left, int inclusiveLeft, PartitionPosition right, int exclusiveRight) throws IOException + { + PartitionIterator partitionIterator = null; + PartitionIndex partitionIndexCopy = null; + FileHandle dataFileCopy = null; + FileHandle rowIndexFileCopy = null; + + try + { + partitionIndexCopy = partitionIndex.sharedCopy(); + dataFileCopy = dataFile.sharedCopy(); + rowIndexFileCopy = rowIndexFile.sharedCopy(); + + partitionIterator = new PartitionIterator(partitionIndexCopy, partitioner, rowIndexFileCopy, dataFileCopy, left, right, exclusiveRight); + + partitionIterator.readNext(); + // Because the index stores prefixes, the first value can be in any relationship with the left bound. + if (partitionIterator.nextKey != null && !(partitionIterator.nextKey.compareTo(left) > inclusiveLeft)) + { + partitionIterator.readNext(); Review Comment: Added ########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java: ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.AbstractRowIndexEntry; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.format.DataComponent; +import org.apache.cassandra.io.sstable.format.IndexComponent; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason; +import org.apache.cassandra.io.sstable.format.SortedTableWriter; +import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components; +import org.apache.cassandra.io.util.DataPosition; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.MmappedRegionsCache; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.Transactional; + +import static org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE; + +/** + * Writes SSTables in BTI format (see {@link BtiFormat}), which can be read by {@link BtiTableReader}. 
+ */ +@VisibleForTesting +public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter> +{ + private static final Logger logger = LoggerFactory.getLogger(BtiTableWriter.class); + + private final BtiFormatPartitionWriter partitionWriter; + private final IndexWriter iwriter; + + public BtiTableWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner) + { + super(builder, lifecycleNewTracker, owner); + this.iwriter = builder.getIndexWriter(); + this.partitionWriter = builder.getPartitionWriter(); + } + + @Override + public void mark() + { + super.mark(); + iwriter.mark(); + } + + @Override + public void resetAndTruncate() + { + super.resetAndTruncate(); + iwriter.resetAndTruncate(); + } + + @Override + protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException + { + TrieIndexEntry entry = TrieIndexEntry.create(partitionWriter.getInitialPosition(), + finishResult, + partitionLevelDeletion, + partitionWriter.getRowIndexBlockCount()); + iwriter.append(key, entry); + return entry; + } + + @SuppressWarnings({"resource", "RedundantSuppression"}) + private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier<PartitionIndex> partitionIndexSupplier) + { + IFilter filter = null; + FileHandle dataFile = null; + PartitionIndex partitionIndex = null; + FileHandle rowIndexFile = null; + + BtiTableReader.Builder builder = unbuildTo(new BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge) + .setSerializationHeader(header) + .setOpenReason(openReason); + + try + { + builder.setStatsMetadata(statsMetadata()); + + partitionIndex = partitionIndexSupplier.get(); + rowIndexFile = iwriter.rowIndexFHBuilder.complete(); + dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), builder.getStatsMetadata()); + filter = iwriter.getFilterCopy(); + + return builder.setPartitionIndex(partitionIndex) + .setFirst(partitionIndex.firstKey()) + .setLast(partitionIndex.lastKey()) + .setRowIndexFile(rowIndexFile) + .setDataFile(dataFile) + .setFilter(filter) + .build(owner().orElse(null), true, true); + } + catch (RuntimeException | Error ex) + { + JVMStabilityInspector.inspectThrowable(ex); + Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, rowIndexFile, partitionIndex); + throw ex; + } + } + + @Override + public void openEarly(Consumer<SSTableReader> callWhenReady) + { + long dataLength = dataWriter.position(); + iwriter.buildPartial(dataLength, partitionIndex -> + { + iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset()); + BtiTableReader reader = openInternal(OpenReason.EARLY, false, () -> partitionIndex); + callWhenReady.accept(reader); + }); + } + + @Override + public SSTableReader openFinalEarly() + { + // we must ensure the data is completely flushed to disk + iwriter.complete(); // This will be called by completedPartitionIndex() below too, but we want it done now to + // ensure outstanding openEarly actions are not triggered. + dataWriter.sync(); + iwriter.rowIndexWriter.sync(); + // Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and + // retain a partially-written page. 
+ + return openFinal(OpenReason.EARLY); + } + + @Override + @SuppressWarnings({"resource", "RedundantSuppression"}) + protected SSTableReader openFinal(OpenReason openReason) + { + + if (maxDataAge < 0) + maxDataAge = Clock.Global.currentTimeMillis(); + + return openInternal(openReason, true, iwriter::completedPartitionIndex); + } + + @Override + protected TransactionalProxy txnProxy() + { + return new TransactionalProxy(() -> FBUtilities.immutableListWithFilteredNulls(iwriter, dataWriter)); + } + + private class TransactionalProxy extends SortedTableWriter<BtiFormatPartitionWriter>.TransactionalProxy + { + public TransactionalProxy(Supplier<ImmutableList<Transactional>> transactionals) + { + super(transactionals); + } + + @Override + protected Throwable doPostCleanup(Throwable accumulate) + { + accumulate = Throwables.close(accumulate, partitionWriter); + accumulate = super.doPostCleanup(accumulate); + return accumulate; + } + } + + /** + * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. + */ + static class IndexWriter extends SortedTableWriter.AbstractIndexWriter + { + final SequentialWriter rowIndexWriter; + private final FileHandle.Builder rowIndexFHBuilder; + private final SequentialWriter partitionIndexWriter; + private final FileHandle.Builder partitionIndexFHBuilder; + private final PartitionIndexBuilder partitionIndex; + boolean partitionIndexCompleted = false; + private DataPosition riMark; + private DataPosition piMark; + + IndexWriter(Builder b) + { + super(b); + rowIndexWriter = new SequentialWriter(descriptor.fileFor(Components.ROW_INDEX), b.getIOOptions().writerOptions); + rowIndexFHBuilder = IndexComponent.fileBuilder(Components.ROW_INDEX, b).withMmappedRegionsCache(b.getMmappedRegionsCache()); + partitionIndexWriter = new SequentialWriter(descriptor.fileFor(Components.PARTITION_INDEX), b.getIOOptions().writerOptions); + partitionIndexFHBuilder = IndexComponent.fileBuilder(Components.PARTITION_INDEX, b).withMmappedRegionsCache(b.getMmappedRegionsCache()); + partitionIndex = new PartitionIndexBuilder(partitionIndexWriter, partitionIndexFHBuilder); + // register listeners to be alerted when the data files are flushed + partitionIndexWriter.setPostFlushListener(() -> partitionIndex.markPartitionIndexSynced(partitionIndexWriter.getLastFlushOffset())); + rowIndexWriter.setPostFlushListener(() -> partitionIndex.markRowIndexSynced(rowIndexWriter.getLastFlushOffset())); + @SuppressWarnings({"resource", "RedundantSuppression"}) + SequentialWriter dataWriter = b.getDataWriter(); + dataWriter.setPostFlushListener(() -> partitionIndex.markDataSynced(dataWriter.getLastFlushOffset())); + } + + public long append(DecoratedKey key, AbstractRowIndexEntry indexEntry) throws IOException + { + bf.add(key); + long position; + if (indexEntry.isIndexed()) + { + long indexStart = rowIndexWriter.position(); + try + { + ByteBufferUtil.writeWithShortLength(key.getKey(), rowIndexWriter); + ((TrieIndexEntry) indexEntry).serialize(rowIndexWriter, rowIndexWriter.position()); + } + catch (IOException e) + { + throw new FSWriteError(e, rowIndexWriter.getFile()); + } + + if (logger.isTraceEnabled()) + logger.trace("wrote index entry: {} at {}", indexEntry, indexStart); + position = indexStart; + } + else + { + // Write data position directly in trie. 
+ position = ~indexEntry.position; + } + partitionIndex.addEntry(key, position); + return position; + } + + public boolean buildPartial(long dataPosition, Consumer<PartitionIndex> callWhenReady) Review Comment: This is okay, we can safely ignore not scheduling another early-open session, but it's still a good idea to report that we haven't. ########## src/java/org/apache/cassandra/io/sstable/format/bti/SSTableReversedIterator.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.util.NoSuchElementException; + +import com.carrotsearch.hppc.LongStack; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.UnfilteredValidation; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.io.sstable.AbstractRowIndexEntry; +import org.apache.cassandra.io.sstable.AbstractSSTableIterator; +import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileHandle; + +/** + * Unfiltered row iterator over a BTI SSTable that returns rows in reverse order. + */ +class SSTableReversedIterator extends AbstractSSTableIterator<TrieIndexEntry> +{ + /** + * The index of the slice being processed. + */ + private int slice; + + public SSTableReversedIterator(BtiTableReader sstable, + FileDataInput file, + DecoratedKey key, + TrieIndexEntry indexEntry, + Slices slices, + ColumnFilter columns, + FileHandle ifile) + { + super(sstable, file, key, indexEntry, slices, columns, ifile); + } + + protected Reader createReaderInternal(TrieIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) + { + if (indexEntry.isIndexed()) + return new ReverseIndexedReader(indexEntry, file, shouldCloseFile); + else + return new ReverseReader(file, shouldCloseFile); + } + + public boolean isReverseOrder() + { + return true; + } + + protected int nextSliceIndex() + { + int next = slice; + slice++; + return slices.size() - (next + 1); + } + + protected boolean hasMoreSlices() + { + return slice < slices.size(); + } + + /** + * Reverse iteration is performed by going through an index block (or the whole partition if not indexed) forwards + * and storing the positions of each entry that falls within the slice in a stack. 
Reverse iteration then pops out + * positions and reads the entries. + * <p> + * Note: The earlier version of this was constructing an in-memory view of the block instead, which gives better + * performance on bigger queries and index blocks (due to not having to read disk again). With the lower + * granularity of the tries it makes better sense to store as little as possible as the beginning of the block + * should very rarely be in other page/chunk cache locations. This has the benefit of being able to answer small + * queries (esp. LIMIT 1) faster and with less GC churn. + */ + private class ReverseReader extends AbstractReader + { + final LongStack rowOffsets = new LongStack(); + RangeTombstoneMarker blockOpenMarker, blockCloseMarker; + private Unfiltered next = null; + private boolean foundLessThan; + private long startPos = -1; + + private ReverseReader(FileDataInput file, boolean shouldCloseFile) + { + super(file, shouldCloseFile); + } + + @Override + public void setForSlice(Slice slice) throws IOException + { + // read full row and filter + if (startPos == -1) + startPos = file.getFilePointer(); + else + seekToPosition(startPos); + + fillOffsets(slice, true, true, Long.MAX_VALUE); + } + + @Override + protected boolean hasNextInternal() throws IOException + { + if (next != null) + return true; + next = computeNext(); + return next != null; + } + + @Override + protected Unfiltered nextInternal() throws IOException + { + if (!hasNextInternal()) + throw new NoSuchElementException(); + + Unfiltered toReturn = next; + next = null; + return toReturn; + } + + private Unfiltered computeNext() throws IOException + { + Unfiltered toReturn; + do + { + if (blockCloseMarker != null) + { + toReturn = blockCloseMarker; + blockCloseMarker = null; + return toReturn; + } + while (!rowOffsets.isEmpty()) + { + seekToPosition(rowOffsets.pop()); + boolean hasNext = deserializer.hasNext(); + assert hasNext; Review Comment: Added ########## src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIterator.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.KeyReader; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Throwables; + +import static org.apache.cassandra.utils.FBUtilities.immutableListWithFilteredNulls; + +class PartitionIterator extends PartitionIndex.IndexPosIterator implements KeyReader +{ + private final PartitionIndex partitionIndex; + private final IPartitioner partitioner; + private final PartitionPosition limit; + private final int exclusiveLimit; + private final FileHandle dataFile; + private final FileHandle rowIndexFile; + + private FileDataInput dataInput; + private FileDataInput indexInput; + + private DecoratedKey currentKey; + private TrieIndexEntry currentEntry; + private DecoratedKey nextKey; + private TrieIndexEntry nextEntry; + + @SuppressWarnings({ "resource", "RedundantSuppression" }) + static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile, + PartitionPosition left, int inclusiveLeft, PartitionPosition right, int exclusiveRight) throws IOException + { + PartitionIterator partitionIterator = null; + PartitionIndex partitionIndexCopy = null; + FileHandle dataFileCopy = null; + FileHandle rowIndexFileCopy = null; + + try + { + partitionIndexCopy = partitionIndex.sharedCopy(); + dataFileCopy = dataFile.sharedCopy(); + rowIndexFileCopy = rowIndexFile.sharedCopy(); + + partitionIterator = new PartitionIterator(partitionIndexCopy, partitioner, rowIndexFileCopy, dataFileCopy, left, right, exclusiveRight); + + partitionIterator.readNext(); + // Because the index stores prefixes, the first value can be in any relationship with the left bound. 
+ if (partitionIterator.nextKey != null && !(partitionIterator.nextKey.compareTo(left) > inclusiveLeft)) + { + partitionIterator.readNext(); + } + partitionIterator.advance(); + return partitionIterator; + } + catch (IOException | RuntimeException ex) + { + if (partitionIterator != null) + { + partitionIterator.close(); + } + else + { + Throwables.closeNonNullAndAddSuppressed(ex, rowIndexFileCopy, dataFileCopy, partitionIndexCopy); + } + throw ex; + } + } + + static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile) throws IOException + { + return create(partitionIndex, partitioner, rowIndexFile, dataFile, partitionIndex.firstKey(), -1, partitionIndex.lastKey(), 0); + } + + static PartitionIterator empty(PartitionIndex partitionIndex) + { + return new PartitionIterator(partitionIndex.sharedCopy()); + } + + private PartitionIterator(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile, + PartitionPosition left, PartitionPosition right, int exclusiveRight) + { + super(partitionIndex, left, right); + this.partitionIndex = partitionIndex; + this.partitioner = partitioner; + this.limit = right; + this.exclusiveLimit = exclusiveRight; + this.rowIndexFile = rowIndexFile; + this.dataFile = dataFile; + } + + private PartitionIterator(PartitionIndex partitionIndex) + { + super(partitionIndex, partitionIndex.firstKey(), partitionIndex.firstKey()); + this.partitionIndex = partitionIndex; + this.partitioner = null; + this.limit = partitionIndex.firstKey(); + this.exclusiveLimit = -1; + this.rowIndexFile = null; + this.dataFile = null; + + this.currentEntry = null; + this.currentKey = null; + this.nextEntry = null; + this.nextKey = null; + } + + @Override + public void close() + { + Throwable accum = null; + accum = Throwables.close(accum, immutableListWithFilteredNulls(partitionIndex, dataFile, rowIndexFile)); + accum = Throwables.close(accum, immutableListWithFilteredNulls(dataInput, indexInput)); + accum = Throwables.perform(accum, super::close); + Throwables.maybeFail(accum); + } + + public DecoratedKey decoratedKey() + { + return currentKey; + } + + public ByteBuffer key() + { + return currentKey.getKey(); + } + + @Override + public long dataPosition() + { + return currentEntry != null ? currentEntry.position : -1; + } + + @Override + public long keyPositionForSecondaryIndex() + { + return dataPosition(); + } + + public TrieIndexEntry entry() + { + return currentEntry; + } + + @Override + public boolean advance() throws IOException + { + currentKey = nextKey; + currentEntry = nextEntry; + if (currentKey != null) + { + readNext(); + // if nextKey is null, then currentKey is the last key to be published, therefore check against any limit + // and suppress the partition if it is beyond the limit + if (nextKey == null && limit != null && currentKey.compareTo(limit) > exclusiveLimit) + { // exclude last partition outside range + currentKey = null; Review Comment: Added ########## src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Thread-unsafe reverse value iterator for on-disk tries. Uses the assumptions of Walker. + */ +@NotThreadSafe +public class ReverseValueIterator<Concrete extends ReverseValueIterator<Concrete>> extends Walker<Concrete> +{ + private final ByteSource limit; + private IterationPosition stack; + private long next; + private boolean reportingPrefixes; + + static class IterationPosition + { + final long node; + final int limit; + final IterationPosition prev; + int childIndex; + + public IterationPosition(long node, int childIndex, int limit, IterationPosition prev) + { + super(); + this.node = node; + this.childIndex = childIndex; + this.limit = limit; + this.prev = prev; + } + } + + protected ReverseValueIterator(Rebufferer source, long root) Review Comment: It's unused and doesn't appear correct. Removed. ########## src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java: ########## @@ -140,6 +142,33 @@ protected void markAndThrow(Throwable cause, boolean mutateRepaired) throw new RuntimeException(e); } + public void verify() Review Comment: Runs with J8 under IDEA and with `ant test`. I don't want to switch my IDEA target as it always breaks something in my config. ########## src/java/org/apache/cassandra/io/tries/IncrementalTrieWriterPageAware.java: ########## @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Incremental builders of on-disk tries which packs trie stages into disk cache pages. 
+ * + * The incremental core is as in {@link IncrementalTrieWriterSimple}, which this augments by: + * - calculating branch sizes reflecting the amount of data that needs to be written to store the trie + * branch rooted at each node + * - delaying writing any part of a completed node until its branch size is above the page size + * - laying out (some of) its children branches (each smaller than a page) to be contained within a page + * - adjusting the branch size to reflect the fact that the children are now written (i.e. removing their size) + * + * The process is bottom-up, i.e. pages are packed at the bottom and the root page is usually smaller. + * This may appear less efficient than a top-down process which puts more information in the top pages that + * tend to stay in cache, but in both cases performing a search will usually require an additional disk read + * for the leaf page. When we maximize the amount of relevant data that read brings by using the bottom-up + * process, we have practically the same efficiency with smaller intermediate page footprint, i.e. fewer data + * to keep in cache. + * + * As an example, taking a sample page size fitting 4 nodes, a simple trie would be split like this: + * Node 0 | + * -a-> | Node 1 + * | -s-> Node 2 + * | -k-> Node 3 (payload 1) + * | -s-> Node 4 (payload 2) + * ----------------------------------- + * -b-> Node 5 | + * -a-> |Node 6 + * | -n-> Node 7 + * | -k-> Node 8 (payload 3) + * | -s-> Node 9 (payload 4) + * where lines denote page boundaries. + * + * The process itself will start by adding "ask" which adds three nodes after the root to the stack. Adding "ass" + * completes Node 3, setting its branch a size of 1 and replaces it on the stack with Node 4. + * The step of adding "bank" starts by completing Node 4 (size 1), Node 2 (size 3), Node 1 (size 4), then adds 4 more + * nodes to the stack. Adding "banks" descends one more node. + * The trie completion step completes nodes 9 (size 1), 8 (size 2), 7 (size 3), 6 (size 4), 5 (size 5). Since the size + * of node 5 is above the page size, the algorithm lays out its children. Nodes 6, 7, 8, 9 are written in order. The + * size of node 5 is now just the size of it individually, 1. The process continues with completing Node 0 (size 6). + * This is bigger than the page size, so some of its children need to be written. The algorithm takes the largest, + * Node 1, and lays it out with its children in the file. Node 0 now has an adjusted size of 2 which is below the + * page size, and we can continue the process. + * Since this was the root of the trie, the current page is padded and the remaining nodes 0, 5 are written. + */ +@NotThreadSafe +public class IncrementalTrieWriterPageAware<VALUE> +extends IncrementalTrieWriterBase<VALUE, DataOutputPlus, IncrementalTrieWriterPageAware.Node<VALUE>> +implements IncrementalTrieWriter<VALUE> +{ + final int maxBytesPerPage; + + private final static Comparator<Node<?>> BRANCH_SIZE_COMPARATOR = (l, r) -> + { + // Smaller branches first. + int c = Integer.compare(l.branchSize + l.nodeSize, r.branchSize + r.nodeSize); + if (c != 0) + return c; + + // Then order by character, which serves several purposes: + // - enforces inequality to make sure equal sizes aren't treated as duplicates, + // - makes sure the item we use for comparison key comes greater than all equal-sized nodes, + // - orders equal sized items so that most recently processed (and potentially having closer children) comes + // last and is thus the first one picked for layout. 
+ c = Integer.compare(l.transition, r.transition); + + assert c != 0 || l == r; + return c; + }; + + IncrementalTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest) + { + super(trieSerializer, dest, new Node<>((byte) 0)); + this.maxBytesPerPage = dest.maxBytesInPage(); + } + + @Override + public void reset() + { + reset(new Node<>((byte) 0)); + } + + @Override + Node<VALUE> performCompletion() throws IOException + { + Node<VALUE> root = super.performCompletion(); + + int actualSize = recalcTotalSize(root, dest.position()); + int bytesLeft = dest.bytesLeftInPage(); + if (actualSize > bytesLeft) + { + if (actualSize <= maxBytesPerPage) + { + dest.padToPageBoundary(); + bytesLeft = maxBytesPerPage; + // position changed, recalculate again + actualSize = recalcTotalSize(root, dest.position()); + } + + if (actualSize > bytesLeft) + { + // Still greater. Lay out children separately. + layoutChildren(root); + + // Pad if needed and place. + if (root.nodeSize > dest.bytesLeftInPage()) + { + dest.padToPageBoundary(); + // Recalculate again as pointer size may have changed, triggering assertion in writeRecursive. + recalcTotalSize(root, dest.position()); + } + } + } + + + root.finalizeWithPosition(write(root)); + return root; + } + + @Override + void complete(Node<VALUE> node) throws IOException + { + assert node.filePos == -1; + + int branchSize = 0; + for (Node<VALUE> child : node.children) + branchSize += child.branchSize + child.nodeSize; + + node.branchSize = branchSize; + + int nodeSize = serializer.sizeofNode(node, dest.position()); + if (nodeSize + branchSize < maxBytesPerPage) + { + // Good. This node and all children will (most probably) fit page. + node.nodeSize = nodeSize; + node.hasOutOfPageChildren = false; + node.hasOutOfPageInBranch = false; + + for (Node<VALUE> child : node.children) + if (child.filePos != -1) + node.hasOutOfPageChildren = true; + else if (child.hasOutOfPageChildren || child.hasOutOfPageInBranch) + node.hasOutOfPageInBranch = true; + + return; + } + + // Cannot fit. Lay out children; The current node will be marked as one with out-of-page children. + layoutChildren(node); + } + + private void layoutChildren(Node<VALUE> node) throws IOException + { + assert node.filePos == -1; + + NavigableSet<Node<VALUE>> children = node.getChildrenWithUnsetPosition(); + + int bytesLeft = dest.bytesLeftInPage(); + Node<VALUE> cmp = new Node<>(256); // goes after all equal-sized unplaced nodes (whose transition character is 0-255) + cmp.nodeSize = 0; + while (!children.isEmpty()) + { + cmp.branchSize = bytesLeft; + Node<VALUE> child = children.headSet(cmp, true).pollLast(); // grab biggest that could fit + if (child == null) + { + dest.padToPageBoundary(); + bytesLeft = maxBytesPerPage; + child = children.pollLast(); // just biggest + } + + assert child != null; + if (child.hasOutOfPageChildren || child.hasOutOfPageInBranch) + { + // We didn't know what size this branch will actually need to be, node's children may be far. + // We now know where we would place it, so let's reevaluate size. + int actualSize = recalcTotalSize(child, dest.position()); + if (actualSize > bytesLeft) + { + if (bytesLeft == maxBytesPerPage) + { + // Branch doesn't even fit in a page. + + // Note: In this situation we aren't actually making the best choice as the layout should have + // taken place at the child (which could have made the current parent small enough to fit). + // This is not trivial to fix but should be very rare. 
+ + layoutChildren(child); + bytesLeft = dest.bytesLeftInPage(); + + assert (child.filePos == -1); + } + + // Doesn't fit, but that's probably because we don't have a full page. Put it back with the new + // size and retry when we do have enough space. + children.add(child); + continue; + } + } + + child.finalizeWithPosition(write(child)); + bytesLeft = dest.bytesLeftInPage(); + } + + // The sizing below will use the branch size, so make sure it's set. + node.branchSize = 0; + node.hasOutOfPageChildren = true; + node.hasOutOfPageInBranch = false; + node.nodeSize = serializer.sizeofNode(node, dest.position()); + } + + protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws IOException + { + if (node.hasOutOfPageInBranch) + { + int sz = 0; + for (Node<VALUE> child : node.children) + sz += recalcTotalSize(child, nodePosition + sz); + node.branchSize = sz; + } + + // The sizing below will use the branch size calculated above. Since that can change on out-of-page in branch, + // we need to recalculate the size if either flag is set. + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + protected long write(Node<VALUE> node) throws IOException + { + long nodePosition = dest.position(); + for (Node<VALUE> child : node.children) + if (child.filePos == -1) + child.filePos = write(child); + + nodePosition += node.branchSize; + assert dest.position() == nodePosition + : "Expected node position to be " + nodePosition + " but got " + dest.position() + " after writing children.\n" + dumpNode(node, dest.position()); + + serializer.write(dest, node, nodePosition); + + assert dest.position() == nodePosition + node.nodeSize + || dest.paddedPosition() == dest.position() // For PartitionIndexTest.testPointerGrowth where position may jump on page boundaries. + : "Expected node position to be " + (nodePosition + node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize " + node.nodeSize + ".\n" + dumpNode(node, nodePosition); + return nodePosition; + } + + protected String dumpNode(Node<VALUE> node, long nodePosition) + { + StringBuilder res = new StringBuilder(String.format("At %,d(%x) type %s child count %s nodeSize %,d branchSize %,d %s%s%n", + nodePosition, nodePosition, + TrieNode.typeFor(node, nodePosition), node.childCount(), node.nodeSize, node.branchSize, + node.hasOutOfPageChildren ? "C" : "", + node.hasOutOfPageInBranch ? "B" : "")); + for (Node<VALUE> child : node.children) + res.append(String.format("Child %2x at %,d(%x) type %s child count %s size %s nodeSize %,d branchSize %,d %s%s%n", + child.transition & 0xFF, + child.filePos, + child.filePos, + child.children != null ? TrieNode.typeFor(child, child.filePos) : "n/a", + child.children != null ? child.childCount() : "n/a", + child.children != null ? serializer.sizeofNode(child, child.filePos) : "n/a", + child.nodeSize, + child.branchSize, + child.hasOutOfPageChildren ? "C" : "", + child.hasOutOfPageInBranch ? "B" : "")); + + return res.toString(); + } + + @Override + public PartialTail makePartialRoot() throws IOException + { + // The expectation is that the partial tail will be in memory, so we don't bother with page-fitting. + // We could also send some completed children to disk, but that could make suboptimal layout choices, so we'd + // rather not. Just write anything not written yet to a buffer, from bottom to top, and we're done. 
+ try (DataOutputBuffer buf = new DataOutputBuffer()) + { + PTail tail = new PTail(); + // Readers ask rebufferers for page-aligned positions, so make sure tail starts at one. + // "Padding" of the cutoff point may leave some unaddressable space in the constructed file view. + // Nothing will point to it, though, so that's fine. + tail.cutoff = dest.paddedPosition(); + tail.count = count; + tail.root = writePartial(stack.getFirst(), buf, tail.cutoff); + tail.tail = buf.asNewBuffer(); + return tail; + } + } + + protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + long startPosition = dest.position() + baseOffset; + + List<Node<VALUE>> childrenToClear = new ArrayList<>(); + for (Node<VALUE> child : node.children) + { + if (child.filePos == -1) + { + childrenToClear.add(child); + child.filePos = writePartial(child, dest, baseOffset); + } + } + + long nodePosition = dest.position() + baseOffset; + + if (node.hasOutOfPageInBranch) + { + // Update the branch size with the size of what we have just written. This may be used by the node's + // maxPositionDelta, and it's a better approximation for later fitting calculations. + node.branchSize = (int) (nodePosition - startPosition); + } + + serializer.write(dest, node, nodePosition); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + { + // Update the node size with what we have just seen. It's a better approximation for later fitting + // calculations. + long endPosition = dest.position() + baseOffset; + node.nodeSize = (int) (endPosition - nodePosition); + } + + for (Node<VALUE> child : childrenToClear) + child.filePos = -1; + return nodePosition; + } + + static class Node<Value> extends IncrementalTrieWriterBase.BaseNode<Value, Node<Value>> + { + /** + * Currently calculated size of the branch below this node, not including the node itself. + * If hasOutOfPageInBranch is true, this may be underestimated as the size + * depends on the position the branch is written. + */ + int branchSize = -1; + /** + * Currently calculated node size. If hasOutOfPageChildren is true, this may be underestimated as the size + * depends on the position the node is written. + */ + int nodeSize = -1; + + /** + * Whether there is an out-of-page, already written node in the branches below the immediate children of the + * node. + */ + boolean hasOutOfPageInBranch = false; + /** + * Whether a child of the node is out of page, already written. + * Forced to true before being set to make sure maxPositionDelta performs its evaluation on non-completed + * nodes for makePartialRoot. + */ + boolean hasOutOfPageChildren = true; + + Node(int transition) + { + super(transition); + } + + @Override + Node<Value> newNode(byte transition) + { + return new Node<>(transition & 0xFF); + } + + public long serializedPositionDelta(int i, long nodePosition) + { + assert (children.get(i).filePos != -1); + return children.get(i).filePos - nodePosition; + } + + /** + * The max delta is the delta with either: + * - the position where the first child not-yet-placed child will be laid out. + * - the position of the furthest child that is already placed. + * Review Comment: Done (with more reformatting) ########## src/java/org/apache/cassandra/io/tries/IncrementalTrieWriterPageAware.java: ########## @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Incremental builders of on-disk tries which packs trie stages into disk cache pages. + * + * The incremental core is as in {@link IncrementalTrieWriterSimple}, which this augments by: + * - calculating branch sizes reflecting the amount of data that needs to be written to store the trie + * branch rooted at each node + * - delaying writing any part of a completed node until its branch size is above the page size + * - laying out (some of) its children branches (each smaller than a page) to be contained within a page + * - adjusting the branch size to reflect the fact that the children are now written (i.e. removing their size) + * + * The process is bottom-up, i.e. pages are packed at the bottom and the root page is usually smaller. + * This may appear less efficient than a top-down process which puts more information in the top pages that + * tend to stay in cache, but in both cases performing a search will usually require an additional disk read + * for the leaf page. When we maximize the amount of relevant data that read brings by using the bottom-up + * process, we have practically the same efficiency with smaller intermediate page footprint, i.e. fewer data + * to keep in cache. + * + * As an example, taking a sample page size fitting 4 nodes, a simple trie would be split like this: + * Node 0 | + * -a-> | Node 1 + * | -s-> Node 2 + * | -k-> Node 3 (payload 1) + * | -s-> Node 4 (payload 2) + * ----------------------------------- + * -b-> Node 5 | + * -a-> |Node 6 + * | -n-> Node 7 + * | -k-> Node 8 (payload 3) + * | -s-> Node 9 (payload 4) + * where lines denote page boundaries. + * + * The process itself will start by adding "ask" which adds three nodes after the root to the stack. Adding "ass" + * completes Node 3, setting its branch a size of 1 and replaces it on the stack with Node 4. + * The step of adding "bank" starts by completing Node 4 (size 1), Node 2 (size 3), Node 1 (size 4), then adds 4 more + * nodes to the stack. Adding "banks" descends one more node. + * The trie completion step completes nodes 9 (size 1), 8 (size 2), 7 (size 3), 6 (size 4), 5 (size 5). Since the size + * of node 5 is above the page size, the algorithm lays out its children. Nodes 6, 7, 8, 9 are written in order. The + * size of node 5 is now just the size of it individually, 1. The process continues with completing Node 0 (size 6). + * This is bigger than the page size, so some of its children need to be written. 
The algorithm takes the largest, + * Node 1, and lays it out with its children in the file. Node 0 now has an adjusted size of 2 which is below the + * page size, and we can continue the process. + * Since this was the root of the trie, the current page is padded and the remaining nodes 0, 5 are written. + */ +@NotThreadSafe +public class IncrementalTrieWriterPageAware<VALUE> +extends IncrementalTrieWriterBase<VALUE, DataOutputPlus, IncrementalTrieWriterPageAware.Node<VALUE>> +implements IncrementalTrieWriter<VALUE> +{ + final int maxBytesPerPage; + + private final static Comparator<Node<?>> BRANCH_SIZE_COMPARATOR = (l, r) -> + { + // Smaller branches first. + int c = Integer.compare(l.branchSize + l.nodeSize, r.branchSize + r.nodeSize); + if (c != 0) + return c; + + // Then order by character, which serves several purposes: + // - enforces inequality to make sure equal sizes aren't treated as duplicates, + // - makes sure the item we use for comparison key comes greater than all equal-sized nodes, + // - orders equal sized items so that most recently processed (and potentially having closer children) comes + // last and is thus the first one picked for layout. + c = Integer.compare(l.transition, r.transition); + + assert c != 0 || l == r; + return c; + }; + + IncrementalTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest) + { + super(trieSerializer, dest, new Node<>((byte) 0)); + this.maxBytesPerPage = dest.maxBytesInPage(); + } + + @Override + public void reset() + { + reset(new Node<>((byte) 0)); + } + + @Override + Node<VALUE> performCompletion() throws IOException + { + Node<VALUE> root = super.performCompletion(); + + int actualSize = recalcTotalSize(root, dest.position()); + int bytesLeft = dest.bytesLeftInPage(); + if (actualSize > bytesLeft) + { + if (actualSize <= maxBytesPerPage) + { + dest.padToPageBoundary(); + bytesLeft = maxBytesPerPage; + // position changed, recalculate again + actualSize = recalcTotalSize(root, dest.position()); + } + + if (actualSize > bytesLeft) + { + // Still greater. Lay out children separately. + layoutChildren(root); + + // Pad if needed and place. + if (root.nodeSize > dest.bytesLeftInPage()) + { + dest.padToPageBoundary(); + // Recalculate again as pointer size may have changed, triggering assertion in writeRecursive. + recalcTotalSize(root, dest.position()); + } + } + } + + + root.finalizeWithPosition(write(root)); + return root; + } + + @Override + void complete(Node<VALUE> node) throws IOException + { + assert node.filePos == -1; + + int branchSize = 0; + for (Node<VALUE> child : node.children) + branchSize += child.branchSize + child.nodeSize; + + node.branchSize = branchSize; + + int nodeSize = serializer.sizeofNode(node, dest.position()); + if (nodeSize + branchSize < maxBytesPerPage) + { + // Good. This node and all children will (most probably) fit page. + node.nodeSize = nodeSize; + node.hasOutOfPageChildren = false; + node.hasOutOfPageInBranch = false; + + for (Node<VALUE> child : node.children) + if (child.filePos != -1) + node.hasOutOfPageChildren = true; Review Comment: This is not reachable. Children are written when their parent is being completed, thus no child can be written at this point (unless there's sharing of branches, which we don't have at this time). Finding this inspired me to try rewriting `layoutChildren` to avoid the extra `TreeSet` (which creates a bit of garbage we could do without). 
The result is [here](https://github.com/blambov/cassandra/commit/9b02b25aba9d6ef16a42e339f4de78c48ed2f98b); I am not including it in this PR as it needs some testing to prove it adds value. ########## src/java/org/apache/cassandra/io/sstable/format/bti/SSTableReversedIterator.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.util.NoSuchElementException; + +import com.carrotsearch.hppc.LongStack; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.UnfilteredValidation; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.io.sstable.AbstractRowIndexEntry; +import org.apache.cassandra.io.sstable.AbstractSSTableIterator; +import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileHandle; + +/** + * Unfiltered row iterator over a BTI SSTable that returns rows in reverse order. + */ +class SSTableReversedIterator extends AbstractSSTableIterator<TrieIndexEntry> +{ + /** + * The index of the slice being processed. + */ + private int slice; + + public SSTableReversedIterator(BtiTableReader sstable, + FileDataInput file, + DecoratedKey key, + TrieIndexEntry indexEntry, + Slices slices, + ColumnFilter columns, + FileHandle ifile) + { + super(sstable, file, key, indexEntry, slices, columns, ifile); + } + + protected Reader createReaderInternal(TrieIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) + { + if (indexEntry.isIndexed()) + return new ReverseIndexedReader(indexEntry, file, shouldCloseFile); + else + return new ReverseReader(file, shouldCloseFile); + } + + public boolean isReverseOrder() + { + return true; + } + + protected int nextSliceIndex() + { + int next = slice; + slice++; + return slices.size() - (next + 1); + } + + protected boolean hasMoreSlices() + { + return slice < slices.size(); + } + + /** + * Reverse iteration is performed by going through an index block (or the whole partition if not indexed) forwards + * and storing the positions of each entry that falls within the slice in a stack. Reverse iteration then pops out + * positions and reads the entries. 
+ * <p> + * Note: The earlier version of this was constructing an in-memory view of the block instead, which gives better + * performance on bigger queries and index blocks (due to not having to read disk again). With the lower + * granularity of the tries it makes better sense to store as little as possible as the beginning of the block + * should very rarely be in other page/chunk cache locations. This has the benefit of being able to answer small + * queries (esp. LIMIT 1) faster and with less GC churn. + */ + private class ReverseReader extends AbstractReader + { + final LongStack rowOffsets = new LongStack(); + RangeTombstoneMarker blockOpenMarker, blockCloseMarker; + private Unfiltered next = null; + private boolean foundLessThan; + private long startPos = -1; + + private ReverseReader(FileDataInput file, boolean shouldCloseFile) + { + super(file, shouldCloseFile); + } + + @Override + public void setForSlice(Slice slice) throws IOException + { + // read full row and filter + if (startPos == -1) + startPos = file.getFilePointer(); + else + seekToPosition(startPos); + + fillOffsets(slice, true, true, Long.MAX_VALUE); + } + + @Override + protected boolean hasNextInternal() throws IOException + { + if (next != null) + return true; + next = computeNext(); + return next != null; + } + + @Override + protected Unfiltered nextInternal() throws IOException + { + if (!hasNextInternal()) + throw new NoSuchElementException(); + + Unfiltered toReturn = next; + next = null; + return toReturn; + } + + private Unfiltered computeNext() throws IOException + { + Unfiltered toReturn; + do + { + if (blockCloseMarker != null) + { + toReturn = blockCloseMarker; + blockCloseMarker = null; + return toReturn; + } + while (!rowOffsets.isEmpty()) + { + seekToPosition(rowOffsets.pop()); + boolean hasNext = deserializer.hasNext(); + assert hasNext; + toReturn = deserializer.readNext(); + UnfilteredValidation.maybeValidateUnfiltered(toReturn, metadata(), key, sstable); + // We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne. + if (!toReturn.isEmpty()) + return toReturn; + } + } + while (!foundLessThan && advanceIndexBlock()); + + // open marker to be output only as slice is finished + if (blockOpenMarker != null) + { + toReturn = blockOpenMarker; + blockOpenMarker = null; + return toReturn; + } + return null; + } + + protected boolean advanceIndexBlock() throws IOException + { + return false; + } + + void fillOffsets(Slice slice, boolean filterStart, boolean filterEnd, long stopPosition) throws IOException + { + filterStart &= !slice.start().equals(ClusteringBound.BOTTOM); + filterEnd &= !slice.end().equals(ClusteringBound.TOP); + + ClusteringBound<?> start = slice.start(); + long currentPosition = file.getFilePointer(); + foundLessThan = false; + // This is a copy of handlePreSliceData which also checks currentPosition < stopPosition. + // Not extracted to method as we need both marker and currentPosition. + if (filterStart) + { + while (currentPosition < stopPosition && deserializer.hasNext() && deserializer.compareNextTo(start) <= 0) + { + if (deserializer.nextIsRow()) + deserializer.skipNext(); + else + updateOpenMarker((RangeTombstoneMarker) deserializer.readNext()); + + currentPosition = file.getFilePointer(); + foundLessThan = true; + } + } + + // We've reached the beginning of our queried slice. If we have an open marker + // we should return that at the end of the slice to close the deletion. 
+ if (openMarker != null) + blockOpenMarker = new RangeTombstoneBoundMarker(start, openMarker); + + + // Now deserialize everything until we reach our requested end (if we have one) + // See SSTableIterator.ForwardRead.computeNext() for why this is a strict inequality below: this is the same + // reasoning here. + while (currentPosition < stopPosition && deserializer.hasNext() + && (!filterEnd || deserializer.compareNextTo(slice.end()) < 0)) + { + rowOffsets.push(currentPosition); + if (deserializer.nextIsRow()) + deserializer.skipNext(); + else + updateOpenMarker((RangeTombstoneMarker) deserializer.readNext()); + + currentPosition = file.getFilePointer(); + } + + // If we have an open marker, we should output that first, unless end is not being filtered + // (i.e. it's either top (where a marker can't be open) or we placed that marker during previous block). + if (openMarker != null && filterEnd) + { + // If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block. + blockCloseMarker = new RangeTombstoneBoundMarker(slice.end(), openMarker); + openMarker = null; + } + } + } + + private class ReverseIndexedReader extends ReverseReader + { + private RowIndexReverseIterator indexReader; + private final TrieIndexEntry indexEntry; + private final long basePosition; Review Comment: It does not to me -- block positions are calculated as `base + offset`. The offset can (at least in theory) be negative too. ########## test/unit/org/apache/cassandra/io/tries/TrieBuilderTest.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.io.util.TailOverridingRebufferer; + +import static org.junit.Assert.assertEquals; + +public class TrieBuilderTest extends AbstractTrieTestBase +{ + @Test + public void testPartialBuild_DB1148() throws IOException Review Comment: Renamed ########## src/java/org/apache/cassandra/io/tries/IncrementalTrieWriterPageAware.java: ########## @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Incremental builders of on-disk tries which packs trie stages into disk cache pages. + * Review Comment: Done
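
For illustration, the reverse iteration that the `ReverseReader` javadoc above describes — walk an index block forwards, push the start offset of each qualifying entry onto a stack, then pop offsets to re-read entries back-to-front — can be sketched in isolation as below. This is a minimal sketch of the pattern only; the `RowSource` interface and its method names are hypothetical stand-ins, not types from this PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class ReverseScanSketch
{
    // Hypothetical stand-in for the row deserializer used during the forward pass.
    interface RowSource
    {
        boolean hasNext();
        long position();            // offset at which the next entry starts
        void skipNext();            // advance past the next entry without materialising it
        Object readAt(long offset); // seek back and deserialise the entry starting at offset
    }

    // Forward pass records where each entry starts; popping the stack replays them in reverse.
    static void scanInReverse(RowSource source, List<Object> out)
    {
        Deque<Long> offsets = new ArrayDeque<>();
        while (source.hasNext())
        {
            offsets.push(source.position());
            source.skipNext();
        }
        while (!offsets.isEmpty())
            out.add(source.readAt(offsets.pop()));
    }
}
```

Keeping only offsets on the forward pass (rather than materialised rows) matches the rationale given in the quoted javadoc: with the fine-grained trie index blocks, small reversed reads (e.g. LIMIT 1) stay cheap and produce little garbage.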

