adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1817933790
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1525,6 +1537,24 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}
+ protected void updateLatestFetchOffsetMetadata(LogOffsetMetadata fetchOffsetMetadata) {
+ lock.writeLock().lock();
+ try {
+ latestFetchOffsetMetadata = fetchOffsetMetadata;
Review Comment:
Hi @junrao, I am not sure I fully understand the approach. IIUC,
1. I agree with the common case, where during `onComplete` we can update the
file position by directly adding the batch size. But there are cases where we
return true from `isMinBytesSatisfied` when `fetchOffsetMetadata` is on a
different segment than `endOffsetMetadata` ([code
reference](https://github.com/apache/kafka/pull/17539/files#diff-d835cdc01e77905316584ce9e6e21a060cb3d36efa717d4b822b16744e4d713aR296)).
In those cases, we don't know how many bytes were accumulated in the
response. The same goes for the case `fetchOffsetMetadata.messageOffset >
endOffsetMetadata.messageOffset`: how do we know what number of bytes to
add?
2. I am assuming that with this approach we do not need any extra handling
on acquisition lock timeout, on acknowledgements, or when acquired records are
released on session close. Please correct me if I am wrong.
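To make point 1 concrete, here is a rough sketch of the cases I have in mind. It is not the PR code: `OffsetMetadata` is a hypothetical stand-in for `LogOffsetMetadata`, reduced to three fields, and `accumulatedBytes` is a made-up helper. It only illustrates when a byte delta can be derived from the two positions and when it cannot.

```java
import java.util.OptionalInt;

public class FetchBytesSketch {

    // Hypothetical, simplified stand-in for LogOffsetMetadata (assumption, not the real class).
    static final class OffsetMetadata {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePositionInSegment;

        OffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePositionInSegment = relativePositionInSegment;
        }

        boolean onSameSegment(OffsetMetadata that) {
            return this.segmentBaseOffset == that.segmentBaseOffset;
        }
    }

    // Returns the byte distance between the two positions when it can be derived,
    // or empty for the two cases raised in point 1.
    static OptionalInt accumulatedBytes(OffsetMetadata fetch, OffsetMetadata end) {
        if (fetch.messageOffset > end.messageOffset) {
            // Fetch offset is ahead of the end offset: no meaningful delta.
            return OptionalInt.empty();
        }
        if (!fetch.onSameSegment(end)) {
            // Positions are relative to different segment files: not comparable.
            return OptionalInt.empty();
        }
        return OptionalInt.of(end.relativePositionInSegment - fetch.relativePositionInSegment);
    }

    public static void main(String[] args) {
        OffsetMetadata fetch = new OffsetMetadata(100L, 0L, 4096);

        // Common case: same segment, so the delta is known.
        System.out.println(accumulatedBytes(fetch, new OffsetMetadata(150L, 0L, 6144)));

        // End offset lives on a newer segment: delta unknown.
        System.out.println(accumulatedBytes(fetch, new OffsetMetadata(900L, 800L, 512)));

        // Fetch offset is past the end offset: delta unknown.
        System.out.println(accumulatedBytes(new OffsetMetadata(200L, 0L, 8192), new OffsetMetadata(150L, 0L, 6144)));
    }
}
```

In the last two cases the helper returns an empty result, which is exactly where I do not see what value we would add to the file position in `onComplete`.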
PS - Thanks a lot for taking the time to explain this to me and for reviewing
this PR.