apoorvmittal10 commented on code in PR #19010:
URL: https://github.com/apache/kafka/pull/19010#discussion_r1966868571
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -615,13 +615,44 @@ public long nextFetchOffset() {
 * Acquire the fetched records for the share partition. The acquired records are added to the
 * in-flight records and the next fetch offset is updated to the next offset that should be
 * fetched from the leader.
+ * <p>
+ * The method always acquires the full batch of records. The cache state can consist of multiple
+ * full batches as a single batch. This behavior is driven by client configurations (batch size
+ * and max fetch records) and allows for efficient client acknowledgements. However, partial
+ * batches can exist in the cache only after a leader change, when partial acknowledgements have
+ * been persisted prior to the leader change. In such a case, when a share partition loses track
+ * of a batch's start and end offsets (e.g., after a leader change and partial acknowledgements),
+ * the cache stores the batch based on the offset range provided by the persister. This method
+ * handles these special batches by maintaining this range up to the last offset returned by the
+ * persister. No special handling is required afterwards; the cache will eventually return to
+ * managing full batches.
+ * <p>
+ * For compacted topics, batches may be non-contiguous, and records within cached batches may
+ * contain gaps. Because this method operates at the batch level, it acquires entire batches and
+ * relies on the client to report any gaps in the data. Whether non-contiguous batches are
+ * acquired depends on the first and last offsets of the fetched batches. Batches outside this
+ * boundary will never be acquired. For instance, if fetched batches cover offsets [0-9] and
+ * [20-29], and the configured batch size and maximum fetch records are large enough (greater
+ * than 30 in this example), the intervening batch [10-19] will be acquired. Since the full
+ * fetched batch is acquired, the client is responsible for reporting any data gaps. However, if
+ * the [0-9] and [20-29] ranges are fetched in separate calls to this method, the [10-19] batch
+ * will not be acquired and cannot exist in the cache.
+ * <p>
+ * Also for compacted topics, previously acquired batches (e.g., due to acquisition lock timeout
+ * or explicit client release) might become available for acquisition again. But subsequent
+ * fetches may reveal that these batches, or parts of them, have been removed by compaction.
+ * Because this method works with whole batches, the disappearance of individual offsets within a
+ * batch requires no special handling; the batch will be re-acquired, and the client will report
+ * the gaps. But if an entire batch has been compacted away, this method must archive it in the
+ * cache to allow the Share Partition Start Offset (SPSO) to progress. This is accomplished by
+ * comparing the fetchOffset (the offset from which the log was read) with the first base offset
+ * of the fetch response. Any batches from fetchOffset up to the first base offset of the fetch
+ * response are archived.
 *
 * @param memberId        The member id of the client that is fetching the record.
 * @param batchSize       The number of records per acquired records batch.
 * @param maxFetchRecords The maximum number of records that should be acquired, this is a soft
 *                        limit and the method might acquire more records than the maxFetchRecords,
 *                        if the records are already part of the same fetch batch.
-* * @param fetchOffset     The fetch offset for which the records are fetched.
+ * @param fetchOffset     The fetch offset for which the records are fetched.
Review Comment:
My bad, corrected.
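
For anyone skimming the javadoc change: the archival step it describes (comparing fetchOffset against the first base offset of the fetch response) can be sketched roughly as below. This is a minimal standalone illustration, not the actual `SharePartition` code; the map-based cache and `batchesToArchive` helper are hypothetical.

```java
import java.util.*;

// Hypothetical sketch: cached in-flight batches are modeled as a map of
// firstOffset -> lastOffset. Any cached batch lying entirely between the
// offset the log was read from (fetchOffset) and the first base offset the
// fetch actually returned has been compacted away, so it is archived to let
// the SPSO advance.
public class ArchiveSketch {
    static List<Long> batchesToArchive(NavigableMap<Long, Long> cachedBatches,
                                       long fetchOffset,
                                       long firstFetchedBaseOffset) {
        List<Long> archived = new ArrayList<>();
        for (Map.Entry<Long, Long> batch : cachedBatches.entrySet()) {
            // Batch ends before the first batch the fetch returned, and
            // starts at or after the offset the read began from: compacted.
            if (batch.getKey() >= fetchOffset && batch.getValue() < firstFetchedBaseOffset) {
                archived.add(batch.getKey());
            }
        }
        return archived;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> cached = new TreeMap<>();
        cached.put(0L, 9L);    // [0-9]: removed by compaction on the broker
        cached.put(10L, 19L);  // [10-19]: still present in the log
        // Log read from offset 0, but the first returned batch starts at 10,
        // so the cached [0-9] batch must be archived.
        System.out.println(batchesToArchive(cached, 0L, 10L)); // prints [0]
    }
}
```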
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]