lianetm commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1829737477
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -381,20 +386,22 @@ private void
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
* exceptionally depending on the response. If the request fails with a
retriable error, the
* future will be completed with a {@link RetriableCommitFailedException}.
*/
- public CompletableFuture<Void> commitAsync(final Map<TopicPartition,
OffsetAndMetadata> offsets) {
- if (offsets.isEmpty()) {
+ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(subscriptions::allConsumed);
+ if (commitOffsets.isEmpty()) {
log.debug("Skipping commit of empty offsets");
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(Map.of());
}
- OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(offsets, Long.MAX_VALUE);
+ OffsetCommitRequestState commitRequest =
createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
pendingRequests.addOffsetCommitRequest(commitRequest);
- CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
asyncCommitResult = new CompletableFuture<>();
commitRequest.future.whenComplete((committedOffsets, error) -> {
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
- asyncCommitResult.complete(null);
+ asyncCommitResult.complete(commitOffsets);
+ maybeUpdateLastSeenEpochIfNewer(commitOffsets);
Review Comment:
actually, thinking more about this, we shouldn't wait for receiving a
response to the request to update the metadata object with the offsets we sent
in the request (before this PR and on the classic consumer, we update metadata
before sending any request). So this call should really go as soon as we
determine the set of `commitOffsets` (after the early return if
(commitOffsets.isEmpty()) maybe?)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -72,6 +72,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+@SuppressWarnings("ClassDataAbstractionCoupling")
Review Comment:
could you double check if we really need this after the latest changes? (I
don't see many new deps in the changes anymore)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -439,14 +450,15 @@ private OffsetCommitRequestState
createOffsetCommitRequest(final Map<TopicPartit
}
private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
- CompletableFuture<Void> result) {
+ CompletableFuture<Map<TopicPartition,
OffsetAndMetadata>> result) {
pendingRequests.addOffsetCommitRequest(requestAttempt);
// Retry the same commit request while it fails with
RetriableException and the retry
// timeout hasn't expired.
requestAttempt.future.whenComplete((res, error) -> {
if (error == null) {
- result.complete(null);
+ result.complete(requestAttempt.offsets);
+ maybeUpdateLastSeenEpochIfNewer(requestAttempt.offsets);
Review Comment:
related to comment from above, should we move this to `commitSync`, before
we send the request or receive a response
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]