yifan-c commented on code in PR #2824:
URL: https://github.com/apache/cassandra/pull/2824#discussion_r1365961484
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
* means that rows should be added by increasing md5 of the row key. This is
* rarely possible and SSTableSimpleUnsortedWriter should most of the time be
* prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for
SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
*/
class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
{
+ private final long maxSSTableSize;
+
+ // Used to compute the row serialized size
+ private final SerializationHeader header;
+ private final SerializationHelper helper;
+
protected DecoratedKey currentKey;
protected PartitionUpdate.Builder update;
+ private long currentSize;
private SSTableTxnWriter writer;
- protected SSTableSimpleWriter(File directory, TableMetadataRef metadata,
RegularAndStaticColumns columns)
+ /**
+ * Create a SSTable writer for sorted input data.
+ * When a positive {@param maxSSTableSizeInMiB} is defined, the writer
outputs a sequence of SSTables,
+ * whose sizes do not exceed the specified value.
+ *
+ * @param directory directory to store the sstable files
+ * @param metadata table metadata
+ * @param columns columns to update
+ * @param maxSSTableSizeInMiB defines the max SSTable size if the value is
positive.
+ * Any non-positive value indicates the sstable
size is unlimited.
+ */
+ protected SSTableSimpleWriter(File directory, TableMetadataRef metadata,
RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
{
super(directory, metadata, columns);
+ this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+ this.header = new SerializationHeader(true, metadata.get(), columns,
EncodingStats.NO_STATS);
+ this.helper = new SerializationHelper(this.header);
}
- private SSTableTxnWriter getOrCreateWriter() throws IOException
- {
- if (writer == null)
- writer = createWriter(null);
-
- return writer;
- }
-
+ @Override
PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
{
- assert key != null;
+ Preconditions.checkArgument(key != null, "Partition update cannot have
null key");
- // If that's not the current key, write the current one if necessary
and create a new
- // update for the new key.
- if (!key.equals(currentKey))
+ // update for the first partition or a new partition
+ if (update == null || !key.equals(currentKey))
{
+ // write the previous update if not absent
if (update != null)
- writePartition(update.build());
+ writePartition(update.build()); // might switch to a new
sstable writer and reset currentSize
+
+ // and create a new update for the new key
+ currentSize +=
PartitionUpdate.serializer.serializedSize(createUpdateBuilder(key).build(),
+
format.getLatestVersion().correspondingMessagingVersion());
currentKey = key;
- update = new PartitionUpdate.Builder(metadata.get(), currentKey,
columns, 4);
+ update = createSizeCountingPartitionUpdateBuilder(key,
this::countRow, () -> {});
}
- assert update != null;
+ Preconditions.checkState(update != null, "Partition update to write
cannot be null");
return update;
}
+ @Override
public void close()
+ {
+ writeLastPartitionUpdate(update);
+ closeWriterIfPresent(writer);
+ }
+
+ private boolean shouldSwitchToNewWriter()
+ {
+ return writer == null || (maxSSTableSize > 0 && currentSize >
maxSSTableSize);
+ }
+
+ private SSTableTxnWriter getOrSwitchWriter() throws IOException
+ {
+ if (shouldSwitchToNewWriter())
+ {
+ closeWriterIfPresent(writer);
+ writer = createWriter(null);
+ currentSize = 0;
+ }
+
+ return writer;
+ }
+
+ private void countRow(Row row)
+ {
+ // Note that the accounting of a row is a bit inaccurate (it doesn't
take some of the file format optimization into account).
+ // In particular, what we count is closer to the serialized value, but
it's debatable that it's the right thing
+ // to count since it will take a lot more space in memory.
+ currentSize += UnfilteredSerializer.serializer.serializedSize(row,
helper, 0,
+
format.getLatestVersion().correspondingMessagingVersion());
+ }
+
+ private void writeLastPartitionUpdate(PartitionUpdate.Builder update)
{
try
{
if (update != null)
writePartition(update.build());
+ }
+ catch (Throwable t)
+ {
+ Throwable e = writer == null ? t : writer.abort(t);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void closeWriterIfPresent(SSTableTxnWriter writer)
+ {
+ try
+ {
if (writer != null)
writer.finish(false);
}
catch (Throwable t)
{
- Throwable e = writer == null ? t : writer.abort(t);
+ Throwable e = writer.abort(t);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
private void writePartition(PartitionUpdate update) throws IOException
{
- getOrCreateWriter().append(update.unfilteredIterator());
+ getOrSwitchWriter().append(update.unfilteredIterator());
Review Comment:
fair.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]