Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison merged PR #15516: URL: https://github.com/apache/kafka/pull/15516 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on PR #15516: URL: https://github.com/apache/kafka/pull/15516#issuecomment-2122934859 None of the test failures seem related, merging to trunk.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1608274764

storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:

@@ -331,12 +332,12 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offsetCounter,
                                                                    MetricsRecorder metricsRecorder,
                                                                    BufferSupplier bufferSupplier) {
-    if (targetCompression == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
+    if (targetCompression.type() == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
         throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
             "are not allowed to use ZStandard compression");

     // No in place assignment situation 1
-    boolean inPlaceAssignment = sourceCompression == targetCompression;
+    boolean inPlaceAssignment = sourceCompressionType == targetCompression.type();

Review Comment: Right, I updated the KIP.
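The decision this diff preserves can be illustrated with a minimal self-contained sketch (a hypothetical class, not the actual LogValidator code): the broker compares only the codec types, so batches that differ solely in compression level are kept in place and never recompressed.

```java
// Hypothetical sketch of the in-place assignment decision discussed above;
// the enum mirrors Kafka's CompressionType but this is not broker code.
public class InPlaceAssignmentSketch {

    enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Only the codec type is compared: a batch produced with gzip level 9
    // landing on a topic configured for gzip level 1 is NOT recompressed.
    static boolean inPlaceAssignment(CompressionType sourceType, CompressionType targetType) {
        return sourceType == targetType;
    }

    public static void main(String[] args) {
        System.out.println(inPlaceAssignment(CompressionType.GZIP, CompressionType.GZIP)); // prints "true"
        System.out.println(inPlaceAssignment(CompressionType.GZIP, CompressionType.ZSTD)); // prints "false"
    }
}
```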
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on PR #15516: URL: https://github.com/apache/kafka/pull/15516#issuecomment-2122361920 Thanks for the reviews! I had to rebase again so I'll wait for the CI to complete.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
showuon commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1607924069

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:

@@ -173,6 +173,14 @@ public class TopicConfig {
     "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
     "original compression codec set by the producer.";

+public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
+public static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to gzip.";
+public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
+public static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to lz4.";
+public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";
+public static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to zstd.";

Review Comment: I was thinking we could add it in the config description. Or maybe adding it in KIP-390 is good enough.
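For context, these new topic-level configs from KIP-390 would be set like any other topic config; a sketch using the standard kafka-configs.sh tool (the topic name and level value here are made-up examples):

```
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --alter \
  --add-config compression.type=gzip,compression.gzip.level=9
```

The level config only takes effect when the corresponding compression.type is selected, which is what the generated doc strings above state.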
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
showuon commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1607919763

storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:

@@ -331,12 +332,12 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offsetCounter,
                                                                    MetricsRecorder metricsRecorder,
                                                                    BufferSupplier bufferSupplier) {
-    if (targetCompression == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
+    if (targetCompression.type() == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
        throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
            "are not allowed to use ZStandard compression");

     // No in place assignment situation 1
-    boolean inPlaceAssignment = sourceCompression == targetCompression;
+    boolean inPlaceAssignment = sourceCompressionType == targetCompression.type();

Review Comment: I agree. But I think we should mention this in KIP-390 at least.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1605420401

core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:

@@ -1538,7 +1563,80 @@ class LogValidatorTest {
     assertEquals(e.recordErrors.size, 3)
   }

-  private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = {
+  @Test
+  def testDifferentLevelDoesNotCauseRecompression(): Unit = {
+    val records = List(
+      List.fill(256)("some").mkString("").getBytes,
+      List.fill(256)("data").mkString("").getBytes
+    )
+    // Records from the producer were created with gzip max level
+    val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+    val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax)
+
+    // The topic is configured with gzip min level
+    val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+    val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin)
+
+    // ensure data compressed with gzip max and min is different
+    assertNotEquals(recordsGzipMax, recordsGzipMin)
+    val validator = new LogValidator(recordsGzipMax,
+      topicPartition,
+      time,
+      gzipMax.`type`(),
+      gzipMin,
+      false,
+      RecordBatch.CURRENT_MAGIC_VALUE,

Review Comment: Yes, let's keep it consistent. Done.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
junrao commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1605313096

core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:

@@ -1538,7 +1563,80 @@ class LogValidatorTest {
     assertEquals(e.recordErrors.size, 3)
   }

-  private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = {
+  @Test
+  def testDifferentLevelDoesNotCauseRecompression(): Unit = {
+    val records = List(
+      List.fill(256)("some").mkString("").getBytes,
+      List.fill(256)("data").mkString("").getBytes
+    )
+    // Records from the producer were created with gzip max level
+    val gzipMax: Compression = Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+    val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax)
+
+    // The topic is configured with gzip min level
+    val gzipMin: Compression = Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+    val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin)
+
+    // ensure data compressed with gzip max and min is different
+    assertNotEquals(recordsGzipMax, recordsGzipMin)
+    val validator = new LogValidator(recordsGzipMax,
+      topicPartition,
+      time,
+      gzipMax.`type`(),
+      gzipMin,
+      false,
+      RecordBatch.CURRENT_MAGIC_VALUE,

Review Comment: Earlier we used RecordBatch.MAGIC_VALUE_V2. Should we be consistent here? Ditto in the next test.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on PR #15516: URL: https://github.com/apache/kafka/pull/15516#issuecomment-2115225074 I also added a couple of new tests in LogValidatorTest to check that recompression only happens when the compression codec differs between the records from the producer and the topic configuration, and does not happen when only the compression levels differ.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1603325731

server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java:

@@ -16,23 +16,38 @@
  */
 package org.apache.kafka.server.record;

+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.SnappyCompression;
+import org.apache.kafka.common.compress.ZstdCompression;
 import org.apache.kafka.common.record.CompressionType;
 import org.junit.jupiter.api.Test;

+import java.util.Optional;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;

 public class BrokerCompressionTypeTest {

     @Test
     public void testTargetCompressionType() {
-        assertEquals(CompressionType.GZIP, BrokerCompressionType.GZIP.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.SNAPPY, BrokerCompressionType.SNAPPY.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.LZ4.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.ZSTD.targetCompressionType(CompressionType.GZIP));
-
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.ZSTD));
+        GzipCompression gzipWithLevel = Compression.gzip().level(GzipCompression.MAX_LEVEL).build();
+        assertEquals(gzipWithLevel, BrokerCompressionType.GZIP.targetCompression(Optional.of(gzipWithLevel), CompressionType.ZSTD));

Review Comment: Fixed!
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
junrao commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1602012880

server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java:

@@ -16,23 +16,38 @@
  */
 package org.apache.kafka.server.record;

+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.SnappyCompression;
+import org.apache.kafka.common.compress.ZstdCompression;
 import org.apache.kafka.common.record.CompressionType;
 import org.junit.jupiter.api.Test;

+import java.util.Optional;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;

 public class BrokerCompressionTypeTest {

     @Test
     public void testTargetCompressionType() {
-        assertEquals(CompressionType.GZIP, BrokerCompressionType.GZIP.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.SNAPPY, BrokerCompressionType.SNAPPY.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.LZ4.targetCompressionType(CompressionType.ZSTD));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.ZSTD.targetCompressionType(CompressionType.GZIP));
-
-        assertEquals(CompressionType.LZ4, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.LZ4));
-        assertEquals(CompressionType.ZSTD, BrokerCompressionType.PRODUCER.targetCompressionType(CompressionType.ZSTD));
+        GzipCompression gzipWithLevel = Compression.gzip().level(GzipCompression.MAX_LEVEL).build();
+        assertEquals(gzipWithLevel, BrokerCompressionType.GZIP.targetCompression(Optional.of(gzipWithLevel), CompressionType.ZSTD));

Review Comment: `BrokerCompressionType.GZIP.targetCompression` => `BrokerCompressionType.targetCompression`. Ditto below.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1601337698

clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);

Review Comment: Good idea, I increased the input data to 1024 bytes.
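The round-trip being tested above can be sketched with only the JDK, since gzip's levels come from the underlying DEFLATE algorithm. This standalone demo (not Kafka code) uses java.util.zip Deflater/Inflater directly, with zlib framing rather than Kafka's GzipCompression wrapper, but the same level range of 1 (fastest) to 9 (best compression):

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class DeflateLevelDemo {
    // Compress with an explicit DEFLATE level (1 = fastest .. 9 = best compression).
    static byte[] compress(byte[] data, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Decompress into a buffer of at most maxSize bytes and trim to the actual length.
    static byte[] decompress(byte[] compressed, int maxSize) throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] buf = new byte[maxSize];
        int n = inflater.inflate(buf);
        inflater.end();
        return Arrays.copyOf(buf, n);
    }

    public static void main(String[] args) throws DataFormatException {
        // 1024 bytes of repetitive input, mirroring the enlarged test data
        byte[] data = new String(new char[1024]).replace('\0', 'x').getBytes();
        for (int level : new int[] {1, 6, 9}) {
            byte[] compressed = compress(data, level);
            byte[] restored = decompress(compressed, data.length);
            // every level must round-trip to the original bytes
            System.out.println("level " + level + ": " + compressed.length
                + " bytes, round-trip ok: " + Arrays.equals(restored, data));
        }
    }
}
```

As in the Kafka test, the assertion that matters is that every supported level round-trips losslessly; the compressed sizes merely differ.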
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1601283629

clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+                GzipCompression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    int read = inputStream.read(result);
+                    assertEquals(data.length, read);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCompressionLevels() {
+        GzipCompression.Builder builder = Compression.gzip();
+
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1));
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1));
+
+        builder.level(GzipCompression.MIN_LEVEL);
+        builder.level(GzipCompression.MAX_LEVEL);
+    }
+
+    @Test
+    public void testLevelValidator() {
+        GzipCompression.LevelValidator validator = new GzipCompression.LevelValidator();
+        for (int level = GzipCompression.MIN_LEVEL; level <= GzipCompression.MAX_LEVEL; level++) {
+            validator.ensureValid("", level);
+        }
+        validator.ensureValid("", GzipCompression.DEFAULT_LEVEL);
+        assertThrows(ConfigException.class, () -> validator.ensureValid("", 0));

Review Comment: Yes, this can be removed.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1601240696

core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:

@@ -1587,7 +1612,7 @@
 private def createTwoBatchedRecords(magicValue: Byte,
                                     timestamp: Long,
-                                    codec: CompressionType): MemoryRecords = {
+                                    codec: Compression): MemoryRecords = {

Review Comment: Yes, so let's remove it.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1601222613

clients/src/test/java/org/apache/kafka/common/compress/NoCompressionTest.java:

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class NoCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        NoCompression compression = Compression.NONE;
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+            try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {

Review Comment: Yes, good catch!
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
junrao commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600483273

clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);

Review Comment: Should we test something bigger, like more than 512 bytes, so that it covers the out.flush() logic?

clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+                GzipCompression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    int read = inputStream.read(result);
+                    assertEquals(data.length, read);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600267022

clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java:

@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.compress;
+
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
+import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ZstdCompression implements Compression {
+
+    public static final int MIN_LEVEL = Zstd.minCompressionLevel();
+    public static final int MAX_LEVEL = Zstd.maxCompressionLevel();
+    public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel();
+
+    private final int level;
+
+    private ZstdCompression(int level) {
+        this.level = level;
+    }
+
+    @Override
+    public CompressionType type() {
+        return CompressionType.ZSTD;
+    }
+
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        try {
+            // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
+            // in cases where the caller passes a small number of bytes to write (potentially a single byte).
+            return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(bufferStream, RecyclingBufferPool.INSTANCE, level), 16 * 1024);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        try {
+            return new ChunkedBytesStream(wrapForZstdInput(buffer, decompressionBufferSupplier),
+                decompressionBufferSupplier,
+                decompressionOutputSize(),
+                false);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    // visible for testing
+    public ZstdInputStreamNoFinalizer wrapForZstdInput(ByteBuffer buffer, BufferSupplier decompressionBufferSupplier) throws IOException {

Review Comment: I considered inlining this method as it's only used in a test in `DefaultRecordBatchTest` that exercises the internal behavior of `ZstdInputStreamNoFinalizer`.
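The rationale for the 16 KB BufferedOutputStream in wrapForOutput above can be seen with a JDK-only sketch (a hypothetical counting stream, not Kafka code): without buffering, every single-byte write reaches the underlying stream (a JNI call in zstd's case); with a 16 KB buffer, writes are coalesced into large chunks.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class BufferingDemo {
    // Counts how many write calls reach the underlying stream.
    static class CountingOutputStream extends OutputStream {
        int writeCalls = 0;
        @Override public void write(int b) { writeCalls++; }
        @Override public void write(byte[] b, int off, int len) { writeCalls++; }
    }

    // Writes `bytes` single bytes through an optional buffer and reports
    // how many calls hit the underlying stream.
    static int writeCallsWithBuffer(int bytes, int bufferSize) throws IOException {
        CountingOutputStream counter = new CountingOutputStream();
        try (OutputStream out = bufferSize > 0 ? new BufferedOutputStream(counter, bufferSize) : counter) {
            for (int i = 0; i < bytes; i++) {
                out.write(i); // caller writes one byte at a time
            }
        }
        return counter.writeCalls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("unbuffered:   " + writeCallsWithBuffer(16 * 1024, 0));         // one call per byte
        System.out.println("16 KB buffer: " + writeCallsWithBuffer(16 * 1024, 16 * 1024)); // a single flushed chunk
    }
}
```

The same trade-off motivates the PR's choice: the cost is one extra copy into the buffer, which is far cheaper than crossing the stream (and JNI) boundary per write.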
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600263287

## clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java: ##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class Lz4Compression implements Compression {
+
+    public static final int MIN_LEVEL = 1;
+    public static final int MAX_LEVEL = 17;
+    public static final int DEFAULT_LEVEL = 9;

Review Comment:
   I hesitated to define these constants for this reason, but these levels have not changed in over 10 years [0], so hopefully this won't require much maintenance.

0: https://github.com/lz4/lz4-java/blame/master/src/java/net/jpountz/lz4/LZ4Constants.java#L23-L24
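The concern about hard-coded library constants is usually handled with an early bounds check at configuration time. A minimal sketch, assuming the lz4-java values quoted above (1/17/9); the class and `checkLevel` helper are hypothetical names, not Kafka's actual API:

```java
public class Lz4LevelCheck {
    // Values mirrored from lz4-java's LZ4Constants as quoted in the thread;
    // if the library ever changes them, only these fields need updating.
    static final int MIN_LEVEL = 1;
    static final int MAX_LEVEL = 17;

    // Reject out-of-range levels at config time rather than failing later,
    // mid-compression, with a less helpful error.
    static int checkLevel(int level) {
        if (level < MIN_LEVEL || level > MAX_LEVEL) {
            throw new IllegalArgumentException("lz4 doesn't support the given compression level: " + level);
        }
        return level;
    }
}
```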
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600255317

## clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java: ##
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-    private SnappyFactory() { }
+    @Override
+    public CompressionType type() {
+        return CompressionType.NONE;
+    }
 
-    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-        try {
-            return new SnappyOutputStream(buffer);
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        return bufferStream;
     }
 
-    public static InputStream wrapForInput(ByteBuffer buffer) {
-        try {
-            return new SnappyInputStream(new ByteBufferInputStream(buffer));
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        return new ByteBufferInputStream(buffer);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof NoCompression;
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
    }
 
+    public static class Builder implements Compression.Builder {

Review Comment:
   Yes, I made all of them private.
   I also considered making the Builders private too, as I expect them to be retrieved via the getters in Compression, but I decided to keep them public.
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600247977

## clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java: ##
@@ -173,6 +173,14 @@ public class TopicConfig {
         "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
         "original compression codec set by the producer.";
 
+
+    public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
+    public static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to gzip.";
+    public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
+    public static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to lz4.";
+    public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";
+    public static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to zstd.";

Review Comment:
   Do you mean a link to the compression library websites?
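To show how these topic-level settings fit together, here is a minimal sketch building a config map with the literal key names from the diff above. The level value `6` is an arbitrary example, not a recommendation, and the class name is hypothetical:

```java
import java.util.Properties;

public class TopicCompressionConfigDemo {
    public static void main(String[] args) {
        // Topic configs as plain key/value strings; the keys match the
        // COMPRESSION_TYPE_CONFIG and COMPRESSION_GZIP_LEVEL_CONFIG
        // constants quoted in the diff above.
        Properties topicConfig = new Properties();
        topicConfig.setProperty("compression.type", "gzip");
        topicConfig.setProperty("compression.gzip.level", "6");

        // Per the *_DOC strings, the lz4/zstd level keys only take effect
        // when compression.type selects the matching codec.
        System.out.println(topicConfig.getProperty("compression.gzip.level")); // prints "6"
    }
}
```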
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600241274

## clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java: ##
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-    private SnappyFactory() { }
+    @Override
+    public CompressionType type() {
+        return CompressionType.NONE;
+    }
 
-    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-        try {
-            return new SnappyOutputStream(buffer);
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        return bufferStream;
    }
 
-    public static InputStream wrapForInput(ByteBuffer buffer) {
-        try {
-            return new SnappyInputStream(new ByteBufferInputStream(buffer));
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        return new ByteBufferInputStream(buffer);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof NoCompression;
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();

Review Comment:
   Actually I don't think we need to override the default hashCode/equals methods.
   Initially I wanted to make this a singleton but I don't think it's necessary.
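The contract point behind this exchange: if `equals()` treats all instances as equal, `hashCode()` must return the same value for all of them, so falling back to the identity hash from `super.hashCode()` violates the contract. A minimal sketch with a hypothetical stateless class (not Kafka's actual `NoCompression`, which ultimately dropped both overrides):

```java
public class StatelessEqualityDemo {

    // A stateless class whose instances are all interchangeable.
    static final class NoOp {
        @Override
        public boolean equals(Object o) {
            return o instanceof NoOp;
        }

        // Must be constant across instances: equal objects need equal hash
        // codes. Returning super.hashCode() (the identity hash) here would
        // scatter equal objects across different hash buckets.
        @Override
        public int hashCode() {
            return NoOp.class.hashCode();
        }
    }
}
```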
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600235959

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -331,12 +332,12 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
     public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offsetCounter,
                                                                        MetricsRecorder metricsRecorder,
                                                                        BufferSupplier bufferSupplier) {
-        if (targetCompression == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
+        if (targetCompression.type() == CompressionType.ZSTD && interBrokerProtocolVersion.isLessThan(IBP_2_1_IV0))
             throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
                 "are not allowed to use ZStandard compression");
 
         // No in place assignment situation 1
-        boolean inPlaceAssignment = sourceCompression == targetCompression;
+        boolean inPlaceAssignment = sourceCompressionType == targetCompression.type();

Review Comment:
   The broker has no easy way of retrieving the level that the producer used when compressing the records. So if the compression codec matches, I decided to keep the compressed bytes instead of decompressing and compressing everything again as this would be wasteful, especially as the producer could have already used the same compression level.
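The rule described here (keep the producer's bytes whenever the codec matches, regardless of level) can be sketched in isolation. This is a simplified model with hypothetical names, not the full `LogValidator` logic, which applies further "no in-place assignment" conditions:

```java
public class InPlaceAssignmentSketch {

    enum CodecType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Simplified decision: recompress only when the topic's target codec
    // differs from what the producer used. The compression level is
    // intentionally ignored because the broker cannot recover the level
    // the producer used from the batch bytes.
    static boolean inPlaceAssignment(CodecType sourceType, CodecType targetType) {
        return sourceType == targetType;
    }
}
```

The trade-off: a batch produced at gzip level 1 into a topic configured for gzip level 9 is stored as-is, which avoids a wasteful decompress/recompress cycle at the cost of not enforcing the configured level.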
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1598619060

## clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java: ##
@@ -500,27 +501,24 @@ private static Stream testBufferReuseInSkipKeyValueIterator() {
     @MethodSource
     public void testZstdJniForSkipKeyValueIterator(int expectedJniCalls, byte[] recordValue) throws IOException {
         MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
-            CompressionType.ZSTD, TimestampType.CREATE_TIME,
+            Compression.zstd().build(), TimestampType.CREATE_TIME,
             new SimpleRecord(9L, "hakuna-matata".getBytes(), recordValue)
         );
 
         // Buffer containing compressed data
         final ByteBuffer compressedBuf = records.buffer();
 
         // Create a RecordBatch object
         final DefaultRecordBatch batch = spy(new DefaultRecordBatch(compressedBuf.duplicate()));
-        final CompressionType mockCompression = mock(CompressionType.ZSTD.getClass());
-        doReturn(mockCompression).when(batch).compressionType();
-
+        final ZstdCompression compression = Compression.zstd().build();
         // Buffer containing compressed records to be used for creating zstd-jni stream
-        ByteBuffer recordsBuffer = compressedBuf.duplicate();
+        ByteBuffer recordsBuffer = spy(compressedBuf.duplicate());

Review Comment:
   Right, it's not needed
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1598609324

## clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+                GzipCompression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    inputStream.read(result);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCompressionLevels() {
+        GzipCompression.Builder builder = new GzipCompression.Builder();
+
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1));
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1));
+
+        builder.level(GzipCompression.MIN_LEVEL);
+        builder.level(GzipCompression.MAX_LEVEL);

Review Comment:
   It's not needed; if `level()` throws, it automatically fails the test.
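A stdlib-only version of the round-trip idea in this test can be written with `java.util.zip`. This is a sketch of the same pattern (compress at an explicit level, decompress, compare) and not Kafka's `GzipCompression`, whose streams add buffering on top; the class name is hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class DeflateLevelRoundTrip {

    // Compress at an explicit DEFLATE level (1..9), then decompress and
    // return the recovered bytes; a correct codec makes this an identity
    // regardless of the level used to compress.
    static byte[] roundTrip(byte[] data, int level) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        Deflater deflater = new Deflater(level);
        try (OutputStream out = new DeflaterOutputStream(sink, deflater)) {
            out.write(data);
        }
        deflater.end(); // release native resources held by the Deflater

        ByteArrayOutputStream recovered = new ByteArrayOutputStream();
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(sink.toByteArray()))) {
            byte[] buf = new byte[256];
            int n;
            while ((n = in.read(buf)) != -1) {
                recovered.write(buf, 0, n);
            }
        }
        return recovered.toByteArray();
    }
}
```

As in the test above, looping the round trip over the minimum, default, and maximum levels verifies that the level affects only the compressed size, never the decompressed content.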
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
junrao commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1586920863

## clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java: ##
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-    private SnappyFactory() { }
+    @Override
+    public CompressionType type() {
+        return CompressionType.NONE;
+    }
 
-    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-        try {
-            return new SnappyOutputStream(buffer);
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        return bufferStream;
    }
 
-    public static InputStream wrapForInput(ByteBuffer buffer) {
-        try {
-            return new SnappyInputStream(new ByteBufferInputStream(buffer));
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        return new ByteBufferInputStream(buffer);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof NoCompression;
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();

Review Comment:
   Hmm, we redefine `equals()` such that all objects of type NoCompression are equal, yet they have different hashcode?

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ##
@@ -201,8 +203,10 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+        Compression compression = Compression.of(compressionType).build();
+        try (InputStream in = compression.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
              ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
+

Review Comment:
   extra new line

## clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java: ##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class Lz4Compression implements Compression {
+
+    public static final int MIN_LEVEL = 1;
+    public static final int MAX_LEVEL = 17;
+    public static final int DEFAULT_LEVEL = 9;

Review Comment:
   So, every time we update the Lz4 library, we may need to update the above values? We probably want to add a note here.

## clients/src/main/java/org/apache/kafka/common/compress/Compression.java: ##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
showuon commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1579334152

## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ##
@@ -189,7 +189,7 @@ public RecordAccumulator(LogContext logContext,
                              BufferPool bufferPool) {
         this(logContext,
             batchSize,
-compression,
+compression,

Review Comment:
   nit: wrong indent

## clients/src/main/java/org/apache/kafka/common/compress/Compression.java: ##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Compression {
+
+    /**
+     * The compression type for this compression codec
+     */
+    CompressionType type();
+
+    /**
+     * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
+     * Note: Unlike {@link #wrapForInput}, this cannot take {@link ByteBuffer}s directly.
+     * Currently, MemoryRecordsBuilder writes to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
+     * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
+     */
+    OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
+
+    /**
+     * Wrap buffer with an InputStream that will decompress data with this CompressionType.
+     *
+     * @param buffer The {@link ByteBuffer} instance holding the data to decompress.
+     * @param messageVersion The record format version to use.
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+     *                                    For small record batches, allocating a potentially large buffer (64 KB for LZ4)

Review Comment:
   miss `@return`

## clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL,
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on PR #15516: URL: https://github.com/apache/kafka/pull/15516#issuecomment-2048128916

   @divijvaidya It seems you've done a bit of work around compression in the past. Can you take a look? Thanks