blambov commented on code in PR #2064:
URL: https://github.com/apache/cassandra/pull/2064#discussion_r1072343960
##########
src/java/org/apache/cassandra/io/sstable/format/FilterComponent.java:
##########
@@ -90,4 +97,63 @@ public static void saveOrDeleteCorrupted(Descriptor descriptor, IFilter filter)
throw ex;
}
}
+
+    /**
+     * Optionally loads a Bloom filter. If the filter is not needed (FP chance is neglectable), it sets
+     * {@link AlwaysPresentFilter} as a filter in the builder. If the filter is expected to be recreated for various
+     * reasons, it leaves it {@code null} (unchanged). Otherwise, it attempts to load the filter, and if it succeeds,
+     * it is set in the builder. If a filter fails to load, it is left {@code null} (unchanged) meaning that it should
Review Comment:
This comment needs adjusting — the filter is returned rather than set
in the builder.
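For instance, a return-value-oriented version of the opening sentences, based only on the code quoted below:

    /**
     * Optionally loads a Bloom filter. If the filter is not needed (FP chance is negligible), returns
     * {@link FilterFactory#AlwaysPresent}. If the filter is expected to be recreated for various reasons,
     * or if loading fails, returns {@code null}, meaning that the filter should be rebuilt. Otherwise,
     * returns the loaded filter.
     */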
##########
src/java/org/apache/cassandra/io/sstable/format/FilterComponent.java:
##########
@@ -90,4 +97,63 @@ public static void saveOrDeleteCorrupted(Descriptor descriptor, IFilter filter)
throw ex;
}
}
+
+    /**
+     * Optionally loads a Bloom filter. If the filter is not needed (FP chance is neglectable), it sets
+     * {@link AlwaysPresentFilter} as a filter in the builder. If the filter is expected to be recreated for various
+     * reasons, it leaves it {@code null} (unchanged). Otherwise, it attempts to load the filter, and if it succeeds,
+     * it is set in the builder. If a filter fails to load, it is left {@code null} (unchanged) meaning that it should
+     * be rebuilt.
+     */
+    public static IFilter maybeLoadBloomFilter(Descriptor descriptor, Set<Component> components, TableMetadata metadata, ValidationMetadata validationMetadata)
+    {
+        double currentFPChance = validationMetadata != null ? validationMetadata.bloomFilterFPChance : Double.NaN;
+        double desiredFPChance = metadata.params.bloomFilterFpChance;
+
+        IFilter filter = null;
+        if (!shouldUseBloomFilter(desiredFPChance))
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Bloom filter for {} will not be loaded because fpChance={} is neglectable", descriptor, desiredFPChance);
+
+            return FilterFactory.AlwaysPresent;
+        }
+        else if (!components.contains(Component.FILTER) || Double.isNaN(currentFPChance))
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Bloom filter for {} will not be loaded because filter component is missing or sstable lacks validation metadata", descriptor);
+
+            return null;
+        }
+        else if (!isFPChanceDiffNeglectable(desiredFPChance, currentFPChance) && rebuildFilterOnFPChanceChange)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Bloom filter for {} will not be loaded because fpChance has changed from {} to {} and the filter should be recreated", descriptor, currentFPChance, desiredFPChance);
+
+            return null;
+        }
+
+        try
+        {
+            filter = load(descriptor);
+            if (filter == null || filter instanceof AlwaysPresentFilter)
+                logger.info("Bloom filter for {} is missing or invalid", descriptor);
+        }
+        catch (IOException ex)
+        {
+            logger.info("Bloom filter for " + descriptor + " could not be deserialized", ex);
+        }
+
+        return filter;
+    }
+
+    static boolean shouldUseBloomFilter(double fpChance)
+    {
+        return !(Math.abs(1 - fpChance) <= filterFPChanceTolerance);
+    }
+
+    static boolean isFPChanceDiffNeglectable(double fpChance1, double fpChance2)
Review Comment:
Nit: We should use "negligible" instead of "neglectable" (archaic form).
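For instance (the body shown is an assumption mirroring the tolerance check in `shouldUseBloomFilter`, since the quote ends at the signature):

    static boolean isFPChanceDiffNegligible(double fpChance1, double fpChance2)
    {
        return Math.abs(fpChance1 - fpChance2) <= filterFPChanceTolerance;
    }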
##########
src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java:
##########
@@ -43,11 +43,11 @@
* for on-close (i.e. when all references expire) that drops the page cache prior to that key position
*
* hard-links are created for each partially written sstable so that readers opened against them continue to work past
- * the rename of the temporary file, which is deleted once all readers against the hard-link have been closed.
+ * renaming of the temporary file, which is deleted once all readers against the hard-link have been closed.
* If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the Tracker.
*
- * On abort we restore the original lower bounds to the existing readers and delete any temporary files we had in progress,
- * but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished
+ * On abort, we restore the original lower bounds to the existing readers and delete any temporary files we had in progress,
+ * but leave any hard-links in place for the readers we opened to clean-up when they're finished as we would have finished
Review Comment:
This does not have the same meaning: "had we" here stands for "if we had".
##########
src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriterBuilder.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.io.sstable.DataComponent;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SortedTableWriterBuilder;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.Throwables;
+
+public class BigTableWriterBuilder extends SortedTableWriterBuilder<RowIndexEntry, BigFormatPartitionWriter, BigTableWriter, BigTableWriterBuilder>
+{
+    private RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+    private BigTableWriter.IndexWriter indexWriter;
+    private SequentialWriter dataWriter;
+    private BigFormatPartitionWriter partitionWriter;
+
+    public BigTableWriterBuilder(Descriptor descriptor)
+    {
+        super(descriptor);
+    }
+
+    @Override
+    public SequentialWriter getDataWriter()
+    {
+        return dataWriter;
+    }
+
+    @Override
+    public BigFormatPartitionWriter getPartitionWriter()
+    {
+        return partitionWriter;
+    }
+
+    public RowIndexEntry.IndexSerializer getRowIndexEntrySerializer()
+    {
+        return rowIndexEntrySerializer;
+    }
+
+    public BigTableWriter.IndexWriter getIndexWriter()
+    {
+        return indexWriter;
+    }
+
+    @Override
+    protected BigTableWriter buildInternal(LifecycleNewTracker lifecycleNewTracker)
+    {
+        try
+        {
+            rowIndexEntrySerializer = new RowIndexEntry.Serializer(descriptor.version, getSerializationHeader());
+            dataWriter = DataComponent.buildWriter(descriptor,
Review Comment:
Setting these fields here and grabbing them via the getter is somewhat
unsafe; e.g. if the builder is reused we could mix instances up.
Can't we just pass anything we construct as arguments to the
`BigTableWriter` constructor?
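For illustration, a shape that keeps the parts in locals (the exact `BigTableWriter` constructor signature here is hypothetical, not the PR's API):

    @Override
    protected BigTableWriter buildInternal(LifecycleNewTracker lifecycleNewTracker)
    {
        // Locals instead of builder fields, so a reused builder cannot mix
        // parts of one writer into the next.
        RowIndexEntry.IndexSerializer serializer = new RowIndexEntry.Serializer(descriptor.version, getSerializationHeader());
        // ... build the data writer, index writer and partition writer the same way ...
        return new BigTableWriter(this, lifecycleNewTracker, serializer, dataWriter, indexWriter, partitionWriter);
    }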
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java:
##########
@@ -217,7 +218,7 @@ public void setOpenResult(boolean openResult)
/**
* Open the resultant SSTableReader before it has been fully written
Review Comment:
Doc should explain the consumer.
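Assuming the new signature takes a `Consumer<SSTableReader>` (which "the consumer" here suggests), something along these lines would do (parameter name hypothetical):

    /**
     * Open the resultant SSTableReader before it has been fully written.
     *
     * @param callWhenReady consumer invoked with the early-opened reader once it
     *                      is safe to use; not invoked if early opening fails
     */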
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1983,39 +1380,36 @@ public void addTo(Ref.IdentityCollection identities)
{
identities.add(this);
identities.add(tidy.globalRef);
- dfile.addTo(identities);
- ifile.addTo(identities);
- bf.addTo(identities);
- indexSummary.addTo(identities);
-
+ tidy.closeables.forEach(c -> {
+ if (c instanceof SharedCloseable)
+ ((SharedCloseable) c).addTo(identities);
+ });
}
public boolean maybePresent(DecoratedKey key)
Review Comment:
Could this be renamed to `mayBePresent` or better still `couldContain`? The
current spelling implies "present" is used as a verb, i.e. that this is a
function that changes some state.
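A sketch of the rename, with Javadoc making the bloom-filter-style contract explicit (wording is only a suggestion):

    /**
     * Returns whether the given key may be present in this sstable: a negative
     * answer is authoritative, while a positive one may be a false positive.
     */
    public boolean couldContain(DecoratedKey key)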
##########
src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.PartitionSerializationException;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public abstract class SortedTableWriter<P extends SortedTablePartitionWriter, RIE extends AbstractRowIndexEntry> extends SSTableWriter<RIE>
+{
+    private final static Logger logger = LoggerFactory.getLogger(SortedTableWriter.class);
+
+    protected final SequentialWriter dataWriter;
+    protected final P partitionWriter;
+    private final FileHandle.Builder dataFileBuilder = new FileHandle.Builder(descriptor.fileFor(Component.DATA));
+    private DecoratedKey lastWrittenKey;
+    private DataPosition dataMark;
+    private long lastEarlyOpenLength;
+
+    public SortedTableWriter(SortedTableWriterBuilder<RIE, P, ?, ?> builder, LifecycleNewTracker lifecycleNewTracker)
+    {
+        super(builder, lifecycleNewTracker);
+        checkNotNull(builder.getDataWriter());
+        checkNotNull(builder.getPartitionWriter());
+
+        this.dataWriter = builder.getDataWriter();
+        this.partitionWriter = builder.getPartitionWriter();
+    }
+
+    /**
+     * Appends partition data to this writer.
+     *
+     * @param partition the partition to write
+     * @return the created index entry if something was written, that is if {@code iterator}
+     *         wasn't empty, {@code null} otherwise.
+     * @throws FSWriteError if write to the dataFile fails
+     */
+    @Override
+    public final RIE append(UnfilteredRowIterator partition)
+    {
+        if (partition.isEmpty())
+            return null;
+
+        try
+        {
+            if (!verifyPartition(partition.partitionKey()))
+                return null;
+
+            startPartition(partition.partitionKey(), partition.partitionLevelDeletion());
+
+            RIE indexEntry;
+            if (header.hasStatic())
+                addStaticRow(partition.partitionKey(), partition.staticRow());
+
+            while (partition.hasNext())
+                addUnfiltered(partition.partitionKey(), partition.next());
+
+            indexEntry = endPartition(partition.partitionKey(), partition.partitionLevelDeletion());
+
+            return indexEntry;
+        }
+        catch (BufferOverflowException boe)
+        {
+            throw new PartitionSerializationException(partition, boe);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getFilename());
+        }
+    }
+
+    private boolean verifyPartition(DecoratedKey key)
+    {
+        assert key != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
+
+        if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+        {
+            logger.error("Key size {} exceeds maximum of {}, skipping row", key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
+            return false;
+        }
+
+        if (lastWrittenKey != null && lastWrittenKey.compareTo(key) >= 0)
+            throw new RuntimeException(String.format("Last written key %s >= current key %s, writing into %s", lastWrittenKey, key, getFilename()));
+
+        return true;
+    }
+
+    private void startPartition(DecoratedKey key, DeletionTime partitionLevelDeletion) throws IOException
+    {
+        partitionWriter.start(key, partitionLevelDeletion);
+        metadataCollector.update(partitionLevelDeletion);
+
+        onStartPartition(key);
+    }
+
+    private void addStaticRow(DecoratedKey key, Row row) throws IOException
+    {
+        guardCollectionSize(key, row);
+
+        partitionWriter.addStaticRow(row);
+        if (!row.isEmpty())
+            Rows.collectStats(row, metadataCollector);
+
+        onStaticRow(row);
+    }
+
+    private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered) throws IOException
+    {
+        if (unfiltered.isRow())
+        {
+            addRow(key, (Row) unfiltered);
+        }
+        else
+        {
+            assert unfiltered.isRangeTombstoneMarker();
+            addRangeTomstoneMarker((RangeTombstoneMarker) unfiltered);
+        }
+    }
+
+    private void addRow(DecoratedKey key, Row row) throws IOException
+    {
+        guardCollectionSize(key, row);
+
+        partitionWriter.addUnfiltered(row);
+        metadataCollector.updateClusteringValues(row.clustering());
+        Rows.collectStats(row, metadataCollector);
+
+        onRow(row);
+    }
+
+    private void addRangeTomstoneMarker(RangeTombstoneMarker marker) throws IOException
+    {
+        partitionWriter.addUnfiltered(marker);
+
+        metadataCollector.updateClusteringValues(marker.clustering());
+        if (marker.isBoundary())
+        {
+            RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker) marker;
+            metadataCollector.update(bm.endDeletionTime());
+            metadataCollector.update(bm.startDeletionTime());
+        }
+        else
+        {
+            metadataCollector.update(((RangeTombstoneBoundMarker) marker).deletionTime());
+        }
+
+        onRangeTombstoneMarker(marker);
+    }
+
+    private RIE endPartition(DecoratedKey key, DeletionTime partitionLevelDeletion) throws IOException
+    {
+        long finishResult = partitionWriter.finish();
+
+        long endPosition = dataWriter.position();
+        long rowSize = endPosition - partitionWriter.getInitialPosition();
+        maybeLogLargePartitionWarning(key, rowSize);
+        maybeLogManyTombstonesWarning(key, metadataCollector.totalTombstones);
+        metadataCollector.addPartitionSizeInBytes(rowSize);
+        metadataCollector.addKey(key.getKey());
+        metadataCollector.addCellPerPartitionCount();
+
+        lastWrittenKey = key;
+        last = lastWrittenKey;
+        if (first == null)
+            first = lastWrittenKey;
+
+        if (logger.isTraceEnabled())
+            logger.trace("wrote {} at {}", key, endPosition);
+
+        return createRowIndexEntry(key, partitionLevelDeletion, finishResult);
+    }
+
+    protected void onStartPartition(DecoratedKey key)
+    {
+        notifyObservers(o -> o.startPartition(key, partitionWriter.getInitialPosition(), partitionWriter.getInitialPosition()));
+    }
+
+    protected void onStaticRow(Row row)
+    {
+        notifyObservers(o -> o.staticRow(row));
+    }
+
+    protected void onRow(Row row)
+    {
+        notifyObservers(o -> o.nextUnfilteredCluster(row));
+    }
+
+    protected void onRangeTombstoneMarker(RangeTombstoneMarker marker)
+    {
+        notifyObservers(o -> o.nextUnfilteredCluster(marker));
+    }
+
+    protected abstract RIE createRowIndexEntry(DecoratedKey key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException;
+
+    protected final void notifyObservers(Consumer<SSTableFlushObserver> action)
+    {
+        if (observers != null && !observers.isEmpty())
+            observers.forEach(action);
+    }
+
+    @Override
+    public void mark()
+    {
+        dataMark = dataWriter.mark();
+    }
+
+    @Override
+    public void resetAndTruncate()
+    {
+        dataWriter.resetAndTruncate(dataMark);
+        partitionWriter.reset();
+    }
+
+    @Override
+    public long getFilePointer()
+    {
+        return dataWriter.position();
+    }
+
+    @Override
+    public long getOnDiskFilePointer()
+    {
+        return dataWriter.getOnDiskFilePointer();
+    }
+
+    @Override
+    public long getEstimatedOnDiskBytesWritten()
+    {
+        return dataWriter.getEstimatedOnDiskBytesWritten();
+    }
+
+    protected FileHandle openDataFile(long lengthOverride, StatsMetadata statsMetadata)
+    {
+        int dataBufferSize = ioOptions.diskOptimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(ioOptions.diskOptimizationEstimatePercentile));
+
+        FileHandle dataFile = dataFileBuilder.mmapped(ioOptions.defaultDiskAccessMode == Config.DiskAccessMode.mmap)
Review Comment:
This should be `mmapped(ioOptions.defaultDiskAccessMode)`, after which the
`mmapped(boolean)` version can be removed.
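That is, assuming a `DiskAccessMode`-typed overload on `FileHandle.Builder` (to be added if it does not exist yet):

    FileHandle dataFile = dataFileBuilder.mmapped(ioOptions.defaultDiskAccessMode)

so the mode enum flows through unchanged instead of being collapsed to a boolean.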
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -103,4 +105,11 @@ public static Type validate(String name)
{
List<GaugeProvider<?, ?>> getGaugeProviders();
}
+
+    interface SSTableReaderFactory<R extends SSTableReader, B extends SSTableReaderBuilder<R, B>>
+    {
+        SSTableReaderBuilder<R, B> builder(Descriptor descriptor);
+
+        SSTableReaderLoadingBuilder<R, B> builder(Descriptor descriptor, TableMetadataRef tableMetadataRef, Set<Component> components);
Review Comment:
The difference between these two is very unclear. Could you add comments or
rename?
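For instance, renaming the loading variant and documenting both (names are only suggestions):

    interface SSTableReaderFactory<R extends SSTableReader, B extends SSTableReaderBuilder<R, B>>
    {
        /** A plain builder: the caller supplies all parts of the reader explicitly. */
        SSTableReaderBuilder<R, B> builder(Descriptor descriptor);

        /** A loading builder: the reader's parts are read from the sstable's on-disk components. */
        SSTableReaderLoadingBuilder<R, B> loadingBuilder(Descriptor descriptor, TableMetadataRef tableMetadataRef, Set<Component> components);
    }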
##########
src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.PartitionSerializationException;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.TimeUUID;
+
+public abstract class SortedTableWriter<P extends SortedTablePartitionWriter, RIE extends AbstractRowIndexEntry> extends SSTableWriter<RIE>
+{
+    private final static Logger logger = LoggerFactory.getLogger(SortedTableWriter.class);
+
+    protected SequentialWriter dataWriter;
+    protected P partitionWriter;
+    private DecoratedKey lastWrittenKey;
+    private DataPosition dataMark;
+
+    public SortedTableWriter(Descriptor descriptor,
+                             long keyCount,
+                             long repairedAt,
+                             TimeUUID pendingRepair,
+                             boolean isTransient,
+                             TableMetadataRef metadata,
+                             MetadataCollector metadataCollector,
+                             SerializationHeader header,
+                             Collection<SSTableFlushObserver> observers,
+                             Set<Component> components)
+    {
+        super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, components);
+    }
+
+    /**
+     * Appends partition data to this writer.
+     *
+     * @param partition the partition to write
+     * @return the created index entry if something was written, that is if {@code iterator}
+     *         wasn't empty, {@code null} otherwise.
+     * @throws FSWriteError if write to the dataFile fails
+     */
+    @Override
+    public final RIE append(UnfilteredRowIterator partition)
Review Comment:
There's only one use of the returned index entry, and it does not require
the precise type. Can we make this return `AbstractRowIndexEntry` and do away
with the parameter?
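i.e., a sketch of the shape (assuming no caller needs the concrete entry type):

    // In SSTableWriter: no RIE type parameter required for append
    public abstract AbstractRowIndexEntry append(UnfilteredRowIterator partition);

Subclasses that do want the concrete type can still narrow the return type covariantly in their overrides.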
##########
src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java:
##########
@@ -153,183 +153,228 @@ private CompressionParams compressionFor(final OperationType opType)
public void mark()
{
- dataMark = dataFile.mark();
- iwriter.mark();
+ dataMark = dataWriter.mark();
+ indexWriter.mark();
}
public void resetAndTruncate()
{
- dataFile.resetAndTruncate(dataMark);
- iwriter.resetAndTruncate();
- }
-
-    /**
-     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
-     */
-    protected long beforeAppend(DecoratedKey decoratedKey)
-    {
-        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
-        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
-            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
-        return (lastWrittenKey == null) ? 0 : dataFile.position();
-    }
-
-    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index, ByteBuffer indexInfo) throws IOException
- {
- metadataCollector.addKey(decoratedKey.getKey());
- lastWrittenKey = decoratedKey;
- last = lastWrittenKey;
- if (first == null)
- first = lastWrittenKey;
-
- if (logger.isTraceEnabled())
- logger.trace("wrote {} at {}", decoratedKey, dataEnd);
- iwriter.append(decoratedKey, index, dataEnd, indexInfo);
+ dataWriter.resetAndTruncate(dataMark);
+ indexWriter.resetAndTruncate();
}
/**
* Appends partition data to this writer.
*
- * @param iterator the partition to write
+ * @param partition the partition to write
     * @return the created index entry if something was written, that is if {@code iterator}
* wasn't empty, {@code null} otherwise.
- *
- * @throws FSWriteError if a write to the dataFile fails
+ * @throws FSWriteError if write to the dataFile fails
*/
- public RowIndexEntry append(UnfilteredRowIterator iterator)
+ @Override
+ public final RowIndexEntry append(UnfilteredRowIterator partition)
{
- DecoratedKey key = iterator.partitionKey();
-
- if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
- {
- logger.error("Key size {} exceeds maximum of {}, skipping row",
key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
- return null;
- }
-
- if (iterator.isEmpty())
+ if (partition.isEmpty())
return null;
- long startPosition = beforeAppend(key);
-        observers.forEach((o) -> o.startPartition(key, columnIndexWriter.getInitialPosition(), iwriter.indexFile.position()));
+ try
+ {
+ if (!verifyPartition(partition.partitionKey()))
+ return null;
- //Reuse the writer for each row
-        columnIndexWriter.reset(DatabaseDescriptor.getColumnIndexCacheSize(), DatabaseDescriptor.getColumnIndexSize());
+            startPartition(partition.partitionKey(), partition.partitionLevelDeletion());
-        try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector)))
- {
- columnIndexWriter.buildRowIndex(collecting);
+ RowIndexEntry indexEntry;
+ if (header.hasStatic())
+ addStaticRow(partition.partitionKey(), partition.staticRow());
-            // afterAppend() writes the partition key before the first RowIndexEntry - so we have to add it's
-            // serialized size to the index-writer position
-            long indexFilePosition = ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) + iwriter.indexFile.position();
+ while (partition.hasNext())
+ addUnfiltered(partition.partitionKey(), partition.next());
-            RowIndexEntry entry = RowIndexEntry.create(startPosition, indexFilePosition,
-                                                       collecting.partitionLevelDeletion(),
-                                                       columnIndexWriter.headerLength,
-                                                       columnIndexWriter.columnIndexCount,
-                                                       columnIndexWriter.indexInfoSerializedSize(),
-                                                       columnIndexWriter.indexSamples(),
-                                                       columnIndexWriter.offsets(),
-                                                       rowIndexEntrySerializer.indexInfoSerializer());
+            indexEntry = endPartition(partition.partitionKey(), partition.partitionLevelDeletion());
- long endPosition = dataFile.position();
- long rowSize = endPosition - startPosition;
- maybeLogLargePartitionWarning(key, rowSize);
-            maybeLogManyTombstonesWarning(key, metadataCollector.totalTombstones);
- metadataCollector.addPartitionSizeInBytes(rowSize);
- afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
- return entry;
+ return indexEntry;
}
catch (BufferOverflowException boe)
{
- throw new PartitionSerializationException(iterator, boe);
+ throw new PartitionSerializationException(partition, boe);
}
catch (IOException e)
{
- throw new FSWriteError(e, dataFile.getPath());
+ throw new FSWriteError(e, getFilename());
}
}
- private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
+ private boolean verifyPartition(DecoratedKey key)
{
-        if (rowSize > DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
+        assert key != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
+
+        if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
         {
-            String keyString = metadata().partitionKeyType.getString(key.getKey());
-            logger.warn("Writing large partition {}/{}:{} ({}) to sstable {}", metadata.keyspace, metadata.name, keyString, FBUtilities.prettyPrintMemory(rowSize), getFilename());
+            logger.error("Key size {} exceeds maximum of {}, skipping row", key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
+            return false;
         }
+
+        if (lastWrittenKey != null && lastWrittenKey.compareTo(key) >= 0)
+            throw new RuntimeException(String.format("Last written key %s >= current key %s, writing into %s", lastWrittenKey, key, getFilename()));
+
+ return true;
}
-    private void maybeLogManyTombstonesWarning(DecoratedKey key, int tombstoneCount)
+    private void startPartition(DecoratedKey key, DeletionTime partitionLevelDeletion) throws IOException
     {
-        if (tombstoneCount > DatabaseDescriptor.getCompactionTombstoneWarningThreshold())
-        {
-            String keyString = metadata().partitionKeyType.getString(key.getKey());
-            logger.warn("Writing {} tombstones to {}/{}:{} in sstable {}", tombstoneCount, metadata.keyspace, metadata.name, keyString, getFilename());
- }
+ partitionWriter.start(key, partitionLevelDeletion);
+ metadataCollector.update(partitionLevelDeletion);
+
+ onStartPartition(key);
}
- private static class StatsCollector extends Transformation
+ private void addStaticRow(DecoratedKey key, Row row) throws IOException
{
- private final MetadataCollector collector;
- private int cellCount;
+ guardCollectionSize(metadata(), key, row);
+
+ partitionWriter.addStaticRow(row);
+ if (!row.isEmpty())
+ Rows.collectStats(row, metadataCollector);
- StatsCollector(MetadataCollector collector)
+ onStaticRow(row);
+ }
+
+    private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered) throws IOException
+ {
+ if (unfiltered.isRow())
{
- this.collector = collector;
+ addRow(key, (Row) unfiltered);
}
-
- @Override
- public Row applyToStatic(Row row)
+ else
{
- if (!row.isEmpty())
- cellCount += Rows.collectStats(row, collector);
- return row;
+ assert unfiltered.isRangeTombstoneMarker();
Review Comment:
Nit: I would remove this -- the cast will check anyway, and this is a rather
hot path.
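i.e. (the cast already fails fast, so the assert only duplicates the check):

    else
    {
        // anything that is not a RangeTombstoneMarker throws ClassCastException here
        addRangeTomstoneMarker((RangeTombstoneMarker) unfiltered);
    }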
##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -331,7 +331,9 @@
*
     * If only keyspaces are specified, mutations for all tables in such keyspace will be replayed
* */
- COMMIT_LOG_REPLAY_LIST("cassandra.replayList", null)
+ COMMIT_LOG_REPLAY_LIST("cassandra.replayList", null),
+
+ SSTABLE_FORMAT_DEFAULT("cassandra.sstable.format.default", "BIG")
Review Comment:
Nit: I personally prefer the default to be given in context, i.e. in the
format class (which will make the new `getEnum` unnecessary).
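For example (illustrative only; the constant name and the override-default lookup are assumptions, not existing API):

    // In BigFormat (illustrative):
    public static final String DEFAULT_FORMAT = "BIG";

    // CassandraRelevantProperties entry without a baked-in default:
    SSTABLE_FORMAT_DEFAULT("cassandra.sstable.format.default"),

    // ...and the call site supplies the default in context:
    String formatName = SSTABLE_FORMAT_DEFAULT.getString(BigFormat.DEFAULT_FORMAT);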
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java:
##########
@@ -57,4 +58,6 @@
     * Called when all data is written to the file and it's ready to be finished up.
*/
void complete();
+
+ void staticRow(Row staticRow);
Review Comment:
This is best placed after `startPartition` and should have JavaDoc.
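For example, placed right after `startPartition` (wording is a suggestion based on the writer code quoted above):

    /**
     * Called for the static row of the current partition, after startPartition and
     * before any clustered rows or range tombstone markers are observed.
     *
     * @param staticRow the static row of the current partition, possibly empty
     */
    void staticRow(Row staticRow);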
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.