philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406789593
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -206,21 +206,21 @@ public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetA
* @return Future that will complete when a response is received for the request, or a
* completed future if no request is generated.
*/
- public CompletableFuture<Void> maybeAutoCommitAllConsumed() {
+ public CompletableFuture<Void> maybeAutoCommit() {
return maybeAutoCommit(subscriptions.allConsumed());
}
+ boolean canAutoCommit() {
+ return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty();
+ }
+
/**
- * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+ * Return an OffsetCommitRequest of all assigned topicPartitions and their current positions.
*/
- Optional<NetworkClientDelegate.UnsentRequest> maybeCreateAutoCommitRequest() {
- if (!autoCommitState.isPresent()) {
- return Optional.empty();
- }
-
+ NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {
OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed()));
- return Optional.of(request.toUnsentRequest());
+ return request.toUnsentRequest();
Review Comment:
We should always return a request here, because I moved the `autoCommitState.isPresent()` check out of this method and into `canAutoCommit()`.
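To illustrate the split, here is a minimal, self-contained sketch of how a caller might pair the two methods. It is not the PR's code: `SketchCommitManager`, `requestOnClose`, and the `Map<String, Long>` stand-in for the request are hypothetical names made up for this example; only `canAutoCommit()` and `commitAllConsumedPositions()` mirror the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class AutoCommitSketch {

    // Hypothetical stand-in for CommitRequestManager; not the real Kafka class.
    static class SketchCommitManager {
        // Present only when auto-commit is enabled (mirrors autoCommitState in the diff).
        private final Optional<Object> autoCommitState;
        // Assumed shape: partition name -> consumed position.
        private final Map<String, Long> allConsumed;

        SketchCommitManager(boolean autoCommitEnabled, Map<String, Long> allConsumed) {
            this.autoCommitState = autoCommitEnabled ? Optional.of(new Object()) : Optional.empty();
            this.allConsumed = allConsumed;
        }

        // The guard that used to live inside the request-building method.
        boolean canAutoCommit() {
            return autoCommitState.isPresent() && !allConsumed.isEmpty();
        }

        // Always builds a "request" (here just a copy of the positions); no Optional needed.
        Map<String, Long> commitAllConsumedPositions() {
            return new HashMap<>(allConsumed);
        }
    }

    // Hypothetical caller: checks the guard first, then builds the request unconditionally.
    static Optional<Map<String, Long>> requestOnClose(SketchCommitManager manager) {
        if (!manager.canAutoCommit()) {
            return Optional.empty();
        }
        return Optional.of(manager.commitAllConsumedPositions());
    }

    public static void main(String[] args) {
        Map<String, Long> consumed = new HashMap<>();
        consumed.put("topic-0", 42L);

        System.out.println(requestOnClose(new SketchCommitManager(true, consumed)).isPresent());
        System.out.println(requestOnClose(new SketchCommitManager(false, consumed)).isPresent());
    }
}
```

With the guard on the caller side, the builder's return type no longer has to encode "nothing to send", which is what lets the diff drop the `Optional` wrapper.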