blambov commented on code in PR #2064:
URL: https://github.com/apache/cassandra/pull/2064#discussion_r1081348044
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java:
##########
@@ -57,4 +58,6 @@
* Called when all data is written to the file and it's ready to be
finished up.
*/
void complete();
+
+ void staticRow(Row staticRow);
Review Comment:
looks good
##########
src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java:
##########
@@ -153,183 +153,228 @@ private CompressionParams compressionFor(final
OperationType opType)
public void mark()
{
- dataMark = dataFile.mark();
- iwriter.mark();
+ dataMark = dataWriter.mark();
+ indexWriter.mark();
}
public void resetAndTruncate()
{
- dataFile.resetAndTruncate(dataMark);
- iwriter.resetAndTruncate();
- }
-
- /**
- * Perform sanity checks on @param decoratedKey and @return the position
in the data file before any data is written
- */
- protected long beforeAppend(DecoratedKey decoratedKey)
- {
- assert decoratedKey != null : "Keys must not be null"; // empty keys
ARE allowed b/c of indexed column values
- if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey)
>= 0)
- throw new RuntimeException("Last written key " + lastWrittenKey +
" >= current key " + decoratedKey + " writing into " + getFilename());
- return (lastWrittenKey == null) ? 0 : dataFile.position();
- }
-
- private void afterAppend(DecoratedKey decoratedKey, long dataEnd,
RowIndexEntry index, ByteBuffer indexInfo) throws IOException
- {
- metadataCollector.addKey(decoratedKey.getKey());
- lastWrittenKey = decoratedKey;
- last = lastWrittenKey;
- if (first == null)
- first = lastWrittenKey;
-
- if (logger.isTraceEnabled())
- logger.trace("wrote {} at {}", decoratedKey, dataEnd);
- iwriter.append(decoratedKey, index, dataEnd, indexInfo);
+ dataWriter.resetAndTruncate(dataMark);
+ indexWriter.resetAndTruncate();
}
/**
* Appends partition data to this writer.
*
- * @param iterator the partition to write
+ * @param partition the partition to write
* @return the created index entry if something was written, that is if
{@code iterator}
* wasn't empty, {@code null} otherwise.
- *
- * @throws FSWriteError if a write to the dataFile fails
+ * @throws FSWriteError if write to the dataFile fails
*/
- public RowIndexEntry append(UnfilteredRowIterator iterator)
+ @Override
+ public final RowIndexEntry append(UnfilteredRowIterator partition)
{
- DecoratedKey key = iterator.partitionKey();
-
- if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
- {
- logger.error("Key size {} exceeds maximum of {}, skipping row",
key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
- return null;
- }
-
- if (iterator.isEmpty())
+ if (partition.isEmpty())
return null;
- long startPosition = beforeAppend(key);
- observers.forEach((o) -> o.startPartition(key,
columnIndexWriter.getInitialPosition(), iwriter.indexFile.position()));
+ try
+ {
+ if (!verifyPartition(partition.partitionKey()))
+ return null;
- //Reuse the writer for each row
- columnIndexWriter.reset(DatabaseDescriptor.getColumnIndexCacheSize(),
DatabaseDescriptor.getColumnIndexSize());
+ startPartition(partition.partitionKey(),
partition.partitionLevelDeletion());
- try (UnfilteredRowIterator collecting = Transformation.apply(iterator,
new StatsCollector(metadataCollector)))
- {
- columnIndexWriter.buildRowIndex(collecting);
+ RowIndexEntry indexEntry;
+ if (header.hasStatic())
+ addStaticRow(partition.partitionKey(), partition.staticRow());
- // afterAppend() writes the partition key before the first
RowIndexEntry - so we have to add its
- // serialized size to the index-writer position
- long indexFilePosition =
ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) +
iwriter.indexFile.position();
+ while (partition.hasNext())
+ addUnfiltered(partition.partitionKey(), partition.next());
- RowIndexEntry entry = RowIndexEntry.create(startPosition,
indexFilePosition,
-
collecting.partitionLevelDeletion(),
-
columnIndexWriter.headerLength,
-
columnIndexWriter.columnIndexCount,
-
columnIndexWriter.indexInfoSerializedSize(),
-
columnIndexWriter.indexSamples(),
-
columnIndexWriter.offsets(),
-
rowIndexEntrySerializer.indexInfoSerializer());
+ indexEntry = endPartition(partition.partitionKey(),
partition.partitionLevelDeletion());
- long endPosition = dataFile.position();
- long rowSize = endPosition - startPosition;
- maybeLogLargePartitionWarning(key, rowSize);
- maybeLogManyTombstonesWarning(key,
metadataCollector.totalTombstones);
- metadataCollector.addPartitionSizeInBytes(rowSize);
- afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
- return entry;
+ return indexEntry;
}
catch (BufferOverflowException boe)
{
- throw new PartitionSerializationException(iterator, boe);
+ throw new PartitionSerializationException(partition, boe);
}
catch (IOException e)
{
- throw new FSWriteError(e, dataFile.getPath());
+ throw new FSWriteError(e, getFilename());
}
}
- private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
+ private boolean verifyPartition(DecoratedKey key)
{
- if (rowSize >
DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
+ assert key != null : "Keys must not be null"; // empty keys ARE
allowed b/c of indexed column values
+
+ if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
{
- String keyString =
metadata().partitionKeyType.getString(key.getKey());
- logger.warn("Writing large partition {}/{}:{} ({}) to sstable {}",
metadata.keyspace, metadata.name, keyString,
FBUtilities.prettyPrintMemory(rowSize), getFilename());
+ logger.error("Key size {} exceeds maximum of {}, skipping row",
key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
+ return false;
}
+
+ if (lastWrittenKey != null && lastWrittenKey.compareTo(key) >= 0)
+ throw new RuntimeException(String.format("Last written key %s >=
current key %s, writing into %s", lastWrittenKey, key, getFilename()));
+
+ return true;
}
- private void maybeLogManyTombstonesWarning(DecoratedKey key, int
tombstoneCount)
+ private void startPartition(DecoratedKey key, DeletionTime
partitionLevelDeletion) throws IOException
{
- if (tombstoneCount >
DatabaseDescriptor.getCompactionTombstoneWarningThreshold())
- {
- String keyString =
metadata().partitionKeyType.getString(key.getKey());
- logger.warn("Writing {} tombstones to {}/{}:{} in sstable {}",
tombstoneCount, metadata.keyspace, metadata.name, keyString, getFilename());
- }
+ partitionWriter.start(key, partitionLevelDeletion);
+ metadataCollector.update(partitionLevelDeletion);
+
+ onStartPartition(key);
}
- private static class StatsCollector extends Transformation
+ private void addStaticRow(DecoratedKey key, Row row) throws IOException
{
- private final MetadataCollector collector;
- private int cellCount;
+ guardCollectionSize(metadata(), key, row);
+
+ partitionWriter.addStaticRow(row);
+ if (!row.isEmpty())
+ Rows.collectStats(row, metadataCollector);
- StatsCollector(MetadataCollector collector)
+ onStaticRow(row);
+ }
+
+ private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered) throws
IOException
+ {
+ if (unfiltered.isRow())
{
- this.collector = collector;
+ addRow(key, (Row) unfiltered);
}
-
- @Override
- public Row applyToStatic(Row row)
+ else
{
- if (!row.isEmpty())
- cellCount += Rows.collectStats(row, collector);
- return row;
+ assert unfiltered.isRangeTombstoneMarker();
Review Comment:
looks good
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]