chia7712 commented on code in PR #17322:
URL: https://github.com/apache/kafka/pull/17322#discussion_r1807825050
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -513,18 +514,27 @@ public long nextFetchOffset() {
* fetched from the leader.
*
     * @param memberId The member id of the client that is fetching the record.
+    * @param maxFetchRecords The maximum number of records that should be acquired, this is a soft
+    *                        limit and the method might acquire more records than the maxFetchRecords,
+    *                        if the records are already part of the same fetch batch.
* @param fetchPartitionData The fetched records for the share partition.
* @return The acquired records for the share partition.
*/
- public List<AcquiredRecords> acquire(
+ public ShareAcquiredRecords acquire(
String memberId,
+ int maxFetchRecords,
FetchPartitionData fetchPartitionData
) {
        log.trace("Received acquire request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);
+ if (maxFetchRecords <= 0) {
+ // Nothing to acquire.
Review Comment:
I guess this is temporary, right? We should not allow `maxFetchRecords <= 0` in the future.
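
If the guard does become a hard requirement, one option is a fail-fast precondition instead of silently acquiring nothing. A minimal sketch of that idea; the helper name and exception message are hypothetical, not from the PR:

```java
// Hypothetical strict precondition for acquire(): reject non-positive
// maxFetchRecords up front rather than treating it as "nothing to acquire".
public class AcquireArgs {
    static int validateMaxFetchRecords(int maxFetchRecords) {
        if (maxFetchRecords <= 0) {
            throw new IllegalArgumentException(
                "maxFetchRecords must be positive, got: " + maxFetchRecords);
        }
        return maxFetchRecords;
    }
}
```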
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -84,24 +88,31 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
-            List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+            ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetchData.memberId(),
+                shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
            log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
-                topicIdPartition, shareFetchData, acquiredRecords);
+                topicIdPartition, shareFetchData, shareAcquiredRecords);
            // Maybe, in the future, check if no records are acquired, and we want to retry
            // replica manager fetch. Depends on the share partition manager implementation,
            // if we want parallel requests for the same share partition or not.
- if (acquiredRecords.isEmpty()) {
+ if (shareAcquiredRecords.records().isEmpty()) {
partitionData
.setRecords(null)
.setAcquiredRecords(Collections.emptyList());
} else {
partitionData
+                    // We set the records to the fetchPartitionData records. We do not alter the records
+                    // fetched from the replica manager as they follow zero copy buffer. The acquired records
Review Comment:
Out of curiosity, how should users configure `max.poll.records` to minimize network I/O waste? Should users set `max.poll.records=Infinite`? If so, should we consider making an infinite value the default for `max.poll.records`?
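
For reference, `max.poll.records` is an ordinary integer consumer config with no literal `Infinite` value, so an "effectively unbounded" setting would be something like `Integer.MAX_VALUE`. A sketch of such a configuration; the values are illustrative, not a recommendation:

```java
import java.util.Properties;

// Illustrative consumer configuration. Integer.MAX_VALUE stands in for the
// "effectively unbounded" setting discussed above; there is no literal
// "Infinite" value for max.poll.records.
public class ConsumerConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "share-group-example"); // hypothetical group name
        props.put("max.poll.records", String.valueOf(Integer.MAX_VALUE));
        return props;
    }
}
```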
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1658,6 +1706,26 @@ private long findLastOffsetAcknowledged() {
return lastOffsetAcknowledged;
}
+    /**
+     * Find the last offset from the batch which contains the request offset. If found, return the last offset
+     * of the batch, otherwise return the request offset.
+     *
+     * @param batches The batches to search for the request offset.
+     * @param offset The request offset to find.
+     * @return The last offset of the batch which contains the request offset, otherwise the request offset.
+     */
+ private long findLastOffsetFromBatchWithRequestOffset(
+ Iterable<? extends RecordBatch> batches,
+ long offset
+ ) {
+ for (RecordBatch batch : batches) {
+ if (offset >= batch.baseOffset() && offset <= batch.lastOffset()) {
Review Comment:
`lastOffset` loads the entire batch header, so perhaps we should avoid checking `lastOffset` for every batch. For example:
```java
RecordBatch previous = null;
for (RecordBatch batch : batches) {
    if (offset >= batch.baseOffset()) {
        previous = batch;
        continue;
    }
    break;
}
if (previous != null && offset <= previous.lastOffset()) {
    return previous.lastOffset();
}
return offset;
```
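
A self-contained version of the proposed early-break scan, using a minimal stand-in for `RecordBatch` (the `Batch` class below is hypothetical; the real interface lives in `org.apache.kafka.common.record`):

```java
import java.util.List;

public class LastOffsetScan {
    // Minimal stand-in for RecordBatch, exposing only the two offsets used here.
    static class Batch {
        final long baseOffset;
        final long lastOffset;
        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
        long baseOffset() { return baseOffset; }
        long lastOffset() { return lastOffset; }
    }

    // Mirrors the proposed loop: the scan compares baseOffset alone and
    // reads lastOffset only once, on the final candidate batch.
    static long findLastOffset(List<Batch> batches, long offset) {
        Batch previous = null;
        for (Batch batch : batches) {
            if (offset >= batch.baseOffset()) {
                previous = batch;
                continue;
            }
            break;
        }
        if (previous != null && offset <= previous.lastOffset()) {
            return previous.lastOffset();
        }
        return offset;
    }
}
```

This relies on `batches` being ordered by base offset, which is how batches are laid out in a fetched log segment.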
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -612,16 +634,20 @@ public List<AcquiredRecords> acquire(
.setFirstOffset(inFlightBatch.firstOffset())
.setLastOffset(inFlightBatch.lastOffset())
                        .setDeliveryCount((short) inFlightBatch.batchDeliveryCount()));
+                acquiredCount += (int) (inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1);
Review Comment:
I suppose this `acquiredCount` may also be inaccurate if the topic is compacted, right?
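
To make the compaction concern concrete: counting by offset span assumes offsets are dense, and compaction breaks that assumption. A small self-contained sketch; the offsets are made up:

```java
// With a compacted topic, offsets inside a fetched range can be sparse, so
// lastOffset - firstOffset + 1 can overcount the records actually present.
public class OffsetSpanCount {
    static long spanCount(long firstOffset, long lastOffset) {
        return lastOffset - firstOffset + 1;
    }

    public static void main(String[] args) {
        // Records at offsets 11 and 12 were removed by compaction.
        long[] offsetsAfterCompaction = {10L, 13L, 14L};
        System.out.println("span count: " + spanCount(10L, 14L)
            + ", actual records: " + offsetsAfterCompaction.length);
        // prints "span count: 5, actual records: 3"
    }
}
```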
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]