hachikuji commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1796078303
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1316,28 +1316,32 @@ public void wakeup() {
*/
@Override
public void commitSync(final Duration timeout) {
- commitSync(subscriptions.allConsumed(), timeout);
+ commitSync(Optional.empty(), timeout);
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
- commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+ commitSync(Optional.of(offsets), Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
+ commitSync(Optional.of(offsets), timeout);
+ }
+
+ private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, Duration timeout) {
acquireAndEnsureOpen();
long commitStart = time.nanoseconds();
try {
SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));
- CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitFuture = commit(syncCommitEvent);
Timer requestTimer = time.timer(timeout.toMillis());
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true);
Review Comment:
It is not clear to me what purpose the call to
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks` serves here. Why do we
care about the async commits if they were all sent prior to the sync commit?
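One possible reading of the question, offered as context rather than as the PR author's answer: the public `KafkaConsumer.commitAsync` Javadoc describes async commits as completing before a subsequent `commitSync` returns, with callbacks invoked in call order. The sketch below only simulates that ordering with plain `CompletableFuture`s. The class `CommitOrderingSketch`, its `events` list, and both method bodies are hypothetical stand-ins and are not taken from `AsyncKafkaConsumer`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical simulation of the ordering only; no Kafka classes are used.
class CommitOrderingSketch {
    // One entry per async commit whose callback has not run yet.
    private static final class Pending {
        final String name;
        final CompletableFuture<Void> response;

        Pending(String name, CompletableFuture<Void> response) {
            this.name = name;
            this.response = response;
        }
    }

    private final Deque<Pending> pending = new ArrayDeque<>();
    final List<String> events = new ArrayList<>();

    // Records an in-flight async commit and returns the future that a
    // (simulated) broker response would complete.
    CompletableFuture<Void> commitAsync(String name) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        pending.add(new Pending(name, response));
        return response;
    }

    // Waits for every earlier async commit, runs its callback in call order,
    // and only then returns. The sync commit request itself is not modelled.
    void commitSync(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!pending.isEmpty()) {
            Pending next = pending.poll();
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                next.response.get(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new IllegalStateException("async commit did not complete in time", e);
            }
            events.add("callback:" + next.name);
        }
        events.add("commitSync returned");
    }

    public static void main(String[] args) {
        CommitOrderingSketch consumer = new CommitOrderingSketch();
        CompletableFuture<Void> first = consumer.commitAsync("first");
        CompletableFuture<Void> second = consumer.commitAsync("second");
        // Simulate both broker responses arriving before the sync commit.
        first.complete(null);
        second.complete(null);
        consumer.commitSync(1000);
        // prints [callback:first, callback:second, commitSync returned]
        System.out.println(consumer.events);
    }
}
```

In this simulation the callbacks run on the calling thread inside `commitSync`, so a caller never sees `commitSync` return while an earlier async commit's callback is still outstanding. Whether that is the motivation for the awaited call in this PR is not stated in the thread.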