frankgh commented on code in PR #2824:
URL: https://github.com/apache/cassandra/pull/2824#discussion_r1365904037
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
+    private long currentSize;
     private SSTableTxnWriter writer;
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is positive.
+     *                            Any non-positive value indicates the sstable size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have null key");
-        // If that's not the current key, write the current one if necessary and create a new
-        // update for the new key.
-        if (!key.equals(currentKey))
+        // update for the first partition or a new partition
+        if (update == null || !key.equals(currentKey))
         {
+            // write the previous update if not absent
             if (update != null)
-                writePartition(update.build());
+                writePartition(update.build()); // might switch to a new sstable writer and reset currentSize
+
+            // and create a new update for the new key
+            currentSize += PartitionUpdate.serializer.serializedSize(createUpdateBuilder(key).build(),
+                                                                     format.getLatestVersion().correspondingMessagingVersion());
             currentKey = key;
-            update = new PartitionUpdate.Builder(metadata.get(), currentKey, columns, 4);
+            update = createSizeCountingPartitionUpdateBuilder(key, this::countRow, () -> {});
         }
-        assert update != null;
+        Preconditions.checkState(update != null, "Partition update to write cannot be null");
         return update;
     }
+    @Override
     public void close()
+    {
+        writeLastPartitionUpdate(update);
+        closeWriterIfPresent(writer);
+    }
+
+    private boolean shouldSwitchToNewWriter()
+    {
+        return writer == null || (maxSSTableSize > 0 && currentSize > maxSSTableSize);
+    }
+
+    private SSTableTxnWriter getOrSwitchWriter() throws IOException
+    {
+        if (shouldSwitchToNewWriter())
+        {
+            closeWriterIfPresent(writer);
+            writer = createWriter(null);
+            currentSize = 0;
+        }
+
+        return writer;
+    }
+
+    private void countRow(Row row)
+    {
+        // Note that the accounting of a row is a bit inaccurate (it doesn't take some of the file format optimization into account).
+        // In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing
+        // to count since it will take a lot more space in memory.
+        currentSize += UnfilteredSerializer.serializer.serializedSize(row, helper, 0,
+                                                                      format.getLatestVersion().correspondingMessagingVersion());
+    }
+
+    private void writeLastPartitionUpdate(PartitionUpdate.Builder update)
     {
         try
         {
             if (update != null)
                 writePartition(update.build());
+        }
+        catch (Throwable t)
+        {
+            Throwable e = writer == null ? t : writer.abort(t);
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void closeWriterIfPresent(SSTableTxnWriter writer)
+    {
+        try
+        {
             if (writer != null)
                 writer.finish(false);
         }
         catch (Throwable t)
         {
-            Throwable e = writer == null ? t : writer.abort(t);
+            Throwable e = writer.abort(t);
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
     }
     private void writePartition(PartitionUpdate update) throws IOException
     {
-        getOrCreateWriter().append(update.unfilteredIterator());
+        getOrSwitchWriter().append(update.unfilteredIterator());
Review Comment:
I think the original method name `getOrCreateWriter` maintains the desired semantics. The `create writer` part leaves it to the implementation to decide when to create a new writer.
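Roughly what I have in mind, a sketch that keeps the original name and reuses the helpers introduced in this PR, so the rollover decision still lives inside the implementation:
```java
// Sketch only: same body as getOrSwitchWriter in this PR, just under the original name.
// "Get or create" still describes the contract: return the current writer, creating a
// fresh one on first use or whenever the size threshold says it is time to switch.
private SSTableTxnWriter getOrCreateWriter() throws IOException
{
    if (shouldSwitchToNewWriter())
    {
        closeWriterIfPresent(writer);
        writer = createWriter(null);
        currentSize = 0;
    }
    return writer;
}
```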
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
+    private long currentSize;
     private SSTableTxnWriter writer;
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is positive.
+     *                            Any non-positive value indicates the sstable size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have null key");
-        // If that's not the current key, write the current one if necessary and create a new
-        // update for the new key.
-        if (!key.equals(currentKey))
+        // update for the first partition or a new partition
+        if (update == null || !key.equals(currentKey))
         {
+            // write the previous update if not absent
             if (update != null)
-                writePartition(update.build());
+                writePartition(update.build()); // might switch to a new sstable writer and reset currentSize
+
+            // and create a new update for the new key
+            currentSize += PartitionUpdate.serializer.serializedSize(createUpdateBuilder(key).build(),
+                                                                     format.getLatestVersion().correspondingMessagingVersion());
             currentKey = key;
-            update = new PartitionUpdate.Builder(metadata.get(), currentKey, columns, 4);
+            update = createSizeCountingPartitionUpdateBuilder(key, this::countRow, () -> {});
         }
-        assert update != null;
+        Preconditions.checkState(update != null, "Partition update to write cannot be null");
        return update;
     }
+    @Override
     public void close()
+    {
+        writeLastPartitionUpdate(update);
+        closeWriterIfPresent(writer);
+    }
+
+    private boolean shouldSwitchToNewWriter()
+    {
+        return writer == null || (maxSSTableSize > 0 && currentSize > maxSSTableSize);
+    }
+
+    private SSTableTxnWriter getOrSwitchWriter() throws IOException
+    {
+        if (shouldSwitchToNewWriter())
+        {
+            closeWriterIfPresent(writer);
+            writer = createWriter(null);
+            currentSize = 0;
+        }
+
+        return writer;
+    }
+
+    private void countRow(Row row)
+    {
+        // Note that the accounting of a row is a bit inaccurate (it doesn't take some of the file format optimization into account).
+        // In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing
+        // to count since it will take a lot more space in memory.
+        currentSize += UnfilteredSerializer.serializer.serializedSize(row, helper, 0,
+                                                                      format.getLatestVersion().correspondingMessagingVersion());
+    }
+
+    private void writeLastPartitionUpdate(PartitionUpdate.Builder update)
     {
         try
         {
             if (update != null)
                 writePartition(update.build());
+        }
+        catch (Throwable t)
+        {
+            Throwable e = writer == null ? t : writer.abort(t);
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void closeWriterIfPresent(SSTableTxnWriter writer)
Review Comment:
NIT for a more succinct method name
```suggestion
private void maybeCloseWriter(SSTableTxnWriter writer)
```
##########
test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java:
##########
@@ -1233,6 +1235,77 @@ public void testWriteWithTimestampsAndTtl() throws Exception
         assertFalse(iter.hasNext());
     }
+    @Test
+    public void testWriteWithSorted() throws Exception
+    {
+        String schema = "CREATE TABLE " + qualifiedTable + " ("
+                        + " k int PRIMARY KEY,"
+                        + " v blob )";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .using("INSERT INTO " + qualifiedTable +
+                                                         " (k, v) VALUES (?,text_as_blob(?))" )
+                                                  .sorted()
+                                                  .build();
+        int rowCount = 10_000;
+        for (int i = 0; i < rowCount; i++)
+        {
+            writer.addRow(i, UUID.randomUUID().toString());
+        }
+        writer.close();
+        loadSSTables(dataDir, keyspace);
+
+        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
+        assertEquals(rowCount, resultSet.size());
+        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
+        for (int i = 0; i < rowCount; i++)
+        {
+            UntypedResultSet.Row row = iter.next();
+            assertEquals(i, row.getInt("k"));
+        }
+    }
+
+    @Test
+    public void testWriteWithSortedAndMaxSize() throws Exception
+    {
+        String schema = "CREATE TABLE " + qualifiedTable + " ("
+                        + " k int PRIMARY KEY,"
+                        + " v blob )";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .using("INSERT INTO " + qualifiedTable +
+                                                         " (k, v) VALUES (?,text_as_blob(?))" )
+                                                  .sorted()
+                                                  .withMaxSSTableSizeInMiBForSorted(1)
+                                                  .build();
+        int rowCount = 30_000;
+        // Max SSTable size is 1 MiB
+        // 30_000 rows should take 30_000 * (4 + 37) = 1.17 MiB > 1 MiB
+        for (int i = 0; i < rowCount; i++)
+        {
+            writer.addRow(i, UUID.randomUUID().toString());
+        }
+        writer.close();
+
+        File[] dataFiles = dataDir.list(f -> f.name().endsWith(BigFormat.Components.DATA.type.repr));
Review Comment:
I ran this test and I am seeing 2 files, which is correct. However, both files are a little over 600K, and I was expecting to see one file closer to 1 MiB and the other taking only a few tens of bytes.
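If it helps with debugging the size accounting, here is a rough diagnostic sketch for this test (hypothetical additions, assuming Cassandra's `File` wrapper exposes `length()`; the cap constant mirrors `withMaxSSTableSizeInMiBForSorted(1)`):
```java
// Print each produced -Data.db size so we can see how far below (or above) the
// configured 1 MiB cap the rollover actually happens.
File[] dataFiles = dataDir.list(f -> f.name().endsWith(BigFormat.Components.DATA.type.repr));
long maxBytes = 1L * 1024 * 1024; // mirrors withMaxSSTableSizeInMiBForSorted(1)
for (File dataFile : dataFiles)
{
    long length = dataFile.length();
    System.out.println(dataFile.name() + ": " + length + " bytes");
    // With accurate accounting I'd expect the first file to land just around the cap
    // (plus per-sstable overhead), not two files of roughly 600K each.
}
```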
##########
src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java:
##########
@@ -554,6 +555,18 @@ public Builder sorted()
             return this;
         }
+        /**
+         * Defines the maximum SSTable size when using the sorted writer.
+         * By default, i.e. not specified, there is no maximum size limit for the produced SSTable
+         * @param size the maximum size of each individual SSTable allowed
Review Comment:
```suggestion
         * @param size the maximum size in mebibytes of each individual SSTable allowed
```
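For reference, a usage sketch of how this option is exercised by the unit test added in this PR; the value is interpreted as MiB per output SSTable (`dataDir`, `schema`, and `qualifiedTable` are names taken from that test):
```java
CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                          .inDirectory(dataDir)
                                          .forTable(schema)
                                          .using("INSERT INTO " + qualifiedTable + " (k, v) VALUES (?, text_as_blob(?))")
                                          .sorted()
                                          .withMaxSSTableSizeInMiBForSorted(1) // cap each produced SSTable at ~1 MiB
                                          .build();
```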
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
Review Comment:
Can we either add a comment that mentions the units for the size, or better yet rename the variable to indicate that it is measured in bytes?
```suggestion
private final long maxSSTableSizeInBytes;
```
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
+    private long currentSize;
     private SSTableTxnWriter writer;
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is positive.
+     *                            Any non-positive value indicates the sstable size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have null key");
Review Comment:
Even though I like this change, we need to be mindful that it will behave differently from `assert` when assertions are disabled. So before committing to this change, I think it is worth considering whether the original assertion was intended as a way to get different performance characteristics depending on whether assertions are enabled or not. This change could have a noticeable impact on existing consumers of this class.
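To illustrate the difference I mean, a minimal standalone sketch (not Cassandra code): `assert` compiles to a check the JVM skips entirely unless it is started with `-ea`, while `Preconditions.checkArgument` always evaluates its condition.
```java
import com.google.common.base.Preconditions;

public class GuardExample
{
    void withAssert(Object key)
    {
        // No-op unless the JVM runs with -ea / -enableassertions,
        // so production runs with assertions disabled pay nothing here.
        assert key != null;
    }

    void withPrecondition(Object key)
    {
        // Always evaluated regardless of JVM flags; the null check runs on every call.
        Preconditions.checkArgument(key != null, "Partition update cannot have null key");
    }
}
```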
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
+    private long currentSize;
     private SSTableTxnWriter writer;
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is positive.
+     *                            Any non-positive value indicates the sstable size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have null key");
-        // If that's not the current key, write the current one if necessary and create a new
-        // update for the new key.
-        if (!key.equals(currentKey))
+        // update for the first partition or a new partition
+        if (update == null || !key.equals(currentKey))
         {
+            // write the previous update if not absent
             if (update != null)
-                writePartition(update.build());
+                writePartition(update.build()); // might switch to a new sstable writer and reset currentSize
+
+            // and create a new update for the new key
+            currentSize += PartitionUpdate.serializer.serializedSize(createUpdateBuilder(key).build(),
+                                                                     format.getLatestVersion().correspondingMessagingVersion());
             currentKey = key;
-            update = new PartitionUpdate.Builder(metadata.get(), currentKey, columns, 4);
+            update = createSizeCountingPartitionUpdateBuilder(key, this::countRow, () -> {});
         }
-        assert update != null;
+        Preconditions.checkState(update != null, "Partition update to write cannot be null");
Review Comment:
Same comment as above.
##########
src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java:
##########
@@ -110,6 +112,39 @@ private static SSTableId getNextId(File directory, final String columnFamily) th
         }
     }
+    /**
+     * Returns the partition update builder for the given key
+     * @param key the partition key of the partition update
+     * @return an update on partition {@code key} that is tied to this writer
+     */
+    protected PartitionUpdate.Builder createUpdateBuilder(DecoratedKey key)
+    {
+        return createSizeCountingPartitionUpdateBuilder(key, row -> {}, () -> {});
+    }
+
+    /**
+     * Returns the partition update builder that counts row sizes for the given key.
+     * @param key the partition key of the partition update
+     * @param onRow consumes the row to count the size
+     * @param onRowAdded runnale that runs after the row is added
+     * @return an update on partition {@code key} that is tied to this writer
+     */
+    protected PartitionUpdate.Builder createSizeCountingPartitionUpdateBuilder(DecoratedKey key,
+                                                                               Consumer<Row> onRow,
+                                                                               Runnable onRowAdded)
Review Comment:
To me it feels that all the logic can be encapsulated in the consumer lambda, which renders the Runnable unnecessary. For the only call site where we actually use the Runnable parameter, I would refactor it as:
```
previous = createSizeCountingPartitionUpdateBuilder(key, row -> {
    countRow(row);
    maybeSync();
});
```
```suggestion
    protected PartitionUpdate.Builder createSizeCountingPartitionUpdateBuilder(DecoratedKey key, Consumer<Row> onRow)
```
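The other call site quoted above, in `SSTableSimpleWriter.getUpdateFor`, would then shrink to a plain method reference; a sketch assuming the one-argument overload from the suggestion is adopted:
```java
// Only row counting is needed in the sorted writer, so no empty Runnable
// placeholder has to be passed anymore.
update = createSizeCountingPartitionUpdateBuilder(key, this::countRow);
```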
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]