Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-21 Thread via GitHub


mimaison merged PR #15516:
URL: https://github.com/apache/kafka/pull/15516


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-21 Thread via GitHub


mimaison commented on PR #15516:
URL: https://github.com/apache/kafka/pull/15516#issuecomment-2122934859

   None of the test failures seem related, merging to trunk





Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-21 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1608274764


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -331,12 +332,12 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offsetCounter,
                                                                    MetricsRecorder metricsRecorder,
                                                                    BufferSupplier bufferSupplier) {
-    if (targetCompression == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
+    if (targetCompression.type() == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
         throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
             "are not allowed to use ZStandard compression");
 
     // No in place assignment situation 1
-    boolean inPlaceAssignment = sourceCompression == targetCompression;
+    boolean inPlaceAssignment = sourceCompressionType == targetCompression.type();

Review Comment:
   Right, I updated the KIP.






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-21 Thread via GitHub


mimaison commented on PR #15516:
URL: https://github.com/apache/kafka/pull/15516#issuecomment-2122361920

   Thanks for the reviews!
   I had to rebase again so I'll wait for the CI to complete.





Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1607924069


##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -173,6 +173,14 @@ public class TopicConfig {
         "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
         "original compression codec set by the producer.";
 
+
+    public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
+    public static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to gzip.";
+    public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
+    public static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to lz4.";
+    public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";
+    public static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to zstd.";

Review Comment:
   I was thinking we'd add it to the config description. Or maybe adding it to KIP-390 is good enough.
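For context, the three keys above are only consulted when `compression.type` selects the matching codec. The following sketch illustrates that pairing with plain `java.util.Properties`; the `effectiveLevelKey` helper is hypothetical, not Kafka code, and only mirrors the `compression.<codec>.level` naming scheme of the constants above.

```java
import java.util.Properties;

public class CompressionLevelConfigSketch {

    // Hypothetical helper: derives the one level key that matches the configured
    // codec. Any other compression.<codec>.level entries are simply ignored.
    static String effectiveLevelKey(Properties config) {
        return "compression." + config.getProperty("compression.type") + ".level";
    }

    public static void main(String[] args) {
        Properties topicConfig = new Properties();
        topicConfig.setProperty("compression.type", "gzip");
        topicConfig.setProperty("compression.gzip.level", "9");  // gzip accepts 1..9
        topicConfig.setProperty("compression.zstd.level", "3");  // ignored: codec is gzip

        System.out.println(effectiveLevelKey(topicConfig));                          // compression.gzip.level
        System.out.println(topicConfig.getProperty(effectiveLevelKey(topicConfig))); // 9
    }
}
```

The same keys exist on the producer side; per KIP-390, a level set for a codec other than the selected one has no effect.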






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1607919763


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -331,12 +332,12 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offsetCounter,
                                                                    MetricsRecorder metricsRecorder,
                                                                    BufferSupplier bufferSupplier) {
-    if (targetCompression == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
+    if (targetCompression.type() == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
         throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
             "are not allowed to use ZStandard compression");
 
     // No in place assignment situation 1
-    boolean inPlaceAssignment = sourceCompression == targetCompression;
+    boolean inPlaceAssignment = sourceCompressionType == targetCompression.type();

Review Comment:
   I agree. But I think we should at least mention this in KIP-390.






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-17 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1605420401


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1538,7 +1563,80 @@ class LogValidatorTest {
     assertEquals(e.recordErrors.size, 3)
   }
 
-  private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = {
+  @Test
+  def testDifferentLevelDoesNotCauseRecompression(): Unit = {
+    val records = List(
+      List.fill(256)("some").mkString("").getBytes,
+      List.fill(256)("data").mkString("").getBytes
+    )
+    // Records from the producer were created with gzip max level
+    val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+    val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax)
+
+    // The topic is configured with gzip min level
+    val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+    val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin)
+
+    // ensure data compressed with gzip max and min is different
+    assertNotEquals(recordsGzipMax, recordsGzipMin)
+    val validator = new LogValidator(recordsGzipMax,
+      topicPartition,
+      time,
+      gzipMax.`type`(),
+      gzipMin,
+      false,
+      RecordBatch.CURRENT_MAGIC_VALUE,

Review Comment:
   Yes, let's keep it consistent. Done






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-17 Thread via GitHub


junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1605313096


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1538,7 +1563,80 @@ class LogValidatorTest {
     assertEquals(e.recordErrors.size, 3)
   }
 
-  private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = {
+  @Test
+  def testDifferentLevelDoesNotCauseRecompression(): Unit = {
+    val records = List(
+      List.fill(256)("some").mkString("").getBytes,
+      List.fill(256)("data").mkString("").getBytes
+    )
+    // Records from the producer were created with gzip max level
+    val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+    val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax)
+
+    // The topic is configured with gzip min level
+    val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+    val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin)
+
+    // ensure data compressed with gzip max and min is different
+    assertNotEquals(recordsGzipMax, recordsGzipMin)
+    val validator = new LogValidator(recordsGzipMax,
+      topicPartition,
+      time,
+      gzipMax.`type`(),
+      gzipMin,
+      false,
+      RecordBatch.CURRENT_MAGIC_VALUE,

Review Comment:
   Earlier, we used RecordBatch.MAGIC_VALUE_V2. Should we be consistent here? Ditto in the next test.






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-16 Thread via GitHub


mimaison commented on PR #15516:
URL: https://github.com/apache/kafka/pull/15516#issuecomment-2115225074

   I also added a couple of new tests in LogValidatorTest to check that recompression only happens if the compression codec differs between the records from the producer and the topic configuration, and does not happen if only the compression levels differ.
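The rule those tests pin down can be sketched independently of Kafka's classes. In the sketch below the enum, `Compression` class, and `inPlaceAssignment` helper are hypothetical stand-ins for the real LogValidator logic, which compares only codecs (the level is not recorded in the batch format, so it cannot be compared):

```java
// Hypothetical stand-ins, not Kafka's actual API
enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

final class Compression {
    private final CompressionType type;
    private final int level;

    Compression(CompressionType type, int level) {
        this.type = type;
        this.level = level;
    }

    CompressionType type() {
        return type;
    }
}

public class RecompressionSketch {

    // In-place offset assignment (no recompression) whenever the source codec
    // matches the target codec, regardless of the configured level.
    static boolean inPlaceAssignment(CompressionType sourceType, Compression target) {
        return sourceType == target.type();
    }

    public static void main(String[] args) {
        Compression topicGzipMin = new Compression(CompressionType.GZIP, 1);
        // Same codec, different level: the batch is kept as-is
        System.out.println(inPlaceAssignment(CompressionType.GZIP, topicGzipMin)); // true
        // Different codec: the batch must be recompressed to the topic's codec
        System.out.println(inPlaceAssignment(CompressionType.LZ4, topicGzipMin)); // false
    }
}
```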





Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-16 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1603325731


##
server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java:
##
@@ -16,23 +16,38 @@
  */
 package org.apache.kafka.server.record;
 
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.SnappyCompression;
+import org.apache.kafka.common.compress.ZstdCompression;
 import org.apache.kafka.common.record.CompressionType;
 import org.junit.jupiter.api.Test;
 
+import java.util.Optional;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class BrokerCompressionTypeTest {
 
     @Test
     public void testTargetCompressionType() {
-        assertEquals(CompressionType.GZIP, BrokerCompressionType.GZIP.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.SNAPPY, BrokerCompressionType.SNAPPY.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.LZ4.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.ZSTD.targetCompressionType(CompressionType.GZIP));
-
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.ZSTD));
+        GzipCompression gzipWithLevel = Compression.gzip().level(GzipCompression.MAX_LEVEL).build();
+        assertEquals(gzipWithLevel, BrokerCompressionType.GZIP.targetCompression(Optional.of(gzipWithLevel), CompressionType.ZSTD));

Review Comment:
   Fixed!






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-15 Thread via GitHub


junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1602012880


##
server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java:
##
@@ -16,23 +16,38 @@
  */
 package org.apache.kafka.server.record;
 
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.SnappyCompression;
+import org.apache.kafka.common.compress.ZstdCompression;
 import org.apache.kafka.common.record.CompressionType;
 import org.junit.jupiter.api.Test;
 
+import java.util.Optional;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class BrokerCompressionTypeTest {
 
     @Test
     public void testTargetCompressionType() {
-        assertEquals(CompressionType.GZIP, BrokerCompressionType.GZIP.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.SNAPPY, BrokerCompressionType.SNAPPY.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.LZ4.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.ZSTD.targetCompressionType(CompressionType.GZIP));
-
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.ZSTD));
+        GzipCompression gzipWithLevel = Compression.gzip().level(GzipCompression.MAX_LEVEL).build();
+        assertEquals(gzipWithLevel, BrokerCompressionType.GZIP.targetCompression(Optional.of(gzipWithLevel), CompressionType.ZSTD));

Review Comment:
   `BrokerCompressionType.GZIP.targetCompression` => `BrokerCompressionType.targetCompression`
   
   Ditto below.






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-15 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1601337698


##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);

Review Comment:
   Good idea, I increased the input data to 1024 bytes






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-15 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1601283629


##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+                GzipCompression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    int read = inputStream.read(result);
+                    assertEquals(data.length, read);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCompressionLevels() {
+        GzipCompression.Builder builder = Compression.gzip();
+
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1));
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1));
+
+        builder.level(GzipCompression.MIN_LEVEL);
+        builder.level(GzipCompression.MAX_LEVEL);
+    }
+
+    @Test
+    public void testLevelValidator() {
+        GzipCompression.LevelValidator validator = new GzipCompression.LevelValidator();
+        for (int level = GzipCompression.MIN_LEVEL; level <= GzipCompression.MAX_LEVEL; level++) {
+            validator.ensureValid("", level);
+        }
+        validator.ensureValid("", GzipCompression.DEFAULT_LEVEL);
+        assertThrows(ConfigException.class, () -> validator.ensureValid("", 0));

Review Comment:
   Yes this can be removed
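For background on the level range being validated here: Kafka's gzip levels map onto zlib's 1 (fastest) to 9 (best compression), while zlib's level 0 means "store uncompressed" and is rejected. The standalone JDK sketch below (not Kafka code) demonstrates that every level in that range round-trips; levels only trade CPU for compression ratio, never correctness.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class GzipLevelSketch {

    // Compress with an explicit zlib level; 1..9 mirrors
    // GzipCompression.MIN_LEVEL..MAX_LEVEL.
    static byte[] deflate(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        // 2x + 64 comfortably covers zlib's worst-case expansion for this input
        byte[] buf = new byte[input.length * 2 + 64];
        int off = 0;
        while (!deflater.finished()) {
            off += deflater.deflate(buf, off, buf.length - off);
        }
        deflater.end();
        return Arrays.copyOf(buf, off);
    }

    static byte[] inflate(byte[] compressed, int originalLength) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] out = new byte[originalLength];
        try {
            int off = 0;
            while (off < originalLength && !inflater.finished()) {
                off += inflater.inflate(out, off, originalLength - off);
            }
        } catch (DataFormatException e) {
            throw new RuntimeException(e);
        } finally {
            inflater.end();
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "somedata".repeat(256).getBytes(StandardCharsets.UTF_8);
        // Every valid level must decompress back to the original bytes
        for (int level = 1; level <= 9; level++) {
            byte[] roundTrip = inflate(deflate(data, level), data.length);
            if (!Arrays.equals(data, roundTrip)) throw new AssertionError("level " + level);
        }
        System.out.println("all levels round-trip");
    }
}
```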






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-15 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1601240696


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1587,7 +1612,7 @@ class LogValidatorTest {
 
   private def createTwoBatchedRecords(magicValue: Byte,
                                       timestamp: Long,
-                                      codec: CompressionType): MemoryRecords = {
+                                      codec: Compression): MemoryRecords = {

Review Comment:
   Yes, so let's remove it






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-15 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1601222613


##
clients/src/test/java/org/apache/kafka/common/compress/NoCompressionTest.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class NoCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        NoCompression compression = Compression.NONE;
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+            try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {

Review Comment:
   Yes, good catch!






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600483273


##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);

Review Comment:
   Should we test something bigger, like more than 512 bytes, so that it covers the out.flush() logic?



##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+                GzipCompression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    int read = inputStream.read(result);
+                    assertEquals(data.length, read);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }

Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600267022


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.compress;
+
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
+import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ZstdCompression implements Compression {
+
+public static final int MIN_LEVEL = Zstd.minCompressionLevel();
+public static final int MAX_LEVEL = Zstd.maxCompressionLevel();
+public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel();
+
+private final int level;
+
+private ZstdCompression(int level) {
+this.level = level;
+}
+
+@Override
+public CompressionType type() {
+return CompressionType.ZSTD;
+}
+
+@Override
+public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+try {
+// Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
+// in cases where the caller passes a small number of bytes to write (potentially a single byte).
+return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(bufferStream, RecyclingBufferPool.INSTANCE, level), 16 * 1024);
+} catch (Throwable e) {
+throw new KafkaException(e);
+}
+}
+
+@Override
+public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+try {
+return new ChunkedBytesStream(wrapForZstdInput(buffer, decompressionBufferSupplier),
+decompressionBufferSupplier,
+decompressionOutputSize(),
+false);
+} catch (Throwable e) {
+throw new KafkaException(e);
+}
+}
+
+// visible for testing
+public ZstdInputStreamNoFinalizer wrapForZstdInput(ByteBuffer buffer, BufferSupplier decompressionBufferSupplier) throws IOException {

Review Comment:
   I considered inlining this method as it's only used in `DefaultRecordBatchTest`, in a test that exercises the internal behavior of `ZstdInputStreamNoFinalizer`.
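The buffering rationale in the hunk above can be sketched with stdlib streams, using `GZIPOutputStream` as a stand-in for `ZstdOutputStreamNoFinalizer` (zstd-jni is a third-party dependency); the `BufferedCompressionDemo` class name is illustrative only:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class BufferedCompressionDemo {

    // Same pattern as the hunk above: batch many tiny writes into one 16 KB
    // chunk before they reach the (potentially expensive) compressor layer.
    public static byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // 16 * 1024 mirrors the input buffer size chosen in ZstdCompression
        try (OutputStream out = new BufferedOutputStream(new GZIPOutputStream(sink), 16 * 1024)) {
            for (byte b : data) {
                out.write(b); // worst case: one byte per call, absorbed by the buffer
            }
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = compress("hello hello hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(compressed.length + " bytes");
    }
}
```

Without the `BufferedOutputStream`, each single-byte `write` would cross into the compressor (and, for zstd-jni, into JNI) individually.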



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600263287


##
clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class Lz4Compression implements Compression {
+
+public static final int MIN_LEVEL = 1;
+public static final int MAX_LEVEL = 17;
+public static final int DEFAULT_LEVEL = 9;

Review Comment:
   I hesitated defining these constants for this reason but these levels have 
not changed over 10 years [0], so hopefully this won't require a lot of 
maintenance.
   
   0: 
https://github.com/lz4/lz4-java/blame/master/src/java/net/jpountz/lz4/LZ4Constants.java#L23-L24
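A minimal sketch of the range check these constants enable; the `Lz4LevelCheck` class and its exception message are hypothetical, not the Kafka builder, though the 1..17 range and default of 9 come from the diff above:

```java
// MIN/MAX/DEFAULT mirror the values in the diff (LZ4: 1..17, default 9);
// the Builder-style class itself is hypothetical, not the Kafka one.
public class Lz4LevelCheck {
    public static final int MIN_LEVEL = 1;
    public static final int MAX_LEVEL = 17;
    public static final int DEFAULT_LEVEL = 9;

    private int level = DEFAULT_LEVEL;

    // Reject out-of-range levels eagerly, at build time rather than at compression time
    public Lz4LevelCheck level(int level) {
        if (level < MIN_LEVEL || level > MAX_LEVEL) {
            throw new IllegalArgumentException("Unsupported lz4 compression level: " + level);
        }
        this.level = level;
        return this;
    }

    public int build() {
        return level;
    }

    public static void main(String[] args) {
        System.out.println(new Lz4LevelCheck().level(MAX_LEVEL).build()); // prints 17
    }
}
```

Hard-coding the bounds as constants (rather than querying the library) is what makes them visible in config validation and docs, at the cost of tracking upstream changes.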






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600255317


##
clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java:
##
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-private SnappyFactory() { }
+@Override
+public CompressionType type() {
+return CompressionType.NONE;
+}
 
-public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-try {
-return new SnappyOutputStream(buffer);
-} catch (Throwable e) {
-throw new KafkaException(e);
-}
+@Override
+public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+return bufferStream;
 }
 
-public static InputStream wrapForInput(ByteBuffer buffer) {
-try {
-return new SnappyInputStream(new ByteBufferInputStream(buffer));
-} catch (Throwable e) {
-throw new KafkaException(e);
-}
+@Override
+public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+return new ByteBufferInputStream(buffer);
+}
+
+@Override
+public boolean equals(Object o) {
+return o instanceof NoCompression;
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();
 }
 
+public static class Builder implements Compression.Builder {

Review Comment:
   Yes, I made all of them private.
   I also considered making the Builders private too as I expect them to be 
retrieved via the getters in Compression, but I decided to keep them public.






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600247977


##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -173,6 +173,14 @@ public class TopicConfig {
 "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
 "original compression codec set by the producer.";
 
+
+public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
+public static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to gzip.";
+public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
+public static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to lz4.";
+public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";
+public static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to zstd.";

Review Comment:
   Do you mean a link to the compression library websites?
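The size/CPU tradeoff these `compression.*.level` configs expose can be seen with the stdlib `Deflater` (the DEFLATE implementation underlying gzip); the `GzipLevelTradeoff` class is a demo, not Kafka code:

```java
import java.util.zip.Deflater;

public class GzipLevelTradeoff {

    // Compress input at the given DEFLATE level and return the output size
    public static int compressedSize(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        byte[] buf = new byte[4096];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buf); // drain until the stream is finished
        }
        deflater.end();
        return total;
    }

    public static void main(String[] args) {
        byte[] data = "kafka record batch ".repeat(500).getBytes();
        System.out.println("level 1: " + compressedSize(data, 1) + " bytes");
        System.out.println("level 9: " + compressedSize(data, 9) + " bytes");
    }
}
```

On repetitive input like this, the highest level usually produces the smallest output while spending the most CPU per batch.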






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600241274


##
clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java:
##
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-private SnappyFactory() { }
+@Override
+public CompressionType type() {
+return CompressionType.NONE;
+}
 
-public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-try {
-return new SnappyOutputStream(buffer);
-} catch (Throwable e) {
-throw new KafkaException(e);
-}
+@Override
+public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+return bufferStream;
 }
 
-public static InputStream wrapForInput(ByteBuffer buffer) {
-try {
-return new SnappyInputStream(new ByteBufferInputStream(buffer));
-} catch (Throwable e) {
-throw new KafkaException(e);
-}
+@Override
+public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+return new ByteBufferInputStream(buffer);
+}
+
+@Override
+public boolean equals(Object o) {
+return o instanceof NoCompression;
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();

Review Comment:
   Actually I don't think we need to override the default hashCode/equals 
methods. Initially I wanted to make this a singleton but I don't think it's 
necessary.






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-14 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600235959


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -331,12 +332,12 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offsetCounter,
                                                                    MetricsRecorder metricsRecorder,
                                                                    BufferSupplier bufferSupplier) {
-if (targetCompression == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
+if (targetCompression.type() == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
 throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
 "are not allowed to use ZStandard compression");
 
 // No in place assignment situation 1
-boolean inPlaceAssignment = sourceCompression == targetCompression;
+boolean inPlaceAssignment = sourceCompressionType == targetCompression.type();

Review Comment:
   The broker has no easy way of retrieving the level that the producer used 
when compressing the records. So if the compression codec matches, I decided to 
keep the compressed bytes instead of decompressing and compressing everything 
again as this would be wasteful, especially as the producer could have already 
used the same compression level.
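The decision described here can be sketched as follows; the class, enum, and method names are hypothetical stand-ins (Kafka's actual logic lives in `LogValidator`):

```java
public class InPlaceAssignment {

    enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Keep the producer's compressed bytes when the codec matches: the broker
    // cannot recover the producer's level, and re-encoding would be wasteful.
    static boolean inPlace(Codec sourceCompressionType, Codec targetCompressionType) {
        return sourceCompressionType == targetCompressionType;
    }

    public static void main(String[] args) {
        System.out.println(inPlace(Codec.GZIP, Codec.GZIP)); // true: keep bytes as-is
        System.out.println(inPlace(Codec.GZIP, Codec.ZSTD)); // false: decompress and recompress
    }
}
```

A consequence of this choice is that the broker-side level config only takes effect when the broker actually recompresses, i.e. when source and target codecs differ.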






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-13 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1598619060


##
clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java:
##
@@ -500,27 +501,24 @@ private static Stream testBufferReuseInSkipKeyValueIterator() {
 @MethodSource
 public void testZstdJniForSkipKeyValueIterator(int expectedJniCalls, byte[] recordValue) throws IOException {
 MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
-CompressionType.ZSTD, TimestampType.CREATE_TIME,
+Compression.zstd().build(), TimestampType.CREATE_TIME,
 new SimpleRecord(9L, "hakuna-matata".getBytes(), recordValue)
 );
 
 // Buffer containing compressed data
 final ByteBuffer compressedBuf = records.buffer();
 // Create a RecordBatch object
 final DefaultRecordBatch batch = spy(new DefaultRecordBatch(compressedBuf.duplicate()));
-final CompressionType mockCompression = mock(CompressionType.ZSTD.getClass());
-doReturn(mockCompression).when(batch).compressionType();
-
+final ZstdCompression compression = Compression.zstd().build();
 // Buffer containing compressed records to be used for creating zstd-jni stream
-ByteBuffer recordsBuffer = compressedBuf.duplicate();
+ByteBuffer recordsBuffer = spy(compressedBuf.duplicate());

Review Comment:
   Right, it's not needed






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-13 Thread via GitHub


mimaison commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1598609324


##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+@Test
+public void testCompressionDecompression() throws IOException {
+GzipCompression.Builder builder = Compression.gzip();
+byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+GzipCompression compression = builder.level(level).build();
+ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(data);
+out.flush();
+}
+bufferStream.buffer().flip();
+
+try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+byte[] result = new byte[data.length];
+inputStream.read(result);
+assertArrayEquals(data, result);
+}
+}
+}
+}
+
+@Test
+public void testCompressionLevels() {
+GzipCompression.Builder builder = new GzipCompression.Builder();
+
+assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1));
+assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1));
+
+builder.level(GzipCompression.MIN_LEVEL);
+builder.level(GzipCompression.MAX_LEVEL);

Review Comment:
   It's not needed; if `level()` throws, it automatically fails the test.






Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-05-02 Thread via GitHub


junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1586920863


##
clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java:
##
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-private SnappyFactory() { }
+@Override
+public CompressionType type() {
+return CompressionType.NONE;
+}
 
-public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-try {
-return new SnappyOutputStream(buffer);
-} catch (Throwable e) {
-throw new KafkaException(e);
-}
+@Override
+public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+return bufferStream;
 }
 
-public static InputStream wrapForInput(ByteBuffer buffer) {
-try {
-return new SnappyInputStream(new ByteBufferInputStream(buffer));
-} catch (Throwable e) {
-throw new KafkaException(e);
-}
+@Override
+public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+return new ByteBufferInputStream(buffer);
+}
+
+@Override
+public boolean equals(Object o) {
+return o instanceof NoCompression;
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();

Review Comment:
   Hmm, we redefine `equals()` such that all objects of type NoCompression are 
equal, yet they have different hashcode?
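The contract issue raised here can be reproduced with a minimal stdlib example; the `BrokenNoCompression` class below is a hypothetical stand-in, not Kafka's class:

```java
import java.util.HashSet;
import java.util.Set;

public class EqualsHashCodeDemo {

    static final class BrokenNoCompression {
        @Override
        public boolean equals(Object o) {
            return o instanceof BrokenNoCompression; // every instance is "equal"
        }
        // hashCode() deliberately not overridden: identity hash differs per instance
    }

    public static void main(String[] args) {
        Set<BrokenNoCompression> set = new HashSet<>();
        set.add(new BrokenNoCompression());
        set.add(new BrokenNoCompression());
        // With a consistent equals/hashCode pair size() would be 1; here the two
        // "equal" instances usually hash to different buckets and both are kept.
        System.out.println(set.size());
    }
}
```

This is why the contract requires equal objects to have equal hash codes: hash-based collections probe the bucket from `hashCode()` before ever calling `equals()`.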



##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -201,8 +203,10 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+Compression compression = Compression.of(compressionType).build();
+try (InputStream in = compression.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
 ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
+
+

Review Comment:
   extra new line



##
clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class Lz4Compression implements Compression {
+
+public static final int MIN_LEVEL = 1;
+public static final int MAX_LEVEL = 17;
+public static final int DEFAULT_LEVEL = 9;

Review Comment:
   So, every time we update the Lz4 library, we may need to update the above 
values? We probably want to add a note here.



##
clients/src/main/java/org/apache/kafka/common/compress/Compression.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor 

Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1579334152


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -189,7 +189,7 @@ public RecordAccumulator(LogContext logContext,
  BufferPool bufferPool) {
 this(logContext,
 batchSize,
-compression,
+compression,

Review Comment:
   nit: wrong indent



##
clients/src/main/java/org/apache/kafka/common/compress/Compression.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Compression {
+
+/**
+ * The compression type for this compression codec
+ */
+CompressionType type();
+
+/**
+ * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
+ * Note: Unlike {@link #wrapForInput}, this cannot take {@link ByteBuffer}s directly.
+ * Currently, MemoryRecordsBuilder writes to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
+ * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
+ */
+OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
+
+/**
+ * Wrap buffer with an InputStream that will decompress data with this CompressionType.
+ *
+ * @param buffer The {@link ByteBuffer} instance holding the data to decompress.
+ * @param messageVersion The record format version to use.
+ * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+ * For small record batches, allocating a potentially large buffer (64 KB for LZ4)

Review Comment:
   miss `@return`



##
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+@Test
+public void testCompressionDecompression() throws IOException {
+GzipCompression.Builder builder = Compression.gzip();
+byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, 

Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-04-10 Thread via GitHub


mimaison commented on PR #15516:
URL: https://github.com/apache/kafka/pull/15516#issuecomment-2048128916

   @divijvaidya It seems you've done a bit of work around compression in the 
past. Can you take a look? Thanks

