showuon commented on code in PR #12024:
URL: https://github.com/apache/kafka/pull/12024#discussion_r850045692
##########
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala:
##########
@@ -159,6 +167,74 @@ class TopicCommandTest {
assertEquals(expectedAssignment, actualAssignment)
}
+ @Test
+ def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ val adminClient = mock(classOf[Admin])
+ val topicService = TopicService(adminClient)
+
+ val result = AdminClientTestUtils.createTopicsResult(topicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+ when(adminClient.createTopics(any(), any())).thenReturn(result)
+
+ assertThrows(classOf[ThrottlingQuotaExceededException],
+ () => topicService.createTopic(new TopicCommandOptions(Array("--topic",
topicName))))
+
+ val expectedNewTopic = new NewTopic(topicName, Optional.empty[Integer](),
Optional.empty[java.lang.Short]())
+ .configs(Map.empty[String, String].asJava)
+
+ verify(adminClient, times(1)).createTopics(
+ eqThat(Set(expectedNewTopic).asJava),
+ argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[CreateTopicsOptions])
+ )
+ }
+
+ @Test
+ def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ val adminClient = mock(classOf[Admin])
+ val topicService = TopicService(adminClient)
+
+ val listResult = AdminClientTestUtils.listTopicsResult(topicName)
+ when(adminClient.listTopics(any())).thenReturn(listResult)
+
+ val result = AdminClientTestUtils.deleteTopicsResult(topicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+ when(adminClient.deleteTopics(any[Collection[String]](),
any())).thenReturn(result)
+
+ val exception = assertThrows(classOf[ExecutionException],
+ () => topicService.deleteTopic(new TopicCommandOptions(Array("--topic",
topicName))))
+
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
+
+ verify(adminClient, times(1)).deleteTopics(
+ eqThat(Seq(topicName).asJavaCollection),
+ argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[DeleteTopicsOptions])
+ )
+ }
+
+ @Test
+ def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit
= {
Review Comment:
question: did we test both `zk` and `kraft` modes here?
##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -19,123 +19,30 @@ package kafka.admin
import java.util
import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
-import kafka.api.ApiVersion
-import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig,
QuorumTestHarness}
-import kafka.utils.{Exit, Logging}
-import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
+import kafka.cluster.Broker
+import kafka.server.{ConfigEntityName, ConfigType}
+import kafka.utils.Logging
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Node
-import org.apache.kafka.common.config.{ConfigException, ConfigResource}
+import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.internals.KafkaFutureImpl
-import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito.{mock, times, verify, when}
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
-class ConfigCommandTest extends QuorumTestHarness with Logging {
+class ConfigCommandTest extends Logging {
- @Test
- def shouldExitWithNonZeroStatusOnArgError(): Unit = {
Review Comment:
Why should we move these argument tests (and the ones below) to integration tests?
It looks like they can be unit tests?
##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -391,7 +298,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
def shouldFailIfUnrecognisedEntityTypeUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "client", "--entity-type", "not-recognised", "--alter",
"--add-config", "a=b,c=d"))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new DummyAdminZkClient(null)))
Review Comment:
nit: we can create a `DummyAdminZkClient` with a null `zkClient` to use in this
new unit test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]