kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1379392210


##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -729,6 +729,13 @@ protected MetadataRequest.Builder newMetadataRequestBuilderForNewTopics() {
         return null;
     }
 
+    /**
+     * @return Mapping from topic IDs to topic names for all topics in the cache.
+     */
+    public synchronized Map<Uuid, String> topicNames() {

Review Comment:
   Now that I'm noticing, is this a public API violation? It's not in 
`internals` and we're adding a `public` method 🤔



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##########
@@ -73,30 +84,55 @@ public interface MembershipManager {
     /**
      * @return Current assignment for the member.
      */
-    ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
+    Set<TopicPartition> currentAssignment();
 
     /**
-     * Update the assignment for the member, indicating that the provided assignment is the new
-     * current assignment.
+     * @return Assignment that the member received from the server but hasn't finished processing
+     * yet.
      */
-    void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment);
+    Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment();

Review Comment:
   `targetAssignment()` looks to only be used by unit tests at the moment. Does 
it make sense to remove it from the interface and leave it as a method on the 
implementation only?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -260,42 +743,16 @@ public Optional<String> serverAssignor() {
      * {@inheritDoc}
      */
     @Override
-    public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+    public Set<TopicPartition> currentAssignment() {
         return this.currentAssignment;
     }
 
 
     /**
      * @return Assignment that the member received from the server but hasn't completely processed
-     * yet. Visible for testing.

Review Comment:
   I made a comment up above about removing `targetAssignment()` from the 
`MembershipManager` interface because it was only used for testing. Does the 
removal of this statement imply that it will be used in non-testing later?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -24,21 +24,34 @@
 public enum MemberState {
 
     /**
-     * Member has not joined a consumer group yet, or has been fenced and needs to re-join.
+     * Member is not part of the group. This could be the case when it has never joined (no call
+     * has been made to the subscribe API), or when the member intentionally leaves the group
+     * after a call to the unsubscribe API.
      */
-    UNJOINED,
+    NOT_IN_GROUP,
+
+    /**
+     * Member is attempting to join a consumer group. This could be the case when joining for the
+     * first time, or when it has been fenced and tries to re-join.
+     */
+    JOINING,
 
     /**
      * Member has received a new target assignment (partitions could have been assigned or
      * revoked), and it is processing it. While in this state, the member will
      * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make
      * the new assignment effective.
      */
-    // TODO: determine if separate state will be needed for assign/revoke (not for now)
     RECONCILING,
 
     /**
-     * Member is active in a group (heartbeating) and has processed all assignments received.
+     * Member has completed reconciling an assignment received, and stays in this state until the
+     * next heartbeat request is sent out to acknowledge the assignment to the server.

Review Comment:
   As I understand, this is saying the consumer will leave the
`ACKNOWLEDGING_RECONCILED_ASSIGNMENT` state as soon as the next heartbeat is
sent off, rather than when the next heartbeat response is received, right? What
happens if that heartbeat request gets lost?
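One option to consider, sketched here with hypothetical names (`AckState`, `AckTracker`, `onHeartbeatResponse`, `onHeartbeatFailure`), is to leave `ACKNOWLEDGING_RECONCILED_ASSIGNMENT` only when a successful heartbeat response arrives rather than when the request is sent. This is an alternative for discussion, not what the PR implements.

```java
// Hypothetical two-state model of the acknowledgement step.
enum AckState { ACKNOWLEDGING_RECONCILED_ASSIGNMENT, STABLE }

class AckTracker {
    private AckState state = AckState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT;

    AckState state() {
        return state;
    }

    // Keep asking for an immediate heartbeat until the ack is confirmed.
    boolean shouldHeartbeatNow() {
        return state == AckState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT;
    }

    // Called when a heartbeat response is received without error: only now is
    // the acknowledgement known to have reached the coordinator.
    void onHeartbeatResponse() {
        if (state == AckState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT) {
            state = AckState.STABLE;
        }
    }

    // Called when the request fails or times out: stay in the acknowledging
    // state so that another immediate heartbeat is attempted.
    void onHeartbeatFailure() {
        // Intentionally no state change.
    }
}
```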



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -24,21 +24,34 @@
 public enum MemberState {
 
     /**
-     * Member has not joined a consumer group yet, or has been fenced and needs to re-join.
+     * Member is not part of the group. This could be the case when it has never joined (no call
+     * has been made to the subscribe API), or when the member intentionally leaves the group
+     * after a call to the unsubscribe API.
      */
-    UNJOINED,
+    NOT_IN_GROUP,

Review Comment:
   This is the case where the consumer is not in a group _presently_, right? A 
consumer without a configured group ID wouldn't get to this point, would it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##########
@@ -73,30 +84,55 @@ public interface MembershipManager {
     /**
      * @return Current assignment for the member.
      */
-    ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
+    Set<TopicPartition> currentAssignment();
 
     /**
-     * Update the assignment for the member, indicating that the provided assignment is the new
-     * current assignment.
+     * @return Assignment that the member received from the server but hasn't finished processing
+     * yet.
      */
-    void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment);
+    Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment();
 
     /**
-     * Transition the member to the FENCED state and update the member info as required. This is
-     * only invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error.
-     * code.
+     * Transition to the {@link MemberState#JOINING} state, indicating that the member will
+     * try to join the group on the next heartbeat request. This is expected to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin after getting fenced.
+     */
+    void transitionToJoinGroup();
+
+    /**
+     * Transition the member to the FENCED state, where the member will release the assignment by
+     * calling the onPartitionsLost callback, and when the callback completes, it will transition
+     * to {@link MemberState#JOINING} to rejoin the group. This is expected to be invoked when
+     * the heartbeat returns a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error.
      */
     void transitionToFenced();
 
     /**
      * Transition the member to the FAILED state and update the member info as required. This is
      * invoked when un-recoverable errors occur (ex. when the heartbeat returns a non-retriable
-     * error or when errors occur while executing the user-provided callbacks)
+     * error)
      */
     void transitionToFailed();
 
     /**
-     * @return True if the member should send heartbeat to the coordinator.
+     * Release assignment and transition to {@link MemberState#LEAVING_GROUP} so that a heartbeat
+     * request is sent indicating the broker that the member wants to leave the group. This is
+     * expected to be invoked when the user calls the unsubscribe API.
+     *
+     * @return Future that will complete when the callback execution completes and the heartbeat
+     * to leave the group has been sent out.
+     */
+    CompletableFuture<Void> leaveGroup();

Review Comment:
   Can this be called directly via the `ApplicationEventProcessor` when the 
consumer sends an event to the network thread to state it is closing? The only 
other place I see it called at the moment is from a unit test.
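A rough sketch of wiring `leaveGroup()` to a close event on the background thread. Every type here (`LeaveGroupOnCloseEvent`, `GroupLeaver`, `LeaveOnCloseProcessor`) is hypothetical and only illustrates chaining the returned future into the event's future; the real `ApplicationEventProcessor` and event classes may look different.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical event posted by the application thread when the consumer closes.
class LeaveGroupOnCloseEvent {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

// Hypothetical narrow view of MembershipManager: only leaveGroup() is needed here.
interface GroupLeaver {
    CompletableFuture<Void> leaveGroup();
}

// Hypothetical stand-in for the part of ApplicationEventProcessor that would
// handle the close event on the background (network) thread.
class LeaveOnCloseProcessor {
    private final GroupLeaver membershipManager;

    LeaveOnCloseProcessor(GroupLeaver membershipManager) {
        this.membershipManager = membershipManager;
    }

    void process(LeaveGroupOnCloseEvent event) {
        // Propagate the outcome of leaveGroup() to the event's future so that the
        // application thread can wait on it (with a timeout) while closing.
        membershipManager.leaveGroup().whenComplete((result, error) -> {
            if (error != null) {
                event.future.completeExceptionally(error);
            } else {
                event.future.complete(null);
            }
        });
    }
}
```

Since `leaveGroup()` in the diff completes its future once the callbacks finish (not when the leave heartbeat is sent), a close path built this way would also not wait for the heartbeat.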



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -48,26 +61,50 @@ public enum MemberState {
      * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the
      * broker. This is a recoverable state, where the member
      * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then
-     * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+     * transitions to {@link #JOINING} to rejoin the group as a new member.
      */
     FENCED,
 
     /**
-     * The member failed with an unrecoverable error
+     * The member transitions to this state when it is leaving the group after a call to
+     * unsubscribe. It stays in this state while releasing its assignment (calling user's callback
+     * for partitions revoked or lost), until the callback completes and a heartbeat request is
+     * sent out to effectively leave the group (without waiting for a response).
+     */
+    LEAVING_GROUP,
+
+    /**
+     * Member has completed releasing its assignment, and stays in this state until the next
+     * heartbeat request is sent out to leave the group.
      */
-    FAILED;
+    SENDING_LEAVE_REQUEST,

Review Comment:
   Sorry to retread this: how are `LEAVING_GROUP` and `SENDING_LEAVE_REQUEST` 
different? They both will call the `onPartitionsLost()` callback first, right?
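Going by `leaveGroup()` and `transitionToSendingLeaveGroup()` in the `MembershipManagerImpl` diff, the two states appear to split the leave flow in time: `LEAVING_GROUP` while the revoke/lost callback runs, `SENDING_LEAVE_REQUEST` after it completes and until the leave heartbeat goes out, then `NOT_IN_GROUP`. The sketch below models only that ordering; `LeaveState` and `LeaveFlowSketch` are hypothetical names, and the callback is just a `CompletableFuture` supplied by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical subset of MemberState relevant to leaving the group.
enum LeaveState { STABLE, LEAVING_GROUP, SENDING_LEAVE_REQUEST, NOT_IN_GROUP }

class LeaveFlowSketch {
    final List<LeaveState> history = new ArrayList<>();
    private LeaveState state = LeaveState.STABLE;

    LeaveState state() {
        return state;
    }

    private void transitionTo(LeaveState next) {
        state = next;
        history.add(next);
    }

    // LEAVING_GROUP covers the time the user callback (revoked or lost) runs.
    // Only when that callback future completes, successfully or not, does the
    // member move to SENDING_LEAVE_REQUEST.
    CompletableFuture<Void> leaveGroup(CompletableFuture<Void> releaseCallback) {
        transitionTo(LeaveState.LEAVING_GROUP);
        releaseCallback.whenComplete((result, error) -> transitionTo(LeaveState.SENDING_LEAVE_REQUEST));
        return releaseCallback;
    }

    // Mirrors onHeartbeatRequestSent(): only SENDING_LEAVE_REQUEST reacts, so a
    // heartbeat sent while the callback is still running does not end the flow.
    void onHeartbeatRequestSent() {
        if (state == LeaveState.SENDING_LEAVE_REQUEST) {
            transitionTo(LeaveState.NOT_IN_GROUP);
        }
    }
}
```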



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -48,26 +61,50 @@ public enum MemberState {
      * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the
      * broker. This is a recoverable state, where the member
      * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then
-     * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+     * transitions to {@link #JOINING} to rejoin the group as a new member.
      */
     FENCED,
 
     /**
-     * The member failed with an unrecoverable error
+     * The member transitions to this state when it is leaving the group after a call to
+     * unsubscribe. It stays in this state while releasing its assignment (calling user's callback
+     * for partitions revoked or lost), until the callback completes and a heartbeat request is
+     * sent out to effectively leave the group (without waiting for a response).
+     */
+    LEAVING_GROUP,
+
+    /**
+     * Member has completed releasing its assignment, and stays in this state until the next
+     * heartbeat request is sent out to leave the group.
      */
-    FAILED;
+    SENDING_LEAVE_REQUEST,
 
+    /**
+     * The member failed with an unrecoverable error.
+     */
+    FATAL;
+
+    /**
+     * Valid state transitions
+     */
     static {
-        // Valid state transitions
-        STABLE.previousValidStates = Arrays.asList(UNJOINED, RECONCILING);
 
-        RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
+        STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
+
+        RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING);

Review Comment:
   Does the consumer transition from `JOINING` to `RECONCILING` mean that the
first heartbeat response after the join request will (may?) contain an
assignment?
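With the table in this hunk, `JOINING` is listed as a valid predecessor of both `STABLE` and `RECONCILING`, which is consistent with the first response after joining either carrying an assignment or not. The sketch below is a trimmed-down, hypothetical copy (`SketchMemberState`, `canTransitionFrom`) that reproduces only the two table entries shown in the diff, to make that check concrete.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Trimmed-down copy of the transition table from the MemberState diff; only the
// two entries shown there are reproduced, other states keep an empty list.
enum SketchMemberState {
    JOINING, RECONCILING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT, STABLE;

    List<SketchMemberState> previousValidStates = Collections.emptyList();

    static {
        STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
        RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING);
    }

    // Hypothetical helper: is moving from `from` into this state allowed?
    boolean canTransitionFrom(SketchMemberState from) {
        return previousValidStates.contains(from);
    }
}
```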



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -170,8 +229,11 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
         ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
         if (assignment != null) {
             setTargetAssignment(assignment);
+            transitionTo(MemberState.RECONCILING);
+            reconcile(targetAssignment.get());

Review Comment:
   The IDE is showing this line with a warning because it's invoking `get()`
without an `isPresent()` check. I know that it's being set in
`setTargetAssignment` right above, but can we refactor this code to make it
more obvious to the compiler (and any humans reading it)?
   
   Here's a quick take:
   
   ```suggestion
               // Take new target assignment received from the server and set it as targetAssignment
               // to be processed. Following the consumer group protocol, the server won't send a
               // new target assignment while a previous one hasn't been acknowledged by the member, so
               // this will fail if a target assignment already exists.
               if (targetAssignment.isPresent()) {
                   transitionToFailed();
                   throw new IllegalStateException("Cannot set new target assignment because a " +
                       "previous one pending to be reconciled already exists.");
               }
   
               log.info("Member {} accepted new target assignment {} to reconcile", memberId, assignment);
               transitionTo(MemberState.RECONCILING);
               targetAssignment = Optional.of(assignment);
               reconcile(assignment);
   ```
   
   That way all the logic is together and we can remove 
`setTargetAssignment()`, too. Just a thought.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -169,6 +191,24 @@ CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetada
                 });
     }
 
+    /**
+     * @return True if auto-commit is enabled as defined in the config {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
+     */
+    public boolean autoCommitEnabled() {
+        return autoCommitState.isPresent();
+    }
+
+    /**
+     * Reset the auto-commit timer so that the next auto-commit is sent out on the interval
+     * starting from now. If auto-commit is not enabled this will perform no action.
+     */
+    public void resetAutoCommitTimer() {
+        if (!autoCommitState.isPresent()) {
+            return;
+        }
+        autoCommitState.get().resetTimer();

Review Comment:
   Nit: more idiomatic:
   
   ```suggestion
           autoCommitState.ifPresent(AutoCommitState::resetTimer);
   ```
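The suggested one-liner behaves the same as the explicit `isPresent()` check: `Optional.ifPresent` runs the method reference only when a value is present, so nothing happens when auto-commit is disabled. The sketch uses a hypothetical minimal `AutoCommitState` (the real class holds more state) and a hypothetical `AutoCommitHolder` in place of `CommitRequestManager`.

```java
import java.util.Optional;

// Hypothetical minimal stand-in for the AutoCommitState used in CommitRequestManager.
class AutoCommitState {
    int timerResets = 0;

    void resetTimer() {
        timerResets++;
    }
}

// Hypothetical stand-in for the two CommitRequestManager methods in the diff.
class AutoCommitHolder {
    private final Optional<AutoCommitState> autoCommitState;

    AutoCommitHolder(Optional<AutoCommitState> autoCommitState) {
        this.autoCommitState = autoCommitState;
    }

    boolean autoCommitEnabled() {
        return autoCommitState.isPresent();
    }

    // Same behaviour as the isPresent()/get() version: a no-op when auto-commit is disabled.
    void resetAutoCommitTimer() {
        autoCommitState.ifPresent(AutoCommitState::resetTimer);
    }
}
```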



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +243,461 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoinGroup();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
     public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = -1;
+        invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToJoinGroup() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.LEAVING_GROUP);
+
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to effectively leave the
+            // group (even in the case where the member had no assignment to release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        // Return callback future to indicate that the leave group is done when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
     }
 
+    /**
+     * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced .</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
+        } else {
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke onPartitionsLost.
+                callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group heartbeat request, and
+     * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = leaveGroupEpoch();
+        currentAssignment = new HashSet<>();
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+    }
+
+    /**
+     * Return the epoch to use in the Heartbeat request to indicate that the member wants to
+     * leave the group. Should be -2 if this is a static member, or -1 in any other case.
+     */
+    private int leaveGroupEpoch() {
+        return groupInstanceId.isPresent() ? -2 : -1;

Review Comment:
   Same here, regarding the magic numbers.
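A sketch of replacing the literals with named constants. The values come from the `leaveGroupEpoch()` Javadoc in the diff (-2 for a static member, -1 otherwise); the names `LEAVE_GROUP_MEMBER_EPOCH`, `LEAVE_GROUP_STATIC_MEMBER_EPOCH` and the holder class `LeaveGroupEpochs` are hypothetical. The `memberEpoch = -1` in `transitionToFailed()` carries a different meaning (member no longer in the group) and may deserve its own name.

```java
import java.util.Optional;

// Hypothetical named constants replacing the literals -1 and -2 used by
// leaveGroupEpoch() in the diff.
final class LeaveGroupEpochs {
    // Epoch a dynamic member sends in its heartbeat to leave the group.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    // Epoch a static member (group instance ID set) sends to leave the group.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    private LeaveGroupEpochs() {
    }

    static int leaveGroupEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
                ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
                : LEAVE_GROUP_MEMBER_EPOCH;
    }
}
```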



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -149,15 +149,17 @@ public HeartbeatRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat())
+        if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat())
             return NetworkClientDelegate.PollResult.EMPTY;
 
-        // TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be
-        //  implemented either with or after the partition reconciliation logic.
-        if (!heartbeatRequestState.canSendRequest(currentTimeMs))
+        boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
+
+        if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
             return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
+        }
 
-        this.heartbeatRequestState.onSendAttempt(currentTimeMs);
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestSent();

Review Comment:
   I'm probably confused by taking the naming of the `onHeartbeatRequestSent` 
method too literally, but the request hasn't been _sent_, only _enqueued_. Do 
we need to call this when it's really _sent_ or is enqueued "good enough?"
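If the state change should wait until the request really leaves the client, one option is to hang it off a future that the network layer completes on send. Everything here (`PendingRequest`, its `sent` future, `HeartbeatSendSketch`, `flush()`) is hypothetical and only models the gap between enqueueing and sending; it does not reflect the actual `NetworkClientDelegate` API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical request wrapper: `sent` completes when the request is written
// to the network, which happens after poll() has returned.
class PendingRequest {
    final CompletableFuture<Void> sent = new CompletableFuture<>();
}

class HeartbeatSendSketch {
    final Queue<PendingRequest> outbound = new ArrayDeque<>();
    boolean membershipNotified = false;

    // Like poll() in the diff, this only enqueues the request...
    void poll() {
        PendingRequest request = new PendingRequest();
        // ...but defers the membership callback until the request is actually sent.
        request.sent.thenRun(() -> membershipNotified = true);
        outbound.add(request);
    }

    // Stands in for the network thread draining and sending queued requests.
    void flush() {
        PendingRequest request;
        while ((request = outbound.poll()) != null) {
            request.sent.complete(null);
        }
    }
}
```

Whether the extra indirection is worth it likely depends on how long a request can sit in the queue before it is sent.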



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +243,461 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoinGroup();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
     public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = -1;
+        invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToJoinGroup() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.LEAVING_GROUP);
+
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to effectively leave the
+            // group (even in the case where the member had no assignment to release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        // Return callback future to indicate that the leave group is done when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
     }
 
+    /**
+     * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced .</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
+        } else {
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke onPartitionsLost.
+                callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group heartbeat request, and
+     * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = leaveGroupEpoch();
+        currentAssignment = new HashSet<>();
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+    }
+
+    /**
+     * Return the epoch to use in the Heartbeat request to indicate that the member wants to
+     * leave the group. Should be -2 if this is a static member, or -1 in any other case.
+     */
+    private int leaveGroupEpoch() {
+        return groupInstanceId.isPresent() ? -2 : -1;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public boolean shouldHeartbeatNow() {
+        return state() == MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT ||
+                state() == MemberState.SENDING_LEAVE_REQUEST;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
+    @Override
+    public void onHeartbeatRequestSent() {
+        if (state() == MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT) {
             transitionTo(MemberState.STABLE);
+        } else if (state() == MemberState.SENDING_LEAVE_REQUEST) {
+            transitionTo(MemberState.NOT_IN_GROUP);
+        }
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        return state() == MemberState.NOT_IN_GROUP || state() == MemberState.FATAL;
+    }
+
+    void reconcile(ConsumerGroupHeartbeatResponseData.Assignment targetAssignment) {
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<SortedSet<TopicPartition>> assignedPartitionsByNameResult = extractTopicPartitionsFromAssignment(targetAssignment);
+
+        assignedPartitionsByNameResult.whenComplete((assignedPartitions, metadataError) -> {
+            if (metadataError != null) {
+                log.error("Reconciliation failed due to error getting metadata to resolve topic " +
+                        "names for topic IDs {} in target assignment.", targetAssignment);
+                // TODO: failing reconciliation (no ack sent to the broker), but leaving
+                //  member in STABLE state. Double check if any other action should be taken
+                //  here.
+                transitionTo(MemberState.STABLE);
+                return;
+            }
+
+            if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
+                log.debug("Target assignment {} does not match the current subscription {}; it is " +
+                                "likely that the subscription has changed since we joined the group, " +
+                                "will re-join with current subscription",
+                        targetAssignment,
+                        subscriptions.prettyString());
+                transitionToJoinGroup();
+                return;
+            }
+
+            // Partitions to assign (not previously owned)
+            SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+            addedPartitions.addAll(assignedPartitions);
+            addedPartitions.removeAll(ownedPartitions);
+
+
+            // Partitions to revoke
+            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+            revokedPartitions.addAll(ownedPartitions);
+            revokedPartitions.removeAll(assignedPartitions);
+
+            log.info("Updating assignment with\n" +
+                            "\tAssigned partitions:                       {}\n" +
+                            "\tCurrent owned partitions:                  {}\n" +
+                            "\tAdded partitions (assigned - owned):       {}\n" +
+                            "\tRevoked partitions (owned - assigned):     {}\n",
+                    assignedPartitions,
+                    ownedPartitions,
+                    addedPartitions,
+                    revokedPartitions
+            );
+
+            CompletableFuture<Void> revocationResult;
+            if (!revokedPartitions.isEmpty()) {
+                revocationResult = revokePartitions(revokedPartitions);
+            } else {
+                revocationResult = CompletableFuture.completedFuture(null);
+                // Reschedule the auto commit starting from now (new assignment received without any
+                // revocation).
+                commitRequestManager.resetAutoCommitTimer();
+            }
+
+            // Future that will complete when the full reconciliation process completes (revocation
+            // and assignment, executed sequentially)
+            CompletableFuture<Void> reconciliationResult =
+                    revocationResult.thenCompose(r -> {
+                        if (state == MemberState.RECONCILING) {
+                            // Make assignment effective on the client by updating the subscription state.
+                            subscriptions.assignFromSubscribed(assignedPartitions);
+                            // Invoke user call back
+                            return invokeOnPartitionsAssignedCallback(addedPartitions);
+                        } else {
+                            // Revocation callback completed but member already moved out of the
+                            // reconciling state.
+                            CompletableFuture<Void> res = new CompletableFuture<>();
+                            res.completeExceptionally(new KafkaException("Interrupting " +
+                                    "reconciliation after revocation, as the member already " +
+                                    "transitioned out of the reconciling state into " + state));
+                            return res;
+                        }
+                    });
+
+            reconciliationResult.whenComplete((result, error) -> {
+                if (error != null) {
+                    log.error("Reconciliation failed. ", error);
+                } else {
+                    if (state == MemberState.RECONCILING) {
+
+                        // Make assignment effective on the broker by transitioning to send acknowledge.
+                        transitionTo(MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
+
+                        // Make assignment effective on the member group manager
+                        this.currentAssignment = assignedPartitions;
+                        this.targetAssignment = Optional.empty();
+
+                    } else {
+                        log.debug("New assignment processing completed but the member already " +
+                                "transitioned out of the reconciliation state into {}. Interrupting " +
+                                "reconciliation as it's not relevant anymore.", state);
+                        // TODO: double check if subscription state changes are needed. This is expected to be
+                        //  the case where the member got fenced, failed or unsubscribed while the
+                        //  reconciliation was in process. Transitions to those states update the
+                        //  subscription state accordingly so it shouldn't be necessary to make any changes
+                        //  to the subscription state at this point.
+                    }
+                }
+            });
+        });
+    }
+
+    /**
+     * Build the set of TopicPartition (topic name and partition id) from the assignment received
+     * from the broker (topic IDs and list of partitions). For each topic ID this will attempt to
+     * find the topic name in the metadata. If a topic ID is not found, this will request a
+     * metadata update, and the reconciliation will resume once the topic metadata is received.
+     *
+     * @param assignment Assignment received from the broker, containing partitions grouped by
+     *                   topic id.
+     * @return Future that will complete with the set of {@link TopicPartition} containing topic
+     * name and partition id.
+     */
+    private CompletableFuture<SortedSet<TopicPartition>> extractTopicPartitionsFromAssignment(
+            ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+        SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR);
+
+        List<Uuid> topicsRequiringMetadata = new ArrayList<>();
+        assignment.topicPartitions().forEach(topicPartitions -> {
+            Uuid topicId = topicPartitions.topicId();
+            if (!metadata.topicNames().containsKey(topicId)) {
+                topicsRequiringMetadata.add(topicId);
+            } else {
+                String topicName = metadata.topicNames().get(topicId);
+                topicPartitions.partitions().forEach(tp -> assignedPartitions.add(new TopicPartition(topicName, tp)));
+            }
+
+        });

Review Comment:
   I guess it's OK to use `ConsumerMetadata` directly like this as we "own" updating it on the consumer network thread.
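Building on that: the loop above calls the `synchronized` `metadata.topicNames()` twice per topic ID (`containsKey()`, then `get()`). A minimal sketch of resolving the assignment against a single snapshot of the map instead, assuming simplified stand-in types (`java.util.UUID` for `Uuid`, a `"topic-partition"` string for `TopicPartition`, and a `Map` in place of `ConsumerGroupHeartbeatResponseData.Assignment`); `AssignmentResolver` and `Resolution` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical, simplified stand-in for extractTopicPartitionsFromAssignment():
// java.util.UUID replaces Kafka's Uuid, a "topic-partition" string replaces
// TopicPartition, and the assignment is a Map instead of the heartbeat response type.
final class AssignmentResolver {

    // Outcome of one resolution pass over the assignment.
    static final class Resolution {
        final SortedSet<String> resolved = new TreeSet<>();
        final List<UUID> unresolvedTopicIds = new ArrayList<>();
    }

    static Resolution resolve(Map<UUID, List<Integer>> assignment,
                              Map<UUID, String> topicNamesSnapshot) {
        Resolution result = new Resolution();
        assignment.forEach((topicId, partitions) -> {
            // One lookup per topic ID against a single snapshot, instead of
            // containsKey() followed by get() on the shared metadata cache.
            String topicName = topicNamesSnapshot.get(topicId);
            if (topicName == null) {
                result.unresolvedTopicIds.add(topicId);
            } else {
                for (int partition : partitions) {
                    result.resolved.add(topicName + "-" + partition);
                }
            }
        });
        return result;
    }
}
```

In the real method this would roughly mean reading `metadata.topicNames()` once into a local variable before iterating `assignment.topicPartitions()`; how much that saves depends on how `topicNames()` builds its result, which isn't shown in this hunk.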



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +243,461 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+                        "after member got fenced. Member will rejoin the group anyway.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoinGroup();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
     public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = -1;

Review Comment:
   Would you mind making a constant for `-1` just so it's easier to grep through the code and find places where the consumer is in this state?
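For example, a minimal sketch with hypothetical constant names (not from the PR) covering both the `-1` assigned in `transitionToFailed()` and the `-2`/`-1` returned by `leaveGroupEpoch()`:

```java
import java.util.Optional;

// Hypothetical constants for the sentinel member epochs that appear as literals
// in MembershipManagerImpl (memberEpoch = -1, leaveGroupEpoch() returning -2 or -1).
// The names below are illustrative, not taken from the PR.
final class MemberEpochs {
    // Epoch a dynamic member sends in the heartbeat to leave the group; the diff
    // also assigns -1 locally in transitionToFailed() to mark "not in the group".
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    // Epoch a static member (group instance ID present) sends to leave the group.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    private MemberEpochs() {
    }

    // Same logic as leaveGroupEpoch() in the diff, without the magic numbers.
    static int leaveGroupEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
                ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
                : LEAVE_GROUP_MEMBER_EPOCH;
    }

    // Mirrors the "memberEpoch > 0" check used to choose onPartitionsRevoked
    // over onPartitionsLost.
    static boolean isInGroup(int memberEpoch) {
        return memberEpoch > 0;
    }
}
```

If `-1` is meant to carry two meanings here (leaving the group vs. marking the member as no longer in the group before `onPartitionsLost`), two separately named constants with the same value might make the grep even more precise.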



##########
clients/src/main/java/org/apache/kafka/common/Topic.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import java.util.Objects;
+
+/**
+ * A topic represented by a universally unique identifier and a topic name.
+ */
+public class Topic {

Review Comment:
   Two questions:
   
   1. `Topic` wants to unify the topic ID and topic name information, but it explicitly allows either to be `null`. Technically, since there are no checks, both values could be `null`. Is that intentional?
   2. Notwithstanding the above, can we add this class without a KIP? If not, can we move it to `o.a.k.common.internals`?
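On question 1, one way to keep the "either may be absent" flexibility while ruling out the both-`null` case is to validate in the constructor. A minimal sketch, assuming `java.util.UUID` in place of Kafka's `Uuid`; `ValidatedTopic` is a hypothetical name:

```java
import java.util.UUID;

// Hypothetical variant of the proposed Topic class (java.util.UUID stands in for
// Kafka's Uuid). Either field may be missing, but constructing a topic with
// neither an ID nor a name is rejected up front.
final class ValidatedTopic {
    private final UUID topicId;
    private final String topicName;

    ValidatedTopic(UUID topicId, String topicName) {
        if (topicId == null && topicName == null) {
            throw new IllegalArgumentException("At least one of topicId or topicName must be non-null");
        }
        this.topicId = topicId;
        this.topicName = topicName;
    }

    UUID topicId() {
        return topicId;
    }

    String topicName() {
        return topicName;
    }
}
```

Question 2 (KIP vs. moving the class to `o.a.k.common.internals`) is a project-level decision and isn't affected by this.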



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -676,12 +676,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
     @Override
     public void enforceRebalance() {
-        throw new KafkaException("method not implemented");
+        throw new UnsupportedOperationException("Operation not supported in new consumer group protocol");

Review Comment:
   Would it be "wrong" to have the method implementation log the message instead of throwing an error?
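For comparison, a sketch of the logging alternative, where `enforceRebalance()` becomes a no-op that emits a warning. `NoOpRebalanceConsumer` is hypothetical, and `java.util.logging` is used only to keep it self-contained (the client code logs through SLF4J):

```java
import java.util.logging.Logger;

// Hypothetical sketch (not the PR's code): enforceRebalance() as a logged no-op
// instead of throwing UnsupportedOperationException.
final class NoOpRebalanceConsumer {
    private static final Logger LOG = Logger.getLogger(NoOpRebalanceConsumer.class.getName());

    // Counts ignored calls so the behaviour is observable in a test.
    private int ignoredEnforceRebalanceCalls = 0;

    public void enforceRebalance() {
        ignoredEnforceRebalanceCalls++;
        LOG.warning("enforceRebalance() is not supported in the new consumer group protocol; ignoring the call");
    }

    int ignoredEnforceRebalanceCalls() {
        return ignoredEnforceRebalanceCalls;
    }
}
```

The trade-off is visibility: throwing `UnsupportedOperationException` surfaces the unsupported call immediately, while logging lets existing applications keep running without triggering the rebalance they asked for.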



##########
clients/src/main/java/org/apache/kafka/common/Topic.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import java.util.Objects;
+
+/**
+ * A topic represented by a universally unique identifier and a topic name.
+ */
+public class Topic {
+
+    private final Uuid topicId;
+
+    private final String topicName;
+
+    public Topic(Uuid topicId) {
+        this(topicId, null);
+    }
+
+    public Topic(String topicName) {
+        this(null, topicName);
+    }
+
+    public Topic(Uuid topicId, String topicName) {
+        this.topicId = topicId;
+        this.topicName = topicName;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public String topicName() {
+        return topicName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Topic other = (Topic) o;
+        return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = prime + topicId.hashCode();
+        result = prime * result + topicName.hashCode();

Review Comment:
   This would throw a `NullPointerException`, wouldn't it?
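With either single-argument constructor one field is `null`, so `topicId.hashCode()` or `topicName.hashCode()` would indeed throw. A minimal null-tolerant sketch using `java.util.Objects.hash`, which treats `null` elements as 0 and stays consistent with the `Objects.equals`-based `equals()` in the diff (`NullSafeTopic` is a hypothetical name; `java.util.UUID` stands in for `Uuid`):

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical null-tolerant version of the proposed Topic class
// (java.util.UUID stands in for Kafka's Uuid).
final class NullSafeTopic {
    private final UUID topicId;
    private final String topicName;

    NullSafeTopic(UUID topicId, String topicName) {
        this.topicId = topicId;
        this.topicName = topicName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        NullSafeTopic other = (NullSafeTopic) o;
        return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName);
    }

    @Override
    public int hashCode() {
        // Objects.hash uses 0 for null fields, so this never throws
        // NullPointerException, unlike calling topicId.hashCode() directly.
        return Objects.hash(topicId, topicName);
    }
}
```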



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -843,6 +843,8 @@ private void updatePatternSubscription(Cluster cluster) {
     @Override
     public void unsubscribe() {
         fetchBuffer.retainAll(Collections.emptySet());
+        // TODO: send leave group event to release assignment and send HB to leave. The event
+        //  should end up triggering the membershipManager.leaveGroup()

Review Comment:
   This makes sense!
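The flow described in the TODO could look roughly like this sketch: `unsubscribe()` enqueues an event, the background thread handles it by calling a stand-in for `membershipManager.leaveGroup()`, and the application thread waits on the event's future with a timeout. `LeaveGroupEvent`, `LeaveGroupFlow` and `processNextEvent()` are hypothetical names, not the PR's classes:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical event carrying a future the application thread can wait on.
final class LeaveGroupEvent {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

// Hypothetical sketch of the flow in the TODO. None of these names come from the PR.
final class LeaveGroupFlow {
    private final BlockingQueue<LeaveGroupEvent> applicationEvents = new LinkedBlockingQueue<>();
    // Stand-in for membershipManager.leaveGroup(); in the diff, its future completes
    // once the revocation/lost callbacks finish, without waiting for the heartbeat.
    private final Supplier<CompletableFuture<Void>> leaveGroup;

    LeaveGroupFlow(Supplier<CompletableFuture<Void>> leaveGroup) {
        this.leaveGroup = leaveGroup;
    }

    // Application thread: enqueue the event, then wait (bounded) for it to be handled.
    void unsubscribe(long timeoutMs) {
        LeaveGroupEvent event = new LeaveGroupEvent();
        applicationEvents.add(event);
        // join() rethrows failures (including the timeout) as an unchecked CompletionException.
        event.future.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
    }

    // Background thread: wait for the next event and link its future to leaveGroup().
    boolean processNextEvent(long pollTimeoutMs) throws InterruptedException {
        LeaveGroupEvent event = applicationEvents.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
        if (event == null) {
            return false;
        }
        leaveGroup.get().whenComplete((result, error) -> {
            if (error != null) {
                event.future.completeExceptionally(error);
            } else {
                event.future.complete(null);
            }
        });
        return true;
    }
}
```

Because `leaveGroup()` in the diff returns the callback future without waiting for the heartbeat, `unsubscribe()` in this shape would return once the callbacks finish. `CompletableFuture.orTimeout` needs Java 9 or later.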



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +243,461 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+                        "after member got fenced. Member will rejoin the group anyway.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoinGroup();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
     public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = -1;
+        invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToJoinGroup() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.LEAVING_GROUP);
+
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to effectively leave the
+            // group (even in the case where the member had no assignment to release or when the
+            // callback execution failed).
+            transitionToSendingLeaveGroup();
+
+        });
+
+        // Return callback future to indicate that the leave group is done when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best effort to send it,
+        // but do not hold the leave group operation for it.)
+        return callbackResult;
     }
 
+    /**
+     * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group ({@code epoch > 0}), this will invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member is intentionally
+     *     leaving the group (after a call to unsubscribe).</li>
+     *
+     *     <li>If the member is not part of the group ({@code epoch <= 0}), this will invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced or after a fatal failure.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
+        } else {
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke onPartitionsLost.
+                callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave group heartbeat request, and
+     * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = leaveGroupEpoch();
+        currentAssignment = new HashSet<>();
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+    }
+
+    /**
+     * Return the epoch to use in the Heartbeat request to indicate that the member wants to
+     * leave the group. Should be -2 if this is a static member, or -1 in any other case.
+     */
+    private int leaveGroupEpoch() {
+        return groupInstanceId.isPresent() ? -2 : -1;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public boolean shouldHeartbeatNow() {
+        return state() == MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT ||
+                state() == MemberState.SENDING_LEAVE_REQUEST;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
+    @Override
+    public void onHeartbeatRequestSent() {
+        if (state() == MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT) {
             transitionTo(MemberState.STABLE);
+        } else if (state() == MemberState.SENDING_LEAVE_REQUEST) {
+            transitionTo(MemberState.NOT_IN_GROUP);
+        }
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        return state() == MemberState.NOT_IN_GROUP || state() == MemberState.FATAL;
+    }
+
+    void reconcile(ConsumerGroupHeartbeatResponseData.Assignment targetAssignment) {
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<SortedSet<TopicPartition>> assignedPartitionsByNameResult = extractTopicPartitionsFromAssignment(targetAssignment);
+
+        assignedPartitionsByNameResult.whenComplete((assignedPartitions, metadataError) -> {
+            if (metadataError != null) {
+                log.error("Reconciliation failed due to error getting metadata to resolve topic " +
+                        "names for topic IDs {} in target assignment.", targetAssignment, metadataError);
+                // TODO: failing reconciliation (no ack sent to the broker), but leaving
+                //  member in STABLE state. Double check if any other action should be taken
+                //  here.
+                transitionTo(MemberState.STABLE);
+                return;
+            }
+
+            if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
+                log.debug("Target assignment {} does not match the current subscription {}; it is " +
+                                "likely that the subscription has changed since we joined the group, " +
+                                "will re-join with current subscription",
+                        targetAssignment,
+                        subscriptions.prettyString());
+                transitionToJoinGroup();
+                return;
+            }
+
+            // Partitions to assign (not previously owned)
+            SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+            addedPartitions.addAll(assignedPartitions);
+            addedPartitions.removeAll(ownedPartitions);
+
+
+            // Partitions to revoke
+            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+            revokedPartitions.addAll(ownedPartitions);
+            revokedPartitions.removeAll(assignedPartitions);
+
+            log.info("Updating assignment with\n" +
+                            "\tAssigned partitions:                       {}\n" +
+                            "\tCurrent owned partitions:                  {}\n" +
+                            "\tAdded partitions (assigned - owned):       {}\n" +
+                            "\tRevoked partitions (owned - assigned):     {}\n",
+                    assignedPartitions,
+                    ownedPartitions,
+                    addedPartitions,
+                    revokedPartitions
+            );
+
+            CompletableFuture<Void> revocationResult;
+            if (!revokedPartitions.isEmpty()) {
+                revocationResult = revokePartitions(revokedPartitions);
+            } else {
+                revocationResult = CompletableFuture.completedFuture(null);
+                // Reschedule the auto commit starting from now (new assignment received without any
+                // revocation).
+                commitRequestManager.resetAutoCommitTimer();
+            }
+
+            // Future that will complete when the full reconciliation process completes (revocation
+            // and assignment, executed sequentially)
+            CompletableFuture<Void> reconciliationResult =
+                    revocationResult.thenCompose(r -> {
+                        if (state == MemberState.RECONCILING) {
+                            // Make assignment effective on the client by updating the subscription state.
+                            subscriptions.assignFromSubscribed(assignedPartitions);
+                            // Invoke user callback
+                            return invokeOnPartitionsAssignedCallback(addedPartitions);
+                        } else {
+                            // Revocation callback completed but member already moved out of the
+                            // reconciling state.
+                            CompletableFuture<Void> res = new CompletableFuture<>();
+                            res.completeExceptionally(new KafkaException("Interrupting " +
+                                    "reconciliation after revocation, as the member already " +
+                                    "transitioned out of the reconciling state into " + state));
+                            return res;
+                        }
+                    });
+
+            reconciliationResult.whenComplete((result, error) -> {
+                if (error != null) {
+                    log.error("Reconciliation failed. ", error);
+                } else {
+                    if (state == MemberState.RECONCILING) {
+
+                        // Make assignment effective on the broker by transitioning to send acknowledge.
+                        transitionTo(MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
+
+                        // Make assignment effective on the member group manager
+                        this.currentAssignment = assignedPartitions;
+                        this.targetAssignment = Optional.empty();
+
+                    } else {
+                        log.debug("New assignment processing completed but the member already " +
+                                "transitioned out of the reconciliation state into {}. Interrupting " +
+                                "reconciliation as it's not relevant anymore.", state);
+                        // TODO: double check if subscription state changes are needed. This is expected to be
+                        //  the case where the member got fenced, failed or unsubscribed while the
+                        //  reconciliation was in process. Transitions to those states update the
+                        //  subscription state accordingly so it shouldn't be necessary to make any changes
+                        //  to the subscription state at this point.
+                    }
+                }
+            });
+        });
+    }
+
+    /**
+     * Build the set of TopicPartition (topic name and partition id) from the assignment received
+     * from the broker (topic IDs and list of partitions). For each topic ID this will attempt to
+     * find the topic name in the metadata. If a topic ID is not found, this will request a
+     * metadata update, and the reconciliation will resume once the topic metadata is received.
+     *
+     * @param assignment Assignment received from the broker, containing partitions grouped by
+     *                   topic id.
+     * @return Future that will complete with the set of {@link TopicPartition} containing topic
+     * name and partition id.
+     */
+    private CompletableFuture<SortedSet<TopicPartition>> extractTopicPartitionsFromAssignment(
+            ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+        SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR);
+
+        List<Uuid> topicsRequiringMetadata = new ArrayList<>();
+        assignment.topicPartitions().forEach(topicPartitions -> {
+            Uuid topicId = topicPartitions.topicId();
+            if (!metadata.topicNames().containsKey(topicId)) {
+                topicsRequiringMetadata.add(topicId);
+            } else {
+                String topicName = metadata.topicNames().get(topicId);
+                topicPartitions.partitions().forEach(tp -> assignedPartitions.add(new TopicPartition(topicName, tp)));
+            }
+
+        });
+
+        if (topicsRequiringMetadata.isEmpty()) {
+            return CompletableFuture.completedFuture(assignedPartitions);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            return resolveTopicNamesForTopicIds(topicsRequiringMetadata, assignedPartitions);
+        }
+    }
+
+    /**
+     * Perform a topic metadata request to discover topic names for the given topic ids.
+     *
+     * @param topicsRequiringMetadata List of topic IDs for which topic names are needed.
+     * @param resolvedTopicPartitions Set of TopicPartitions for the topics with known names.
+     *                                This set will be extended when the missing topic names are
+     *                                received in metadata.
+     *
+     * @return Future that will complete when topic names are received for all
+     * topicsRequiringMetadata. It will fail if a metadata response is received but does not
+     * include all the topics that were requested.
+     */
+    private CompletableFuture<SortedSet<TopicPartition>> resolveTopicNamesForTopicIds(
+            List<Uuid> topicsRequiringMetadata,
+            SortedSet<TopicPartition> resolvedTopicPartitions) {
+        CompletableFuture<SortedSet<TopicPartition>> result = new CompletableFuture<>();
+        log.debug("Topic IDs {} received in assignment were not found in metadata. " +
+                "Requesting metadata to resolve topic names and proceed with the " +
+                "reconciliation.", topicsRequiringMetadata);
+        // TODO: request metadata only for the topics that require it. Passing Optional.empty() to
+        //  retrieve it for all topics until the TopicMetadataRequestManager supports a list
+        //  of topics.
+        CompletableFuture<Map<Topic, List<PartitionInfo>>> metadataResult = metadataRequestManager.requestTopicMetadata(Optional.empty());
+        metadataResult.whenComplete((topicNameAndPartitionInfo, error) -> {
+            if (error != null) {
+                // Metadata request to get topic names failed. The TopicMetadataManager
+                // handles retries on retriable errors, so at this point we consider this a
+                // fatal error.
+                log.error("Metadata request for topic IDs {} received in assignment failed.",
+                        topicsRequiringMetadata, error);
+                result.completeExceptionally(new KafkaException("Failed to get metadata for " +
+                        "topic IDs received in target assignment.", error));
+            } else {
+                topicNameAndPartitionInfo.forEach((topic, partitionInfoList) -> {
+                    if (topicsRequiringMetadata.contains(topic.topicId())) {
+                        partitionInfoList.forEach(partitionInfo ->
+                                resolvedTopicPartitions.add(new TopicPartition(topic.topicName(), partitionInfo.partition())));
+                        topicsRequiringMetadata.remove(topic.topicId());
+                    }
+                });
+                if (topicsRequiringMetadata.isEmpty()) {
+                    result.complete(resolvedTopicPartitions);
+                } else {
+                    // TODO: check if this could happen. If so, we probably need to retry the
+                    //  metadata request. Failing for now as a simple initial approach.
+                    result.completeExceptionally(new KafkaException("Failed to resolve topic " +
+                            "names for all topic IDs received in target assignment."));
+                }
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Revoke partitions. This will:
+     * <ul>
+     *     <li>Trigger an async commit offsets request if auto-commit is enabled.</li>
+     *     <li>Invoke the onPartitionsRevoked callback if the user has registered it.</li>
+     * </ul>
+     *
+     * This will wait on the commit request to finish before invoking the callback. If the commit
+     * request fails, this will proceed to invoke the user callbacks anyway,
+     * returning a future that will complete or fail depending on the callback execution only.
+     *
+     * @param revokedPartitions Partitions to revoke.
+     * @return Future that will complete when the commit request and user callback complete.
+     */
+    private CompletableFuture<Void> revokePartitions(Set<TopicPartition> revokedPartitions) {
+        log.info("Revoking previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+
+        logPausedPartitionsBeingRevoked(revokedPartitions);
+
+        // Mark partitions as pending revocation to stop fetching from the partitions (no new
+        // fetches sent out, and no in-flight fetch responses processed).
+        markPendingRevocationToPauseFetching(revokedPartitions);
+
+        // Future that will complete when the revocation completes (including offset commit
+        // request and user callback execution)
+        CompletableFuture<Void> revocationResult = new CompletableFuture<>();
+
+        // Commit offsets if auto-commit is enabled.
+        CompletableFuture<Void> commitResult;
+        if (commitRequestManager.autoCommitEnabled()) {
+            commitResult = commitRequestManager.maybeAutoCommitAllConsumed();
+        } else {
+            commitResult = CompletableFuture.completedFuture(null);
+        }
+
+        commitResult.whenComplete((result, error) -> {
+
+            if (error != null) {
+                // Commit request failed (commit request manager internally retries on
+                // retriable errors, so at this point we assume this is non-retriable, but
+                // proceed with the revocation anyway).
+                log.debug("Commit request before revocation failed with non-retriable error. Will" +
+                        " proceed with the revocation anyway.", error);
+            }
+
+            CompletableFuture<Void> userCallbackResult = invokeOnPartitionsRevokedCallback(revokedPartitions);
+            userCallbackResult.whenComplete((callbackResult, callbackError) -> {
+                if (callbackError != null) {
+                    log.error("User provided callback failed on invocation of onPartitionsRevoked" +
+                            " for partitions {}", revokedPartitions, callbackError);
+                    revocationResult.completeExceptionally(callbackError);
+                } else {
+                    revocationResult.complete(null);
+                }
+
+            });
+        });
+        return revocationResult;
+    }
+
+    /**
+     * Mark partitions as 'pending revocation', to effectively stop fetching while waiting for
+     * the commit offsets request to complete, and ensure the application's position doesn't get
+     * ahead of the committed position. This mark will ensure that:
+     * <ul>
+     *     <li>No new fetches will be sent out for the partitions being revoked</li>
+     *     <li>Previous in-flight fetch requests that may complete while the partitions are being revoked won't be processed.</li>
+     * </ul>
+     */
+    private void markPendingRevocationToPauseFetching(Set<TopicPartition> partitionsToRevoke) {
+        // When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
+        // window of time between when the offset commit is sent and when it returns and revocation completes. It is
+        // possible for pending fetches for these partitions to return during this time, which means the application's
+        // position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
+        // To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
+        // fetches or returning data from previous fetches to the user.
+        log.debug("Marking partitions pending for revocation: {}", partitionsToRevoke);
+        subscriptions.markPendingRevocation(partitionsToRevoke);
+    }
+
+    private CompletableFuture<Void> invokeOnPartitionsRevokedCallback(Set<TopicPartition> partitionsRevoked) {
+        Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
+        if (listener.isPresent()) {
+            throw new UnsupportedOperationException("User-defined callbacks not supported yet");

Review Comment:
   Yes, we'll have to resolve how the callbacks fit into this model that uses `Future`s, because the callbacks need to be invoked on the application thread.
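One possible shape, sketched under assumptions: the background thread keeps its `Future`-based API, but instead of running the user callback it queues a request that the application thread drains (for example during `poll()`), completing the future once the callback has run there. `CallbackInvocation` and `CallbackBridge` are hypothetical, and strings stand in for `TopicPartition`:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical request to run one user callback on the application thread.
// Strings stand in for TopicPartition; Consumer stands in for a listener method.
final class CallbackInvocation {
    final List<String> partitions;
    final Consumer<List<String>> userCallback;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    CallbackInvocation(List<String> partitions, Consumer<List<String>> userCallback) {
        this.partitions = partitions;
        this.userCallback = userCallback;
    }
}

// Hypothetical bridge between the background thread and the application thread.
final class CallbackBridge {
    private final BlockingQueue<CallbackInvocation> pending = new LinkedBlockingQueue<>();

    // Background thread: request the invocation and return immediately with a future,
    // the same shape invokeOnPartitionsRevokedCallback() already has in the diff.
    CompletableFuture<Void> invokeOnApplicationThread(List<String> partitions,
                                                      Consumer<List<String>> userCallback) {
        CallbackInvocation invocation = new CallbackInvocation(partitions, userCallback);
        pending.add(invocation);
        return invocation.future;
    }

    // Application thread: run every queued callback, e.g. from inside poll().
    void runPendingCallbacks() {
        CallbackInvocation invocation;
        while ((invocation = pending.poll()) != null) {
            try {
                invocation.userCallback.accept(invocation.partitions);
                invocation.future.complete(null);
            } catch (RuntimeException e) {
                // Report the user's failure back to the waiting background logic.
                invocation.future.completeExceptionally(e);
            }
        }
    }
}
```

One caveat: per the `CompletableFuture` contract, dependents attached with non-async methods such as `whenComplete` or `thenCompose` may run on the thread that completes the future, which here is the application thread. The background logic chained onto these futures would likely need to be re-dispatched to the background thread, for example through its own event queue or an `*Async` variant with an executor.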



-- 
This is an automated message from the Apache Git Service.