kirktrue commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1904733639


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1248,14 +1251,36 @@ public void close(Duration timeout) {
             if (!closed) {
                 // need to close before setting the flag since the close 
function
                 // itself may trigger rebalance callback that needs the 
consumer to be open still
-                close(timeout, false);
+                close(CloseOption.timeout(timeout));
             }
         } finally {
             closed = true;
             release();
         }
     }
 
+    @Override
+    public void close(CloseOption option) {
+        CloseOptionInternal closeOption = new CloseOptionInternal(option);
+        Duration timeout = 
closeOption.timeout().orElse(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be 
negative.");
+
+        acquire();
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close 
function
+                // itself may trigger rebalance callback that needs the 
consumer to be open still
+                close(timeout, closeOption.groupMembershipOperation(), false);
+            }
+        } finally {
+            closed = true;
+            release();
+        }
+    }
+

Review Comment:
   Nit: extra newline.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -79,6 +80,11 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      */
     protected final String groupId;
 
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
+    protected final Optional<String> groupInstanceId;

Review Comment:
   Per my comment on `pollOnClose()`, it looks like there's perhaps another way 
to structure the code in `pollOnClose()` so the group instance ID doesn't need 
to be exposed here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -221,7 +222,17 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
      */
     @Override
     public PollResult pollOnClose(long currentTimeMs) {
-        if (membershipManager().isLeavingGroup()) {
+        AbstractMembershipManager<R> membershipManager = membershipManager();
+        GroupMembershipOperation leaveGroupOperation = 
membershipManager.leaveGroupOperation();
+
+        if (membershipManager.isLeavingGroup() &&
+            // Default operation: both static and dynamic consumers will send 
a leave heartbeat
+            (GroupMembershipOperation.DEFAULT == leaveGroupOperation ||
+                // Leave operation: both static and dynamic consumers will 
send a leave heartbeat
+                GroupMembershipOperation.LEAVE_GROUP == leaveGroupOperation ||
+                // Remain in group: only static consumers will send a leave 
heartbeat, while dynamic members will not
+                membershipManager.groupInstanceId().isPresent())
+        ) {

Review Comment:
   Reminder on this question.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to