Copilot commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2315358917


##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -53,17 +55,47 @@ trait AutoTopicCreationManager {
     requestContext: RequestContext
   ): Unit
 
+  def getTopicCreationErrors(
+    topicNames: Set[String],
+    errorCacheTtlMs: Long
+  ): Map[String, String]
+
+  def close(): Unit = {}
+
 }
 
+case class CachedTopicCreationError(
+  errorMessage: String,
+  time: Time
+) {
+  val timestamp: Long = time.milliseconds()

Review Comment:
   The timestamp should be captured when the error is created, not when 
accessed. The current implementation captures the timestamp on object 
initialization, but since `time.milliseconds()` is called every time the case 
class is instantiated, this could lead to inconsistent timestamps if the Time 
instance is mutable or if multiple instances share the same Time object.
   ```suggestion
     errorMessage: String
   ) {
     val timestamp: Long = System.currentTimeMillis()
   ```



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2889,9 +2889,31 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
             } else {
               
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, 
requestContext);
+              
+              // Check for cached topic creation errors only if there's 
already a MISSING_INTERNAL_TOPICS status
+              val hasMissingInternalTopicsStatus = responseData.status() != 
null && 
+                responseData.status().stream().anyMatch(s => s.statusCode() == 
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+              
+              if (hasMissingInternalTopicsStatus) {
+                // Calculate group-specific error cache TTL
+                val errorCacheTtlMs = 
Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
+                  .map(_.streamsSessionTimeoutMs().toLong)
+                  
.getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong)
+                
+                val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(topicsToCreate.keys.toSet, 
errorCacheTtlMs)
+                if (cachedErrors.nonEmpty) {
+                  val missingInternalTopicStatus =
+                    responseData.status().stream().filter(x => x.statusCode() 
== 
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
+                  val creationErrorDetails = cachedErrors.map { case (topic, 
error) => s"$topic ($error)" }.mkString(", ")
+                  if (missingInternalTopicStatus.isPresent) {
+                    missingInternalTopicStatus.get().setStatusDetail(
+                      missingInternalTopicStatus.get().statusDetail() + s"; 
Creation failed: $creationErrorDetails."
+                    )

Review Comment:
   Potential null pointer exception if `statusDetail()` returns null. The 
concatenation should handle the case where the existing status detail is null.



##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -356,4 +365,230 @@ class AutoTopicCreationManagerTest {
       .setNumPartitions(numPartitions)
       .setReplicationFactor(replicationFactor)
   }
+
+  @Test
+  def testTopicCreationErrorCaching(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "test-topic-1" -> new 
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with errors
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic-1")
+      .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+      .setErrorMessage("Topic 'test-topic-1' already exists.")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Trigger the completion handler
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify that the error was cached
+    val defaultTtlMs = 
config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs()
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1"), 
defaultTtlMs)
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("test-topic-1"))
+    assertEquals("Topic 'test-topic-1' already exists.", 
cachedErrors("test-topic-1"))
+  }
+
+  @Test
+  def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "success-topic" -> new 
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+      "failed-topic" -> new 
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate mixed response - one success, one failure
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("success-topic")
+        .setErrorCode(Errors.NONE.code())
+    )
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("failed-topic")
+        .setErrorCode(Errors.POLICY_VIOLATION.code())
+        .setErrorMessage("Policy violation")
+    )
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Only the failed topic should be cached
+    val defaultTtlMs = 
config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs()
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("success-topic", 
"failed-topic", "nonexistent-topic"), defaultTtlMs)
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("failed-topic"))
+    assertEquals("Policy violation", cachedErrors("failed-topic"))
+  }
+
+  @Test 
+  def testErrorCacheTTL(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+
+    // First cache an error by simulating topic creation failure
+    val topics = Map(
+      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with error
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic")
+      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+      .setErrorMessage("Invalid replication factor")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Cache the error at T0
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    val shortTtlMs = 1000L // Use 1 second TTL for faster testing
+    
+    // Verify error is cached and accessible within TTL
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs)
+    assertEquals(1, cachedErrors.size)
+    assertEquals("Invalid replication factor", cachedErrors("test-topic"))
+
+    // Advance time beyond TTL
+    mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
+
+    // Verify error is now expired and proactively cleaned up
+    val expiredErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs)
+    assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively 
cleaned up")
+  }
+
+  @Test
+  def testErrorCacheLRUEviction(): Unit = {
+    // Create a config with a small cache size for testing
+    val props = TestUtils.createBrokerConfig(1)
+    props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeout.toString)
+    
props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG,
 "3") // Small cache size for testing
+    
+    
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicPartitions.toString)
+    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicPartitions.toString)
+    
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG 
, internalTopicPartitions.toString)
+    
+    props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
+    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
+    
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
+    
+    val smallCacheConfig = KafkaConfig.fromProps(props)
+    
+    // Verify the configuration was properly set
+    assertEquals(3, smallCacheConfig.maxIncrementalFetchSessionCacheSlots, 
"Cache size configuration should be 3")
+    
+    // Replace the test class's config with our smallCacheConfig
+    // so that initializeRequestContext will use the correct config
+    config = smallCacheConfig

Review Comment:
   Modifying the test class's config field directly could affect other tests if 
they run in the same instance. Consider creating a separate 
AutoTopicCreationManager instance with the small cache config instead of 
modifying the shared config field.



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -96,7 +128,56 @@ class DefaultAutoTopicCreationManager(
     requestContext: RequestContext
   ): Unit = {
     if (topics.nonEmpty) {
-      sendCreateTopicRequest(topics, Some(requestContext))
+      sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext))
+    }
+  }
+
+  override def getTopicCreationErrors(
+    topicNames: Set[String],
+    errorCacheTtlMs: Long
+  ): Map[String, String] = {
+    // Proactively expire old entries using the provided TTL
+    expireOldEntries(errorCacheTtlMs)
+    
+    val errors = mutable.Map.empty[String, String]
+    
+    // Check requested topics  
+    topicNames.foreach { topicName =>
+      Option(topicCreationErrorCache.get(topicName)) match {
+        case Some(error) =>
+          errors.put(topicName, error.errorMessage)
+        case None =>
+      }
+    }
+    
+    errors.toMap
+  }
+
+  /**
+   * Remove expired entries from the cache using the provided TTL.
+   * Since we use LinkedHashMap with insertion order, we only need to check 
+   * entries from the beginning until we find a non-expired entry.
+   */
+  private def expireOldEntries(ttlMs: Long): Unit = {
+    val currentTime = time.milliseconds()
+    
+    // Iterate and remove expired entries
+    val iterator = topicCreationErrorCache.entrySet().iterator()
+    
+    breakable {
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val cachedError = entry.getValue
+        
+        if (currentTime - cachedError.timestamp > ttlMs) {
+          iterator.remove()
+          debug(s"Removed expired topic creation error cache entry for 
${entry.getKey}")
+        } else {
+          // Since entries are in insertion order, if this entry is not 
expired,
+          // all following entries are also not expired
+          break()
+        }

Review Comment:
   The assumption that entries are in insertion order and that if one entry is 
not expired, all following entries are also not expired is incorrect. The 
LinkedHashMap is configured with `accessOrder = false` (line 94), meaning it 
maintains insertion order, but entries can have different timestamps based on 
when they were inserted. Breaking early could leave expired entries in the 
cache.
   ```suggestion
       while (iterator.hasNext) {
         val entry = iterator.next()
         val cachedError = entry.getValue
         if (currentTime - cachedError.timestamp > ttlMs) {
           iterator.remove()
           debug(s"Removed expired topic creation error cache entry for 
${entry.getKey}")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to