kirktrue commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1904733639
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1248,14 +1251,36 @@ public void close(Duration timeout) {
if (!closed) {
// need to close before setting the flag since the close function
// itself may trigger rebalance callback that needs the consumer to be open still
- close(timeout, false);
+ close(CloseOption.timeout(timeout));
}
} finally {
closed = true;
release();
}
}
+ @Override
+ public void close(CloseOption option) {
+ CloseOptionInternal closeOption = new CloseOptionInternal(option);
+ Duration timeout = closeOption.timeout().orElse(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
+ if (timeout.toMillis() < 0)
+ throw new IllegalArgumentException("The timeout cannot be negative.");
+
+ acquire();
+ try {
+ if (!closed) {
+ // need to close before setting the flag since the close function
+ // itself may trigger rebalance callback that needs the consumer to be open still
+ close(timeout, closeOption.groupMembershipOperation(), false);
+ }
+ } finally {
+ closed = true;
+ release();
+ }
+ }
+
Review Comment:
Nit: extra newline.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -79,6 +80,11 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
protected final String groupId;
+ /**
+ * Group instance ID to be used by the member, provided when creating the current membership manager.
+ */
+ protected final Optional<String> groupInstanceId;
Review Comment:
Per my comment on `pollOnClose()`: could the code in `pollOnClose()` be
restructured so that the group instance ID doesn't need to be exposed here?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -221,7 +222,17 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
*/
@Override
public PollResult pollOnClose(long currentTimeMs) {
- if (membershipManager().isLeavingGroup()) {
+ AbstractMembershipManager<R> membershipManager = membershipManager();
+ GroupMembershipOperation leaveGroupOperation = membershipManager.leaveGroupOperation();
+
+ if (membershipManager.isLeavingGroup() &&
+ // Default operation: both static and dynamic consumers will send a leave heartbeat
+ (GroupMembershipOperation.DEFAULT == leaveGroupOperation ||
+ // Leave operation: both static and dynamic consumers will send a leave heartbeat
+ GroupMembershipOperation.LEAVE_GROUP == leaveGroupOperation ||
+ // Remain in group: only static consumers will send a leave heartbeat, while dynamic members will not
+ membershipManager.groupInstanceId().isPresent())
+ ) {
Review Comment:
Reminder on this question.
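
For illustration only, here is one shape such a restructuring might take. It is a sketch under assumptions, not code from the PR: `MembershipState` and `shouldSendLeaveHeartbeat()` are hypothetical names, and the enum is a local stand-in for `GroupMembershipOperation` so the example compiles on its own. The idea is that the object owning the group instance ID answers the "send a leave heartbeat?" question itself, so `pollOnClose()` would not need a `groupInstanceId()` accessor.

```java
import java.util.Optional;

class LeaveHeartbeatSketch {

    // Local stand-in for GroupMembershipOperation (assumed values), declared here
    // only so that the sketch is self-contained.
    enum GroupMembershipOperation { DEFAULT, LEAVE_GROUP, REMAIN_IN_GROUP }

    // Hypothetical, simplified stand-in for the membership manager.
    static final class MembershipState {
        private final Optional<String> groupInstanceId; // stays private, never exposed
        private final GroupMembershipOperation leaveGroupOperation;
        private final boolean leavingGroup;

        MembershipState(Optional<String> groupInstanceId,
                        GroupMembershipOperation leaveGroupOperation,
                        boolean leavingGroup) {
            this.groupInstanceId = groupInstanceId;
            this.leaveGroupOperation = leaveGroupOperation;
            this.leavingGroup = leavingGroup;
        }

        // Hypothetical method with the same truth table as the condition in the diff:
        // DEFAULT and LEAVE_GROUP always send the leave heartbeat; any other
        // operation sends it only for static members (group instance ID present).
        boolean shouldSendLeaveHeartbeat() {
            if (!leavingGroup) {
                return false;
            }
            switch (leaveGroupOperation) {
                case DEFAULT:
                case LEAVE_GROUP:
                    return true;
                default:
                    return groupInstanceId.isPresent();
            }
        }
    }

    public static void main(String[] args) {
        MembershipState dynamicMember = new MembershipState(
                Optional.empty(), GroupMembershipOperation.REMAIN_IN_GROUP, true);
        MembershipState staticMember = new MembershipState(
                Optional.of("instance-1"), GroupMembershipOperation.REMAIN_IN_GROUP, true);

        System.out.println("dynamic, REMAIN_IN_GROUP: " + dynamicMember.shouldSendLeaveHeartbeat());
        System.out.println("static, REMAIN_IN_GROUP: " + staticMember.shouldSendLeaveHeartbeat());
    }
}
```

With something along these lines, the caller's check would reduce to a single call on the membership manager, and the static-versus-dynamic distinction would stay an internal detail.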