lianetm commented on code in PR #21509:
URL: https://github.com/apache/kafka/pull/21509#discussion_r2828824863
##########
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##########
@@ -735,6 +735,177 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
)
}
+ /**
+ * Helper to set up two topics (foo with 3 partitions, bar with 2
partitions),
+ * join a consumer group, and commit offsets to both topics.
+ */
+ private def setupTopicsJoinAndCommit(): (Uuid, Uuid, String, Int) = {
+ createOffsetsTopic()
+
+ // Create two topics.
+ val fooTopicId = createTopic(topic = "foo", numPartitions = 3)
+ val barTopicId = createTopic(topic = "bar", numPartitions = 2)
+
+ // Join the consumer group.
+ val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol =
true)
+
+ // Commit offsets for both topics.
+ for (partitionId <- 0 to 2) {
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ topicId = fooTopicId,
+ partition = partitionId,
+ offset = 100L + partitionId,
+ expectedError = Errors.NONE,
+ version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+ )
+ }
+ for (partitionId <- 0 to 1) {
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "bar",
+ topicId = barTopicId,
+ partition = partitionId,
+ offset = 200L + partitionId,
+ expectedError = Errors.NONE,
+ version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+ )
+ }
+
+ (fooTopicId, barTopicId, memberId, memberEpoch)
+ }
+
+ // Validate responses to OffsetFetch when the topic is deleted and topic IDs
used (version 10+)
+ // The expectation is that the response contains the deleted topic,
+ // with UNKNOWN_TOPIC_ID error at the partition level.
+ @ClusterTest
+ def testFetchOffsetWithDeletedTopicUsingTopicIds(): Unit = {
+ val (fooTopicId, barTopicId, memberId, memberEpoch) =
setupTopicsJoinAndCommit()
+
+ // Delete the bar topic.
+ deleteTopic("bar")
+
+ // Wait for offsets to be deleted and reflected in the fetch response as
expected.
+ // The deleted topic (bar) should return UNKNOWN_TOPIC_ID error for its
partitions,
+ // while the existing topic (foo) should return the committed offsets.
+ for (version <- 10 to
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+ TestUtils.waitUntilTrue(
+ () => {
+ val response = fetchOffsets(
+ group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp")
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch)
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setTopicId(fooTopicId)
+ .setPartitionIndexes(List[Integer](0, 1, 2).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setTopicId(barTopicId)
+ .setPartitionIndexes(List[Integer](0, 1).asJava)
+ ).asJava),
+ requireStable = true,
+ version = version.toShort
+ )
+
+ // Both topics should be in the response.
+ val hasCorrectTopicCount = response.topics.size == 2
+
+ // Verify foo topic has the committed offsets.
+ val fooResponse = response.topics.asScala.find(_.topicId ==
fooTopicId)
+ val fooValid = fooResponse.exists { foo =>
Review Comment:
Yes, done for both tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]