chia7712 commented on code in PR #16648:
URL: https://github.com/apache/kafka/pull/16648#discussion_r1712682066


##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -49,12 +52,16 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName
   val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
JaasTestUtils.KafkaServerPrincipalUnqualifiedName)
   var superUserAdmin: Admin = _
+  val secretKey = "secretKey"
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+    // set this to use delegation token
+    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
 secretKey)
     if (TestInfoUtils.isKRaft(testInfo)) {
+      
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
kraftAuthorizerClassName)

Review Comment:
   Is this line a duplicate?



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -126,6 +133,79 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", 
PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.createAcls(Collections.singleton(acl), new 
CreateAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    // prepare normal client
+    client = createAdminClient
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+
+    client.createAcls(Collections.singleton(acl)).all().get()
+
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.deleteAcls(Collections.singleton(AclBindingFilter.ANY), new 
DeleteAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+    client.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk","kraft"))
+  def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): 
Unit = {
+    client = createAdminClient
+    val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
+
+    def generateTokenResult(maxlifeTimeMs: Int, expiryTimePeriodMs: Int): 
(CreateDelegationTokenResult, ExpireDelegationTokenResult) = {
+      val createResult = client.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer.asJava).maxlifeTimeMs(maxlifeTimeMs))
+      val tokenCreated = createResult.delegationToken.get
+      TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 1),
+            "Timed out waiting for token to propagate to all servers")
+      val expireResult = client.expireDelegationToken(
+        tokenCreated.hmac(),
+        new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)
+      )
+      (createResult, expireResult)
+    }
+
+    // Note that maxTimestamp = token created time + maxlifeTimeMs
+    val (createResult1, expireResult1) = generateTokenResult(10000, -1)
+    // if expiryTimePeriodMs < 0, token will be expired immediately.
+    assert(createResult1.delegationToken().get().tokenInfo().maxTimestamp() > 
expireResult1.expiryTimestamp().get())
+
+    // expireDelegationToken will decrease the value of expiryTimestamp, since 
this token is not expired,

Review Comment:
   Nice comment! Could you please also update the API docs?
   
   
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java#L31



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -126,6 +133,79 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", 
PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.createAcls(Collections.singleton(acl), new 
CreateAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    // prepare normal client
+    client = createAdminClient
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+
+    client.createAcls(Collections.singleton(acl)).all().get()
+
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.deleteAcls(Collections.singleton(AclBindingFilter.ANY), new 
DeleteAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+    client.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk","kraft"))
+  def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): 
Unit = {
+    client = createAdminClient
+    val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
+
+    def generateTokenResult(maxlifeTimeMs: Int, expiryTimePeriodMs: Int): 
(CreateDelegationTokenResult, ExpireDelegationTokenResult) = {

Review Comment:
   `maxlifeTimeMs` -> `maxLifeTimeMs`



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -126,6 +133,79 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", 
PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.createAcls(Collections.singleton(acl), new 
CreateAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    // prepare normal client
+    client = createAdminClient

Review Comment:
   The connection of `brokenClient` never succeeds, right? So we don't need this 
`client`.



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -126,6 +133,79 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", 
PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.createAcls(Collections.singleton(acl), new 
CreateAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    // prepare normal client
+    client = createAdminClient
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+
+    client.createAcls(Collections.singleton(acl)).all().get()
+
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.deleteAcls(Collections.singleton(AclBindingFilter.ANY), new 
DeleteAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+    client.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk","kraft"))
+  def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): 
Unit = {
+    client = createAdminClient
+    val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
+
+    def generateTokenResult(maxlifeTimeMs: Int, expiryTimePeriodMs: Int): 
(CreateDelegationTokenResult, ExpireDelegationTokenResult) = {
+      val createResult = client.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer.asJava).maxlifeTimeMs(maxlifeTimeMs))
+      val tokenCreated = createResult.delegationToken.get
+      TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 1),
+            "Timed out waiting for token to propagate to all servers")
+      val expireResult = client.expireDelegationToken(
+        tokenCreated.hmac(),
+        new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)
+      )
+      (createResult, expireResult)
+    }
+
+    // Note that maxTimestamp = token created time + maxlifeTimeMs
+    val (createResult1, expireResult1) = generateTokenResult(10000, -1)
+    // if expiryTimePeriodMs < 0, token will be expired immediately.
+    assert(createResult1.delegationToken().get().tokenInfo().maxTimestamp() > 
expireResult1.expiryTimestamp().get())

Review Comment:
   Please use `assertTrue` instead of `assert`.



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -126,6 +133,79 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", 
PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.createAcls(Collections.singleton(acl), new 
CreateAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    // prepare normal client
+    client = createAdminClient
+
+    val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+
+    client.createAcls(Collections.singleton(acl)).all().get()
+
+    val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.deleteAcls(Collections.singleton(AclBindingFilter.ANY), new 
DeleteAclsOptions().timeoutMs(0)).all().get()
+    })
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    brokenClient.close(time.Duration.ZERO)
+    client.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk","kraft"))
+  def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): 
Unit = {
+    client = createAdminClient
+    val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
+
+    def generateTokenResult(maxlifeTimeMs: Int, expiryTimePeriodMs: Int): 
(CreateDelegationTokenResult, ExpireDelegationTokenResult) = {
+      val createResult = client.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer.asJava).maxlifeTimeMs(maxlifeTimeMs))
+      val tokenCreated = createResult.delegationToken.get
+      TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 1),
+            "Timed out waiting for token to propagate to all servers")
+      val expireResult = client.expireDelegationToken(
+        tokenCreated.hmac(),
+        new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)
+      )
+      (createResult, expireResult)
+    }
+
+    // Note that maxTimestamp = token created time + maxlifeTimeMs
+    val (createResult1, expireResult1) = generateTokenResult(10000, -1)
+    // if expiryTimePeriodMs < 0, token will be expired immediately.
+    assert(createResult1.delegationToken().get().tokenInfo().maxTimestamp() > 
expireResult1.expiryTimestamp().get())
+
+    // expireDelegationToken will decrease the value of expiryTimestamp, since 
this token is not expired,
+    // expiryTimePeriodMs will be set to min(expiryTimestamp, maxTimestamp),
+    // in this case, maxTimestamp is smaller, so expiryTimestamp will not be 
modified
+    val (createResult2, expireResult2) = generateTokenResult(5000, 100000)
+    assert(createResult2.delegationToken().get().tokenInfo().expiryTimestamp() 
== expireResult2.expiryTimestamp().get())

Review Comment:
   Please use `assertTrue` instead of `assert`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to