[ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen updated KAFKA-7018:
-------------------------------
Pull request: https://github.com/apache/kafka/pull/5176
> persist memberId for consumer restart
> -------------------------------------
>
> Key: KAFKA-7018
> URL: https://issues.apache.org/jira/browse/KAFKA-7018
> Project: Kafka
> Issue Type: Improvement
> Components: consumer, streams
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> The group coordinator has logic that avoids a rebalance when an existing follower
> consumer sends a JoinGroup request with unchanged metadata:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
>     // if the member id is unknown, register the member to the group
>     addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId,
>       clientHost, protocolType, protocols, group, responseCallback)
>   } else {
>     val member = group.get(memberId)
>     if (group.isLeader(memberId) || !member.matches(protocols)) {
>       // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>       // The latter allows the leader to trigger rebalances for changes affecting assignment
>       // which do not affect the member metadata (such as topic metadata changes for the consumer)
>       updateMemberAndRebalance(group, member, protocols, responseCallback)
>     } else {
>       // for followers with no actual change to their metadata, just return group information
>       // for the current generation which will allow them to issue SyncGroup
>       responseCallback(JoinGroupResult(
>         members = Map.empty,
>         memberId = memberId,
>         generationId = group.generationId,
>         subProtocol = group.protocolOrNull,
>         leaderId = group.leaderOrNull,
>         error = Errors.NONE))
>     }
>   }
> {code}
> While looking at AbstractCoordinator, I found that the generation is
> hard-coded as NO_GENERATION on restart, which means the consumer sends
> UNKNOWN_MEMBER_ID in its first JoinGroup request. The coordinator therefore
> treats the restarted consumer as a new member, so a rebalance is triggered on
> rejoin, while the old member id stays registered until its session times out.
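To make the effect of a restart concrete, here is a minimal Java sketch of the branch quoted above. It is a toy model under stated assumptions, not coordinator code: the class `JoinGroupModel`, its `Outcome` enum, and the string-based protocol metadata are all hypothetical, and the unknown member id is modelled as an empty string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the Empty | Stable branch; not Kafka code.
final class JoinGroupModel {
    static final String UNKNOWN_MEMBER_ID = "";

    enum Outcome { ADD_MEMBER_AND_REBALANCE, UPDATE_MEMBER_AND_REBALANCE, RETURN_CURRENT_GENERATION }

    // memberId -> protocol metadata the member registered with
    private final Map<String, String> members = new HashMap<>();
    private String leaderId = null;
    private int nextId = 0;

    // Registers a new member and returns its generated id; the first member becomes leader.
    String addMember(String protocols) {
        String id = "member-" + (nextId++);
        members.put(id, protocols);
        if (leaderId == null) {
            leaderId = id;
        }
        return id;
    }

    // Decides what a JoinGroup request would lead to in this model.
    Outcome handleJoin(String memberId, String protocols) {
        if (memberId.equals(UNKNOWN_MEMBER_ID)) {
            // A restarted consumer that lost its id lands here and is added as a new member.
            addMember(protocols);
            return Outcome.ADD_MEMBER_AND_REBALANCE;
        }
        String registered = members.get(memberId);
        if (registered == null) {
            // Model simplification: ids the group has never seen are rejected outright.
            throw new IllegalArgumentException("unknown member: " + memberId);
        }
        if (memberId.equals(leaderId) || !registered.equals(protocols)) {
            members.put(memberId, protocols);
            return Outcome.UPDATE_MEMBER_AND_REBALANCE;
        }
        // Follower with unchanged metadata: no rebalance needed.
        return Outcome.RETURN_CURRENT_GENERATION;
    }

    int size() {
        return members.size();
    }
}
```

In this model, a follower that rejoins with its known id and the same metadata gets `RETURN_CURRENT_GENERATION`, whereas the same consumer rejoining with `UNKNOWN_MEMBER_ID` gets `ADD_MEMBER_AND_REBALANCE` and leaves its old entry behind in the member map.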
> I'd like to clarify the following before we extend the discussion:
> # Whether my understanding of the above logic is right (hope [~mjsax] can
> help me double-check).
> # Whether it makes sense to persist the last memberId for consumers. We
> currently only need this feature in Streams applications, but it would do no
> harm to also support it for consumers in general. It would be a nice-to-have
> on consumer restart when the loading-previous-memberId config is set
> to true. If loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
> # The behavior could also be changed on the broker side, but I suspect that is
> very risky. So far a client-side change looks like the least effort. The end
> goal is to avoid excessive rebalances when the same consumer restarts, so if
> you feel a server-side change could also help, we can discuss further.
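One possible shape for the client-side idea in the second point, sketched in Java under loud assumptions: `MemberIdStore`, its file-based storage, and the `loadPreviousMemberId` flag are all hypothetical names for illustration, and nothing like this exists in the Kafka client API. The sketch only shows the persist-then-fall-back behaviour described above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration only; not part of the Kafka client.
final class MemberIdStore {
    static final String UNKNOWN_MEMBER_ID = "";

    private final Path file;

    MemberIdStore(Path file) {
        this.file = file;
    }

    // Persists the memberId; returns false on I/O failure instead of failing the consumer.
    boolean save(String memberId) {
        try {
            Files.write(file, memberId.getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Returns the persisted memberId, or UNKNOWN_MEMBER_ID when the feature is
    // disabled or the file is missing, empty, or unreadable.
    String loadOrUnknown(boolean loadPreviousMemberId) {
        if (!loadPreviousMemberId) {
            return UNKNOWN_MEMBER_ID;
        }
        try {
            String id = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return id.isEmpty() ? UNKNOWN_MEMBER_ID : id;
        } catch (IOException e) {
            return UNKNOWN_MEMBER_ID;
        }
    }
}
```

A consumer would call `save` after a successful join and `loadOrUnknown` before its first JoinGroup request after a restart. Whether the coordinator would still accept the persisted id at that point (for example, after the session has already timed out) is outside what this sketch covers.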
> Thank you for helping out! [~mjsax] [~guozhang]
>