[ https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454118#comment-17454118 ]
David Jacot commented on KAFKA-13435:
-------------------------------------

Let me summarise the options that we have discussed so far:

1. The group coordinator could trigger a rebalance when the static leader re-joins the group. The major advantage of this solution is that it would fix the issue for all client versions. The downside is that it would trigger a rebalance every time the leader re-joins the group, which goes somewhat against the original goal of static membership.

2. The group coordinator could parse the assignments, listen to partition changes and trigger rebalances when required. This is very likely the cleanest option. However, as the group coordinator does not parse the assignments yet, it would require a significant effort to implement. Given that this is an edge case, I am not sure that it is worth the effort. It might be better to address this when we redesign the protocol.

3. At the moment, the re-joining leader can't detect metadata changes because it does not know that it is the leader when re-joining the group. We did this on purpose; otherwise the leader would compute assignments that would not be propagated to the other members. I am not sure whether there is a real reason for this beyond reducing noise and confusion on the client side, so we could do it differently here. For instance, we could tell the leader that it is the leader of the group when it rejoins, by setting the correct leader in the JoinGroup response. Based on this, the leader would assign partitions and send a SyncGroup request. When the SyncGroup request is received from the leader and the group is Stable, we could check whether the assignments have changed and trigger a rebalance if they did.

4. As explained in point 3, the issue is that the re-joining member does not know that it is the leader when re-joining the group, and this prevents it from monitoring metadata changes based on the so-called "assignment snapshot".
Unlike subscription changes, which are monitored by all members of the group, we have restricted this check to the leader. It seems that we could relax this constraint and let all members of the group run it. Note that only a JoinGroup request coming from the leader of the group will trigger a rebalance. The downside is that this would increase the number of JoinGroup requests, as all members would try to send them.

> Group won't consume partitions added after static member restart
> ----------------------------------------------------------------
>
>                 Key: KAFKA-13435
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13435
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: Ryan Leslie
>            Assignee: David Jacot
>            Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as
> leader has restarted, then metadata changes such as a partition increase do
> not trigger the expected rebalances.
> To reproduce this issue, simply:
> # Create a static consumer subscribed to a single topic
> # Close the consumer and create a new one with the same group instance id
> # Increase partitions for the topic
> # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to
> track metadata and trigger a rebalance if there are changes such as new
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null &&
>         !assignmentSnapshot.matches(metadataSnapshot)) {
>     ...
>     requestRejoinIfNecessary(reason);
>     return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes
> // (i.e. partition changes)
> if (!isLeader)
>     assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group
> should have _isLeader == true_ and be responsible for triggering rebalances
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted
> and rejoined the group, the group effectively no longer has a current leader:
> _isLeader_ will be false for all members. Even though the metadata changes
> are fetched, no rebalance will be triggered.
> This issue does not resolve until an actual group change causes a proper
> rebalance. In order to safely increase partitions when using static
> membership, consumers must be stopped and allowed to time out, or be forcibly
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker.
> Currently, when a static consumer that is leader is restarted, the
> coordinator does recognize the change:
> e.g.
> leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted:
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator)
> {noformat}
> However, it does not attempt to update the leader id since this isn't a new
> rebalance, and JOIN_GROUP will continue returning the now stale member id as
> leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[])
> {noformat}
> This means that it's not easy for any particular restarted member to identify
> that it should consider itself leader and handle metadata changes.
> There is a reference to the difficulty of leader restarts in KAFKA-7728, but
> the focus there seemed mainly to be on avoiding needless rebalances for static
> members. That goal was accomplished, but this issue seems to be a side effect
> of both not rebalancing AND not having the rejoined member reclaim its
> leadership status.
> Also, I have not verified whether it's strictly related or valid, but I
> noticed that this ticket has been opened too: KAFKA-12759.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
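The leader-only check quoted in the analysis, and why a restarted static leader stops running it, can be illustrated with a small simulation. This is a hypothetical sketch, not Kafka's `ConsumerCoordinator`: the names `LeaderSnapshotSketch`, `Member`, `onJoinComplete` and `shouldRejoin` are assumptions made for illustration, topic metadata is reduced to a map of topic name to partition count, and the `allMembersMonitor` flag only loosely models option 4 from the comment (every member keeps a snapshot). The coordinator side, including the rule that only a JoinGroup from the group leader triggers a rebalance, is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of the leader-only metadata check discussed in
 * KAFKA-13435. Not Kafka's ConsumerCoordinator: all names are illustrative, and
 * topic metadata is reduced to a map of topic name -> partition count.
 */
public class LeaderSnapshotSketch {

    static final class Member {
        final String memberId;
        boolean isLeader;
        // Snapshot of topic -> partition count taken at join time; null means "not monitoring".
        Map<String, Integer> assignmentSnapshot;

        Member(String memberId) {
            this.memberId = memberId;
        }

        /**
         * Called when the member's join completes. With allMembersMonitor == false this
         * mirrors the current rule (only the leader keeps a snapshot); with true it
         * loosely models option 4 (every member keeps one).
         */
        void onJoinComplete(String leaderIdFromJoinResponse, Map<String, Integer> metadata,
                            boolean allMembersMonitor) {
            isLeader = memberId.equals(leaderIdFromJoinResponse);
            assignmentSnapshot = (isLeader || allMembersMonitor) ? new HashMap<>(metadata) : null;
        }

        /** Analogue of: assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot). */
        boolean shouldRejoin(Map<String, Integer> currentMetadata) {
            return assignmentSnapshot != null && !assignmentSnapshot.equals(currentMetadata);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("test-topic", 1);

        // Normal rebalance: "m-old" is elected leader and keeps a snapshot.
        Member original = new Member("m-old");
        original.onJoinComplete("m-old", metadata, false);

        // Static leader restarts: it gets the new member id "m-new", but no rebalance
        // happens, so the JoinGroup response still names the stale id "m-old" as leader.
        Member restarted = new Member("m-new");
        restarted.onJoinComplete("m-old", metadata, false);

        // Same restart, but with every member monitoring metadata (option 4).
        Member relaxed = new Member("m-new");
        relaxed.onJoinComplete("m-old", metadata, true);

        // The partition count of the topic is increased.
        metadata.put("test-topic", 2);

        System.out.println("original leader would rejoin: " + original.shouldRejoin(metadata));
        System.out.println("restarted leader would rejoin: " + restarted.shouldRejoin(metadata));
        System.out.println("option-4 member would rejoin: " + relaxed.shouldRejoin(metadata));
    }
}
```

Running `main` prints `true` for the original leader, `false` for the restarted member that received the stale leader id, and `true` for the member that monitors metadata regardless of leadership, which matches the behaviour reported in the issue under these simplifying assumptions.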