chirag-wadhwa5 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1827271659


##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -138,17 +138,23 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
+    var acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+    var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap, 10)
+    var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
-    val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
-    val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    // Send the second share fetch request to fetch the records produced above
+    metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    acknowledgementsMap = Map.empty
+    shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
+    shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)

Review Comment:
   Thanks for the review. With the new code for the auto.offset.reset config in
   place, the default value is latest. One thing to keep in mind is that the
   share partition for a given physical partition and share group is loaded
   into the broker's memory as a result of the first share fetch request
   (maybeInitialize is called here). If a physical partition already has some
   records on it and we send our first fetch request afterwards, then because
   the config defaults to latest, all of those records will be ignored (the
   fetch offset for timestamp will return the next offset after the already
   produced messages). But if I send a share fetch request to a partition
   before any records have been produced to it, the offset for the latest
   timestamp will be 0 (because the physical partition is empty). Once the
   start offset is initialized as part of the first share fetch call, the
   tests should work fine.
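
To illustrate the ordering issue, here is a toy model of the behaviour described above. It is not Kafka code: `ToySharePartition`, `fetchable` and `Demo` are hypothetical names made up for this sketch, and it assumes that "latest" simply means the log end offset at the time of the first share fetch.

```scala
// Toy model only; none of these names exist in Kafka.
final class ToySharePartition {
  // Unset until the first share fetch arrives.
  private var startOffset: Option[Long] = None

  // Assumption: with auto.offset.reset = latest, the first fetch pins the
  // start offset to the current log end offset; later calls change nothing.
  def maybeInitialize(logEndOffset: Long): Unit =
    if (startOffset.isEmpty) startOffset = Some(logEndOffset)

  // Offsets visible to the share group for a given log end offset.
  def fetchable(logEndOffset: Long): List[Long] = {
    maybeInitialize(logEndOffset)
    (startOffset.get until logEndOffset).toList
  }
}

object Demo {
  // Old test order: produce 10 records, then send the first share fetch.
  def produceThenFetch(): List[Long] = {
    val sp = new ToySharePartition
    sp.fetchable(logEndOffset = 10L) // start offset pinned to 10
  }

  // New test order: first share fetch on the empty partition, then produce
  // 10 records, then fetch again.
  def fetchThenProduce(): List[Long] = {
    val sp = new ToySharePartition
    sp.fetchable(logEndOffset = 0L)  // start offset pinned to 0
    sp.fetchable(logEndOffset = 10L) // all 10 records are visible
  }
}
```

In this model `Demo.produceThenFetch()` sees no records, while `Demo.fetchThenProduce()` sees all 10, which is why the test now sends an initial share fetch before producing.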



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.