blambov commented on code in PR #2267:
URL: https://github.com/apache/cassandra/pull/2267#discussion_r1180073926


##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *
+ * Row index "tries" do not need to store whole keys, as what we need from them is to be able to tell where in the data file
+ * to start looking for a given key. Instead, we store some prefix that is greater than the greatest key of the previous
+ * index section and smaller than or equal to the smallest key of the next. So for a given key the first index section
+ * that could potentially contain it is given by the trie's floor for that key.
+ *

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexWriter.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.tries.IncrementalTrieWriter;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Preparer / writer of row index tries.
+ * <p>
+ * Uses IncrementalTrieWriter to build a trie of index section separators of the shortest possible length such that
+ * prevMax < separator <= nextMin.
+ */
+class RowIndexWriter implements AutoCloseable
+{
+    private final ClusteringComparator comparator;
+    private final IncrementalTrieWriter<IndexInfo> trie;
+    private ByteComparable prevMax = null;
+    private ByteComparable prevSep = null;
+
+    RowIndexWriter(ClusteringComparator comparator, DataOutputPlus out)
+    {
+        this.comparator = comparator;
+        this.trie = IncrementalTrieWriter.open(RowIndexReader.trieSerializer, 
out);
+    }
+
+    void reset()
+    {
+        prevMax = null;
+        prevSep = null;
+        trie.reset();
+    }
+
+    @Override
+    public void close()
+    {
+        trie.close();
+    }
+
+    void add(ClusteringPrefix<?> firstName, ClusteringPrefix<?> lastName, 
IndexInfo info) throws IOException
+    {
+        assert info.openDeletion != null;
+        ByteComparable sep = prevMax == null
+                             ? ByteComparable.EMPTY
+                             : ByteComparable.separatorGt(prevMax, 
comparator.asByteComparable(firstName));
+        trie.add(sep, info);
+        prevSep = sep;
+        prevMax = comparator.asByteComparable(lastName);
+    }
+
+    public long complete(long endPos) throws IOException
+    {
+        // Add a separator after the last section, so that greater inputs can be quickly rejected.
+        // To maximize its efficiency we add it with the length of the last added separator.
+        int i = 0;
+        ByteSource max = 
prevMax.asComparableBytes(Walker.BYTE_COMPARABLE_VERSION);
+        ByteSource sep = 
prevSep.asComparableBytes(Walker.BYTE_COMPARABLE_VERSION);
+        int c;
+        while ((c = max.next()) == sep.next() && c != ByteSource.END_OF_STREAM)
+            ++i;
+        assert c != ByteSource.END_OF_STREAM : "Corrupted row order, max=" + 
prevMax;
+
+        trie.add(nudge(prevMax, i), new IndexInfo(endPos, DeletionTime.LIVE));
+
+        return trie.complete();
+    }
+
+    /**
+     * Produces a source that is slightly greater than argument with length at least nudgeAt.
+     */
+    private ByteComparable nudge(ByteComparable value, int nudgeAt)
+    {
+        return version -> new ByteSource()
+        {
+            private final ByteSource v = value.asComparableBytes(version);
+            private int cur = 0;
+
+            @Override
+            public int next()
+            {
+                int b = ByteSource.END_OF_STREAM;
+                if (cur <= nudgeAt)
+                {
+                    b = v.next();
+                    if (cur == nudgeAt)
+                    {
+                        if (b < 255)

Review Comment:
   I personally prefer not to hide fundamental constants behind identifiers as 
they can mislead and hide errors.
   
   Changed to `0xFF` to make it even more obvious.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *
+ * Row index "tries" do not need to store whole keys, as what we need from them is to be able to tell where in the data file
+ * to start looking for a given key. Instead, we store some prefix that is greater than the greatest key of the previous
+ * index section and smaller than or equal to the smallest key of the next. So for a given key the first index section
+ * that could potentially contain it is given by the trie's floor for that key.
+ *
+ * This builds upon the trie Walker class which provides basic trie walking functionality. The class is thread-unsafe
+ * and must be re-instantiated for every thread that needs access to the trie (its overhead is below that of a
+ * RandomAccessReader).
+ */
+public class RowIndexReader extends Walker<RowIndexReader>
+{
+    private static final int FLAG_OPEN_MARKER = 8;
+
+    public static class IndexInfo
+    {
+        public final long offset;
+        public final DeletionTime openDeletion;
+
+        IndexInfo(long offset, DeletionTime openDeletion)
+        {
+            this.offset = offset;
+            this.openDeletion = openDeletion;
+        }
+    }
+
+    public RowIndexReader(FileHandle file, long root)
+    {
+        super(file.instantiateRebufferer(null), root);
+    }
+
+    public RowIndexReader(FileHandle file, TrieIndexEntry entry)
+    {
+        this(file, entry.indexTrieRoot);
+    }
+
+    /**
+     * Computes the floor for a given key.
+     */
+    public IndexInfo separatorFloor(ByteComparable key)
+    {
+        // Check for a prefix and find closest smaller branch.
+        IndexInfo res = prefixAndNeighbours(key, RowIndexReader::readPayload);
+        // If there's a prefix, in a separator trie it could be less than, equal, or greater than sought value.
+        // Sought value is still greater than max of previous section.
+        // On match the prefix must be used as a starting point.
+        if (res != null)
+            return res;
+
+        // Otherwise return the IndexInfo for the closest entry of the smaller branch (which is the max of lesserBranch).
+        // Note (see prefixAndNeighbours): since we accept prefix matches above, at this point there cannot be another
+        // prefix match that is closer than max(lesserBranch).
+        if (lesserBranch == -1)

Review Comment:
   Added (also to `TrieNode`).



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *
+ * Row index "tries" do not need to store whole keys, as what we need from them is to be able to tell where in the data file
+ * to start looking for a given key. Instead, we store some prefix that is greater than the greatest key of the previous
+ * index section and smaller than or equal to the smallest key of the next. So for a given key the first index section
+ * that could potentially contain it is given by the trie's floor for that key.
+ *
+ * This builds upon the trie Walker class which provides basic trie walking functionality. The class is thread-unsafe
+ * and must be re-instantiated for every thread that needs access to the trie (its overhead is below that of a
+ * RandomAccessReader).
+ */
+public class RowIndexReader extends Walker<RowIndexReader>
+{
+    private static final int FLAG_OPEN_MARKER = 8;
+
+    public static class IndexInfo
+    {
+        public final long offset;
+        public final DeletionTime openDeletion;
+
+        IndexInfo(long offset, DeletionTime openDeletion)
+        {
+            this.offset = offset;
+            this.openDeletion = openDeletion;
+        }
+    }
+
+    public RowIndexReader(FileHandle file, long root)
+    {
+        super(file.instantiateRebufferer(null), root);
+    }
+
+    public RowIndexReader(FileHandle file, TrieIndexEntry entry)
+    {
+        this(file, entry.indexTrieRoot);
+    }
+
+    /**
+     * Computes the floor for a given key.
+     */
+    public IndexInfo separatorFloor(ByteComparable key)
+    {
+        // Check for a prefix and find closest smaller branch.
+        IndexInfo res = prefixAndNeighbours(key, RowIndexReader::readPayload);
+        // If there's a prefix, in a separator trie it could be less than, equal, or greater than sought value.
+        // Sought value is still greater than max of previous section.
+        // On match the prefix must be used as a starting point.
+        if (res != null)
+            return res;
+
+        // Otherwise return the IndexInfo for the closest entry of the smaller branch (which is the max of lesserBranch).
+        // Note (see prefixAndNeighbours): since we accept prefix matches above, at this point there cannot be another
+        // prefix match that is closer than max(lesserBranch).
+        if (lesserBranch == -1)
+            return null;
+        goMax(lesserBranch);
+        return getCurrentIndexInfo();
+    }
+
+    public IndexInfo min()
+    {
+        goMin(root);
+        return getCurrentIndexInfo();
+    }
+
+    protected IndexInfo getCurrentIndexInfo()
+    {
+        return readPayload(payloadPosition(), payloadFlags());
+    }
+
+    protected IndexInfo readPayload(int ppos, int bits)
+    {
+        return readPayload(buf, ppos, bits);
+    }
+
+    static IndexInfo readPayload(ByteBuffer buf, int ppos, int bits)
+    {
+        long dataOffset;
+        if (bits == 0)
+            return null;
+        int bytes = bits & ~FLAG_OPEN_MARKER;
+        dataOffset = SizedInts.read(buf, ppos, bytes);
+        ppos += bytes;
+        DeletionTime deletion = (bits & FLAG_OPEN_MARKER) != 0
+                ? DeletionTime.serializer.deserialize(buf, ppos)
+                : null;
+        return new IndexInfo(dataOffset, deletion);
+    }
+
+    // The trie serializer describes how the payloads are written. Placed here (instead of writer) so that reading and
+    // writing the payload are close together should they need to be changed.
+    static final TrieSerializer<IndexInfo, DataOutputPlus> trieSerializer = 
new TrieSerializer<IndexInfo, DataOutputPlus>()
+    {
+        @Override
+        public int sizeofNode(SerializationNode<IndexInfo> node, long 
nodePosition)
+        {
+            return TrieNode.typeFor(node, nodePosition).sizeofNode(node) + 
sizeof(node.payload());
+        }
+
+        @Override
+        public void write(DataOutputPlus dest, SerializationNode<IndexInfo> 
node, long nodePosition) throws IOException
+        {
+            write(dest, TrieNode.typeFor(node, nodePosition), node, 
nodePosition);
+        }
+
+        public int sizeof(IndexInfo payload)
+        {
+            int size = 0;
+            if (payload != null)
+            {
+                size += SizedInts.nonZeroSize(payload.offset);
+                if (!payload.openDeletion.isLive())
+                    size += 
DeletionTime.serializer.serializedSize(payload.openDeletion);
+            }
+            return size;
+        }
+
+        public void write(DataOutputPlus dest, TrieNode type, 
SerializationNode<IndexInfo> node, long nodePosition) throws IOException
+        {
+            IndexInfo payload = node.payload();
+            int bytes = 0;
+            int hasOpenMarker = 0;
+            if (payload != null)
+            {
+                bytes = SizedInts.nonZeroSize(payload.offset);
+                if (!payload.openDeletion.isLive())
+                    hasOpenMarker = FLAG_OPEN_MARKER;
+            }
+            type.serialize(dest, node, bytes | hasOpenMarker, nodePosition);

Review Comment:
   It is, if the offset is over 32PiB.
   
   Added an assertion to fail with a sensible message if that should ever 
happen.



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexWriter.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
+import org.apache.cassandra.io.tries.IncrementalTrieWriter;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Preparer / writer of row index tries.
+ * <p>
+ * Uses IncrementalTrieWriter to build a trie of index section separators of the shortest possible length such that

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static 
org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, 
DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = 
TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     
partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")
+    private BtiTableReader openInternal(OpenReason openReason, boolean 
isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    {
+        IFilter filter = null;
+        FileHandle dataFile = null;
+        PartitionIndex partitionIndex = null;
+        FileHandle rowIndexFile = null;
+
+        BtiTableReader.Builder builder = unbuildTo(new 
BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
+                                                                               
                 .setSerializationHeader(header)
+                                                                               
                 .setOpenReason(openReason);
+
+        try
+        {
+            builder.setStatsMetadata(statsMetadata());
+
+            partitionIndex = partitionIndexSupplier.get();
+            rowIndexFile = iwriter.rowIndexFHBuilder.complete();
+            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : 
dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            filter = iwriter.getFilterCopy();
+
+            return builder.setPartitionIndex(partitionIndex)
+                          .setFirst(partitionIndex.firstKey())
+                          .setLast(partitionIndex.lastKey())
+                          .setRowIndexFile(rowIndexFile)
+                          .setDataFile(dataFile)
+                          .setFilter(filter)
+                          .build(owner().orElse(null), true, true);
+        }
+        catch (RuntimeException | Error ex)
+        {
+            JVMStabilityInspector.inspectThrowable(ex);
+            Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, 
rowIndexFile, partitionIndex);
+            throw ex;
+        }
+    }
+
+
+    public void openEarly(Consumer<SSTableReader> callWhenReady)
+    {
+        long dataLength = dataWriter.position();
+        iwriter.buildPartial(dataLength, partitionIndex ->
+        {
+            
iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset());
+            BtiTableReader reader = openInternal(OpenReason.EARLY, false, () 
-> partitionIndex);
+            callWhenReady.accept(reader);
+        });
+    }
+
+    public SSTableReader openFinalEarly()
+    {
+        // we must ensure the data is completely flushed to disk
+        iwriter.complete(); // This will be called by 
completedPartitionIndex() below too, but we want it done now to
+        // ensure outstanding openEarly actions are not triggered.
+        dataWriter.sync();
+        iwriter.rowIndexWriter.sync();
+        // Note: Nothing must be written to any of the files after this point, 
as the chunk cache could pick up and
+        // retain a partially-written page (see DB-2446).
+
+        return openFinal(OpenReason.EARLY);
+    }
+
+    @SuppressWarnings("resource")

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static 
org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, 
DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = 
TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     
partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")
+    private BtiTableReader openInternal(OpenReason openReason, boolean 
isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    {
+        IFilter filter = null;
+        FileHandle dataFile = null;
+        PartitionIndex partitionIndex = null;
+        FileHandle rowIndexFile = null;
+
+        BtiTableReader.Builder builder = unbuildTo(new 
BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
+                                                                               
                 .setSerializationHeader(header)
+                                                                               
                 .setOpenReason(openReason);
+
+        try
+        {
+            builder.setStatsMetadata(statsMetadata());
+
+            partitionIndex = partitionIndexSupplier.get();
+            rowIndexFile = iwriter.rowIndexFHBuilder.complete();
+            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : 
dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            filter = iwriter.getFilterCopy();
+
+            return builder.setPartitionIndex(partitionIndex)
+                          .setFirst(partitionIndex.firstKey())
+                          .setLast(partitionIndex.lastKey())
+                          .setRowIndexFile(rowIndexFile)
+                          .setDataFile(dataFile)
+                          .setFilter(filter)
+                          .build(owner().orElse(null), true, true);
+        }
+        catch (RuntimeException | Error ex)
+        {
+            JVMStabilityInspector.inspectThrowable(ex);
+            Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, 
rowIndexFile, partitionIndex);
+            throw ex;
+        }
+    }
+
+
+    public void openEarly(Consumer<SSTableReader> callWhenReady)

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndex.java:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.ValueIterator;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.PageAware;
+import org.apache.cassandra.io.util.Rebufferer;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.SharedCloseable;
+
+/**
+ * This class holds the partition index as an on-disk trie mapping unique prefixes of decorated keys to:
+ *     - data file position if the partition is small enough to not need an index
+ *     - row index file position if the partition has a row index
+ * plus
+ *     - the last 8 bits of the key's filter hash which is used to filter out mismatched keys without reading the key
+ *
+ * To avoid having to create an object to carry the result, the two are distinguished by sign. Direct-to-dfile entries
+ * are recorded as ~position (~ instead of - to differentiate 0 in ifile from 0 in dfile).
+ *
+ * In either case the contents of the file at this position start with a serialization of the key which can be used
+ * to verify the correct key is found.
+ *
+ * To read the index one must obtain a thread-unsafe Reader or IndexPosIterator.
+ */
+@VisibleForTesting
+public class PartitionIndex implements SharedCloseable
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PartitionIndex.class);
+
+    private final FileHandle fh;
+    private final long keyCount;
+    private final DecoratedKey first;
+    private final DecoratedKey last;
+    private final long root;
+
+    public static final long NOT_FOUND = Long.MIN_VALUE;
+    public static final int FOOTER_LENGTH = 3 * 8;
+
+    @VisibleForTesting
+    public PartitionIndex(FileHandle fh, long trieRoot, long keyCount, 
DecoratedKey first, DecoratedKey last)
+    {
+        this.keyCount = keyCount;
+        this.fh = fh.sharedCopy();
+        this.first = first;
+        this.last = last;
+        this.root = trieRoot;
+    }
+
+    private PartitionIndex(PartitionIndex src)
+    {
+        this(src.fh, src.root, src.keyCount, src.first, src.last);
+    }
+
+    static class Payload
+    {
+        final long position;
+        final short hashBits;
+
+        public Payload(long position, short hashBits)
+        {
+            this.position = position;
+            assert this.position != NOT_FOUND;
+            this.hashBits = hashBits;
+        }
+    }
+
+    static final PartitionIndexSerializer TRIE_SERIALIZER = new 
PartitionIndexSerializer();
+
+    private static class PartitionIndexSerializer implements 
TrieSerializer<Payload, DataOutputPlus>
+    {
+        public int sizeofNode(SerializationNode<Payload> node, long 
nodePosition)
+        {
+            return TrieNode.typeFor(node, nodePosition).sizeofNode(node) +
+                   (node.payload() != null ? 1 + 
SizedInts.nonZeroSize(node.payload().position) : 0);
+        }
+
+        @Override
+        public void write(DataOutputPlus dest, SerializationNode<Payload> 
node, long nodePosition) throws IOException
+        {
+            write(dest, TrieNode.typeFor(node, nodePosition), node, 
nodePosition);
+        }
+
+        public void write(DataOutputPlus dest, TrieNode type, 
SerializationNode<Payload> node, long nodePosition) throws IOException
+        {
+            Payload payload = node.payload();
+            if (payload != null)
+            {
+                int payloadBits;
+                int size = SizedInts.nonZeroSize(payload.position);
+                payloadBits = 7 + size;
+                type.serialize(dest, node, payloadBits, nodePosition);
+                dest.writeByte(payload.hashBits);
+                SizedInts.write(dest, payload.position, size);
+            }
+            else
+                type.serialize(dest, node, 0, nodePosition);
+        }
+    }
+
+    public long size()
+    {
+        return keyCount;
+    }
+
+    public DecoratedKey firstKey()
+    {
+        return first;
+    }
+
+    public DecoratedKey lastKey()
+    {
+        return last;
+    }
+
+    @Override
+    public PartitionIndex sharedCopy()
+    {
+        return new PartitionIndex(this);
+    }
+
+    @Override
+    public void addTo(Ref.IdentityCollection identities)
+    {
+        fh.addTo(identities);
+    }
+
+    public static PartitionIndex load(FileHandle.Builder fhBuilder,
+                                      IPartitioner partitioner,
+                                      boolean preload) throws IOException
+    {
+        try (FileHandle fh = fhBuilder.complete())
+        {
+            return load(fh, partitioner, preload);
+        }
+    }
+
+    public static Pair<DecoratedKey, DecoratedKey> readFirstAndLastKey(File 
file, IPartitioner partitioner) throws IOException
+    {
+        try (PartitionIndex index = load(new FileHandle.Builder(file), 
partitioner, false))
+        {
+            return Pair.create(index.firstKey(), index.lastKey());
+        }
+    }
+
+    public static PartitionIndex load(FileHandle fh, IPartitioner partitioner, 
boolean preload) throws IOException
+    {
+        try (FileDataInput rdr = fh.createReader(fh.dataLength() - 
FOOTER_LENGTH))
+        {
+            long firstPos = rdr.readLong();
+            long keyCount = rdr.readLong();
+            long root = rdr.readLong();
+            rdr.seek(firstPos);
+            DecoratedKey first = partitioner != null ? 
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(rdr)) : null;
+            DecoratedKey last = partitioner != null ? 
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(rdr)) : null;
+            if (preload)
+            {
+                int csum = 0;
+                // force a read of all the pages of the index
+                for (long pos = 0; pos < fh.dataLength(); pos += 
PageAware.PAGE_SIZE)
+                {
+                    rdr.seek(pos);
+                    csum += rdr.readByte();
+                }
+                logger.trace("Checksum {}", csum);      // Note: trace is 
required so that reads aren't optimized away.
+            }
+
+            return new PartitionIndex(fh, root, keyCount, first, last);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        fh.close();
+    }
+
+    @Override
+    public Throwable close(Throwable accumulate)
+    {
+        return fh.close(accumulate);
+    }
+
+    public Reader openReader()
+    {
+        return new Reader(this);
+    }
+
+    protected IndexPosIterator allKeysIterator()
+    {
+        return new IndexPosIterator(this);
+    }
+
+    protected Rebufferer instantiateRebufferer()
+    {
+        return fh.instantiateRebufferer(null);
+    }
+
+
+    /**
+     * @return the file handle to the file on disk. This is needed for locking the index in RAM.
+     */
+    FileHandle getFileHandle()
+    {
+        return fh;
+    }
+
+    private static long getIndexPos(ByteBuffer contents, int payloadPos, int 
bytes)
+    {
+        if (bytes > 7)
+        {
+            ++payloadPos;
+            bytes -= 7;
+        }
+        if (bytes == 0)
+            return NOT_FOUND;
+        return SizedInts.read(contents, payloadPos, bytes);
+    }
+
+    public interface Acceptor<ArgType, ResultType>
+    {
+        ResultType accept(long position, boolean assumeNoMatch, ArgType v) 
throws IOException;
+    }
+
+    /**
+     * Provides methods to read the partition index trie.
+     * Thread-unsafe, uses class members to store lookup state.
+     */
+    public static class Reader extends Walker<Reader>

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *
+ * Row index "tries" do not need to store whole keys, as what we need from them is to be able to tell where in the data file
+ * to start looking for a given key. Instead, we store some prefix that is greater than the greatest key of the previous
+ * index section and smaller than or equal to the smallest key of the next. So for a given key the first index section
+ * that could potentially contain it is given by the trie's floor for that key.
+ *
+ * This builds upon the trie Walker class which provides basic trie walking functionality. The class is thread-unsafe
+ * and must be re-instantiated for every thread that needs access to the trie (its overhead is below that of a
+ * RandomAccessReader).
+ */
+public class RowIndexReader extends Walker<RowIndexReader>
+{
+    private static final int FLAG_OPEN_MARKER = 8;
+
+    public static class IndexInfo
+    {
+        public final long offset;
+        public final DeletionTime openDeletion;
+
+        IndexInfo(long offset, DeletionTime openDeletion)
+        {
+            this.offset = offset;
+            this.openDeletion = openDeletion;
+        }
+    }
+
+    public RowIndexReader(FileHandle file, long root)
+    {
+        super(file.instantiateRebufferer(null), root);
+    }
+
+    public RowIndexReader(FileHandle file, TrieIndexEntry entry)
+    {
+        this(file, entry.indexTrieRoot);
+    }
+
+    /**
+     * Computes the floor for a given key.
+     */
+    public IndexInfo separatorFloor(ByteComparable key)
+    {
+        // Check for a prefix and find closest smaller branch.
+        IndexInfo res = prefixAndNeighbours(key, RowIndexReader::readPayload);
+        // If there's a prefix, in a separator trie it could be less than, equal, or greater than sought value.
+        // Sought value is still greater than max of previous section.
+        // On match the prefix must be used as a starting point.
+        if (res != null)
+            return res;
+
+        // Otherwise return the IndexInfo for the closest entry of the smaller branch (which is the max of lesserBranch).
+        // Note (see prefixAndNeighbours): since we accept prefix matches above, at this point there cannot be another
+        // prefix match that is closer than max(lesserBranch).
+        if (lesserBranch == -1)
+            return null;
+        goMax(lesserBranch);
+        return getCurrentIndexInfo();
+    }
+
+    public IndexInfo min()
+    {
+        goMin(root);
+        return getCurrentIndexInfo();
+    }
+
+    protected IndexInfo getCurrentIndexInfo()
+    {
+        return readPayload(payloadPosition(), payloadFlags());
+    }
+
+    protected IndexInfo readPayload(int ppos, int bits)
+    {
+        return readPayload(buf, ppos, bits);
+    }
+
+    static IndexInfo readPayload(ByteBuffer buf, int ppos, int bits)
+    {
+        long dataOffset;
+        if (bits == 0)
+            return null;
+        int bytes = bits & ~FLAG_OPEN_MARKER;
+        dataOffset = SizedInts.read(buf, ppos, bytes);
+        ppos += bytes;
+        DeletionTime deletion = (bits & FLAG_OPEN_MARKER) != 0
+                ? DeletionTime.serializer.deserialize(buf, ppos)
+                : null;
+        return new IndexInfo(dataOffset, deletion);
+    }
+
+    // The trie serializer describes how the payloads are written. Placed here (instead of writer) so that reading and
+    // writing the payload are close together should they need to be changed.
+    static final TrieSerializer<IndexInfo, DataOutputPlus> trieSerializer = 
new TrieSerializer<IndexInfo, DataOutputPlus>()
+    {
+        @Override
+        public int sizeofNode(SerializationNode<IndexInfo> node, long 
nodePosition)
+        {
+            return TrieNode.typeFor(node, nodePosition).sizeofNode(node) + 
sizeof(node.payload());
+        }
+
+        @Override
+        public void write(DataOutputPlus dest, SerializationNode<IndexInfo> 
node, long nodePosition) throws IOException
+        {
+            write(dest, TrieNode.typeFor(node, nodePosition), node, 
nodePosition);
+        }
+
+        public int sizeof(IndexInfo payload)
+        {
+            int size = 0;
+            if (payload != null)
+            {
+                size += SizedInts.nonZeroSize(payload.offset);
+                if (!payload.openDeletion.isLive())
+                    size += 
DeletionTime.serializer.serializedSize(payload.openDeletion);
+            }
+            return size;
+        }
+
+        public void write(DataOutputPlus dest, TrieNode type, 
SerializationNode<IndexInfo> node, long nodePosition) throws IOException
+        {
+            IndexInfo payload = node.payload();
+            int bytes = 0;
+            int hasOpenMarker = 0;
+            if (payload != null)
+            {
+                bytes = SizedInts.nonZeroSize(payload.offset);
+                if (!payload.openDeletion.isLive())
+                    hasOpenMarker = FLAG_OPEN_MARKER;
+            }
+            type.serialize(dest, node, bytes | hasOpenMarker, nodePosition);
+            if (payload != null)
+            {
+                SizedInts.write(dest, payload.offset, bytes);
+
+                if (hasOpenMarker != 0)

Review Comment:
   Changed



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static 
org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, 
DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = 
TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     
partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *
+ * Row index "tries" do not need to store whole keys, as what we need from them is to be able to tell where in the data file
+ * to start looking for a given key. Instead, we store some prefix that is greater than the greatest key of the previous
+ * index section and smaller than or equal to the smallest key of the next. So for a given key the first index section
+ * that could potentially contain it is given by the trie's floor for that key.
+ *
+ * This builds upon the trie Walker class which provides basic trie walking functionality. The class is thread-unsafe
+ * and must be re-instantiated for every thread that needs access to the trie (its overhead is below that of a
+ * RandomAccessReader).
+ */
+public class RowIndexReader extends Walker<RowIndexReader>
+{
+    private static final int FLAG_OPEN_MARKER = 8;
+
+    public static class IndexInfo
+    {
+        public final long offset;
+        public final DeletionTime openDeletion;
+
+        IndexInfo(long offset, DeletionTime openDeletion)
+        {
+            this.offset = offset;
+            this.openDeletion = openDeletion;
+        }
+    }
+
+    public RowIndexReader(FileHandle file, long root)
+    {
+        super(file.instantiateRebufferer(null), root);
+    }
+
+    public RowIndexReader(FileHandle file, TrieIndexEntry entry)
+    {
+        this(file, entry.indexTrieRoot);
+    }
+
+    /**
+     * Computes the floor for a given key.
+     */
+    public IndexInfo separatorFloor(ByteComparable key)
+    {
+        // Check for a prefix and find closest smaller branch.
+        IndexInfo res = prefixAndNeighbours(key, RowIndexReader::readPayload);
+        // If there's a prefix, in a separator trie it could be less than, equal, or greater than sought value.
+        // Sought value is still greater than max of previous section.
+        // On match the prefix must be used as a starting point.
+        if (res != null)
+            return res;
+
+        // Otherwise return the IndexInfo for the closest entry of the smaller branch (which is the max of lesserBranch).
+        // Note (see prefixAndNeighbours): since we accept prefix matches above, at this point there cannot be another
+        // prefix match that is closer than max(lesserBranch).
+        if (lesserBranch == -1)
+            return null;
+        goMax(lesserBranch);
+        return getCurrentIndexInfo();
+    }
+
+    public IndexInfo min()
+    {
+        goMin(root);
+        return getCurrentIndexInfo();
+    }
+
+    protected IndexInfo getCurrentIndexInfo()
+    {
+        return readPayload(payloadPosition(), payloadFlags());
+    }
+
+    protected IndexInfo readPayload(int ppos, int bits)
+    {
+        return readPayload(buf, ppos, bits);
+    }
+
+    static IndexInfo readPayload(ByteBuffer buf, int ppos, int bits)
+    {
+        long dataOffset;
+        if (bits == 0)
+            return null;
+        int bytes = bits & ~FLAG_OPEN_MARKER;
+        dataOffset = SizedInts.read(buf, ppos, bytes);
+        ppos += bytes;
+        DeletionTime deletion = (bits & FLAG_OPEN_MARKER) != 0
+                ? DeletionTime.serializer.deserialize(buf, ppos)
+                : null;
+        return new IndexInfo(dataOffset, deletion);
+    }
+
+    // The trie serializer describes how the payloads are written. Placed here (instead of writer) so that reading and
+    // writing the payload are close together should they need to be changed.
+    static final TrieSerializer<IndexInfo, DataOutputPlus> trieSerializer = 
new TrieSerializer<IndexInfo, DataOutputPlus>()
+    {
+        @Override
+        public int sizeofNode(SerializationNode<IndexInfo> node, long 
nodePosition)
+        {
+            return TrieNode.typeFor(node, nodePosition).sizeofNode(node) + 
sizeof(node.payload());
+        }
+
+        @Override
+        public void write(DataOutputPlus dest, SerializationNode<IndexInfo> 
node, long nodePosition) throws IOException
+        {
+            write(dest, TrieNode.typeFor(node, nodePosition), node, 
nodePosition);
+        }
+
+        public int sizeof(IndexInfo payload)
+        {
+            int size = 0;
+            if (payload != null)
+            {
+                size += SizedInts.nonZeroSize(payload.offset);
+                if (!payload.openDeletion.isLive())
+                    size += 
DeletionTime.serializer.serializedSize(payload.openDeletion);
+            }
+            return size;
+        }
+
+        public void write(DataOutputPlus dest, TrieNode type, 
SerializationNode<IndexInfo> node, long nodePosition) throws IOException

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *
+ * Row index "tries" do not need to store whole keys, as what we need from them is to be able to tell where in the data file
+ * to start looking for a given key. Instead, we store some prefix that is greater than the greatest key of the previous
+ * index section and smaller than or equal to the smallest key of the next. So for a given key the first index section
+ * that could potentially contain it is given by the trie's floor for that key.
+ *
+ * This builds upon the trie Walker class which provides basic trie walking functionality. The class is thread-unsafe
+ * and must be re-instantiated for every thread that needs access to the trie (its overhead is below that of a
+ * RandomAccessReader).
+ */
+public class RowIndexReader extends Walker<RowIndexReader>

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReader.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.tries.SerializationNode;
+import org.apache.cassandra.io.tries.TrieNode;
+import org.apache.cassandra.io.tries.TrieSerializer;
+import org.apache.cassandra.io.tries.Walker;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SizedInts;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+
+/**
+ * Reader class for row index files.
+ *
+ * Row index "tries" do not need to store whole keys, as what we need from them is to be able to tell where in the data file
+ * to start looking for a given key. Instead, we store some prefix that is greater than the greatest key of the previous
+ * index section and smaller than or equal to the smallest key of the next. So for a given key the first index section
+ * that could potentially contain it is given by the trie's floor for that key.
+ *
+ * This builds upon the trie Walker class which provides basic trie walking functionality. The class is thread-unsafe
+ * and must be re-instantiated for every thread that needs access to the trie (its overhead is below that of a
+ * RandomAccessReader).
+ */
+public class RowIndexReader extends Walker<RowIndexReader>
+{
+    private static final int FLAG_OPEN_MARKER = 8;
+
+    public static class IndexInfo
+    {
+        public final long offset;
+        public final DeletionTime openDeletion;
+
+        IndexInfo(long offset, DeletionTime openDeletion)
+        {
+            this.offset = offset;
+            this.openDeletion = openDeletion;
+        }
+    }
+
+    public RowIndexReader(FileHandle file, long root)
+    {
+        super(file.instantiateRebufferer(null), root);
+    }
+
+    public RowIndexReader(FileHandle file, TrieIndexEntry entry)
+    {
+        this(file, entry.indexTrieRoot);
+    }
+
+    /**
+     * Computes the floor for a given key.
+     */
+    public IndexInfo separatorFloor(ByteComparable key)
+    {
+        // Check for a prefix and find closest smaller branch.
+        IndexInfo res = prefixAndNeighbours(key, RowIndexReader::readPayload);
+        // If there's a prefix, in a separator trie it could be less than, equal, or greater than sought value.
+        // Sought value is still greater than max of previous section.
+        // On match the prefix must be used as a starting point.
+        if (res != null)
+            return res;
+
+        // Otherwise return the IndexInfo for the closest entry of the smaller branch (which is the max of lesserBranch).
+        // Note (see prefixAndNeighbours): since we accept prefix matches above, at this point there cannot be another
+        // prefix match that is closer than max(lesserBranch).
+        if (lesserBranch == -1)
+            return null;
+        goMax(lesserBranch);
+        return getCurrentIndexInfo();
+    }
+
+    public IndexInfo min()
+    {
+        goMin(root);
+        return getCurrentIndexInfo();
+    }
+
+    protected IndexInfo getCurrentIndexInfo()
+    {
+        return readPayload(payloadPosition(), payloadFlags());
+    }
+
+    protected IndexInfo readPayload(int ppos, int bits)
+    {
+        return readPayload(buf, ppos, bits);
+    }
+
+    static IndexInfo readPayload(ByteBuffer buf, int ppos, int bits)
+    {
+        long dataOffset;
+        if (bits == 0)
+            return null;
+        int bytes = bits & ~FLAG_OPEN_MARKER;
+        dataOffset = SizedInts.read(buf, ppos, bytes);
+        ppos += bytes;
+        DeletionTime deletion = (bits & FLAG_OPEN_MARKER) != 0
+                ? DeletionTime.serializer.deserialize(buf, ppos)
+                : null;
+        return new IndexInfo(dataOffset, deletion);
+    }
+
+    // The trie serializer describes how the payloads are written. Placed here (instead of writer) so that reading and
+    // writing the payload are close together should they need to be changed.
+    static final TrieSerializer<IndexInfo, DataOutputPlus> trieSerializer = 
new TrieSerializer<IndexInfo, DataOutputPlus>()
+    {
+        @Override
+        public int sizeofNode(SerializationNode<IndexInfo> node, long 
nodePosition)
+        {
+            return TrieNode.typeFor(node, nodePosition).sizeofNode(node) + 
sizeof(node.payload());
+        }
+
+        @Override
+        public void write(DataOutputPlus dest, SerializationNode<IndexInfo> 
node, long nodePosition) throws IOException
+        {
+            write(dest, TrieNode.typeFor(node, nodePosition), node, 
nodePosition);
+        }
+
+        public int sizeof(IndexInfo payload)

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static 
org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static 
org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, 
DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = 
TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     
partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")
+    private BtiTableReader openInternal(OpenReason openReason, boolean 
isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    {
+        IFilter filter = null;
+        FileHandle dataFile = null;
+        PartitionIndex partitionIndex = null;
+        FileHandle rowIndexFile = null;
+
+        BtiTableReader.Builder builder = unbuildTo(new 
BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
+                                                                               
                 .setSerializationHeader(header)
+                                                                               
                 .setOpenReason(openReason);
+
+        try
+        {
+            builder.setStatsMetadata(statsMetadata());
+
+            partitionIndex = partitionIndexSupplier.get();
+            rowIndexFile = iwriter.rowIndexFHBuilder.complete();
+            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : 
dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            filter = iwriter.getFilterCopy();
+
+            return builder.setPartitionIndex(partitionIndex)
+                          .setFirst(partitionIndex.firstKey())
+                          .setLast(partitionIndex.lastKey())
+                          .setRowIndexFile(rowIndexFile)
+                          .setDataFile(dataFile)
+                          .setFilter(filter)
+                          .build(owner().orElse(null), true, true);
+        }
+        catch (RuntimeException | Error ex)
+        {
+            JVMStabilityInspector.inspectThrowable(ex);
+            Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, 
rowIndexFile, partitionIndex);
+            throw ex;
+        }
+    }
+
+
+    public void openEarly(Consumer<SSTableReader> callWhenReady)
+    {
+        long dataLength = dataWriter.position();
+        iwriter.buildPartial(dataLength, partitionIndex ->
+        {
+            
iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset());
+            BtiTableReader reader = openInternal(OpenReason.EARLY, false, () 
-> partitionIndex);
+            callWhenReady.accept(reader);
+        });
+    }
+
+    public SSTableReader openFinalEarly()
+    {
+        // we must ensure the data is completely flushed to disk
+        iwriter.complete(); // This will be called by 
completedPartitionIndex() below too, but we want it done now to
+        // ensure outstanding openEarly actions are not triggered.
+        dataWriter.sync();
+        iwriter.rowIndexWriter.sync();
+        // Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
+        // retain a partially-written page (see DB-2446).
+
+        return openFinal(OpenReason.EARLY);
+    }
+
+    @SuppressWarnings("resource")
+    protected SSTableReader openFinal(OpenReason openReason)
+    {
+
+        if (maxDataAge < 0)
+            maxDataAge = Clock.Global.currentTimeMillis();
+
+        return openInternal(openReason, true, 
iwriter::completedPartitionIndex);
+    }
+
+    protected TransactionalProxy txnProxy()
+    {
+        return new TransactionalProxy(() -> 
FBUtilities.immutableListWithFilteredNulls(iwriter, dataWriter));
+    }
+
+    private class TransactionalProxy extends 
SortedTableWriter<BtiFormatPartitionWriter>.TransactionalProxy
+    {
+        public TransactionalProxy(Supplier<ImmutableList<Transactional>> 
transactionals)
+        {
+            super(transactionals);
+        }
+
+        @Override
+        protected Throwable doPostCleanup(Throwable accumulate)
+        {
+            accumulate = Throwables.close(accumulate, partitionWriter);
+            accumulate = super.doPostCleanup(accumulate);
+            return accumulate;
+        }
+    }
+
+    /**
+     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+     */
+    static class IndexWriter extends SortedTableWriter.AbstractIndexWriter
+    {
+        final SequentialWriter rowIndexWriter;
+        private final FileHandle.Builder rowIndexFHBuilder;
+        private final SequentialWriter partitionIndexWriter;
+        private final FileHandle.Builder partitionIndexFHBuilder;
+        private final PartitionIndexBuilder partitionIndex;
+        boolean partitionIndexCompleted = false;
+        private DataPosition riMark;
+        private DataPosition piMark;
+
+        IndexWriter(Builder b)
+        {
+            super(b);
+            rowIndexWriter = new 
SequentialWriter(descriptor.fileFor(Components.ROW_INDEX), 
b.getIOOptions().writerOptions);
+            rowIndexFHBuilder = 
IndexComponent.fileBuilder(Components.ROW_INDEX, 
b).withMmappedRegionsCache(b.getMmappedRegionsCache());
+            partitionIndexWriter = new 
SequentialWriter(descriptor.fileFor(Components.PARTITION_INDEX), 
b.getIOOptions().writerOptions);
+            partitionIndexFHBuilder = 
IndexComponent.fileBuilder(Components.PARTITION_INDEX, 
b).withMmappedRegionsCache(b.getMmappedRegionsCache());
+            partitionIndex = new PartitionIndexBuilder(partitionIndexWriter, 
partitionIndexFHBuilder);
+            // register listeners to be alerted when the data files are flushed
+            partitionIndexWriter.setPostFlushListener(() -> 
partitionIndex.markPartitionIndexSynced(partitionIndexWriter.getLastFlushOffset()));
+            rowIndexWriter.setPostFlushListener(() -> 
partitionIndex.markRowIndexSynced(rowIndexWriter.getLastFlushOffset()));
+            @SuppressWarnings("resource")

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static 
org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, 
DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = 
TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     
partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")
+    private BtiTableReader openInternal(OpenReason openReason, boolean 
isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    {
+        IFilter filter = null;
+        FileHandle dataFile = null;
+        PartitionIndex partitionIndex = null;
+        FileHandle rowIndexFile = null;
+
+        BtiTableReader.Builder builder = unbuildTo(new 
BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
+                                                                               
                 .setSerializationHeader(header)
+                                                                               
                 .setOpenReason(openReason);
+
+        try
+        {
+            builder.setStatsMetadata(statsMetadata());
+
+            partitionIndex = partitionIndexSupplier.get();
+            rowIndexFile = iwriter.rowIndexFHBuilder.complete();
+            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : 
dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            filter = iwriter.getFilterCopy();
+
+            return builder.setPartitionIndex(partitionIndex)
+                          .setFirst(partitionIndex.firstKey())
+                          .setLast(partitionIndex.lastKey())
+                          .setRowIndexFile(rowIndexFile)
+                          .setDataFile(dataFile)
+                          .setFilter(filter)
+                          .build(owner().orElse(null), true, true);
+        }
+        catch (RuntimeException | Error ex)
+        {
+            JVMStabilityInspector.inspectThrowable(ex);
+            Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, 
rowIndexFile, partitionIndex);
+            throw ex;
+        }
+    }
+
+
+    public void openEarly(Consumer<SSTableReader> callWhenReady)
+    {
+        long dataLength = dataWriter.position();
+        iwriter.buildPartial(dataLength, partitionIndex ->
+        {
+            
iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset());
+            BtiTableReader reader = openInternal(OpenReason.EARLY, false, () 
-> partitionIndex);
+            callWhenReady.accept(reader);
+        });
+    }
+
+    public SSTableReader openFinalEarly()
+    {
+        // we must ensure the data is completely flushed to disk
+        iwriter.complete(); // This will be called by 
completedPartitionIndex() below too, but we want it done now to
+        // ensure outstanding openEarly actions are not triggered.
+        dataWriter.sync();
+        iwriter.rowIndexWriter.sync();
+        // Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
+        // retain a partially-written page (see DB-2446).

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static 
org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, 
DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = 
TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     
partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")
+    private BtiTableReader openInternal(OpenReason openReason, boolean 
isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    {
+        IFilter filter = null;
+        FileHandle dataFile = null;
+        PartitionIndex partitionIndex = null;
+        FileHandle rowIndexFile = null;
+
+        BtiTableReader.Builder builder = unbuildTo(new 
BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
+                                                                               
                 .setSerializationHeader(header)
+                                                                               
                 .setOpenReason(openReason);
+
+        try
+        {
+            builder.setStatsMetadata(statsMetadata());
+
+            partitionIndex = partitionIndexSupplier.get();
+            rowIndexFile = iwriter.rowIndexFHBuilder.complete();
+            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : 
dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            filter = iwriter.getFilterCopy();
+
+            return builder.setPartitionIndex(partitionIndex)
+                          .setFirst(partitionIndex.firstKey())
+                          .setLast(partitionIndex.lastKey())
+                          .setRowIndexFile(rowIndexFile)
+                          .setDataFile(dataFile)
+                          .setFilter(filter)
+                          .build(owner().orElse(null), true, true);
+        }
+        catch (RuntimeException | Error ex)
+        {
+            JVMStabilityInspector.inspectThrowable(ex);
+            Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, 
rowIndexFile, partitionIndex);
+            throw ex;
+        }
+    }
+
+
+    public void openEarly(Consumer<SSTableReader> callWhenReady)
+    {
+        long dataLength = dataWriter.position();
+        iwriter.buildPartial(dataLength, partitionIndex ->
+        {
+            
iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset());
+            BtiTableReader reader = openInternal(OpenReason.EARLY, false, () 
-> partitionIndex);
+            callWhenReady.accept(reader);
+        });
+    }
+
+    public SSTableReader openFinalEarly()
+    {
+        // we must ensure the data is completely flushed to disk
+        iwriter.complete(); // This will be called by 
completedPartitionIndex() below too, but we want it done now to
+        // ensure outstanding openEarly actions are not triggered.
+        dataWriter.sync();
+        iwriter.rowIndexWriter.sync();
+        // Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
+        // retain a partially-written page (see DB-2446).
+
+        return openFinal(OpenReason.EARLY);
+    }
+
+    @SuppressWarnings("resource")
+    protected SSTableReader openFinal(OpenReason openReason)

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")
+    private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    {
+        IFilter filter = null;
+        FileHandle dataFile = null;
+        PartitionIndex partitionIndex = null;
+        FileHandle rowIndexFile = null;
+
+        BtiTableReader.Builder builder = unbuildTo(new BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
+                                                                                                .setSerializationHeader(header)
+                                                                                                .setOpenReason(openReason);
+
+        try
+        {
+            builder.setStatsMetadata(statsMetadata());
+
+            partitionIndex = partitionIndexSupplier.get();
+            rowIndexFile = iwriter.rowIndexFHBuilder.complete();
+            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            filter = iwriter.getFilterCopy();
+
+            return builder.setPartitionIndex(partitionIndex)
+                          .setFirst(partitionIndex.firstKey())
+                          .setLast(partitionIndex.lastKey())
+                          .setRowIndexFile(rowIndexFile)
+                          .setDataFile(dataFile)
+                          .setFilter(filter)
+                          .build(owner().orElse(null), true, true);
+        }
+        catch (RuntimeException | Error ex)
+        {
+            JVMStabilityInspector.inspectThrowable(ex);
+            Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, rowIndexFile, partitionIndex);
+            throw ex;
+        }
+    }
+
+
+    public void openEarly(Consumer<SSTableReader> callWhenReady)
+    {
+        long dataLength = dataWriter.position();
+        iwriter.buildPartial(dataLength, partitionIndex ->
+        {
+            iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset());
+            BtiTableReader reader = openInternal(OpenReason.EARLY, false, () -> partitionIndex);
+            callWhenReady.accept(reader);
+        });
+    }
+
+    public SSTableReader openFinalEarly()
+    {
+        // we must ensure the data is completely flushed to disk
+        iwriter.complete(); // This will be called by completedPartitionIndex() below too, but we want it done now to
+        // ensure outstanding openEarly actions are not triggered.
+        dataWriter.sync();
+        iwriter.rowIndexWriter.sync();
+        // Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
+        // retain a partially-written page (see DB-2446).
+
+        return openFinal(OpenReason.EARLY);
+    }
+
+    @SuppressWarnings("resource")
+    protected SSTableReader openFinal(OpenReason openReason)
+    {
+
+        if (maxDataAge < 0)
+            maxDataAge = Clock.Global.currentTimeMillis();
+
+        return openInternal(openReason, true, iwriter::completedPartitionIndex);
+    }
+
+    protected TransactionalProxy txnProxy()

Review Comment:
   Done



##########
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.bti;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.io.sstable.format.IndexComponent;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.MmappedRegionsCache;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
+
+@VisibleForTesting
+public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
+{
+    private static final Logger logger = LoggerFactory.getLogger(BtiTableWriter.class);
+
+    private final BtiFormatPartitionWriter partitionWriter;
+    private final IndexWriter iwriter;
+
+    public BtiTableWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner)
+    {
+        super(builder, lifecycleNewTracker, owner);
+        this.iwriter = builder.getIndexWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    @Override
+    public void mark()
+    {
+        super.mark();
+        iwriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        super.resetAndTruncate();
+        iwriter.resetAndTruncate();
+    }
+
+    @Override
+    protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException
+    {
+        TrieIndexEntry entry = TrieIndexEntry.create(partitionWriter.getInitialPosition(),
+                                                     finishResult,
+                                                     partitionLevelDeletion,
+                                                     partitionWriter.getRowIndexCount());
+        iwriter.append(key, entry);
+        return entry;
+    }
+
+    @SuppressWarnings("resource")
+    private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    {
+        IFilter filter = null;
+        FileHandle dataFile = null;
+        PartitionIndex partitionIndex = null;
+        FileHandle rowIndexFile = null;
+
+        BtiTableReader.Builder builder = unbuildTo(new BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
+                                                                                                .setSerializationHeader(header)
+                                                                                                .setOpenReason(openReason);
+
+        try
+        {
+            builder.setStatsMetadata(statsMetadata());
+
+            partitionIndex = partitionIndexSupplier.get();
+            rowIndexFile = iwriter.rowIndexFHBuilder.complete();
+            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            filter = iwriter.getFilterCopy();
+
+            return builder.setPartitionIndex(partitionIndex)
+                          .setFirst(partitionIndex.firstKey())
+                          .setLast(partitionIndex.lastKey())
+                          .setRowIndexFile(rowIndexFile)
+                          .setDataFile(dataFile)
+                          .setFilter(filter)
+                          .build(owner().orElse(null), true, true);
+        }
+        catch (RuntimeException | Error ex)
+        {
+            JVMStabilityInspector.inspectThrowable(ex);
+            Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, rowIndexFile, partitionIndex);
+            throw ex;
+        }
+    }
+
+
+    public void openEarly(Consumer<SSTableReader> callWhenReady)
+    {
+        long dataLength = dataWriter.position();
+        iwriter.buildPartial(dataLength, partitionIndex ->
+        {
+            iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset());
+            BtiTableReader reader = openInternal(OpenReason.EARLY, false, () -> partitionIndex);
+            callWhenReady.accept(reader);
+        });
+    }
+
+    public SSTableReader openFinalEarly()

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to