showuon commented on code in PR #12024:
URL: https://github.com/apache/kafka/pull/12024#discussion_r850045692


##########
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala:
##########
@@ -159,6 +167,74 @@ class TopicCommandTest {
     assertEquals(expectedAssignment, actualAssignment)
   }
 
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = mock(classOf[Admin])
+    val topicService = TopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(topicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    when(adminClient.createTopics(any(), any())).thenReturn(result)
+
+    assertThrows(classOf[ThrottlingQuotaExceededException],
+      () => topicService.createTopic(new TopicCommandOptions(Array("--topic", 
topicName))))
+
+    val expectedNewTopic = new NewTopic(topicName, Optional.empty[Integer](), 
Optional.empty[java.lang.Short]())
+      .configs(Map.empty[String, String].asJava)
+
+    verify(adminClient, times(1)).createTopics(
+      eqThat(Set(expectedNewTopic).asJava),
+      argThat((_.shouldRetryOnQuotaViolation() == false): 
ArgumentMatcher[CreateTopicsOptions])
+    )
+  }
+
+  @Test
+  def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = mock(classOf[Admin])
+    val topicService = TopicService(adminClient)
+
+    val listResult = AdminClientTestUtils.listTopicsResult(topicName)
+    when(adminClient.listTopics(any())).thenReturn(listResult)
+
+    val result = AdminClientTestUtils.deleteTopicsResult(topicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    when(adminClient.deleteTopics(any[Collection[String]](), 
any())).thenReturn(result)
+
+    val exception = assertThrows(classOf[ExecutionException],
+      () => topicService.deleteTopic(new TopicCommandOptions(Array("--topic", 
topicName))))
+    
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
+
+    verify(adminClient, times(1)).deleteTopics(
+      eqThat(Seq(topicName).asJavaCollection),
+      argThat((_.shouldRetryOnQuotaViolation() == false): 
ArgumentMatcher[DeleteTopicsOptions])
+    )
+  }
+
+  @Test
+  def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit 
= {

Review Comment:
   question: did we test both `zk` and `kraft` modes here?



##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -19,123 +19,30 @@ package kafka.admin
 import java.util
 import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
-import kafka.api.ApiVersion
-import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig, 
QuorumTestHarness}
-import kafka.utils.{Exit, Logging}
-import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
+import kafka.cluster.Broker
+import kafka.server.{ConfigEntityName, ConfigType}
+import kafka.utils.Logging
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.config.{ConfigException, ConfigResource}
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.internals.KafkaFutureImpl
-import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.anyString
 import org.mockito.Mockito.{mock, times, verify, when}
 
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 
-class ConfigCommandTest extends QuorumTestHarness with Logging {
+class ConfigCommandTest extends Logging {
 
-  @Test
-  def shouldExitWithNonZeroStatusOnArgError(): Unit = {

Review Comment:
   Why should we move these argument tests (and the ones below) to integration tests? 
It looks like they could stay as unit tests.



##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -391,7 +298,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
   def shouldFailIfUnrecognisedEntityTypeUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "client", "--entity-type", "not-recognised", "--alter", 
"--add-config", "a=b,c=d"))
-    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, new 
DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, new DummyAdminZkClient(null)))

Review Comment:
   nit: we can create a `DummyAdminZkClient` with a null `zkClient` to use in this 
new unit test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to