chia7712 commented on code in PR #16648:
URL: https://github.com/apache/kafka/pull/16648#discussion_r1718663810
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -96,6 +96,62 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
Review Comment:
please use try-finally to release this resource
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -96,6 +96,62 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ client = createAdminClient
+
+ // Create topics
Review Comment:
The purpose of this test case is used to make sure timeout works well, so we
can leverage existent configs (broker-level). Hence, could you please remove
those topic creation?
##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -126,6 +133,79 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*",
PatternType.LITERAL),
new AccessControlEntry("User:*", "*", AclOperation.ALL,
AclPermissionType.ALLOW))
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ // this will cause timeout connecting to broker
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.createAcls(Collections.singleton(acl), new
CreateAclsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ brokenClient.close(time.Duration.ZERO)
+ }
+
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ // this will cause timeout connecting to broker
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ // prepare normal client
+ client = createAdminClient
Review Comment:
yes, so we don't need to have `client` and create ACLs, right?
##########
clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java:
##########
@@ -28,6 +28,13 @@
public class ExpireDelegationTokenOptions extends
AbstractOptions<ExpireDelegationTokenOptions> {
private long expiryTimePeriodMs = -1L;
+ /**
+ * @param expiryTimePeriodMs the time period until we should expire this
token.
+ * {@code expiryTimestamp} will be set to {@code min(expiryTimestamp,
maxTimestamp)}
Review Comment:
line#33 seems to be duplicate. could you please remove it
##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -127,6 +133,79 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*",
PatternType.LITERAL),
new AccessControlEntry("User:*", "*", AclOperation.ALL,
AclPermissionType.ALLOW))
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ // this will cause timeout connecting to broker
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.createAcls(Collections.singleton(acl), new
CreateAclsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ brokenClient.close(time.Duration.ZERO)
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -96,6 +96,62 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ client = createAdminClient
+
+ // Create topics
+ val topic1 = "describe-alter-configs-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ val topicConfig1 = new Properties
+ topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "500000")
+ topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, "60000000")
+ createTopic(topic1)
+
+ val topic2 = "describe-alter-configs-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ createTopic(topic2)
+
+ // Describe topics and broker
+ val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(1).config.brokerId.toString)
+ val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(2).config.brokerId.toString)
+ val configResources = Seq(topicResource1, topicResource2, brokerResource1,
brokerResource2)
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.describeConfigs(configResources.asJava,new
DescribeConfigsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ brokenClient.close(time.Duration.ZERO)
+ }
+
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ val alterLogLevelsEntries = Seq(
+ new ConfigEntry("kafka.controller.KafkaController",
LogLevelConfig.INFO_LOG_LEVEL)
+ ).asJavaCollection
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.alterConfigs(
+ Map(brokerLoggerConfigResource -> new
Config(alterLogLevelsEntries)).asJava,
+ new AlterConfigsOptions().timeoutMs(0)).all()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ brokenClient.close(time.Duration.ZERO)
Review Comment:
ditto. try-finally
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]