blambov commented on code in PR #2267:
URL: https://github.com/apache/cassandra/pull/2267#discussion_r1170988693


##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.LazilyInitializedUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.AbstractBounds.Boundary;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import static org.apache.cassandra.dht.AbstractBounds.isEmpty;
+import static org.apache.cassandra.dht.AbstractBounds.maxLeft;
+import static org.apache.cassandra.dht.AbstractBounds.minRight;
+
+public class BtiTableScanner implements ISSTableScanner
+{
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    protected final RandomAccessReader dfile;
+    public final BtiTableReader sstable;
+
+    private final Iterator<AbstractBounds<PartitionPosition>> rangeIterator;
+
+    private final ColumnFilter columns;
+    private final DataRange dataRange;
+    private final SSTableReadsListener listener;
+    private long startScan = -1;
+    private long bytesScanned = 0;
+
+    protected CloseableIterator<UnfilteredRowIterator> iterator;
+
+    // Full scan of the sstables
+    public static ISSTableScanner getScanner(BtiTableReader sstable)
+    {
+        return getScanner(sstable, 
Iterators.singletonIterator(fullRange(sstable)));
+    }
+
+    public static ISSTableScanner getScanner(BtiTableReader sstable,
+                                             ColumnFilter columns,
+                                             DataRange dataRange,
+                                             SSTableReadsListener listener)
+    {
+        return new BtiTableScanner(sstable, columns, dataRange, 
makeBounds(sstable, dataRange).iterator(), listener);
+    }
+
+    public static ISSTableScanner getScanner(BtiTableReader sstable, 
Collection<Range<Token>> tokenRanges)
+    {
+        return getScanner(sstable, makeBounds(sstable, 
tokenRanges).iterator());
+    }
+
+    public static ISSTableScanner getScanner(BtiTableReader sstable, 
Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
+    {
+        return new BtiTableScanner(sstable, 
ColumnFilter.all(sstable.metadata()), null, rangeIterator, 
SSTableReadsListener.NOOP_LISTENER);
+    }
+
+    private BtiTableScanner(BtiTableReader sstable,
+                            ColumnFilter columns,
+                            DataRange dataRange,
+                            Iterator<AbstractBounds<PartitionPosition>> 
rangeIterator,
+                            SSTableReadsListener listener)
+    {
+        assert sstable != null;
+
+        this.dfile = sstable.openDataReader();
+        this.sstable = sstable;
+        this.columns = columns;
+        this.dataRange = dataRange;
+        this.rangeIterator = rangeIterator;
+        this.listener = listener;
+    }
+
+    public static List<AbstractBounds<PartitionPosition>> 
makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
+    {
+        List<AbstractBounds<PartitionPosition>> boundsList = new 
ArrayList<>(tokenRanges.size());
+        for (Range<Token> range : Range.normalize(tokenRanges))
+            addRange(sstable, Range.makeRowRange(range), boundsList);
+        return boundsList;
+    }
+
+    static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader 
sstable, DataRange dataRange)
+    {
+        List<AbstractBounds<PartitionPosition>> boundsList = new 
ArrayList<>(2);
+        addRange(sstable, dataRange.keyRange(), boundsList);
+        return boundsList;
+    }
+
+    static AbstractBounds<PartitionPosition> fullRange(SSTableReader sstable)
+    {
+        return new Bounds<>(sstable.first, sstable.last);
+    }
+
+    private static void addRange(SSTableReader sstable, 
AbstractBounds<PartitionPosition> requested, 
List<AbstractBounds<PartitionPosition>> boundsList)
+    {
+        if (requested instanceof Range && ((Range<?>) 
requested).isWrapAround())
+        {
+            if (requested.right.compareTo(sstable.first) >= 0)
+            {
+                // since we wrap, we must contain the whole sstable prior to 
stopKey()
+                Boundary<PartitionPosition> left = new 
Boundary<>(sstable.first, true);
+                Boundary<PartitionPosition> right;
+                right = requested.rightBoundary();
+                right = minRight(right, sstable.last, true);
+                if (!isEmpty(left, right))
+                    boundsList.add(AbstractBounds.bounds(left, right));
+            }
+            if (requested.left.compareTo(sstable.last) <= 0)
+            {
+                // since we wrap, we must contain the whole sstable after 
dataRange.startKey()
+                Boundary<PartitionPosition> right = new 
Boundary<>(sstable.last, true);
+                Boundary<PartitionPosition> left;
+                left = requested.leftBoundary();
+                left = maxLeft(left, sstable.first, true);
+                if (!isEmpty(left, right))
+                    boundsList.add(AbstractBounds.bounds(left, right));
+            }
+        }
+        else
+        {
+            assert !AbstractBounds.strictlyWrapsAround(requested.left, 
requested.right);
+            Boundary<PartitionPosition> left, right;
+            left = requested.leftBoundary();
+            right = requested.rightBoundary();
+            left = maxLeft(left, sstable.first, true);
+            // apparently isWrapAround() doesn't count Bounds that extend to 
the limit (min) as wrapping
+            right = requested.right.isMinimum() ? new Boundary<>(sstable.last, 
true)
+                                                : minRight(right, 
sstable.last, true);
+            if (!isEmpty(left, right))
+                boundsList.add(AbstractBounds.bounds(left, right));
+        }
+    }
+
+    public void close()
+    {
+        try
+        {
+            if (isClosed.compareAndSet(false, true))
+            {
+                FileUtils.close(dfile);
+                if (iterator != null)
+                    iterator.close();
+            }
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        }
+    }
+
+    public long getBytesScanned()
+    {
+        return bytesScanned;
+    }
+
+    @Override
+    public long getLengthInBytes()
+    {
+        return sstable.uncompressedLength();
+    }
+
+
+    public long getCompressedLengthInBytes()
+    {
+        return sstable.onDiskLength();
+    }
+
+    @Override
+    public long getCurrentPosition()
+    {
+        return dfile.getFilePointer();
+    }
+
+    @Override
+    public Set<SSTableReader> getBackingSSTables()
+    {
+        return ImmutableSet.of(sstable);
+    }
+
+    public int level()

Review Comment:
   Removed.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.tries.IncrementalTrieWriter;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Partition index builder: stores index or data positions in an incrementally 
built, page aware on-disk trie.
+ *
+ * Not to be used outside of package. Public only for IndexRewriter tool.
+ */
+public class PartitionIndexBuilder implements AutoCloseable
+{
+    private final SequentialWriter writer;
+    private final IncrementalTrieWriter<PartitionIndex.Payload> trieWriter;
+    private final FileHandle.Builder fhBuilder;
+
+    // the last synced data file position
+    private long dataSyncPosition;
+    // the last synced row index file position
+    private long rowIndexSyncPosition;
+    // the last synced partition index file position
+    private long partitionIndexSyncPosition;
+
+    // Partial index can only be used after all three files have been synced 
to the required positions.
+    private long partialIndexDataEnd;
+    private long partialIndexRowEnd;
+    private long partialIndexPartitionEnd;
+    private IncrementalTrieWriter.PartialTail partialIndexTail;
+    private Consumer<PartitionIndex> partialIndexConsumer;
+    private DecoratedKey partialIndexLastKey;
+
+    private int lastDiffPoint;
+    private DecoratedKey firstKey;
+    private DecoratedKey lastKey;
+    private DecoratedKey lastWrittenKey;
+    private PartitionIndex.Payload lastPayload;
+
+    public PartitionIndexBuilder(SequentialWriter writer, FileHandle.Builder 
fhBuilder)
+    {
+        this.writer = writer;
+        this.trieWriter = 
IncrementalTrieWriter.open(PartitionIndex.TRIE_SERIALIZER, writer);
+        this.fhBuilder = fhBuilder;
+    }
+
+    /*
+     * Called when partition index has been flushed to the given position.
+     * If this makes all required positions for a partial view flushed, this 
will call the partialIndexConsumer.
+     */
+    public void markPartitionIndexSynced(long upToPosition)
+    {
+        partitionIndexSyncPosition = upToPosition;
+        refreshReadableBoundary();
+    }
+
+    /*
+     * Called when row index has been flushed to the given position.
+     * If this makes all required positions for a partial view flushed, this 
will call the partialIndexConsumer.
+     */
+    public void markRowIndexSynced(long upToPosition)
+    {
+        rowIndexSyncPosition = upToPosition;
+        refreshReadableBoundary();
+    }
+
+    /*
+     * Called when data file has been flushed to the given position.
+     * If this makes all required positions for a partial view flushed, this 
will call the partialIndexConsumer.
+     */
+    public void markDataSynced(long upToPosition)
+    {
+        dataSyncPosition = upToPosition;
+        refreshReadableBoundary();
+    }
+
+    private void refreshReadableBoundary()
+    {
+        if (partialIndexConsumer == null)
+            return;
+        if (dataSyncPosition < partialIndexDataEnd)
+            return;
+        if (rowIndexSyncPosition < partialIndexRowEnd)
+            return;
+        if (partitionIndexSyncPosition < partialIndexPartitionEnd)
+            return;
+
+        try (FileHandle fh = 
fhBuilder.withLengthOverride(writer.getLastFlushOffset()).complete())
+        {
+            @SuppressWarnings("resource")
+            PartitionIndex pi = new PartitionIndexEarly(fh, 
partialIndexTail.root(), partialIndexTail.count(), firstKey, 
partialIndexLastKey, partialIndexTail.cutoff(), partialIndexTail.tail());
+            partialIndexConsumer.accept(pi);
+            partialIndexConsumer = null;
+        }
+        finally
+        {
+            fhBuilder.withLengthOverride(-1);
+        }
+
+    }
+
+    private final static Logger logger = 
LoggerFactory.getLogger(PartitionIndexBuilder.class);

Review Comment:
   Removed



##########
src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.tries.IncrementalTrieWriter;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Partition index builder: stores index or data positions in an incrementally 
built, page aware on-disk trie.
+ *

Review Comment:
   Removed latter part (`IndexRewriter` does not exist) and made class 
package-private.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReaderLoadingBuilder.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.CompressionInfoComponent;
+import org.apache.cassandra.io.sstable.format.FilterComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReaderLoadingBuilder;
+import org.apache.cassandra.io.sstable.format.StatsComponent;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Throwables;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BtiTableReaderLoadingBuilder extends 
SSTableReaderLoadingBuilder<BtiTableReader, BtiTableReader.Builder>
+{
+    private final static Logger logger = 
LoggerFactory.getLogger(BtiTableReaderLoadingBuilder.class);
+
+    private FileHandle.Builder dataFileBuilder;
+    private FileHandle.Builder partitionIndexFileBuilder;
+    private FileHandle.Builder rowIndexFileBuilder;
+
+    public BtiTableReaderLoadingBuilder(SSTable.Builder<?, ?> builder)
+    {
+        super(builder);
+    }
+
+    @Override
+    public KeyReader buildKeyReader(TableMetrics tableMetrics) throws 
IOException
+    {
+        StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.HEADER, MetadataType.VALIDATION);
+        return createKeyReader(statsComponent.statsMetadata());
+    }
+
+    private KeyReader createKeyReader(StatsMetadata statsMetadata) throws 
IOException
+    {
+        checkNotNull(statsMetadata);
+
+        try (PartitionIndex index = 
PartitionIndex.load(partitionIndexFileBuilder(), 
tableMetadataRef.getLocal().partitioner, false);
+             CompressionMetadata compressionMetadata = 
CompressionInfoComponent.maybeLoad(descriptor, components);
+             FileHandle dFile = 
dataFileBuilder(statsMetadata).withCompressionMetadata(compressionMetadata).complete();
+             FileHandle riFile = rowIndexFileBuilder().complete())
+        {
+            return PartitionIterator.create(index,
+                                            
tableMetadataRef.getLocal().partitioner,
+                                            riFile,
+                                            dFile);
+        }
+    }
+
+    @Override
+    protected void openComponents(BtiTableReader.Builder builder, 
SSTable.Owner owner, boolean validate, boolean online) throws IOException
+    {
+        try
+        {
+            StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.VALIDATION, MetadataType.HEADER);
+            
builder.setSerializationHeader(statsComponent.serializationHeader(builder.getTableMetadataRef().getLocal()));
+            assert !online || builder.getSerializationHeader() != null;
+
+            builder.setStatsMetadata(statsComponent.statsMetadata());
+            ValidationMetadata validationMetadata = 
statsComponent.validationMetadata();
+            validatePartitioner(builder.getTableMetadataRef().getLocal(), 
validationMetadata);
+
+            boolean filterNeeded = online;
+            if (filterNeeded)
+                builder.setFilter(loadFilter(validationMetadata));
+            boolean rebuildFilter = filterNeeded && builder.getFilter() == 
null;
+
+            if (descriptor.version.hasKeyRange())
+            {
+                IPartitioner partitioner = 
tableMetadataRef.getLocal().partitioner;
+                
builder.setFirst(partitioner.decorateKey(builder.getStatsMetadata().firstKey));
+                
builder.setLast(partitioner.decorateKey(builder.getStatsMetadata().lastKey));
+            }
+
+            if (builder.getComponents().contains(Components.PARTITION_INDEX) 
&& builder.getComponents().contains(Components.ROW_INDEX) && rebuildFilter)
+            {
+                @SuppressWarnings("resource")
+                IFilter filter = 
buildBloomFilter(statsComponent.statsMetadata());
+
+                if (filter != null)
+                {
+                    builder.setFilter(filter);
+
+                    if (online)
+                        FilterComponent.save(filter, descriptor, false);
+                }
+            }
+
+            if (builder.getFilter() == null)
+                builder.setFilter(FilterFactory.AlwaysPresent);
+
+            if (builder.getComponents().contains(Components.ROW_INDEX))
+                builder.setRowIndexFile(rowIndexFileBuilder().complete());
+
+            if (builder.getComponents().contains(Components.PARTITION_INDEX))
+            {
+                
builder.setPartitionIndex(openPartitionIndex(builder.getFilter().isInformative()));
+                builder.setFirst(builder.getPartitionIndex().firstKey());
+                builder.setLast(builder.getPartitionIndex().lastKey());
+            }
+
+            try (CompressionMetadata compressionMetadata = 
CompressionInfoComponent.maybeLoad(descriptor, components))
+            {
+                
builder.setDataFile(dataFileBuilder(builder.getStatsMetadata()).withCompressionMetadata(compressionMetadata).complete());
+            }
+        }
+        catch (IOException | RuntimeException | Error ex)
+        {
+            // in case of failure, close only those components which have been 
opened in this try-catch block
+            Throwables.closeAndAddSuppressed(ex, builder.getPartitionIndex(), 
builder.getRowIndexFile(), builder.getDataFile(), builder.getFilter());
+            throw ex;
+        }
+    }
+
+    private IFilter buildBloomFilter(StatsMetadata statsMetadata) throws 
IOException
+    {
+        IFilter bf = null;
+
+        try (KeyReader keyReader = createKeyReader(statsMetadata))
+        {
+            if (keyReader == null)

Review Comment:
   Removed



##########
src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.tries;
+
+import org.apache.cassandra.io.util.Rebufferer;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Thread-unsafe reverse value iterator for on-disk tries. Uses the 
assumptions of Walker.
+ */
+public class ReverseValueIterator<Concrete extends 
ReverseValueIterator<Concrete>> extends Walker<Concrete>
+{
+    private final ByteSource limit;
+    private IterationPosition stack;
+    private long next;
+    private boolean reportingPrefixes;
+
+    static class IterationPosition
+    {
+        long node;
+        int childIndex;
+        int limit;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java:
##########
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason;
+import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason;
+import org.apache.cassandra.io.sstable.format.SSTableReaderWithFilter;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.EQ;
+import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GE;
+import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GT;
+import static 
org.apache.cassandra.utils.concurrent.SharedCloseable.sharedCopyOrNull;
+
+/**
+ * SSTableReaders are open()ed by Keyspace.onStart; after that they are 
created by SSTableWriter.renameAndOpen.
+ * Do not re-call open() on existing SSTable files; use the references kept by 
ColumnFamilyStore post-start instead.
+ */
+public class BtiTableReader extends SSTableReaderWithFilter
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableReader.class);
+
+    private final FileHandle rowIndexFile;
+    private final PartitionIndex partitionIndex;
+
+    public BtiTableReader(Builder builder, SSTable.Owner owner)
+    {
+        super(builder, owner);
+        this.rowIndexFile = builder.getRowIndexFile();
+        this.partitionIndex = builder.getPartitionIndex();
+    }
+
+    protected final Builder unbuildTo(Builder builder, boolean sharedCopy)
+    {
+        Builder b = super.unbuildTo(builder, sharedCopy);

Review Comment:
   Hmm. This seems to indicate a problem with the hierarchy -- it seems you can 
get different results depending on what type of reference you call `unbuildTo` 
on, as the methods don't override one another.
   
   @jacek-lewandowski, do you think there's a way to make this safer, or should 
we rename the methods to e.g. `unbuildBtiReaderTo` etc.?



##########
src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.tries;
+
+import org.apache.cassandra.io.util.Rebufferer;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Thread-unsafe reverse value iterator for on-disk tries. Uses the 
assumptions of Walker.
+ */
+public class ReverseValueIterator<Concrete extends 
ReverseValueIterator<Concrete>> extends Walker<Concrete>
+{
+    private final ByteSource limit;
+    private IterationPosition stack;
+    private long next;
+    private boolean reportingPrefixes;
+
+    static class IterationPosition
+    {
+        long node;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.tries;
+
+import org.apache.cassandra.io.util.Rebufferer;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Thread-unsafe reverse value iterator for on-disk tries. Uses the 
assumptions of Walker.
+ */
+public class ReverseValueIterator<Concrete extends 
ReverseValueIterator<Concrete>> extends Walker<Concrete>
+{
+    private final ByteSource limit;
+    private IterationPosition stack;
+    private long next;
+    private boolean reportingPrefixes;
+
+    static class IterationPosition
+    {
+        long node;
+        int childIndex;
+        int limit;
+        IterationPosition prev;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.tries;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * This class is a variant of {@link IncrementalTrieWriterPageAware} which is 
able to build even very deep
+ * tries. While the parent class uses recursion for clarity, it may end up 
with stack overflow for tries with
+ * very long keys. This implementation can switch processing from stack to 
heap at a certain depth (provided
+ * as a constructor param).
+ */
+public class IncrementalDeepTrieWriterPageAware<VALUE> extends 
IncrementalTrieWriterPageAware<VALUE>
+{
+    private final int maxRecursionDepth;
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth)
+    {
+        super(trieSerializer, dest);
+        this.maxRecursionDepth = maxRecursionDepth;
+    }
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest)
+    {
+        this(trieSerializer, dest, 64);
+    }
+
+    @Override
+    protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws 
IOException
+    {
+        return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0);
+    }
+
+    private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long 
nodePosition, int depth) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+        {
+            int sz = 0;
+            for (Node<VALUE> child : node.children)
+            {
+                if (depth < maxRecursionDepth)
+                    sz += recalcTotalSizeRecursiveOnStack(child, nodePosition 
+ sz, depth + 1);
+                else
+                    sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + 
sz);
+            }
+            node.branchSize = sz;
+        }
+
+        // The sizing below will use the branch size calculated above. Since 
that can change on out-of-page in branch,
+        // we need to recalculate the size if either flag is set.
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    @Override
+    protected long write(Node<VALUE> node) throws IOException
+    {
+        return writeRecursiveOnStack(node, 0);
+    }
+
+    private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws 
IOException
+    {
+        long nodePosition = dest.position();
+        for (Node<VALUE> child : node.children)
+            if (child.filePos == -1)
+            {
+                if (depth < maxRecursionDepth)
+                    child.filePos = writeRecursiveOnStack(child, depth + 1);
+                else
+                    child.filePos = writeRecursiveOnHeap(child);
+            }
+
+        nodePosition += node.branchSize;
+        assert dest.position() == nodePosition
+                : "Expected node position to be " + nodePosition + " but got " 
+ dest.position() + " after writing children.\n" + dumpNode(node, 
dest.position());
+
+        serializer.write(dest, node, nodePosition);
+
+        assert dest.position() == nodePosition + node.nodeSize
+               || dest.paddedPosition() == dest.position() // For 
PartitionIndexTest.testPointerGrowth where position may jump on page boundaries.
+                : "Expected node position to be " + (nodePosition + 
node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize 
" + node.nodeSize + ".\n" + dumpNode(node, nodePosition);
+        return nodePosition;
+    }
+
+    @Override
+    protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long 
baseOffset) throws IOException
+    {
+        return writePartialRecursiveOnStack(node, dest, baseOffset, 0);
+    }
+
+    private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset, int depth) throws IOException
+    {
+        long startPosition = dest.position() + baseOffset;
+
+        List<Node<VALUE>> childrenToClear = new ArrayList<>();
+        for (Node<VALUE> child : node.children)
+        {
+            if (child.filePos == -1)
+            {
+                childrenToClear.add(child);
+                if (depth < maxRecursionDepth)
+                    child.filePos = writePartialRecursiveOnStack(child, dest, 
baseOffset, depth + 1);
+                else
+                    child.filePos = writePartialRecursiveOnHeap(child, dest, 
baseOffset);
+            }
+        }
+
+        long nodePosition = dest.position() + baseOffset;
+
+        if (node.hasOutOfPageInBranch)
+        {
+            // Update the branch size with the size of what we have just 
written. This may be used by the node's
+            // maxPositionDelta, and it's a better approximation for later 
fitting calculations.
+            node.branchSize = (int) (nodePosition - startPosition);
+        }
+
+        serializer.write(dest, node, nodePosition);
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+        {
+            // Update the node size with what we have just seen. It's a better 
approximation for later fitting
+            // calculations.
+            long endPosition = dest.position() + baseOffset;
+            node.nodeSize = (int) (endPosition - nodePosition);
+        }
+
+        for (Node<VALUE> child : childrenToClear)
+            child.filePos = -1;
+        return nodePosition;
+    }
+
+    private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long 
nodePosition) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+            new RecalcTotalSizeRecursion(node, null, nodePosition).process();
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException
+    {
+        return new WriteRecursion(node, null).process().node.filePos;
+    }
+
+    private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset) throws IOException
+    {
+        new WritePartialRecursion(node, dest, baseOffset).process();
+        long pos = node.filePos;
+        node.filePos = -1;
+        return pos;
+    }
+
+    /**
+     * Simple framework for executing recursion using on-heap linked trace to 
avoid stack overruns.
+     */
+    static abstract class Recursion<NODE>
+    {
+        final Recursion<NODE> parent;
+        final NODE node;
+        final Iterator<NODE> childIterator;
+
+        Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> 
parent)
+        {
+            this.parent = parent;
+            this.node = node;
+            this.childIterator = childIterator;
+        }
+
+        /**
+         * Make a child Recursion object for the given node and initialize it 
as necessary to continue processing
+         * with it.
+         *
+         * May return null if the recursion does not need to continue inside 
the child branch.
+         */
+        abstract Recursion<NODE> makeChild(NODE child);
+
+        /**
+         * Complete the processing this Recursion object.
+         *

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.tries;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * This class is a variant of {@link IncrementalTrieWriterPageAware} which is 
able to build even very deep
+ * tries. While the parent class uses recursion for clarity, it may end up 
with stack overflow for tries with
+ * very long keys. This implementation can switch processing from stack to 
heap at a certain depth (provided
+ * as a constructor param).
+ */
+public class IncrementalDeepTrieWriterPageAware<VALUE> extends 
IncrementalTrieWriterPageAware<VALUE>
+{
+    private final int maxRecursionDepth;
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth)
+    {
+        super(trieSerializer, dest);
+        this.maxRecursionDepth = maxRecursionDepth;
+    }
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest)
+    {
+        this(trieSerializer, dest, 64);
+    }
+
+    @Override
+    protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws 
IOException
+    {
+        return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0);
+    }
+
+    private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long 
nodePosition, int depth) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+        {
+            int sz = 0;
+            for (Node<VALUE> child : node.children)
+            {
+                if (depth < maxRecursionDepth)
+                    sz += recalcTotalSizeRecursiveOnStack(child, nodePosition 
+ sz, depth + 1);
+                else
+                    sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + 
sz);
+            }
+            node.branchSize = sz;
+        }
+
+        // The sizing below will use the branch size calculated above. Since 
that can change on out-of-page in branch,
+        // we need to recalculate the size if either flag is set.
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    @Override
+    protected long write(Node<VALUE> node) throws IOException
+    {
+        return writeRecursiveOnStack(node, 0);
+    }
+
+    private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws 
IOException
+    {
+        long nodePosition = dest.position();
+        for (Node<VALUE> child : node.children)
+            if (child.filePos == -1)
+            {
+                if (depth < maxRecursionDepth)
+                    child.filePos = writeRecursiveOnStack(child, depth + 1);
+                else
+                    child.filePos = writeRecursiveOnHeap(child);
+            }
+
+        nodePosition += node.branchSize;
+        assert dest.position() == nodePosition
+                : "Expected node position to be " + nodePosition + " but got " 
+ dest.position() + " after writing children.\n" + dumpNode(node, 
dest.position());
+
+        serializer.write(dest, node, nodePosition);
+
+        assert dest.position() == nodePosition + node.nodeSize
+               || dest.paddedPosition() == dest.position() // For 
PartitionIndexTest.testPointerGrowth where position may jump on page boundaries.
+                : "Expected node position to be " + (nodePosition + 
node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize 
" + node.nodeSize + ".\n" + dumpNode(node, nodePosition);
+        return nodePosition;
+    }
+
+    @Override
+    protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long 
baseOffset) throws IOException
+    {
+        return writePartialRecursiveOnStack(node, dest, baseOffset, 0);
+    }
+
+    private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset, int depth) throws IOException
+    {
+        long startPosition = dest.position() + baseOffset;
+
+        List<Node<VALUE>> childrenToClear = new ArrayList<>();
+        for (Node<VALUE> child : node.children)
+        {
+            if (child.filePos == -1)
+            {
+                childrenToClear.add(child);
+                if (depth < maxRecursionDepth)
+                    child.filePos = writePartialRecursiveOnStack(child, dest, 
baseOffset, depth + 1);
+                else
+                    child.filePos = writePartialRecursiveOnHeap(child, dest, 
baseOffset);
+            }
+        }
+
+        long nodePosition = dest.position() + baseOffset;
+
+        if (node.hasOutOfPageInBranch)
+        {
+            // Update the branch size with the size of what we have just 
written. This may be used by the node's
+            // maxPositionDelta, and it's a better approximation for later 
fitting calculations.
+            node.branchSize = (int) (nodePosition - startPosition);
+        }
+
+        serializer.write(dest, node, nodePosition);
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+        {
+            // Update the node size with what we have just seen. It's a better 
approximation for later fitting
+            // calculations.
+            long endPosition = dest.position() + baseOffset;
+            node.nodeSize = (int) (endPosition - nodePosition);
+        }
+
+        for (Node<VALUE> child : childrenToClear)
+            child.filePos = -1;
+        return nodePosition;
+    }
+
+    private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long 
nodePosition) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+            new RecalcTotalSizeRecursion(node, null, nodePosition).process();
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException
+    {
+        return new WriteRecursion(node, null).process().node.filePos;
+    }
+
+    private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset) throws IOException
+    {
+        new WritePartialRecursion(node, dest, baseOffset).process();
+        long pos = node.filePos;
+        node.filePos = -1;
+        return pos;
+    }
+
+    /**
+     * Simple framework for executing recursion using on-heap linked trace to 
avoid stack overruns.
+     */
+    static abstract class Recursion<NODE>
+    {
+        final Recursion<NODE> parent;
+        final NODE node;
+        final Iterator<NODE> childIterator;
+
+        Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> 
parent)
+        {
+            this.parent = parent;
+            this.node = node;
+            this.childIterator = childIterator;
+        }
+
+        /**
+         * Make a child Recursion object for the given node and initialize it 
as necessary to continue processing
+         * with it.
+         *

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexWriter.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.tries.IncrementalTrieWriter;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Preparer / writer of row index tries.
+ *

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReaderLoadingBuilder.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.CompressionInfoComponent;
+import org.apache.cassandra.io.sstable.format.FilterComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReaderLoadingBuilder;
+import org.apache.cassandra.io.sstable.format.StatsComponent;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Throwables;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BtiTableReaderLoadingBuilder extends 
SSTableReaderLoadingBuilder<BtiTableReader, BtiTableReader.Builder>
+{
+    private final static Logger logger = 
LoggerFactory.getLogger(BtiTableReaderLoadingBuilder.class);
+
+    private FileHandle.Builder dataFileBuilder;
+    private FileHandle.Builder partitionIndexFileBuilder;
+    private FileHandle.Builder rowIndexFileBuilder;
+
+    public BtiTableReaderLoadingBuilder(SSTable.Builder<?, ?> builder)
+    {
+        super(builder);
+    }
+
+    @Override
+    public KeyReader buildKeyReader(TableMetrics tableMetrics) throws 
IOException
+    {
+        StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.HEADER, MetadataType.VALIDATION);
+        return createKeyReader(statsComponent.statsMetadata());
+    }
+
+    private KeyReader createKeyReader(StatsMetadata statsMetadata) throws 
IOException
+    {
+        checkNotNull(statsMetadata);
+
+        try (PartitionIndex index = 
PartitionIndex.load(partitionIndexFileBuilder(), 
tableMetadataRef.getLocal().partitioner, false);
+             CompressionMetadata compressionMetadata = 
CompressionInfoComponent.maybeLoad(descriptor, components);
+             FileHandle dFile = 
dataFileBuilder(statsMetadata).withCompressionMetadata(compressionMetadata).complete();
+             FileHandle riFile = rowIndexFileBuilder().complete())
+        {
+            return PartitionIterator.create(index,
+                                            
tableMetadataRef.getLocal().partitioner,
+                                            riFile,
+                                            dFile);
+        }
+    }
+
+    @Override
+    protected void openComponents(BtiTableReader.Builder builder, 
SSTable.Owner owner, boolean validate, boolean online) throws IOException
+    {
+        try
+        {
+            StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.VALIDATION, MetadataType.HEADER);
+            
builder.setSerializationHeader(statsComponent.serializationHeader(builder.getTableMetadataRef().getLocal()));
+            assert !online || builder.getSerializationHeader() != null;

Review Comment:
   Changed to `checkArgument` i.e. `InvalidArgumentException` which is as much 
as that message could say anyway.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableVerifier.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SortedTableVerifier;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class BtiTableVerifier extends SortedTableVerifier<BtiTableReader> 
implements IVerifier
+{
+    public BtiTableVerifier(ColumnFamilyStore cfs, BtiTableReader sstable, 
OutputHandler outputHandler, boolean isOffline, Options options)
+    {
+        super(cfs, sstable, outputHandler, isOffline, options);
+    }
+
+    public void verify()
+    {
+        verifySSTableVersion();
+
+        verifySSTableMetadata();
+
+        verifyIndex();
+
+        verifyBloomFilter();
+
+        if (options.checkOwnsTokens && !isOffline && !(cfs.getPartitioner() 
instanceof LocalPartitioner))
+        {
+            if (verifyOwnedRanges() == 0)
+                return;
+        }
+
+        if (options.quick)
+            return;
+
+        if (verifyDigest() && !options.extendedVerification)
+            return;
+
+        verifySSTable();
+
+        outputHandler.output("Verify of %s succeeded. All %d rows read 
successfully", sstable, goodRows);
+    }
+
+    private void verifySSTable()
+    {
+        long rowStart;
+        outputHandler.output("Extended Verify requested, proceeding to inspect 
values");
+
+        try (VerifyController verifyController = new VerifyController(cfs);
+             KeyReader indexIterator = sstable.keyReader())
+        {
+            if (indexIterator.dataPosition() != 0)
+                markAndThrow(new RuntimeException("First row position from 
index != 0: " + indexIterator.dataPosition()));
+
+            List<Range<Token>> ownedRanges = isOffline ? 
Collections.emptyList() : 
Range.normalize(tokenLookup.apply(cfs.metadata().keyspace));
+            RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
+            DecoratedKey prevKey = null;
+
+            while (!dataFile.isEOF())
+            {
+
+                if (verifyInfo.isStopRequested())
+                    throw new 
CompactionInterruptedException(verifyInfo.getCompactionInfo());
+
+                rowStart = dataFile.getFilePointer();
+                outputHandler.debug("Reading row at %d", rowStart);
+
+                DecoratedKey key = null;
+                try
+                {
+                    key = 
sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);

Review Comment:
   The verifier is not supposed to continue after any error -- changed to 
`markAndThrow`.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReverseIterator.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.PrintStream;
+
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.tries.ReverseValueIterator;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reverse iterator over the row index. Needed to get previous index blocks 
for reverse iteration.
+ */
+class RowIndexReverseIterator extends 
ReverseValueIterator<RowIndexReverseIterator>
+{
+    private long currentNode = -1;
+
+    public RowIndexReverseIterator(FileHandle file, long root, ByteComparable 
start, ByteComparable end)
+    {
+        super(file.instantiateRebufferer(null), root, start, end, true);
+    }
+
+    public RowIndexReverseIterator(FileHandle file, TrieIndexEntry entry, 
ByteComparable end)
+    {
+        this(file, entry.indexTrieRoot, ByteComparable.EMPTY, end);
+    }
+
+    /**
+     * This method must be async-read-safe.
+     */
+    public IndexInfo nextIndexInfo()
+    {
+        if (currentNode == -1)
+        {
+            currentNode = nextPayloadedNode();
+            if (currentNode == -1)
+                return null;
+        }
+
+        go(currentNode);
+        IndexInfo info = RowIndexReader.readPayload(buf, payloadPosition(), 
payloadFlags());
+
+        currentNode = -1;
+        return info;
+    }
+
+    public void dumpTrie(PrintStream out)

Review Comment:
   Added `@SuppressWarnings`.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReverseIterator.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.PrintStream;
+
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.tries.ReverseValueIterator;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reverse iterator over the row index. Needed to get previous index blocks 
for reverse iteration.
+ */
+class RowIndexReverseIterator extends 
ReverseValueIterator<RowIndexReverseIterator>
+{
+    private long currentNode = -1;
+
+    public RowIndexReverseIterator(FileHandle file, long root, ByteComparable 
start, ByteComparable end)
+    {
+        super(file.instantiateRebufferer(null), root, start, end, true);
+    }
+
+    public RowIndexReverseIterator(FileHandle file, TrieIndexEntry entry, 
ByteComparable end)
+    {
+        this(file, entry.indexTrieRoot, ByteComparable.EMPTY, end);
+    }
+
+    /**
+     * This method must be async-read-safe.
+     */
+    public IndexInfo nextIndexInfo()
+    {
+        if (currentNode == -1)
+        {
+            currentNode = nextPayloadedNode();
+            if (currentNode == -1)
+                return null;
+        }
+
+        go(currentNode);
+        IndexInfo info = RowIndexReader.readPayload(buf, payloadPosition(), 
payloadFlags());
+
+        currentNode = -1;
+        return info;
+    }
+
+    public void dumpTrie(PrintStream out)
+    {
+        dumpTrie(out, (buf, ppos, bits) -> {
+            IndexInfo ii = RowIndexReader.readPayload(buf, ppos, bits);

Review Comment:
   Extracted method in `RowIndexReader` to avoid code repetition.



##########
src/java/org/apache/cassandra/io/tries/ValueIterator.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.tries;
+
+import org.apache.cassandra.io.util.Rebufferer;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Thread-unsafe value iterator for on-disk tries. Uses the assumptions of 
Walker.
+ */
+public class ValueIterator<CONCRETE extends ValueIterator<CONCRETE>> extends 
Walker<CONCRETE>
+{
+    private final ByteSource limit;
+    private IterationPosition stack;
+    private long next;
+
+    static class IterationPosition
+    {
+        long node;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.tries;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * This class is a variant of {@link IncrementalTrieWriterPageAware} which is 
able to build even very deep
+ * tries. While the parent class uses recursion for clarity, it may end up 
with stack overflow for tries with
+ * very long keys. This implementation can switch processing from stack to 
heap at a certain depth (provided
+ * as a constructor param).
+ */
+public class IncrementalDeepTrieWriterPageAware<VALUE> extends 
IncrementalTrieWriterPageAware<VALUE>
+{
+    private final int maxRecursionDepth;
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth)
+    {
+        super(trieSerializer, dest);
+        this.maxRecursionDepth = maxRecursionDepth;
+    }
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest)
+    {
+        this(trieSerializer, dest, 64);
+    }
+
+    @Override
+    protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws 
IOException
+    {
+        return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0);
+    }
+
+    private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long 
nodePosition, int depth) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+        {
+            int sz = 0;
+            for (Node<VALUE> child : node.children)
+            {
+                if (depth < maxRecursionDepth)
+                    sz += recalcTotalSizeRecursiveOnStack(child, nodePosition 
+ sz, depth + 1);
+                else
+                    sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + 
sz);
+            }
+            node.branchSize = sz;
+        }
+
+        // The sizing below will use the branch size calculated above. Since 
that can change on out-of-page in branch,
+        // we need to recalculate the size if either flag is set.
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    @Override
+    protected long write(Node<VALUE> node) throws IOException
+    {
+        return writeRecursiveOnStack(node, 0);
+    }
+
+    private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws 
IOException
+    {
+        long nodePosition = dest.position();
+        for (Node<VALUE> child : node.children)
+            if (child.filePos == -1)
+            {
+                if (depth < maxRecursionDepth)
+                    child.filePos = writeRecursiveOnStack(child, depth + 1);
+                else
+                    child.filePos = writeRecursiveOnHeap(child);
+            }
+
+        nodePosition += node.branchSize;
+        assert dest.position() == nodePosition
+                : "Expected node position to be " + nodePosition + " but got " 
+ dest.position() + " after writing children.\n" + dumpNode(node, 
dest.position());
+
+        serializer.write(dest, node, nodePosition);
+
+        assert dest.position() == nodePosition + node.nodeSize
+               || dest.paddedPosition() == dest.position() // For 
PartitionIndexTest.testPointerGrowth where position may jump on page boundaries.
+                : "Expected node position to be " + (nodePosition + 
node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize 
" + node.nodeSize + ".\n" + dumpNode(node, nodePosition);
+        return nodePosition;
+    }
+
+    @Override
+    protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long 
baseOffset) throws IOException
+    {
+        return writePartialRecursiveOnStack(node, dest, baseOffset, 0);
+    }
+
+    private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset, int depth) throws IOException
+    {
+        long startPosition = dest.position() + baseOffset;
+
+        List<Node<VALUE>> childrenToClear = new ArrayList<>();
+        for (Node<VALUE> child : node.children)
+        {
+            if (child.filePos == -1)
+            {
+                childrenToClear.add(child);
+                if (depth < maxRecursionDepth)
+                    child.filePos = writePartialRecursiveOnStack(child, dest, 
baseOffset, depth + 1);
+                else
+                    child.filePos = writePartialRecursiveOnHeap(child, dest, 
baseOffset);
+            }
+        }
+
+        long nodePosition = dest.position() + baseOffset;
+
+        if (node.hasOutOfPageInBranch)
+        {
+            // Update the branch size with the size of what we have just 
written. This may be used by the node's
+            // maxPositionDelta, and it's a better approximation for later 
fitting calculations.
+            node.branchSize = (int) (nodePosition - startPosition);
+        }
+
+        serializer.write(dest, node, nodePosition);
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+        {
+            // Update the node size with what we have just seen. It's a better 
approximation for later fitting
+            // calculations.
+            long endPosition = dest.position() + baseOffset;
+            node.nodeSize = (int) (endPosition - nodePosition);
+        }
+
+        for (Node<VALUE> child : childrenToClear)
+            child.filePos = -1;
+        return nodePosition;
+    }
+
+    private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long 
nodePosition) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+            new RecalcTotalSizeRecursion(node, null, nodePosition).process();
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException
+    {
+        return new WriteRecursion(node, null).process().node.filePos;
+    }
+
+    private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset) throws IOException
+    {
+        new WritePartialRecursion(node, dest, baseOffset).process();
+        long pos = node.filePos;
+        node.filePos = -1;
+        return pos;
+    }
+
+    /**
+     * Simple framework for executing recursion using on-heap linked trace to 
avoid stack overruns.
+     */
+    static abstract class Recursion<NODE>
+    {
+        final Recursion<NODE> parent;
+        final NODE node;
+        final Iterator<NODE> childIterator;
+
+        Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> 
parent)
+        {
+            this.parent = parent;
+            this.node = node;
+            this.childIterator = childIterator;
+        }
+
+        /**
+         * Make a child Recursion object for the given node and initialize it 
as necessary to continue processing
+         * with it.
+         *
+         * May return null if the recursion does not need to continue inside 
the child branch.
+         */
+        abstract Recursion<NODE> makeChild(NODE child);
+
+        /**
+         * Complete the processing this Recursion object.
+         *
+         * Note: this method is not called for the nodes for which makeChild() 
returns null.
+         */
+        abstract void complete() throws IOException;
+
+        /**
+         * Complete processing of the given child (possibly retrieve data to 
apply to any accumulation performed
+         * in this Recursion object).
+         *
+         * This is called when processing a child completes, including when 
recursion inside the child branch
+         * is skipped by makeChild() returning null.
+         */
+        void completeChild(NODE child)
+        {}
+
+        /**
+         * Recursive process, in depth-first order, the branch rooted at this 
recursion node.
+         *

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/tries/ValueIterator.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.tries;
+
+import org.apache.cassandra.io.util.Rebufferer;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Thread-unsafe value iterator for on-disk tries. Uses the assumptions of 
Walker.
+ */
+public class ValueIterator<CONCRETE extends ValueIterator<CONCRETE>> extends 
Walker<CONCRETE>
+{
+    private final ByteSource limit;
+    private IterationPosition stack;
+    private long next;
+
+    static class IterationPosition
+    {
+        long node;
+        int childIndex;
+        int limit;
+        IterationPosition prev;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/tries/ValueIterator.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.tries;
+
+import org.apache.cassandra.io.util.Rebufferer;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Thread-unsafe value iterator for on-disk tries. Uses the assumptions of 
Walker.
+ */
+public class ValueIterator<CONCRETE extends ValueIterator<CONCRETE>> extends 
Walker<CONCRETE>
+{
+    private final ByteSource limit;
+    private IterationPosition stack;
+    private long next;
+
+    static class IterationPosition
+    {
+        long node;
+        int childIndex;
+        int limit;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.tries;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * This class is a variant of {@link IncrementalTrieWriterPageAware} which is 
able to build even very deep
+ * tries. While the parent class uses recursion for clarity, it may end up 
with stack overflow for tries with
+ * very long keys. This implementation can switch processing from stack to 
heap at a certain depth (provided
+ * as a constructor param).
+ */
+public class IncrementalDeepTrieWriterPageAware<VALUE> extends 
IncrementalTrieWriterPageAware<VALUE>
+{
+    private final int maxRecursionDepth;
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth)
+    {
+        super(trieSerializer, dest);
+        this.maxRecursionDepth = maxRecursionDepth;
+    }
+
+    public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super 
DataOutputPlus> trieSerializer, DataOutputPlus dest)
+    {
+        this(trieSerializer, dest, 64);
+    }
+
+    @Override
+    protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws 
IOException
+    {
+        return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0);
+    }
+
+    private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long 
nodePosition, int depth) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+        {
+            int sz = 0;
+            for (Node<VALUE> child : node.children)
+            {
+                if (depth < maxRecursionDepth)
+                    sz += recalcTotalSizeRecursiveOnStack(child, nodePosition 
+ sz, depth + 1);
+                else
+                    sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + 
sz);
+            }
+            node.branchSize = sz;
+        }
+
+        // The sizing below will use the branch size calculated above. Since 
that can change on out-of-page in branch,
+        // we need to recalculate the size if either flag is set.
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    @Override
+    protected long write(Node<VALUE> node) throws IOException
+    {
+        return writeRecursiveOnStack(node, 0);
+    }
+
+    private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws 
IOException
+    {
+        long nodePosition = dest.position();
+        for (Node<VALUE> child : node.children)
+            if (child.filePos == -1)
+            {
+                if (depth < maxRecursionDepth)
+                    child.filePos = writeRecursiveOnStack(child, depth + 1);
+                else
+                    child.filePos = writeRecursiveOnHeap(child);
+            }
+
+        nodePosition += node.branchSize;
+        assert dest.position() == nodePosition
+                : "Expected node position to be " + nodePosition + " but got " 
+ dest.position() + " after writing children.\n" + dumpNode(node, 
dest.position());
+
+        serializer.write(dest, node, nodePosition);
+
+        assert dest.position() == nodePosition + node.nodeSize
+               || dest.paddedPosition() == dest.position() // For 
PartitionIndexTest.testPointerGrowth where position may jump on page boundaries.
+                : "Expected node position to be " + (nodePosition + 
node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize 
" + node.nodeSize + ".\n" + dumpNode(node, nodePosition);
+        return nodePosition;
+    }
+
+    @Override
+    protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long 
baseOffset) throws IOException
+    {
+        return writePartialRecursiveOnStack(node, dest, baseOffset, 0);
+    }
+
+    private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset, int depth) throws IOException
+    {
+        long startPosition = dest.position() + baseOffset;
+
+        List<Node<VALUE>> childrenToClear = new ArrayList<>();
+        for (Node<VALUE> child : node.children)
+        {
+            if (child.filePos == -1)
+            {
+                childrenToClear.add(child);
+                if (depth < maxRecursionDepth)
+                    child.filePos = writePartialRecursiveOnStack(child, dest, 
baseOffset, depth + 1);
+                else
+                    child.filePos = writePartialRecursiveOnHeap(child, dest, 
baseOffset);
+            }
+        }
+
+        long nodePosition = dest.position() + baseOffset;
+
+        if (node.hasOutOfPageInBranch)
+        {
+            // Update the branch size with the size of what we have just 
written. This may be used by the node's
+            // maxPositionDelta, and it's a better approximation for later 
fitting calculations.
+            node.branchSize = (int) (nodePosition - startPosition);
+        }
+
+        serializer.write(dest, node, nodePosition);
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+        {
+            // Update the node size with what we have just seen. It's a better 
approximation for later fitting
+            // calculations.
+            long endPosition = dest.position() + baseOffset;
+            node.nodeSize = (int) (endPosition - nodePosition);
+        }
+
+        for (Node<VALUE> child : childrenToClear)
+            child.filePos = -1;
+        return nodePosition;
+    }
+
+    private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long 
nodePosition) throws IOException
+    {
+        if (node.hasOutOfPageInBranch)
+            new RecalcTotalSizeRecursion(node, null, nodePosition).process();
+
+        if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch)
+            node.nodeSize = serializer.sizeofNode(node, nodePosition + 
node.branchSize);
+
+        return node.branchSize + node.nodeSize;
+    }
+
+    private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException
+    {
+        return new WriteRecursion(node, null).process().node.filePos;
+    }
+
+    private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus 
dest, long baseOffset) throws IOException
+    {
+        new WritePartialRecursion(node, dest, baseOffset).process();
+        long pos = node.filePos;
+        node.filePos = -1;
+        return pos;
+    }
+
+    /**
+     * Simple framework for executing recursion using on-heap linked trace to 
avoid stack overruns.
+     */
+    static abstract class Recursion<NODE>
+    {
+        final Recursion<NODE> parent;
+        final NODE node;
+        final Iterator<NODE> childIterator;
+
+        Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> 
parent)
+        {
+            this.parent = parent;
+            this.node = node;
+            this.childIterator = childIterator;
+        }
+
+        /**
+         * Make a child Recursion object for the given node and initialize it 
as necessary to continue processing
+         * with it.
+         *
+         * May return null if the recursion does not need to continue inside 
the child branch.
+         */
+        abstract Recursion<NODE> makeChild(NODE child);
+
+        /**
+         * Complete the processing this Recursion object.
+         *
+         * Note: this method is not called for the nodes for which makeChild() 
returns null.
+         */
+        abstract void complete() throws IOException;
+
+        /**
+         * Complete processing of the given child (possibly retrieve data to 
apply to any accumulation performed
+         * in this Recursion object).
+         *

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to