frankvicky commented on code in PR #18618: URL: https://github.com/apache/kafka/pull/18618#discussion_r1924625542
########## core/src/main/scala/kafka/server/DelegationTokenManager.scala: ########## @@ -157,126 +118,25 @@ class DelegationTokenManager(val config: KafkaConfig, * @param token */ def updateToken(token: DelegationToken): Unit = { - updateCache(token) - } - - /** - * - * @param owner - * @param renewers - * @param maxLifeTimeMs - * @param responseCallback - */ - def createToken(owner: KafkaPrincipal, - tokenRequester: KafkaPrincipal, - renewers: List[KafkaPrincipal], - maxLifeTimeMs: Long, - responseCallback: CreateResponseCallback): Unit = { - // Must be forwarded to KRaft Controller or handled in DelegationTokenManagerZk - throw new IllegalStateException("API createToken was not forwarded to a handler.") - } - - /** - * - * @param principal - * @param hmac - * @param renewLifeTimeMs - * @param renewCallback - */ - def renewToken(principal: KafkaPrincipal, - hmac: ByteBuffer, - renewLifeTimeMs: Long, - renewCallback: RenewResponseCallback): Unit = { - // Must be forwarded to KRaft Controller or handled in DelegationTokenManagerZk - throw new IllegalStateException("API renewToken was not forwarded to a handler.") + val hmacString = token.hmacAsBase64String + val scramCredentialMap = prepareScramCredentials(hmacString) + tokenCache.updateCache(token, scramCredentialMap.asJava) } def getDelegationToken(tokenInfo: TokenInformation): DelegationToken = { val hmac = createHmac(tokenInfo.tokenId, secretKey) new DelegationToken(tokenInfo, hmac) } - /** - * - * @param principal - * @param hmac - * @param expireLifeTimeMs - * @param expireResponseCallback - */ - def expireToken(principal: KafkaPrincipal, - hmac: ByteBuffer, - expireLifeTimeMs: Long, - expireResponseCallback: ExpireResponseCallback): Unit = { - // Must be forwarded to KRaft Controller or handled in DelegationTokenManagerZk - throw new IllegalStateException("API expireToken was not forwarded to a handler.") - } - /** * * @param tokenId */ def removeToken(tokenId: String): Unit = { - removeCache(tokenId) - } - - 
/** - * - * @param tokenId - */ - protected def removeCache(tokenId: String): Unit = { tokenCache.removeCache(tokenId) } - /** - * - * @return - */ - def expireTokens(): Unit = { - lock.synchronized { - for (tokenInfo <- getAllTokenInformation) { - val now = time.milliseconds - if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) { - info(s"Delegation token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}") - removeToken(tokenInfo.tokenId) - } - } - } - } - - def getAllTokenInformation: List[TokenInformation] = tokenCache.tokens.asScala.toList - def getTokens(filterToken: TokenInformation => Boolean): List[DelegationToken] = { - getAllTokenInformation.filter(filterToken).map(token => getDelegationToken(token)) - } - -} - -case class CreateTokenResult(owner: KafkaPrincipal, - tokenRequester: KafkaPrincipal, - issueTimestamp: Long, - expiryTimestamp: Long, - maxTimestamp: Long, - tokenId: String, - hmac: Array[Byte], - error: Errors) { - - override def equals(other: Any): Boolean = { - other match { - case that: CreateTokenResult => - error.equals(that.error) && - owner.equals(that.owner) && - tokenRequester.equals(that.tokenRequester) && - tokenId.equals(that.tokenId) && - issueTimestamp.equals(that.issueTimestamp) && - expiryTimestamp.equals(that.expiryTimestamp) && - maxTimestamp.equals(that.maxTimestamp) && - (hmac sameElements that.hmac) - case _ => false - } - } - - override def hashCode(): Int = { - val fields = Seq(owner, tokenRequester, issueTimestamp, expiryTimestamp, maxTimestamp, tokenId, hmac, error) - fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + tokenCache.tokens.asScala.toList.filter(filterToken).map(token => getDelegationToken(token)) Review Comment: Hi @ijuma, thanks for the review. Without `toList`, it would return `Iterable`, which does not fit the return type. Do you have any thoughts about this? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org