lucasbru commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2336984764


##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -356,4 +371,226 @@ class AutoTopicCreationManagerTest {
       .setNumPartitions(numPartitions)
       .setReplicationFactor(replicationFactor)
   }
+
+  @Test
+  def testTopicCreationErrorCaching(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "test-topic-1" -> new CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
+
+    val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with errors
+    val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic-1")
+      .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+      .setErrorMessage("Topic 'test-topic-1' already exists.")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Trigger the completion handler
+    argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify that the error was cached
+    val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic-1"), mockTime.milliseconds())
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("test-topic-1"))
+    assertEquals("Topic 'test-topic-1' already exists.", cachedErrors("test-topic-1"))
+  }
+
+  @Test
+  def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "success-topic" -> new CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+      "failed-topic" -> new CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
+
+    val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate mixed response - one success, one failure
+    val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("success-topic")
+        .setErrorCode(Errors.NONE.code())
+    )
+    createTopicsResponseData.topics().add(
+      new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("failed-topic")
+        .setErrorCode(Errors.POLICY_VIOLATION.code())
+        .setErrorMessage("Policy violation")
+    )
+
+    val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Only the failed topic should be cached
+    val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("success-topic", "failed-topic", "nonexistent-topic"), mockTime.milliseconds())
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("failed-topic"))
+    assertEquals("Policy violation", cachedErrors("failed-topic"))
+  }
+
+  @Test 
+  def testErrorCacheTTL(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    // First cache an error by simulating topic creation failure
+    val topics = Map(
+      "test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    val shortTtlMs = 1000L // Use 1 second TTL for faster testing
+    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, shortTtlMs)
+
+    val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with error
+    val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic")
+      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+      .setErrorMessage("Invalid replication factor")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Cache the error at T0
+    argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify error is cached and accessible within TTL
+    val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
+    assertEquals(1, cachedErrors.size)
+    assertEquals("Invalid replication factor", cachedErrors("test-topic"))
+
+    // Advance time beyond TTL
+    mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
+
+    // Verify error is now expired and proactively cleaned up
+    val expiredErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
+    assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively cleaned up")
+  }
+
+  @Test
+  def testErrorCacheLRUEviction(): Unit = {
+    // Create a config with a small cache size for testing
+    val props = TestUtils.createBrokerConfig(1)
+    props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
+    props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, "3") // Small cache size for testing

Review Comment:
   I don't understand why we are doing this
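   If the goal is just a small `maxSize`, couldn't the test construct the cache directly via the `ExpiringErrorCache(maxSize, time)` constructor added in this PR, instead of repurposing the fetch-session-cache config? Untested sketch (hypothetical test body, not the PR's code):

   ```scala
   // Drive capacity directly rather than through KafkaConfig
   val cache = new ExpiringErrorCache(3, mockTime)
   cache.put("topic-1", "error-1", 1000L)
   cache.put("topic-2", "error-2", 1000L)
   cache.put("topic-3", "error-3", 1000L)
   cache.put("topic-4", "error-4", 1000L) // over maxSize = 3: earliest-expiring entry is evicted
   ```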



##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -356,4 +371,226 @@ class AutoTopicCreationManagerTest {
       .setNumPartitions(numPartitions)
       .setReplicationFactor(replicationFactor)
   }
+
+  @Test
+  def testTopicCreationErrorCaching(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "test-topic-1" -> new CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
+
+    val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with errors
+    val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic-1")
+      .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+      .setErrorMessage("Topic 'test-topic-1' already exists.")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Trigger the completion handler
+    argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify that the error was cached
+    val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic-1"), mockTime.milliseconds())
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("test-topic-1"))
+    assertEquals("Topic 'test-topic-1' already exists.", cachedErrors("test-topic-1"))
+  }
+
+  @Test
+  def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "success-topic" -> new CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+      "failed-topic" -> new CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
+
+    val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate mixed response - one success, one failure
+    val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("success-topic")
+        .setErrorCode(Errors.NONE.code())
+    )
+    createTopicsResponseData.topics().add(
+      new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("failed-topic")
+        .setErrorCode(Errors.POLICY_VIOLATION.code())
+        .setErrorMessage("Policy violation")
+    )
+
+    val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Only the failed topic should be cached
+    val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("success-topic", "failed-topic", "nonexistent-topic"), mockTime.milliseconds())
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("failed-topic"))
+    assertEquals("Policy violation", cachedErrors("failed-topic"))
+  }
+
+  @Test 
+  def testErrorCacheTTL(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    // First cache an error by simulating topic creation failure
+    val topics = Map(
+      "test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    val shortTtlMs = 1000L // Use 1 second TTL for faster testing
+    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, shortTtlMs)
+
+    val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with error
+    val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic")
+      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+      .setErrorMessage("Invalid replication factor")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Cache the error at T0
+    argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify error is cached and accessible within TTL
+    val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
+    assertEquals(1, cachedErrors.size)
+    assertEquals("Invalid replication factor", cachedErrors("test-topic"))
+
+    // Advance time beyond TTL
+    mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
+
+    // Verify error is now expired and proactively cleaned up
+    val expiredErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
+    assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively cleaned up")
+  }
+
+  @Test
+  def testErrorCacheLRUEviction(): Unit = {
+    // Create a config with a small cache size for testing
+    val props = TestUtils.createBrokerConfig(1)
+    props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
+    props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, "3") // Small cache size for testing
+
+    props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString)
+    props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString)
+    props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString)
+
+    props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
+    props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
+    props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
+
+    val smallCacheConfig = KafkaConfig.fromProps(props)
+
+    // Verify the configuration was properly set
+    assertEquals(3, smallCacheConfig.maxIncrementalFetchSessionCacheSlots, "Cache size configuration should be 3")

Review Comment:
   Why are we doing this



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -50,21 +52,111 @@ trait AutoTopicCreationManager {
 
   def createStreamsInternalTopics(
     topics: Map[String, CreatableTopic],
-    requestContext: RequestContext
+    requestContext: RequestContext,
+    timeoutMs: Long
   ): Unit
 
+  def getStreamsInternalTopicCreationErrors(
+    topicNames: Set[String],
+    currentTimeMs: Long
+  ): Map[String, String]
+
+  def close(): Unit = {}
+
+}
+
+/**
+ * Thread-safe cache that stores topic creation errors with per-entry expiration.
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time
+ * - Capacity: enforced by insertion-order removal (keeps the most recently inserted entries)
+ */
+private[server] class ExpiringErrorCache(maxSize: Int, time: Time) {
+
+  private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long)
+
+  private val byTopic = new ConcurrentHashMap[String, Entry]()
+  private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] {
+    override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
+  })
+  private val lock = new ReentrantLock()
+
+  def put(topicName: String, errorMessage: String, ttlMs: Long): Unit = {
+    lock.lock()
+    try {
+      val existing = byTopic.get(topicName)
+      if (existing != null) {
+        // Remove old instance from structures
+        expiryQueue.remove(existing)

Review Comment:
   This remove is a linear-time operation, right? I think we should avoid that. It may be fine to just leave the old entry in the expiryQueue: once it expires, the cleanup will not delete the key from the map if the value has been replaced in the meantime, because it checks that the map still holds that exact entry.
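   Untested sketch of `put` without the linear `remove` (capacity enforcement elided; assumes the `finally { lock.unlock() }` from the existing method):

   ```scala
   def put(topicName: String, errorMessage: String, ttlMs: Long): Unit = {
     lock.lock()
     try {
       // The stale entry (if any) stays in expiryQueue. When it is eventually
       // polled after expiring, the identity check below sees that byTopic now
       // holds a newer Entry for the same topic and leaves the map untouched.
       val currentTimeMs = time.milliseconds()
       val entry = Entry(topicName, errorMessage, currentTimeMs + ttlMs)
       byTopic.put(topicName, entry)
       expiryQueue.add(entry)

       while (!expiryQueue.isEmpty && expiryQueue.peek().expirationTimeMs <= currentTimeMs) {
         val expired = expiryQueue.poll()
         val current = byTopic.get(expired.topicName)
         if (current != null && (current eq expired))
           byTopic.remove(expired.topicName)
       }
     } finally {
       lock.unlock()
     }
   }
   ```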



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -50,21 +52,111 @@ trait AutoTopicCreationManager {
 
   def createStreamsInternalTopics(
     topics: Map[String, CreatableTopic],
-    requestContext: RequestContext
+    requestContext: RequestContext,
+    timeoutMs: Long
   ): Unit
 
+  def getStreamsInternalTopicCreationErrors(
+    topicNames: Set[String],
+    currentTimeMs: Long
+  ): Map[String, String]
+
+  def close(): Unit = {}
+
+}
+
+/**
+ * Thread-safe cache that stores topic creation errors with per-entry expiration.
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time
+ * - Capacity: enforced by insertion-order removal (keeps the most recently inserted entries)
+ */
+private[server] class ExpiringErrorCache(maxSize: Int, time: Time) {
+
+  private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long)
+
+  private val byTopic = new ConcurrentHashMap[String, Entry]()
+  private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] {
+    override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
+  })
+  private val lock = new ReentrantLock()
+
+  def put(topicName: String, errorMessage: String, ttlMs: Long): Unit = {
+    lock.lock()
+    try {
+      val existing = byTopic.get(topicName)
+      if (existing != null) {
+        // Remove old instance from structures
+        expiryQueue.remove(existing)
+      }
+
+      val currentTimeMs = time.milliseconds()
+      val expirationTimeMs = currentTimeMs + ttlMs
+      val entry = Entry(topicName, errorMessage, expirationTimeMs)
+      byTopic.put(topicName, entry)
+      expiryQueue.add(entry)
+
+      // Clean up expired entries
+      while (!expiryQueue.isEmpty && expiryQueue.peek().expirationTimeMs <= currentTimeMs) {
+        val expired = expiryQueue.poll()
+        val current = byTopic.get(expired.topicName)
+        if (current != null && (current eq expired)) {
+          byTopic.remove(expired.topicName)
+        }
+      }
+
+      // Enforce capacity by removing entries with earliest expiration time first
+      while (byTopic.size() > maxSize && !expiryQueue.isEmpty) {

Review Comment:
   Could you merge this loop into the loop above by just checking the condition
   
   `!expiryQueue.isEmpty && (expiryQueue.peek().expirationTimeMs <= currentTimeMs || byTopic.size() > maxSize)`
   
   in the while loop?
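   Untested sketch of the merged loop:

   ```scala
   // One pass handles both expiry and capacity: keep polling while the head of
   // the heap is expired, or the map is still over capacity (so entries with
   // the earliest expiration time are evicted first).
   while (!expiryQueue.isEmpty &&
          (expiryQueue.peek().expirationTimeMs <= currentTimeMs || byTopic.size() > maxSize)) {
     val evicted = expiryQueue.poll()
     val current = byTopic.get(evicted.topicName)
     if (current != null && (current eq evicted))
       byTopic.remove(evicted.topicName)
   }
   ```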



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -50,21 +52,111 @@ trait AutoTopicCreationManager {
 
   def createStreamsInternalTopics(
     topics: Map[String, CreatableTopic],
-    requestContext: RequestContext
+    requestContext: RequestContext,
+    timeoutMs: Long
   ): Unit
 
+  def getStreamsInternalTopicCreationErrors(
+    topicNames: Set[String],
+    currentTimeMs: Long
+  ): Map[String, String]
+
+  def close(): Unit = {}
+
+}
+
+/**
+ * Thread-safe cache that stores topic creation errors with per-entry expiration.
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time
+ * - Capacity: enforced by insertion-order removal (keeps the most recently inserted entries)
+ */
+private[server] class ExpiringErrorCache(maxSize: Int, time: Time) {
+
+  private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long)
+
+  private val byTopic = new ConcurrentHashMap[String, Entry]()
+  private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] {
+    override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
+  })
+  private val lock = new ReentrantLock()
+
+  def put(topicName: String, errorMessage: String, ttlMs: Long): Unit = {
+    lock.lock()
+    try {
+      val existing = byTopic.get(topicName)
+      if (existing != null) {
+        // Remove old instance from structures
+        expiryQueue.remove(existing)
+      }
+
+      val currentTimeMs = time.milliseconds()
+      val expirationTimeMs = currentTimeMs + ttlMs
+      val entry = Entry(topicName, errorMessage, expirationTimeMs)
+      byTopic.put(topicName, entry)
+      expiryQueue.add(entry)
+
+      // Clean up expired entries
+      while (!expiryQueue.isEmpty && expiryQueue.peek().expirationTimeMs <= currentTimeMs) {
+        val expired = expiryQueue.poll()
+        val current = byTopic.get(expired.topicName)
+        if (current != null && (current eq expired)) {

Review Comment:
   Is `eq` doing a deep comparison here? Maybe it would be enough to compare the timestamps; a deep comparison is expensive.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2888,10 +2888,35 @@ class KafkaApis(val requestChannel: RequestChannel,
                 )
               }
             } else {
-              autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext);
+              // Compute group-specific timeout for caching errors (2 * heartbeat interval)
+              val heartbeatIntervalMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))

Review Comment:
   The TTL is calculated as 2 × heartbeat interval, but the PR description mentions 3 × request.timeout.ms. This inconsistency could be confusing. Can you please fix the PR description? I would keep it much shorter and less AI-generated, so that it is easier to keep up to date.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
