apoorvmittal10 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2367240145
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1553,12 +1555,15 @@ private ShareAcquiredRecords acquireNewBatchRecords(
// Check how many records can be acquired from the batch.
long lastAcquiredOffset = lastOffset;
+ long maxOffset = firstAcquiredOffset + maxFetchRecords - 1;
if (maxFetchRecords < lastAcquiredOffset - firstAcquiredOffset + 1) {
- // The max records to acquire is less than the complete available batches hence
- // limit the acquired records. The last offset shall be the batches last offset
- // which falls under the max records limit. As the max fetch records is the soft
- // limit, the last offset can be higher than the max records.
- lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
+ ShareAcquireMode mode = acquireMode(acquireMode);
+ lastAcquiredOffset = switch (mode) {
+ case STRICT ->
+ maxOffsetFromFirstBatch(batches, maxOffset);
Review Comment:
> Do you mean that the lastAcquiredOffset should still be aligned on boundaries?
Yes, that's the default assumption in the code.
> I have considered the following scenario:
> A producer sends 1000 records in a single batch.
> 10 share-consumers are started with share.max.poll.records or max.poll.records set to 1.
So for a batch of 1000 records with offsets 0-999 (say those were the batch's offsets), a single batch of 0-999 should be created. But the offsets should be acquired as per the strict limit, i.e. if only 1 record is requested then only offset 0 should be acquired. Aligning to batch boundaries keeps things simple, and it is also what the code assumes in other operations.
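A minimal sketch of the scenario above, contrasting a batch-aligned (soft) limit with a strict limit when a single batch holds offsets 0-999 and only 1 record is requested. The Batch record, the helper names and the batch-aligned lookup are hypothetical illustrations, assumed from the removed comment in the diff; they are not the SharePartition implementation.

```java
import java.util.List;

public class AcquireLimitSketch {

    // Hypothetical stand-in for a fetched record batch (not Kafka's RecordBatch).
    record Batch(long baseOffset, long lastOffset) {}

    // Soft (batch-aligned) limit, assumed from the removed comment: extend the
    // acquired range to the last offset of the batch containing requestOffset.
    static long batchAlignedLastOffset(List<Batch> batches, long requestOffset) {
        long last = requestOffset; // returned unchanged if there are no batches
        for (Batch batch : batches) {
            if (batch.lastOffset() >= requestOffset) {
                return batch.lastOffset();
            }
            last = batch.lastOffset();
        }
        // No batch reaches requestOffset: fall back to the final batch's last offset.
        return last;
    }

    // Strict limit: never go past firstAcquiredOffset + maxFetchRecords - 1,
    // even if that ends the acquired range in the middle of a batch.
    static long strictLastOffset(long firstAcquiredOffset, long lastOffset, int maxFetchRecords) {
        long maxOffset = firstAcquiredOffset + maxFetchRecords - 1;
        return Math.min(lastOffset, maxOffset);
    }

    public static void main(String[] args) {
        // One producer batch with offsets 0-999; the consumer requests 1 record.
        List<Batch> batches = List.of(new Batch(0, 999));
        long firstAcquiredOffset = 0;
        long lastOffset = 999;
        int maxFetchRecords = 1;

        long requestOffset = firstAcquiredOffset + maxFetchRecords - 1;
        System.out.println("batch-aligned: " + firstAcquiredOffset + "-"
            + batchAlignedLastOffset(batches, requestOffset));
        System.out.println("strict: " + firstAcquiredOffset + "-"
            + strictLastOffset(firstAcquiredOffset, lastOffset, maxFetchRecords));
    }
}
```

Running main prints "batch-aligned: 0-999" and "strict: 0-0": under the strict limit only offset 0 is acquired, while the batch itself can still span 0-999. Note that the strict variant does not need to inspect the batches at all; it depends only on firstAcquiredOffset, maxFetchRecords and the last available offset.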
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2311,6 +2316,24 @@ long findLastOffsetAcknowledged() {
return lastOffsetAcknowledged;
}
+ /**
+ * Find the maximum offset from the first batch within the request offset for {@link ShareAcquireMode.STRICT} mode.
+ *
+ * @param batches The batches to search for the request offset.
+ * @param offset The request offset to find.
+ * @return The minimum value between the first batch's last offset and the request offset.
+ * If no batches are available, returns the original request offset.
+ */
+ private long maxOffsetFromFirstBatch(
+ Iterable<? extends RecordBatch> batches,
+ long offset
+ ) {
+ for (RecordBatch batch : batches) {
+ return Math.min(batch.lastOffset(), offset);
+ }
+ return offset;
+ }
Review Comment:
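This still seems to be a problem: the loop returns on its first iteration, so the acquired range can never extend past the first batch's last offset.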
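Assuming the concern is the first-batch cap, here is a minimal sketch comparing the logic of maxOffsetFromFirstBatch from the diff with a cap that uses only the request offset and the last fetched offset. The Batch record and strictCap are hypothetical, and the batch layout (0-4 and 5-9) with maxFetchRecords = 8 is a made-up input.

```java
import java.util.List;

public class FirstBatchCapSketch {

    // Hypothetical stand-in for a fetched record batch (not Kafka's RecordBatch).
    record Batch(long baseOffset, long lastOffset) {}

    // Mirrors the logic of maxOffsetFromFirstBatch in the diff: only the first
    // batch is ever consulted, because the loop returns on its first iteration.
    static long maxOffsetFromFirstBatch(List<Batch> batches, long offset) {
        for (Batch batch : batches) {
            return Math.min(batch.lastOffset(), offset);
        }
        return offset;
    }

    // Hypothetical strict cap across all fetched batches: bounded only by the
    // request offset and the last offset actually fetched.
    static long strictCap(long lastFetchedOffset, long offset) {
        return Math.min(lastFetchedOffset, offset);
    }

    public static void main(String[] args) {
        // Two fetched batches, 0-4 and 5-9; the consumer asks for 8 records from offset 0.
        List<Batch> batches = List.of(new Batch(0, 4), new Batch(5, 9));
        long firstAcquiredOffset = 0;
        int maxFetchRecords = 8;
        long maxOffset = firstAcquiredOffset + maxFetchRecords - 1; // 7

        long firstBatchOnly = maxOffsetFromFirstBatch(batches, maxOffset);
        long lastFetched = batches.get(batches.size() - 1).lastOffset();
        long acrossBatches = strictCap(lastFetched, maxOffset);

        System.out.println("first batch only: " + (firstBatchOnly - firstAcquiredOffset + 1) + " records");
        System.out.println("across batches: " + (acrossBatches - firstAcquiredOffset + 1) + " records");
    }
}
```

With these inputs maxOffsetFromFirstBatch returns 4 (5 records) while strictCap returns 7 (8 records), so the first-batch version acquires fewer records than requested even though enough fetched records are available.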
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2311,6 +2321,24 @@ long findLastOffsetAcknowledged() {
return lastOffsetAcknowledged;
}
+ /**
+ * In STRICT mode, only acquire records from the first batch.
Review Comment:
Why should the behaviour be that only records from the first batch are acquired?
--
This is an automated message from the Apache Git Service.