lianetm commented on code in PR #21509:
URL: https://github.com/apache/kafka/pull/21509#discussion_r2946007477


##########
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##########
@@ -735,6 +735,183 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
     )
   }
 
+  /**
+   * Helper to set up two topics (foo with 3 partitions, bar with 2 
partitions),
+   * join a consumer group, and commit offsets to both topics.
+   */
+  private def setupTopicsJoinAndCommit(): (Uuid, Uuid, String, Int) = {
+    createOffsetsTopic()
+
+    // Create two topics.
+    val fooTopicId = createTopic(topic = "foo", numPartitions = 3)
+    val barTopicId = createTopic(topic = "bar", numPartitions = 2)
+
+    // Join the consumer group.
+    val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol = 
true)
+
+    // Commit offsets for both topics.
+    for (partitionId <- 0 to 2) {
+      commitOffset(
+        groupId = "grp",
+        memberId = memberId,
+        memberEpoch = memberEpoch,
+        topic = "foo",
+        topicId = fooTopicId,
+        partition = partitionId,
+        offset = 100L + partitionId,
+        expectedError = Errors.NONE,
+        version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+      )
+    }
+    for (partitionId <- 0 to 1) {
+      commitOffset(
+        groupId = "grp",
+        memberId = memberId,
+        memberEpoch = memberEpoch,
+        topic = "bar",
+        topicId = barTopicId,
+        partition = partitionId,
+        offset = 200L + partitionId,
+        expectedError = Errors.NONE,
+        version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+      )
+    }
+
+    (fooTopicId, barTopicId, memberId, memberEpoch)
+  }
+
+  // Validate responses to OffsetFetch when the topic is deleted and topic IDs 
used (version 10+)
+  // The expectation is that the response contains the deleted topic,
+  // with UNKNOWN_TOPIC_ID error at the partition level.
+  @ClusterTest
+  def testFetchOffsetWithDeletedTopicUsingTopicIds(): Unit = {
+    val (fooTopicId, barTopicId, memberId, memberEpoch) = 
setupTopicsJoinAndCommit()
+
+    // Delete the bar topic.
+    deleteTopic("bar")
+
+    // Wait for the deleted topic (bar) to return UNKNOWN_TOPIC_ID error for 
its partitions.
+    // The undeleted topic (foo) should still return its committed offsets.
+    for (version <- 10 to 
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+      val expectedResponse = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("grp")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setTopicId(fooTopicId)
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100L),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(101L),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(2)
+                .setCommittedOffset(102L)
+            ).asJava),
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setTopicId(barTopicId)
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(-1L)
+                .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(-1L)
+                .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+            ).asJava)
+        ).asJava)
+
+      TestUtils.waitUntilTrue(
+        () => {
+          expectedResponse == fetchOffsets(
+            group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+              .setGroupId("grp")
+              .setMemberId(memberId)
+              .setMemberEpoch(memberEpoch)
+              .setTopics(List(
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                  .setTopicId(fooTopicId)
+                  .setPartitionIndexes(List[Integer](0, 1, 2).asJava),
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                  .setTopicId(barTopicId)
+                  .setPartitionIndexes(List[Integer](0, 1).asJava)
+              ).asJava),
+            requireStable = true,

Review Comment:
   the difference wasn't intentional, the requireStable is unrelated to these 
new tests really so doesn't make a difference (we're not involving tx offsets). 
Changed this one to have them all false for consistency. Thanks!  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to