[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2021-03-01 Thread Peter Sinoros-Szabo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17292724#comment-17292724
 ] 

Peter Sinoros-Szabo commented on KAFKA-9752:


Do you think [~hachikuji] that the problem we described [in this 
thread|https://lists.apache.org/thread.html/rdf907dbb78092d46e039a9669a40edc7f36bd2eaacfd1a3025db4b6e%40%3Cusers.kafka.apache.org%3E]
 is caused by the bug you solved in this ticket? Thanks

> Consumer rebalance can be stuck after new member timeout with old JoinGroup 
> version
> ---
>
> Key: KAFKA-9752
> URL: https://issues.apache.org/jira/browse/KAFKA-9752
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.2, 2.3.1, 2.4.1
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Blocker
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.2
>
>
> For older versions of the JoinGroup protocol (v3 and below), there was no way 
> for new consumer group members to get their memberId until the first 
> rebalance completed. If the JoinGroup request timed out and the client 
> disconnected, the member would nevertheless be left in the group until the 
> rebalance completed and the session timeout expired. 
> In order to prevent this situation from causing the group size to grow 
> unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
> member will be left in the group before it would be kicked out (in spite of 
> rebalance state). 
> In KAFKA-9232, we addressed one issue with this solution. Basically the new 
> member expiration logic did not properly get cancelled after the rebalance 
> completed which means that in certain cases, a successfully joined member 
> might get kicked out of the group unnecessarily. 
> Unfortunately, this patch introduced a regression in the normal session 
> expiration logic following completion of the initial rebalance. Basically the 
> expiration task fails to get scheduled properly. The issue is in this 
> function:
> {code}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
>     if (isNew) {
>       // New members are expired after the static join timeout
>       latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
>     } else if (isAwaitingJoin || isAwaitingSync) {
>       // Don't remove members as long as they have a request in purgatory
>       true
>     } else {
>       // Otherwise check for session expiration
>       latestHeartbeat + sessionTimeoutMs > deadlineMs
>     }
>   }
> {code}
> We use this logic in order to check for session expiration. On the surface, 
> there is nothing wrong with it, but it has an odd interaction with the 
> purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, 
> the code relies on `shouldKeepAlive` returning false so that the heartbeat 
> task is not immediately completed. This only works because we update 
> `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means 
> that the first or third checks will fail, `shouldKeepAlive` will return 
> false, and the heartbeat expiration task will not be immediately completed. 
> The bug in this case has to do with the case when `isNew` is true. When we 
> schedule the session expiration task, the `isNew` flag is still set to true, 
> which means we will hit the first check above. Since in most cases, the 
> session timeout is less than the new member timeout of 5 minutes, the check 
> is very likely to return true. This seems like what we would want, but as 
> noted above, we rely on this function returning false when the expiration 
> task is passed to `tryCompleteElseWatch`. Since it returns true instead, the 
> task completes immediately, which means we cannot rely on its expiration.
> The impact of this bug in the worst case is that a consumer group can be left 
> in the `PreparingRebalance` state indefinitely. This state will persist until 
> there is a coordinator change (e.g. as a result of restarting the broker). 
> Note that this is only possible if 1) we have a consumer using an old 
> JoinGroup version, 2) the consumer times out and disconnects from its initial 
> JoinGroup request. 
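
The interaction described above can be reproduced with a small standalone model. This is only a sketch under stated assumptions: `Member` is a simplified stand-in rather than the coordinator's real metadata class, and `completesImmediately` is a hypothetical helper that models the first completion attempt made by `tryCompleteElseWatch` as "complete the task if `shouldKeepAlive` returns true for the task's deadline". The 5 minute new member timeout is taken from the description; the 10 second session timeout is an arbitrary example value.

```scala
object ShouldKeepAliveSketch {
  // New member timeout of 5 minutes, as stated in the issue description
  val NewMemberJoinTimeoutMs: Long = 5 * 60 * 1000L

  // Simplified stand-in for a group member (hypothetical, not the Kafka class)
  final case class Member(
      isNew: Boolean,
      isAwaitingJoin: Boolean,
      isAwaitingSync: Boolean,
      sessionTimeoutMs: Long,
      latestHeartbeat: Long) {

    // Same three checks as the function quoted in the issue
    def shouldKeepAlive(deadlineMs: Long): Boolean =
      if (isNew) latestHeartbeat + NewMemberJoinTimeoutMs > deadlineMs
      else if (isAwaitingJoin || isAwaitingSync) true
      else latestHeartbeat + sessionTimeoutMs > deadlineMs
  }

  // Hypothetical model of the first completion attempt: the expiration task is
  // scheduled with deadline = latestHeartbeat + timeoutMs and is completed at
  // once (so it can never expire) if shouldKeepAlive returns true.
  def completesImmediately(member: Member, timeoutMs: Long): Boolean =
    member.shouldKeepAlive(member.latestHeartbeat + timeoutMs)

  val now: Long = 1000000L

  // A member whose initial rebalance has finished and whose isNew flag was cleared
  val established: Member = Member(
    isNew = false,
    isAwaitingJoin = false,
    isAwaitingSync = false,
    sessionTimeoutMs = 10000L,
    latestHeartbeat = now)

  // The same member with isNew still set when the session expiration is scheduled
  val stillNew: Member = established.copy(isNew = true)
}
```

In this model, `completesImmediately(established, 10000L)` is false, so the session expiration task stays in the purgatory as intended. `completesImmediately(stillNew, 10000L)` is true because the first check compares against the 5 minute timeout instead of the session timeout, which is the case the description identifies as the bug.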



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-09-11 Thread Javier Holguera (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194082#comment-17194082
 ] 

Javier Holguera commented on KAFKA-9752:


Which version of the Kafka clients (consumer/producer/streams) uses JoinGroup v4 
and so isn’t affected by this bug?

Thx



[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-08-09 Thread Dibyendu Bhattacharya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173850#comment-17173850
 ] 

Dibyendu Bhattacharya commented on KAFKA-9752:
--

Hi [~ijuma] [~hachikuji], I am seeing a different issue now with this fix. 
Earlier the consumer group was stuck in the "PreparingRebalance" state, which 
is no longer happening, but now I see members that are unable to join the 
group. The logs below show members being removed after session timeout.

 

[2020-08-09 09:29:00,558] INFO [GroupCoordinator 5]: *Pending member* XXX in 
group YYY  *has been removed after session timeout expiration*. 
(kafka.coordinator.group.GroupCoordinator)

[2020-08-09 09:29:55,856] INFO [GroupCoordinator 5]: *Pending member* ZZZ in 
group YYY  *has been removed after session timeout expiration*. 
(kafka.coordinator.group.GroupCoordinator)

 

As I read the GroupCoordinator code, when a new member tries to join for the 
first time, the GroupCoordinator also schedules an addPendingMemberExpiration 
(in the doUnknownJoinGroup call) with the session timeout:

{code:}

addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)

{code:}

 

If for some reason the addMemberAndRebalance call takes longer and the member 
is still in the “Pending” state, the above addPendingMemberExpiration can 
remove the pending member so that it cannot join the group. I think that is 
what is happening. 

For a new member, the coordinator is already setting a timeout in

{code:}

completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs) 

{code:}

 

What is the requirement for one more addPendingMemberExpiration task?

 


[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-06-24 Thread Gokul Ramanan Subramanian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17144111#comment-17144111
 ] 

Gokul Ramanan Subramanian commented on KAFKA-9752:
--

[~ijuma] thanks for the explanation. makes sense.



[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-06-24 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143841#comment-17143841
 ] 

Ismael Juma commented on KAFKA-9752:


[~gokul2411s] That's to signal that they would be included in such releases if 
they are created. Otherwise, how would we create the appropriate release notes?



[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-06-24 Thread Gokul Ramanan Subramanian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143638#comment-17143638
 ] 

Gokul Ramanan Subramanian commented on KAFKA-9752:
--

Would be nice if we could remove 2.3.2 and 2.4.2 from the fixed versions 
section, since these Kafka versions don't seem to exist.



[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-03-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067679#comment-17067679
 ] 

ASF GitHub Bot commented on KAFKA-9752:
---

rajinisivaram commented on pull request #8354: KAFKA-9752; New member timeout 
can leave group rebalance stuck (#8339)
URL: https://github.com/apache/kafka/pull/8354
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-03-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067065#comment-17067065
 ] 

ASF GitHub Bot commented on KAFKA-9752:
---

rajinisivaram commented on pull request #8354: KAFKA-9752; New member timeout 
can leave group rebalance stuck (#8339)
URL: https://github.com/apache/kafka/pull/8354
 
 
   Older versions of the JoinGroup protocol rely on a new member timeout to keep 
the group from growing indefinitely when clients disconnect and retry. 
The logic for resetting the heartbeat expiration task following completion of 
the rebalance failed to account for an implicit expectation that 
shouldKeepAlive would return false the first time it is invoked when a 
heartbeat expiration is scheduled. This patch fixes the issue by making 
heartbeat satisfaction logic explicit.
   
   Reviewers: Chia-Ping Tsai, Guozhang Wang, Rajini Sivaram
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer rebalance can be stuck after new member timeout with old JoinGroup 
> version
> ---
>
> Key: KAFKA-9752
> URL: https://issues.apache.org/jira/browse/KAFKA-9752
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.2, 2.3.1, 2.4.1
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Blocker
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
>
> For older versions of the JoinGroup protocol, there was no way for new 
> consumer group members to get their memberId until the first rebalance 
> completed. If the JoinGroup request timed out and the client disconnected, 
> the member would nevertheless be left in the group until the rebalance 
> completed and the session timeout expired. 
> In order to prevent this situation from causing the group size to grow 
> unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
> member will be left in the group before it would be kicked out (in spite of 
> rebalance state). 
> In KAFKA-9232, we addressed one issue with this solution. Basically the new 
> member expiration logic did not properly get cancelled after the rebalance 
> completed which means that in certain cases, a successfully joined member 
> might get kicked out of the group unnecessarily. 
> Unfortunately, this patch introduced a regression in the normal session 
> expiration logic following completion of the initial rebalance. Basically the 
> expiration task fails to get scheduled properly. The issue is in this 
> function:
> {code}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
>     if (isNew) {
>       // New members are expired after the static join timeout
>       latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
>     } else if (isAwaitingJoin || isAwaitingSync) {
>       // Don't remove members as long as they have a request in purgatory
>       true
>     } else {
>       // Otherwise check for session expiration
>       latestHeartbeat + sessionTimeoutMs > deadlineMs
>     }
>   }
> {code}
> We use this logic in order to check for session expiration. On the surface, 
> there is nothing wrong with it, but it has an odd interaction with the 
> purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, 
> the code relies on `shouldKeepAlive` returning false so that the heartbeat 
> task is not immediately completed. This only works because we update 
> `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means 
> that the first or third checks will fail, `shouldKeepAlive` will return 
> false, and the heartbeat expiration task will not be immediately completed. 
> The bug in this case has to do with the case when `isNew` is true. When we 
> schedule the session expiration task, the `isNew` flag is still set to true, 
> which means we will hit the first check above. Since in most cases, the 
> session timeout is less than the new member tim

[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-03-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17066395#comment-17066395
 ] 

ASF GitHub Bot commented on KAFKA-9752:
---

hachikuji commented on pull request #8339: KAFKA-9752; New member timeout can 
leave group rebalance stuck
URL: https://github.com/apache/kafka/pull/8339
 
 
   
 



> Consumer rebalance can be stuck after new member timeout with old JoinGroup 
> version
> ---
>
> Key: KAFKA-9752
> URL: https://issues.apache.org/jira/browse/KAFKA-9752
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.2, 2.3.1, 2.4.1
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Blocker
> Fix For: 2.5.0
>
>
> For older versions of the JoinGroup protocol, there was no way for new 
> consumer group members to get their memberId until the first rebalance 
> completed. If the JoinGroup request timed out and the client disconnected, 
> the member would nevertheless be left in the group until the rebalance 
> completed and the session timeout expired. 
> In order to prevent this situation from causing the group size to grow 
> unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
> member will be left in the group before it would be kicked out (in spite of 
> rebalance state). 
> In KAFKA-9232, we addressed one issue with this solution. Basically the new 
> member expiration logic did not properly get cancelled after the rebalance 
> completed which means that in certain cases, a successfully joined member 
> might get kicked out of the group unnecessarily. 
> Unfortunately, this patch introduced a regression in the normal session 
> expiration logic following completion of the initial rebalance. Basically the 
> expiration task fails to get scheduled properly. The issue is in this 
> function:
> {code}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
>     if (isNew) {
>       // New members are expired after the static join timeout
>       latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
>     } else if (isAwaitingJoin || isAwaitingSync) {
>       // Don't remove members as long as they have a request in purgatory
>       true
>     } else {
>       // Otherwise check for session expiration
>       latestHeartbeat + sessionTimeoutMs > deadlineMs
>     }
>   }
> {code}
> We use this logic in order to check for session expiration. On the surface, 
> there is nothing wrong with it, but it has an odd interaction with the 
> purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, 
> the code relies on `shouldKeepAlive` returning false so that the heartbeat 
> task is not immediately completed. This only works because we update 
> `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means 
> that the first or third checks will fail, `shouldKeepAlive` will return 
> false, and the heartbeat expiration task will not be immediately completed. 
> The bug in this case has to do with the case when `isNew` is true. When we 
> schedule the session expiration task, the `isNew` flag is still set to true, 
> which means we will hit the first check above. Since in most cases, the 
> session timeout is less than the new member timeout of 5 minutes, the check 
> is very likely to return true. This seems like what we would want, but as 
> noted above, we rely on this function returning false when the expiration 
> task is passed to `tryCompleteElseWatch`. Since it returns true instead, the 
> task completes immediately, which means we cannot rely on its expiration.
> The impact of this bug in the worst case is that a consumer group can be left 
> in the `PreparingRebalance` state indefinitely. This state will persist until 
> there is a coordinator change (e.g. as a result of restarting the broker). 
> Note that this is only possible if 1) we have a consumer using an old 
> JoinGroup version, 2) the consumer times out and disconnects from its initial 
> JoinGroup request. 
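
The interaction described in the quoted report can be reproduced with a small 
standalone model. The sketch below is a simplification under assumptions, not 
the broker code: `KeepAliveSketch`, `Member`, `isAwaiting` (standing in for 
`isAwaitingJoin || isAwaitingSync`) and `completesImmediately` are hypothetical 
names, and the purgatory is reduced to the single completion check made when 
the task is first scheduled. The 5 minute new member timeout is the value 
given in the report.

```scala
// Simplified, hypothetical model of the check quoted above; not the broker code.
object KeepAliveSketch {
  // 5 minutes, as stated in the issue description.
  val NewMemberJoinTimeoutMs: Long = 5 * 60 * 1000L

  final case class Member(
      isNew: Boolean,
      isAwaiting: Boolean, // stands in for isAwaitingJoin || isAwaitingSync
      sessionTimeoutMs: Long,
      latestHeartbeat: Long) {

    def shouldKeepAlive(deadlineMs: Long): Boolean = {
      if (isNew) {
        latestHeartbeat + NewMemberJoinTimeoutMs > deadlineMs
      } else if (isAwaiting) {
        true
      } else {
        latestHeartbeat + sessionTimeoutMs > deadlineMs
      }
    }
  }

  // Models the first completion attempt made when the expiration task is
  // scheduled. A result of true means the task finishes at once, so it can
  // never expire later.
  def completesImmediately(m: Member, nowMs: Long): Boolean =
    m.shouldKeepAlive(nowMs + m.sessionTimeoutMs)
}
```

For an established member whose `latestHeartbeat` was just set to the current 
time, the third check compares `now + sessionTimeoutMs` with itself, so the 
result is false and the task stays scheduled. For a member that is still 
`isNew` with a session timeout below 5 minutes, the first check returns true 
and the task completes immediately, which is the failure mode the report 
describes.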





[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-03-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17065371#comment-17065371
 ] 

ASF GitHub Bot commented on KAFKA-9752:
---

hachikuji commented on pull request #8339: KAFKA-9752; New member timeout can 
leave group rebalance stuck
URL: https://github.com/apache/kafka/pull/8339
 
 
   Older versions of the JoinGroup protocol rely on a new member timeout to keep 
the group from growing indefinitely when clients disconnect and retry. 
The logic for resetting the heartbeat expiration task following completion of 
the rebalance failed to account for an implicit expectation that 
`shouldKeepAlive` would return false the first time it is invoked when a 
heartbeat expiration is scheduled. This patch fixes the issue by making the 
heartbeat satisfaction logic explicit.
   
   
 



> Consumer rebalance can be stuck after new member timeout with old JoinGroup 
> version
> ---
>
> Key: KAFKA-9752
> URL: https://issues.apache.org/jira/browse/KAFKA-9752
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.2, 2.3.1, 2.4.1
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Blocker
> Fix For: 2.5.0
>
>
> For older versions of the JoinGroup protocol, there was no way for new 
> consumer group members to get their memberId until the first rebalance 
> completed. If the JoinGroup request timed out and the client disconnected, 
> the member would nevertheless be left in the group until the rebalance 
> completed and the session timeout expired. 
> In order to prevent this situation from causing the group size to grow 
> unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
> member will be left in the group before it would be kicked out (in spite of 
> rebalance state). 
> In KAFKA-9232, we addressed one issue with this solution. Basically the new 
> member expiration logic did not properly get cancelled after the rebalance 
> completed which means that in certain cases, a successfully joined member 
> might get kicked out of the group unnecessarily. 
> Unfortunately, this patch introduced a regression in the normal session 
> expiration logic following completion of the initial rebalance. Basically the 
> expiration task fails to get scheduled properly. The issue is in this 
> function:
> {code}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
>     if (isNew) {
>       // New members are expired after the static join timeout
>       latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
>     } else if (isAwaitingJoin || isAwaitingSync) {
>       // Don't remove members as long as they have a request in purgatory
>       true
>     } else {
>       // Otherwise check for session expiration
>       latestHeartbeat + sessionTimeoutMs > deadlineMs
>     }
>   }
> {code}
> We use this logic in order to check for session expiration. On the surface, 
> there is nothing wrong with it, but it has an odd interaction with the 
> purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, 
> the code relies on `shouldKeepAlive` returning false so that the heartbeat 
> task is not immediately completed. This only works because we update 
> `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means 
> that the first or third checks will fail, `shouldKeepAlive` will return 
> false, and the heartbeat expiration task will not be immediately completed. 
> The bug in this case has to do with the case when `isNew` is true. When we 
> schedule the session expiration task, the `isNew` flag is still set to true, 
> which means we will hit the first check above. Since in most cases, the 
> session timeout is less than the new member timeout of 5 minutes, the check 
> is very likely to return true. This seems like what we would want, but as 
> noted above, we rely on this function returning false when the expiration 
> task is passed to `tryCompleteElseWatch`. Since it returns true instead, the 
> task completes immediately, which means we cannot rely on its expiration.
> The impact of this bug in the worst case is that a consumer group can be left 
> in the `PreparingRebalance` state indefinitely. This state will persist unt