showuon commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1579334152


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -189,7 +189,7 @@ public RecordAccumulator(LogContext logContext,
                              BufferPool bufferPool) {
         this(logContext,
             batchSize,
-            compression,
+                compression,

Review Comment:
   nit: wrong indent



##########
clients/src/main/java/org/apache/kafka/common/compress/Compression.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Compression {
+
+    /**
+     * The compression type for this compression codec
+     */
+    CompressionType type();
+
+    /**
+     * Wrap bufferStream with an OutputStream that will compress data with 
this CompressionType.
+     * Note: Unlike {@link #wrapForInput}, this cannot take {@link 
ByteBuffer}s directly.
+     * Currently, MemoryRecordsBuilder writes to the underlying buffer in the 
given {@link ByteBufferOutputStream} after the compressed data has been written.
+     * In the event that the buffer needs to be expanded while writing the 
data, access to the underlying buffer needs to be preserved.
+     */
+    OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte 
messageVersion);
+
+    /**
+     * Wrap buffer with an InputStream that will decompress data with this 
CompressionType.
+     *
+     * @param buffer The {@link ByteBuffer} instance holding the data to 
decompress.
+     * @param messageVersion The record format version to use.
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used 
for decompression if supported.
+     * For small record batches, allocating a potentially large buffer (64 KB 
for LZ4)

Review Comment:
   Missing `@return` tag.



##########
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, 
GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+                GzipCompression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new 
ByteBufferOutputStream(4);
+                try (OutputStream out = 
compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = 
compression.wrapForInput(bufferStream.buffer(), magic, 
BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    inputStream.read(result);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCompressionLevels() {
+        GzipCompression.Builder builder = new GzipCompression.Builder();
+
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(GzipCompression.MIN_LEVEL - 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(GzipCompression.MAX_LEVEL + 1));
+
+        builder.level(GzipCompression.MIN_LEVEL);
+        builder.level(GzipCompression.MAX_LEVEL);

Review Comment:
   nit: wrap these calls in `assertDoesNotThrow` to make the expectation explicit.



##########
clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java:
##########
@@ -33,18 +33,14 @@
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends OutputStream {
+public final class Lz4BlockOutputStream extends OutputStream {
 
     public static final int MAGIC = 0x184D2204;
-    public static final int LZ4_MAX_HEADER_LENGTH = 19;
     public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
 
     public static final String CLOSED_STREAM = "The stream is already closed";
 
     public static final int BLOCKSIZE_64KB = 4;
-    public static final int BLOCKSIZE_256KB = 5;
-    public static final int BLOCKSIZE_1MB = 6;
-    public static final int BLOCKSIZE_4MB = 7;

Review Comment:
   They can be removed because we have another KIP to address them. Fine.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1600,7 +1601,7 @@ public void 
testDrainWithANodeThatDoesntHostAnyPartitions() {
         int lingerMs = 10;
         long totalSize = 10 * 1024;
         RecordAccumulator accum = createTestRecordAccumulator(batchSize, 
totalSize,
-            CompressionType.NONE, lingerMs);
+                Compression.NONE, lingerMs);

Review Comment:
   wrong indent.



##########
clients/src/main/java/org/apache/kafka/common/compress/Compression.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Compression {
+
+    /**
+     * The compression type for this compression codec
+     */
+    CompressionType type();
+
+    /**
+     * Wrap bufferStream with an OutputStream that will compress data with 
this CompressionType.
+     * Note: Unlike {@link #wrapForInput}, this cannot take {@link 
ByteBuffer}s directly.
+     * Currently, MemoryRecordsBuilder writes to the underlying buffer in the 
given {@link ByteBufferOutputStream} after the compressed data has been written.
+     * In the event that the buffer needs to be expanded while writing the 
data, access to the underlying buffer needs to be preserved.
+     */

Review Comment:
   Please add `@param` and `@return` tags.



##########
clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java:
##########
@@ -64,15 +60,22 @@ public final class KafkaLZ4BlockOutputStream extends 
OutputStream {
      * @param out The output stream to compress
      * @param blockSize Default: 4. The block size used during compression. 
4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
      *            values will generate an exception
+     * @param level The compression level to use
      * @param blockChecksum Default: false. When true, a XXHash32 checksum is 
computed and appended to the stream for
      *            every block of data
      * @param useBrokenFlagDescriptorChecksum Default: false. When true, 
writes an incorrect FrameDescriptor checksum
      *            compatible with older kafka clients.
      * @throws IOException
      */
-    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean 
blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
+    public Lz4BlockOutputStream(OutputStream out, int blockSize, int level, 
boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws 
IOException {
         this.out = out;
-        compressor = LZ4Factory.fastestInstance().fastCompressor();
+        /*
+         * lz4-java provides two types of compressors; fastCompressor, which 
requires less memory but fast compression speed (with default compression level 
only),
+         * and highCompressor which requires more memory and slower speed but 
compresses more efficiently (with various compression level).
+         *
+         * For backward compatibility, Lz4BlockOutputStream uses 
fastCompressor with default compression level but, with the other level, it 
uses highCompressor.
+         */
+        compressor = level == Lz4Compression.DEFAULT_LEVEL ? 
LZ4Factory.fastestInstance().fastCompressor() : 
LZ4Factory.fastestInstance().highCompressor(level);

Review Comment:
   Should we add this comment to the `lz4.level` config description? I think
users also need to know it.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -458,7 +459,7 @@ public void testRetryBackoff() throws Exception {
         String metricGrpName = "producer-metrics";
 
         final RecordAccumulator accum = new RecordAccumulator(logContext, 
batchSize,
-            CompressionType.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+                Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,

Review Comment:
   wrong indent



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -331,12 +332,12 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
     public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef 
offsetCounter,
                                                                        
MetricsRecorder metricsRecorder,
                                                                        
BufferSupplier bufferSupplier) {
-        if (targetCompression == CompressionType.ZSTD && 
interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
+        if (targetCompression.type() == CompressionType.ZSTD && 
interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
             throw new UnsupportedCompressionTypeException("Produce requests to 
inter.broker.protocol.version < 2.1 broker " +
                 "are not allowed to use ZStandard compression");
 
         // No in place assignment situation 1
-        boolean inPlaceAssignment = sourceCompression == targetCompression;
+        boolean inPlaceAssignment = sourceCompressionType == 
targetCompression.type();

Review Comment:
   So we won't re-compress if only the level is different? I didn't see this
in the KIP. Maybe we should add it?



##########
clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java:
##########
@@ -27,25 +31,92 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.Stream;
 
-import static 
org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static 
org.apache.kafka.common.compress.Lz4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class KafkaLZ4Test {
+public class Lz4CompressionTest {
 
     private final static Random RANDOM = new Random(0);
 
+    @Test
+    public void testLz4FramingMagicV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        Lz4Compression compression = new Lz4Compression.Builder().build();
+        Lz4BlockOutputStream out = (Lz4BlockOutputStream) 
compression.wrapForOutput(
+                new ByteBufferOutputStream(buffer), 
RecordBatch.MAGIC_VALUE_V0);
+        assertTrue(out.useBrokenFlagDescriptorChecksum());
+
+        buffer.rewind();
+
+        ChunkedBytesStream in = (ChunkedBytesStream) 
compression.wrapForInput(buffer, RecordBatch.MAGIC_VALUE_V0, 
BufferSupplier.NO_CACHING);
+        assertTrue(((Lz4BlockInputStream) 
in.sourceStream()).ignoreFlagDescriptorChecksum());
+    }
+
+    @Test
+    public void testLz4FramingMagicV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        Lz4Compression compression = new Lz4Compression.Builder().build();
+        Lz4BlockOutputStream out = (Lz4BlockOutputStream) 
compression.wrapForOutput(
+                new ByteBufferOutputStream(buffer), 
RecordBatch.MAGIC_VALUE_V1);
+        assertFalse(out.useBrokenFlagDescriptorChecksum());
+
+        buffer.rewind();
+
+        ChunkedBytesStream in = (ChunkedBytesStream) 
compression.wrapForInput(buffer, RecordBatch.MAGIC_VALUE_V1, 
BufferSupplier.create());
+        assertFalse(((Lz4BlockInputStream) 
in.sourceStream()).ignoreFlagDescriptorChecksum());
+    }
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        Lz4Compression.Builder builder = Compression.lz4();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL, 
Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL)) {
+                Lz4Compression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new 
ByteBufferOutputStream(4);
+                try (OutputStream out = 
compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = 
compression.wrapForInput(bufferStream.buffer(), magic, 
BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    inputStream.read(result);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCompressionLevels() {
+        Lz4Compression.Builder builder = new Lz4Compression.Builder();
+
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(Lz4Compression.MIN_LEVEL - 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(Lz4Compression.MAX_LEVEL + 1));
+
+        builder.level(Lz4Compression.MIN_LEVEL);
+        builder.level(Lz4Compression.MAX_LEVEL);

Review Comment:
   nit: wrap these calls in `assertDoesNotThrow` to make the expectation explicit.



##########
clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java:
##########
@@ -500,27 +501,24 @@ private static Stream<Arguments> 
testBufferReuseInSkipKeyValueIterator() {
     @MethodSource
     public void testZstdJniForSkipKeyValueIterator(int expectedJniCalls, 
byte[] recordValue) throws IOException {
         MemoryRecords records = 
MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
-            CompressionType.ZSTD, TimestampType.CREATE_TIME,
+            Compression.zstd().build(), TimestampType.CREATE_TIME,
             new SimpleRecord(9L, "hakuna-matata".getBytes(), recordValue)
         );
 
         // Buffer containing compressed data
         final ByteBuffer compressedBuf = records.buffer();
         // Create a RecordBatch object
         final DefaultRecordBatch batch = spy(new 
DefaultRecordBatch(compressedBuf.duplicate()));
-        final CompressionType mockCompression = 
mock(CompressionType.ZSTD.getClass());
-        doReturn(mockCompression).when(batch).compressionType();
-
+        final ZstdCompression compression = Compression.zstd().build();
         // Buffer containing compressed records to be used for creating 
zstd-jni stream
-        ByteBuffer recordsBuffer = compressedBuf.duplicate();
+        ByteBuffer recordsBuffer = spy(compressedBuf.duplicate());

Review Comment:
   Why do we need to spy on this buffer?



##########
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##########
@@ -173,6 +173,14 @@ public class TopicConfig {
         "accepts 'uncompressed' which is equivalent to no compression; and 
'producer' which means retain the " +
         "original compression codec set by the producer.";
 
+
+    public static final String COMPRESSION_GZIP_LEVEL_CONFIG = 
"compression.gzip.level";
+    public static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression 
level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>gzip</code>.";
+    public static final String COMPRESSION_LZ4_LEVEL_CONFIG = 
"compression.lz4.level";
+    public static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression 
level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>lz4</code>.";
+    public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = 
"compression.zstd.level";
+    public static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression 
level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>zstd</code>.";

Review Comment:
   nit: Should we provide a doc link for each compression type? It's hard to
know which level means what.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1750,7 +1751,7 @@ private RecordAccumulator createTestRecordAccumulator(
         return new RecordAccumulator(
             logContext,
             batchSize,
-            type,
+                compression,

Review Comment:
   wrong indent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to