lucasbru commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407524109


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -269,60 +275,56 @@ void cleanup() {
      * completed in time.
      */
     // Visible for testing
-    void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+    void maybeAutocommitOnClose(final Timer timer) {
         if (!requestManagers.coordinatorRequestManager.isPresent())
             return;
 
+        if (!requestManagers.commitRequestManager.isPresent()) {
+            log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
+            return;
+        }
+
+        if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+            return;
+        }
+
         ensureCoordinatorReady(timer);
-        List<NetworkClientDelegate.UnsentRequest> tasks = closingRequests();
-        networkClientDelegate.addAll(tasks);
+        List<NetworkClientDelegate.UnsentRequest> autocommitRequest =

Review Comment:
   Let's not use a list here; then we can drop the `get(0)` below. You can 
create the list in place in the line below or add a `NetworkClientDelegate.add` 
method.
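   A minimal sketch of the second option, assuming a simplified stand-in for 
the delegate. `DelegateSketch`, its nested `UnsentRequest`, and `unsentCount()` 
are hypothetical names used only for illustration; this is not the actual 
`NetworkClientDelegate` code.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for NetworkClientDelegate; only the queueing part is modeled.
class DelegateSketch {
    // Hypothetical stand-in for NetworkClientDelegate.UnsentRequest.
    static final class UnsentRequest {
        final String name;

        UnsentRequest(final String name) {
            this.name = name;
        }
    }

    private final Queue<UnsentRequest> unsentRequests = new ArrayDeque<>();

    // Batch variant: enqueue every request in the list.
    void addAll(final List<UnsentRequest> requests) {
        unsentRequests.addAll(requests);
    }

    // Single-request variant (assumed): lets a caller that holds exactly one
    // request skip wrapping it in a list and calling get(0) later.
    void add(final UnsentRequest request) {
        unsentRequests.add(request);
    }

    // Helper for the illustration only: number of queued requests.
    int unsentCount() {
        return unsentRequests.size();
    }
}
```

   With something like this, the caller can keep a single `UnsentRequest` 
variable and pass it straight to `add(...)`.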



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -796,5 +823,45 @@ private HashMap<TopicPartition, Long> mockTimestampToSearch() {
         timestampToSearch.put(t1, 2L);
         return timestampToSearch;
     }
+
+    private void prepAutocommitOnClose() {
+        Node node = testBuilder.metadata.fetch().nodes().get(0);
+        testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node));
+        if (!testBuilder.subscriptions.allConsumed().isEmpty()) {
+            List<TopicPartition> topicPartitions = new ArrayList<>(testBuilder.subscriptions.assignedPartitionsList());
+            testBuilder.client.prepareResponse(mockAutocommitResponse(
+                topicPartitions,
+                (short) 1,
+                Errors.NONE).responseBody());
+        }
+    }
+    private ClientResponse mockAutocommitResponse(final List<TopicPartition> topicPartitions,

Review Comment:
   nit: newline in between the methods



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection<? extends RequestState> request
      * completed future if no request is generated.
      */
     public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        if (!autoCommitState.isPresent()) {
+        if (!canAutoCommit()) {

Review Comment:
   maybe `shouldAutoCommit`? Because it's more about being obliged to commit 
and not about being able to commit? Also, try to be consistent with 
`AutoCommit` vs. `Autocommit`.
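   A rough sketch of what the rename could look like, mirroring the 
`canAutoCommit()` body from the diff. `AutoCommitGuardSketch` and its 
constructor are hypothetical, and the field types are simplified placeholders 
rather than the real `CommitRequestManager` members.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the guard; not the real CommitRequestManager.
class AutoCommitGuardSketch {
    // Placeholder for the optional auto-commit state (present only when auto-commit is enabled).
    private final Optional<Object> autoCommitState;
    // Placeholder for subscriptions.allConsumed(): partition name -> consumed offset.
    private final Map<String, Long> allConsumed;

    AutoCommitGuardSketch(final Optional<Object> autoCommitState,
                          final Map<String, Long> allConsumed) {
        this.autoCommitState = autoCommitState;
        this.allConsumed = allConsumed;
    }

    // "should" expresses the obligation: auto-commit is enabled and there is
    // at least one consumed position to commit.
    boolean shouldAutoCommit() {
        return autoCommitState.isPresent() && !allConsumed.isEmpty();
    }
}
```

   The same `AutoCommit` casing would then apply to the other names in the 
PR, for example `maybeAutoCommitOnClose` instead of `maybeAutocommitOnClose`.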



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -210,17 +210,19 @@ public CompletableFuture<Void> maybeAutoCommitAllConsumed() {
         return maybeAutoCommit(subscriptions.allConsumed());
     }
 
+    boolean canAutoCommit() {
+        return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty();
+    }
+
     /**
-     * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+     * Returns an OffsetCommitRequest of all assigned topicPartitions and their current positions.
      */
-    Optional<NetworkClientDelegate.UnsentRequest> maybeCreateAutoCommitRequest() {
-        if (!autoCommitState.isPresent()) {
-            return Optional.empty();
-        }
-
-        OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+    NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {

Review Comment:
   Again, maybe make clear that this only creates the request. Also, in other 
methods you use `AllConsumed` and drop the `Positions`, so maybe 
`createCommitAllConsumedRequest`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -234,11 +235,20 @@ public void testAutocommit_ResendAutocommitAfterException() {
 
     @Test
     public void testAutocommit_EnsureOnlyOneInflightRequest() {
+        TopicPartition t1p = new TopicPartition("topic1", 0);
+        subscriptionState.assignFromUser(singleton(t1p));
+        //subscriptionState.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));

Review Comment:
   nit: maybe remove the commented-out line if we do not need it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.