[ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514939#comment-16514939 ]

Boyang Chen edited comment on KAFKA-7018 at 6/17/18 12:06 AM:
--------------------------------------------------------------

A summary of the idea:

When the leader sends a JoinGroup request, it always triggers another rebalance, because the topic metadata may have changed. So we need to make sure the other members are aware of that.

Let's imagine a scenario where every member rejoins with its previous generation info. Then:

If a follower joins after the leader, the group is already in PreparingRebalance, so every JoinGroup request from the followers is held and each member is left waiting on its join callback (awaitJoinCallBack).

If the followers join before the leader, the coordinator just returns the current generation to them, and they go on to send a SyncGroup request.
 * If the leader has already moved the group to PreparingRebalance, the SyncGroup request is rejected with REBALANCE_IN_PROGRESS and the followers rejoin.
 * If the leader hasn't moved the group to PreparingRebalance yet, the SyncGroup request succeeds and the follower starts sending heartbeats. Once the leader's JoinGroup moves the group to PreparingRebalance, handleHeartbeat() returns REBALANCE_IN_PROGRESS to the follower, which makes it rejoin.
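To sanity-check the orderings above, here is a toy Java model of the coordinator's side of them. It is a heavily simplified sketch, not Kafka's GroupCoordinator: it assumes only two states (Stable and PreparingRebalance), a single known leader, and followers whose metadata has not changed. The names RejoinOrderingSketch, ToyGroup and Result.AWAITING_JOIN are hypothetical. In every ordering the follower either ends up parked in the join or is told REBALANCE_IN_PROGRESS, so it cannot stay on the old generation.

```java
import java.util.HashSet;
import java.util.Set;

final class RejoinOrderingSketch {
    // Hypothetical, simplified group states (the real coordinator has more).
    enum State { STABLE, PREPARING_REBALANCE }

    // NONE / REBALANCE_IN_PROGRESS echo Kafka error names; AWAITING_JOIN is made up.
    enum Result { NONE, REBALANCE_IN_PROGRESS, AWAITING_JOIN }

    static final class ToyGroup {
        State state = State.STABLE;
        final String leaderId;
        final Set<String> awaitingJoin = new HashSet<>();

        ToyGroup(String leaderId) { this.leaderId = leaderId; }

        // JoinGroup from a known member whose metadata is unchanged.
        Result join(String memberId) {
            if (memberId.equals(leaderId)) {
                // A JoinGroup from the leader always forces a rebalance.
                state = State.PREPARING_REBALANCE;
                awaitingJoin.add(memberId);
                return Result.AWAITING_JOIN;
            }
            if (state == State.PREPARING_REBALANCE) {
                // Follower joining after the leader: held until the rebalance completes.
                awaitingJoin.add(memberId);
                return Result.AWAITING_JOIN;
            }
            // Follower joining before the leader: gets the current generation back.
            return Result.NONE;
        }

        Result sync(String memberId) {
            return state == State.PREPARING_REBALANCE ? Result.REBALANCE_IN_PROGRESS : Result.NONE;
        }

        Result heartbeat(String memberId) {
            return state == State.PREPARING_REBALANCE ? Result.REBALANCE_IN_PROGRESS : Result.NONE;
        }
    }

    // Case 1: the leader rejoins first, then the follower.
    static Result followerJoinsAfterLeader() {
        ToyGroup g = new ToyGroup("leader");
        g.join("leader");
        return g.join("follower");
    }

    // Case 2a: follower joins first, but the leader joins before the follower's SyncGroup.
    static Result followerSyncsAfterLeaderJoins() {
        ToyGroup g = new ToyGroup("leader");
        g.join("follower");
        g.join("leader");
        return g.sync("follower");
    }

    // Case 2b: follower joins and syncs, then the leader joins while it is heartbeating.
    static Result followerHeartbeatsAfterLeaderJoins() {
        ToyGroup g = new ToyGroup("leader");
        g.join("follower");
        g.sync("follower");
        g.join("leader");
        return g.heartbeat("follower");
    }

    public static void main(String[] args) {
        System.out.println(followerJoinsAfterLeader());           // AWAITING_JOIN
        System.out.println(followerSyncsAfterLeaderJoins());      // REBALANCE_IN_PROGRESS
        System.out.println(followerHeartbeatsAfterLeaderJoins()); // REBALANCE_IN_PROGRESS
    }
}
```

The model deliberately ignores generations and timeouts; it only checks that no ordering leaves a follower silently on the old assignment.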

The client-side handling of that heartbeat error (in AbstractCoordinator):

{code:java}
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
    log.debug("Attempt to heartbeat failed since group is rebalancing");
    // mark this member as needing to rejoin the group
    requestRejoin();
    future.raise(Errors.REBALANCE_IN_PROGRESS);
}
{code}

So the only change we need is on the client side: make sure the member id stays the same across a restart. We don't need to worry about the order in which the followers and the leader join.
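A minimal sketch of what keeping the member id across restarts could look like on the client, assuming the id is simply written to a local file after each successful join and read back on startup. MemberIdStore, the file location and the loadPreviousMemberId flag are hypothetical (the flag stands in for the opt-in config suggested in the issue description); the empty-string fallback mirrors JoinGroupRequest.UNKNOWN_MEMBER_ID.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class MemberIdStore {
    // Same value as Kafka's JoinGroupRequest.UNKNOWN_MEMBER_ID (the empty string).
    static final String UNKNOWN_MEMBER_ID = "";

    private final Path file;
    private final boolean loadPreviousMemberId; // hypothetical opt-in flag

    MemberIdStore(Path file, boolean loadPreviousMemberId) {
        this.file = file;
        this.loadPreviousMemberId = loadPreviousMemberId;
    }

    // Member id to put in the first JoinGroup after a restart.
    String memberIdForFirstJoin() {
        if (!loadPreviousMemberId) {
            return UNKNOWN_MEMBER_ID;
        }
        try {
            String id = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return id.isEmpty() ? UNKNOWN_MEMBER_ID : id;
        } catch (IOException e) {
            // Missing or unreadable file: fall back to joining as a new member.
            return UNKNOWN_MEMBER_ID;
        }
    }

    // Would be called with the member id returned by each successful JoinGroup.
    void save(String memberId) throws IOException {
        Files.write(file, memberId.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("member-id", ".txt");
        Files.delete(tmp); // start with no persisted id
        MemberIdStore store = new MemberIdStore(tmp, true);
        System.out.println("[" + store.memberIdForFirstJoin() + "]"); // []
        store.save("consumer-1-abc"); // hypothetical id
        System.out.println(store.memberIdForFirstJoin()); // consumer-1-abc
        Files.deleteIfExists(tmp);
    }
}
```

Falling back to UNKNOWN_MEMBER_ID on any read failure keeps today's behavior as the worst case, so the feature can only save a rebalance, never block a join.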

 



> persist memberId for consumer restart
> -------------------------------------
>
>                 Key: KAFKA-7018
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7018
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer, streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> In the group coordinator, there is logic that does not trigger a rebalance for JoinGroup requests from existing follower consumers whose metadata is unchanged:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
>     // if the member id is unknown, register the member to the group
>     addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
>   } else {
>     val member = group.get(memberId)
>     if (group.isLeader(memberId) || !member.matches(protocols)) {
>       // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>       // The latter allows the leader to trigger rebalances for changes affecting assignment
>       // which do not affect the member metadata (such as topic metadata changes for the consumer)
>       updateMemberAndRebalance(group, member, protocols, responseCallback)
>     } else {
>       // for followers with no actual change to their metadata, just return group information
>       // for the current generation which will allow them to issue SyncGroup
>       responseCallback(JoinGroupResult(
>         members = Map.empty,
>         memberId = memberId,
>         generationId = group.generationId,
>         subProtocol = group.protocolOrNull,
>         leaderId = group.leaderOrNull,
>         error = Errors.NONE))
>     }
>   }
> {code}
> While looking at AbstractCoordinator, I found that the generation is hard-coded to NO_GENERATION on restart, which means the first JoinGroup request carries UNKNOWN_MEMBER_ID. So the restarted consumer is treated as a new member, and rebalances will be triggered until the old member's session times out.
> I'm trying to clarify the following things before we extend the discussion:
>  # Whether my understanding of the above logic is right (hope [~mjsax] can help me double-check).
>  # Whether it makes sense to persist the last round's memberId for consumers. We currently only need this feature in Streams applications, but it would do no harm to also use it for consumers in general. This would be a nice-to-have feature on consumer restart when loading the previous memberId is enabled in the config. If loading fails, we simply use UNKNOWN_MEMBER_ID.
>  # The behavior could also be changed on the broker side, but I suspect that is very risky. So far a client-side change looks like the least effort. The end goal is to avoid excessive rebalances when the same consumer restarts, so if you feel a server-side change could also help, we can discuss further.
> Thank you for helping out! [~mjsax] [~guozhang]
>  
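A toy Java model of the Empty | Stable branch quoted above, to show why the restart matters. It assumes member metadata can be compared as a plain string and hands out hypothetical ids (member-1, member-2, ...); JoinDecisionSketch and Action are illustrative names, not Kafka classes, and generations, timeouts and the real error paths are left out. A follower that rejoins with its old id and unchanged metadata just gets the current generation back, while the same consumer rejoining with UNKNOWN_MEMBER_ID is registered as an extra member and forces a rebalance.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JoinDecisionSketch {
    // Same value as Kafka's JoinGroupRequest.UNKNOWN_MEMBER_ID (the empty string).
    static final String UNKNOWN_MEMBER_ID = "";

    enum Action { ADD_MEMBER_AND_REBALANCE, UPDATE_MEMBER_AND_REBALANCE, RETURN_CURRENT_GENERATION }

    // memberId -> protocol metadata, simplified to a string
    final Map<String, String> members = new LinkedHashMap<>();
    private String leaderId;
    private int nextId = 1;

    // Decision for a JoinGroup arriving while the group is Empty or Stable.
    Action onJoin(String memberId, String metadata) {
        if (memberId.equals(UNKNOWN_MEMBER_ID)) {
            // Unknown member: register it under a fresh id and rebalance.
            String newId = "member-" + nextId++;
            members.put(newId, metadata);
            if (leaderId == null) {
                leaderId = newId; // toy rule: the first member becomes leader
            }
            return Action.ADD_MEMBER_AND_REBALANCE;
        }
        if (!members.containsKey(memberId)) {
            // Toy model only; the real coordinator has its own error path for this.
            throw new IllegalArgumentException("unknown member id: " + memberId);
        }
        if (memberId.equals(leaderId) || !members.get(memberId).equals(metadata)) {
            // Leader JoinGroup or changed metadata: update the member and rebalance.
            members.put(memberId, metadata);
            return Action.UPDATE_MEMBER_AND_REBALANCE;
        }
        // Known follower with unchanged metadata: hand back the current generation.
        return Action.RETURN_CURRENT_GENERATION;
    }

    public static void main(String[] args) {
        JoinDecisionSketch group = new JoinDecisionSketch();
        group.onJoin(UNKNOWN_MEMBER_ID, "topics=a"); // member-1 (leader)
        group.onJoin(UNKNOWN_MEMBER_ID, "topics=a"); // member-2

        // member-2 restarts but kept its id: no rebalance.
        System.out.println(group.onJoin("member-2", "topics=a"));       // RETURN_CURRENT_GENERATION
        // member-2 restarts with UNKNOWN_MEMBER_ID: treated as a brand-new member.
        System.out.println(group.onJoin(UNKNOWN_MEMBER_ID, "topics=a")); // ADD_MEMBER_AND_REBALANCE
        // The stale member-2 entry is still there alongside member-3.
        System.out.println(group.members.keySet()); // [member-1, member-2, member-3]
    }
}
```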



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
