mimaison commented on code in PR #10826:
URL: https://github.com/apache/kafka/pull/10826#discussion_r850466685


##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -188,6 +190,12 @@ public class ProducerConfig extends AbstractConfig {
                                                        + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>. "
                                                        + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
 
+    /** <code>compression.level</code> */
+    public static final String COMPRESSION_LEVEL_CONFIG = "compression.level";
+    private static final String COMPRESSION_LEVEL_DOC = "The compression level for all data generated by the producer. The default level and valid value is up to "
+        + "compression.type. (<code>none</code>, <code>snappy</code>: not available. <code>gzip</code>: 1~9. <code>lz4</code>: 1~17. "

Review Comment:
   Should we include the defaults too?



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -523,6 +532,45 @@ boolean idempotenceEnabled() {
         return userConfiguredTransactions || idempotenceEnabled;
     }
 
+    public CompressionConfig getCompressionConfig(CompressionType compressionType) {
+        if (getString(ProducerConfig.COMPRESSION_LEVEL_CONFIG).isEmpty()) {

Review Comment:
   Since only 3 cases use the compression level, what about moving the if/else 
inside each case block?
   
   ```suggestion
       public CompressionConfig getCompressionConfig(CompressionType compressionType) {
           String compressionLevel = getString(ProducerConfig.COMPRESSION_LEVEL_CONFIG);
           switch (compressionType) {
               case NONE:
                   return CompressionConfig.NONE;
               case GZIP:
                   if (compressionLevel.isEmpty())
                       return CompressionConfig.gzip().build();
                   else
                       return CompressionConfig.gzip().setLevel(Integer.parseInt(compressionLevel)).build();
               case SNAPPY:
                   return CompressionConfig.snappy().build();
               case LZ4:
                   if (compressionLevel.isEmpty())
                       return CompressionConfig.lz4().build();
                   else
                       return CompressionConfig.lz4().setLevel(Integer.parseInt(compressionLevel)).build();
               case ZSTD:
                   if (compressionLevel.isEmpty())
                       return CompressionConfig.zstd().build();
                   else
                       return CompressionConfig.zstd().setLevel(Integer.parseInt(compressionLevel)).build();
               default:
                   throw new IllegalArgumentException("Unknown compression type: " + compressionType.name);
           }
       }
   ```
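
   If repeating the if/else in three cases feels noisy, one option might be to parse the level once into an `OptionalInt` and apply it only in the cases that accept a level. Below is a minimal, dependency-free sketch of that idea. `Codec`, `parseLevel`, and `describe` are hypothetical stand-ins invented for illustration, not the `CompressionConfig` API in this PR, and the method returns a string only so that the sketch runs on its own.

   ```java
   import java.util.OptionalInt;

   public class CompressionLevelSketch {
       // Hypothetical stand-in for CompressionType.
       enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

       // An empty string means "not configured", so the codec keeps its own default.
       static OptionalInt parseLevel(String raw) {
           return raw.isEmpty() ? OptionalInt.empty() : OptionalInt.of(Integer.parseInt(raw));
       }

       // Returns a textual description instead of a real config object (assumption for the sketch).
       static String describe(Codec codec, String rawLevel) {
           switch (codec) {
               case NONE:
               case SNAPPY:
                   // These codecs take no level, so the configured value is never parsed.
                   return codec.name();
               case GZIP:
               case LZ4:
               case ZSTD: {
                   OptionalInt level = parseLevel(rawLevel);
                   return level.isPresent()
                       ? codec.name() + "(level=" + level.getAsInt() + ")"
                       : codec.name() + "(default)";
               }
               default:
                   throw new IllegalArgumentException("Unknown compression type: " + codec);
           }
       }

       public static void main(String[] args) {
           System.out.println(describe(Codec.GZIP, ""));
           System.out.println(describe(Codec.ZSTD, "3"));
       }
   }
   ```

   In the real method the two level-aware branches would call the builder with or without `setLevel`, as in the suggestion above.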



##########
clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java:
##########
@@ -41,6 +41,10 @@ public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
     public static final String CLOSED_STREAM = "The stream is already closed";
 
+    public static final int MIN_COMPRESSION_LEVEL = 1;
+    public static final int MAX_COMPRESSION_LEVEL = 17;

Review Comment:
   Should we use constants from `LZ4Constants` instead of hardcoding values?



##########
clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java:
##########
@@ -41,6 +41,10 @@ public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
     public static final String CLOSED_STREAM = "The stream is already closed";
 
+    public static final int MIN_COMPRESSION_LEVEL = 1;

Review Comment:
   Should we rename this class to `LZ4OutputStream` to be consistent with 
`GzipOutputStream`?



##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdConfig.java:
##########
@@ -32,21 +34,39 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class ZstdFactory {
+public final class ZstdConfig extends CompressionConfig {
+    /**
+     * Zstd supports compression levels from 1 up to ZSTD_maxCLevel(), which is currently 22. 0 is reserved for
+     * the default level that is controlled by the library (currently 3), and it also supports negative levels
+     * as an experimental feature; It is why MIN_COMPRESSION_LEVEL is not defined here.
+     *
+     * For details, please refer the official zstd manual: http://facebook.github.io/zstd/zstd_manual.html
+     */
+    static final int MAX_COMPRESSION_LEVEL = 22;

Review Comment:
   Should we use `Zstd.maxCompressionLevel()` instead of hard coding the value?
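
   The same idea could apply to the gzip range (1~9) in the doc string, since the JDK already exposes those bounds on `java.util.zip.Deflater`. A minimal sketch, using gzip only because it needs no extra dependency; `GzipLevelBounds` and `validate` are hypothetical names, not part of this PR:

   ```java
   import java.util.zip.Deflater;

   public class GzipLevelBounds {
       // Bounds come from the JDK rather than from literals.
       static final int MIN_LEVEL = Deflater.BEST_SPEED;             // 1
       static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;       // 9
       static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION; // -1, lets the library choose

       // Accepts the library default or any level inside [MIN_LEVEL, MAX_LEVEL].
       static int validate(int level) {
           if (level != DEFAULT_LEVEL && (level < MIN_LEVEL || level > MAX_LEVEL)) {
               throw new IllegalArgumentException(
                   "gzip level must be " + DEFAULT_LEVEL + " or in [" + MIN_LEVEL + ", " + MAX_LEVEL + "], got " + level);
           }
           return level;
       }

       public static void main(String[] args) {
           System.out.println(validate(6));
           System.out.println(validate(DEFAULT_LEVEL));
       }
   }
   ```

   Reading the limits from the library keeps the validation and the documented range from drifting apart if the dependency changes its bounds.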



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -104,7 +104,7 @@ public final class RecordAccumulator {
      */
     public RecordAccumulator(LogContext logContext,
                              int batchSize,
-                             CompressionType compression,
+                             CompressionConfig compression,

Review Comment:
   Could we rename this argument to `compressionConfig`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
