Copilot commented on code in PR #20325: URL: https://github.com/apache/kafka/pull/20325#discussion_r2315358917
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -53,17 +55,47 @@ trait AutoTopicCreationManager { requestContext: RequestContext ): Unit + def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long + ): Map[String, String] + + def close(): Unit = {} + } +case class CachedTopicCreationError( + errorMessage: String, + time: Time +) { + val timestamp: Long = time.milliseconds() Review Comment: The timestamp should be captured when the error is created, not when accessed. The current implementation captures the timestamp on object initialization, but since `time.milliseconds()` is called every time the case class is instantiated, this could lead to inconsistent timestamps if the Time instance is mutable or if multiple instances share the same Time object. ```suggestion errorMessage: String ) { val timestamp: Long = System.currentTimeMillis() ``` ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2889,9 +2889,31 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext); + + // Check for cached topic creation errors only if there's already a MISSING_INTERNAL_TOPICS status + val hasMissingInternalTopicsStatus = responseData.status() != null && + responseData.status().stream().anyMatch(s => s.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) + + if (hasMissingInternalTopicsStatus) { + // Calculate group-specific error cache TTL + val errorCacheTtlMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null)) + .map(_.streamsSessionTimeoutMs().toLong) + .getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong) + + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(topicsToCreate.keys.toSet, errorCacheTtlMs) + if (cachedErrors.nonEmpty) { + val missingInternalTopicStatus = + 
responseData.status().stream().filter(x => x.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst() + val creationErrorDetails = cachedErrors.map { case (topic, error) => s"$topic ($error)" }.mkString(", ") + if (missingInternalTopicStatus.isPresent) { + missingInternalTopicStatus.get().setStatusDetail( + missingInternalTopicStatus.get().statusDetail() + s"; Creation failed: $creationErrorDetails." + ) Review Comment: If `statusDetail()` returns null, this concatenation does not throw. Instead it produces the literal text `null; Creation failed: ...`, because string concatenation renders a null operand as "null". The code should handle a null (or empty) existing status detail explicitly, for example by using only the creation-failure text in that case. ########## core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala: ########## @@ -356,4 +365,230 @@ class AutoTopicCreationManagerTest { .setNumPartitions(numPartitions) .setReplicationFactor(replicationFactor) } + + @Test + def testTopicCreationErrorCaching(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator, + mockTime) + + val topics = Map( + "test-topic-1" -> new CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrincipal() + + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Simulate a CreateTopicsResponse with errors + val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData() + val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("test-topic-1") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'test-topic-1' already exists.") +
createTopicsResponseData.topics().add(topicResult) + + val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData) + val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, createTopicsResponse) + + // Trigger the completion handler + argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse) + + // Verify that the error was cached + val defaultTtlMs = config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs() + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1"), defaultTtlMs) + assertEquals(1, cachedErrors.size) + assertTrue(cachedErrors.contains("test-topic-1")) + assertEquals("Topic 'test-topic-1' already exists.", cachedErrors("test-topic-1")) + } + + @Test + def testGetTopicCreationErrorsWithMultipleTopics(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator, + mockTime) + + val topics = Map( + "success-topic" -> new CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1), + "failed-topic" -> new CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrincipal() + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Simulate mixed response - one success, one failure + val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData() + createTopicsResponseData.topics().add( + new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("success-topic") + .setErrorCode(Errors.NONE.code()) + ) + createTopicsResponseData.topics().add( + new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("failed-topic") + .setErrorCode(Errors.POLICY_VIOLATION.code()) + .setErrorMessage("Policy violation") + ) + + val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData) + val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, createTopicsResponse) + + argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse) + + // Only the failed topic should be cached + val defaultTtlMs = config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs() + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("success-topic", "failed-topic", "nonexistent-topic"), defaultTtlMs) + assertEquals(1, cachedErrors.size) + assertTrue(cachedErrors.contains("failed-topic")) + assertEquals("Policy violation", cachedErrors("failed-topic")) + } + + @Test + def testErrorCacheTTL(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator, + mockTime) + + + // First cache an error by simulating topic creation failure + val topics = Map( + "test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrincipal() + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // 
Simulate a CreateTopicsResponse with error + val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData() + val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("test-topic") + .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()) + .setErrorMessage("Invalid replication factor") + createTopicsResponseData.topics().add(topicResult) + + val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData) + val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, createTopicsResponse) + + // Cache the error at T0 + argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse) + + val shortTtlMs = 1000L // Use 1 second TTL for faster testing + + // Verify error is cached and accessible within TTL + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs) + assertEquals(1, cachedErrors.size) + assertEquals("Invalid replication factor", cachedErrors("test-topic")) + + // Advance time beyond TTL + mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds + + // Verify error is now expired and proactively cleaned up + val expiredErrors = autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs) + assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively cleaned up") + } + + @Test + def testErrorCacheLRUEviction(): Unit = { + // Create a config with a small cache size for testing + val props = TestUtils.createBrokerConfig(1) + props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString) + props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, "3") // Small cache size for testing + + props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString) + 
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString) + props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG , internalTopicPartitions.toString) + + props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) + props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) + props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) + + val smallCacheConfig = KafkaConfig.fromProps(props) + + // Verify the configuration was properly set + assertEquals(3, smallCacheConfig.maxIncrementalFetchSessionCacheSlots, "Cache size configuration should be 3") + + // Replace the test class's config with our smallCacheConfig + // so that initializeRequestContext will use the correct config + config = smallCacheConfig Review Comment: Modifying the test class's config field directly could affect other tests if they run in the same instance. Consider creating a separate AutoTopicCreationManager instance with the small cache config instead of modifying the shared config field. 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -96,7 +128,56 @@ class DefaultAutoTopicCreationManager( requestContext: RequestContext ): Unit = { if (topics.nonEmpty) { - sendCreateTopicRequest(topics, Some(requestContext)) + sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext)) + } + } + + override def getTopicCreationErrors( + topicNames: Set[String], + errorCacheTtlMs: Long + ): Map[String, String] = { + // Proactively expire old entries using the provided TTL + expireOldEntries(errorCacheTtlMs) + + val errors = mutable.Map.empty[String, String] + + // Check requested topics + topicNames.foreach { topicName => + Option(topicCreationErrorCache.get(topicName)) match { + case Some(error) => + errors.put(topicName, error.errorMessage) + case None => + } + } + + errors.toMap + } + + /** + * Remove expired entries from the cache using the provided TTL. + * Since we use LinkedHashMap with insertion order, we only need to check + * entries from the beginning until we find a non-expired entry. + */ + private def expireOldEntries(ttlMs: Long): Unit = { + val currentTime = time.milliseconds() + + // Iterate and remove expired entries + val iterator = topicCreationErrorCache.entrySet().iterator() + + breakable { + while (iterator.hasNext) { + val entry = iterator.next() + val cachedError = entry.getValue + + if (currentTime - cachedError.timestamp > ttlMs) { + iterator.remove() + debug(s"Removed expired topic creation error cache entry for ${entry.getKey}") + } else { + // Since entries are in insertion order, if this entry is not expired, + // all following entries are also not expired + break() + } Review Comment: The assumption that entries are in insertion order and that if one entry is not expired, all following entries are also not expired is incorrect. 
The LinkedHashMap is configured with `accessOrder = false` (line 94), so it maintains insertion order. Insertion order only guarantees timestamp order if every entry is inserted fresh. Re-inserting an existing key into an insertion-ordered LinkedHashMap does not move it. So if a topic's error is re-cached via `put` with a newer timestamp, older (possibly expired) entries can sit behind it. Breaking early could then leave expired entries in the cache. ```suggestion while (iterator.hasNext) { val entry = iterator.next() val cachedError = entry.getValue if (currentTime - cachedError.timestamp > ttlMs) { iterator.remove() debug(s"Removed expired topic creation error cache entry for ${entry.getKey}") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org