cadonna commented on code in PR #15003:
URL: https://github.com/apache/kafka/pull/15003#discussion_r1425582606
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -261,6 +261,32 @@ public void testFencingWhenStateIsPrepareLeaving() {
assertEquals(MemberState.LEAVING, membershipManager.state());
}
+ @Test
+ public void testAssignmentReceivedWhenStateIsPrepareLeaving() {
Review Comment:
Could you have a test for each state that cannot handle new assignments? I
guess that the states that can handle new assignments are tested elsewhere.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -348,34 +348,54 @@ public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo
ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
if (assignment != null) {
- replaceUnresolvedAssignmentWithNewAssignment(assignment);
- if (!assignmentUnresolved.equals(currentAssignment)) {
- // Transition the member to RECONCILING when receiving a new target
- // assignment from the broker, different from the current assignment. Note that the
- // reconciliation might not be triggered just yet because of missing metadata.
- transitionTo(MemberState.RECONCILING);
- assignmentReadyToReconcile.clear();
- resolveMetadataForUnresolvedAssignment();
- reconcile();
- } else {
- // Same assignment received, nothing to reconcile.
- log.debug("Target assignment {} received from the broker is equals to the member " +
- "current assignment {}. Nothing to reconcile.",
- assignmentUnresolved, currentAssignment);
- // Make sure we transition the member back to STABLE if it was RECONCILING (ex.
- // member was RECONCILING unresolved assignments that were just removed by the
- // broker).
- if (state == MemberState.RECONCILING) {
- // This is the case where a member was RECONCILING an unresolved
- // assignment that was removed by the broker in a following assignment.
- transitionTo(MemberState.STABLE);
- }
+ if (!MemberState.RECONCILING.getPreviousValidStates().contains(state)) {
Review Comment:
This was hard to understand.
I propose two options to improve the situation.
1.
```suggestion
final boolean inStateThatCannotHandleNewAssignment =
!MemberState.RECONCILING.getPreviousValidStates().contains(state);
if (inStateThatCannotHandleNewAssignment) {
```
2. (my favorite)
Add a method `canHandleNewAssignment()` to the enum that returns whether the
state can handle new assignments.
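
A minimal sketch of what option 2 could look like. Everything here is illustrative: the `MemberStateSketch` class, the reduced set of states, and the transition table are assumptions made for this example, not the actual `MemberState` definition in the PR. Only the method name `canHandleNewAssignment()` comes from the suggestion above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the real MemberState enum; the states listed
// and the transitions between them are assumptions for illustration only.
public class MemberStateSketch {

    enum State {
        JOINING, RECONCILING, STABLE, PREPARE_LEAVING, LEAVING;

        // Assumed set of states from which a member may move to RECONCILING.
        // In the real enum this would be derived from
        // RECONCILING.getPreviousValidStates().
        private static final Set<State> CAN_ENTER_RECONCILING =
            EnumSet.of(JOINING, RECONCILING, STABLE);

        // True if a new target assignment may be processed in this state,
        // i.e. if a transition to RECONCILING is valid from here.
        boolean canHandleNewAssignment() {
            return CAN_ENTER_RECONCILING.contains(this);
        }
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + " canHandleNewAssignment=" + s.canHandleNewAssignment());
        }
    }
}
```

With a method like this, the check in `onHeartbeatResponseReceived` could read `if (!state.canHandleNewAssignment())`, and the tests requested above could loop over all states for which the method returns `false`.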