[jira] [Updated] (KAFKA-9821) Stream task may skip assignment with static members and incremental rebalances

2020-06-01 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9821:
---
Description: 
When static membership (KIP-345) and incremental rebalancing (KIP-429) are 
turned on at the same time, a failure can leave some partitions assigned to 
no one. The event sequence is the following:

1. An assignment (task1) from a rebalance is sent to an existing static member 
whose owned list is (partition1, partition2). Upon receiving the assignment, 
the static member is supposed to revoke partition2 and then re-join the group 
to trigger another rebalance.

2. The member crashes before re-joining the group and loses all of its assigned 
partitions. However, since this member is static with a long session timeout, 
it has not yet been kicked out of the group on the coordinator side.

3. The member restarts and re-joins with a known instance.id. The coordinator 
does not trigger a rebalance in this case and just gives it the previous 
assignment (partition1). Since the member has forgotten its previously owned 
partitions, it simply takes partition1 and does not re-join.

4. As a result, partition2 is no longer owned by this member, but it has not 
been re-assigned to anyone either; until the next rebalance it will not be 
fetched by any member of the group.

The key issue is that today we rely on the member's local memory to calculate 
the added / revoked diff based on (owned, assigned). This breaks if the member 
crashes and loses all of its owned partitions AND it is a static member whose 
re-join does not trigger a new rebalance.
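The (owned, assigned) diff described above can be simulated in a few lines of Java. This is a hypothetical sketch, not the actual consumer code: the class and method names (StaticMemberDiffSketch, revoked, added, shouldRejoin) are made up for illustration, and it assumes, as the description states, that the member decides to re-join only when its local diff shows partitions to revoke.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation of the KAFKA-9821 sequence; not real Kafka client code.
public class StaticMemberDiffSketch {

    // Partitions the member owned but was not assigned: these must be revoked.
    static Set<String> revoked(Set<String> owned, Set<String> assigned) {
        Set<String> result = new HashSet<>(owned);
        result.removeAll(assigned);
        return result;
    }

    // Partitions the member was assigned but did not own before.
    static Set<String> added(Set<String> owned, Set<String> assigned) {
        Set<String> result = new HashSet<>(assigned);
        result.removeAll(owned);
        return result;
    }

    // Assumption: with incremental rebalancing, a non-empty revocation is what
    // makes the member re-join to trigger the follow-up rebalance.
    static boolean shouldRejoin(Set<String> owned, Set<String> assigned) {
        return !revoked(owned, assigned).isEmpty();
    }

    public static void main(String[] args) {
        Set<String> assignment = Set.of("partition1");

        // Step 1: the member still remembers owning partition1 and partition2.
        Set<String> ownedBeforeCrash = Set.of("partition1", "partition2");
        System.out.println("before crash: revoke " + revoked(ownedBeforeCrash, assignment)
                + ", re-join = " + shouldRejoin(ownedBeforeCrash, assignment));

        // Steps 2-3: after the crash the owned set is empty, and the coordinator
        // hands back the same assignment without starting a rebalance.
        Set<String> ownedAfterRestart = Collections.emptySet();
        System.out.println("after restart: revoke " + revoked(ownedAfterRestart, assignment)
                + ", re-join = " + shouldRejoin(ownedAfterRestart, assignment));

        // Step 4: no re-join happens, so nothing ever picks up partition2.
    }
}
```

Running main shows the member re-joining before the crash but not after the restart, which is exactly step 4: the diff computed from an empty owned set contains nothing to revoke, so partition2 is left without an owner.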

  was:
(same as the description above, plus the following paragraph:)

After thinking about this, I think 1) on the consumer side, maybe we should 
augment the Assignment protocol with an error code indicating rebalance_needed, 
which would also be persisted in the broker's offset topic, so that a consumer 
member can learn that it needs to re-join without comparing the assignment with 
its owned partitions.
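A minimal sketch of the proposed alternative, assuming a hypothetical error code carried in the assignment and stored with it by the coordinator. None of these names (AssignmentError, REBALANCE_NEEDED, Assignment, shouldRejoin) exist in the real Kafka protocol; the sketch only illustrates that the re-join decision reads a persisted flag instead of the member's in-memory owned set, so it survives a crash.

```java
import java.util.Set;

// Hypothetical illustration of the proposal; not part of the real consumer protocol.
public class AssignmentErrorCodeSketch {

    // Hypothetical error codes the group leader could attach to an assignment.
    enum AssignmentError { NONE, REBALANCE_NEEDED }

    // Hypothetical assignment: the partitions plus the error code, both assumed
    // to be persisted by the coordinator and returned unchanged on a static re-join.
    static final class Assignment {
        final Set<String> partitions;
        final AssignmentError error;

        Assignment(Set<String> partitions, AssignmentError error) {
            this.partitions = partitions;
            this.error = error;
        }
    }

    // The decision depends only on the stored error code, not on what the
    // member remembers owning.
    static boolean shouldRejoin(Assignment assignment) {
        return assignment.error == AssignmentError.REBALANCE_NEEDED;
    }

    public static void main(String[] args) {
        // The leader assigned only partition1 and flagged that partition2 was
        // being revoked, so a follow-up rebalance is required.
        Assignment stored = new Assignment(Set.of("partition1"), AssignmentError.REBALANCE_NEEDED);

        // After the crash the member's local memory is empty, but the flag is not.
        Set<String> ownedAfterRestart = Set.of();
        System.out.println("owned after restart = " + ownedAfterRestart
                + ", assigned = " + stored.partitions
                + ", re-join = " + shouldRejoin(stored));
    }
}
```

The design choice this illustrates is moving the "rebalance needed" signal from state the member computes locally to state the coordinator stores, which is what lets a restarted static member still learn that it must re-join.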

 


> Stream task may skip assignment with static members and incremental rebalances
> --
>
> Key: KAFKA-9821
> URL: https://issues.apache.org/jira/browse/KAFKA-9821
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Guozhang Wang
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Fix For: 2.6.0
>
>

[jira] [Updated] (KAFKA-9821) Stream task may skip assignment with static members and incremental rebalances

2020-06-01 Thread Sophie Blee-Goldman (Jira)



Sophie Blee-Goldman updated KAFKA-9821:
---
Fix Version/s: 2.6.0

> Stream task may skip assignment with static members and incremental rebalances
> --
>
> Key: KAFKA-9821
> URL: https://issues.apache.org/jira/browse/KAFKA-9821
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Labels: need-kip
> Fix For: 2.6.0
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9821) Stream task may skip assignment with static members and incremental rebalances

2020-06-01 Thread Sophie Blee-Goldman (Jira)



Sophie Blee-Goldman updated KAFKA-9821:
---
Affects Version/s: 2.4.0

> Stream task may skip assignment with static members and incremental rebalances
> --
>
> Key: KAFKA-9821
> URL: https://issues.apache.org/jira/browse/KAFKA-9821
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Guozhang Wang
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Fix For: 2.6.0
>
>


[jira] [Updated] (KAFKA-9821) Stream task may skip assignment with static members and incremental rebalances

2020-06-01 Thread Sophie Blee-Goldman (Jira)



Sophie Blee-Goldman updated KAFKA-9821:
---
Summary: Stream task may skip assignment with static members and 
incremental rebalances  (was: Partition may skip assignment with static members 
and incremental rebalances)

> Stream task may skip assignment with static members and incremental rebalances
> --
>
> Key: KAFKA-9821
> URL: https://issues.apache.org/jira/browse/KAFKA-9821
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Guozhang Wang
> Priority: Major
> Labels: need-kip
>


[jira] [Updated] (KAFKA-9821) Stream task may skip assignment with static members and incremental rebalances

2020-06-01 Thread Sophie Blee-Goldman (Jira)



Sophie Blee-Goldman updated KAFKA-9821:
---
Labels:   (was: need-kip)

> Stream task may skip assignment with static members and incremental rebalances
> --
>
> Key: KAFKA-9821
> URL: https://issues.apache.org/jira/browse/KAFKA-9821
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Fix For: 2.6.0
>
>


[jira] [Updated] (KAFKA-9821) Stream task may skip assignment with static members and incremental rebalances

2020-06-01 Thread Sophie Blee-Goldman (Jira)



Sophie Blee-Goldman updated KAFKA-9821:
---
Component/s: streams  (was: consumer)

> Stream task may skip assignment with static members and incremental rebalances
> --
>
> Key: KAFKA-9821
> URL: https://issues.apache.org/jira/browse/KAFKA-9821
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Fix For: 2.6.0
>
>