blambov commented on code in PR #2267:
URL: https://github.com/apache/cassandra/pull/2267#discussion_r1170004427


##########
src/java/org/apache/cassandra/db/DeletionTime.java:
##########
@@ -191,4 +201,4 @@ public long serializedSize(DeletionTime delTime)
                  + TypeSizes.sizeof(delTime.markedForDeleteAt());
         }
     }
-}
+}

Review Comment:
   Reverted



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScrubber.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.io.sstable.IScrubber;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
+import org.apache.cassandra.io.sstable.format.SortedTableScrubber;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.Throwables;
+
+public class BtiTableScrubber extends SortedTableScrubber<BtiTableReader> 
implements IScrubber
+{
+    private final boolean isIndex;
+    private ScrubPartitionIterator indexIterator;
+
+    public BtiTableScrubber(ColumnFamilyStore cfs,
+                            LifecycleTransaction transaction,
+                            OutputHandler outputHandler,
+                            IScrubber.Options options)
+    {
+        super(cfs, transaction, outputHandler, options);
+
+        boolean hasIndexFile = 
sstable.getComponents().contains(Components.PARTITION_INDEX);
+        this.isIndex = cfs.isIndex();
+        if (!hasIndexFile)
+        {
+            // if there's any corruption in the -Data.db then partitions can't 
be skipped over. but it's worth a shot.
+            outputHandler.warn("Missing index component");
+        }
+
+        try
+        {
+            this.indexIterator = hasIndexFile
+                                 ? openIndexIterator()
+                                 : null;
+        }
+        catch (RuntimeException ex)
+        {
+            outputHandler.warn("Detected corruption in the index file - cannot 
open index iterator", ex);
+        }
+    }
+
+    private ScrubPartitionIterator openIndexIterator()
+    {
+        try
+        {
+            return sstable.scrubPartitionsIterator();
+        }
+        catch (IOException e)
+        {
+            outputHandler.warn("Index is unreadable.");

Review Comment:
   Added the stack trace, changed the handler to catch all errors, and added 
"scrubbing will continue without index."



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableVerifier.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SortedTableVerifier;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class BtiTableVerifier extends SortedTableVerifier<BtiTableReader> 
implements IVerifier
+{
+    public BtiTableVerifier(ColumnFamilyStore cfs, BtiTableReader sstable, 
OutputHandler outputHandler, boolean isOffline, Options options)
+    {
+        super(cfs, sstable, outputHandler, isOffline, options);
+    }
+
+    public void verify()
+    {
+        verifySSTableVersion();
+
+        verifySSTableMetadata();
+
+        verifyIndex();
+
+        verifyBloomFilter();
+
+        if (options.checkOwnsTokens && !isOffline && !(cfs.getPartitioner() 
instanceof LocalPartitioner))
+        {
+            if (verifyOwnedRanges() == 0)
+                return;
+        }
+
+        if (options.quick)
+            return;
+
+        if (verifyDigest() && !options.extendedVerification)
+            return;
+
+        verifySSTable();
+
+        outputHandler.output("Verify of %s succeeded. All %d rows read 
successfully", sstable, goodRows);
+    }
+
+    private void verifySSTable()
+    {
+        long rowStart;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java:
##########
@@ -337,6 +337,41 @@ private void seekToChunkStart()
         }
     }
 
+    // Page management using chunk boundaries

Review Comment:
   This is a comment for the next several methods that implement 
page-management functionality.
   
   Removed now, together with the methods which are currently unused.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableVerifier.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SortedTableVerifier;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class BtiTableVerifier extends SortedTableVerifier<BtiTableReader> 
implements IVerifier
+{
+    public BtiTableVerifier(ColumnFamilyStore cfs, BtiTableReader sstable, 
OutputHandler outputHandler, boolean isOffline, Options options)
+    {
+        super(cfs, sstable, outputHandler, isOffline, options);
+    }
+
+    public void verify()
+    {
+        verifySSTableVersion();
+
+        verifySSTableMetadata();
+
+        verifyIndex();
+
+        verifyBloomFilter();

Review Comment:
   Moved most functionality into the shared `SortedTableVerifier`.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableVerifier.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SortedTableVerifier;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class BtiTableVerifier extends SortedTableVerifier<BtiTableReader> 
implements IVerifier
+{
+    public BtiTableVerifier(ColumnFamilyStore cfs, BtiTableReader sstable, 
OutputHandler outputHandler, boolean isOffline, Options options)
+    {
+        super(cfs, sstable, outputHandler, isOffline, options);
+    }
+
+    public void verify()
+    {
+        verifySSTableVersion();
+
+        verifySSTableMetadata();
+
+        verifyIndex();
+
+        verifyBloomFilter();
+
+        if (options.checkOwnsTokens && !isOffline && !(cfs.getPartitioner() 
instanceof LocalPartitioner))
+        {
+            if (verifyOwnedRanges() == 0)
+                return;
+        }
+
+        if (options.quick)
+            return;
+
+        if (verifyDigest() && !options.extendedVerification)
+            return;
+
+        verifySSTable();
+
+        outputHandler.output("Verify of %s succeeded. All %d rows read 
successfully", sstable, goodRows);
+    }
+
+    private void verifySSTable()
+    {
+        long rowStart;
+        outputHandler.output("Extended Verify requested, proceeding to inspect 
values");
+
+        try (VerifyController verifyController = new VerifyController(cfs);
+             KeyReader indexIterator = sstable.keyReader())
+        {
+            if (indexIterator.dataPosition() != 0)
+                markAndThrow(new RuntimeException("First row position from 
index != 0: " + indexIterator.dataPosition()));
+
+            List<Range<Token>> ownedRanges = isOffline ? 
Collections.emptyList() : 
Range.normalize(tokenLookup.apply(cfs.metadata().keyspace));
+            RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
+            DecoratedKey prevKey = null;
+
+            while (!dataFile.isEOF())
+            {
+
+                if (verifyInfo.isStopRequested())
+                    throw new 
CompactionInterruptedException(verifyInfo.getCompactionInfo());
+
+                rowStart = dataFile.getFilePointer();
+                outputHandler.debug("Reading row at %d", rowStart);
+
+                DecoratedKey key = null;
+                try
+                {
+                    key = 
sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    // check for null key below
+                }
+
+                if (options.checkOwnsTokens && ownedRanges.size() > 0 && 
!(cfs.getPartitioner() instanceof LocalPartitioner))
+                {
+                    try
+                    {
+                        rangeOwnHelper.validate(key);
+                    }
+                    catch (Throwable t)
+                    {
+                        outputHandler.warn(t, "Key %s in sstable %s not owned 
by local ranges %s", key, sstable, ownedRanges);
+                        markAndThrow(t);
+                    }
+                }
+
+                ByteBuffer currentIndexKey = indexIterator.key();
+                long nextRowPositionFromIndex = 0;
+                try
+                {
+                    nextRowPositionFromIndex = indexIterator.advance()
+                                               ? indexIterator.dataPosition()
+                                               : dataFile.length();
+                }
+                catch (Throwable th)
+                {
+                    markAndThrow(th);
+                }
+
+                long dataStart = dataFile.getFilePointer();
+                long dataStartFromIndex = currentIndexKey == null
+                                          ? -1
+                                          : rowStart + 2 + 
currentIndexKey.remaining();
+
+                long dataSize = nextRowPositionFromIndex - dataStartFromIndex;
+                // avoid an NPE if key is null
+                String keyName = key == null ? "(unreadable key)" : 
ByteBufferUtil.bytesToHex(key.getKey());
+                outputHandler.debug("row %s is %s", keyName, 
FBUtilities.prettyPrintMemory(dataSize));
+
+                try
+                {
+                    if (key == null || dataSize > dataFile.length())
+                        markAndThrow(new RuntimeException(String.format("key = 
%s, dataSize=%d, dataFile.length() = %d", key, dataSize, dataFile.length())));
+
+                    //mimic the scrub read path, intentionally unused
+                    try (UnfilteredRowIterator ignored = 
SSTableIdentityIterator.create(sstable, dataFile, key)) {
+                        // no-op, just open and close
+                    }
+
+                    if ((prevKey != null && prevKey.compareTo(key) > 0) || 
!key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex)
+                        markAndThrow(new RuntimeException("Key out of order: 
previous = " + prevKey + " : current = " + key));
+
+                    goodRows++;
+                    prevKey = key;
+
+
+                    outputHandler.debug("Row %s at %s valid, moving to next 
row at %s ", goodRows, rowStart, nextRowPositionFromIndex);
+                    dataFile.seek(nextRowPositionFromIndex);
+                }
+                catch (Throwable th)
+                {
+                    markAndThrow(th);
+                }
+            }
+        }
+        catch (Throwable t)
+        {
+            throw Throwables.propagate(t);

Review Comment:
   Fixed



##########
src/java/org/apache/cassandra/io/sstable/format/bti/SSTableIterator.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.AbstractSSTableIterator;
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+
+/**
+ *  Unfiltered row iterator over a BTI SSTable.
+ */
+class SSTableIterator extends AbstractSSTableIterator<AbstractRowIndexEntry>
+{
+    /**
+     * The index of the slice being processed.
+     */
+    private int slice;
+
+    public SSTableIterator(BtiTableReader sstable,
+                           FileDataInput file,
+                           DecoratedKey key,
+                           AbstractRowIndexEntry indexEntry,
+                           Slices slices,
+                           ColumnFilter columns,
+                           FileHandle ifile)
+    {
+        super(sstable, file, key, indexEntry, slices, columns, ifile);
+    }
+
+    protected Reader createReaderInternal(AbstractRowIndexEntry indexEntry, 
FileDataInput file, boolean shouldCloseFile)
+    {
+        if (indexEntry.isIndexed())
+            return new ForwardIndexedReader(indexEntry, file, shouldCloseFile);
+        else
+            return new ForwardReader(file, shouldCloseFile);
+    }
+
+    protected int nextSliceIndex()
+    {
+        int next = slice;
+        slice++;
+        return next;
+    }
+
+    protected boolean hasMoreSlices()
+    {
+        return slice < slices.size();
+    }
+
+    public boolean isReverseOrder()
+    {
+        return false;
+    }
+
+    private class ForwardIndexedReader extends ForwardReader
+    {
+        private final RowIndexReader indexReader;
+        long basePosition;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/SSTableReversedIterator.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import com.carrotsearch.hppc.LongStack;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.UnfilteredValidation;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.AbstractSSTableIterator;
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+
+/**
+ * Unfiltered row iterator over a BTI SSTable that returns rows in reverse 
order.
+ */
+class SSTableReversedIterator extends AbstractSSTableIterator<TrieIndexEntry>
+{
+    /**
+     * The index of the slice being processed.
+     */
+    private int slice;
+
+    public SSTableReversedIterator(BtiTableReader sstable,
+                                   FileDataInput file,
+                                   DecoratedKey key,
+                                   TrieIndexEntry indexEntry,
+                                   Slices slices,
+                                   ColumnFilter columns,
+                                   FileHandle ifile)
+    {
+        super(sstable, file, key, indexEntry, slices, columns, ifile);
+    }
+
+    protected Reader createReaderInternal(TrieIndexEntry indexEntry, 
FileDataInput file, boolean shouldCloseFile)
+    {
+        if (indexEntry.isIndexed())
+            return new ReverseIndexedReader(indexEntry, file, shouldCloseFile);
+        else
+            return new ReverseReader(file, shouldCloseFile);
+    }
+
+    public boolean isReverseOrder()
+    {
+        return true;
+    }
+
+    protected int nextSliceIndex()
+    {
+        int next = slice;
+        slice++;
+        return slices.size() - (next + 1);
+    }
+
+    protected boolean hasMoreSlices()
+    {
+        return slice < slices.size();
+    }
+
+    /**
+     * Reverse iteration is performed by going through an index block (or the 
whole partition if not indexed) forwards
+     * and storing the positions of each entry that falls within the slice in 
a stack. Reverse iteration then pops out
+     * positions and reads the entries.
+     * <p>
+     * Note: The earlier version of this was constructing an in-memory view of 
the block instead, which gives better
+     * performance on bigger queries and index blocks (due to not having to 
read disk again). With the lower
+     * granularity of the tries it makes better sense to store as little as 
possible as the beginning of the block
+     * should very rarely be in other page/chunk cache locations. This has the 
benefit of being able to answer small
+     * queries (esp. LIMIT 1) faster and with less GC churn.
+     */
+    private class ReverseReader extends AbstractReader
+    {
+        LongStack rowOffsets = new LongStack();

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/SSTableReversedIterator.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import com.carrotsearch.hppc.LongStack;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.UnfilteredValidation;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.AbstractSSTableIterator;
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+
+/**
+ * Unfiltered row iterator over a BTI SSTable that returns rows in reverse 
order.
+ */
+class SSTableReversedIterator extends AbstractSSTableIterator<TrieIndexEntry>
+{
+    /**
+     * The index of the slice being processed.
+     */
+    private int slice;
+
+    public SSTableReversedIterator(BtiTableReader sstable,
+                                   FileDataInput file,
+                                   DecoratedKey key,
+                                   TrieIndexEntry indexEntry,
+                                   Slices slices,
+                                   ColumnFilter columns,
+                                   FileHandle ifile)
+    {
+        super(sstable, file, key, indexEntry, slices, columns, ifile);
+    }
+
+    protected Reader createReaderInternal(TrieIndexEntry indexEntry, 
FileDataInput file, boolean shouldCloseFile)
+    {
+        if (indexEntry.isIndexed())
+            return new ReverseIndexedReader(indexEntry, file, shouldCloseFile);
+        else
+            return new ReverseReader(file, shouldCloseFile);
+    }
+
+    public boolean isReverseOrder()
+    {
+        return true;
+    }
+
+    protected int nextSliceIndex()
+    {
+        int next = slice;
+        slice++;
+        return slices.size() - (next + 1);
+    }
+
+    protected boolean hasMoreSlices()
+    {
+        return slice < slices.size();
+    }
+
+    /**
+     * Reverse iteration is performed by going through an index block (or the 
whole partition if not indexed) forwards
+     * and storing the positions of each entry that falls within the slice in 
a stack. Reverse iteration then pops out
+     * positions and reads the entries.
+     * <p>
+     * Note: The earlier version of this was constructing an in-memory view of 
the block instead, which gives better
+     * performance on bigger queries and index blocks (due to not having to 
read disk again). With the lower
+     * granularity of the tries it makes better sense to store as little as 
possible as the beginning of the block
+     * should very rarely be in other page/chunk cache locations. This has the 
benefit of being able to answer small
+     * queries (esp. LIMIT 1) faster and with less GC churn.
+     */
+    private class ReverseReader extends AbstractReader
+    {
+        LongStack rowOffsets = new LongStack();
+        RangeTombstoneMarker blockOpenMarker, blockCloseMarker;
+        Unfiltered next = null;
+        boolean foundLessThan;
+        long startPos = -1;
+
+        private ReverseReader(FileDataInput file, boolean shouldCloseFile)
+        {
+            super(file, shouldCloseFile);
+        }
+
+        public void setForSlice(Slice slice) throws IOException
+        {
+            // read full row and filter
+            if (startPos == -1)
+                startPos = file.getFilePointer();
+            else
+                seekToPosition(startPos);
+
+            fillOffsets(slice, true, true, Long.MAX_VALUE);
+        }
+
+        protected boolean hasNextInternal() throws IOException
+        {
+            if (next != null)
+                return true;
+            next = computeNext();
+            return next != null;
+        }
+
+        protected Unfiltered nextInternal() throws IOException
+        {
+            if (!hasNextInternal())
+                throw new NoSuchElementException();
+
+            Unfiltered toReturn = next;
+            next = null;
+            return toReturn;
+        }
+
+        private Unfiltered computeNext() throws IOException
+        {
+            Unfiltered toReturn;
+            do
+            {
+                if (blockCloseMarker != null)
+                {
+                    toReturn = blockCloseMarker;
+                    blockCloseMarker = null;
+                    return toReturn;
+                }
+                while (!rowOffsets.isEmpty())
+                {
+                    seekToPosition(rowOffsets.pop());
+                    boolean hasNext = deserializer.hasNext();
+                    assert hasNext;
+                    toReturn = deserializer.readNext();
+                    UnfilteredValidation.maybeValidateUnfiltered(toReturn, 
metadata(), key, sstable);
+                    // We may get empty row for the same reason expressed on 
UnfilteredSerializer.deserializeOne.
+                    if (!toReturn.isEmpty())
+                        return toReturn;
+                }
+            }
+            while (!foundLessThan && advanceIndexBlock());
+
+            // open marker to be output only as slice is finished
+            if (blockOpenMarker != null)
+            {
+                toReturn = blockOpenMarker;
+                blockOpenMarker = null;
+                return toReturn;
+            }
+            return null;
+        }
+
+        protected boolean advanceIndexBlock() throws IOException
+        {
+            return false;
+        }
+
+        void fillOffsets(Slice slice, boolean filterStart, boolean filterEnd, 
long stopPosition) throws IOException
+        {
+            filterStart &= !slice.start().equals(ClusteringBound.BOTTOM);
+            filterEnd &= !slice.end().equals(ClusteringBound.TOP);
+            long currentPosition = -1;
+
+            ClusteringBound start = slice.start();
+            currentPosition = file.getFilePointer();
+            foundLessThan = false;
+            // This is a copy of handlePreSliceData which also checks 
currentPosition < stopPosition.
+            // Not extracted to method as we need both marker and 
currentPosition.
+            if (filterStart)
+            {
+                while (currentPosition < stopPosition && 
deserializer.hasNext() && deserializer.compareNextTo(start) <= 0)
+                {
+                    if (deserializer.nextIsRow())
+                        deserializer.skipNext();
+                    else
+                        updateOpenMarker((RangeTombstoneMarker) 
deserializer.readNext());
+
+                    currentPosition = file.getFilePointer();
+                    foundLessThan = true;
+                }
+            }
+
+            // We've reached the beginning of our queried slice. If we have an 
open marker
+            // we should return that at the end of the slice to close the 
deletion.
+            if (openMarker != null)
+                blockOpenMarker = new RangeTombstoneBoundMarker(start, 
openMarker);
+
+
+            // Now deserialize everything until we reach our requested end (if 
we have one)
+            // See SSTableIterator.ForwardRead.computeNext() for why this is a 
strict inequality below: this is the same
+            // reasoning here.
+            while (currentPosition < stopPosition && deserializer.hasNext()
+                   && (!filterEnd || deserializer.compareNextTo(slice.end()) < 
0))
+            {
+                rowOffsets.push(currentPosition);
+                if (deserializer.nextIsRow())
+                    deserializer.skipNext();
+                else
+                    updateOpenMarker((RangeTombstoneMarker) 
deserializer.readNext());
+
+                currentPosition = file.getFilePointer();
+            }
+
+            // If we have an open marker, we should output that first, unless 
end is not being filtered
+            // (i.e. it's either top (where a marker can't be open) or we 
placed that marker during previous block).
+            if (openMarker != null && filterEnd)
+            {
+                // If we have no end and still an openMarker, this means we're 
indexed and the marker is closed in a following block.
+                blockCloseMarker = new RangeTombstoneBoundMarker(slice.end(), 
openMarker);
+                openMarker = null;
+            }
+        }
+    }
+
+    private class ReverseIndexedReader extends ReverseReader
+    {
+        private RowIndexReverseIterator indexReader;
+        final TrieIndexEntry indexEntry;
+        long basePosition;

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReaderLoadingBuilder.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.CompressionInfoComponent;
+import org.apache.cassandra.io.sstable.format.FilterComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReaderLoadingBuilder;
+import org.apache.cassandra.io.sstable.format.StatsComponent;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Throwables;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BtiTableReaderLoadingBuilder extends 
SSTableReaderLoadingBuilder<BtiTableReader, BtiTableReader.Builder>
+{
+    private final static Logger logger = 
LoggerFactory.getLogger(BtiTableReaderLoadingBuilder.class);
+
+    private FileHandle.Builder dataFileBuilder;
+    private FileHandle.Builder partitionIndexFileBuilder;
+    private FileHandle.Builder rowIndexFileBuilder;
+
+    public BtiTableReaderLoadingBuilder(SSTable.Builder<?, ?> builder)
+    {
+        super(builder);
+    }
+
+    @Override
+    public KeyReader buildKeyReader(TableMetrics tableMetrics) throws 
IOException
+    {
+        StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.HEADER, MetadataType.VALIDATION);
+        return createKeyReader(statsComponent.statsMetadata());
+    }
+
+    private KeyReader createKeyReader(StatsMetadata statsMetadata) throws 
IOException
+    {
+        checkNotNull(statsMetadata);
+
+        try (PartitionIndex index = 
PartitionIndex.load(partitionIndexFileBuilder(), 
tableMetadataRef.getLocal().partitioner, false);
+             CompressionMetadata compressionMetadata = 
CompressionInfoComponent.maybeLoad(descriptor, components);
+             FileHandle dFile = 
dataFileBuilder(statsMetadata).withCompressionMetadata(compressionMetadata).complete();
+             FileHandle riFile = rowIndexFileBuilder().complete())
+        {
+            return PartitionIterator.create(index,
+                                            
tableMetadataRef.getLocal().partitioner,
+                                            riFile,
+                                            dFile);
+        }
+    }
+
+    @Override
+    protected void openComponents(BtiTableReader.Builder builder, 
SSTable.Owner owner, boolean validate, boolean online) throws IOException
+    {
+        try
+        {
+            StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.VALIDATION, MetadataType.HEADER);
+            
builder.setSerializationHeader(statsComponent.serializationHeader(builder.getTableMetadataRef().getLocal()));
+            assert !online || builder.getSerializationHeader() != null;
+
+            builder.setStatsMetadata(statsComponent.statsMetadata());
+            ValidationMetadata validationMetadata = 
statsComponent.validationMetadata();
+            validatePartitioner(builder.getTableMetadataRef().getLocal(), 
validationMetadata);
+
+            boolean filterNeeded = online;
+            if (filterNeeded)
+                builder.setFilter(loadFilter(validationMetadata));
+            boolean rebuildFilter = filterNeeded && builder.getFilter() == 
null;
+
+            if (descriptor.version.hasKeyRange())
+            {
+                IPartitioner partitioner = 
tableMetadataRef.getLocal().partitioner;
+                
builder.setFirst(partitioner.decorateKey(builder.getStatsMetadata().firstKey));
+                
builder.setLast(partitioner.decorateKey(builder.getStatsMetadata().lastKey));
+            }
+
+            if (builder.getComponents().contains(Components.PARTITION_INDEX) 
&& builder.getComponents().contains(Components.ROW_INDEX) && rebuildFilter)
+            {
+                @SuppressWarnings("resource")

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIterator.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Throwables;
+
+import static 
org.apache.cassandra.utils.FBUtilities.immutableListWithFilteredNulls;
+
+class PartitionIterator extends PartitionIndex.IndexPosIterator implements 
KeyReader
+{
+    private final PartitionIndex partitionIndex;
+    private final IPartitioner partitioner;
+    private final PartitionPosition limit;
+    private final int exclusiveLimit;
+    private final FileHandle dataFile;
+    private final FileHandle rowIndexFile;
+
+    private FileDataInput dataInput;
+    private FileDataInput indexInput;
+
+    private DecoratedKey currentKey;
+    private TrieIndexEntry currentEntry;
+    private DecoratedKey nextKey;
+    private TrieIndexEntry nextEntry;
+
+    @SuppressWarnings("resource")

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.tries.IncrementalTrieWriter;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Partition index builder: stores index or data positions in an incrementally 
built, page-aware on-disk trie.
+ *
+ * Not to be used outside of package. Public only for IndexRewriter tool.
+ */
+public class PartitionIndexBuilder implements AutoCloseable
+{
+    private final SequentialWriter writer;
+    private final IncrementalTrieWriter<PartitionIndex.Payload> trieWriter;
+    private final FileHandle.Builder fhBuilder;
+
+    // the last synced data file position
+    private long dataSyncPosition;
+    // the last synced row index file position
+    private long rowIndexSyncPosition;
+    // the last synced partition index file position
+    private long partitionIndexSyncPosition;
+
+    // Partial index can only be used after all three files have been synced 
to the required positions.
+    private long partialIndexDataEnd;
+    private long partialIndexRowEnd;
+    private long partialIndexPartitionEnd;
+    private IncrementalTrieWriter.PartialTail partialIndexTail;
+    private Consumer<PartitionIndex> partialIndexConsumer;
+    private DecoratedKey partialIndexLastKey;
+
+    private int lastDiffPoint;
+    private DecoratedKey firstKey;
+    private DecoratedKey lastKey;
+    private DecoratedKey lastWrittenKey;
+    private PartitionIndex.Payload lastPayload;
+
+    public PartitionIndexBuilder(SequentialWriter writer, FileHandle.Builder 
fhBuilder)
+    {
+        this.writer = writer;
+        this.trieWriter = 
IncrementalTrieWriter.open(PartitionIndex.TRIE_SERIALIZER, writer);
+        this.fhBuilder = fhBuilder;
+    }
+
+    /*
+     * Called when partition index has been flushed to the given position.
+     * If this makes all required positions for a partial view flushed, this 
will call the partialIndexConsumer.
+     */
+    public void markPartitionIndexSynced(long upToPosition)
+    {
+        partitionIndexSyncPosition = upToPosition;
+        refreshReadableBoundary();
+    }
+
+    /*
+     * Called when row index has been flushed to the given position.
+     * If this makes all required positions for a partial view flushed, this 
will call the partialIndexConsumer.
+     */
+    public void markRowIndexSynced(long upToPosition)
+    {
+        rowIndexSyncPosition = upToPosition;
+        refreshReadableBoundary();
+    }
+
+    /*
+     * Called when data file has been flushed to the given position.
+     * If this makes all required positions for a partial view flushed, this 
will call the partialIndexConsumer.
+     */
+    public void markDataSynced(long upToPosition)
+    {
+        dataSyncPosition = upToPosition;
+        refreshReadableBoundary();
+    }
+
+    private void refreshReadableBoundary()
+    {
+        if (partialIndexConsumer == null)
+            return;
+        if (dataSyncPosition < partialIndexDataEnd)
+            return;
+        if (rowIndexSyncPosition < partialIndexRowEnd)
+            return;
+        if (partitionIndexSyncPosition < partialIndexPartitionEnd)
+            return;
+
+        try (FileHandle fh = 
fhBuilder.withLengthOverride(writer.getLastFlushOffset()).complete())
+        {
+            @SuppressWarnings("resource")

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIterator.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Throwables;
+
+import static 
org.apache.cassandra.utils.FBUtilities.immutableListWithFilteredNulls;
+
+class PartitionIterator extends PartitionIndex.IndexPosIterator implements 
KeyReader
+{
+    private final PartitionIndex partitionIndex;
+    private final IPartitioner partitioner;
+    private final PartitionPosition limit;
+    private final int exclusiveLimit;
+    private final FileHandle dataFile;
+    private final FileHandle rowIndexFile;
+
+    private FileDataInput dataInput;
+    private FileDataInput indexInput;
+
+    private DecoratedKey currentKey;
+    private TrieIndexEntry currentEntry;
+    private DecoratedKey nextKey;
+    private TrieIndexEntry nextEntry;
+
+    @SuppressWarnings("resource")
+    static PartitionIterator create(PartitionIndex partitionIndex, 
IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile,
+                                    PartitionPosition left, int inclusiveLeft, 
PartitionPosition right, int exclusiveRight) throws IOException
+    {
+        PartitionIterator partitionIterator = null;
+        PartitionIndex partitionIndexCopy = null;
+        FileHandle dataFileCopy = null;
+        FileHandle rowIndexFileCopy = null;
+
+        try
+        {
+            partitionIndexCopy = partitionIndex.sharedCopy();
+            dataFileCopy = dataFile.sharedCopy();
+            rowIndexFileCopy = rowIndexFile.sharedCopy();
+
+            partitionIterator = new PartitionIterator(partitionIndexCopy, 
partitioner, rowIndexFileCopy, dataFileCopy, left, right, exclusiveRight);
+
+            partitionIterator.readNext();
+            // Because the index stores prefixes, the first value can be in 
any relationship with the left bound.
+            if (partitionIterator.nextKey != null && 
!(partitionIterator.nextKey.compareTo(left) > inclusiveLeft))
+            {
+                partitionIterator.readNext();
+            }
+            partitionIterator.advance();
+            return partitionIterator;
+        }
+        catch (IOException | RuntimeException ex)
+        {
+            if (partitionIterator != null)
+            {
+                partitionIterator.close();
+            }
+            else
+            {
+                Throwables.closeNonNullAndAddSuppressed(ex, rowIndexFileCopy, 
dataFileCopy, partitionIndexCopy);
+            }
+            throw ex;
+        }
+    }
+
+    static PartitionIterator create(PartitionIndex partitionIndex, 
IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile) throws 
IOException
+    {
+        return create(partitionIndex, partitioner, rowIndexFile, dataFile, 
partitionIndex.firstKey(), -1, partitionIndex.lastKey(), 0);
+    }
+
+    static PartitionIterator empty(PartitionIndex partitionIndex)
+    {
+        return new PartitionIterator(partitionIndex.sharedCopy());
+    }
+
+    private PartitionIterator(PartitionIndex partitionIndex, IPartitioner 
partitioner, FileHandle rowIndexFile, FileHandle dataFile,
+                              PartitionPosition left, PartitionPosition right, 
int exclusiveRight) throws IOException

Review Comment:
   Removed



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReaderLoadingBuilder.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.CompressionInfoComponent;
+import org.apache.cassandra.io.sstable.format.FilterComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReaderLoadingBuilder;
+import org.apache.cassandra.io.sstable.format.StatsComponent;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Throwables;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BtiTableReaderLoadingBuilder extends 
SSTableReaderLoadingBuilder<BtiTableReader, BtiTableReader.Builder>
+{
+    private final static Logger logger = 
LoggerFactory.getLogger(BtiTableReaderLoadingBuilder.class);
+
+    private FileHandle.Builder dataFileBuilder;
+    private FileHandle.Builder partitionIndexFileBuilder;
+    private FileHandle.Builder rowIndexFileBuilder;
+
+    public BtiTableReaderLoadingBuilder(SSTable.Builder<?, ?> builder)
+    {
+        super(builder);
+    }
+
+    @Override
+    public KeyReader buildKeyReader(TableMetrics tableMetrics) throws 
IOException
+    {
+        StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.HEADER, MetadataType.VALIDATION);
+        return createKeyReader(statsComponent.statsMetadata());
+    }
+
+    private KeyReader createKeyReader(StatsMetadata statsMetadata) throws 
IOException
+    {
+        checkNotNull(statsMetadata);
+
+        try (PartitionIndex index = 
PartitionIndex.load(partitionIndexFileBuilder(), 
tableMetadataRef.getLocal().partitioner, false);
+             CompressionMetadata compressionMetadata = 
CompressionInfoComponent.maybeLoad(descriptor, components);
+             FileHandle dFile = 
dataFileBuilder(statsMetadata).withCompressionMetadata(compressionMetadata).complete();
+             FileHandle riFile = rowIndexFileBuilder().complete())
+        {
+            return PartitionIterator.create(index,
+                                            
tableMetadataRef.getLocal().partitioner,
+                                            riFile,
+                                            dFile);
+        }
+    }
+
+    @Override
+    protected void openComponents(BtiTableReader.Builder builder, 
SSTable.Owner owner, boolean validate, boolean online) throws IOException
+    {
+        try
+        {
+            StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.VALIDATION, MetadataType.HEADER);
+            
builder.setSerializationHeader(statsComponent.serializationHeader(builder.getTableMetadataRef().getLocal()));
+            assert !online || builder.getSerializationHeader() != null;
+
+            builder.setStatsMetadata(statsComponent.statsMetadata());
+            ValidationMetadata validationMetadata = 
statsComponent.validationMetadata();
+            validatePartitioner(builder.getTableMetadataRef().getLocal(), 
validationMetadata);
+
+            boolean filterNeeded = online;
+            if (filterNeeded)
+                builder.setFilter(loadFilter(validationMetadata));
+            boolean rebuildFilter = filterNeeded && builder.getFilter() == 
null;
+
+            if (descriptor.version.hasKeyRange())
+            {
+                IPartitioner partitioner = 
tableMetadataRef.getLocal().partitioner;
+                
builder.setFirst(partitioner.decorateKey(builder.getStatsMetadata().firstKey));
+                
builder.setLast(partitioner.decorateKey(builder.getStatsMetadata().lastKey));
+            }
+
+            if (builder.getComponents().contains(Components.PARTITION_INDEX) 
&& builder.getComponents().contains(Components.ROW_INDEX) && rebuildFilter)
+            {
+                @SuppressWarnings("resource")
+                IFilter filter = 
buildBloomFilter(statsComponent.statsMetadata());
+
+                if (filter != null)
+                {
+                    builder.setFilter(filter);
+
+                    if (online)

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReaderLoadingBuilder.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.CompressionInfoComponent;
+import org.apache.cassandra.io.sstable.format.FilterComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReaderLoadingBuilder;
+import org.apache.cassandra.io.sstable.format.StatsComponent;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Throwables;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BtiTableReaderLoadingBuilder extends 
SSTableReaderLoadingBuilder<BtiTableReader, BtiTableReader.Builder>
+{
+    private final static Logger logger = 
LoggerFactory.getLogger(BtiTableReaderLoadingBuilder.class);
+
+    private FileHandle.Builder dataFileBuilder;
+    private FileHandle.Builder partitionIndexFileBuilder;
+    private FileHandle.Builder rowIndexFileBuilder;
+
+    public BtiTableReaderLoadingBuilder(SSTable.Builder<?, ?> builder)
+    {
+        super(builder);
+    }
+
+    @Override
+    public KeyReader buildKeyReader(TableMetrics tableMetrics) throws 
IOException
+    {
+        StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.HEADER, MetadataType.VALIDATION);
+        return createKeyReader(statsComponent.statsMetadata());
+    }
+
+    private KeyReader createKeyReader(StatsMetadata statsMetadata) throws 
IOException
+    {
+        checkNotNull(statsMetadata);
+
+        try (PartitionIndex index = 
PartitionIndex.load(partitionIndexFileBuilder(), 
tableMetadataRef.getLocal().partitioner, false);
+             CompressionMetadata compressionMetadata = 
CompressionInfoComponent.maybeLoad(descriptor, components);
+             FileHandle dFile = 
dataFileBuilder(statsMetadata).withCompressionMetadata(compressionMetadata).complete();
+             FileHandle riFile = rowIndexFileBuilder().complete())
+        {
+            return PartitionIterator.create(index,
+                                            
tableMetadataRef.getLocal().partitioner,
+                                            riFile,
+                                            dFile);
+        }
+    }
+
+    @Override
+    protected void openComponents(BtiTableReader.Builder builder, 
SSTable.Owner owner, boolean validate, boolean online) throws IOException
+    {
+        try
+        {
+            StatsComponent statsComponent = StatsComponent.load(descriptor, 
MetadataType.STATS, MetadataType.VALIDATION, MetadataType.HEADER);
+            
builder.setSerializationHeader(statsComponent.serializationHeader(builder.getTableMetadataRef().getLocal()));
+            assert !online || builder.getSerializationHeader() != null;
+
+            builder.setStatsMetadata(statsComponent.statsMetadata());
+            ValidationMetadata validationMetadata = 
statsComponent.validationMetadata();
+            validatePartitioner(builder.getTableMetadataRef().getLocal(), 
validationMetadata);
+
+            boolean filterNeeded = online;

Review Comment:
   Removed 118



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.md:
##########
@@ -0,0 +1,991 @@
+Big Trie-Indexed (BTI) SSTable format
+-------------------------------------
+
+This document describes the BTI SSTable format, which is introduced to
+Cassandra with 
[CEP-25](https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-25%3A+Trie-indexed+SSTable+format).
+
+The format is called BTI, which stands for "Big Trie-Indexed", because it 
shares
+the data format of the existing BIG format and only changes the primary indexes
+inside SSTables. The format relies on byte order and tries, and uses a 
combination
+of features to make the indexing structures compact and efficient. The 
paragraphs
+below describe the format's features and mechanisms together with the 
motivation 
+behind them, and conclude with a detailed description of the on-disk format.
+
+# Prerequisites
+
+## Byte-comparable types
+
+The property of being byte-comparable (also called byte-ordered) for a
+key denotes that there is a serialisation of that key to a sequence of
+bytes where the lexicographic comparison of the unsigned bytes produces
+the same result as performing a typed comparison of the key.
+
+For Cassandra, such a representation is given by
+[CASSANDRA-6936](https://issues.apache.org/jira/browse/CASSANDRA-6936).
+A detailed description of the mechanics of the translation is provided in
+the [included 
documentation](../../../../utils/bytecomparable/ByteComparable.md).
+
+## Tries
+
+A trie is a data structure that describes a mapping between sequences
+and associated values. It is very similar to a deterministic finite
+state automaton, with the main difference that an automaton is allowed
+to have cycles, while a trie is not.
+
+Because the theory and main usage of the structure concern encoding words
+of a language, the trie terminology talks about "characters", "words"
+and "alphabet", which in our case map respectively to the bytes of the
+byte-ordered representation, the byte sequence that encodes a key, and
+the possible values of a byte[^1].
+
+[^1]: For simplicity this description assumes we directly map a byte to
+a character. Other options are also possible (e.g. using hex digits
+as the alphabet and two transitions per byte).
+
+A trie can be defined as a tree graph in which vertices are states, some
+of which can be final and contain associated information, and where
+edges are labelled with characters. A valid word in the trie is encoded
+by a path starting from the root of the trie where each edge is labelled
+with the next character of the word, and ending in a final state which
+contains the 'payload' associated with the word.
+
+```mermaid
+graph TD
+  Node_(( ))
+  style Node_ fill:darkgrey
+  Node_ --"a"--> Node_a(((a)))
+    Node_a --"l"--> Node_al(( ))
+      Node_al --"l"--> Node_all(( ))
+        Node_all --"o"--> Node_allo(( ))
+          Node_allo --"w"--> Node_allow(((allow)))
+    Node_a --"n"--> Node_an(((an)))
+      Node_an --"d"--> Node_and(((and)))
+      Node_an --"y"--> Node_any(((any)))
+    Node_a --"r"--> Node_ar(( ))
+      Node_ar --"e"--> Node_are(((are)))
+    Node_a --"s"--> Node_as(((as)))
+  Node_ --"n"--> Node_n(( ))
+    Node_n --"o"--> Node_no(( ))
+      Node_no --"d"--> Node_nod(( ))
+        Node_nod --"e"--> Node_node(((node)))
+  Node_ --"o"--> Node_o(( ))
+    Node_o --"f"--> Node_of(((of)))
+    Node_o --"n"--> Node_on(((on)))
+  Node_ --"t"--> Node_t(( ))
+    Node_t --"h"--> Node_th(( ))
+      Node_th --"e"--> Node_the(((the)))
+      Node_th --"i"--> Node_thi(( ))
+        Node_thi --"s"--> Node_this(((this)))
+    Node_t --"o"--> Node_to(((to)))
+    Node_t --"r"--> Node_tr(( ))
+      Node_tr --"i"--> Node_tri(( ))
+        Node_tri --"e"--> Node_trie(((trie)))
+    Node_t --"y"--> Node_ty(( ))
+      Node_ty --"p"--> Node_typ(( ))
+        Node_typ --"e"--> Node_type(( ))
+          Node_type --"s"--> Node_types(((types)))
+  Node_ --"w"--> Node_w(( ))
+    Node_w --"i"--> Node_wi(( ))
+      Node_wi --"t"--> Node_wit(( ))
+        Node_wit --"h"--> Node_with(((with)))
+          Node_with --"o"--> Node_witho(( ))
+            Node_witho --"u"--> Node_withou(( ))
+              Node_withou --"t"--> Node_without(((without)))
+```
+
+This means that in a constructed trie finding the payload associated
+with a word is a matter of following the edges (also called
+"transitions") from the initial state labelled with the consecutive
+characters of the word, and retrieving the payload associated with the
+state at which we end up. If that's not a final state, or if at any
+point along the way there is no transition in the trie matching the
+character, the trie does not have an association for the word. The
+complexity of lookup is thus _O_(len(word)) transitions, where the cost of
+taking a transition is usually constant, thus this complexity is
+theoretically optimal.
+
+From a storage space perspective, one of the main benefits of a trie as
+a data structure for storing a map is the fact that it completely avoids
+storing redundant prefixes. All words that start with the same sequence
+store a representation of that sequence only once. If prefixes are
+commonly shared, this can save a great deal of space.
+
+When the items stored in a trie are lexicographically (=byte) ordered, a
+trie is also an ordered structure. A trie can be walked in order and it
+is also possible to efficiently list the items between two given keys.
+
+In fact, one can efficiently (and lazily) apply set algebra over tries,
+and slicing can be seen as a simple application of intersection, where
+the intersecting trie is generated on the fly. The set operations
+benefit from the same prefix-sharing effect &mdash; we apply union /
+intersection / difference to a state, which has the effect of applying
+the operation to all words that share the prefix denoted by that state.
+
+```mermaid
+graph TD
+  Node_(( ))
+  style Node_ fill:darkgrey
+  Node_ --"a"--> Node_a(((a)))
+  style Node_a stroke:lightgrey,color:lightgrey
+  linkStyle 0 stroke:lightgrey,color:lightgrey
+    Node_a --"l"--> Node_al(( ))
+    style Node_al stroke:lightgrey,color:lightgrey
+    linkStyle 1 stroke:lightgrey,color:lightgrey
+      Node_al --"l"--> Node_all(( ))
+      style Node_all stroke:lightgrey,color:lightgrey
+      linkStyle 2 stroke:lightgrey,color:lightgrey
+        Node_all --"o"--> Node_allo(( ))
+        style Node_allo stroke:lightgrey,color:lightgrey
+        linkStyle 3 stroke:lightgrey,color:lightgrey
+          Node_allo --"w"--> Node_allow(((allow)))
+          style Node_allow stroke:lightgrey,color:lightgrey
+          linkStyle 4 stroke:lightgrey,color:lightgrey
+    Node_a --"n"--> Node_an(((an)))
+          style Node_an stroke:lightgrey,color:lightgrey
+          linkStyle 5 stroke:lightgrey,color:lightgrey
+      Node_an --"d"--> Node_and(((and)))
+          style Node_and stroke:lightgrey,color:lightgrey
+          linkStyle 6 stroke:lightgrey,color:lightgrey
+      Node_an --"y"--> Node_any(((any)))
+          style Node_any stroke:lightgrey,color:lightgrey
+          linkStyle 7 stroke:lightgrey,color:lightgrey
+    Node_a --"r"--> Node_ar(( ))
+          style Node_ar stroke:lightgrey,color:lightgrey
+          linkStyle 8 stroke:lightgrey,color:lightgrey
+      Node_ar --"e"--> Node_are(((are)))
+          style Node_are stroke:lightgrey,color:lightgrey
+          linkStyle 9 stroke:lightgrey,color:lightgrey
+    Node_a --"s"--> Node_as(((as)))
+          style Node_as stroke:lightgrey,color:lightgrey
+          linkStyle 10 stroke:lightgrey,color:lightgrey
+  Node_ --"n"--> Node_n(( ))
+    Node_n --"o"--> Node_no(( ))
+      Node_no --"d"--> Node_nod(( ))
+        Node_nod --"e"--> Node_node(((node)))
+  Node_ --"o"--> Node_o(( ))
+    Node_o --"f"--> Node_of(((of)))
+    Node_o --"n"--> Node_on(((on)))
+  Node_ --"t"--> Node_t(( ))
+    Node_t --"h"--> Node_th(( ))
+      Node_th --"e"--> Node_the(((the)))
+      Node_th --"i"--> Node_thi(( ))
+        Node_thi --"s"--> Node_this(((this)))
+          style Node_this stroke:lightgrey,color:lightgrey
+          linkStyle 22 stroke:lightgrey,color:lightgrey
+    Node_t --"o"--> Node_to(((to)))
+          style Node_to stroke:lightgrey,color:lightgrey
+          linkStyle 23 stroke:lightgrey,color:lightgrey
+    Node_t --"r"--> Node_tr(( ))
+          style Node_tr stroke:lightgrey,color:lightgrey
+          linkStyle 24 stroke:lightgrey,color:lightgrey
+      Node_tr --"i"--> Node_tri(( ))
+          style Node_tri stroke:lightgrey,color:lightgrey
+          linkStyle 25 stroke:lightgrey,color:lightgrey
+        Node_tri --"e"--> Node_trie(((trie)))
+          style Node_trie stroke:lightgrey,color:lightgrey
+          linkStyle 26 stroke:lightgrey,color:lightgrey
+    Node_t --"y"--> Node_ty(( ))
+          style Node_ty stroke:lightgrey,color:lightgrey
+          linkStyle 27 stroke:lightgrey,color:lightgrey
+      Node_ty --"p"--> Node_typ(( ))
+          style Node_typ stroke:lightgrey,color:lightgrey
+          linkStyle 28 stroke:lightgrey,color:lightgrey
+        Node_typ --"e"--> Node_type(( ))
+          style Node_type stroke:lightgrey,color:lightgrey
+          linkStyle 29 stroke:lightgrey,color:lightgrey
+          Node_type --"s"--> Node_types(((types)))
+          style Node_types stroke:lightgrey,color:lightgrey
+          linkStyle 30 stroke:lightgrey,color:lightgrey
+  Node_ --"w"--> Node_w(( ))
+          style Node_w stroke:lightgrey,color:lightgrey
+          linkStyle 31 stroke:lightgrey,color:lightgrey
+    Node_w --"i"--> Node_wi(( ))
+          style Node_wi stroke:lightgrey,color:lightgrey
+          linkStyle 32 stroke:lightgrey,color:lightgrey
+      Node_wi --"t"--> Node_wit(( ))
+          style Node_wit stroke:lightgrey,color:lightgrey
+          linkStyle 33 stroke:lightgrey,color:lightgrey
+        Node_wit --"h"--> Node_with(((with)))
+          style Node_with stroke:lightgrey,color:lightgrey
+          linkStyle 34 stroke:lightgrey,color:lightgrey
+          Node_with --"o"--> Node_witho(( ))
+          style Node_witho stroke:lightgrey,color:lightgrey
+          linkStyle 35 stroke:lightgrey,color:lightgrey
+            Node_witho --"u"--> Node_withou(( ))
+          style Node_withou stroke:lightgrey,color:lightgrey
+          linkStyle 36 stroke:lightgrey,color:lightgrey
+              Node_withou --"t"--> Node_without(((without)))
+          style Node_without stroke:lightgrey,color:lightgrey
+          linkStyle 37 stroke:lightgrey,color:lightgrey
+
+```
+
+(An example of slicing the trie above with the range "bit"-"thing".
+Processing only applies to boundary nodes (root, "t", "th", "thi"),
+where we throw away the transitions outside the range. Subtries like the
+ones for "n" and "o" fall completely between "b" and "t" thus are fully
+inside the range and can be processed without any further restrictions.)
+
+A trie can be used as a modifiable in-memory data structure where one
+can add and remove individual elements. It can also be constructed from
+sorted input, incrementally storing the data directly to disk and
+building an efficient read-only on-disk data structure.
+
+For more formal information about the concept and applications of tries
+and finite state automata, try [Introduction to Automata Theory,
+Languages, and Computation](https://books.google.bg/books?id=dVipBwAAQBAJ).

Review Comment:
   Changed to the .com site. If you have a better suggestion on a neutral site 
for the reference, I'd be happy to change it.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java:
##########
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason;
+import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason;
+import org.apache.cassandra.io.sstable.format.SSTableReaderWithFilter;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.EQ;
+import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GE;
+import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GT;
+import static 
org.apache.cassandra.utils.concurrent.SharedCloseable.sharedCopyOrNull;
+
+/**
+ * SSTableReaders are open()ed by Keyspace.onStart; after that they are 
created by SSTableWriter.renameAndOpen.
+ * Do not re-call open() on existing SSTable files; use the references kept by 
ColumnFamilyStore post-start instead.
+ */
+public class BtiTableReader extends SSTableReaderWithFilter
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableReader.class);

Review Comment:
   Removed



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScrubber.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.io.sstable.IScrubber;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
+import org.apache.cassandra.io.sstable.format.SortedTableScrubber;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.Throwables;
+
+public class BtiTableScrubber extends SortedTableScrubber<BtiTableReader> 
implements IScrubber
+{
+    private final boolean isIndex;
+    private ScrubPartitionIterator indexIterator;
+
+    public BtiTableScrubber(ColumnFamilyStore cfs,
+                            LifecycleTransaction transaction,
+                            OutputHandler outputHandler,
+                            IScrubber.Options options)
+    {
+        super(cfs, transaction, outputHandler, options);
+
+        boolean hasIndexFile = 
sstable.getComponents().contains(Components.PARTITION_INDEX);
+        this.isIndex = cfs.isIndex();
+        if (!hasIndexFile)
+        {
+            // if there's any corruption in the -Data.db then partitions can't 
be skipped over. but it's worth a shot.
+            outputHandler.warn("Missing index component");
+        }
+
+        try
+        {
+            this.indexIterator = hasIndexFile
+                                 ? openIndexIterator()
+                                 : null;
+        }
+        catch (RuntimeException ex)
+        {
+            outputHandler.warn("Detected corruption in the index file - cannot 
open index iterator", ex);
+        }
+    }
+
+    private ScrubPartitionIterator openIndexIterator()
+    {
+        try
+        {
+            return sstable.scrubPartitionsIterator();
+        }
+        catch (IOException e)
+        {
+            outputHandler.warn("Index is unreadable.");
+        }
+        return null;
+    }
+
+    @Override
+    protected UnfilteredRowIterator withValidation(UnfilteredRowIterator iter, 
String filename)
+    {
+        return options.checkData && !isIndex ? 
UnfilteredRowIterators.withValidation(iter, filename) : iter;
+    }
+
+    public void scrubInternal(SSTableRewriter writer)
+    {
+        assert !indexAvailable() || indexIterator.dataPosition() == 0 : 
indexIterator.dataPosition();
+
+        DecoratedKey prevKey = null;
+
+        while (!dataFile.isEOF())
+        {
+            if (scrubInfo.isStopRequested())
+                throw new 
CompactionInterruptedException(scrubInfo.getCompactionInfo());
+
+            // position in a data file where the partition starts
+            long dataStart = dataFile.getFilePointer();
+            outputHandler.debug("Reading row at %d", dataStart);
+
+            DecoratedKey key = null;
+            Throwable keyReadError = null;
+            try
+            {
+                ByteBuffer raw = ByteBufferUtil.readWithShortLength(dataFile);
+                if (!cfs.metadata.getLocal().isIndex())
+                    cfs.metadata.getLocal().partitionKeyType.validate(raw);
+                key = sstable.decorateKey(raw);
+            }
+            catch (Throwable th)
+            {
+                keyReadError = th;
+                throwIfFatal(th);
+                // check for null key below
+            }
+
+            // position of the partition in a data file, it points to the 
beginning of the partition key
+            long dataStartFromIndex = -1;
+            // size of the partition (including partition key)
+            long dataSizeFromIndex = -1;
+            ByteBuffer currentIndexKey = null;
+            if (indexAvailable())
+            {
+                currentIndexKey = indexIterator.key();
+                dataStartFromIndex = indexIterator.dataPosition();
+                if (!indexIterator.isExhausted())
+                {
+                    try
+                    {
+                        indexIterator.advance();
+                        if (!indexIterator.isExhausted())
+                            dataSizeFromIndex = indexIterator.dataPosition() - 
dataStartFromIndex;
+                    }
+                    catch (Throwable th)
+                    {
+                        throwIfFatal(th);
+                        outputHandler.warn(th, "Failed to advance to the next 
index position. Index is corrupted. " +

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableVerifier.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.KeyReader;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SortedTableVerifier;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class BtiTableVerifier extends SortedTableVerifier<BtiTableReader> 
implements IVerifier
+{
+    public BtiTableVerifier(ColumnFamilyStore cfs, BtiTableReader sstable, 
OutputHandler outputHandler, boolean isOffline, Options options)
+    {
+        super(cfs, sstable, outputHandler, isOffline, options);
+    }
+
+    public void verify()
+    {
+        verifySSTableVersion();
+
+        verifySSTableMetadata();
+
+        verifyIndex();
+
+        verifyBloomFilter();
+
+        if (options.checkOwnsTokens && !isOffline && !(cfs.getPartitioner() 
instanceof LocalPartitioner))
+        {
+            if (verifyOwnedRanges() == 0)
+                return;
+        }
+
+        if (options.quick)
+            return;
+
+        if (verifyDigest() && !options.extendedVerification)
+            return;
+
+        verifySSTable();
+
+        outputHandler.output("Verify of %s succeeded. All %d rows read 
successfully", sstable, goodRows);
+    }
+
+    private void verifySSTable()
+    {
+        long rowStart;
+        outputHandler.output("Extended Verify requested, proceeding to inspect 
values");
+
+        try (VerifyController verifyController = new VerifyController(cfs);
+             KeyReader indexIterator = sstable.keyReader())
+        {
+            if (indexIterator.dataPosition() != 0)
+                markAndThrow(new RuntimeException("First row position from 
index != 0: " + indexIterator.dataPosition()));
+
+            List<Range<Token>> ownedRanges = isOffline ? 
Collections.emptyList() : 
Range.normalize(tokenLookup.apply(cfs.metadata().keyspace));
+            RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
+            DecoratedKey prevKey = null;
+
+            while (!dataFile.isEOF())
+            {
+
+                if (verifyInfo.isStopRequested())

Review Comment:
   Moved most functionality into the shared `SortedTableVerifier`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to