josefk31 commented on code in PR #19664:
URL: https://github.com/apache/kafka/pull/19664#discussion_r2114212111
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13321,6 +13323,284 @@ class KafkaApisTest extends Logging {
assertEquals(alterShareGroupOffsetsResponseData, response.data)
}
+ def verifyGetReplicaLogInfoRequest(builder:
GetReplicaLogInfoRequest.Builder, withResponse: (GetReplicaLogInfoResponse =>
Unit)): Unit = {
+ val request = buildRequest(builder.build())
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val clusterResource = new ResourcePattern(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL)
+ val clusterActions = Collections.singletonList(new
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+ val allowList = Collections.singletonList(AuthorizationResult.ALLOWED)
+ when(authorizer.authorize(request.context,
clusterActions)).thenReturn(allowList)
+ kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+ kafkaApis.handleGetReplicaLogInfo(request)
+ withResponse(verifyNoThrottling[GetReplicaLogInfoResponse](request))
+ }
+
+ @Test
+ def testUnauthorizedGetReplicaLogInfo(): Unit = {
+ val builder = new GetReplicaLogInfoRequest.Builder(new
GetReplicaLogInfoRequestData())
+ val request = buildRequest(builder.build(0))
+ val clusterResource = new ResourcePattern(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL)
+ val clusterActions = Collections.singletonList(new
Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
+ val allowList = Collections.singletonList(AuthorizationResult.DENIED)
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(request.context,
clusterActions)).thenReturn(allowList)
+ kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+ assertThrows(classOf[ClusterAuthorizationException],
+ () => kafkaApis.handleGetReplicaLogInfo(request))
+ }
+
+ @Test
+ def testGetReplicaLogInfoFailedToRetrievePartition(): Unit = {
+ val uuids = (1 to 4).map(_ => Uuid.randomUuid()).toList
+ val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+ .setTopicId(_)
+ .setPartitions(Collections.singletonList(1)))
+
+ def mockTopicName(uuid: Uuid, idx: Int): String =
s"topic-idx-$idx-with-uuid-$uuid"
+
+ metadataCache = mock(classOf[MetadataCache])
+ uuids.zipWithIndex.foreach { case (uuid, idx) =>
+ val name = mockTopicName(uuid, idx)
+ when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(name))
+ }
+
+ val log = mock(classOf[UnifiedLog])
+ when(log.logEndOffset).thenReturn(100L)
+ when(log.latestEpoch).thenReturn(Optional.of(10))
+ val partition = mock(classOf[Partition])
+ when(partition.log).thenReturn(Some(log))
+ when(partition.getLeaderEpoch).thenReturn(1)
+ when(partition.partitionId).thenReturn(1)
+ val valid = new TopicPartition(mockTopicName(uuids.head, 0), 1)
+
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
+
+ val expected = new
GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch)
+ expected.topicPartitionLogInfoList().add(new
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+ .setTopicId(uuids.head)
+ .setPartitionLogInfo(Collections.singletonList(new
GetReplicaLogInfoResponseData.PartitionLogInfo()
+ .setPartition(1)
+ .setLogEndOffset(100L)
+ .setLastWrittenLeaderEpoch(10)
+ .setCurrentLeaderEpoch(1))))
+
+ var idx = 1
+ List(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_LEADER_OR_FOLLOWER,
Errors.UNKNOWN_TOPIC_OR_PARTITION).foreach { err =>
+ val uuid = uuids(idx)
+ val name = mockTopicName(uuid, idx)
+ val invalid = new TopicPartition(name, 1)
+ when(replicaManager.getPartitionOrError(invalid)).thenReturn(Left(err))
+ expected.topicPartitionLogInfoList().add(new
GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+ .setTopicId(uuid)
+ .setPartitionLogInfo(Collections.singletonList(new
GetReplicaLogInfoResponseData.PartitionLogInfo()
+ .setErrorCode(err.code())
+ .setPartition(1))))
+ idx += 1
+ }
+
+
+ val builder = new GetReplicaLogInfoRequest.Builder(
+ new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava)
+ )
+ verifyGetReplicaLogInfoRequest(builder, { response =>
+ assertEquals(expected, response.data())
+ })
+ }
+
+ @Test
+ def testGetReplicaLogInfoFailedToRetrieveLog(): Unit = {
+ val topic1 = new GetReplicaLogInfoRequestData.TopicPartitions()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(Collections.singletonList(1))
+ val topic2 = new GetReplicaLogInfoRequestData.TopicPartitions()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(Collections.singletonList(2))
+ val builder = new GetReplicaLogInfoRequest.Builder(List(topic1, topic2)
asJava)
+
+ metadataCache = mock(classOf[MetadataCache])
+
when(metadataCache.getTopicName(topic1.topicId())).thenReturn(Optional.of("topic1"))
+
when(metadataCache.getTopicName(topic2.topicId())).thenReturn(Optional.of("topic2"))
+
+ val log1 = mock(classOf[UnifiedLog])
+ when(log1.logEndOffset).thenReturn(100L)
+ when(log1.latestEpoch).thenReturn(Optional.of(10))
+ val partition1 = mock(classOf[Partition])
+ when(partition1.log).thenReturn(Some(log1))
+ when(partition1.getLeaderEpoch).thenReturn(1)
+ when(partition1.partitionId).thenReturn(1)
+
+ val partition2 = mock(classOf[Partition])
+ when(partition2.log).thenReturn(None)
+ when(partition2.getLeaderEpoch).thenReturn(2)
+ when(partition2.partitionId).thenReturn(2)
+
+ val tp1 = new TopicPartition("topic1", 1)
+ when(replicaManager.getPartitionOrError(tp1)).thenReturn(Right(partition1))
+ val tp2 = new TopicPartition("topic2", 2)
+ when(replicaManager.getPartitionOrError(tp2)).thenReturn(Right(partition2))
+
+ val expected = new GetReplicaLogInfoResponseData()
+ .setTopicPartitionLogInfoList(List(
+ new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+ .setTopicId(topic1.topicId())
+ .setPartitionLogInfo(Collections.singletonList(
+ new GetReplicaLogInfoResponseData.PartitionLogInfo()
+ .setPartition(1)
+ .setLogEndOffset(100L)
+ .setLastWrittenLeaderEpoch(10)
+ .setCurrentLeaderEpoch(1))),
+ new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+ .setTopicId(topic2.topicId())
+ .setPartitionLogInfo(Collections.singletonList(
+ new GetReplicaLogInfoResponseData.PartitionLogInfo()
+ .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+ ))) asJava)
+ .setBrokerEpoch(brokerEpoch)
+
+ verifyGetReplicaLogInfoRequest(builder, { response =>
+ assertEquals(expected, response.data())
+ })
+ }
+
+ @Test
+ def testGetReplicaLogInfoUnknownTopic(): Unit = {
+ val expectedPartition = 1
+ val expectedUuid = Uuid.randomUuid()
+ val builder = new GetReplicaLogInfoRequest.Builder(new
GetReplicaLogInfoRequestData()
+ .setTopicPartitions(
+ Collections.singletonList(new
GetReplicaLogInfoRequestData.TopicPartitions()
+ .setTopicId(expectedUuid)
+ .setPartitions(Collections.singletonList(expectedPartition)))))
+ metadataCache = mock(classOf[MetadataCache])
+ when(metadataCache.getTopicName(expectedUuid)).thenReturn(Optional.empty())
+
+ val expectedResponseData = new GetReplicaLogInfoResponseData()
+ .setBrokerEpoch(brokerEpoch)
+ .setTopicPartitionLogInfoList(Collections.singletonList(
+ new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+ .setTopicId(expectedUuid)
+ .setPartitionLogInfo(Collections.singletonList(
+ new GetReplicaLogInfoResponseData.PartitionLogInfo()
+ .setPartition(expectedPartition)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code())
+ ))
+ ))
+ verifyGetReplicaLogInfoRequest(builder, { response =>
+ assertEquals(expectedResponseData, response.data())
+ })
+ }
+
+ @Test
+ def testGetReplicaLogInfoRequestTooManyTopics(): Unit = {
+ // 100 topics, 20 partitions per topic = 2k topic-partitions
+ // only first 1000 should be sent back and truncated = true
+ val numberUuids = 100
+ val numberPartitions = 20
+ val uuids: List[Uuid] = (1 to numberUuids).map(_ =>
Uuid.randomUuid()).toList
+ val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
+ .setTopicId(_)
+ .setPartitions((1 to numberPartitions).map(new Integer(_)).asJava))
+ val builder = new GetReplicaLogInfoRequest.Builder(
+ new GetReplicaLogInfoRequestData().setTopicPartitions(tps asJava))
+ val expectedLogEndOffset = 10L
+ val expectedLeaderEpoch = 2
+ val expectedLatestEpoch = 3
+
+ def mockTopicName(uuid: Uuid, idx: Int): String =
s"topic-idx-$idx-with-uuid-$uuid"
+
+ metadataCache = mock(classOf[MetadataCache])
+ uuids.zipWithIndex.foreach { case (uuid, idx) =>
+
when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(mockTopicName(uuid,
idx)))
+ }
+
+ val expectedResponseData =
+ new
GetReplicaLogInfoResponseData().setHasMoreData(true).setBrokerEpoch(brokerEpoch)
+ uuids.take(50).zipWithIndex.foreach { case (uuid, idx) =>
+ val tpli = new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
+ .setTopicId(uuid)
+ val topicName = mockTopicName(uuid, idx)
+ val log = mock(classOf[UnifiedLog])
+ when(log.logEndOffset).thenReturn(expectedLogEndOffset)
+ when(log.latestEpoch).thenReturn(Optional.of(expectedLatestEpoch))
+ (1 to numberPartitions).foreach { pid =>
+ val partition = mock(classOf[Partition])
+ when(partition.log).thenReturn(Some(log))
+ when(partition.getLeaderEpoch).thenReturn(expectedLeaderEpoch)
+ when(partition.partitionId).thenReturn(pid)
+ when(replicaManager.getPartitionOrError(new TopicPartition(topicName,
pid))).thenReturn(Right(partition))
+ tpli.partitionLogInfo().add(new
GetReplicaLogInfoResponseData.PartitionLogInfo()
+ .setPartition(pid)
+ .setLogEndOffset(expectedLogEndOffset)
+ .setLastWrittenLeaderEpoch(expectedLatestEpoch)
+ .setCurrentLeaderEpoch(expectedLeaderEpoch))
+ }
+ expectedResponseData.topicPartitionLogInfoList().add(tpli)
+ }
+
+ verifyGetReplicaLogInfoRequest(builder, { response =>
+ assertEquals(expectedResponseData, response.data())
+ })
+ }
+
+ @Test
+ def testGetReplicaInfoRequestHappyTrail(): Unit = {
Review Comment:
Bc all code has a path lol
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]