blambov commented on code in PR #2267: URL: https://github.com/apache/cassandra/pull/2267#discussion_r1170988693
########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java: ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.LazilyInitializedUnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.AbstractBounds.Boundary; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableReadsListener; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.CloseableIterator; + +import static org.apache.cassandra.dht.AbstractBounds.isEmpty; +import static org.apache.cassandra.dht.AbstractBounds.maxLeft; +import static org.apache.cassandra.dht.AbstractBounds.minRight; + +public class BtiTableScanner implements ISSTableScanner +{ + private final AtomicBoolean isClosed = new AtomicBoolean(false); + protected final RandomAccessReader dfile; + public final BtiTableReader sstable; + + private final Iterator<AbstractBounds<PartitionPosition>> rangeIterator; + + private final ColumnFilter columns; + private final DataRange dataRange; + private final SSTableReadsListener listener; + private long startScan = -1; + private long bytesScanned = 0; + + protected CloseableIterator<UnfilteredRowIterator> iterator; + + // Full scan of the sstables + public static ISSTableScanner getScanner(BtiTableReader sstable) + { + return getScanner(sstable, Iterators.singletonIterator(fullRange(sstable))); + } + + public static ISSTableScanner getScanner(BtiTableReader sstable, + ColumnFilter columns, + DataRange dataRange, + SSTableReadsListener listener) + { + return new BtiTableScanner(sstable, columns, 
dataRange, makeBounds(sstable, dataRange).iterator(), listener); + } + + public static ISSTableScanner getScanner(BtiTableReader sstable, Collection<Range<Token>> tokenRanges) + { + return getScanner(sstable, makeBounds(sstable, tokenRanges).iterator()); + } + + public static ISSTableScanner getScanner(BtiTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator) + { + return new BtiTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, rangeIterator, SSTableReadsListener.NOOP_LISTENER); + } + + private BtiTableScanner(BtiTableReader sstable, + ColumnFilter columns, + DataRange dataRange, + Iterator<AbstractBounds<PartitionPosition>> rangeIterator, + SSTableReadsListener listener) + { + assert sstable != null; + + this.dfile = sstable.openDataReader(); + this.sstable = sstable; + this.columns = columns; + this.dataRange = dataRange; + this.rangeIterator = rangeIterator; + this.listener = listener; + } + + public static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges) + { + List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size()); + for (Range<Token> range : Range.normalize(tokenRanges)) + addRange(sstable, Range.makeRowRange(range), boundsList); + return boundsList; + } + + static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, DataRange dataRange) + { + List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(2); + addRange(sstable, dataRange.keyRange(), boundsList); + return boundsList; + } + + static AbstractBounds<PartitionPosition> fullRange(SSTableReader sstable) + { + return new Bounds<>(sstable.first, sstable.last); + } + + private static void addRange(SSTableReader sstable, AbstractBounds<PartitionPosition> requested, List<AbstractBounds<PartitionPosition>> boundsList) + { + if (requested instanceof Range && ((Range<?>) requested).isWrapAround()) + { + if (requested.right.compareTo(sstable.first) >= 0) + { + // since we wrap, we must contain the whole sstable prior to stopKey() + Boundary<PartitionPosition> left = new Boundary<>(sstable.first, true); + Boundary<PartitionPosition> right; + right = requested.rightBoundary(); + right = minRight(right, sstable.last, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); + } + if (requested.left.compareTo(sstable.last) <= 0) + { + // since we wrap, we must contain the whole sstable after dataRange.startKey() + Boundary<PartitionPosition> right = new Boundary<>(sstable.last, true); + Boundary<PartitionPosition> left; + left = requested.leftBoundary(); + left = maxLeft(left, sstable.first, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); + } + } + else + { + assert !AbstractBounds.strictlyWrapsAround(requested.left, requested.right); + Boundary<PartitionPosition> left, right; + left = requested.leftBoundary(); + right = requested.rightBoundary(); + left = maxLeft(left, sstable.first, true); + // apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping + right = requested.right.isMinimum() ? 
new Boundary<>(sstable.last, true) + : minRight(right, sstable.last, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); + } + } + + public void close() + { + try + { + if (isClosed.compareAndSet(false, true)) + { + FileUtils.close(dfile); + if (iterator != null) + iterator.close(); + } + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + + public long getBytesScanned() + { + return bytesScanned; + } + + @Override + public long getLengthInBytes() + { + return sstable.uncompressedLength(); + } + + + public long getCompressedLengthInBytes() + { + return sstable.onDiskLength(); + } + + @Override + public long getCurrentPosition() + { + return dfile.getFilePointer(); + } + + @Override + public Set<SSTableReader> getBackingSSTables() + { + return ImmutableSet.of(sstable); + } + + public int level() Review Comment: Removed. ########## src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.io.tries.IncrementalTrieWriter; +import org.apache.cassandra.io.tries.Walker; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; + +/** + * Partition index builder: stores index or data positions in an incrementally built, page aware on-disk trie. + * + * Not to be used outside of package. Public only for IndexRewriter tool. + */ +public class PartitionIndexBuilder implements AutoCloseable +{ + private final SequentialWriter writer; + private final IncrementalTrieWriter<PartitionIndex.Payload> trieWriter; + private final FileHandle.Builder fhBuilder; + + // the last synced data file position + private long dataSyncPosition; + // the last synced row index file position + private long rowIndexSyncPosition; + // the last synced partition index file position + private long partitionIndexSyncPosition; + + // Partial index can only be used after all three files have been synced to the required positions. 
+ private long partialIndexDataEnd; + private long partialIndexRowEnd; + private long partialIndexPartitionEnd; + private IncrementalTrieWriter.PartialTail partialIndexTail; + private Consumer<PartitionIndex> partialIndexConsumer; + private DecoratedKey partialIndexLastKey; + + private int lastDiffPoint; + private DecoratedKey firstKey; + private DecoratedKey lastKey; + private DecoratedKey lastWrittenKey; + private PartitionIndex.Payload lastPayload; + + public PartitionIndexBuilder(SequentialWriter writer, FileHandle.Builder fhBuilder) + { + this.writer = writer; + this.trieWriter = IncrementalTrieWriter.open(PartitionIndex.TRIE_SERIALIZER, writer); + this.fhBuilder = fhBuilder; + } + + /* + * Called when partition index has been flushed to the given position. + * If this makes all required positions for a partial view flushed, this will call the partialIndexConsumer. + */ + public void markPartitionIndexSynced(long upToPosition) + { + partitionIndexSyncPosition = upToPosition; + refreshReadableBoundary(); + } + + /* + * Called when row index has been flushed to the given position. + * If this makes all required positions for a partial view flushed, this will call the partialIndexConsumer. + */ + public void markRowIndexSynced(long upToPosition) + { + rowIndexSyncPosition = upToPosition; + refreshReadableBoundary(); + } + + /* + * Called when data file has been flushed to the given position. + * If this makes all required positions for a partial view flushed, this will call the partialIndexConsumer. + */ + public void markDataSynced(long upToPosition) + { + dataSyncPosition = upToPosition; + refreshReadableBoundary(); + } + + private void refreshReadableBoundary() + { + if (partialIndexConsumer == null) + return; + if (dataSyncPosition < partialIndexDataEnd) + return; + if (rowIndexSyncPosition < partialIndexRowEnd) + return; + if (partitionIndexSyncPosition < partialIndexPartitionEnd) + return; + + try (FileHandle fh = fhBuilder.withLengthOverride(writer.getLastFlushOffset()).complete()) + { + @SuppressWarnings("resource") + PartitionIndex pi = new PartitionIndexEarly(fh, partialIndexTail.root(), partialIndexTail.count(), firstKey, partialIndexLastKey, partialIndexTail.cutoff(), partialIndexTail.tail()); + partialIndexConsumer.accept(pi); + partialIndexConsumer = null; + } + finally + { + fhBuilder.withLengthOverride(-1); + } + + } + + private final static Logger logger = LoggerFactory.getLogger(PartitionIndexBuilder.class); Review Comment: Removed ########## src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.io.tries.IncrementalTrieWriter; +import org.apache.cassandra.io.tries.Walker; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; + +/** + * Partition index builder: stores index or data positions in an incrementally built, page aware on-disk trie. + * Review Comment: Removed latter part (`IndexRewriter` does not exist) and made class package-private. ########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReaderLoadingBuilder.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.KeyReader; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.format.CompressionInfoComponent; +import org.apache.cassandra.io.sstable.format.FilterComponent; +import org.apache.cassandra.io.sstable.format.SSTableReaderLoadingBuilder; +import org.apache.cassandra.io.sstable.format.StatsComponent; +import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.sstable.metadata.ValidationMetadata; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.FilterFactory; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.Throwables; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class BtiTableReaderLoadingBuilder extends SSTableReaderLoadingBuilder<BtiTableReader, BtiTableReader.Builder> +{ + private final static Logger logger = LoggerFactory.getLogger(BtiTableReaderLoadingBuilder.class); + + private FileHandle.Builder dataFileBuilder; + private FileHandle.Builder partitionIndexFileBuilder; + private FileHandle.Builder rowIndexFileBuilder; + + public BtiTableReaderLoadingBuilder(SSTable.Builder<?, ?> builder) + { + super(builder); + } + + @Override + public KeyReader 
buildKeyReader(TableMetrics tableMetrics) throws IOException + { + StatsComponent statsComponent = StatsComponent.load(descriptor, MetadataType.STATS, MetadataType.HEADER, MetadataType.VALIDATION); + return createKeyReader(statsComponent.statsMetadata()); + } + + private KeyReader createKeyReader(StatsMetadata statsMetadata) throws IOException + { + checkNotNull(statsMetadata); + + try (PartitionIndex index = PartitionIndex.load(partitionIndexFileBuilder(), tableMetadataRef.getLocal().partitioner, false); + CompressionMetadata compressionMetadata = CompressionInfoComponent.maybeLoad(descriptor, components); + FileHandle dFile = dataFileBuilder(statsMetadata).withCompressionMetadata(compressionMetadata).complete(); + FileHandle riFile = rowIndexFileBuilder().complete()) + { + return PartitionIterator.create(index, + tableMetadataRef.getLocal().partitioner, + riFile, + dFile); + } + } + + @Override + protected void openComponents(BtiTableReader.Builder builder, SSTable.Owner owner, boolean validate, boolean online) throws IOException + { + try + { + StatsComponent statsComponent = StatsComponent.load(descriptor, MetadataType.STATS, MetadataType.VALIDATION, MetadataType.HEADER); + builder.setSerializationHeader(statsComponent.serializationHeader(builder.getTableMetadataRef().getLocal())); + assert !online || builder.getSerializationHeader() != null; + + builder.setStatsMetadata(statsComponent.statsMetadata()); + ValidationMetadata validationMetadata = statsComponent.validationMetadata(); + validatePartitioner(builder.getTableMetadataRef().getLocal(), validationMetadata); + + boolean filterNeeded = online; + if (filterNeeded) + builder.setFilter(loadFilter(validationMetadata)); + boolean rebuildFilter = filterNeeded && builder.getFilter() == null; + + if (descriptor.version.hasKeyRange()) + { + IPartitioner partitioner = tableMetadataRef.getLocal().partitioner; + builder.setFirst(partitioner.decorateKey(builder.getStatsMetadata().firstKey)); + builder.setLast(partitioner.decorateKey(builder.getStatsMetadata().lastKey)); + } + + if (builder.getComponents().contains(Components.PARTITION_INDEX) && builder.getComponents().contains(Components.ROW_INDEX) && rebuildFilter) + { + @SuppressWarnings("resource") + IFilter filter = buildBloomFilter(statsComponent.statsMetadata()); + + if (filter != null) + { + builder.setFilter(filter); + + if (online) + FilterComponent.save(filter, descriptor, false); + } + } + + if (builder.getFilter() == null) + builder.setFilter(FilterFactory.AlwaysPresent); + + if (builder.getComponents().contains(Components.ROW_INDEX)) + builder.setRowIndexFile(rowIndexFileBuilder().complete()); + + if (builder.getComponents().contains(Components.PARTITION_INDEX)) + { + builder.setPartitionIndex(openPartitionIndex(builder.getFilter().isInformative())); + builder.setFirst(builder.getPartitionIndex().firstKey()); + builder.setLast(builder.getPartitionIndex().lastKey()); + } + + try (CompressionMetadata compressionMetadata = CompressionInfoComponent.maybeLoad(descriptor, components)) + { + builder.setDataFile(dataFileBuilder(builder.getStatsMetadata()).withCompressionMetadata(compressionMetadata).complete()); + } + } + catch (IOException | RuntimeException | Error ex) + { + // in case of failure, close only those components which have been opened in this try-catch block + Throwables.closeAndAddSuppressed(ex, builder.getPartitionIndex(), builder.getRowIndexFile(), builder.getDataFile(), builder.getFilter()); + throw ex; + } + } + + private IFilter buildBloomFilter(StatsMetadata 
statsMetadata) throws IOException + { + IFilter bf = null; + + try (KeyReader keyReader = createKeyReader(statsMetadata)) + { + if (keyReader == null) Review Comment: Removed ########## src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Thread-unsafe reverse value iterator for on-disk tries. Uses the assumptions of Walker. + */ +public class ReverseValueIterator<Concrete extends ReverseValueIterator<Concrete>> extends Walker<Concrete> +{ + private final ByteSource limit; + private IterationPosition stack; + private long next; + private boolean reportingPrefixes; + + static class IterationPosition + { + long node; + int childIndex; + int limit; Review Comment: Done ########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java: ########## @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.IVerifier; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableReadsListener; +import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason; +import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason; +import org.apache.cassandra.io.sstable.format.SSTableReaderWithFilter; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.OutputHandler; + +import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.EQ; +import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GE; +import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GT; +import static org.apache.cassandra.utils.concurrent.SharedCloseable.sharedCopyOrNull; + +/** + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen. + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead. + */ +public class BtiTableReader extends SSTableReaderWithFilter +{ + private static final Logger logger = LoggerFactory.getLogger(BtiTableReader.class); + + private final FileHandle rowIndexFile; + private final PartitionIndex partitionIndex; + + public BtiTableReader(Builder builder, SSTable.Owner owner) + { + super(builder, owner); + this.rowIndexFile = builder.getRowIndexFile(); + this.partitionIndex = builder.getPartitionIndex(); + } + + protected final Builder unbuildTo(Builder builder, boolean sharedCopy) + { + Builder b = super.unbuildTo(builder, sharedCopy); Review Comment: Hmm. This seems to indicate a problem with the hierarchy -- it seems you can get different results depending on what type of reference you call `unbuildTo` on, as the methods don't override one another. @jacek-lewandowski, do you think there's a way to make this safer, or should we rename the methods to e.g. `unbuildBtiReaderTo` etc.? 
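A minimal sketch of the hazard under discussion, with hypothetical names (`BaseReader`/`BtiReader` stand in for the reader hierarchy, each with its own nested `Builder`): because each `unbuildTo` takes its own `Builder` type, the subclass method overloads rather than overrides the base one, and Java picks among overloads using the static types of the reference and arguments, not the runtime object.

```java
class BaseReader
{
    static class Builder {}

    Builder unbuildTo(Builder b, boolean sharedCopy)
    {
        System.out.println("BaseReader.unbuildTo");
        return b;
    }
}

class BtiReader extends BaseReader
{
    static class Builder extends BaseReader.Builder {}

    // Not an override: the parameter type differs, so this only overloads the base method.
    Builder unbuildTo(Builder b, boolean sharedCopy)
    {
        System.out.println("BtiReader.unbuildTo");
        return b;
    }
}

public class UnbuildToDemo
{
    public static void main(String[] args)
    {
        BtiReader reader = new BtiReader();
        BaseReader asBase = reader;

        reader.unbuildTo(new BtiReader.Builder(), true); // prints BtiReader.unbuildTo
        asBase.unbuildTo(new BtiReader.Builder(), true); // prints BaseReader.unbuildTo -- same object, same argument
    }
}
```

Renaming the per-format variants (as suggested) sidesteps this, since a misdirected call then fails to compile instead of silently running the base version.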
########## src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Thread-unsafe reverse value iterator for on-disk tries. Uses the assumptions of Walker. + */ +public class ReverseValueIterator<Concrete extends ReverseValueIterator<Concrete>> extends Walker<Concrete> +{ + private final ByteSource limit; + private IterationPosition stack; + private long next; + private boolean reportingPrefixes; + + static class IterationPosition + { + long node; Review Comment: Done ########## src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Thread-unsafe reverse value iterator for on-disk tries. Uses the assumptions of Walker. + */ +public class ReverseValueIterator<Concrete extends ReverseValueIterator<Concrete>> extends Walker<Concrete> +{ + private final ByteSource limit; + private IterationPosition stack; + private long next; + private boolean reportingPrefixes; + + static class IterationPosition + { + long node; + int childIndex; + int limit; + IterationPosition prev; Review Comment: Done ########## src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * This class is a variant of {@link IncrementalTrieWriterPageAware} which is able to build even very deep + * tries. While the parent class uses recursion for clarity, it may end up with stack overflow for tries with + * very long keys. This implementation can switch processing from stack to heap at a certain depth (provided + * as a constructor param). + */ +public class IncrementalDeepTrieWriterPageAware<VALUE> extends IncrementalTrieWriterPageAware<VALUE> +{ + private final int maxRecursionDepth; + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth) + { + super(trieSerializer, dest); + this.maxRecursionDepth = maxRecursionDepth; + } + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest) + { + this(trieSerializer, dest, 64); + } + + @Override + protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws IOException + { + return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0); + } + + private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long nodePosition, int depth) throws IOException + { + if (node.hasOutOfPageInBranch) + { + int sz = 0; + for (Node<VALUE> child : node.children) + { + if (depth < maxRecursionDepth) + sz += recalcTotalSizeRecursiveOnStack(child, nodePosition + sz, depth + 1); + else + sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + sz); + } + node.branchSize = sz; + } + + // The sizing below will use the branch size calculated above. Since that can change on out-of-page in branch, + // we need to recalculate the size if either flag is set. 
+ if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + @Override + protected long write(Node<VALUE> node) throws IOException + { + return writeRecursiveOnStack(node, 0); + } + + private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws IOException + { + long nodePosition = dest.position(); + for (Node<VALUE> child : node.children) + if (child.filePos == -1) + { + if (depth < maxRecursionDepth) + child.filePos = writeRecursiveOnStack(child, depth + 1); + else + child.filePos = writeRecursiveOnHeap(child); + } + + nodePosition += node.branchSize; + assert dest.position() == nodePosition + : "Expected node position to be " + nodePosition + " but got " + dest.position() + " after writing children.\n" + dumpNode(node, dest.position()); + + serializer.write(dest, node, nodePosition); + + assert dest.position() == nodePosition + node.nodeSize + || dest.paddedPosition() == dest.position() // For PartitionIndexTest.testPointerGrowth where position may jump on page boundaries. + : "Expected node position to be " + (nodePosition + node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize " + node.nodeSize + ".\n" + dumpNode(node, nodePosition); + return nodePosition; + } + + @Override + protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + return writePartialRecursiveOnStack(node, dest, baseOffset, 0); + } + + private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus dest, long baseOffset, int depth) throws IOException + { + long startPosition = dest.position() + baseOffset; + + List<Node<VALUE>> childrenToClear = new ArrayList<>(); + for (Node<VALUE> child : node.children) + { + if (child.filePos == -1) + { + childrenToClear.add(child); + if (depth < maxRecursionDepth) + child.filePos = writePartialRecursiveOnStack(child, dest, baseOffset, depth + 1); + else + child.filePos = writePartialRecursiveOnHeap(child, dest, baseOffset); + } + } + + long nodePosition = dest.position() + baseOffset; + + if (node.hasOutOfPageInBranch) + { + // Update the branch size with the size of what we have just written. This may be used by the node's + // maxPositionDelta, and it's a better approximation for later fitting calculations. + node.branchSize = (int) (nodePosition - startPosition); + } + + serializer.write(dest, node, nodePosition); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + { + // Update the node size with what we have just seen. It's a better approximation for later fitting + // calculations. 
+ long endPosition = dest.position() + baseOffset; + node.nodeSize = (int) (endPosition - nodePosition); + } + + for (Node<VALUE> child : childrenToClear) + child.filePos = -1; + return nodePosition; + } + + private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long nodePosition) throws IOException + { + if (node.hasOutOfPageInBranch) + new RecalcTotalSizeRecursion(node, null, nodePosition).process(); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException + { + return new WriteRecursion(node, null).process().node.filePos; + } + + private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + new WritePartialRecursion(node, dest, baseOffset).process(); + long pos = node.filePos; + node.filePos = -1; + return pos; + } + + /** + * Simple framework for executing recursion using on-heap linked trace to avoid stack overruns. + */ + static abstract class Recursion<NODE> + { + final Recursion<NODE> parent; + final NODE node; + final Iterator<NODE> childIterator; + + Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> parent) + { + this.parent = parent; + this.node = node; + this.childIterator = childIterator; + } + + /** + * Make a child Recursion object for the given node and initialize it as necessary to continue processing + * with it. + * + * May return null if the recursion does not need to continue inside the child branch. + */ + abstract Recursion<NODE> makeChild(NODE child); + + /** + * Complete the processing this Recursion object. + * Review Comment: Done ########## src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * This class is a variant of {@link IncrementalTrieWriterPageAware} which is able to build even very deep + * tries. While the parent class uses recursion for clarity, it may end up with stack overflow for tries with + * very long keys. This implementation can switch processing from stack to heap at a certain depth (provided + * as a constructor param). + */ +public class IncrementalDeepTrieWriterPageAware<VALUE> extends IncrementalTrieWriterPageAware<VALUE> +{ + private final int maxRecursionDepth; + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? 
super DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth) + { + super(trieSerializer, dest); + this.maxRecursionDepth = maxRecursionDepth; + } + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest) + { + this(trieSerializer, dest, 64); + } + + @Override + protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws IOException + { + return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0); + } + + private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long nodePosition, int depth) throws IOException + { + if (node.hasOutOfPageInBranch) + { + int sz = 0; + for (Node<VALUE> child : node.children) + { + if (depth < maxRecursionDepth) + sz += recalcTotalSizeRecursiveOnStack(child, nodePosition + sz, depth + 1); + else + sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + sz); + } + node.branchSize = sz; + } + + // The sizing below will use the branch size calculated above. Since that can change on out-of-page in branch, + // we need to recalculate the size if either flag is set. + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + @Override + protected long write(Node<VALUE> node) throws IOException + { + return writeRecursiveOnStack(node, 0); + } + + private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws IOException + { + long nodePosition = dest.position(); + for (Node<VALUE> child : node.children) + if (child.filePos == -1) + { + if (depth < maxRecursionDepth) + child.filePos = writeRecursiveOnStack(child, depth + 1); + else + child.filePos = writeRecursiveOnHeap(child); + } + + nodePosition += node.branchSize; + assert dest.position() == nodePosition + : "Expected node position to be " + nodePosition + " but got " + dest.position() + " after writing children.\n" + dumpNode(node, dest.position()); + + serializer.write(dest, node, nodePosition); + + assert dest.position() == nodePosition + node.nodeSize + || dest.paddedPosition() == dest.position() // For PartitionIndexTest.testPointerGrowth where position may jump on page boundaries. + : "Expected node position to be " + (nodePosition + node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize " + node.nodeSize + ".\n" + dumpNode(node, nodePosition); + return nodePosition; + } + + @Override + protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + return writePartialRecursiveOnStack(node, dest, baseOffset, 0); + } + + private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus dest, long baseOffset, int depth) throws IOException + { + long startPosition = dest.position() + baseOffset; + + List<Node<VALUE>> childrenToClear = new ArrayList<>(); + for (Node<VALUE> child : node.children) + { + if (child.filePos == -1) + { + childrenToClear.add(child); + if (depth < maxRecursionDepth) + child.filePos = writePartialRecursiveOnStack(child, dest, baseOffset, depth + 1); + else + child.filePos = writePartialRecursiveOnHeap(child, dest, baseOffset); + } + } + + long nodePosition = dest.position() + baseOffset; + + if (node.hasOutOfPageInBranch) + { + // Update the branch size with the size of what we have just written. This may be used by the node's + // maxPositionDelta, and it's a better approximation for later fitting calculations. 
+ node.branchSize = (int) (nodePosition - startPosition); + } + + serializer.write(dest, node, nodePosition); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + { + // Update the node size with what we have just seen. It's a better approximation for later fitting + // calculations. + long endPosition = dest.position() + baseOffset; + node.nodeSize = (int) (endPosition - nodePosition); + } + + for (Node<VALUE> child : childrenToClear) + child.filePos = -1; + return nodePosition; + } + + private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long nodePosition) throws IOException + { + if (node.hasOutOfPageInBranch) + new RecalcTotalSizeRecursion(node, null, nodePosition).process(); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException + { + return new WriteRecursion(node, null).process().node.filePos; + } + + private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + new WritePartialRecursion(node, dest, baseOffset).process(); + long pos = node.filePos; + node.filePos = -1; + return pos; + } + + /** + * Simple framework for executing recursion using on-heap linked trace to avoid stack overruns. + */ + static abstract class Recursion<NODE> + { + final Recursion<NODE> parent; + final NODE node; + final Iterator<NODE> childIterator; + + Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> parent) + { + this.parent = parent; + this.node = node; + this.childIterator = childIterator; + } + + /** + * Make a child Recursion object for the given node and initialize it as necessary to continue processing + * with it. + * Review Comment: Done ########## src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexWriter.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; + +import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo; +import org.apache.cassandra.io.tries.IncrementalTrieWriter; +import org.apache.cassandra.io.tries.Walker; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Preparer / writer of row index tries. 
+ * Review Comment: Done ########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReaderLoadingBuilder.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.KeyReader; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.format.CompressionInfoComponent; +import org.apache.cassandra.io.sstable.format.FilterComponent; +import org.apache.cassandra.io.sstable.format.SSTableReaderLoadingBuilder; +import org.apache.cassandra.io.sstable.format.StatsComponent; +import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.sstable.metadata.ValidationMetadata; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.FilterFactory; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.Throwables; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class BtiTableReaderLoadingBuilder extends SSTableReaderLoadingBuilder<BtiTableReader, BtiTableReader.Builder> +{ + private final static Logger logger = LoggerFactory.getLogger(BtiTableReaderLoadingBuilder.class); + + private FileHandle.Builder dataFileBuilder; + private FileHandle.Builder partitionIndexFileBuilder; + private FileHandle.Builder rowIndexFileBuilder; + + public BtiTableReaderLoadingBuilder(SSTable.Builder<?, ?> builder) + { + super(builder); + } + + @Override + public KeyReader buildKeyReader(TableMetrics tableMetrics) throws IOException + { + StatsComponent statsComponent = StatsComponent.load(descriptor, MetadataType.STATS, MetadataType.HEADER, MetadataType.VALIDATION); + return createKeyReader(statsComponent.statsMetadata()); + } + + private KeyReader createKeyReader(StatsMetadata statsMetadata) throws IOException + { + checkNotNull(statsMetadata); + + try (PartitionIndex index = PartitionIndex.load(partitionIndexFileBuilder(), tableMetadataRef.getLocal().partitioner, false); + CompressionMetadata compressionMetadata = CompressionInfoComponent.maybeLoad(descriptor, components); + FileHandle dFile = dataFileBuilder(statsMetadata).withCompressionMetadata(compressionMetadata).complete(); + FileHandle riFile = 
rowIndexFileBuilder().complete()) + { + return PartitionIterator.create(index, + tableMetadataRef.getLocal().partitioner, + riFile, + dFile); + } + } + + @Override + protected void openComponents(BtiTableReader.Builder builder, SSTable.Owner owner, boolean validate, boolean online) throws IOException + { + try + { + StatsComponent statsComponent = StatsComponent.load(descriptor, MetadataType.STATS, MetadataType.VALIDATION, MetadataType.HEADER); + builder.setSerializationHeader(statsComponent.serializationHeader(builder.getTableMetadataRef().getLocal())); + assert !online || builder.getSerializationHeader() != null; Review Comment: Changed to `checkArgument`, i.e. `IllegalArgumentException`, which is as much as that message could say anyway. ########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableVerifier.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import com.google.common.base.Throwables; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.IVerifier; +import org.apache.cassandra.io.sstable.KeyReader; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SortedTableVerifier; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.OutputHandler; + +public class BtiTableVerifier extends SortedTableVerifier<BtiTableReader> implements IVerifier +{ + public BtiTableVerifier(ColumnFamilyStore cfs, BtiTableReader sstable, OutputHandler outputHandler, boolean isOffline, Options options) + { + super(cfs, sstable, outputHandler, isOffline, options); + } + + public void verify() + { + verifySSTableVersion(); + + verifySSTableMetadata(); + + verifyIndex(); + + verifyBloomFilter(); + + if (options.checkOwnsTokens && !isOffline && !(cfs.getPartitioner() instanceof LocalPartitioner)) + { + if (verifyOwnedRanges() == 0) + return; + } + + if (options.quick) + return; + + if (verifyDigest() && !options.extendedVerification) + return; + + verifySSTable(); + + outputHandler.output("Verify of %s succeeded.
All %d rows read successfully", sstable, goodRows); + } + + private void verifySSTable() + { + long rowStart; + outputHandler.output("Extended Verify requested, proceeding to inspect values"); + + try (VerifyController verifyController = new VerifyController(cfs); + KeyReader indexIterator = sstable.keyReader()) + { + if (indexIterator.dataPosition() != 0) + markAndThrow(new RuntimeException("First row position from index != 0: " + indexIterator.dataPosition())); + + List<Range<Token>> ownedRanges = isOffline ? Collections.emptyList() : Range.normalize(tokenLookup.apply(cfs.metadata().keyspace)); + RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges); + DecoratedKey prevKey = null; + + while (!dataFile.isEOF()) + { + + if (verifyInfo.isStopRequested()) + throw new CompactionInterruptedException(verifyInfo.getCompactionInfo()); + + rowStart = dataFile.getFilePointer(); + outputHandler.debug("Reading row at %d", rowStart); + + DecoratedKey key = null; + try + { + key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); + } + catch (Throwable th) + { + throwIfFatal(th); Review Comment: The verifier is not supposed to continue after any error -- changed to `markAndThrow`. ########## src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReverseIterator.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.PrintStream; + +import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo; +import org.apache.cassandra.io.tries.ReverseValueIterator; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; + +/** + * Reverse iterator over the row index. Needed to get previous index blocks for reverse iteration. + */ +class RowIndexReverseIterator extends ReverseValueIterator<RowIndexReverseIterator> +{ + private long currentNode = -1; + + public RowIndexReverseIterator(FileHandle file, long root, ByteComparable start, ByteComparable end) + { + super(file.instantiateRebufferer(null), root, start, end, true); + } + + public RowIndexReverseIterator(FileHandle file, TrieIndexEntry entry, ByteComparable end) + { + this(file, entry.indexTrieRoot, ByteComparable.EMPTY, end); + } + + /** + * This method must be async-read-safe. + */ + public IndexInfo nextIndexInfo() + { + if (currentNode == -1) + { + currentNode = nextPayloadedNode(); + if (currentNode == -1) + return null; + } + + go(currentNode); + IndexInfo info = RowIndexReader.readPayload(buf, payloadPosition(), payloadFlags()); + + currentNode = -1; + return info; + } + + public void dumpTrie(PrintStream out) Review Comment: Added `@SuppressWarnings`. 
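On the "async-read-safe" requirement quoted above: the contract is that `nextIndexInfo()` stashes its progress in `currentNode` before any buffer access that may fail in asynchronous mode, so a caller can simply re-invoke the method and resume at the same node. A sketch of such a caller, assuming a hypothetical retriable miss signal (`NotInCacheException`/`awaitReady()` are illustrative stand-ins, not the actual API):

```java
// Hypothetical stand-in for an async rebufferer's "page not loaded yet" signal.
class NotInCacheException extends RuntimeException
{
    void awaitReady()
    {
        // wait until the outstanding asynchronous read completes, then allow a retry
    }
}

class RetryingCaller
{
    static RowIndexReader.IndexInfo nextWithRetry(RowIndexReverseIterator iter)
    {
        while (true)
        {
            try
            {
                // Safe to re-invoke: the iterator cached currentNode before the call that
                // threw, so the retry resumes at that node instead of skipping an entry.
                return iter.nextIndexInfo();
            }
            catch (NotInCacheException e)
            {
                e.awaitReady();
            }
        }
    }
}
```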
########## src/java/org/apache/cassandra/io/sstable/format/bti/RowIndexReverseIterator.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.PrintStream; + +import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo; +import org.apache.cassandra.io.tries.ReverseValueIterator; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; + +/** + * Reverse iterator over the row index. Needed to get previous index blocks for reverse iteration. + */ +class RowIndexReverseIterator extends ReverseValueIterator<RowIndexReverseIterator> +{ + private long currentNode = -1; + + public RowIndexReverseIterator(FileHandle file, long root, ByteComparable start, ByteComparable end) + { + super(file.instantiateRebufferer(null), root, start, end, true); + } + + public RowIndexReverseIterator(FileHandle file, TrieIndexEntry entry, ByteComparable end) + { + this(file, entry.indexTrieRoot, ByteComparable.EMPTY, end); + } + + /** + * This method must be async-read-safe. + */ + public IndexInfo nextIndexInfo() + { + if (currentNode == -1) + { + currentNode = nextPayloadedNode(); + if (currentNode == -1) + return null; + } + + go(currentNode); + IndexInfo info = RowIndexReader.readPayload(buf, payloadPosition(), payloadFlags()); + + currentNode = -1; + return info; + } + + public void dumpTrie(PrintStream out) + { + dumpTrie(out, (buf, ppos, bits) -> { + IndexInfo ii = RowIndexReader.readPayload(buf, ppos, bits); Review Comment: Extracted method in `RowIndexReader` to avoid code repetition. ########## src/java/org/apache/cassandra/io/tries/ValueIterator.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.tries; + +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Thread-unsafe value iterator for on-disk tries. Uses the assumptions of Walker. + */ +public class ValueIterator<CONCRETE extends ValueIterator<CONCRETE>> extends Walker<CONCRETE> +{ + private final ByteSource limit; + private IterationPosition stack; + private long next; + + static class IterationPosition + { + long node; Review Comment: Done ########## src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * This class is a variant of {@link IncrementalTrieWriterPageAware} which is able to build even very deep + * tries. While the parent class uses recursion for clarity, it may end up with stack overflow for tries with + * very long keys. This implementation can switch processing from stack to heap at a certain depth (provided + * as a constructor param). + */ +public class IncrementalDeepTrieWriterPageAware<VALUE> extends IncrementalTrieWriterPageAware<VALUE> +{ + private final int maxRecursionDepth; + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth) + { + super(trieSerializer, dest); + this.maxRecursionDepth = maxRecursionDepth; + } + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest) + { + this(trieSerializer, dest, 64); + } + + @Override + protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws IOException + { + return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0); + } + + private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long nodePosition, int depth) throws IOException + { + if (node.hasOutOfPageInBranch) + { + int sz = 0; + for (Node<VALUE> child : node.children) + { + if (depth < maxRecursionDepth) + sz += recalcTotalSizeRecursiveOnStack(child, nodePosition + sz, depth + 1); + else + sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + sz); + } + node.branchSize = sz; + } + + // The sizing below will use the branch size calculated above. Since that can change on out-of-page in branch, + // we need to recalculate the size if either flag is set. 
+ if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + @Override + protected long write(Node<VALUE> node) throws IOException + { + return writeRecursiveOnStack(node, 0); + } + + private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws IOException + { + long nodePosition = dest.position(); + for (Node<VALUE> child : node.children) + if (child.filePos == -1) + { + if (depth < maxRecursionDepth) + child.filePos = writeRecursiveOnStack(child, depth + 1); + else + child.filePos = writeRecursiveOnHeap(child); + } + + nodePosition += node.branchSize; + assert dest.position() == nodePosition + : "Expected node position to be " + nodePosition + " but got " + dest.position() + " after writing children.\n" + dumpNode(node, dest.position()); + + serializer.write(dest, node, nodePosition); + + assert dest.position() == nodePosition + node.nodeSize + || dest.paddedPosition() == dest.position() // For PartitionIndexTest.testPointerGrowth where position may jump on page boundaries. + : "Expected node position to be " + (nodePosition + node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize " + node.nodeSize + ".\n" + dumpNode(node, nodePosition); + return nodePosition; + } + + @Override + protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + return writePartialRecursiveOnStack(node, dest, baseOffset, 0); + } + + private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus dest, long baseOffset, int depth) throws IOException + { + long startPosition = dest.position() + baseOffset; + + List<Node<VALUE>> childrenToClear = new ArrayList<>(); + for (Node<VALUE> child : node.children) + { + if (child.filePos == -1) + { + childrenToClear.add(child); + if (depth < maxRecursionDepth) + child.filePos = writePartialRecursiveOnStack(child, dest, baseOffset, depth + 1); + else + child.filePos = writePartialRecursiveOnHeap(child, dest, baseOffset); + } + } + + long nodePosition = dest.position() + baseOffset; + + if (node.hasOutOfPageInBranch) + { + // Update the branch size with the size of what we have just written. This may be used by the node's + // maxPositionDelta, and it's a better approximation for later fitting calculations. + node.branchSize = (int) (nodePosition - startPosition); + } + + serializer.write(dest, node, nodePosition); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + { + // Update the node size with what we have just seen. It's a better approximation for later fitting + // calculations. 
+ long endPosition = dest.position() + baseOffset; + node.nodeSize = (int) (endPosition - nodePosition); + } + + for (Node<VALUE> child : childrenToClear) + child.filePos = -1; + return nodePosition; + } + + private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long nodePosition) throws IOException + { + if (node.hasOutOfPageInBranch) + new RecalcTotalSizeRecursion(node, null, nodePosition).process(); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException + { + return new WriteRecursion(node, null).process().node.filePos; + } + + private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + new WritePartialRecursion(node, dest, baseOffset).process(); + long pos = node.filePos; + node.filePos = -1; + return pos; + } + + /** + * Simple framework for executing recursion using on-heap linked trace to avoid stack overruns. + */ + static abstract class Recursion<NODE> + { + final Recursion<NODE> parent; + final NODE node; + final Iterator<NODE> childIterator; + + Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> parent) + { + this.parent = parent; + this.node = node; + this.childIterator = childIterator; + } + + /** + * Make a child Recursion object for the given node and initialize it as necessary to continue processing + * with it. + * + * May return null if the recursion does not need to continue inside the child branch. + */ + abstract Recursion<NODE> makeChild(NODE child); + + /** + * Complete the processing this Recursion object. + * + * Note: this method is not called for the nodes for which makeChild() returns null. + */ + abstract void complete() throws IOException; + + /** + * Complete processing of the given child (possibly retrieve data to apply to any accumulation performed + * in this Recursion object). + * + * This is called when processing a child completes, including when recursion inside the child branch + * is skipped by makeChild() returning null. + */ + void completeChild(NODE child) + {} + + /** + * Recursive process, in depth-first order, the branch rooted at this recursion node. + * Review Comment: Done ########## src/java/org/apache/cassandra/io/tries/ValueIterator.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Thread-unsafe value iterator for on-disk tries. 
Uses the assumptions of Walker. + */ +public class ValueIterator<CONCRETE extends ValueIterator<CONCRETE>> extends Walker<CONCRETE> +{ + private final ByteSource limit; + private IterationPosition stack; + private long next; + + static class IterationPosition + { + long node; + int childIndex; + int limit; + IterationPosition prev; Review Comment: Done ########## src/java/org/apache/cassandra/io/tries/ValueIterator.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.tries; + +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +/** + * Thread-unsafe value iterator for on-disk tries. Uses the assumptions of Walker. + */ +public class ValueIterator<CONCRETE extends ValueIterator<CONCRETE>> extends Walker<CONCRETE> +{ + private final ByteSource limit; + private IterationPosition stack; + private long next; + + static class IterationPosition + { + long node; + int childIndex; + int limit; Review Comment: Done ########## src/java/org/apache/cassandra/io/tries/IncrementalDeepTrieWriterPageAware.java: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * This class is a variant of {@link IncrementalTrieWriterPageAware} which is able to build even very deep + * tries. While the parent class uses recursion for clarity, it may end up with stack overflow for tries with + * very long keys. This implementation can switch processing from stack to heap at a certain depth (provided + * as a constructor param). 
+ */ +public class IncrementalDeepTrieWriterPageAware<VALUE> extends IncrementalTrieWriterPageAware<VALUE> +{ + private final int maxRecursionDepth; + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest, int maxRecursionDepth) + { + super(trieSerializer, dest); + this.maxRecursionDepth = maxRecursionDepth; + } + + public IncrementalDeepTrieWriterPageAware(TrieSerializer<VALUE, ? super DataOutputPlus> trieSerializer, DataOutputPlus dest) + { + this(trieSerializer, dest, 64); + } + + @Override + protected int recalcTotalSize(Node<VALUE> node, long nodePosition) throws IOException + { + return recalcTotalSizeRecursiveOnStack(node, nodePosition, 0); + } + + private int recalcTotalSizeRecursiveOnStack(Node<VALUE> node, long nodePosition, int depth) throws IOException + { + if (node.hasOutOfPageInBranch) + { + int sz = 0; + for (Node<VALUE> child : node.children) + { + if (depth < maxRecursionDepth) + sz += recalcTotalSizeRecursiveOnStack(child, nodePosition + sz, depth + 1); + else + sz += recalcTotalSizeRecursiveOnHeap(child, nodePosition + sz); + } + node.branchSize = sz; + } + + // The sizing below will use the branch size calculated above. Since that can change on out-of-page in branch, + // we need to recalculate the size if either flag is set. + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + @Override + protected long write(Node<VALUE> node) throws IOException + { + return writeRecursiveOnStack(node, 0); + } + + private long writeRecursiveOnStack(Node<VALUE> node, int depth) throws IOException + { + long nodePosition = dest.position(); + for (Node<VALUE> child : node.children) + if (child.filePos == -1) + { + if (depth < maxRecursionDepth) + child.filePos = writeRecursiveOnStack(child, depth + 1); + else + child.filePos = writeRecursiveOnHeap(child); + } + + nodePosition += node.branchSize; + assert dest.position() == nodePosition + : "Expected node position to be " + nodePosition + " but got " + dest.position() + " after writing children.\n" + dumpNode(node, dest.position()); + + serializer.write(dest, node, nodePosition); + + assert dest.position() == nodePosition + node.nodeSize + || dest.paddedPosition() == dest.position() // For PartitionIndexTest.testPointerGrowth where position may jump on page boundaries. 
+ : "Expected node position to be " + (nodePosition + node.nodeSize) + " but got " + dest.position() + " after writing node, nodeSize " + node.nodeSize + ".\n" + dumpNode(node, nodePosition); + return nodePosition; + } + + @Override + protected long writePartial(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + return writePartialRecursiveOnStack(node, dest, baseOffset, 0); + } + + private long writePartialRecursiveOnStack(Node<VALUE> node, DataOutputPlus dest, long baseOffset, int depth) throws IOException + { + long startPosition = dest.position() + baseOffset; + + List<Node<VALUE>> childrenToClear = new ArrayList<>(); + for (Node<VALUE> child : node.children) + { + if (child.filePos == -1) + { + childrenToClear.add(child); + if (depth < maxRecursionDepth) + child.filePos = writePartialRecursiveOnStack(child, dest, baseOffset, depth + 1); + else + child.filePos = writePartialRecursiveOnHeap(child, dest, baseOffset); + } + } + + long nodePosition = dest.position() + baseOffset; + + if (node.hasOutOfPageInBranch) + { + // Update the branch size with the size of what we have just written. This may be used by the node's + // maxPositionDelta, and it's a better approximation for later fitting calculations. + node.branchSize = (int) (nodePosition - startPosition); + } + + serializer.write(dest, node, nodePosition); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + { + // Update the node size with what we have just seen. It's a better approximation for later fitting + // calculations. + long endPosition = dest.position() + baseOffset; + node.nodeSize = (int) (endPosition - nodePosition); + } + + for (Node<VALUE> child : childrenToClear) + child.filePos = -1; + return nodePosition; + } + + private int recalcTotalSizeRecursiveOnHeap(Node<VALUE> node, long nodePosition) throws IOException + { + if (node.hasOutOfPageInBranch) + new RecalcTotalSizeRecursion(node, null, nodePosition).process(); + + if (node.hasOutOfPageChildren || node.hasOutOfPageInBranch) + node.nodeSize = serializer.sizeofNode(node, nodePosition + node.branchSize); + + return node.branchSize + node.nodeSize; + } + + private long writeRecursiveOnHeap(Node<VALUE> node) throws IOException + { + return new WriteRecursion(node, null).process().node.filePos; + } + + private long writePartialRecursiveOnHeap(Node<VALUE> node, DataOutputPlus dest, long baseOffset) throws IOException + { + new WritePartialRecursion(node, dest, baseOffset).process(); + long pos = node.filePos; + node.filePos = -1; + return pos; + } + + /** + * Simple framework for executing recursion using on-heap linked trace to avoid stack overruns. + */ + static abstract class Recursion<NODE> + { + final Recursion<NODE> parent; + final NODE node; + final Iterator<NODE> childIterator; + + Recursion(NODE node, Iterator<NODE> childIterator, Recursion<NODE> parent) + { + this.parent = parent; + this.node = node; + this.childIterator = childIterator; + } + + /** + * Make a child Recursion object for the given node and initialize it as necessary to continue processing + * with it. + * + * May return null if the recursion does not need to continue inside the child branch. + */ + abstract Recursion<NODE> makeChild(NODE child); + + /** + * Complete the processing this Recursion object. + * + * Note: this method is not called for the nodes for which makeChild() returns null. 
+ */ + abstract void complete() throws IOException; + + /** + * Complete processing of the given child (possibly retrieve data to apply to any accumulation performed + * in this Recursion object). + * Review Comment: Done
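To make the `Recursion` framework quoted above concrete, here is a minimal, self-contained sketch of the same depth-guard idea using generic names (none of these are the PR's classes): recursion stays on the call stack while shallow, then switches to an explicit parent-linked frame chain on the heap once a fixed depth is reached, so arbitrarily deep tries cannot overflow the stack.

```java
import java.util.Iterator;
import java.util.List;

class DepthGuardedTraversal
{
    // Hypothetical depth limit, mirroring the writer's default of 64.
    static final int MAX_STACK_DEPTH = 64;

    // Generic stand-in for the writer's Node: children plus a post-order action.
    interface Node
    {
        List<Node> children();
        void complete();
    }

    // Recurse on the call stack while shallow; hand off to the heap otherwise.
    static void traverse(Node node, int depth)
    {
        if (depth < MAX_STACK_DEPTH)
        {
            for (Node child : node.children())
                traverse(child, depth + 1);
            node.complete(); // post-order, as in writeRecursiveOnStack
        }
        else
        {
            traverseOnHeap(node); // switch to a parent-linked frame chain
        }
    }

    // Explicit replacement for the call stack: each Frame holds its node, a
    // child iterator, and a link to its parent, like Recursion.process().
    static void traverseOnHeap(Node root)
    {
        Frame curr = new Frame(root, null);
        while (curr != null)
        {
            if (curr.children.hasNext())
                curr = new Frame(curr.children.next(), curr); // descend
            else
            {
                curr.node.complete(); // all children done: complete post-order
                curr = curr.parent;   // ascend
            }
        }
    }

    static final class Frame
    {
        final Node node;
        final Iterator<Node> children;
        final Frame parent;

        Frame(Node node, Frame parent)
        {
            this.node = node;
            this.children = node.children().iterator();
            this.parent = parent;
        }
    }
}
```

The parent link replaces the call stack: descending allocates a new frame, ascending follows the link, and completion runs post-order, matching the ordering of `writeRecursiveOnStack` and `writeRecursiveOnHeap` in the diff.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]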