jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r655642933



##########
File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
##########
@@ -116,61 +122,85 @@ class FetchRequestTest extends BaseRequestTest {
     val fetchRequest1 = createFetchRequest(shuffledTopicPartitions1)
     val fetchResponse1 = sendFetchRequest(leaderId, fetchRequest1)
     checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+    val fetchRequest1V12 = createFetchRequest(shuffledTopicPartitions1, 
version = 12)
+    val fetchResponse1V12 = sendFetchRequest(leaderId, fetchRequest1V12)
+    checkFetchResponse(shuffledTopicPartitions1, fetchResponse1V12, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12)
 
     // 2. Same as 1, but shuffled again
     val shuffledTopicPartitions2 = 
random.shuffle(partitionsWithoutLargeMessages) ++ partitionsWithLargeMessages
     val fetchRequest2 = createFetchRequest(shuffledTopicPartitions2)
     val fetchResponse2 = sendFetchRequest(leaderId, fetchRequest2)
     checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+    val fetchRequest2V12 = createFetchRequest(shuffledTopicPartitions2, 
version = 12)
+    val fetchResponse2V12 = sendFetchRequest(leaderId, fetchRequest2V12)
+    checkFetchResponse(shuffledTopicPartitions2, fetchResponse2V12, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12)
 
     // 3. Partition with message larger than the partition limit at the start of the list
     val shuffledTopicPartitions3 = Seq(partitionWithLargeMessage1, 
partitionWithLargeMessage2) ++
       random.shuffle(partitionsWithoutLargeMessages)
     val fetchRequest3 = createFetchRequest(shuffledTopicPartitions3, 
Map(partitionWithLargeMessage1 -> messagesPerPartition))
     val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3)
-    assertEquals(shuffledTopicPartitions3, 
fetchResponse3.responseData.keySet.asScala.toSeq)
-    val responseSize3 = fetchResponse3.responseData.asScala.values.map { 
partitionData =>
-      records(partitionData).map(_.sizeInBytes).sum
-    }.sum
-    assertTrue(responseSize3 <= maxResponseBytes)
-    val partitionData3 = 
fetchResponse3.responseData.get(partitionWithLargeMessage1)
-    assertEquals(Errors.NONE.code, partitionData3.errorCode)
-    assertTrue(partitionData3.highWatermark > 0)
-    val size3 = records(partitionData3).map(_.sizeInBytes).sum
-    assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes")
-    assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes")
-    assertTrue(maxPartitionBytes < FetchResponse.recordsSize(partitionData3))
+    val fetchRequest3V12 = createFetchRequest(shuffledTopicPartitions3, 
Map(partitionWithLargeMessage1 -> messagesPerPartition), 12)
+    val fetchResponse3V12 = sendFetchRequest(leaderId, fetchRequest3V12)
+    def evaluateResponse3(response: FetchResponse, version: Short = 
ApiKeys.FETCH.latestVersion()) = {
+      val responseData = response.responseData(topicNames, version)
+      assertEquals(shuffledTopicPartitions3, responseData.keySet.asScala.toSeq)
+      val responseSize = responseData.asScala.values.map { partitionData =>
+        records(partitionData).map(_.sizeInBytes).sum
+      }.sum
+      assertTrue(responseSize <= maxResponseBytes)
+      val partitionData = responseData.get(partitionWithLargeMessage1)
+      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertTrue(partitionData.highWatermark > 0)
+      val size3 = records(partitionData).map(_.sizeInBytes).sum
+      assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes")
+      assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes")
+      assertTrue(maxPartitionBytes < partitionData.records.sizeInBytes)
+    }
+    evaluateResponse3(fetchResponse3)
+    evaluateResponse3(fetchResponse3V12, 12)
 
     // 4. Partition with message larger than the response limit at the start of the list
     val shuffledTopicPartitions4 = Seq(partitionWithLargeMessage2, 
partitionWithLargeMessage1) ++
       random.shuffle(partitionsWithoutLargeMessages)
     val fetchRequest4 = createFetchRequest(shuffledTopicPartitions4, 
Map(partitionWithLargeMessage2 -> messagesPerPartition))
     val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4)
-    assertEquals(shuffledTopicPartitions4, 
fetchResponse4.responseData.keySet.asScala.toSeq)
-    val nonEmptyPartitions4 = 
fetchResponse4.responseData.asScala.toSeq.collect {
-      case (tp, partitionData) if 
records(partitionData).map(_.sizeInBytes).sum > 0 => tp
+    val fetchRequest4V12 = createFetchRequest(shuffledTopicPartitions4, 
Map(partitionWithLargeMessage2 -> messagesPerPartition), 12)
+    val fetchResponse4V12 = sendFetchRequest(leaderId, fetchRequest4V12)
+    def evaluateResponse4(response: FetchResponse, version: Short = 
ApiKeys.FETCH.latestVersion()) = {
+      val responseData = response.responseData(topicNames, version)
+      assertEquals(shuffledTopicPartitions4, responseData.keySet.asScala.toSeq)
+      val nonEmptyPartitions = responseData.asScala.toSeq.collect {
+        case (tp, partitionData) if 
records(partitionData).map(_.sizeInBytes).sum > 0 => tp
+      }
+      assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions)
+      val partitionData = responseData.get(partitionWithLargeMessage2)
+      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertTrue(partitionData.highWatermark > 0)
+      val size4 = records(partitionData).map(_.sizeInBytes).sum
+      assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes")
+      assertTrue(maxResponseBytes < partitionData.records.sizeInBytes)
     }
-    assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
-    val partitionData4 = 
fetchResponse4.responseData.get(partitionWithLargeMessage2)
-    assertEquals(Errors.NONE.code, partitionData4.errorCode)
-    assertTrue(partitionData4.highWatermark > 0)
-    val size4 = records(partitionData4).map(_.sizeInBytes).sum
-    assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes")
-    assertTrue(maxResponseBytes < FetchResponse.recordsSize(partitionData4))
+    evaluateResponse4(fetchResponse4)
+    evaluateResponse4(fetchResponse4V12, 12)
   }
 
   @Test
   def testFetchRequestV2WithOversizedMessage(): Unit = {
     initProducer()
     val maxPartitionBytes = 200
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1).head
+    val topicIds = getTopicIds().asJava
+    val topicNames = topicIds.asScala.map(_.swap).asJava
     producer.send(new ProducerRecord(topicPartition.topic, 
topicPartition.partition,
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
-    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(maxPartitionBytes,
-      Seq(topicPartition))).build(2)
+    val fetchRequest = FetchRequest.Builder.forConsumer(2, Int.MaxValue, 0, 
createPartitionMap(maxPartitionBytes,
+      Seq(topicPartition)), topicIds).build(2)
     val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
-    val partitionData = fetchResponse.responseData.get(topicPartition)
+    val partitionData = fetchResponse.responseData(topicNames, 
2).get(topicPartition)
     assertEquals(Errors.NONE.code, partitionData.errorCode)
+    //assertEquals(Errors.NONE.code, partitionData.errorCode)

Review comment:
       Ah, apologies, I did not clean up as well as I should have.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to