RaidenE1 commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2271540985


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10949,6 +10950,59 @@ class KafkaApisTest extends Logging {
     )
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(util.Map.of(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
+    val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+
+    val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+    when(groupCoordinator.streamsGroupHeartbeat(
+      requestChannelRequest.context,
+      streamsGroupHeartbeatRequest
+    )).thenReturn(future)
+
+    // Mock AutoTopicCreationManager to return cached errors
+    val mockAutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])
+    
when(mockAutoTopicCreationManager.getTopicCreationErrors(Set("test-topic")))
+      .thenReturn(Map("test-topic" -> "INVALID_REPLICATION_FACTOR"))
+
+    kafkaApis = createKafkaApis(autoTopicCreationManager = 
Some(mockAutoTopicCreationManager))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    // Group coordinator returns MISSING_INTERNAL_TOPICS status and topics to 
create
+    val missingTopics = util.Map.of("test-topic", new CreatableTopic())
+    val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+      .setMemberId("member")
+      .setStatus(util.List.of(
+        new StreamsGroupHeartbeatResponseData.Status()
+          
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+          .setStatusDetail("Internal topics are missing: [test-topic]")
+      ))
+
+    future.complete(new 
StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics))
+    val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    
+    assertEquals(Errors.NONE.code, response.data.errorCode())
+    assertEquals(null, response.data.errorMessage())
+    
+    // Verify that the cached error was appended to the existing status detail

Review Comment:
   added mock create topic



##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -393,4 +393,176 @@ class AutoTopicCreationManagerTest {
       .setNumPartitions(numPartitions)
       .setReplicationFactor(replicationFactor)
   }
+
+  @Test
+  def testTopicCreationErrorCaching(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator)
+
+    val topics = Map(
+      "test-topic-1" -> new 
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with errors
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic-1")
+      .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+      .setErrorMessage("Topic 'test-topic-1' already exists.")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Trigger the completion handler
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify that the error was cached
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1"))
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("test-topic-1"))
+    assertEquals("Topic 'test-topic-1' already exists.", 
cachedErrors("test-topic-1"))
+  }
+
+  @Test
+  def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator)
+
+    val topics = Map(
+      "success-topic" -> new 
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+      "failed-topic" -> new 
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate mixed response - one success, one failure
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("success-topic")
+        .setErrorCode(Errors.NONE.code())
+    )
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("failed-topic")
+        .setErrorCode(Errors.POLICY_VIOLATION.code())
+        .setErrorMessage("Policy violation")
+    )
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Only the failed topic should be cached
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("success-topic", 
"failed-topic", "nonexistent-topic"))
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("failed-topic"))
+    assertEquals("Policy violation", cachedErrors("failed-topic"))
+  }
+
+  @Test
+  def testLazyCleanupOfExpiredCacheEntries(): Unit = {
+    // Test will verify that expired entries are cleaned up when accessed
+    // We'll simulate the passage of time by directly manipulating the cache
+    
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator)
+    
+    // Manually add an expired entry to the cache using reflection

Review Comment:
   use mock time and remove reflection



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to