[ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194469#comment-17194469
 ] 

Sophie Blee-Goldman commented on KAFKA-10455:
---------------------------------------------

Yeah, I think you're touching on a related issue – fast detection of source 
topic deletion – which I agree can/should be solved along the same lines as the 
probing rebalance bug in this ticket.

Just for some context: to avoid accidental data corruption/loss, we want to 
react as fast as possible to source topic deletion. Currently we detect the 
absence of source topics during a rebalance and send an error code to all 
members to shut down. The problem is that there may be a gap of up to 
metadata.max.age.ms (default 5min) between the topic deletion and the reaction, 
i.e. triggering a rebalance and informing all members to shut down. Only the 
leader is guaranteed to trigger a rebalance upon sending a JoinGroup, and 
unless the leader happens to be assigned one of the partitions of the deleted 
topics, it will not notice the topic deletion until it refreshes its metadata. 
If non-leaders are assigned to these deleted partitions and notice the topic 
deletion, they may not be able to trigger a rebalance even if they rejoin the 
group.
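
As a rough illustration of the rule described above, here is a simplified model 
of when a rejoining member's JoinGroup starts a rebalance. It is a sketch, not 
the group coordinator's actual code: the class RejoinRule and the method 
triggersRebalance are hypothetical names, and the model assumes the only inputs 
that matter are leadership and whether the subscription userdata changed.

```java
import java.util.Arrays;

/** Simplified model of the rejoin rule; hypothetical, not the broker's real code. */
public class RejoinRule {
    /**
     * Returns true if a JoinGroup from an existing member of a stable group
     * would start a new rebalance under the simplified rule: the leader always
     * does, a follower only does if its userdata bytes differ from last time.
     */
    public static boolean triggersRebalance(boolean isLeader,
                                            byte[] previousUserData,
                                            byte[] newUserData) {
        return isLeader || !Arrays.equals(previousUserData, newUserData);
    }

    public static void main(String[] args) {
        byte[] userData = {1, 2, 3};
        // Leader rejoining with identical userdata.
        System.out.println(triggersRebalance(true, userData, userData.clone()));
        // Follower rejoining with identical userdata: the problematic case.
        System.out.println(triggersRebalance(false, userData, userData.clone()));
        // Follower rejoining with changed userdata.
        System.out.println(triggersRebalance(false, userData, new byte[] {1, 2, 4}));
    }
}
```

Under this model the second call is the case at issue: a non-leader that noticed 
the deleted topic rejoins with unchanged userdata and nothing happens.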

Both problems could be solved by modifying the userdata to ensure any member's 
JoinGroup results in a rebalance. We could just add a single byte to the 
SubscriptionInfo and bump it when rejoining. This actually seems like a better 
all-around solution, since members of Streams should not be haphazardly sending 
JoinGroups for no reason – if they do, it must be because they want a 
rebalance. This way we don't have to worry about changing any broker-side code 
or finding a workaround for older brokers.
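
A minimal sketch of the byte-bumping idea follows, assuming the byte is simply 
appended to the rest of the encoded subscription. The class UniqueFieldSketch 
and its encode and bump methods are hypothetical stand-ins, not the real 
SubscriptionInfo API, and the actual wire format may well differ.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical stand-in for the SubscriptionInfo encoding; names are illustrative. */
public class UniqueFieldSketch {
    // One-byte counter, bumped before every deliberate rejoin.
    private byte uniqueField = 0;

    /** Appends the one-byte counter to the otherwise unchanged payload. */
    public byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(payload.length + 1)
                .put(payload)
                .put(uniqueField)
                .array();
    }

    /** Call right before sending a JoinGroup that is meant to force a rebalance. */
    public void bump() {
        uniqueField++; // wraps from 127 to -128, still different from the previous value
    }

    public static void main(String[] args) {
        UniqueFieldSketch member = new UniqueFieldSketch();
        byte[] payload = {-2, -2}; // stands in for task offset sums that did not change
        byte[] first = member.encode(payload);
        member.bump();
        byte[] second = member.encode(payload);
        System.out.println(Arrays.equals(first, second)); // prints "false"
    }
}
```

Wraparound is harmless here because each subscription only needs to differ from 
the one sent immediately before it, so a single byte is enough.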

We could also take the approach of making sure the leader is responsible for 
triggering the rebalance, but this doesn't solve the source topic deletion 
problem. It also wouldn't help us in any new feature we wanted to add that 
required arbitrary members to trigger a rebalance. So I think we should just go 
with bumping a byte in the SubscriptionInfo.

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-10455
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10455
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower – it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring that the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow anyone to trigger a rebalance by 
> rejoining the group. But this would presumably require a broker-side change 
> and thus we would still need a workaround for KIP-441 to work with older 
> brokers.
>  
> Related issue:
> This also means the Streams workaround for [KAFKA-9821|http://example.com] is 
> not airtight, as we encode the followup rebalance in the member who is 
> supposed to _receive_ a revoked partition, rather than the member who is 
> actually revoking said partition. While the member doing the revoking will be 
> guaranteed to have different userdata, the member receiving the partition may 
> not. Making it the responsibility of the leader to trigger _any_ type of 
> followup rebalance would solve this issue as well.
> Note that other types of followup rebalance (version probing, static 
> membership with host info change) are guaranteed to have a change in the 
> subscription userdata, and will not hit this bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)