This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0cadf0db714a5f0bea52e160217198c1233cf0a1
Author: Luke Chen <show...@gmail.com>
AuthorDate: Wed Sep 27 19:00:50 2023 +0800

    KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)

    Bump snappy-java to 1.1.10.4, and add more tests to verify that the
    compressed data can be correctly decompressed and read.

    For LogCleanerParameterizedIntegrationTest, we increased the maximum
    message size for the snappy case because the new snappy version produces
    slightly larger compressed output than the previous one. Since the
    compression algorithm itself is outside Kafka's scope, all we need to
    verify is that the compressed data can be successfully decompressed and
    parsed/read.

    Reviewers: Divij Vaidya <di...@amazon.com>, Ismael Juma <ism...@juma.me.uk>, Josep Prat <josep.p...@aiven.io>, Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
---
 LICENSE-binary                                     |  2 +-
 .../kafka/api/ProducerCompressionTest.scala        | 63 ++++++++++++++++++----
 .../LogCleanerParameterizedIntegrationTest.scala   |  4 +-
 gradle/dependencies.gradle                         |  2 +-
 4 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 115cf010a85..8ea42190c6f 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -252,7 +252,7 @@ scala-library-2.13.10
 scala-logging_2.13-3.9.4
 scala-reflect-2.13.10
 scala-java8-compat_2.13-1.0.2
-snappy-java-1.1.10.1
+snappy-java-1.1.10.4
 swagger-annotations-2.2.8
 zookeeper-3.6.4
 zookeeper-jute-3.6.4

diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index b1e39ebde49..6135ec952ca 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,8 +19,10 @@ package kafka.api.test
 
 import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -29,7 +31,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
+import java.util.concurrent.Future
 import java.util.{Collections, Properties}
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
 
 class ProducerCompressionTest extends QuorumTestHarness {
 
@@ -64,10 +69,10 @@ class ProducerCompressionTest extends QuorumTestHarness {
     "kraft,snappy",
     "kraft,lz4",
     "kraft,zstd",
-    "zk,gzip"
+    "zk,gzip",
+    "zk,snappy"
   ))
   def testCompression(quorum: String, compression: String): Unit = {
-
     val producerProps = new Properties()
     val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
       }
       val partition = 0
 
+      def messageValue(length: Int): String = {
+        val random = new Random(0)
+        new String(random.alphanumeric.take(length).toArray)
+      }
+
       // prepare the messages
-      val messageValues = (0 until numRecords).map(i => "value" + i)
+      val messageValues = (0 until numRecords).map(i => messageValue(i))
+      val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+      val headers = new RecordHeaders(headerArr)
 
       // make sure the returned messages are correct
       val now = System.currentTimeMillis()
-      val responses = for (message <- messageValues)
-        yield producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+      val responses: ListBuffer[Future[RecordMetadata]] = new ListBuffer[Future[RecordMetadata]]()
+
+      for (message <- messageValues) {
+        // 1. send message without key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+        // 2. send message with key, without header
+        responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes))
+        // 3. send message with key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes, headers))
+      }
 
       for ((future, offset) <- responses.zipWithIndex) {
         assertEquals(offset.toLong, future.get.offset)
       }
@@ -103,12 +123,37 @@
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records(offset)
+        assertEquals(messageValue.length.toString, new String(record.key()))
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
 
-      for (((messageValue, record), index) <- messageValues.zip(records).zipWithIndex) {
+        // 3. verify message with key and header
+        offset = i * 3 + 2
+        record = records(offset)
+        assertEquals(messageValue.length.toString, new String(record.key()))
         assertEquals(messageValue, new String(record.value))
+        assertEquals(1, record.headers().toArray.length)
+        assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))
         assertEquals(now, record.timestamp)
-        assertEquals(index.toLong, record.offset)
+        assertEquals(offset.toLong, record.offset)
       }
     } finally {
       producer.close()

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 9dc94082820..9ba10aade95 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -143,9 +143,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
       case _ =>
         // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
         // increase because the broker offsets are larger than the ones assigned by the client
-        // adding `5` to the message set size is good enough for this test: it covers the increased message size while
+        // adding `6` to the message set size is good enough for this test: it covers the increased message size while
         // still being less than the overhead introduced by the conversion from message format version 0 to 1
-        largeMessageSet.sizeInBytes + 5
+        largeMessageSet.sizeInBytes + 6
     }
 
     cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 735f98cfcd4..24f5859252a 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -122,7 +122,7 @@ versions += [
   scalaJava8Compat : "1.0.2",
   scoverage: "1.9.3",
   slf4j: "1.7.36",
-  snappy: "1.1.10.1",
+  snappy: "1.1.10.4",
   spotbugs: "4.7.3",
   // New version of Swagger 2.2.14 requires minimum JDK 11.
   swaggerAnnotations: "2.2.8",
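
The invariant the new tests exercise can be checked directly against snappy-java. The sketch below is illustrative only, not part of the commit: it assumes snappy-java is on the classpath, and the object name SnappyRoundTripCheck is hypothetical. The compressed size may shift between 1.1.10.1 and 1.1.10.4, but decompression must still recover the original bytes exactly, which is all Kafka relies on.

    import java.nio.charset.StandardCharsets

    import org.xerial.snappy.Snappy

    // Minimal sketch: the compressed size may differ across snappy-java
    // versions, but the compress/uncompress round trip must be lossless.
    object SnappyRoundTripCheck extends App {
      val original = (0 until 1000).map(i => "value" + i).mkString(",").getBytes(StandardCharsets.UTF_8)

      val compressed = Snappy.compress(original)
      val restored = Snappy.uncompress(compressed)

      // Verify the payload survives the round trip byte-for-byte.
      assert(java.util.Arrays.equals(original, restored), "snappy round trip corrupted the payload")
      println(s"original=${original.length}B, compressed=${compressed.length}B, round trip OK")
    }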