lucasbru commented on code in PR #20325: URL: https://github.com/apache/kafka/pull/20325#discussion_r2315376555
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -53,17 +55,47 @@ trait AutoTopicCreationManager { requestContext: RequestContext ): Unit + def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long + ): Map[String, String] + + def close(): Unit = {} + } +case class CachedTopicCreationError( + errorMessage: String, + time: Time +) { + val timestamp: Long = time.milliseconds() +} + + class DefaultAutoTopicCreationManager( config: KafkaConfig, channelManager: NodeToControllerChannelManager, groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, - shareCoordinator: ShareCoordinator + shareCoordinator: ShareCoordinator, + time: Time ) extends AutoTopicCreationManager with Logging { private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + + // Use MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG as the size limit for the error cache + // This provides a reasonable bound (default 1000) to prevent unbounded growth + private val maxCacheSize = config.maxIncrementalFetchSessionCacheSlots Review Comment: Using that config seems a bit random. I would just hard-code it. 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -122,23 +203,12 @@ class DefaultAutoTopicCreationManager( override def onComplete(response: ClientResponse): Unit = { clearInflightRequests(creatableTopics) if (response.authenticationException() != null) { - warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception") + val authException = response.authenticationException() + warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception: ${authException.getMessage}") } else if (response.versionMismatch() != null) { - warn(s"Auto topic creation failed for ${creatableTopics.keys} with invalid version exception") + val versionException = response.versionMismatch() + warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}") } else { - if (response.hasResponse) { Review Comment: why did you remove this code? I would revert all changes in `sendCreateTopicRequest` ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -53,17 +55,47 @@ trait AutoTopicCreationManager { requestContext: RequestContext ): Unit + def getTopicCreationErrors( Review Comment: `getStreamsInternalTopicCreationErrors`? 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -53,17 +55,47 @@ trait AutoTopicCreationManager { requestContext: RequestContext ): Unit + def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long + ): Map[String, String] + + def close(): Unit = {} + } +case class CachedTopicCreationError( + errorMessage: String, + time: Time +) { + val timestamp: Long = time.milliseconds() +} + + class DefaultAutoTopicCreationManager( config: KafkaConfig, channelManager: NodeToControllerChannelManager, groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, - shareCoordinator: ShareCoordinator + shareCoordinator: ShareCoordinator, + time: Time ) extends AutoTopicCreationManager with Logging { private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + + // Use MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG as the size limit for the error cache + // This provides a reasonable bound (default 1000) to prevent unbounded growth + private val maxCacheSize = config.maxIncrementalFetchSessionCacheSlots + info(s"AutoTopicCreationManager initialized with error cache size limit: $maxCacheSize") Review Comment: Remove the extra logging ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -53,17 +55,47 @@ trait AutoTopicCreationManager { requestContext: RequestContext ): Unit + def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long + ): Map[String, String] + + def close(): Unit = {} + } +case class CachedTopicCreationError( + errorMessage: String, + time: Time +) { + val timestamp: Long = time.milliseconds() +} + + class DefaultAutoTopicCreationManager( config: KafkaConfig, channelManager: NodeToControllerChannelManager, groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, - shareCoordinator: ShareCoordinator + shareCoordinator: ShareCoordinator, + time: Time ) extends 
AutoTopicCreationManager with Logging { private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + + // Use MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG as the size limit for the error cache + // This provides a reasonable bound (default 1000) to prevent unbounded growth + private val maxCacheSize = config.maxIncrementalFetchSessionCacheSlots + info(s"AutoTopicCreationManager initialized with error cache size limit: $maxCacheSize") + + // LRU cache with size limit to prevent unbounded memory growth + private val topicCreationErrorCache = Collections.synchronizedMap( + new java.util.LinkedHashMap[String, CachedTopicCreationError](16, 0.75f, false) { + override def removeEldestEntry(eldest: java.util.Map.Entry[String, CachedTopicCreationError]): Boolean = { + size() > maxCacheSize Review Comment: I kind of expected you to implement a new subclass of `Cache` for this. I think we can also do it this way. Could we also remove the eldest entry if it is expired? ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -96,7 +128,56 @@ class DefaultAutoTopicCreationManager( requestContext: RequestContext ): Unit = { if (topics.nonEmpty) { - sendCreateTopicRequest(topics, Some(requestContext)) + sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext)) + } + } + + override def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long + ): Map[String, String] = { + // Proactively expire old entries using the provided TTL + expireOldEntries(errorCacheTtlMs) + + val errors = mutable.Map.empty[String, String] + + // Check requested topics + topicNames.foreach { topicName => + Option(topicCreationErrorCache.get(topicName)) match { + case Some(error) => + errors.put(topicName, error.errorMessage) + case None => + } + } + + errors.toMap + } + + /** + * Remove expired entries from the cache using the provided TTL. 
+ * Since we use LinkedHashMap with insertion order, we only need to check + * entries from the beginning until we find a non-expired entry. + */ + private def expireOldEntries(ttlMs: Long): Unit = { Review Comment: I think the expiration won't work if we have different TTLs for different groups, right? Since the entries will not expire in insertion order. I think we may need a priority queue and hashmap to solve this correctly. We should probably put this into a little helper class that is synchronized. ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -53,17 +55,47 @@ trait AutoTopicCreationManager { requestContext: RequestContext ): Unit + def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long + ): Map[String, String] + + def close(): Unit = {} + } +case class CachedTopicCreationError( Review Comment: Is this public or private? Can we make this a purely internal thing inside the topic creation manager, since it does not appear in the interface? ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -96,7 +128,56 @@ class DefaultAutoTopicCreationManager( requestContext: RequestContext ): Unit = { if (topics.nonEmpty) { - sendCreateTopicRequest(topics, Some(requestContext)) + sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext)) + } + } + + override def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long Review Comment: Passing the TTL here is incorrect. The TTL is defined group-specific, so you are expiring topic creation errors for one group with the TTL for a different group. I think we need to pass the errorCacheTTL into `createStreamsInternalTopics`, and store the expiry time instead of the error receival timestamp in `CachedTopicCreationError`. 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -264,4 +334,103 @@ class DefaultAutoTopicCreationManager( (creatableTopics, uncreatableTopics) } + + private def sendCreateTopicRequestWithErrorCaching( + creatableTopics: Map[String, CreatableTopic], + requestContext: Option[RequestContext] + ): Seq[MetadataResponseTopic] = { + val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size) + topicsToCreate.addAll(creatableTopics.values.asJavaCollection) + + val createTopicsRequest = new CreateTopicsRequest.Builder( + new CreateTopicsRequestData() + .setTimeoutMs(config.requestTimeoutMs) + .setTopics(topicsToCreate) + ) + + val requestCompletionHandler = new ControllerRequestCompletionHandler { + override def onTimeout(): Unit = { + clearInflightRequests(creatableTopics) + debug(s"Auto topic creation timed out for ${creatableTopics.keys}.") Review Comment: Shouldn't we add a `cacheTopicCreationErrors` in this case as well? 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -264,4 +334,103 @@ class DefaultAutoTopicCreationManager( (creatableTopics, uncreatableTopics) } + + private def sendCreateTopicRequestWithErrorCaching( + creatableTopics: Map[String, CreatableTopic], + requestContext: Option[RequestContext] + ): Seq[MetadataResponseTopic] = { + val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size) + topicsToCreate.addAll(creatableTopics.values.asJavaCollection) + + val createTopicsRequest = new CreateTopicsRequest.Builder( + new CreateTopicsRequestData() + .setTimeoutMs(config.requestTimeoutMs) + .setTopics(topicsToCreate) + ) + + val requestCompletionHandler = new ControllerRequestCompletionHandler { + override def onTimeout(): Unit = { + clearInflightRequests(creatableTopics) + debug(s"Auto topic creation timed out for ${creatableTopics.keys}.") + } + + override def onComplete(response: ClientResponse): Unit = { + clearInflightRequests(creatableTopics) + if (response.authenticationException() != null) { + val authException = response.authenticationException() + warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception: ${authException.getMessage}") + cacheTopicCreationErrors(creatableTopics.keys.toSet, authException.getMessage) + } else if (response.versionMismatch() != null) { + val versionException = response.versionMismatch() + warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}") + cacheTopicCreationErrors(creatableTopics.keys.toSet, versionException.getMessage) + } else { + response.responseBody() match { + case createTopicsResponse: CreateTopicsResponse => + cacheTopicCreationErrorsFromResponse(createTopicsResponse) + case _ => + debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.") + } + } + } + } + + val request = requestContext.map 
{ context => + val requestVersion = + channelManager.controllerApiVersions.toScala match { + case None => + // We will rely on the Metadata request to be retried in the case + // that the latest version is not usable by the controller. + ApiKeys.CREATE_TOPICS.latestVersion() + case Some(nodeApiVersions) => + nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS) + } + + // Borrow client information such as client id and correlation id from the original request, + // in order to correlate the create request with the original metadata request. + val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, + requestVersion, + context.clientId, + context.correlationId) + ForwardingManager.buildEnvelopeRequest(context, + createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader)) + }.getOrElse(createTopicsRequest) + + channelManager.sendRequest(request, requestCompletionHandler) + + val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic => + new MetadataResponseTopic() + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + .setName(topic) + .setIsInternal(Topic.isInternal(topic)) + } + + info(s"Sent auto-creation request for ${creatableTopics.keys} to the active controller.") Review Comment: Remove this info log, or set it to debug level. We need to be careful not to create too many log messages that will spam the Kafka logs with not-so-relevant information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org