showuon commented on code in PR #12024:
URL: https://github.com/apache/kafka/pull/12024#discussion_r851035284
##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -479,6 +460,9 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--alter",
"--add-config", "a=b,c=d"))
+ val zkClient = mock(classOf[KafkaZkClient])
+ when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new
Properties())
Review Comment:
note: Using a mock `KafkaZkClient` is fine here because we just want to test
that the updated config is correctly parsed by zkClient. The same applies below. Thanks.
##########
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala:
##########
@@ -159,6 +167,74 @@ class TopicCommandTest {
assertEquals(expectedAssignment, actualAssignment)
}
+ @Test
+ def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ val adminClient = mock(classOf[Admin])
+ val topicService = TopicService(adminClient)
+
+ val result = AdminClientTestUtils.createTopicsResult(topicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+ when(adminClient.createTopics(any(), any())).thenReturn(result)
+
+ assertThrows(classOf[ThrottlingQuotaExceededException],
+ () => topicService.createTopic(new TopicCommandOptions(Array("--topic",
topicName))))
+
+ val expectedNewTopic = new NewTopic(topicName, Optional.empty[Integer](),
Optional.empty[java.lang.Short]())
+ .configs(Map.empty[String, String].asJava)
+
+ verify(adminClient, times(1)).createTopics(
+ eqThat(Set(expectedNewTopic).asJava),
+ argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[CreateTopicsOptions])
+ )
+ }
+
+ @Test
+ def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ val adminClient = mock(classOf[Admin])
+ val topicService = TopicService(adminClient)
+
+ val listResult = AdminClientTestUtils.listTopicsResult(topicName)
+ when(adminClient.listTopics(any())).thenReturn(listResult)
+
+ val result = AdminClientTestUtils.deleteTopicsResult(topicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+ when(adminClient.deleteTopics(any[Collection[String]](),
any())).thenReturn(result)
+
+ val exception = assertThrows(classOf[ExecutionException],
+ () => topicService.deleteTopic(new TopicCommandOptions(Array("--topic",
topicName))))
+ assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
+
+ verify(adminClient, times(1)).deleteTopics(
+ eqThat(Seq(topicName).asJavaCollection),
+ argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[DeleteTopicsOptions])
+ )
+ }
+
+ @Test
+ def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit
= {
Review Comment:
Yes, you're right! Good observation!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]