maedhroz commented on code in PR #2267: URL: https://github.com/apache/cassandra/pull/2267#discussion_r1179713120
########## src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.IOException; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.AbstractRowIndexEntry; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.format.DataComponent; +import org.apache.cassandra.io.sstable.format.IndexComponent; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason; +import org.apache.cassandra.io.sstable.format.SortedTableWriter; +import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components; +import org.apache.cassandra.io.util.DataPosition; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.MmappedRegionsCache; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.Transactional; + +import static org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE; + +@VisibleForTesting +public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter> +{ + private static final Logger logger = LoggerFactory.getLogger(BtiTableWriter.class); + + private final BtiFormatPartitionWriter partitionWriter; + private final IndexWriter iwriter; + + public BtiTableWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner) + { + super(builder, lifecycleNewTracker, owner); + this.iwriter = builder.getIndexWriter(); + this.partitionWriter = builder.getPartitionWriter(); + } + + @Override + public void mark() + { + super.mark(); + iwriter.mark(); + } + + @Override + public void resetAndTruncate() + { + super.resetAndTruncate(); + iwriter.resetAndTruncate(); + } + + @Override + protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException + { + TrieIndexEntry entry = TrieIndexEntry.create(partitionWriter.getInitialPosition(), + finishResult, + partitionLevelDeletion, + partitionWriter.getRowIndexCount()); + iwriter.append(key, entry); + return entry; + } + + @SuppressWarnings("resource") + private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier<PartitionIndex> partitionIndexSupplier) + { + IFilter filter = null; + FileHandle dataFile = null; + PartitionIndex partitionIndex = null; + FileHandle rowIndexFile = null; + + BtiTableReader.Builder builder = unbuildTo(new BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge) + .setSerializationHeader(header) + .setOpenReason(openReason); + + try + { + builder.setStatsMetadata(statsMetadata()); + + partitionIndex = partitionIndexSupplier.get(); + rowIndexFile = iwriter.rowIndexFHBuilder.complete(); + dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), builder.getStatsMetadata()); + filter = iwriter.getFilterCopy(); + + return builder.setPartitionIndex(partitionIndex) + .setFirst(partitionIndex.firstKey()) + .setLast(partitionIndex.lastKey()) + .setRowIndexFile(rowIndexFile) + .setDataFile(dataFile) + .setFilter(filter) + .build(owner().orElse(null), true, true); + } + catch (RuntimeException | Error ex) + { + JVMStabilityInspector.inspectThrowable(ex); + Throwables.closeNonNullAndAddSuppressed(ex, filter, dataFile, rowIndexFile, partitionIndex); + throw ex; + } + } + + + public void openEarly(Consumer<SSTableReader> callWhenReady) + { + long dataLength = dataWriter.position(); + iwriter.buildPartial(dataLength, partitionIndex -> + { + iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset()); + BtiTableReader reader = openInternal(OpenReason.EARLY, false, () -> partitionIndex); + callWhenReady.accept(reader); + }); + } + + public SSTableReader openFinalEarly() + { + // we must ensure the data is completely flushed to disk + iwriter.complete(); // This will be called by completedPartitionIndex() below too, but we want it done now to + // ensure outstanding openEarly actions are not triggered. + dataWriter.sync(); + iwriter.rowIndexWriter.sync(); + // Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and + // retain a partially-written page (see DB-2446). + + return openFinal(OpenReason.EARLY); + } + + @SuppressWarnings("resource") + protected SSTableReader openFinal(OpenReason openReason) + { + + if (maxDataAge < 0) + maxDataAge = Clock.Global.currentTimeMillis(); + + return openInternal(openReason, true, iwriter::completedPartitionIndex); + } + + protected TransactionalProxy txnProxy() + { + return new TransactionalProxy(() -> FBUtilities.immutableListWithFilteredNulls(iwriter, dataWriter)); + } + + private class TransactionalProxy extends SortedTableWriter<BtiFormatPartitionWriter>.TransactionalProxy + { + public TransactionalProxy(Supplier<ImmutableList<Transactional>> transactionals) + { + super(transactionals); + } + + @Override + protected Throwable doPostCleanup(Throwable accumulate) + { + accumulate = Throwables.close(accumulate, partitionWriter); + accumulate = super.doPostCleanup(accumulate); + return accumulate; + } + } + + /** + * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. + */ + static class IndexWriter extends SortedTableWriter.AbstractIndexWriter + { + final SequentialWriter rowIndexWriter; + private final FileHandle.Builder rowIndexFHBuilder; + private final SequentialWriter partitionIndexWriter; + private final FileHandle.Builder partitionIndexFHBuilder; + private final PartitionIndexBuilder partitionIndex; + boolean partitionIndexCompleted = false; + private DataPosition riMark; + private DataPosition piMark; + + IndexWriter(Builder b) + { + super(b); + rowIndexWriter = new SequentialWriter(descriptor.fileFor(Components.ROW_INDEX), b.getIOOptions().writerOptions); + rowIndexFHBuilder = IndexComponent.fileBuilder(Components.ROW_INDEX, b).withMmappedRegionsCache(b.getMmappedRegionsCache()); + partitionIndexWriter = new SequentialWriter(descriptor.fileFor(Components.PARTITION_INDEX), b.getIOOptions().writerOptions); + partitionIndexFHBuilder = IndexComponent.fileBuilder(Components.PARTITION_INDEX, b).withMmappedRegionsCache(b.getMmappedRegionsCache()); + partitionIndex = new PartitionIndexBuilder(partitionIndexWriter, partitionIndexFHBuilder); + // register listeners to be alerted when the data files are flushed + partitionIndexWriter.setPostFlushListener(() -> partitionIndex.markPartitionIndexSynced(partitionIndexWriter.getLastFlushOffset())); + rowIndexWriter.setPostFlushListener(() -> partitionIndex.markRowIndexSynced(rowIndexWriter.getLastFlushOffset())); + @SuppressWarnings("resource") Review Comment: ```suggestion @SuppressWarnings({ "resource", "RedundantSuppression" }) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

