mjsax commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2268282851


##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -64,6 +77,9 @@ class DefaultAutoTopicCreationManager(
 ) extends AutoTopicCreationManager with Logging {
 
   private val inflightTopics = Collections.newSetFromMap(new 
ConcurrentHashMap[String, java.lang.Boolean]())
+  private val topicCreationErrorCache = new ConcurrentHashMap[String, 
CachedTopicCreationError]()
+  private val errorCacheTtlMs = config.requestTimeoutMs.toLong * 3 // 3x 
request timeout

Review Comment:
   Not sure if we should couple this to request timeout? -- Might be better to 
couple it to session timeout? If a client does not heartbeat within session 
timeout, we would remove it from the group.
   
   Side question: would we need to track error per streams group, and use a 
group specific ttl, given that each group could set an individual session 
timeout?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager(
     }
   }
 
+  override def getTopicCreationErrors(
+    topicNames: Set[String]
+  ): Map[String, String] = {
+    val currentTime = System.currentTimeMillis()
+    val errors = mutable.Map.empty[String, String]
+    val expiredKeys = mutable.Set.empty[String]
+    
+    // Check requested topics and collect expired keys
+    topicNames.foreach { topicName =>
+      Option(topicCreationErrorCache.get(topicName)) match {
+        case Some(cachedError) if (currentTime - cachedError.timestamp) <= 
errorCacheTtlMs =>

Review Comment:
   Why would we exclude the error message if we still have it? -- I thought the 
ttl would apply for the case, that we never returned an error, and want to drop 
it on the floor, via some cleanup process?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -135,10 +190,17 @@ class DefaultAutoTopicCreationManager(
         clearInflightRequests(creatableTopics)
         if (response.authenticationException() != null) {
           warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
authentication exception")
+          cacheTopicCreationErrors(creatableTopics.keys.toSet, "Authentication 
failed")

Review Comment:
   Should we use the error message from `response.authenticationException()` 
instead of a manually created error message? Also wondering, why we don't log 
the actual exception error message in the `warn` above?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -263,4 +325,31 @@ class DefaultAutoTopicCreationManager(
 
     (creatableTopics, uncreatableTopics)
   }
+
+  private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: 
String): Unit = {
+    val timestamp = System.currentTimeMillis()
+    topicNames.foreach { topicName =>
+      topicCreationErrorCache.put(topicName, 
CachedTopicCreationError(errorMessage, timestamp))
+    }
+  }
+
+  private def cacheTopicCreationErrorsFromResponse(response: 
CreateTopicsResponse): Unit = {
+    val timestamp = System.currentTimeMillis()

Review Comment:
   One more.



##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -393,4 +393,176 @@ class AutoTopicCreationManagerTest {
       .setNumPartitions(numPartitions)
       .setReplicationFactor(replicationFactor)
   }
+
+  @Test
+  def testTopicCreationErrorCaching(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator)
+
+    val topics = Map(
+      "test-topic-1" -> new 
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with errors
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic-1")
+      .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+      .setErrorMessage("Topic 'test-topic-1' already exists.")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Trigger the completion handler
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify that the error was cached
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1"))
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("test-topic-1"))
+    assertEquals("Topic 'test-topic-1' already exists.", 
cachedErrors("test-topic-1"))
+  }
+
+  @Test
+  def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator)
+
+    val topics = Map(
+      "success-topic" -> new 
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+      "failed-topic" -> new 
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate mixed response - one success, one failure
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("success-topic")
+        .setErrorCode(Errors.NONE.code())
+    )
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("failed-topic")
+        .setErrorCode(Errors.POLICY_VIOLATION.code())
+        .setErrorMessage("Policy violation")
+    )
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Only the failed topic should be cached
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("success-topic", 
"failed-topic", "nonexistent-topic"))
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("failed-topic"))
+    assertEquals("Policy violation", cachedErrors("failed-topic"))
+  }
+
+  @Test
+  def testLazyCleanupOfExpiredCacheEntries(): Unit = {
+    // Test will verify that expired entries are cleaned up when accessed
+    // We'll simulate the passage of time by directly manipulating the cache
+    
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator)
+    
+    // Manually add an expired entry to the cache using reflection

Review Comment:
   Seems you need to use reflection as you cannot modify the time? If we use 
`MockTime` in the test, we should be able to avoid reflection.



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -64,6 +77,9 @@ class DefaultAutoTopicCreationManager(
 ) extends AutoTopicCreationManager with Logging {
 
   private val inflightTopics = Collections.newSetFromMap(new 
ConcurrentHashMap[String, java.lang.Boolean]())
+  private val topicCreationErrorCache = new ConcurrentHashMap[String, 
CachedTopicCreationError]()
+  private val errorCacheTtlMs = config.requestTimeoutMs.toLong * 3 // 3x 
request timeout
+  private val maxCacheSize = 1000

Review Comment:
   Wondering why we would need to bound the cache size? What is the reasoning 
for this?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager(
     }
   }
 
+  override def getTopicCreationErrors(
+    topicNames: Set[String]
+  ): Map[String, String] = {
+    val currentTime = System.currentTimeMillis()
+    val errors = mutable.Map.empty[String, String]
+    val expiredKeys = mutable.Set.empty[String]
+    
+    // Check requested topics and collect expired keys
+    topicNames.foreach { topicName =>
+      Option(topicCreationErrorCache.get(topicName)) match {
+        case Some(cachedError) if (currentTime - cachedError.timestamp) <= 
errorCacheTtlMs =>
+          errors.put(topicName, cachedError.errorMessage)
+        case Some(_) =>
+          expiredKeys += topicName

Review Comment:
   Not sure if I understand this logic? I though we would expire an entry, if 
we never returned it to the client, and if TTL passed?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager(
     }
   }
 
+  override def getTopicCreationErrors(
+    topicNames: Set[String]
+  ): Map[String, String] = {
+    val currentTime = System.currentTimeMillis()

Review Comment:
   We should never call `System.currentTimeMillis()` directly, but use a `Time` 
object -- as the `AutoTopicCreationManager` does not have one yet, we need to 
add it to the constructor and make available, and trace back where 
`AutoTopicCreationManager` is setup to find the `Time` object there.
   
   Using `Time` interface allows us to mock time in unit tests.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10949,6 +10950,59 @@ class KafkaApisTest extends Logging {
     )
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = {

Review Comment:
   It's a little unclear to me, what this method is supposed to actually verify?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -135,10 +190,17 @@ class DefaultAutoTopicCreationManager(
         clearInflightRequests(creatableTopics)
         if (response.authenticationException() != null) {
           warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
authentication exception")
+          cacheTopicCreationErrors(creatableTopics.keys.toSet, "Authentication 
failed")
         } else if (response.versionMismatch() != null) {
           warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
invalid version exception")
+          cacheTopicCreationErrors(creatableTopics.keys.toSet, "Version 
mismatch")

Review Comment:
   As above



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager(
     }
   }
 
+  override def getTopicCreationErrors(
+    topicNames: Set[String]
+  ): Map[String, String] = {
+    val currentTime = System.currentTimeMillis()
+    val errors = mutable.Map.empty[String, String]
+    val expiredKeys = mutable.Set.empty[String]
+    
+    // Check requested topics and collect expired keys
+    topicNames.foreach { topicName =>
+      Option(topicCreationErrorCache.get(topicName)) match {
+        case Some(cachedError) if (currentTime - cachedError.timestamp) <= 
errorCacheTtlMs =>
+          errors.put(topicName, cachedError.errorMessage)
+        case Some(_) =>

Review Comment:
   If we are using `Some(cacheError)` above, would this case actually every be 
executed (I am not a Scala person, but my understanding is, that 
`Same(cacheError)` would be a "catch all"?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -263,4 +325,31 @@ class DefaultAutoTopicCreationManager(
 
     (creatableTopics, uncreatableTopics)
   }
+
+  private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: 
String): Unit = {
+    val timestamp = System.currentTimeMillis()

Review Comment:
   As above



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -263,4 +325,31 @@ class DefaultAutoTopicCreationManager(
 
     (creatableTopics, uncreatableTopics)
   }
+
+  private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: 
String): Unit = {
+    val timestamp = System.currentTimeMillis()
+    topicNames.foreach { topicName =>
+      topicCreationErrorCache.put(topicName, 
CachedTopicCreationError(errorMessage, timestamp))

Review Comment:
   Should we change `CachedTopicCreationError` constructor to set timestamp 
automatically, instead of passing in it? If we just get current ms ts anyway, 
we can move the code into the constructor. Or would we ever need to pass in 
something different?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2889,6 +2889,24 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
             } else {
               
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, 
requestContext);
+              
+              // Check for cached topic creation errors only if there's 
already a MISSING_INTERNAL_TOPICS status
+              val hasMissingInternalTopicsStatus = responseData.status() != 
null && 
+                responseData.status().stream().anyMatch(s => s.statusCode() == 
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())

Review Comment:
   Not sure if I can follow? Why would we need to check 
`MISSING_INTERNAL_TOPICS` status?



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -135,10 +190,17 @@ class DefaultAutoTopicCreationManager(
         clearInflightRequests(creatableTopics)
         if (response.authenticationException() != null) {
           warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
authentication exception")
+          cacheTopicCreationErrors(creatableTopics.keys.toSet, "Authentication 
failed")
         } else if (response.versionMismatch() != null) {
           warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
invalid version exception")
+          cacheTopicCreationErrors(creatableTopics.keys.toSet, "Version 
mismatch")
         } else {
-          debug(s"Auto topic creation completed for ${creatableTopics.keys} 
with response ${response.responseBody}.")
+          response.responseBody() match {
+            case createTopicsResponse: CreateTopicsResponse =>
+              cacheTopicCreationErrorsFromResponse(createTopicsResponse)

Review Comment:
   I thought, for this `else` branch, the request was successful and no error 
would be returned?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10949,6 +10950,59 @@ class KafkaApisTest extends Logging {
     )
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(util.Map.of(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
+    val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+
+    val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+    when(groupCoordinator.streamsGroupHeartbeat(
+      requestChannelRequest.context,
+      streamsGroupHeartbeatRequest
+    )).thenReturn(future)
+
+    // Mock AutoTopicCreationManager to return cached errors
+    val mockAutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])
+    
when(mockAutoTopicCreationManager.getTopicCreationErrors(Set("test-topic")))
+      .thenReturn(Map("test-topic" -> "INVALID_REPLICATION_FACTOR"))
+
+    kafkaApis = createKafkaApis(autoTopicCreationManager = 
Some(mockAutoTopicCreationManager))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    // Group coordinator returns MISSING_INTERNAL_TOPICS status and topics to 
create
+    val missingTopics = util.Map.of("test-topic", new CreatableTopic())
+    val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+      .setMemberId("member")
+      .setStatus(util.List.of(
+        new StreamsGroupHeartbeatResponseData.Status()
+          
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+          .setStatusDetail("Internal topics are missing: [test-topic]")
+      ))
+
+    future.complete(new 
StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics))
+    val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    
+    assertEquals(Errors.NONE.code, response.data.errorCode())
+    assertEquals(null, response.data.errorMessage())
+    
+    // Verify that the cached error was appended to the existing status detail

Review Comment:
   Are we really verifying this? It seem our test code, does assemble the 
`StreamsGroupHeartbeatResponseData`, so we don't really execute prod code? So 
are we only verifying that our test code does setup the right response? For 
this case, it seems the test would not actually test anything?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to