lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395132794
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -137,39 +135,104 @@ public HeartbeatRequestManager( } /** - * Determines the maximum wait time until the next poll based on the member's state, and creates a heartbeat - * request. + * This will ensure that the member starts sending heartbeats to join the group with the + * updated subscription, if it is not already part of it. If the member is already part of + * the group, this will only ensure that the updated subscription is sent on the next + * heartbeat request. No action will be taken if the member is in a {@link MemberState#FATAL} + * state. + * <p/> + * Note that list of topics of the subscription is taken from the shared subscription state. + */ + public void onSubscriptionUpdated() { + if (membershipManager.state() == MemberState.FATAL) { + logger.debug("No action taken join the group or update the subscription because " + + "the member is in FATAL state"); + return; + } + + if (membershipManager.state() == MemberState.UNSUBSCRIBED) { + membershipManager.transitionToJoining(); + } + } + + /** + * Release assignment and send heartbeat request to leave the group. If the member is not + * part of the group or is in a FATAL state this won't take any action and will return a + * completed future. + * + * @return Future that will complete when the callback execution completes and the heartbeat + * request to leave is sent out. The future will fail it the callback execution fails. + */ + public CompletableFuture<Void> onUnsubscribe() { + boolean notInGroup = + membershipManager.state() == MemberState.UNSUBSCRIBED || + membershipManager.state() == MemberState.FATAL; + if (notInGroup) { + return CompletableFuture.completedFuture(null); + } + // TODO: Consider no-op if member is already LEAVING too (repeated calls to unsubscribe + // potentially storming the broker?). To double check the current behaviour, as it does + // not seem to handle it that way. Review Comment: Done (now in the membership manager `leaveGroup`). No-op if already leaving, and returning the future that will complete when the ongoing leave completes. Also handling the case where the member already left (no-op and return right away) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org