lianetm commented on code in PR #15003:
URL: https://github.com/apache/kafka/pull/15003#discussion_r1425622660
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -348,34 +348,54 @@ public void
onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo
ConsumerGroupHeartbeatResponseData.Assignment assignment =
response.assignment();
if (assignment != null) {
- replaceUnresolvedAssignmentWithNewAssignment(assignment);
- if (!assignmentUnresolved.equals(currentAssignment)) {
- // Transition the member to RECONCILING when receiving a new target
- // assignment from the broker, different from the current assignment. Note that the
- // reconciliation might not be triggered just yet because of missing metadata.
- transitionTo(MemberState.RECONCILING);
- assignmentReadyToReconcile.clear();
- resolveMetadataForUnresolvedAssignment();
- reconcile();
- } else {
- // Same assignment received, nothing to reconcile.
- log.debug("Target assignment {} received from the broker is
equals to the member " +
- "current assignment {}. Nothing to reconcile.",
- assignmentUnresolved, currentAssignment);
- // Make sure we transition the member back to STABLE if it was
RECONCILING (ex.
- // member was RECONCILING unresolved assignments that were
just removed by the
- // broker).
- if (state == MemberState.RECONCILING) {
- // This is the case where a member was RECONCILING an
unresolved
- // assignment that was removed by the broker in a
following assignment.
- transitionTo(MemberState.STABLE);
- }
+ if (!MemberState.RECONCILING.getPreviousValidStates().contains(state)) {
Review Comment:
Sure, I agree with your preferred alternative. Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]