lucasbru commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407524109
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -269,60 +275,56 @@ void cleanup() {
* completed in time.
*/
// Visible for testing
- void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+ void maybeAutocommitOnClose(final Timer timer) {
if (!requestManagers.coordinatorRequestManager.isPresent())
return;
+ if (!requestManagers.commitRequestManager.isPresent()) {
+ log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
+ return;
+ }
+
+ if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+ return;
+ }
+
ensureCoordinatorReady(timer);
- List<NetworkClientDelegate.UnsentRequest> tasks = closingRequests();
- networkClientDelegate.addAll(tasks);
+ List<NetworkClientDelegate.UnsentRequest> autocommitRequest =
Review Comment:
Let's not use a list here; then we can drop the `get(0)` below. You can
either create the list in place on the line below or add a
`NetworkClientDelegate.add` method.
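
A minimal sketch of the two options, assuming simplified stand-ins: `DelegateSketch` and its nested `UnsentRequest` are hypothetical placeholders for `NetworkClientDelegate` and `NetworkClientDelegate.UnsentRequest`, not the real Kafka classes, and `unsentCount()` exists only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for NetworkClientDelegate; not the real Kafka class.
public class DelegateSketch {
    // Hypothetical stand-in for NetworkClientDelegate.UnsentRequest.
    static final class UnsentRequest {
        final String name;

        UnsentRequest(String name) {
            this.name = name;
        }
    }

    private final Queue<UnsentRequest> unsent = new ArrayDeque<>();

    // Existing style: callers hand over a list of requests.
    void addAll(List<UnsentRequest> requests) {
        unsent.addAll(requests);
    }

    // Suggested convenience method: enqueue one request, so the caller
    // needs neither a list nor a get(0).
    void add(UnsentRequest request) {
        unsent.add(request);
    }

    // Illustration-only helper to observe the queue size.
    int unsentCount() {
        return unsent.size();
    }

    public static void main(String[] args) {
        DelegateSketch delegate = new DelegateSketch();
        UnsentRequest autoCommit = new UnsentRequest("auto-commit");

        // Option 1: keep a single request and build the list in place.
        delegate.addAll(Collections.singletonList(autoCommit));

        // Option 2: use the single-request method.
        delegate.add(new UnsentRequest("another-request"));

        System.out.println("unsent requests: " + delegate.unsentCount());
    }
}
```

Either way, the caller holds a single `UnsentRequest` rather than a one-element list.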
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -796,5 +823,45 @@ private HashMap<TopicPartition, Long> mockTimestampToSearch() {
timestampToSearch.put(t1, 2L);
return timestampToSearch;
}
+
+ private void prepAutocommitOnClose() {
+ Node node = testBuilder.metadata.fetch().nodes().get(0);
+ testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node));
+ if (!testBuilder.subscriptions.allConsumed().isEmpty()) {
+ List<TopicPartition> topicPartitions = new ArrayList<>(testBuilder.subscriptions.assignedPartitionsList());
+ testBuilder.client.prepareResponse(mockAutocommitResponse(
+ topicPartitions,
+ (short) 1,
+ Errors.NONE).responseBody());
+ }
+ }
+ private ClientResponse mockAutocommitResponse(final List<TopicPartition> topicPartitions,
Review Comment:
nit: add a blank line between the methods
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection<? extends RequestState> request
* completed future if no request is generated.
*/
public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
- if (!autoCommitState.isPresent()) {
+ if (!canAutoCommit()) {
Review Comment:
Maybe `shouldAutoCommit`? It's more about being obliged to commit than
about being able to commit. Also, try to be consistent with
`AutoCommit` vs. `Autocommit`.
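
A minimal sketch of the renamed predicate, assuming a simplified stand-in: `AutoCommitSketch` is a hypothetical class, not `CommitRequestManager`, and its fields (`autoCommitIntervalMs`, `allConsumed`) are illustrative placeholders for `autoCommitState` and `subscriptions.allConsumed()`. The condition mirrors the one in this PR's `canAutoCommit()`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for CommitRequestManager; not the real Kafka class.
public class AutoCommitSketch {
    // Empty when auto-commit is disabled (placeholder for autoCommitState).
    private final Optional<Long> autoCommitIntervalMs;
    // Partition name -> consumed offset (placeholder for subscriptions.allConsumed()).
    private final Map<String, Long> allConsumed;

    AutoCommitSketch(Optional<Long> autoCommitIntervalMs, Map<String, Long> allConsumed) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.allConsumed = allConsumed;
    }

    // "should" rather than "can": true when auto-commit is enabled AND there
    // are consumed offsets, i.e. a commit is expected on close.
    boolean shouldAutoCommit() {
        return autoCommitIntervalMs.isPresent() && !allConsumed.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Long> consumed = Collections.singletonMap("topic1-0", 100L);
        System.out.println(new AutoCommitSketch(Optional.of(5000L), consumed).shouldAutoCommit());
        System.out.println(new AutoCommitSketch(Optional.empty(), consumed).shouldAutoCommit());
    }
}
```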
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -210,17 +210,19 @@ public CompletableFuture<Void> maybeAutoCommitAllConsumed() {
return maybeAutoCommit(subscriptions.allConsumed());
}
+ boolean canAutoCommit() {
+ return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty();
+ }
+
/**
- * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+ * Returns an OffsetCommitRequest of all assigned topicPartitions and their current positions.
*/
- Optional<NetworkClientDelegate.UnsentRequest> maybeCreateAutoCommitRequest() {
- if (!autoCommitState.isPresent()) {
- return Optional.empty();
- }
-
- OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+ NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {
Review Comment:
Again, maybe make it clear that this only creates the request. Also, other
methods use `AllConsumed` and drop the `Positions`, so maybe
`createCommitAllConsumedRequest`?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -234,11 +235,20 @@ public void testAutocommit_ResendAutocommitAfterException() {
@Test
public void testAutocommit_EnsureOnlyOneInflightRequest() {
+ TopicPartition t1p = new TopicPartition("topic1", 0);
+ subscriptionState.assignFromUser(singleton(t1p));
+ //subscriptionState.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
Review Comment:
nit: remove the commented-out line if we do not need it