yifan-c commented on code in PR #4457:
URL: https://github.com/apache/cassandra/pull/4457#discussion_r2485384646


##########
src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java:
##########
@@ -81,6 +85,12 @@ protected void addIndexGroup(Index.Group indexGroup)
         this.indexGroups.add(indexGroup);
     }
 
+    public void setCompressionDictionary(ColumnFamilyStore cfs, CompressionDictionary compressionDictionary)
+    {
+        this.owner = cfs;
+        this.compressionDictionary = compressionDictionary;
+    }

Review Comment:
   Instead of passing `cfs` to this method, would the API be cleaner if the 
`writer` were created with `cfs`? 



##########
src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java:
##########
@@ -729,8 +745,25 @@ public CQLSSTableWriter build()
                     Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true));
                 }
 
+                if (compressionDictionary != null)
+                {
+                    CompressionParams compressionParams = tableMetadata.params.compression;
+
+                    if (!compressionParams.isDictionaryCompressionEnabled())
+                    {
+                        throw new IllegalStateException("Table's compressor can not accept any dictionary: " + compressionParams.asMap());
+                    }
+
+                    IDictionaryCompressor compressor = (IDictionaryCompressor) compressionParams.getSstableCompressor();
+                    if (!compressor.canConsumeDictionary(compressionDictionary))
+                    {
+                        throw new IllegalStateException("Provided dictionary can not be consumed by table's compressor. Dictionary type: " + compressionDictionary.kind()
+                                                        + ", compressor can accept type: " + compressor.acceptableDictionaryKind());

Review Comment:
   nit: proposing a revised error message:
   ```suggestion
                           throw new IllegalStateException("Provided dictionary can not be consumed by table's compressor. " +
                                                           "Provided dictionary type: " + compressionDictionary.kind()
                                                           + "; expected dictionary type by the compressor: " + compressor.acceptableDictionaryKind());
   ```
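
To make the proposed wording easy to eyeball, here is a minimal, self-contained sketch of the same check. `DictionaryCheckSketch`, `Kind`, `mismatchMessage` and `validate` are hypothetical stand-ins invented for illustration; they are not the PR's `CompressionDictionary` or `IDictionaryCompressor` types, and the real code would keep calling `kind()` and `acceptableDictionaryKind()`.

```java
// Hypothetical stand-ins only; none of these are the Cassandra types from the PR.
class DictionaryCheckSketch
{
    /** Stand-in for the dictionary kind reported by a dictionary or a compressor. */
    enum Kind { ZSTD, OTHER }

    /** Builds the message in the shape proposed in the suggestion above. */
    static String mismatchMessage(Kind provided, Kind expected)
    {
        return "Provided dictionary can not be consumed by table's compressor. " +
               "Provided dictionary type: " + provided
               + "; expected dictionary type by the compressor: " + expected;
    }

    /** Throws when the provided kind differs from the kind the compressor accepts. */
    static void validate(Kind provided, Kind expected)
    {
        if (provided != expected)
            throw new IllegalStateException(mismatchMessage(provided, expected));
    }

    public static void main(String[] args)
    {
        validate(Kind.ZSTD, Kind.ZSTD); // matching kinds pass silently
        try
        {
            validate(Kind.OTHER, Kind.ZSTD);
        }
        catch (IllegalStateException e)
        {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the message in one helper also lets a test assert on the full text, the way the unit test in this PR does for the "can not accept any dictionary" case.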



##########
src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java:
##########
@@ -114,6 +124,20 @@ protected SSTableTxnWriter createWriter(SSTable.Owner owner) throws IOException
         if (makeRangeAware)
             return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, format, header);
 
+
+        SSTable.Owner useOwner;
+
+        if (this.owner != null && this.owner.compressionDictionaryManager() != null && compressionDictionary != null)
+        {
+            // already checks if it is cached or not
+            this.owner.compressionDictionaryManager().add(compressionDictionary);
+            useOwner = this.owner;
+        }
+        else
+        {
+            useOwner = owner;

Review Comment:
   Curious: the input `owner` is always `null` in the code base. Why do we need 
this parameter at all? Maybe @jacek-lewandowski can share some context. It looks 
like the parameter was added in the `SSTable format API` patch. 
   In case it is consumed by some external system, would it be clearer to assign 
the `owner` when creating the `AbstractSSTableSimpleWriter`, so that we can 
avoid the fallback logic here? 
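
A rough sketch of what assigning the owner at construction could look like. Everything here (`OwnerAtConstructionSketch`, `Owner`, `DictionaryManager`, `Writer`, and the `String` used as a dictionary) is a hypothetical stand-in, not the real `SSTable.Owner`, `ColumnFamilyStore` or `AbstractSSTableSimpleWriter`. The sketch also assumes that a dictionary without an owner can be rejected up front, which the PR may not want.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-ins only; none of these are the real Cassandra classes.
class OwnerAtConstructionSketch
{
    /** Stand-in for a dictionary manager; add() is assumed to be idempotent. */
    static class DictionaryManager
    {
        final List<String> dictionaries = new ArrayList<>();

        void add(String dictionary)
        {
            if (!dictionaries.contains(dictionary))
                dictionaries.add(dictionary);
        }
    }

    /** Stand-in for the owner that exposes its dictionary manager. */
    static class Owner
    {
        final DictionaryManager manager = new DictionaryManager();
    }

    /** Writer that receives its owner once, so createWriter() needs no fallback branch. */
    static class Writer
    {
        private final Owner owner;       // may be null when no owner is available
        private final String dictionary; // may be null when no dictionary is used

        Writer(Owner owner, String dictionary)
        {
            // Assumption of this sketch: a dictionary is only meaningful with an owner.
            if (dictionary != null)
                Objects.requireNonNull(owner, "a dictionary requires an owner");
            this.owner = owner;
            this.dictionary = dictionary;
        }

        /** Registers the dictionary (if any) and returns the single owner in use. */
        Owner createWriter()
        {
            if (dictionary != null)
                owner.manager.add(dictionary);
            return owner;
        }
    }

    public static void main(String[] args)
    {
        Owner owner = new Owner();
        Writer writer = new Writer(owner, "dict-1");
        writer.createWriter();
        writer.createWriter(); // the second call does not register the dictionary again
        System.out.println(owner.manager.dictionaries);
    }
}
```

With both fields final there is a single owner to reason about, and the `useOwner` selection in `createWriter` would no longer be needed.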



##########
test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java:
##########
@@ -1690,6 +1697,111 @@ public void testConstraintViolation() throws Exception
         }
     }
 
+    @Test
+    public void testWritingWithZstdDictionaryWhenUsingInvalidCompressor()
+    {
+        // the compressor is not dictionary-aware so we will fail
+        final String schema = "CREATE TABLE " + qualifiedTable + " ("
+                              + "  k int,"
+                              + "  v1 text,"
+                              + "  PRIMARY KEY (k)"
+                              + ") WITH compression = {'class': 'ZstdCompressor'}";
+
+        assertThatThrownBy(() -> CQLSSTableWriter.builder()
+                                                 .inDirectory(dataDir)
+                                                 .forTable(schema)
+                                                 .using("INSERT INTO " + keyspace + '.' + table + " (k, v1) VALUES (?, ?)")
+                                                 // does not matter, we will fail anyway
+                                                 .withCompressionDictionary(new ZstdCompressionDictionary(new DictId(ZSTD, 1), new byte[0]))
+                                                 .build())
+        .hasMessage("Table's compressor can not accept any dictionary: {chunk_length_in_kb=16, class=org.apache.cassandra.io.compress.ZstdCompressor}")
+        .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    public void testWritingWithZstdDictionary() throws Exception
+    {
+        final String schema = "CREATE TABLE " + qualifiedTable + " ("
+                              + "  k int,"
+                              + "  v1 text,"
+                              + "  PRIMARY KEY (k)"
+                              + ") WITH compression = {'class': 'ZstdDictionaryCompressor'}";
+
+        CompressionDictionary dictionary = DictionaryHelper.trainDictionary(keyspace, table);
+
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .using("INSERT INTO " + keyspace + '.' + table + " (k, v1) VALUES (?, ?)")
+                                                  .withCompressionDictionary(dictionary)
+                                                  .build();
+
+        for (int i = 0; i < 500; i++)
+        {
+            writer.addRow(i, DictionaryHelper.INSTANCE.getRandomSample());
+        }
+
+        writer.close();
+
+        loadSSTables(dataDir, keyspace, table);
+
+        if (verifyDataAfterLoading)
+        {
+            UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
+            assertNotNull(resultSet);
+            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
+            for (int i = 0; i < 500; i++)
+            {
+                UntypedResultSet.Row row = iter.next();
+                assertEquals(i, row.getInt("k"));
+                assertNotNull(row.getString("v1"));
+            }
+        }
+    }
+
+    /**
+     * Simple generator of random data for Zstd compression dictionary and dictionary trainer.
+     */
+    private static class DictionaryHelper
+    {
+        public static final DictionaryHelper INSTANCE = new DictionaryHelper();
+        private static final Random random = new Random();
+
+        private static final String[] dates = new String[] {"2025-10-20","2025-10-19","2025-10-18","2025-10-17","2025-10-16"};
+        private static final String[] times = new String[] {"11:00:01","11:00:012","11:00:03","11:00:04","11:00:05"};

Review Comment:
   `11:00:012` should be `11:00:02`?



##########
src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java:
##########
@@ -114,6 +124,20 @@ protected SSTableTxnWriter createWriter(SSTable.Owner owner) throws IOException
         if (makeRangeAware)
             return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, format, header);
 
+
+        SSTable.Owner useOwner;

Review Comment:
   nit: how about renaming to `effectiveOwner`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

