This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0cadf0db714a5f0bea52e160217198c1233cf0a1
Author: Luke Chen <show...@gmail.com>
AuthorDate: Wed Sep 27 19:00:50 2023 +0800

    KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)
    
    Bump snappy-java version to 1.1.10.4, and add more tests to verify that
    the compressed data can be correctly decompressed and read.
    
    For LogCleanerParameterizedIntegrationTest, we increased the message size
    for snappy decompression, since with the new version of snappy the
    decompressed size is larger than with the previous version. But since the
    compression algorithm is not within Kafka's scope, all we need to do is
    make sure the compressed data can be successfully decompressed and
    parsed/read.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Ismael Juma 
<ism...@juma.me.uk>, Josep Prat <josep.p...@aiven.io>, Kamal Chandraprakash 
<kamal.chandraprak...@gmail.com>
---
 LICENSE-binary                                     |  2 +-
 .../kafka/api/ProducerCompressionTest.scala        | 63 ++++++++++++++++++----
 .../LogCleanerParameterizedIntegrationTest.scala   |  4 +-
 gradle/dependencies.gradle                         |  2 +-
 4 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 115cf010a85..8ea42190c6f 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -252,7 +252,7 @@ scala-library-2.13.10
 scala-logging_2.13-3.9.4
 scala-reflect-2.13.10
 scala-java8-compat_2.13-1.0.2
-snappy-java-1.1.10.1
+snappy-java-1.1.10.4
 swagger-annotations-2.2.8
 zookeeper-3.6.4
 zookeeper-jute-3.6.4
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index b1e39ebde49..6135ec952ca 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,8 +19,10 @@ package kafka.api.test
 
 import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -29,7 +31,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, 
TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
+import java.util.concurrent.Future
 import java.util.{Collections, Properties}
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
 
 class ProducerCompressionTest extends QuorumTestHarness {
 
@@ -64,10 +69,10 @@ class ProducerCompressionTest extends QuorumTestHarness {
     "kraft,snappy",
     "kraft,lz4",
     "kraft,zstd",
-    "zk,gzip"
+    "zk,gzip",
+    "zk,snappy"
   ))
   def testCompression(quorum: String, compression: String): Unit = {
-
     val producerProps = new Properties()
     val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers)
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
       }
       val partition = 0
 
+      def messageValue(length: Int): String = {
+        val random = new Random(0)
+        new String(random.alphanumeric.take(length).toArray)
+      }
+
       // prepare the messages
-      val messageValues = (0 until numRecords).map(i => "value" + i)
+      val messageValues = (0 until numRecords).map(i => messageValue(i))
+      val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+      val headers = new RecordHeaders(headerArr)
 
       // make sure the returned messages are correct
       val now = System.currentTimeMillis()
-      val responses = for (message <- messageValues)
-        yield producer.send(new ProducerRecord(topic, null, now, null, 
message.getBytes))
+      val responses: ListBuffer[Future[RecordMetadata]] = new 
ListBuffer[Future[RecordMetadata]]()
+
+      for (message <- messageValues) {
+        // 1. send message without key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, null, 
message.getBytes))
+        // 2. send message with key, without header
+        responses += producer.send(new ProducerRecord(topic, null, now, 
message.length.toString.getBytes, message.getBytes))
+        // 3. send message with key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, 
message.length.toString.getBytes, message.getBytes, headers))
+      }
       for ((future, offset) <- responses.zipWithIndex) {
         assertEquals(offset.toLong, future.get.offset)
       }
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records(offset)
+        assertEquals(messageValue.length.toString, new String(record.key()))
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
 
-      for (((messageValue, record), index) <- 
messageValues.zip(records).zipWithIndex) {
+        // 3. verify message with key and header
+        offset = i * 3 + 2
+        record = records(offset)
+        assertEquals(messageValue.length.toString, new String(record.key()))
         assertEquals(messageValue, new String(record.value))
+        assertEquals(1, record.headers().toArray.length)
+        assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))
         assertEquals(now, record.timestamp)
-        assertEquals(index.toLong, record.offset)
+        assertEquals(offset.toLong, record.offset)
       }
     } finally {
       producer.close()
diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 9dc94082820..9ba10aade95 100755
--- 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -143,9 +143,9 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
       case _ =>
         // the broker assigns absolute offsets for message format 0 which 
potentially causes the compressed size to
         // increase because the broker offsets are larger than the ones 
assigned by the client
-        // adding `5` to the message set size is good enough for this test: it 
covers the increased message size while
+        // adding `6` to the message set size is good enough for this test: it 
covers the increased message size while
         // still being less than the overhead introduced by the conversion 
from message format version 0 to 1
-        largeMessageSet.sizeInBytes + 5
+        largeMessageSet.sizeInBytes + 6
     }
 
     cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 
maxMessageSize)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 735f98cfcd4..24f5859252a 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -122,7 +122,7 @@ versions += [
   scalaJava8Compat : "1.0.2",
   scoverage: "1.9.3",
   slf4j: "1.7.36",
-  snappy: "1.1.10.1",
+  snappy: "1.1.10.4",
   spotbugs: "4.7.3",
   // New version of Swagger 2.2.14 requires minimum JDK 11.
   swaggerAnnotations: "2.2.8",

Reply via email to