frankgh commented on code in PR #2824:
URL: https://github.com/apache/cassandra/pull/2824#discussion_r1365904037


##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for 
SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
 
+    private long currentSize;
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer 
outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is 
positive.
+     *                            Any non-positive value indicates the sstable 
size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, 
EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
 
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have 
null key");
 
-        // If that's not the current key, write the current one if necessary 
and create a new
-        // update for the new key.
-        if (!key.equals(currentKey))
+        // update for the first partition or a new partition
+        if (update == null || !key.equals(currentKey))
         {
+            // write the previous update if not absent
             if (update != null)
-                writePartition(update.build());
+                writePartition(update.build()); // might switch to a new 
sstable writer and reset currentSize
+
+            // and create a new update for the new key
+            currentSize += 
PartitionUpdate.serializer.serializedSize(createUpdateBuilder(key).build(),
+                                                                     
format.getLatestVersion().correspondingMessagingVersion());
             currentKey = key;
-            update = new PartitionUpdate.Builder(metadata.get(), currentKey, 
columns, 4);
+            update = createSizeCountingPartitionUpdateBuilder(key, 
this::countRow, () -> {});
         }
 
-        assert update != null;
+        Preconditions.checkState(update != null, "Partition update to write 
cannot be null");
         return update;
     }
 
+    @Override
     public void close()
+    {
+        writeLastPartitionUpdate(update);
+        closeWriterIfPresent(writer);
+    }
+
+    private boolean shouldSwitchToNewWriter()
+    {
+        return writer == null || (maxSSTableSize > 0 && currentSize > 
maxSSTableSize);
+    }
+
+    private SSTableTxnWriter getOrSwitchWriter() throws IOException
+    {
+        if (shouldSwitchToNewWriter())
+        {
+            closeWriterIfPresent(writer);
+            writer = createWriter(null);
+            currentSize = 0;
+        }
+
+        return writer;
+    }
+
+    private void countRow(Row row)
+    {
+        // Note that the accounting of a row is a bit inaccurate (it doesn't 
take some of the file format optimization into account).
+        // In particular, what we count is closer to the serialized value, but 
it's debatable that it's the right thing
+        // to count since it will take a lot more space in memory.
+        currentSize += UnfilteredSerializer.serializer.serializedSize(row, 
helper, 0,
+                                                                      
format.getLatestVersion().correspondingMessagingVersion());
+    }
+
+    private void writeLastPartitionUpdate(PartitionUpdate.Builder update)
     {
         try
         {
             if (update != null)
                 writePartition(update.build());
+        }
+        catch (Throwable t)
+        {
+            Throwable e = writer == null ? t : writer.abort(t);
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void closeWriterIfPresent(SSTableTxnWriter writer)
+    {
+        try
+        {
             if (writer != null)
                 writer.finish(false);
         }
         catch (Throwable t)
         {
-            Throwable e = writer == null ? t : writer.abort(t);
+            Throwable e = writer.abort(t);
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
     }
 
     private void writePartition(PartitionUpdate update) throws IOException
     {
-        getOrCreateWriter().append(update.unfilteredIterator());
+        getOrSwitchWriter().append(update.unfilteredIterator());

Review Comment:
   I think the original method name, `getOrCreateWriter`, maintains the 
desired semantics. The `create writer` portion leaves it to the 
implementation to decide when to create a new writer.



##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for 
SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
 
+    private long currentSize;
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer 
outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is 
positive.
+     *                            Any non-positive value indicates the sstable 
size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, 
EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
 
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have 
null key");
 
-        // If that's not the current key, write the current one if necessary 
and create a new
-        // update for the new key.
-        if (!key.equals(currentKey))
+        // update for the first partition or a new partition
+        if (update == null || !key.equals(currentKey))
         {
+            // write the previous update if not absent
             if (update != null)
-                writePartition(update.build());
+                writePartition(update.build()); // might switch to a new 
sstable writer and reset currentSize
+
+            // and create a new update for the new key
+            currentSize += 
PartitionUpdate.serializer.serializedSize(createUpdateBuilder(key).build(),
+                                                                     
format.getLatestVersion().correspondingMessagingVersion());
             currentKey = key;
-            update = new PartitionUpdate.Builder(metadata.get(), currentKey, 
columns, 4);
+            update = createSizeCountingPartitionUpdateBuilder(key, 
this::countRow, () -> {});
         }
 
-        assert update != null;
+        Preconditions.checkState(update != null, "Partition update to write 
cannot be null");
         return update;
     }
 
+    @Override
     public void close()
+    {
+        writeLastPartitionUpdate(update);
+        closeWriterIfPresent(writer);
+    }
+
+    private boolean shouldSwitchToNewWriter()
+    {
+        return writer == null || (maxSSTableSize > 0 && currentSize > 
maxSSTableSize);
+    }
+
+    private SSTableTxnWriter getOrSwitchWriter() throws IOException
+    {
+        if (shouldSwitchToNewWriter())
+        {
+            closeWriterIfPresent(writer);
+            writer = createWriter(null);
+            currentSize = 0;
+        }
+
+        return writer;
+    }
+
+    private void countRow(Row row)
+    {
+        // Note that the accounting of a row is a bit inaccurate (it doesn't 
take some of the file format optimization into account).
+        // In particular, what we count is closer to the serialized value, but 
it's debatable that it's the right thing
+        // to count since it will take a lot more space in memory.
+        currentSize += UnfilteredSerializer.serializer.serializedSize(row, 
helper, 0,
+                                                                      
format.getLatestVersion().correspondingMessagingVersion());
+    }
+
+    private void writeLastPartitionUpdate(PartitionUpdate.Builder update)
     {
         try
         {
             if (update != null)
                 writePartition(update.build());
+        }
+        catch (Throwable t)
+        {
+            Throwable e = writer == null ? t : writer.abort(t);
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void closeWriterIfPresent(SSTableTxnWriter writer)

Review Comment:
   NIT for a more succinct method name
   ```suggestion
       private void maybeCloseWriter(SSTableTxnWriter writer)
   ```



##########
test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java:
##########
@@ -1233,6 +1235,77 @@ public void testWriteWithTimestampsAndTtl() throws 
Exception
         assertFalse(iter.hasNext());
     }
 
+    @Test
+    public void testWriteWithSorted() throws Exception
+    {
+        String schema = "CREATE TABLE " + qualifiedTable + " ("
+                        + "  k int PRIMARY KEY,"
+                        + "  v blob )";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .using("INSERT INTO " + 
qualifiedTable +
+                                                         " (k, v) VALUES 
(?,text_as_blob(?))" )
+                                                  .sorted()
+                                                  .build();
+        int rowCount = 10_000;
+        for (int i = 0; i < rowCount; i++)
+        {
+            writer.addRow(i, UUID.randomUUID().toString());
+        }
+        writer.close();
+        loadSSTables(dataDir, keyspace);
+
+        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
+        assertEquals(rowCount, resultSet.size());
+        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
+        for (int i = 0; i < rowCount; i++)
+        {
+            UntypedResultSet.Row row = iter.next();
+            assertEquals(i, row.getInt("k"));
+        }
+    }
+
+    @Test
+    public void testWriteWithSortedAndMaxSize() throws Exception
+    {
+        String schema = "CREATE TABLE " + qualifiedTable + " ("
+                        + "  k int PRIMARY KEY,"
+                        + "  v blob )";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .using("INSERT INTO " + 
qualifiedTable +
+                                                         " (k, v) VALUES 
(?,text_as_blob(?))" )
+                                                  .sorted()
+                                                  
.withMaxSSTableSizeInMiBForSorted(1)
+                                                  .build();
+        int rowCount = 30_000;
+        // Max SSTable size is 1 MiB
+        // 30_000 rows should take 30_000 * (4 + 37) = 1.17 MiB > 1 MiB
+        for (int i = 0; i < rowCount; i++)
+        {
+            writer.addRow(i, UUID.randomUUID().toString());
+        }
+        writer.close();
+
+        File[] dataFiles = dataDir.list(f -> 
f.name().endsWith(BigFormat.Components.DATA.type.repr));

Review Comment:
   I ran this test and I am seeing 2 files, which is correct. However, both 
files are a little over 600K, and I was expecting to see one file closer to 1MiB 
and the other one taking a few tens of bytes.



##########
src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java:
##########
@@ -554,6 +555,18 @@ public Builder sorted()
             return this;
         }
 
+        /**
+         * Defines the maximum SSTable size when using the sorted writer.
+         * By default, i.e. not specified, there is no maximum size limit for 
the produced SSTable
+         * @param size the maximum size of each individual SSTable allowed

Review Comment:
   ```suggestion
            * @param size the maximum size in mebibytes of each individual 
SSTable allowed
   ```



##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for 
SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;

Review Comment:
   Can we either add a comment that mentions the units for the size or, better 
yet, rename the variable to indicate that it is measured in bytes?
   ```suggestion
       private final long maxSSTableSizeInBytes;
   ```



##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for 
SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
 
+    private long currentSize;
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer 
outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is 
positive.
+     *                            Any non-positive value indicates the sstable 
size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, 
EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
 
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have 
null key");

Review Comment:
   Even though I like this change, we need to be mindful that it will make a 
performance difference when assertions are disabled. So before committing to this 
change, I think it would be useful to consider whether the original assertion was 
intended as a way to have different performance characteristics depending on 
whether assertions are enabled or not. This change could have a noticeable 
impact on existing consumers of this class.



##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -36,64 +42,132 @@
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * prefered.
+ * <p>
+ * Optionally, the writer can be configured with a max SSTable size for 
SSTables.
+ * The output will be a series of SSTables that do not exceed a specified size.
+ * By default, all sorted data are written into a single SSTable.
  */
 class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
+    private final long maxSSTableSize;
+
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+    private final SerializationHelper helper;
+
     protected DecoratedKey currentKey;
     protected PartitionUpdate.Builder update;
 
+    private long currentSize;
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns)
+    /**
+     * Create a SSTable writer for sorted input data.
+     * When a positive {@param maxSSTableSizeInMiB} is defined, the writer 
outputs a sequence of SSTables,
+     * whose sizes do not exceed the specified value.
+     *
+     * @param directory directory to store the sstable files
+     * @param metadata table metadata
+     * @param columns columns to update
+     * @param maxSSTableSizeInMiB defines the max SSTable size if the value is 
positive.
+     *                            Any non-positive value indicates the sstable 
size is unlimited.
+     */
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
     {
         super(directory, metadata, columns);
+        this.maxSSTableSize = maxSSTableSizeInMiB * 1024L * 1024L;
+        this.header = new SerializationHeader(true, metadata.get(), columns, 
EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
     }
 
-    private SSTableTxnWriter getOrCreateWriter() throws IOException
-    {
-        if (writer == null)
-            writer = createWriter(null);
-
-        return writer;
-    }
-
+    @Override
     PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
     {
-        assert key != null;
+        Preconditions.checkArgument(key != null, "Partition update cannot have 
null key");
 
-        // If that's not the current key, write the current one if necessary 
and create a new
-        // update for the new key.
-        if (!key.equals(currentKey))
+        // update for the first partition or a new partition
+        if (update == null || !key.equals(currentKey))
         {
+            // write the previous update if not absent
             if (update != null)
-                writePartition(update.build());
+                writePartition(update.build()); // might switch to a new 
sstable writer and reset currentSize
+
+            // and create a new update for the new key
+            currentSize += 
PartitionUpdate.serializer.serializedSize(createUpdateBuilder(key).build(),
+                                                                     
format.getLatestVersion().correspondingMessagingVersion());
             currentKey = key;
-            update = new PartitionUpdate.Builder(metadata.get(), currentKey, 
columns, 4);
+            update = createSizeCountingPartitionUpdateBuilder(key, 
this::countRow, () -> {});
         }
 
-        assert update != null;
+        Preconditions.checkState(update != null, "Partition update to write 
cannot be null");

Review Comment:
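   Same comment as above: replacing the `assert` with a `Preconditions` check 
means the check also runs when assertions are disabled.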



##########
src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java:
##########
@@ -110,6 +112,39 @@ private static SSTableId getNextId(File directory, final 
String columnFamily) th
         }
     }
 
+    /**
+     * Returns the partition update builder for the given key
+     * @param key the partition key of the partition update
+     * @return an update on partition {@code key} that is tied to this writer
+     */
+    protected PartitionUpdate.Builder createUpdateBuilder(DecoratedKey key)
+    {
+        return createSizeCountingPartitionUpdateBuilder(key, row -> {}, () -> 
{});
+    }
+
+    /**
+     * Returns the partition update builder that counts row sizes for the 
given key.
+     * @param key the partition key of the partition update
+     * @param onRow consumes the row to count the size
+     * @param onRowAdded runnale that runs after the row is added
+     * @return an update on partition {@code key} that is tied to this writer
+     */
+    protected PartitionUpdate.Builder 
createSizeCountingPartitionUpdateBuilder(DecoratedKey key,
+                                                                               
Consumer<Row> onRow,
+                                                                               
Runnable onRowAdded)

Review Comment:
   To me it feels like all the logic can be encapsulated in the consumer 
lambda, which renders the Runnable unnecessary. For the only call site where 
we actually use the Runnable parameter, I would refactor it as:
   
   ```
   previous = createSizeCountingPartitionUpdateBuilder(key, row -> {
                   countRow(row);
                   maybeSync();
               });
   ```
   
   ```suggestion
       protected PartitionUpdate.Builder 
createSizeCountingPartitionUpdateBuilder(DecoratedKey key, Consumer<Row> onRow)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to