chia7712 commented on code in PR #16648:
URL: https://github.com/apache/kafka/pull/16648#discussion_r1722203370


##########
clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java:
##########
@@ -28,6 +28,13 @@
 public class ExpireDelegationTokenOptions extends 
AbstractOptions<ExpireDelegationTokenOptions> {
     private long expiryTimePeriodMs = -1L;
 
+    /**
+     * @param expiryTimePeriodMs the time period until we should expire this 
token.
+     * {@code expiryTimestamp} is the time we actually expire the token.
+     * If {@code expiryTimePeriodMs} < 0, token will be expired immediately.
+     * If {@code expiryTimePeriodMs} >= 0 and {@code expiryTimestamp} is not 
due,
+     * {@code expiryTimestamp} will be set to {@code min(expiryTimestamp, 
maxTimestamp)}.

Review Comment:
   I feel `min(expiryTimestamp, maxTimestamp)` is a bit puzzling, since the 
public APIs don't mention the `maxTimestamp`. How about:
   
   ```
   {@code expiryTimePeriodMs} >= 0: the token will update the `expiration 
timestamp` if the current expiration timestamp is small than (now + 
expiryTimePeriodMs)
   ```



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -505,7 +583,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic1).get())
     assertEquals(topicIds(topic1), createResult.topicId(topic1).get())
     assertFutureExceptionTypeEquals(createResult.topicId(topic2), 
classOf[TopicAuthorizationException])
-    
+

Review Comment:
   please remove this unrelated change



##########
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##########
@@ -127,6 +133,78 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", 
PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+      val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.createAcls(Collections.singleton(acl), new 
CreateAclsOptions().timeoutMs(0)).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        brokenClient.deleteAcls(Collections.singleton(AclBindingFilter.ANY), 
new DeleteAclsOptions().timeoutMs(0)).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk","kraft"))
+  def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): 
Unit = {
+    client = createAdminClient
+    val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
+
+    def generateTokenResult(maxLifeTimeMs: Int, expiryTimePeriodMs: Int, 
expectedTokenNum: Int): (CreateDelegationTokenResult, 
ExpireDelegationTokenResult) = {
+      val createResult = client.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer.asJava).maxlifeTimeMs(maxLifeTimeMs))
+      val tokenCreated = createResult.delegationToken.get
+      TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == expectedTokenNum),
+            "Timed out waiting for token to propagate to all servers")
+      val expireResult = client.expireDelegationToken(
+        tokenCreated.hmac(),
+        new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)
+      )
+      (createResult, expireResult)
+    }
+
+    try {
+      // Note that maxTimestamp = token created time + maxLifeTimeMs
+      val (createResult1, expireResult1) = generateTokenResult(10000, -1, 1)
+      // if expiryTimePeriodMs < 0, token will be expired immediately.
+      
assertTrue(createResult1.delegationToken().get().tokenInfo().maxTimestamp() > 
expireResult1.expiryTimestamp().get())
+
+      // expireDelegationToken will decrease the value of expiryTimestamp, 
since this token is not expired,
+      // expiryTimestamp will be set to min(expiryTimestamp, maxTimestamp),

Review Comment:
   ditto. please check the bove comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to