apoorvmittal10 commented on code in PR #17709:
URL: https://github.com/apache/kafka/pull/17709#discussion_r1834148813


##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -247,13 +248,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
     val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
     val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
-    val shareFetchResponseData = shareFetchResponse.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
-    assertEquals(1, shareFetchResponseData.responses().size())
-    assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
-    assertEquals(3, shareFetchResponseData.responses().get(0).partitions().size())
+    // For the multi partition fetch request, the response may not be available in the first attempt
+    // as the share partitions might not be initialized yet. So, we retry until we get the response.
+    var responses = Seq[ShareFetchResponseData.PartitionData]()
+    TestUtils.waitUntilTrue(() => {
+      val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size()
+      if (partitionsCount > 0) {
+        assertEquals(1, shareFetchResponseData.responses().size())
+        assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
+        shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => {
+          if (!partitionData.acquiredRecords().isEmpty) {
+            responses = responses :+ partitionData
+          }
+        })
+      }
+      responses.size == 3

Review Comment:
   We shouldn't, as the problem we are trying to solve here is that when we enable DefaultStatePersister we do see a delay in SharePartition getting initialized, which is expected. With a multi topic-partition share fetch call, say tp0 and tp1, there can be a scenario where only tp0 is initialized and triggers the purgatory's checkAndComplete; hence the share fetch responds with acquired records for tp0 only.
   
   I have added the retries here, where the test case is considered successful only once all topic-partitions (tp0 and tp1 in this case) have responded with acquired records.
   
   Prior to adding a topic-partition to the responses array, I check whether the share fetch response actually has acquired records for it.
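   The retry shape described above can be sketched standalone. This is a minimal, hypothetical Java stand-in for Kafka's `TestUtils.waitUntilTrue` (not the real helper): it re-evaluates a condition with a backoff until it holds, and the "fetch" here is simulated so that one more partition returns acquired records on each attempt, mirroring partitions finishing initialization at different times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class RetrySketch {
    // Simplified stand-in for TestUtils.waitUntilTrue: retry a boolean
    // condition with a fixed backoff until it passes or attempts run out.
    static boolean waitUntilTrue(Supplier<Boolean> condition, int maxRetries, long backoffMs)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (condition.get()) {
                return true;
            }
            Thread.sleep(backoffMs);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate three share partitions initializing one per fetch attempt:
        // each "fetch" returns acquired records only for initialized partitions.
        int[] initializedPartitions = {0};
        List<Integer> partitionsWithAcquiredRecords = new ArrayList<>();

        boolean allResponded = waitUntilTrue(() -> {
            // One more partition finishes initialization per attempt.
            initializedPartitions[0] = Math.min(initializedPartitions[0] + 1, 3);
            partitionsWithAcquiredRecords.clear();
            for (int p = 0; p < initializedPartitions[0]; p++) {
                partitionsWithAcquiredRecords.add(p);
            }
            // The test passes only once every partition returned acquired records.
            return partitionsWithAcquiredRecords.size() == 3;
        }, 10, 1L);

        System.out.println("all partitions acquired: " + allResponded);
    }
}
```

   The key point the review makes is visible here: an early attempt sees fewer than three partitions with acquired records, so the condition simply returns false and the loop retries instead of failing the test.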



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
