[ https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot resolved KAFKA-13435.
---------------------------------
    Reviewer: Jason Gustafson
  Resolution: Fixed

> Static membership protocol should let the leader skip assignment (KIP-814)
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-13435
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13435
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: Ryan Leslie
>            Assignee: David Jacot
>            Priority: Critical
>              Labels: new-rebalance-should-fix
>             Fix For: 3.2.0
>
>
> When using consumer groups with static membership, if the consumer acting as
> leader is restarted, metadata changes such as a partition increase no longer
> trigger the expected rebalance.
> To reproduce this issue:
> # Create a static consumer subscribed to a single topic
> # Close the consumer and create a new one with the same group instance id
> # Increase partitions for the topic
> # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to
> track metadata and trigger a rebalance if there are changes such as newly
> added partitions:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
>     ...
>     requestRejoinIfNecessary(reason);
>     return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
> if (!isLeader)
>     assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group
> should have _isLeader == true_ and be responsible for triggering rebalances
> on metadata changes.
> However, with static membership, if the leader has been restarted and has
> rejoined the group, the group effectively no longer has a current leader:
> _isLeader_ is false for all members, so even though the metadata changes are
> fetched, no rebalance is triggered.
> The problem persists until an actual group change causes a proper rebalance.
> To safely increase partitions when using static membership, the consumers
> must first be stopped and allowed to time out, or be forcibly removed with
> {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker.
> Currently, when a static consumer that is the leader is restarted, the
> coordinator does recognize the change. For example, after leader
> _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted:
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator)
> {noformat}
> However, it does not attempt to update the leader id, since this isn't a new
> rebalance, and JOIN_GROUP continues to return the now-stale member id as
> leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[])
> {noformat}
> As a result, a restarted member has no easy way to tell that it should
> consider itself the leader and handle metadata changes.
> KAFKA-7728 mentions the difficulty of leader restarts, but its focus was
> mainly on avoiding needless rebalances for static members. That goal was
> accomplished, but this issue seems to be a side effect of both not
> rebalancing AND not having the rejoined member reclaim its leadership status.
> Also, I have not verified whether it is strictly related or valid, but I
> noticed that KAFKA-12759 has been opened as well.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
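The failure mode in KAFKA-13435 can be reduced to a small, self-contained Java model. This is a sketch under stated assumptions, not Kafka code. The class, method and field names (`StaticLeaderModel`, `Coordinator.join`, `Client.rejoinNeeded`, `updateLeaderOnStaticRejoin`) are hypothetical. Partition metadata is a plain `Map<String, Integer>` that stands in for the metadata and assignment snapshots. A member also treats itself as leader whenever the JoinGroup leader id equals its own member id. The `updateLeaderOnStaticRejoin` switch is likewise hypothetical: it models a broker-side change that hands leadership to the rejoined static member's new id, and exists only for contrast with the reported behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the leader tracking described in KAFKA-13435.
// This is not the real ConsumerCoordinator or GroupCoordinator code.
final class StaticLeaderModel {

    /** Broker side: maps static instance ids to member ids and remembers the leader id. */
    static final class Coordinator {
        private final Map<String, String> memberIdByInstance = new HashMap<>();
        private final boolean updateLeaderOnStaticRejoin; // hypothetical fix switch
        private String leaderId;
        private int nextSuffix = 0;

        Coordinator(boolean updateLeaderOnStaticRejoin) {
            this.updateLeaderOnStaticRejoin = updateLeaderOnStaticRejoin;
        }

        /** Returns {leaderId, memberId}, mirroring two JoinGroup response fields. */
        String[] join(String instanceId) {
            // A static member with an unknown member id gets a fresh id; the old one is dropped.
            String newMemberId = instanceId + "-" + (nextSuffix++);
            String oldMemberId = memberIdByInstance.put(instanceId, newMemberId);
            if (leaderId == null) {
                // The first join triggers a real rebalance, so this member becomes leader.
                leaderId = newMemberId;
            } else if (updateLeaderOnStaticRejoin && leaderId.equals(oldMemberId)) {
                // Hypothetical fix: leadership follows the replacement member id.
                leaderId = newMemberId;
            }
            // Otherwise leaderId is unchanged. For a rejoining static leader this is the
            // reported behavior: leaderId still points at the removed member id.
            return new String[] {leaderId, newMemberId};
        }
    }

    /** Client side: only the leader keeps an assignment snapshot to compare against. */
    static final class Client {
        private Map<String, Integer> assignmentSnapshot;

        void onJoinComplete(String leaderId, String memberId, Map<String, Integer> metadata) {
            boolean isLeader = leaderId.equals(memberId);
            // Only the leader monitors metadata changes; everyone else drops the snapshot.
            assignmentSnapshot = isLeader ? new HashMap<>(metadata) : null;
        }

        /** True if this client would request a rejoin for the given metadata. */
        boolean rejoinNeeded(Map<String, Integer> metadata) {
            return assignmentSnapshot != null && !assignmentSnapshot.equals(metadata);
        }
    }

    /** Runs the reproduction steps and reports whether a rebalance would be requested. */
    static boolean simulate(boolean updateLeaderOnStaticRejoin) {
        Coordinator coordinator = new Coordinator(updateLeaderOnStaticRejoin);
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("topic", 1);

        // 1. A static consumer joins and becomes the leader.
        Client first = new Client();
        String[] r1 = coordinator.join("instance-1");
        first.onJoinComplete(r1[0], r1[1], partitionsPerTopic);

        // 2. It is closed and replaced by a new consumer with the same instance id.
        Client second = new Client();
        String[] r2 = coordinator.join("instance-1");
        second.onJoinComplete(r2[0], r2[1], partitionsPerTopic);

        // 3. Partitions are increased for the topic.
        partitionsPerTopic.put("topic", 2);

        // 4. Only the live consumer checks metadata: does it ask for a rebalance?
        return second.rejoinNeeded(partitionsPerTopic);
    }

    public static void main(String[] args) {
        System.out.println("reported behavior, rebalance requested: " + simulate(false));
        System.out.println("leader id updated, rebalance requested: " + simulate(true));
    }
}
```

With `simulate(false)`, the replacement consumer is told the old member id is leader, so it keeps no assignment snapshot and never requests a rejoin after the partition increase. With `simulate(true)`, the snapshot comparison detects the change. The model ignores how the real broker and client would perform, or skip, the assignment itself.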