apoorvmittal10 commented on code in PR #17709: URL: https://github.com/apache/kafka/pull/17709#discussion_r1834148813
########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -247,13 +248,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap) - val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) - val shareFetchResponseData = shareFetchResponse.data() - assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) - assertEquals(1, shareFetchResponseData.responses().size()) - assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId()) - assertEquals(3, shareFetchResponseData.responses().get(0).partitions().size()) + // For the multi partition fetch request, the response may not be available in the first attempt + // as the share partitions might not be initialized yet. So, we retry until we get the response. + var responses = Seq[ShareFetchResponseData.PartitionData]() + TestUtils.waitUntilTrue(() => { + val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) + val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size() + if (partitionsCount > 0) { + assertEquals(1, shareFetchResponseData.responses().size()) + assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId()) + shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => { + if (!partitionData.acquiredRecords().isEmpty) { + responses = responses :+ partitionData + } + }) + } + responses.size == 3 Review Comment: We shouldn't as the problem we are trying to solve here is that when we enable DefaultStatePersister then we do see a delay in SharePartition getting initialized, which is supposed to happen. And with multi topic-partition share fetch call, say tp0 and tp1, there can be scenario where tp0 is initialized and triggers purgatory's checkAndComplete. Hence share fetch will respond with acquired records of tp0 only. I have added the retires here where the test case is considered successful when all topic-partitions, tp0 and tp1 in this case, respond with acquired records. Prior adding topic-partitions in response array I check if the share fetch response does have acquired records or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org