jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743350813
########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ##########

@@ -3530,37 +3534,37 @@ class KafkaApisTest {
   def testSizeOfThrottledPartitions(): Unit = {
     val topicNames = new util.HashMap[Uuid, String]
     val topicIds = new util.HashMap[String, Uuid]()
-    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = {
-      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData](
+    def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = {
+      val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData](
         data.map { case (tp, raw) =>
           tp -> new FetchResponseData.PartitionData()
-            .setPartitionIndex(tp.partition)
+            .setPartitionIndex(tp.topicPartition.partition)
             .setHighWatermark(105)
             .setLastStableOffset(105)
             .setLogStartOffset(0)
             .setRecords(MemoryRecords.withRecords(CompressionType.NONE,
               new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8))))
         }.toMap.asJava)
       data.foreach{case (tp, _) =>
-        val id = Uuid.randomUuid()
-        topicIds.put(tp.topic(), id)
-        topicNames.put(id, tp.topic())
+        topicIds.put(tp.topicPartition.topic, tp.topicId)
+        topicNames.put(tp.topicId, tp.topicPartition.topic)
       }
-      FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds)
+      FetchResponse.of(Errors.NONE, 100, 100, responseData)
     }

-    val throttledPartition = new TopicPartition("throttledData", 0)
+    val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("throttledData", 0))
     val throttledData = Map(throttledPartition -> "throttledData")
     val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
-      fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds)
+      fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map(entry =>
+        (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), entry.getValue)).toMap.asJava.entrySet.iterator)

-    val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
+    val response = fetchResponse(throttledData ++ Map(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) -> "nonThrottledData"))

     val quota = Mockito.mock(classOf[ReplicationQuotaManager])
     Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
-      .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])
+      .thenAnswer(invocation => throttledPartition.topicPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])

-    assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota, topicIds))
+    assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota))
   }

   @Test

Review comment:
       What logic are we thinking? Checking that the unresolved topics are handled correctly?
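
For readers following the thread: the question above concerns what an additional check for unresolved topic IDs would assert. As a rough, standalone illustration only (not part of this PR; the object name UnresolvedTopicSketch and helper partitionByResolvedTopicId below are made up and do not exist in Kafka), such a check could split fetch response partitions by whether their topic ID resolves through the topicNames map and then assert on the unresolved subset:

    import java.util.{HashMap => JHashMap}

    import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}

    // Hypothetical standalone sketch; not part of KafkaApisTest or the Kafka API.
    object UnresolvedTopicSketch {

      // Split partitions into those whose topic ID is present in `topicNames`
      // (resolved) and those whose ID is unknown (unresolved).
      def partitionByResolvedTopicId(
        partitions: Seq[TopicIdPartition],
        topicNames: java.util.Map[Uuid, String]
      ): (Seq[TopicIdPartition], Seq[TopicIdPartition]) =
        partitions.partition(tidp => topicNames.containsKey(tidp.topicId))

      def main(args: Array[String]): Unit = {
        val topicNames = new JHashMap[Uuid, String]()
        val knownId = Uuid.randomUuid()
        topicNames.put(knownId, "throttledData")

        val resolved = new TopicIdPartition(knownId, new TopicPartition("throttledData", 0))
        // Unknown ID: this is the "unresolved" case a test could assert on.
        val unresolved = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0))

        val (ok, missing) = partitionByResolvedTopicId(Seq(resolved, unresolved), topicNames)
        assert(ok == Seq(resolved))
        assert(missing == Seq(unresolved))
      }
    }

Any real test in KafkaApisTest would of course go through FetchResponse and KafkaApis.sizeOfThrottledPartitions rather than a helper like this; the sketch only illustrates the resolved-versus-unresolved split the comment asks about.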