jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r655642933
########## File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala ########## @@ -116,61 +122,85 @@ class FetchRequestTest extends BaseRequestTest { val fetchRequest1 = createFetchRequest(shuffledTopicPartitions1) val fetchResponse1 = sendFetchRequest(leaderId, fetchRequest1) checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, maxPartitionBytes, maxResponseBytes, messagesPerPartition) + val fetchRequest1V12 = createFetchRequest(shuffledTopicPartitions1, version = 12) + val fetchResponse1V12 = sendFetchRequest(leaderId, fetchRequest1V12) + checkFetchResponse(shuffledTopicPartitions1, fetchResponse1V12, maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12) // 2. Same as 1, but shuffled again val shuffledTopicPartitions2 = random.shuffle(partitionsWithoutLargeMessages) ++ partitionsWithLargeMessages val fetchRequest2 = createFetchRequest(shuffledTopicPartitions2) val fetchResponse2 = sendFetchRequest(leaderId, fetchRequest2) checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, maxPartitionBytes, maxResponseBytes, messagesPerPartition) + val fetchRequest2V12 = createFetchRequest(shuffledTopicPartitions2, version = 12) + val fetchResponse2V12 = sendFetchRequest(leaderId, fetchRequest2V12) + checkFetchResponse(shuffledTopicPartitions2, fetchResponse2V12, maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12) // 3. 
Partition with message larger than the partition limit at the start of the list val shuffledTopicPartitions3 = Seq(partitionWithLargeMessage1, partitionWithLargeMessage2) ++ random.shuffle(partitionsWithoutLargeMessages) val fetchRequest3 = createFetchRequest(shuffledTopicPartitions3, Map(partitionWithLargeMessage1 -> messagesPerPartition)) val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3) - assertEquals(shuffledTopicPartitions3, fetchResponse3.responseData.keySet.asScala.toSeq) - val responseSize3 = fetchResponse3.responseData.asScala.values.map { partitionData => - records(partitionData).map(_.sizeInBytes).sum - }.sum - assertTrue(responseSize3 <= maxResponseBytes) - val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1) - assertEquals(Errors.NONE.code, partitionData3.errorCode) - assertTrue(partitionData3.highWatermark > 0) - val size3 = records(partitionData3).map(_.sizeInBytes).sum - assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes") - assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes") - assertTrue(maxPartitionBytes < FetchResponse.recordsSize(partitionData3)) + val fetchRequest3V12 = createFetchRequest(shuffledTopicPartitions3, Map(partitionWithLargeMessage1 -> messagesPerPartition), 12) + val fetchResponse3V12 = sendFetchRequest(leaderId, fetchRequest3V12) + def evaluateResponse3(response: FetchResponse, version: Short = ApiKeys.FETCH.latestVersion()) = { + val responseData = response.responseData(topicNames, version) + assertEquals(shuffledTopicPartitions3, responseData.keySet.asScala.toSeq) + val responseSize = responseData.asScala.values.map { partitionData => + records(partitionData).map(_.sizeInBytes).sum + }.sum + assertTrue(responseSize <= maxResponseBytes) + val partitionData = responseData.get(partitionWithLargeMessage1) + assertEquals(Errors.NONE.code, partitionData.errorCode) + assertTrue(partitionData.highWatermark > 0) + 
val size3 = records(partitionData).map(_.sizeInBytes).sum + assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes") + assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes") + assertTrue(maxPartitionBytes < partitionData.records.sizeInBytes) + } + evaluateResponse3(fetchResponse3) + evaluateResponse3(fetchResponse3V12, 12) // 4. Partition with message larger than the response limit at the start of the list val shuffledTopicPartitions4 = Seq(partitionWithLargeMessage2, partitionWithLargeMessage1) ++ random.shuffle(partitionsWithoutLargeMessages) val fetchRequest4 = createFetchRequest(shuffledTopicPartitions4, Map(partitionWithLargeMessage2 -> messagesPerPartition)) val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4) - assertEquals(shuffledTopicPartitions4, fetchResponse4.responseData.keySet.asScala.toSeq) - val nonEmptyPartitions4 = fetchResponse4.responseData.asScala.toSeq.collect { - case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp + val fetchRequest4V12 = createFetchRequest(shuffledTopicPartitions4, Map(partitionWithLargeMessage2 -> messagesPerPartition), 12) + val fetchResponse4V12 = sendFetchRequest(leaderId, fetchRequest4V12) + def evaluateResponse4(response: FetchResponse, version: Short = ApiKeys.FETCH.latestVersion()) = { + val responseData = response.responseData(topicNames, version) + assertEquals(shuffledTopicPartitions4, responseData.keySet.asScala.toSeq) + val nonEmptyPartitions = responseData.asScala.toSeq.collect { + case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp + } + assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions) + val partitionData = responseData.get(partitionWithLargeMessage2) + assertEquals(Errors.NONE.code, partitionData.errorCode) + assertTrue(partitionData.highWatermark > 0) + val size4 = records(partitionData).map(_.sizeInBytes).sum + assertTrue(size4 > 
maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes") + assertTrue(maxResponseBytes < partitionData.records.sizeInBytes) } - assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4) - val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2) - assertEquals(Errors.NONE.code, partitionData4.errorCode) - assertTrue(partitionData4.highWatermark > 0) - val size4 = records(partitionData4).map(_.sizeInBytes).sum - assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes") - assertTrue(maxResponseBytes < FetchResponse.recordsSize(partitionData4)) + evaluateResponse4(fetchResponse4) + evaluateResponse4(fetchResponse4V12, 12) } @Test def testFetchRequestV2WithOversizedMessage(): Unit = { initProducer() val maxPartitionBytes = 200 val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head + val topicIds = getTopicIds().asJava + val topicNames = topicIds.asScala.map(_.swap).asJava producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get - val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, - Seq(topicPartition))).build(2) + val fetchRequest = FetchRequest.Builder.forConsumer(2, Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, + Seq(topicPartition)), topicIds).build(2) val fetchResponse = sendFetchRequest(leaderId, fetchRequest) - val partitionData = fetchResponse.responseData.get(topicPartition) + val partitionData = fetchResponse.responseData(topicNames, 2).get(topicPartition) assertEquals(Errors.NONE.code, partitionData.errorCode) + //assertEquals(Errors.NONE.code, partitionData.errorCode) Review comment: Ah, apologies; I did not clean up as well as I should have. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org